|
|
7 ?$ M6 y( H* d, w' O- |& J
<h1 id="如何实现一个线程池">如何实现一个线程池</h1>2 f ]: R. u6 b; B9 K4 R" }+ v1 y
<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>
! b; y- m6 [ c5 }0 F0 T$ U# B; w<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>+ N# V1 m% _$ ]# Y2 S
<p>线程池Pool</p>
! W8 I, Y0 D* M* K$ Y# D. L<pre><code>pub struct Pool {5 W0 C' h. M# a& E* t
max_workers: usize, // 定义最大线程数
1 L6 }3 ?/ f- u& O/ |# I4 h6 d0 c}
3 S- I& K! M6 m' C
, w) b" m5 R: R9 I% n7 D1 }impl Pool {
! x& \) f. M& N: A, S2 P fn new(max_workers: usize) -> Pool {}+ k, Y2 Q7 d& m& a
fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send {}1 B' Y, K" V' a
}. a4 P' I8 _; O8 L6 P4 ~
4 ~# W- N M5 P/ }2 v</code></pre>- A% E( ]# S. Q; v
<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>
+ p- z8 ?- ^# P$ X<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec<Thread></code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>
: |& y5 d! l8 i7 o+ P可以看作在一个线程里不断执行获取任务并执行的Worker。</p> F( n- r) n4 b5 ]7 w0 A$ `
<pre><code>struct Worker where) y- y5 O7 I: K+ l. t' T% N- |4 m
{
" N& w5 h6 V U, R2 s _id: usize, // worker 编号
- s4 [4 @- v# ^}4 ~. ?* p ]6 }9 `9 V6 `
</code></pre>
3 a( x4 F1 q1 ~+ N/ x1 f<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>
) \4 S7 Z' v1 J9 f9 L; e$ x+ f+ c. T把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>( r; {% A: }& N, C2 d
<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc<Mutex::<T>></code>来包裹起来,也就是用锁来解决并发冲突。</p>8 \( N- h L+ E$ p
<p>Pool的完整定义</p>
$ |: c) {' t# |1 Y7 u/ K/ G: \<pre><code>pub struct Pool {
6 l) A- Q1 Z0 a0 s workers: Vec<Worker>,- S, ~* r. [0 x4 G( g
max_workers: usize,
4 ?) N+ i! c: ]! r sender: mpsc::Sender<Message>
+ q/ A5 y: a! u, @2 Y; m5 Y}+ X; o" L9 b- ~+ \ O
</code></pre>
2 E( o2 p/ T B0 |7 f<p>该是时候定义我们要发给Worker的消息Message了<br>/ q) {) p: ?; K1 q
定义如下的枚举值</p>
3 G+ f' Z; n/ P; v8 [4 F2 u% |3 j<pre><code>type Job = Box<dyn FnOnce() + 'static + Send>;' k* ]5 u$ T5 G# `; E
enum Message {$ y$ m- N8 R( v7 \2 G- j. ?7 c
ByeBye,
$ }7 |5 i9 P2 K NewJob(Job),
; M5 f- @7 d8 ~, }}7 F% W9 Y8 X' `6 n
</code></pre>$ U" L& A( Z/ S) h! W& ^
<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>
^6 B8 Y! J8 i3 Z- @" ]+ @<p>只剩下实现Worker和Pool的具体逻辑了。</p>
/ f( B$ ^* R$ p) j$ \- a4 _5 W1 b! Q<p>Worker的实现</p>' m. ]$ Q" z7 c: H! J
<pre><code>impl Worker
7 l% A+ _5 P; a4 L3 Y6 @; z{6 `: e+ W4 L. @
fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {, E! \3 u. P9 L |8 L: d" i6 i
let t = thread::spawn( move || {
+ G0 J+ {7 C# G/ `3 H% K loop {
! c( Y; ?6 h: T. i let receiver = receiver.lock().unwrap();
$ f8 f3 a- ^! S ? let message= receiver.recv().unwrap();7 R: S( S2 [" n* w
match message {9 d R. V# p; {) E# W! }* D0 X
Message::NewJob(job) => {
3 z$ _ X- y$ T' ^ println!("do job from worker[{}]", id);
# U' @0 ?# N& R/ x; T* \/ v7 A job();" A, g. c! ~3 T6 @! L: I
},
8 v. I+ V% N2 N& B/ c Message::ByeBye => {
- D, j1 e! }' o8 f9 Q println!("ByeBye from worker[{}]", id);
6 O1 T1 m1 f# w) q3 Y( a break9 d$ P1 k5 I Y% E7 v* p) L
},
. s5 f+ N2 C4 f5 G. d6 G$ L }
- Z$ Y0 n/ D& p* a3 b& O9 x$ ~9 }- Z! x }$ g' `; b) ]5 B! X: T k w1 h
});
1 f3 A" j' e% c; `) A; G
0 e6 d+ \4 V7 U4 X3 u9 ~7 t$ y Worker {, e9 _2 l& m; N; ]& Y
_id: id,
+ F' k' q! Q- X. T0 t/ E+ k/ b t: Some(t),& \# Y" S+ n- |" S$ p0 u, G" ~
} I- J& A% A5 t, {% J
}/ b: P1 J1 Q. ]+ V4 G, J7 t' J
}
. o' k9 F- o4 J1 Y0 ]& ]4 W" y; k</code></pre>% p! e2 i3 n- r( r9 T" q* D& \7 q/ ?
<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>
$ G0 ]4 } l% `# g' Y. E但如果写成</p>, ~$ p: d2 }; c6 `, R
<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {
/ K; \( v! Y, E0 @2 A};
8 v' h' h8 q/ h- u- f7 d</code></pre>
! o% g: a, L' b7 j8 [<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>: |5 C$ R" B6 l" T( f# n; _
rust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>$ u1 z( [4 C- P/ V
<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>
, K( p. M, u- k<pre><code>impl Drop for Pool {
; K2 T8 e3 N& ^0 w3 V( j4 y; V fn drop(&mut self) {
* U( @1 q$ x7 l2 z) d! i for _ in 0..self.max_workers {
. u4 T3 a+ e8 z k6 K: l self.sender.send(Message::ByeBye).unwrap();& f) |6 v7 J9 `+ Y/ }9 l# c
}
+ `5 l: I9 A$ ?$ w3 H& [5 o for w in self.workers.iter_mut() {
$ ]5 `2 @, w- \5 P1 B! \1 c if let Some(t) = w.t.take() {9 q7 Z0 ^. }$ q$ c x2 z' Q
t.join().unwrap();
) I H, f" y4 A. D }
' T) \$ o3 w" A# v, y }
, \3 z9 v. s( ]1 n }( t- j+ X1 ~& p$ E; C
}
3 L6 u% f7 d8 {% D- r0 B& u% {3 g5 B+ q, F3 N
</code></pre>8 G9 U0 k. P, ?* V1 a ?. G# x4 ]( i; s
<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>. E- L) r# y; N& E) |
<pre><code>for w in self.workers.iter_mut() {$ _- y ~' Q- f- T
if let Some(t) = w.t.take() {6 V3 L- |7 b/ B5 f4 s( @0 Z& \/ y& ^
self.sender.send(Message::ByeBye).unwrap();
: N" Q* G' t# ]* v/ h t.join().unwrap();
8 l/ s( O9 {% y8 b2 } }
0 _ ?3 @( g! `- D8 B' Q: `}8 F, I2 E8 }. W& _2 w2 [8 p
1 ?+ |7 i, Y1 t% l7 g
</code></pre>
7 i( q G! N% T# Q+ G<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>$ A0 q ]" S: V; h( o' ]# a7 `! f
我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>) v8 c$ | f6 f4 J5 _, X: r
<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>2 p! R* t& X" @$ `
<ol>
8 h K8 A8 k. E<li>t.join 需要持有t的所有权</li>
: K5 p+ Q8 h5 N8 n6 `- x<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>
& m: V# w% K) A* @</ol>
# _, u% Q4 N/ B6 ?. G<p>这里考虑让Worker持有<code>Option<JoinHandle<()>></code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>0 w: v$ ?) ?" O* ~- }7 z/ G* ~/ b
换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p># y% [ x c2 p* M1 E3 z* C
<pre><code>struct Worker where
# G# D" o' \2 b* J. u; A! p( D{
C: V1 Y/ R$ \$ C" c) C$ y _id: usize,
. t. a+ m( z, w2 b t: Option<JoinHandle<()>>,. V0 R3 z0 i6 K4 ?5 O
}0 r8 d3 R8 x# T8 T9 o# x. {( l
</code></pre>
7 D1 r" W+ p$ }<h1 id="要点总结">要点总结</h1>- k7 O/ f* `+ A. X
<ul>
3 e; m' o( U: ?<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>9 ~8 C0 |* G( }& R" X" a
<li><code>Vec<Option<T>></code> 可以解决某些情况下需要T所有权的场景</li>- J: _ t1 ?% k0 t: Y: V( x
</ul>! W K+ _* j5 D
<h1 id="完整代码">完整代码</h1>
' @/ j8 n: R. h6 F6 W; j( [<pre><code>use std::thread::{self, JoinHandle};. T( f Z$ @$ a0 N( E+ ^0 T- v& d
use std::sync::{Arc, mpsc, Mutex};
5 V/ N3 J( [8 C* Z4 j( T
& {$ [$ J5 u. ^, K5 k) {4 \( k9 g3 \. v! R$ B+ r. c
type Job = Box<dyn FnOnce() + 'static + Send>;
; r; h* N: n+ O. {- S4 Denum Message {
3 d r, ^* _3 R) D& `& J0 z' Y ByeBye,
6 I/ J& n7 _6 {6 h) s2 @8 Z NewJob(Job),/ {0 S2 x+ A* m5 Y8 c. Y
}4 s7 j+ ]6 ]) _5 W. L6 G
7 U; D' y2 y7 o
struct Worker where1 m# q" M4 s4 k! m4 d5 D: R
{
3 C3 l% z, U6 u) c) q _id: usize,
" h# w% ] X; l1 e& e# z5 }) ] t: Option<JoinHandle<()>>,3 @, X0 L9 ]: O7 n( \; x: l9 J& W: r
}# [' e' N8 t, J# L
3 I2 c3 Z2 L/ P w1 n- h# aimpl Worker
% v% b# y: ^3 P8 p! e/ o) `{7 u3 J6 M! h2 b3 r1 G5 j0 o& ^3 ^
fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {
: F/ e0 L! w- t3 k$ ~- O let t = thread::spawn( move || {
5 `3 t* G( `0 | loop {% D/ _9 H5 Q3 s' e" |
let message = receiver.lock().unwrap().recv().unwrap();$ M' d7 b# N6 z, x) g* h6 u
match message {
( d, Z [& h/ m& v- X) B Message::NewJob(job) => {
6 ~3 C9 D* V1 \: S' C println!("do job from worker[{}]", id);+ f4 {$ f' M& ?6 m7 s$ \# Y& k3 B
job();5 W4 I7 X L' y1 A7 P( m& ]5 f
},9 x0 I) m" Z- B# u8 C; M6 k8 q
Message::ByeBye => {
& h! ^* H; T: y9 l! _4 n6 o4 J println!("ByeBye from worker[{}]", id);
$ |4 D& P/ c }. O break/ K7 ^6 O, |2 F( F G- L& P
},1 P, \* [. k) R- R# h# G. A$ x1 o
}
& e8 v" }! c8 \5 I' V }
: A: o0 ?% i2 |( a1 W });
* x( w0 Q1 m+ Y! Q
3 e2 N& H% {9 N3 }1 { Worker {
* s( Q" A x$ j( j) H6 t% ` _id: id,! e# t/ z) X! ~3 r+ P" A& N4 }
t: Some(t),
- |/ }8 v7 W U2 X. z }
6 y8 P. N+ B# H7 Z: Z: ]1 X }) [4 A: l2 h3 X
}
( A3 |% _0 ~/ ~3 g' R( z
1 z+ e$ Z# @* H5 Npub struct Pool {
* V0 T4 x' g# D0 g4 z workers: Vec<Worker>,7 v7 |$ I$ m# G
max_workers: usize,1 ?- g5 X% [- y9 c2 ~2 {
sender: mpsc::Sender<Message>& @) R3 M9 N5 d
}
& U; S/ U# v, o$ U" `9 f+ Q1 p
1 M! S/ T' y- B: Simpl Pool where {( S) J) j8 `2 G' H# Z/ z
pub fn new(max_workers: usize) -> Pool {; @ ?# b! G4 r* A
if max_workers == 0 {
; s Z& r" S( _. s( ]" O, \ panic!("max_workers must be greater than zero!")
! X2 A# V4 z6 W' n/ m x* V6 r% g }
3 P1 y* C0 |2 M1 M let (tx, rx) = mpsc::channel();( d. r+ Z* Z" }7 Z t- F" q
' F4 s/ Y" G2 ]. `3 i! L7 ?# S3 Z# [
let mut workers = Vec::with_capacity(max_workers);
* ^- R* U8 u4 z2 Q9 b) n let receiver = Arc::new(Mutex::new(rx));
- R) T" k8 ^7 w" L$ O for i in 0..max_workers {
! K b' J4 y- |; y M workers.push(Worker::new(i, Arc::clone(&receiver)));: _5 Y) d2 u$ a% g: E
}
& T5 h5 U; l( ^$ v
) S6 v' F9 ^% W# C6 n0 F9 ^& s" T7 L3 T Pool { workers: workers, max_workers: max_workers, sender: tx }3 t% u0 c# x( s t. D: R# }+ k* v
}
% U8 I! Z; f- {9 I' N
5 r& j& O8 \1 k; i pub fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send' v6 b/ c" g3 S- l, r; p% O
{
* ]% l, `7 L9 w4 t9 n% O' F/ \+ [! n
( b: U4 e' q- o& k3 G let job = Message::NewJob(Box::new(f)); }4 d1 m% Y. G b
self.sender.send(job).unwrap();8 k5 Y/ R& \3 G L2 b' g
}) }2 I$ J9 h' t Z% O! y. z5 D
}
3 g, }5 I; u( ?/ Y9 e7 J2 z1 U* p; F5 z+ f" }" m' g! Z
impl Drop for Pool {$ G1 u& C6 u4 B% L( Q' H
fn drop(&mut self) {# C2 _$ \( @' ^& i7 p: X" B
for _ in 0..self.max_workers {! K7 }6 g5 p g' z
self.sender.send(Message::ByeBye).unwrap();
/ ?6 E$ v7 O& x( l) e' e& W$ v }8 p. `5 g( ~. x0 ^; B, i% _0 ?7 q
for w in self.workers {- J9 N8 I, j0 `- e4 m2 j& y
if let Some(t) = w.t.take() {
2 ^, b# I0 J5 i1 \) j4 }, l1 T1 K t.join().unwrap();0 d C! E) ^ V1 ?2 j, m
}
G, a1 f3 A3 a: m T" b8 X }
, u9 ?2 c2 C# b/ _( X- \ }& y* \5 w- M* N& B( ^
}' N1 |0 h# \/ k' S- d& U6 ]1 I
3 E1 F4 a9 f. B, t! _5 m8 W+ i& o) O
#[cfg(test)]
6 u) }/ U9 `4 m* b# D4 }) k8 Dmod tests {1 n& H+ x$ Y* M5 W9 U A
use super::*;
, |- k) Z* h$ h. ~* f, g$ K6 _. Q* T2 x #[test]
' n3 a" a! y' s0 u7 C2 Z0 S fn it_works() {
: S+ ~0 }5 @0 `6 C let p = Pool::new(4);! @; `0 A9 y8 |1 Y5 {; |
p.execute(|| println!("do new job1"));
4 t/ \( G9 E8 R/ z* L p.execute(|| println!("do new job2"));: {- x% L4 U0 ?7 u! ^; W
p.execute(|| println!("do new job3"));4 d' x7 q5 _! d# \
p.execute(|| println!("do new job4"));% S: } L4 s) l8 R( i9 F2 R% E. i
}4 |- D5 K9 M4 M1 U/ N. R+ v1 c
}
% f% y. Y* K, J, c0 _( r1 Y</code></pre># I2 R' _" ^5 E
2 ] Z. n3 C, J
|
|