飞雪团队

 找回密码
 立即注册
搜索
热搜: 活动 交友 discuz
查看: 3357|回复: 0

rust 实战 - 实现一个线程工作池 ThreadPool

[复制链接]

4137

主题

4225

帖子

1万

积分

管理员

Rank: 9Rank: 9Rank: 9

积分
14711
发表于 2022-2-12 14:35:42 | 显示全部楼层 |阅读模式
  j! l8 k9 `' b2 v: i2 ^
<h1 id="如何实现一个线程池">如何实现一个线程池</h1>( Q, r6 s- Q4 w/ a: X
<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>- R6 F& ~! |6 k2 I% m5 m, a1 g
<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>
' j5 R& [$ j3 v+ E9 u+ G<p>线程池Pool</p>+ `, o( I7 A7 u" L8 v$ O0 {
<pre><code>pub struct Pool {
; p$ {5 ?0 s2 {  T" P+ f) x6 f3 N+ }  max_workers: usize, // 定义最大线程数
# v1 j. `$ N" k; t+ s; y- ]$ ^7 r" _" o}; L' n; ^; |* g% s
% w! c/ B9 P: o0 M* x/ n- _# _: w
impl Pool {
+ b: p" T/ }' b: o3 ~6 \  fn new(max_workers: usize) -&gt; Pool {}
! ]3 Y" n* t: {* H+ h. [1 P- i  fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send {}! O4 \. q/ k0 h% A4 L
}8 M. V1 m+ d  r. I
( l& p- i# A$ B
</code></pre>0 r3 u! |+ [. ?: f7 a
<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>
# N6 o5 i! t* g8 y2 D  E<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec&lt;Thread&gt;</code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>
+ A1 o/ W9 C7 Q# Z5 g可以看作在一个线程里不断执行获取任务并执行的Worker。</p># k* N4 N$ K  h! Z. G$ N6 L
<pre><code>struct Worker where
4 W4 y" @0 R$ e# s{
' E1 \. F& |' O- {9 c    _id: usize, // worker 编号
& ~' y5 p+ ?) a. W}
& ~9 `7 _& ]/ T4 p* \# C" H7 X</code></pre>
" q" o  ~' N: ~<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>
% u9 w2 T% B) n6 Y( m把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>
# h5 }/ l9 y& G; t<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc&lt;Mutex::&lt;T&gt;&gt;</code>来包裹起来,也就是用锁来解决并发冲突。</p>
: H1 i. l: m# |9 g# k<p>Pool的完整定义</p>
9 o) ]' x& s9 h5 N! n  T* p5 y- I<pre><code>pub struct Pool {
' z. C/ @4 f& {- W: m    workers: Vec&lt;Worker&gt;,
0 t! [& i) u( ~) c: F" u: G; I    max_workers: usize,
1 l# [& n/ Z  S2 y1 }* k    sender: mpsc::Sender&lt;Message&gt;
- d& x9 w- V+ `5 |}7 M  |6 P$ M" C) Y- G  h% @$ p: a
</code></pre>6 y6 q5 H3 Y/ D& d: b
<p>该是时候定义我们要发给Worker的消息Message了<br>5 k8 p% B0 r: z9 C4 w% w. Z
定义如下的枚举值</p>
3 |( ]$ o! s2 b: W<pre><code>type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;
1 R. K2 i7 L6 D; {, ^2 M. Uenum Message {& D2 D6 i8 M, i' _$ D" F9 W7 F# f
    ByeBye,
4 V8 E9 z+ g6 @% r' ]! f& g9 z    NewJob(Job),- L2 K# B1 f9 G* i7 T
}
  K1 ]7 O$ U! U5 O' z</code></pre>
4 w; b( D! F9 }1 }5 B0 T8 S) s<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>, r; j  r& c5 a, O; Z5 p3 k6 w  W2 i
<p>只剩下实现Worker和Pool的具体逻辑了。</p>; ~7 J/ q( H, x  i! ^
<p>Worker的实现</p>
" N& Q5 J5 x8 f+ @% R0 [<pre><code>impl Worker
8 D1 P9 |# u8 f{& s% s! K: n' {. U  @% x/ i( _8 |
    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {) _! P3 ?. `7 c+ d# n
        let t = thread::spawn( move || {
" l) M- m5 K" ~/ I1 e# [6 B            loop {, f8 P% J$ b4 b8 S
                let receiver = receiver.lock().unwrap();
- d; T" z3 }, ~4 B6 b" b1 s                let message=  receiver.recv().unwrap();) ~3 E2 C1 P, t# M# T, p) {+ L: c
                match message {7 F+ M" ?# \( f/ ?8 u8 v; i
                    Message::NewJob(job) =&gt; {( C$ l; ^* f! z( ?
                        println!("do job from worker[{}]", id);& }" C, p0 s; ?+ u9 b
                        job();
0 C" `. [! U! W                    },# f0 J/ g, Y7 z! G3 h" z
                    Message::ByeBye =&gt; {/ O/ X; w" t& X0 d
                        println!("ByeBye from worker[{}]", id);4 i+ g/ ^5 ~; b: S/ E
                        break
* f" n! P: J. u- Y                    },) e1 v. I% K3 Z5 x: B
                }  6 y/ \7 V' X' H2 ?4 y
            }! b- N  b% K; w0 [. B# \' S
        });
* J* I# O5 K* q9 T" A: ]( E3 s7 N% q8 ?% I& n6 o
        Worker {
# w5 u& i; c' G, |5 H0 L& r            _id: id,
9 w: |6 g( h8 f/ a0 R- y% m; G            t: Some(t),
& \) K, ?" w4 J& r3 l* P        }
1 s' n; d5 m0 K* s( l    }1 h5 }% f" T2 ~  O% e: C0 j7 {
}
- d8 p" c5 x# E3 `% `: e</code></pre>
4 M" _! C7 u: e( W5 C<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>6 L9 I& q7 O% D) J" A
但如果写成</p>
, Q# t. m+ K. Y. W7 q. R% b, @<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {
) E/ s' @: K( ?* \6 e/ \1 \};8 C- W; B* `$ V
</code></pre>
, o* s( D7 g" r( [0 ~) k<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>( ?6 L5 F. F" N+ ^
rust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>) Q1 \$ |" w5 }" _3 i6 _2 i
<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>
7 L/ D3 \2 O+ o; b: G<pre><code>impl Drop for Pool {
: ?1 m. E4 G: F+ a& D    fn drop(&amp;mut self) {( {. ]7 ^! t, u& P
        for _ in 0..self.max_workers {
) [+ c, C; N, E4 B5 {& y            self.sender.send(Message::ByeBye).unwrap();( r* N: ^! E+ c! }
        }
4 t: \9 h/ C+ B2 Y# W! w) z9 }/ N        for w in self.workers.iter_mut() {/ I) B6 h% J# Y8 E
            if let Some(t) = w.t.take() {, z, n# \8 t( X5 F1 z
                t.join().unwrap();! n" Q% v* o  g' T) h( l
            }0 y; \! ?# L9 W0 K% E8 s2 A2 ]
        }
' B" g: p  x; C, R' k% u( U& C    }
' \# D- K, m2 Y1 g# \6 p3 t( `}
7 w- ~1 ]( n# `; Y  q: N+ g% g; P0 ?0 }2 x2 Y
</code></pre>  ~. z, i9 K4 J9 ?4 a9 j0 S
<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>* r! O+ H: G, d/ z9 ^% H; G
<pre><code>for w in self.workers.iter_mut() {0 r! D$ E1 L/ s2 t8 d7 B7 [
    if let Some(t) = w.t.take() {
& t2 m: Y2 U3 Q$ b3 x        self.sender.send(Message::ByeBye).unwrap();6 h9 h; K& m% ^3 G4 w1 T
        t.join().unwrap();# S" L! ~1 S1 a4 D, a5 R2 J( s
    }
5 z/ o4 i6 \5 Z, H}
8 `7 `: |0 |/ l+ U6 C/ K  m5 V) d
( F, B* s, @: y6 D! Q</code></pre>
" F2 I6 s0 N7 N: F! t; {; ~<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>: n' D- Y# {% e4 A( V# O. d
我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>
0 `0 b5 x; \; ]* [: H( e" C: C<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>
5 `/ H& I) j. Q2 n" F, s# `( Y<ol>
3 q) g. a2 n, E  ]4 \. u: v9 g<li>t.join 需要持有t的所有权</li>" s) ?  r0 J9 j% @
<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>: j( @( c) S4 v6 d) j% J
</ol>: n- d; n" i' f: s
<p>这里考虑让Worker持有<code>Option&lt;JoinHandle&lt;()&gt;&gt;</code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>
7 y2 R) u) ~1 `+ y) W# P7 ^换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>8 i, f: p1 d2 T9 `6 S
<pre><code>struct Worker where
$ V  d( J( O  U" _: J* b6 j{$ T. k9 v$ `8 P4 Y9 t( X
    _id: usize,, \3 L5 L* }) ^/ x0 E1 q) ^
    t: Option&lt;JoinHandle&lt;()&gt;&gt;,! R2 O1 A" [+ y9 J' f' O( t
}
! s  |$ O+ K# p8 z</code></pre>
7 H1 ?: D& m0 N) |& z: R3 A: q+ T<h1 id="要点总结">要点总结</h1>
: b8 Q' e& H& L8 E9 k- D, R<ul>
- @  i( d8 q. N<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>
) f) P0 b' y, t; G  z1 ]<li><code>Vec&lt;Option&lt;T&gt;&gt;</code> 可以解决某些情况下需要T所有权的场景</li>) R0 m' Y0 u! S& o# t. A% `5 I8 v
</ul>
3 m) @0 [; Q7 G' V, n. f<h1 id="完整代码">完整代码</h1>! b( U+ b* [% a2 g2 F
<pre><code>use std::thread::{self, JoinHandle};
* Y" j' o7 L' h& huse std::sync::{Arc, mpsc, Mutex};) U$ f5 v$ F9 R6 r

, |' I; w( {  D6 d. E, A* R  [% j# k. T! _4 e4 y
type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;! G4 Q1 o5 m* p1 q) s+ U2 V6 o
enum Message {" h" G$ O  n8 }
    ByeBye,
+ d, u- j4 t, T4 j6 T0 z3 n    NewJob(Job),
0 w! U  k7 G5 V2 v1 k  k}
9 R' C- _& U9 V6 K7 G$ i3 p; q& k3 r
struct Worker where% c/ O. Q+ a3 I7 l  m* F& F
{
: |! k$ m+ Y* P- J7 _) t    _id: usize,1 I; e) Y- P# |$ ?+ B4 v% ^3 [
    t: Option&lt;JoinHandle&lt;()&gt;&gt;,* I# Y! w+ Q' T2 J
}
- F- D0 t2 A# }1 a, K
: l. m6 `: s3 R" Q% f3 I1 N  Gimpl Worker$ Q3 ~1 t1 e4 {; k8 S# m
{
; ~; V% d/ V3 `) o    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {
: w) M& u: I) c* k" Y7 q1 m5 @        let t = thread::spawn( move || {
) ?: q3 M1 q& [3 \5 T; Y            loop {
( |5 C. f# w; Y6 c: G; z6 y                let message = receiver.lock().unwrap().recv().unwrap();
' Y0 |% h* D; e7 {0 A# h9 l                match message {' T" V7 a/ J$ K* h/ g, x2 r
                    Message::NewJob(job) =&gt; {
1 s: b: o2 l+ \! [) Y$ C                        println!("do job from worker[{}]", id);
' |3 l0 a8 G( X                        job();
- v/ ]) v2 A$ b; w                    },
  [( T1 r8 V2 W9 W) o  n# w                    Message::ByeBye =&gt; {
( ]" Z* O6 E  f! U& I( C                        println!("ByeBye from worker[{}]", id);4 G' }; X6 K/ C% N" o' P; I! I
                        break
# j  {7 h# M2 @) G; `                    },, E& O! h# I& I8 L! I7 Q. K
                }  
8 Q4 o9 K* {& E# O$ c9 R0 I- v            }
. G+ Q; c. t0 V& y* x6 D* g+ A. V( K        });
9 s9 v4 D+ b7 A
" b9 Q. A$ N+ H1 o- ^6 x: K        Worker {
( T0 b) X6 j0 R5 }, r            _id: id,
8 U7 d- f2 [) e& y0 @0 y            t: Some(t),9 `& K2 N" T. ^  m7 y
        }
8 q& F" n+ F$ x; }    }
* w# C- j  `( b7 w) m* I5 q  f; @}9 B* L, Y. D; V6 P, q4 V  C

7 B* T# }1 ?9 l- u* Cpub struct Pool {; o( c3 c# f) E$ s" S0 o
    workers: Vec&lt;Worker&gt;,5 I# |2 _$ q; G5 w
    max_workers: usize,% g2 v5 Z: m$ q0 a/ G( U9 n
    sender: mpsc::Sender&lt;Message&gt;
9 W; e0 Q- J9 g# h1 T$ `6 |}* \4 ~/ k4 a' J" A

1 ^0 ?. [% M! Y& v+ Pimpl Pool where {
. }+ ]0 F) A9 c* Q. w2 n    pub fn new(max_workers: usize) -&gt; Pool {1 H( n! L* ]' o4 _
        if max_workers == 0 {
+ G! ?2 f. M3 X- r9 J% ]( y% [            panic!("max_workers must be greater than zero!")( o( ~* T  v" }- k3 ~+ ?0 G
        }1 \" G9 ]  f- H% f
        let (tx, rx) = mpsc::channel();2 G! F. d  W; }+ D6 I8 x* ^

; n$ E" ~: ^4 \1 l; M% x3 H  C        let mut workers = Vec::with_capacity(max_workers);8 `7 i* `8 U( x
        let receiver = Arc::new(Mutex::new(rx));
( b/ S3 L8 C# |# |, d6 l* T! ^7 P  `        for i in 0..max_workers {. g9 E# m+ k+ n$ {& L1 p% b, I3 U
            workers.push(Worker::new(i, Arc::clone(&amp;receiver)));
+ t, _& @9 m- l2 N$ G        }, @% k) |8 S+ B0 I
6 K( R1 H  _! r  M
        Pool { workers: workers, max_workers: max_workers, sender: tx }# S  R& V" y$ W2 Q' N, A: Q
    }4 H/ N+ y2 B6 W: G6 M4 C8 o1 j, S
    1 s; g% T/ d- p' b; N7 Q0 S. X1 g( o
    pub fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send7 `+ ~. I/ c  p$ B: O- S5 W7 E
    {
9 I. {! V' A8 O; B) a, e
# L( P* e8 l! Z+ |  O9 [# X( x; m        let job = Message::NewJob(Box::new(f));0 A1 r) q# `4 u
        self.sender.send(job).unwrap();. ]) h) u; U: k1 t/ B9 r# f/ _) t
    }
0 \4 P: r3 k. b  s7 G: o}
, q. \% h9 Z/ G: q* ?3 b/ _* r5 a5 @& y- r6 O
impl Drop for Pool {: U: ~7 H: v+ \9 r( Q
    fn drop(&amp;mut self) {
3 G5 B$ |! N; ^% x        for _ in 0..self.max_workers {
  `4 Y) C0 v3 O8 _            self.sender.send(Message::ByeBye).unwrap();( ~+ m( \& y& n" J. O- R4 A
        }: N+ m* {' t- X+ N
        for w in self.workers {
: D& X% ~+ j5 h* T; H            if let Some(t) = w.t.take() {; \% o. M' B1 ?" ^) p
                t.join().unwrap();. [+ x8 y5 @/ C5 b# c1 p
            }" D5 q7 `+ x* e  G, F
        }
4 r6 S% ^2 R' B/ ~/ w: \2 U3 _    }' G  p% k" \- `8 a# I. y6 H) ~) z
}
( h7 f6 f: Z* a- `1 i% @0 P# `
1 N# N% \6 `+ ?9 l, p, g6 B, v! R: m% u
#[cfg(test)]
* ~) t1 B- @4 C% |. |" J/ }mod tests {
  }( E4 [1 e- w5 S    use super::*;
+ l) t4 i1 A  T) s( a- R9 \    #[test]# w2 ~2 u4 N, y7 b; l( X- u7 H
    fn it_works() {
6 k) M$ z5 X7 I  ~. l9 K) F        let p = Pool::new(4);8 y  D. x' E% W$ C# C
        p.execute(|| println!("do new job1"));" w( S* Z# ?7 X! _6 `
        p.execute(|| println!("do new job2"));# I% f  S# S* |& e1 M
        p.execute(|| println!("do new job3"));
8 ]( w3 @0 K4 ?5 ^/ U        p.execute(|| println!("do new job4"));5 R# ?' H* _6 [) t3 e3 l- w) ~! k
    }& a& Z* A3 B  N: e  Z* v6 r9 b' D
}7 y5 D* {# k' |! ~5 Y
</code></pre>
7 \8 m$ a. ~0 L# y2 v# n& o' f& R2 `6 ]& R# O% ?' N
回复

使用道具 举报

懒得打字嘛,点击右侧快捷回复 【右侧内容,后台自定义】
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

手机版|飞雪团队

GMT+8, 2024-4-25 19:36 , Processed in 0.066234 second(s), 21 queries , Gzip On.

Powered by Discuz! X3.4

Copyright © 2001-2021, Tencent Cloud.

快速回复 返回顶部 返回列表