飞雪团队

 找回密码
 立即注册
搜索
热搜: 活动 交友 discuz
查看: 17615|回复: 0

rust 实战 - 实现一个线程工作池 ThreadPool

[复制链接]

8834

主题

8922

帖子

2万

积分

管理员

Rank: 9Rank: 9Rank: 9

积分
28832
发表于 2022-2-12 14:35:42 | 显示全部楼层 |阅读模式
' \8 h9 {! v; I2 G8 z1 c! o% r
<h1 id="如何实现一个线程池">如何实现一个线程池</h1>3 B9 X# j& h$ z
<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>
9 B# }7 y# y8 o  ]5 v7 g5 C; |<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>
8 f# v4 v, ?0 f; n7 N" P6 x* o9 h<p>线程池Pool</p>
3 q% c2 u  ~+ U5 C7 |<pre><code>pub struct Pool {
( E' l, I9 g2 ~8 R1 z  max_workers: usize, // 定义最大线程数, c0 ~+ c, o" M6 z1 S5 ]& C
}* {- Q3 j8 S! x0 Z5 o
# P0 M& M8 a6 b1 Z& }
impl Pool {% L. |8 v+ J" f7 u
  fn new(max_workers: usize) -&gt; Pool {}
2 F4 c  R6 e8 x9 R- F0 s  fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send {}
. E6 Q: Y! m0 E3 `}5 U' i' _9 m: N9 a6 O/ V; ]+ D

% |5 n$ o% }; ?% }- L( m# m: Y</code></pre>9 V8 U5 E1 J3 C, p# a6 P
<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>
# |6 f: H% T  H7 j<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec&lt;Thread&gt;</code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>
$ D" R+ o" ~0 e可以看作在一个线程里不断执行获取任务并执行的Worker。</p>$ v+ _9 J; o) e" X
<pre><code>struct Worker where% n6 F4 P% [0 R- I
{% ^# }  C6 `4 ^) Q% _7 i
    _id: usize, // worker 编号
6 r, n! h5 X: u8 R: h# F, w1 A}
2 L! N9 z9 i7 W</code></pre>
+ x! s4 f3 V/ @% Y! C5 K<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>
, z  b8 P: X# _8 @- @; U把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>, T5 O* j& k( f" t1 [9 x% r' C
<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc&lt;Mutex::&lt;T&gt;&gt;</code>来包裹起来,也就是用锁来解决并发冲突。</p>
3 @0 _4 X. l! ~* u) ^<p>Pool的完整定义</p>
! z4 c% f2 k# a9 V4 H, c<pre><code>pub struct Pool {# ^% a* Q* ^% }7 u- T
    workers: Vec&lt;Worker&gt;,
, g2 v9 |1 _, @( c    max_workers: usize,
" J& I0 ^3 O' ?8 m- O( U. h    sender: mpsc::Sender&lt;Message&gt;6 Q& N1 [! O& Z$ e
}
& j3 h; u/ l$ \" U8 D</code></pre>
8 q# q6 s1 K) L2 }<p>该是时候定义我们要发给Worker的消息Message了<br>2 M2 C& l3 g5 @* A6 `7 ~. G
定义如下的枚举值</p>6 `1 P( A2 o; M
<pre><code>type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;
8 Z- f1 g6 Y  ]enum Message {
5 [% @/ U2 Q4 ]  o1 @    ByeBye,
/ O# U9 }) z" I6 a2 Z    NewJob(Job),
; P* C1 f, Q6 t. J& K; k}
! {) A3 _' G$ }) U9 x3 C% a( Y</code></pre>
3 v5 \2 B* v" c+ K$ P<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>: ~8 Z- A" j. a4 g* ?
<p>只剩下实现Worker和Pool的具体逻辑了。</p>- Q$ I1 t7 i6 D# u
<p>Worker的实现</p>
$ O. k8 u# b6 l0 ~; \9 X, |* }<pre><code>impl Worker1 K- I0 J0 d3 O% h4 t! P: i$ D
{6 n- _4 B5 r; a* O2 a/ E
    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {
6 i% c$ T% m" c) _, Y8 C( M        let t = thread::spawn( move || {* Z1 _( ]* G0 i0 F; B3 q; \
            loop {5 Q. I: `: `- U; V; [) P
                let receiver = receiver.lock().unwrap();
# d( W$ U" {: a9 V1 `1 b                let message=  receiver.recv().unwrap();2 L8 ]( B7 P; V
                match message {
6 I! K5 B( N' e0 u! x# \  g8 `                    Message::NewJob(job) =&gt; {
/ k/ a8 _: W/ N; F) Y1 s2 |% ?                        println!("do job from worker[{}]", id);2 x" q; C; c  G0 W: F
                        job();4 d+ ?, }% C% g- e
                    },0 R- }8 B# O* l, M' l
                    Message::ByeBye =&gt; {
# r2 o1 B7 K+ k/ y+ T, u7 r                        println!("ByeBye from worker[{}]", id);/ J  w2 {) A" C9 G3 W" x
                        break
; q5 S- q) \/ e* C3 q- G                    },2 a0 r/ q4 s5 u$ A7 {0 d
                }  
7 V$ a) P/ ?/ r$ @            }
! B) H3 \- E6 t& z% ?4 O4 Q        });2 W- x2 J, l% Z+ i4 b# O

& ^" W' l5 J8 e: H+ d        Worker {
" i6 x4 {, Q+ [+ ~4 x( K            _id: id,' c$ f4 g3 E( X  B" P( h% [" a
            t: Some(t),9 U4 ?- o$ Y7 n. Q$ R/ C
        }
6 B. R$ N' }, T7 G    }
! {8 Z0 i3 W- O" ]- W' V" i}
1 _1 u# Q" B% `# P$ q6 L</code></pre>
) U) R- u; N# t<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>+ D7 |* D, G4 Y2 N# |: {# `
但如果写成</p># t- v9 o9 x3 h1 E- @( v
<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {
1 p7 |# b2 R: [6 q1 P( |};1 o6 b) j# M! |" R% b4 d# p5 R$ i* Q
</code></pre>) m1 q9 r8 B. r! B
<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>! ?; z" ^5 ?) Q- h2 B
rust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>
& N1 ?. y# [3 C8 Z" b# k! R: ^4 [<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>. U2 ^! u1 }3 b8 f: O0 P( q$ j
<pre><code>impl Drop for Pool {
9 F, k8 W& A+ [$ l7 a% ~* ~    fn drop(&amp;mut self) {
8 J8 o( @& l( V# X        for _ in 0..self.max_workers {9 i$ E/ N* j! @6 O# h
            self.sender.send(Message::ByeBye).unwrap();( w( i5 I# p: H- j9 v3 A8 ~
        }
7 P; X! W9 ^8 s2 h1 z        for w in self.workers.iter_mut() {
: v/ a  {9 N, ~* |            if let Some(t) = w.t.take() {
$ t) Q0 H, J/ S5 ]& {                t.join().unwrap();- D) ^9 W& |( _9 {" ?" _& c
            }4 b* C. c" ^$ @
        }/ K$ m0 y" s( b  Y5 C
    }# N# P6 c! h8 Y4 P, L  k7 V
}
+ d& x/ E5 M; a! ~& X4 f- {' O; C& o" Z, F" i
</code></pre>: M' l7 k( {! @
<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>0 j5 z. x# p0 g+ l, n
<pre><code>for w in self.workers.iter_mut() {
: t8 @! I$ U) _/ |& Q# D  v    if let Some(t) = w.t.take() {
' T8 y) X7 S9 P0 j        self.sender.send(Message::ByeBye).unwrap();
5 l/ j$ F$ X& v! U4 D; _  P        t.join().unwrap();3 ~  C" a* V3 e& z; W/ B7 Z
    }3 Z# e2 g4 a0 j+ U) l* y2 v# F
}4 t0 v3 `3 W; o8 d
$ r! _# I4 h" D
</code></pre>
- ?0 y4 @0 J3 v: Y* T5 d<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>
& @2 g" w+ {- b! ~  \( u我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>
  c  i+ E( f& r* c% [<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>: P- x  V& e* m4 K8 w7 O# X( r
<ol>
* k: x9 `& ~( \<li>t.join 需要持有t的所有权</li>7 {6 N+ z0 L4 n. J$ G, {5 c: ?
<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>! O3 O4 k% c2 i* u7 `; e+ Z0 ~
</ol>% r2 g3 X9 q  U* H- V+ w; Z  e
<p>这里考虑让Worker持有<code>Option&lt;JoinHandle&lt;()&gt;&gt;</code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>
  B! f. U7 n  b; g# o) d换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>
: I$ p) A( l4 X$ ?0 |<pre><code>struct Worker where
% R; `5 ?+ S" ?7 ]8 h# ^2 j{
0 D5 a6 d! w3 ]0 I" D    _id: usize,( h) C0 A; |: s" j  D2 W; U/ k
    t: Option&lt;JoinHandle&lt;()&gt;&gt;,
9 t0 a( L6 v3 v% K3 b" r. p}
+ ]/ }8 M" ^% U</code></pre>6 m: u+ ~( k. D/ M: [: D- S/ a
<h1 id="要点总结">要点总结</h1>3 n5 V' q0 O- M4 j' D' N' R
<ul>* N8 g/ l' T, f! R- f
<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>
. a$ W+ h( x! [/ J& `: s<li><code>Vec&lt;Option&lt;T&gt;&gt;</code> 可以解决某些情况下需要T所有权的场景</li>
/ ]6 h: ?! h, I* V$ ]2 p</ul>
3 T  {% N( d# c( [  T$ V6 w<h1 id="完整代码">完整代码</h1>) ]3 B1 U! e) @: w# g
<pre><code>use std::thread::{self, JoinHandle};  j) ^& V# Q$ }  S3 O
use std::sync::{Arc, mpsc, Mutex};  r/ m2 d/ K: g. b/ O3 I

, [9 ~3 \) O0 ?4 C/ l+ V
) t$ K2 I) s1 m+ D- ctype Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;, \: J+ |  I7 e5 l/ O5 n
enum Message {2 y2 {$ r! I# P7 E& }# P
    ByeBye,% x: r* X+ j) a0 ]' r
    NewJob(Job),0 N6 D7 C. k! \; ~& g
}8 @" [: w# I% q# l" [
# Y; C. i6 Q4 n1 i" {
struct Worker where
9 Q$ m8 l1 O8 I# ]{
# C( J+ m* w6 g$ |    _id: usize,
: u) p( J& x: r+ R& I5 Q1 L    t: Option&lt;JoinHandle&lt;()&gt;&gt;,$ }. H; _/ T5 Q8 n- x
}; }' [3 |3 g" F+ d# P; ]
+ h( w% P4 L, R9 D
impl Worker% E* |# E4 G: b& j% H2 Z' ~
{: ^* T4 Y( J- Y/ Y5 b+ H
    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {
# y0 C* W( k' m. L( P4 Y: y6 Z        let t = thread::spawn( move || {/ r  z! `  e, X/ |6 g
            loop {
- G8 @# J2 Z& o/ ^, T                let message = receiver.lock().unwrap().recv().unwrap();- e+ ~7 ]9 T& z6 Z, z! L
                match message {
. J3 \4 I3 d; K                    Message::NewJob(job) =&gt; {
. x: E" g9 y& h6 [( d/ l, @- X                        println!("do job from worker[{}]", id);6 M: d1 M6 \4 r7 c+ a% Y7 C
                        job();
5 f' L: C8 D& L. {! Y6 g/ G                    },
3 u3 n# N4 ]1 S8 V5 y                    Message::ByeBye =&gt; {
; x: u8 v( W7 {* }                        println!("ByeBye from worker[{}]", id);
5 _7 ^* f5 H6 R, e7 J4 z+ ]7 M                        break
, N$ P+ F' c1 V# W) J7 _" [                    },
5 _( V+ J# \+ c" y) x                }  - W: Q; X2 H+ ~$ L* y" V
            }
$ s) E* t" M% r7 S1 P$ @3 d7 J% h        });: H0 K, [4 _! D& J1 Q

. z. {1 l! s( P( j1 x$ }5 W        Worker {$ K0 B, Q. }; J/ @; s
            _id: id,& {- a8 `  P6 D+ r+ d
            t: Some(t),0 V) `8 W& H. R* q6 P8 O. Y
        }
0 b# ]7 f7 ^2 z3 U/ I4 I; O    }
) i2 H% _; A% ^9 M0 p3 p$ ?! X}* b% ~% C0 O' _+ Y

  S/ c* d0 @8 i: D# {pub struct Pool {
5 q- [5 Q: y& D# h    workers: Vec&lt;Worker&gt;,
, l( W1 X/ m, |' y; W4 E7 b    max_workers: usize,1 Q0 |. |. g/ |7 y- ]" S% U6 D% a4 u
    sender: mpsc::Sender&lt;Message&gt;" O' o  L& G- m! f' _
}
; T, f( t/ K1 v# y
* v* e" T9 i/ ximpl Pool where {
$ x1 P& h0 M  ~    pub fn new(max_workers: usize) -&gt; Pool {
( q' Y7 y+ k3 B) w( ^+ }        if max_workers == 0 {
, ~2 E, B( a* P( C( u7 v            panic!("max_workers must be greater than zero!"). _8 b' N% O2 b$ V; Y6 A
        }
  k( @9 ]4 |3 G% J3 u( @0 u        let (tx, rx) = mpsc::channel();
# W5 U! L! P/ j6 x! h: \! L- D8 q1 I
2 |* T( p0 n0 j        let mut workers = Vec::with_capacity(max_workers);
! H6 x9 D7 \1 o8 \        let receiver = Arc::new(Mutex::new(rx));
# F( r' m1 k$ k7 F; ]/ N" q3 I        for i in 0..max_workers {
1 ~, Q4 N3 B6 Q$ I4 k            workers.push(Worker::new(i, Arc::clone(&amp;receiver)));. W6 E& r: g& Y  s  E4 V4 a
        }
6 y7 z' l; r! U" z: C
+ g5 W$ e1 P4 Q  [% L        Pool { workers: workers, max_workers: max_workers, sender: tx }
7 I$ X: S! ^9 H, ^3 F2 `. c0 y6 p    }9 o6 A  G; m) O) P
    + N, |3 P$ W2 a5 G# f
    pub fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send6 S* @' x7 h7 X
    {
& F( l9 K/ ~! f7 h: a% U* s+ T; C# D& v% N
        let job = Message::NewJob(Box::new(f));
7 F1 U# N7 K6 I+ b+ H7 u        self.sender.send(job).unwrap();1 N5 B% D6 [3 S% V% g' p
    }
' o2 z  e" i0 S+ T}7 z; |! T/ _: [2 z  ?6 q/ F

7 e9 B7 d8 T! j+ v/ G' Y+ V" T+ U3 Dimpl Drop for Pool {0 _. l0 G: C; X1 S6 o  s
    fn drop(&amp;mut self) {- [+ {' t- \! ^! Q2 g0 N+ Z/ f. p
        for _ in 0..self.max_workers {0 M* l. y- L0 j# p8 ?9 l: I
            self.sender.send(Message::ByeBye).unwrap();
! u% m. i) _# ]3 F/ j( `        }; O; ?1 M4 }5 Z3 c( v
        for w in self.workers {
4 K- J6 V% o! J6 w# [# }# X            if let Some(t) = w.t.take() {
  S( Y4 q# ]$ h+ [                t.join().unwrap();
3 q- d) F  q4 X* q# x5 i  \            }
8 P5 x0 c! e+ ]0 t$ N' n& X) ]2 C        }
2 Y! j. O* r& H& K4 g9 |; A+ c    }* Z2 u' r9 G: U4 T
}
. ^8 D( p+ u0 e, E. o' i2 n% e" k" _- ]+ R. H( }& O

& B& e/ C6 ]" f4 h4 I#[cfg(test)]
( R1 W( z- Q7 P  Y4 K0 `mod tests {) @" T0 ~3 O4 Z2 y
    use super::*;
: O. F0 @# |8 P) u" P    #[test]0 }5 O0 l( d  R
    fn it_works() {
( j1 Z: V! D- K; d4 h& F, F8 ~        let p = Pool::new(4);% z1 p! x. \8 @8 _
        p.execute(|| println!("do new job1"));
# a  [1 |4 `9 S" @& P% V& s        p.execute(|| println!("do new job2"));" Y" Z& V( ]$ Q% @8 A
        p.execute(|| println!("do new job3"));; m0 h$ |" c3 v  Q: b
        p.execute(|| println!("do new job4"));
0 t) F. D: y  w: U" R- H    }
$ Y8 T( F; L; j' X/ g}4 ^+ t% ]2 E9 ~9 w$ D! Y
</code></pre>
; w* ?6 e  Y4 k. K' g& d* L) x' v+ A
回复

使用道具 举报

懒得打字嘛,点击右侧快捷回复 【右侧内容,后台自定义】
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

手机版|飞雪团队

GMT+8, 2026-6-3 08:50 , Processed in 0.065220 second(s), 21 queries , Gzip On.

Powered by Discuz! X3.4

Copyright © 2001-2021, Tencent Cloud.

快速回复 返回顶部 返回列表