|  | 
 
| $ }$ k9 L& ]6 d: Y4 h2 A0 T/ R<h1 id="如何实现一个线程池">如何实现一个线程池</h1>
 ( b5 {3 ?. X7 R: u1 V<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>
 9 q5 b) U6 t" Q. M0 o9 r<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>
 5 ]% e0 @# E/ G0 o2 C+ c  E3 e<p>线程池Pool</p>
 " ]2 l+ u: G" G1 p8 |<pre><code>pub struct Pool {% j0 Y! T- n4 Z: c$ H6 G
 max_workers: usize, // 定义最大线程数
 8 T, F' f# B% `" t* _}
 ; W5 n# n/ W4 ]' J' t9 ^3 w
 6 X: R% r1 N/ T1 `impl Pool {8 T8 b% j  [( Q1 @/ |& E
 fn new(max_workers: usize) -> Pool {}0 L' O. [6 a# c& }6 s: K# K1 `
 fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send {}' @; J* m1 `2 e
 }
 7 {& D: x6 q* ?: w/ `
 & L8 I3 H6 O4 q7 o5 N( e7 [</code></pre>
 0 |) X, Q# m9 G1 x4 H<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>
 3 e7 k* i& s- ]. p<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec<Thread></code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>
 , z) V1 J% H" L. I可以看作在一个线程里不断执行获取任务并执行的Worker。</p>" k- y" O( q5 o4 U
 <pre><code>struct Worker where, Z) ?  G: T+ p+ |0 N0 r1 w
 {7 u6 f; }8 |" W0 }
 _id: usize, // worker 编号% Z& ~, U/ T: s" n
 }
 # N( T3 _4 I" v" L! l) p" H</code></pre>, x6 Q8 U5 {( H% A& l
 <p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>$ H* J6 F1 w/ x- l
 把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>
 9 L% L" P. p  H* X! W: i<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc<Mutex::<T>></code>来包裹起来,也就是用锁来解决并发冲突。</p>2 ?+ N5 z& b" C: `- Z) K
 <p>Pool的完整定义</p>: f" d& v) c1 @% L. f
 <pre><code>pub struct Pool {
 3 \0 K3 O# s# ]* D7 b& D4 S& e6 g    workers: Vec<Worker>,
 $ O% C9 Z: D/ i: Q7 z# ~    max_workers: usize,( E! c2 ~6 ~+ j) A* J# U
 sender: mpsc::Sender<Message>
 ' G! v' k  |- i5 t. T' s# u}
 ]3 ]% }- a: N! \3 S</code></pre>
 , d7 a  `8 S/ S4 u9 z# ]% }<p>该是时候定义我们要发给Worker的消息Message了<br>8 M" t, D7 r, ]  i3 v& \1 y
 定义如下的枚举值</p>
 5 k, r% q- u3 O1 M) A  w* G<pre><code>type Job = Box<dyn FnOnce() + 'static + Send>;
 [  I* N$ e+ Venum Message {
 ( y1 B8 Z+ [; Q    ByeBye,# \! u/ P9 K3 W' w1 X2 ?4 x
 NewJob(Job),
 ( o9 @  z1 }  I) E! y8 Q# L}
 5 q5 ~9 K3 x+ D- }0 h& D</code></pre>
 % [6 s6 |: S( t) a1 F. W- a, F0 v2 u<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>
 3 D5 p9 F" o8 G0 _' j, j<p>只剩下实现Worker和Pool的具体逻辑了。</p>
 2 @" {% T# {8 M: _1 P( c- H/ T3 k<p>Worker的实现</p>
 # Y# r2 k. w! A% B0 i<pre><code>impl Worker
 / t, O( f5 e8 i( }0 h7 @+ {{1 I5 Z) v1 G' E9 Q5 x0 V
 fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {
 ( m  b% N7 R- z5 z! ]4 t3 m        let t = thread::spawn( move || {
 5 P2 I, |3 e$ q1 D            loop {
 1 K8 {3 W' I  P; K3 @  V3 N                let receiver = receiver.lock().unwrap();0 d1 `0 f; j! i
 let message=  receiver.recv().unwrap();0 |' k8 V1 B( f3 d! u; ^, v1 S
 match message {
 / h3 n' z1 c$ b$ I" `                    Message::NewJob(job) => {
 $ h3 ~# ^$ e( P( ]& a( L2 ^                        println!("do job from worker[{}]", id);3 E+ h# M4 {$ @) D
 job();: E8 {8 P/ V  `5 M5 k
 },
 / D" c* x. O) H4 }8 n                    Message::ByeBye => {# n& p4 m  _8 y. J! `$ w0 @) ^
 println!("ByeBye from worker[{}]", id);
 $ {$ o' W- L6 G- m" Q                        break
 : R* c9 C% ?( K) {' x$ J                    },
 / P5 \7 d# v( r) `# }* [                }  " B& a# F0 s/ Y  @2 I6 l
 }
 4 ]- i8 u5 |0 V- f  F' `        });  [" y6 S& Y) X' e9 X$ z+ ]. d
 
 * g) g! c7 s* R' c% z        Worker {
 ! f. h! h# C7 [            _id: id,
 / g5 _$ c) A- p  C) r% x4 H            t: Some(t),* n) T7 i& j. k4 p! b6 P
 }
 1 w/ ^2 i+ Y$ O6 d( x2 G  s    }5 H/ {! ?- p' j* |, ?. \  d- t
 }$ ]4 Q' q$ j2 J, D# R+ g
 </code></pre>
 ( {8 B) ]) M0 w' c( Q, }<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>/ R* G8 {3 f% }) h
 但如果写成</p>
 + I1 t6 f* I. Z% r' o4 e0 o" j<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {  A6 p2 y/ p4 c6 E6 o5 w/ f
 };
 4 N3 @4 Q! L' B9 b% A0 _3 N</code></pre>7 ~  S! F# }( j: w8 R, Z
 <p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>
 ; m* q' C# j& S% n2 k0 I/ b/ frust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>
 % n) @* @2 [6 U8 ], L0 M& n<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>
 1 y4 S( v8 e+ ?- ]. P& e; V4 V7 x! L<pre><code>impl Drop for Pool {; N5 a, `2 C! P
 fn drop(&mut self) {9 |7 Q( x7 {2 w( Z2 z
 for _ in 0..self.max_workers {4 e% ^3 U, O  B. N* z3 L7 C% }
 self.sender.send(Message::ByeBye).unwrap();, C0 W! M% m$ ~, l7 O
 }% ]! q) A4 E4 T$ }
 for w in self.workers.iter_mut() {
 3 W6 [# K% V1 x" v8 e            if let Some(t) = w.t.take() {
 9 z* m2 a; c; E4 q3 U# O% Y                t.join().unwrap();
 ' U# T* m7 m, g# \) N: i# H$ L            }4 c' C. l9 L! Q. K% g9 Z4 H# Q
 }
 . f6 }+ i! s9 C) [  G0 Q    }
 : X# u* L4 G# y/ F2 s}0 {9 }6 g) r1 M/ A$ ~
 
 - s" U! u5 |. o. x  q</code></pre>2 F6 V( A1 e& J4 Q1 T6 T  m1 l
 <p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>
 " X; z1 s. r; Q& T<pre><code>for w in self.workers.iter_mut() {
 + q, w0 B% \+ T& b    if let Some(t) = w.t.take() {
 : {  V& {5 C3 T: o" S        self.sender.send(Message::ByeBye).unwrap();! Y, r! \$ _  f1 W
 t.join().unwrap();3 R( J; D4 _3 g/ f
 }
 " T# e* i  V$ ?$ N) b0 D  {& X}
 3 a6 A0 X' r4 R% o$ s* _0 v
 2 h. B' D1 s6 ~3 _: }$ z</code></pre>; l& p6 p% X/ }7 d
 <p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>
 0 L$ Y2 u2 S4 z: i我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>
 9 s: l: ]. E: p3 B( e<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>7 p) D& v6 {7 Q7 B5 T1 G' I5 g( z
 <ol>. ~. t0 V) C. L8 P' Y( p
 <li>t.join 需要持有t的所有权</li>
 7 w6 g  ]  z' \' m. L<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>" x8 j3 ~3 K8 Q, q9 i- F! [
 </ol>
 Z/ f  P# J: i1 Y: b' c' _+ _<p>这里考虑让Worker持有<code>Option<JoinHandle<()>></code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>( O9 y$ `6 x% X7 p* U& n& E
 换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>
 3 k% H" e$ g, ~' i  R' z<pre><code>struct Worker where* F4 X5 X+ Y0 y& ]5 {. c* N
 {' h; @, v9 @, D
 _id: usize,
 4 `+ W/ j8 ^) u' @* J    t: Option<JoinHandle<()>>,
 8 B5 f$ l. Z+ S+ Z% D% b}
 $ C( e3 ?7 ]( B4 R$ j</code></pre>
 3 X, ^' `. O% z/ c, ?<h1 id="要点总结">要点总结</h1>: K# y5 E1 |+ G7 H
 <ul>
 6 q+ @$ }0 j- U& e3 B<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>6 Z% N2 |* |) H$ P4 D
 <li><code>Vec<Option<T>></code> 可以解决某些情况下需要T所有权的场景</li>7 @7 }. Y& b- ~. I
 </ul>
 6 o# A) r+ e; z% V" w1 O) H5 I<h1 id="完整代码">完整代码</h1>* n/ r5 t! B' c/ E
 <pre><code>use std::thread::{self, JoinHandle};3 K, \* X/ f  u: K  q: j
 use std::sync::{Arc, mpsc, Mutex};& s2 n$ i4 f% y6 ]1 x1 ~$ _: W
 
 " v+ _. O* y4 d0 ^- ]# b# m- O$ r4 c1 T, g, y+ ]
 type Job = Box<dyn FnOnce() + 'static + Send>;
 , Z) K/ r' z7 W; ]$ B: G6 Kenum Message {- H: d' r+ J8 t5 |* O
 ByeBye,
 - f/ G3 G4 I, M0 y" D) k# A% p6 M    NewJob(Job),' K- P' ]# k% w6 v7 r* F* P
 }
 ( F% ^% `- U" t) h- [
 0 N5 x" r+ }& P9 }7 C* }& D4 O, zstruct Worker where0 W' i0 w4 r5 [7 v9 l& E
 {: ]( S* b0 N5 C, Z9 D- c5 J: A
 _id: usize,9 X) |* E% K- B- ?; b" u+ C
 t: Option<JoinHandle<()>>,/ u  I$ _! l) c! e3 q) f' r
 }! Q8 P4 R! n# e0 V* }/ @' x
 
 ! X6 z, B. N/ w+ |6 x/ J5 oimpl Worker
 & u3 |! W2 `5 s- W7 }/ j{3 r. C9 W; I5 x5 h9 s+ r9 O
 fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {5 Y: J! U- ?' _1 k
 let t = thread::spawn( move || {7 x- S, }9 [8 O
 loop {
 . \4 D0 E8 b% o& v0 h6 c                let message = receiver.lock().unwrap().recv().unwrap();6 @5 Y: j, ?# _) ?& }! H
 match message {
 " H% P$ K3 Y0 b# n2 t( p                    Message::NewJob(job) => {$ ^* d4 g1 i' Q) k' Y. \
 println!("do job from worker[{}]", id);/ O6 {# t& y1 f3 k# d5 U* {- x
 job();
 ' x* ?9 Z) \/ ?! s% [                    },( O* N' D7 q, {0 \
 Message::ByeBye => {
 1 l) {3 |, a# \                        println!("ByeBye from worker[{}]", id);
 }; N1 W/ n6 w( Q, i) A/ F5 Z) _                        break
 9 _9 I0 j5 l* o9 \! m3 i; p                    },
 ' T+ }, v4 ?! S                }  1 m8 t: p( N' s2 n; P2 o2 G
 }
 - h0 u8 Y, d" Z3 Q9 G0 d/ `8 [! G        });
 ; D5 u  I  [& }! h& _, d7 |% H
 8 i% R: d, x6 q5 `        Worker {
 ; ~3 c9 s6 W9 P' v% @8 I            _id: id,9 j9 Y5 T9 l8 Q  Y- F8 J
 t: Some(t),- l& x# x" j- W4 g
 }1 ?8 ]4 U  X- f: V' C; ~1 O: ]! \1 B0 t
 }/ G8 d: r7 N5 v, u
 }! q9 g2 [+ _% t* x1 G; T
 
 o3 e2 I! d/ }pub struct Pool {
 + o7 W. E. P' d, i5 a, m  ~    workers: Vec<Worker>,
 R. F6 z  p7 S    max_workers: usize,
 ' O" P3 @' t4 y7 o    sender: mpsc::Sender<Message>
 $ q- x- B) A, D( [: c}4 f/ ~' N! F, S5 I0 T& e
 
 2 T3 z! J8 O- ~0 ^; u- |) Jimpl Pool where {
 5 N$ ^4 \7 I0 K# k# x    pub fn new(max_workers: usize) -> Pool {
 ) u+ M5 L+ V- `, E        if max_workers == 0 {
 8 a$ q2 q0 N3 y  i  m6 Z            panic!("max_workers must be greater than zero!")7 J1 r+ F6 ]/ B
 }
 % @7 w0 t3 _# L& b        let (tx, rx) = mpsc::channel();
 % ^+ l* r, s8 e. A) L' k- E/ @; ]0 X% K  }- f$ ~4 f* j( [
 let mut workers = Vec::with_capacity(max_workers);
 6 Y- Q1 T2 l! V, ~        let receiver = Arc::new(Mutex::new(rx));0 r5 z: y3 G! _# d
 for i in 0..max_workers {: v: f3 R3 w  ^( g
 workers.push(Worker::new(i, Arc::clone(&receiver)));+ ?! b# \! a2 K! B5 I8 T
 }# H& Y5 Y) H+ z8 C" F" i
 * T3 `/ H- G" H
 Pool { workers: workers, max_workers: max_workers, sender: tx }
 $ h* H; H0 W0 w* O+ [) z- c    }
 " y( {$ u4 v/ [( Z, E
 # n/ \; `6 N6 h3 D5 M  u    pub fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send
 , J9 y( v$ L' D+ M/ \    {
 $ z1 T1 @) a: O* [
 3 t* D. S0 f8 L- r" D        let job = Message::NewJob(Box::new(f));
 / B" }  w4 Q  ^' Z  p/ n        self.sender.send(job).unwrap();. P% J* k0 F% ^4 b
 }6 W- l1 A4 j. a
 }
 % `' A* b3 [. R4 c' N+ J( h3 \& P+ y- C$ e& k0 t  t4 C
 impl Drop for Pool {. h7 X8 H8 j8 \& p. J0 A
 fn drop(&mut self) {3 k: i3 {8 ?- W% c7 r% B" b
 for _ in 0..self.max_workers {
 ! a' O; b$ Z. k& @6 y* o: @! s            self.sender.send(Message::ByeBye).unwrap();
 " |9 \3 V5 N1 L; g: {        }
 / {* Z6 y' j: x7 R; s6 S$ @. w0 j        for w in self.workers {
 1 q% {8 f5 D8 Y% b            if let Some(t) = w.t.take() {6 W9 c( M. f4 x5 E! x
 t.join().unwrap();
 * H7 y1 P3 o; I! R" _$ ]            }  X4 u' ~: y8 e6 i2 {0 V
 }* L( P+ d; Z* u5 z8 s4 o
 }! _: j! W2 u  R& R; b% C1 l# L. s
 }
 8 Y; c& x" D9 ?; X9 w+ {. l3 N* S8 l5 B- m; R: c
 . Y/ w" I4 v7 f# {! s1 c: R; F
 #[cfg(test)]- H0 i) Y. b: h# K' `0 }! J9 {
 mod tests {7 ]5 G8 W, a0 W6 Y$ P1 M% `; ]
 use super::*;
 $ I2 }) q' \1 c0 k2 ?    #[test]
 3 x; P% X$ ~2 L4 I+ V% W- Q    fn it_works() {9 b1 U; E. _1 l) v; a
 let p = Pool::new(4);7 f' A) J7 s4 \. u
 p.execute(|| println!("do new job1"));" r. v6 F3 p: G2 ]! R% i" g2 o1 d
 p.execute(|| println!("do new job2"));
 % z3 l6 P- J# l2 K5 u. ]2 r        p.execute(|| println!("do new job3"));
 1 R: v/ o2 Z9 C( H; a& Y$ W. `/ a0 V        p.execute(|| println!("do new job4"));
 + ]. U, z$ a" X: w3 w8 Q    }5 w- X9 N0 y5 o5 a: U
 }
 , O8 G( a7 J. v: Q0 o9 L/ g! B  B" I</code></pre>! R6 H, ?* `; Y7 w9 W
 5 h) F1 |" ^" g4 \( V
 
 | 
 |