|
|
. j `6 `3 \# n: J<h1 id="如何实现一个线程池">如何实现一个线程池</h1>( X9 O7 d2 z e7 _2 U
<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>+ f: V* `; d! {; h1 J5 g4 o
<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>
) x4 O1 P" c5 }7 I5 U<p>线程池Pool</p>
" t$ X5 x8 U6 Y- [) R. H<pre><code>pub struct Pool {- h I/ k3 [' _: h$ C# y- g" ?
max_workers: usize, // 定义最大线程数
/ e0 }! A6 e9 M9 F& \}
0 N5 S2 `: P% ^8 _" k1 u' M2 v F+ z" S0 [) ]& g- r7 N% l
impl Pool {
N6 k9 Q& C5 G$ E fn new(max_workers: usize) -> Pool {}/ c8 r! h: R( @! B, i+ N3 d! i- c* v. `
fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send {}, t- m& g; h2 e9 m* t! h: M
}9 U4 x8 ]" {7 V0 s0 _0 J0 `; e
r4 ]4 L3 F7 p0 d8 C; y7 u
</code></pre>
2 c0 |' C" y; z0 M1 z<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>
7 x- G) |7 Y' p# G<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec<Thread></code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>9 \6 H3 x+ A4 z0 J
可以看作在一个线程里不断执行获取任务并执行的Worker。</p>
% g. u! p+ ^; J, Y" R<pre><code>struct Worker where) u; h" b. x7 N- v4 X6 M
{; ]' ~8 T% E2 B4 A; u* H
_id: usize, // worker 编号$ w, ~' g3 A& K: H$ {$ {! o
}
: ?/ r' Y: X9 _; w% e" X8 L</code></pre>
6 ]% _% u1 n& }<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>
$ b6 I7 T6 E3 P9 s* |把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>
1 E \ W% }6 N+ O6 Q9 W4 Y<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc<Mutex::<T>></code>来包裹起来,也就是用锁来解决并发冲突。</p>
. M9 L+ |& u! o" n<p>Pool的完整定义</p>
5 j+ {& f! r* K! u) m<pre><code>pub struct Pool {( x) c& i2 `% `: h3 Z9 q
workers: Vec<Worker>,
$ s# s2 G, a/ I6 f max_workers: usize,
5 v/ p/ u. @ P0 q: H' } sender: mpsc::Sender<Message>7 a& V$ |. `0 V' x% v
}' D) C! Y2 M7 G4 ]7 B* j
</code></pre>
D3 R9 o" k- \% G! `( F<p>该是时候定义我们要发给Worker的消息Message了<br>
5 |% T! `$ @8 L/ N定义如下的枚举值</p>
% q9 s+ I% V% U; }+ E0 P( e1 G<pre><code>type Job = Box<dyn FnOnce() + 'static + Send>;- Q/ V' `2 `1 f# k3 C- L& g" g
enum Message {
$ B0 d1 @( @, t+ g+ w6 d& V' z ByeBye,6 s. b( h( U' a% a$ b: P; D
NewJob(Job),
$ Y% T, V" G5 n) a3 d8 G& E}0 z' v P" V4 ]" ?( p9 K; [% N
</code></pre>
2 X z- f+ c9 P! p! n<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>0 r( o% H9 Y, N% t- o% c6 {+ |1 {
<p>只剩下实现Worker和Pool的具体逻辑了。</p>" E9 ~. f$ E+ l8 J4 b
<p>Worker的实现</p>, u2 ^. m& |& H2 ?$ B& ?
<pre><code>impl Worker
7 w( v" z8 Z+ z& C# u8 \1 V1 k& i{! i6 ?: z: @& ]4 N% [$ x( `
fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {0 x) \( }) D4 m: `1 x& I% G# ^
let t = thread::spawn( move || {; R2 I1 z+ C9 F$ b! b
loop {
! z0 Z. @6 }: ^$ l let receiver = receiver.lock().unwrap();
7 }5 _1 B- r6 h5 K let message= receiver.recv().unwrap();' y2 N9 a9 L8 q7 U! _* L# ~
match message {- I# E% H% y- `7 X+ C7 N5 d
Message::NewJob(job) => {
3 {9 B) S. R* c) |% N4 w println!("do job from worker[{}]", id);
/ M! n. ~' X( \* T2 e$ z4 Q& m job();0 G* y- l# a9 n7 x& y! H" c+ H
},7 M+ l& Y2 q6 \$ s$ i' P+ o4 n- E
Message::ByeBye => {% s/ m. U \8 ^# J6 U4 h
println!("ByeBye from worker[{}]", id);% B; J+ H% M# ^% e$ L
break( i1 g# ^8 N9 c6 s! A/ f
},/ d8 ~, W J" R' x0 j( a% A
} 6 q* a- u1 M% B( ]' g! u' T
}
+ r6 h4 S! }% ~2 Q });: ]* r) {+ J2 V6 a5 K" l G' r
8 i' y/ e& g" }' J7 B8 |
Worker {
! [4 P. n/ a& ]; v! D& `8 R0 N _id: id,
. N2 T$ u( i0 z+ q/ k t: Some(t),
! u9 m R/ n4 ~+ R0 V }& d: A9 r" j2 F# X
}, U& B' d% ^. y: R" t
}
8 k" V1 I0 r5 m$ G5 F</code></pre>: Q8 r9 V7 O8 L! h' C* l4 `; _9 P
<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>
3 A/ u6 v. z/ ]但如果写成</p>
" Q4 Q) P5 g/ X0 [& {* [) D' q; J" u<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {
% l2 T8 \% Q% _};
+ W, y' E3 l" ^/ C8 O3 w</code></pre>$ @# _ U& t/ O, W' v6 q
<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>
3 w+ V8 j8 b- l! H, d/ r6 }rust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>5 f2 {! {7 W6 N
<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>
# L* u9 h" _$ m/ J" K# x7 f! E1 B<pre><code>impl Drop for Pool {
8 ?8 p$ B, l. Y fn drop(&mut self) {1 F5 d: ~) S! h7 M" M
for _ in 0..self.max_workers {. e5 X, u2 I0 C% i3 G
self.sender.send(Message::ByeBye).unwrap();5 q M' d2 Q) ~
}- y4 `! j6 r: p ]4 k: V
for w in self.workers.iter_mut() {' D' X/ g' W* X, v6 [
if let Some(t) = w.t.take() {
, Q/ L7 y4 V! w3 S. o, m2 F9 Z t.join().unwrap();
. p& z% w* H7 ]9 z }" c( B$ V, l( [& e
}
3 Q+ N2 }5 F* j: Q }
" @7 ~, a( h1 g( v. J}
4 l8 L7 ^- e' @. x; V! E0 T: i1 M4 ^5 k! P* q3 r( P* m4 [
</code></pre>* k; t/ a; j& G$ k6 R2 d7 P! u
<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>3 n3 R% F; r' O+ L$ ?0 _- e4 S
<pre><code>for w in self.workers.iter_mut() {( O- B- T4 Z2 @- a& b
if let Some(t) = w.t.take() {0 E/ a9 l4 `4 i: I( S
self.sender.send(Message::ByeBye).unwrap();
' ^: u, ^$ M7 F4 |/ k t.join().unwrap();
8 c, ?9 D$ H1 y& d' G0 y | C }$ B( G1 z: T. G9 ^ ^
}/ d/ n7 S+ H7 y4 {
8 c7 h. l6 \( u! g; C, s, G( O
</code></pre>3 t5 c2 h- d1 S
<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>% g0 |: q7 k! {( A
我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>
) U4 [' A9 M/ P6 q; f<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>: ^0 E; S' a* D% r B: ~* c% g: m# b
<ol>1 R$ d* `% E4 r, Y9 u) E
<li>t.join 需要持有t的所有权</li>9 T9 j$ y4 ?" J4 q- \# S
<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>
% e3 u H9 s1 Q</ol>
- h1 ]+ e7 e# c; v/ |# z- a<p>这里考虑让Worker持有<code>Option<JoinHandle<()>></code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>, o+ N0 @) s" O' ?7 c) _' W
换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>& ^7 m4 K0 S! I$ H6 A/ _$ j2 L: s
<pre><code>struct Worker where4 J4 Y! @6 `5 q+ r1 X2 p! g0 U
{9 ]1 S7 N& D& V! c* [& M, U
_id: usize,
- E/ U; b1 S1 m1 R, W. E( d; C* _ t: Option<JoinHandle<()>>,
! n [+ h% H5 W) `}
" M8 r/ T+ W$ h: \</code></pre>
$ }; y( C* U$ P% r<h1 id="要点总结">要点总结</h1>
, [: X- Z! j" l6 l2 H$ r% u<ul>
. m( b8 _: b* `+ b6 ^6 c; z& {<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>- Q4 I! a, H+ H9 w( D
<li><code>Vec<Option<T>></code> 可以解决某些情况下需要T所有权的场景</li>
h# I+ d; o2 x6 a</ul>
! H g" F$ J H i0 P# Y<h1 id="完整代码">完整代码</h1>; O9 N; S* c: Q: u, [1 z
<pre><code>use std::thread::{self, JoinHandle};5 l; O/ t: | {6 t% V
use std::sync::{Arc, mpsc, Mutex};8 h" @' \( q7 r A3 U6 A; y) G
5 U$ K( j1 ~$ m6 z0 G
3 i% v4 T6 F) }% K, z& l: _
type Job = Box<dyn FnOnce() + 'static + Send>;
5 Q: J* n3 x* wenum Message {
8 }, `$ S6 ]5 l0 N* u6 m6 c+ K ByeBye,
S8 ^4 T& a4 t# ~1 p5 H NewJob(Job), S: C% c2 X( s! a7 {
}+ A) m- Z* F$ z! c) ^3 z
2 n; J1 M7 t5 ~2 G
struct Worker where
' D2 W, z# Q4 l+ l{3 D! @# S9 n4 G( d
_id: usize,* b8 g8 S" c3 c5 l8 F. g1 n
t: Option<JoinHandle<()>>,/ a; e7 T0 |9 F' D6 t
}
" X8 q% ?1 K3 X0 G% W( A% |# s: } ^1 F1 M# o* p7 o& | H; V, W! o7 N
impl Worker
# t; V7 W* K Y! I0 J% T0 Y+ d# a! g{8 z0 ]# `: f9 {( _/ P) F" v5 {) R
fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {
6 r# C0 Z: X6 p+ x4 \& @ let t = thread::spawn( move || {
! T& g% E8 C+ x5 ?1 m/ E loop {
5 R' R7 e1 M0 R; r3 |% g a let message = receiver.lock().unwrap().recv().unwrap();
) {3 R+ j- p, P* E1 N0 e8 ] match message {% _/ }4 A0 r" D% }8 r! c
Message::NewJob(job) => {
! s( U3 h' E5 r0 R5 N. d6 v2 V" f println!("do job from worker[{}]", id);
( ^* o5 n6 K N- x0 ?+ J: |* \+ U0 G job();; O9 J( a+ T; Y4 g4 R) ?& i, D9 M
},9 \: R, H( g- F) ~( C+ m9 F
Message::ByeBye => {# L( S1 W* S3 k$ }! O, Y6 m
println!("ByeBye from worker[{}]", id);
2 b+ ~, q' E g. t break
, J6 W1 [4 B! }2 W- Y* q( v },
5 R8 _4 ? i, A5 ^1 a8 p+ W- J } % ^" T8 d; ~1 v: E
}
% y$ p/ G1 t9 Q! C9 H& B });8 o# X5 X2 S) Z3 p
3 w& n6 S( I* p5 X1 ]/ o* \ Worker {
4 B: Y) q3 [5 e4 E. z, P, T" N _id: id,! R% O$ q' X% z2 ~/ ?2 L
t: Some(t),7 O& W- x% C I8 k5 i6 D7 d
}! u& v' {, ]* O; C' S# ?" p
}' x0 b7 a0 w+ _/ @1 P7 Z
}( |8 i2 A+ t% k5 i# o
- c; Y; s+ Z4 Y" I6 f b% Z
pub struct Pool {
8 q$ n$ W; ]$ r, D% v3 n3 m/ Z workers: Vec<Worker>,# Z5 b5 l( ?9 E/ Y- V
max_workers: usize,
; w S( W n- Z( N8 M& N sender: mpsc::Sender<Message>& H3 M' r; a6 z- y; D- X& M: G
}2 q) g, S n4 Z7 g
. J/ Y4 _# o) cimpl Pool where {, c. F1 r. z/ u4 d; r
pub fn new(max_workers: usize) -> Pool {
6 c/ N: k1 g2 F0 G# N if max_workers == 0 {
9 e5 ?6 C- N6 | panic!("max_workers must be greater than zero!")" c8 O, A( ~) j, n. ]
}3 ~: ^ ^) d' j G8 ?0 ]" A( f+ p
let (tx, rx) = mpsc::channel();
+ h% W2 S0 W% Y/ `& y$ y- p2 C7 q
let mut workers = Vec::with_capacity(max_workers);
% A: X. P( D' I! F" ~, U/ W let receiver = Arc::new(Mutex::new(rx));
$ | z9 ]+ |8 [$ W" e; q" G for i in 0..max_workers {1 k7 g# M- d9 G4 |2 O# c
workers.push(Worker::new(i, Arc::clone(&receiver)));- g1 H/ R1 u. ?3 c5 j
}; V5 ~7 g9 g* ?* s4 c; G
) n" Y1 U& k' P1 w
Pool { workers: workers, max_workers: max_workers, sender: tx }
- i; V' c/ z6 T! }# Q4 H7 l& z }
) I: W( B4 Y0 X/ }. ^/ s/ t% J
; u: S' @6 e! ~: F. H pub fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send
$ F/ y/ g5 U+ o- ?4 B% B$ u {
0 m7 Z* a8 w: n. ]0 V4 A
& f# W+ V6 v, t' y4 x- d5 y( ` let job = Message::NewJob(Box::new(f));
]: Y: [2 G; ~" a; X5 v3 S self.sender.send(job).unwrap();( o. I% A$ z5 p5 Q
}
8 Z9 }* S( k; b* I9 x" X* W5 O: k}
0 s @% A |' S7 {: x4 R& e1 r- J
) n3 B7 i* @% b8 z& [% j- G Uimpl Drop for Pool {( f0 U7 I( Y2 P! [
fn drop(&mut self) {
* u1 E4 L5 w4 G8 ~ T6 y8 E for _ in 0..self.max_workers {
# f T$ m/ A( t self.sender.send(Message::ByeBye).unwrap();
/ ?$ M9 y- z5 k" v( v8 a }$ \- `$ C, j( r" U Y$ S# g4 t+ E
for w in self.workers {3 h, r S( o2 O4 G$ Q
if let Some(t) = w.t.take() {
: o% f: x' P5 t( V6 \6 v2 H t.join().unwrap();
7 i* Q: S/ ]2 ~, e. H- D* E0 _ }
# \2 D% Z* y; O! z1 y }
- n4 |6 o4 T* R9 s' \' g }. f' y8 u9 p" U% [, B6 \
}
: c/ a! w0 Q! G) K; I% ?, g4 j& x8 |2 B& h, Y& r( f, i3 D. P
6 @! o" i# ~- H
#[cfg(test)]" v7 ~+ T$ A2 [* F4 m
mod tests {4 h) q5 X5 x# S {3 \; X0 J
use super::*;% I+ L, k- @) T6 x& n6 _4 G
#[test]
) a: @. w3 i) a* A6 H* [, z fn it_works() {# e2 Q, ~5 S0 S
let p = Pool::new(4);
! W+ l0 @& W' b1 o p.execute(|| println!("do new job1"));
' C% J! e) Z1 b) _# n2 y9 B% U p.execute(|| println!("do new job2"));
2 t/ _& C5 U" J$ h# j4 _+ j. _ p.execute(|| println!("do new job3"));
" _9 @5 @( p$ x" m p.execute(|| println!("do new job4"));( R) s9 R: w: |+ w0 s
}0 `' k! e3 M8 N5 x
}( \. L" P# s( { y$ m
</code></pre>
) a1 P: w8 k8 F6 \# s0 m- f3 h7 b+ H
|
|