飞雪团队

 找回密码
 立即注册
搜索
热搜: 活动 交友 discuz
查看: 9475|回复: 0

rust 实战 - 实现一个线程工作池 ThreadPool

[复制链接]

6738

主题

6826

帖子

2万

积分

管理员

Rank: 9Rank: 9Rank: 9

积分
22542
发表于 2022-2-12 14:35:42 | 显示全部楼层 |阅读模式
9 }0 s' m7 `4 J; A9 a+ t" E& M
<h1 id="如何实现一个线程池">如何实现一个线程池</h1>
9 i$ W- r  h- ]) b<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>
9 t( W. H! X' r) w. t- ?<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>
0 G( X- Z: d/ @6 _# S<p>线程池Pool</p>
6 d  l* }- J* L6 h<pre><code>pub struct Pool {
4 h4 r# ~! W& A: |$ W  max_workers: usize, // 定义最大线程数; k/ T& a0 K9 \
}, V0 Q+ b0 n1 w# B8 a+ O1 x$ f4 H

3 P8 E6 F: A& L/ ?3 E) a9 _3 a6 o; c* dimpl Pool {+ K& O$ A* e  E  F( o  P
  fn new(max_workers: usize) -&gt; Pool {}! z* @' b7 N. a# G! |
  fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send {}
3 [: s5 I0 T% [! Q. H6 y}
. T) }, X! H5 ^9 d) D# H# x$ G5 e3 N: C' _0 _  X
</code></pre>
$ M% r, Y. _* T& W+ }<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>/ p7 D. \, b6 y% n: C/ Y& Q
<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec&lt;Thread&gt;</code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>
, r5 P: L1 n  t7 V# {$ H可以看作在一个线程里不断执行获取任务并执行的Worker。</p>
# E$ X; b& X  j7 m<pre><code>struct Worker where# i, Z) P7 H% f+ Q1 O
{/ C3 [7 e. m' h
    _id: usize, // worker 编号  B0 T& _/ n0 @0 ?6 ~$ ]; g
}# ~0 g) x/ L  P. T! g
</code></pre>
. r/ [. k  U  S" L, i: ~<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>0 z/ o6 ]4 s" Q
把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>
' P" l& ^" r4 p: g7 k, @# h<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc&lt;Mutex::&lt;T&gt;&gt;</code>来包裹起来,也就是用锁来解决并发冲突。</p>  a/ s6 C. L2 _2 E3 ?) o
<p>Pool的完整定义</p>6 f2 x8 s" E; Z, o
<pre><code>pub struct Pool {0 G3 r, E# t6 M. G1 z# j* B
    workers: Vec&lt;Worker&gt;,
3 k7 e. a8 l! K2 ~    max_workers: usize,* b( E5 a' {; G0 r4 K; j5 |: X, b& l
    sender: mpsc::Sender&lt;Message&gt;3 v/ Y  T- M& n( z: B5 O
}9 x1 ~* X, y& O, S
</code></pre>
8 H( W- {* {* t5 q" y* }<p>该是时候定义我们要发给Worker的消息Message了<br>% f8 T1 t& Y% I  @' {5 S& d% ^
定义如下的枚举值</p>9 [' I3 y% [* [' R5 ?, ~
<pre><code>type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;$ N4 _, v& I( R2 e
enum Message {0 \, e$ Z9 k8 q1 N
    ByeBye,
0 G/ @0 b# ^, f+ p" A    NewJob(Job),* L# V, _4 e6 _5 r  R% y7 Z) X
}$ h2 k) K! j1 K6 ~
</code></pre>
6 n9 V2 r3 F: ^& g3 O& m7 a<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>- B  s/ O" W: R. V0 t
<p>只剩下实现Worker和Pool的具体逻辑了。</p>
9 o- L+ h- t5 z$ f/ C9 C<p>Worker的实现</p>2 s/ A5 [. {$ |3 N0 y1 _
<pre><code>impl Worker* F2 e) l! L" C& N
{
' u. F9 P+ m& M( u8 ]    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {9 T: {) g  k( c: c0 H2 B0 {
        let t = thread::spawn( move || {
/ e, ?, o6 I, G! T            loop {7 c% w2 N) b5 ?8 p) k
                let receiver = receiver.lock().unwrap();; a/ G) b" i5 V8 ?% B1 {! [5 ^- R
                let message=  receiver.recv().unwrap();
& y5 g, n! x9 D' O. l# x* E* N0 i4 {                match message {( x5 H: C) N  j% ?6 O2 \0 E
                    Message::NewJob(job) =&gt; {
" r7 n7 }/ U& {' e: g: G) e                        println!("do job from worker[{}]", id);% H4 t1 y" P3 S5 f0 V- i
                        job();
2 R; }  o& Z1 C# h* H$ @. W! h                    },* s; b( y3 \  L) ~
                    Message::ByeBye =&gt; {1 T5 n. ~. B. u" ?, t
                        println!("ByeBye from worker[{}]", id);
1 h% o1 h# c8 ]. v" P: P- m                        break
# G. z  R3 H  j/ f7 V- y+ r  R$ h                    },. ]6 }6 b2 K7 i; U% B
                }  - k5 U% g7 B9 e6 D/ \- @: y
            }7 I8 {4 c8 _1 n' B! _5 Z7 z
        });) w7 Q! M0 ^4 f! u$ ^

1 ~) |# z* v! n2 H        Worker {& y9 }9 d( ]8 N- V( K
            _id: id,
) a0 G% k' S0 y! e! T- ^7 C            t: Some(t),$ Z% X4 D3 w' N5 `- d' h4 P
        }; p/ O- o% v  z: @
    }
. P: h9 Y) ~- A: Q$ o( |}
' m* m8 ?/ q$ A. q; Y</code></pre>. `4 e! p$ h! D* ?% s" s
<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>
! Q( A3 l- @& t/ N  @  C但如果写成</p>
3 W- t+ ~( O( x+ z<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {( \& c9 f2 P5 y+ Y( i
};
- K8 k* m  f6 i) c$ E* L$ R; y</code></pre>/ S) f7 x" @8 ?7 C) c/ A0 g$ r
<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>+ F3 K: w2 Q7 h; i" x
rust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>& b) f8 r( C& j% g5 u
<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>% S' p7 j/ [; ^# m, t
<pre><code>impl Drop for Pool {
9 u7 |  [" x7 V, j& E: q    fn drop(&amp;mut self) {) _& e; N7 y: }& S7 O
        for _ in 0..self.max_workers {) I" E. ~# R. ]2 N) n3 U% Y3 F0 }
            self.sender.send(Message::ByeBye).unwrap();
% F  R0 r& O% y# M- P6 S        }7 o+ k7 {& g# x
        for w in self.workers.iter_mut() {
5 t; L2 P* S9 \            if let Some(t) = w.t.take() {
5 |$ a3 z; _+ R! r# i3 ^                t.join().unwrap();
3 ]" B; Q- U' h            }
, {! `7 \+ z; v        }- I, u8 {* Q) x# @8 H* N$ D1 ^
    }
4 o$ S( Z8 E3 W0 o; v+ k) n5 S}- S0 I9 O" \3 H1 @$ l* q" \$ B
# F. q/ B5 |4 `' E& f" N
</code></pre>& c- x, {- K3 P8 d0 \& F4 I
<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>
9 h+ ?+ T; C, y<pre><code>for w in self.workers.iter_mut() {
& L. k6 X, r' ?3 f, `/ [    if let Some(t) = w.t.take() {4 q( v( R- g' z
        self.sender.send(Message::ByeBye).unwrap();
1 q$ O1 F  B/ q/ N) G        t.join().unwrap();+ _5 }; q" Q- K/ V. N3 E7 k8 n& h, Y
    }
  ]4 l! e2 j2 n6 _! C}  f: P1 O. D; m
6 P5 C6 W/ b4 Z: x1 X2 y
</code></pre>, n) E9 t) `8 J& `( I1 U3 T; }1 x
<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>  x  p: W9 |/ k- \
我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>
" f3 Z8 a6 L  }9 Y/ U/ ]9 F: {<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>
3 D# x, L" Q& H8 m' M- C* k: T<ol>
) I2 t/ [9 ^/ ^- D<li>t.join 需要持有t的所有权</li>
$ y  G4 H% B1 b9 i: N1 m<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>% Z! ?% ]- q2 E
</ol>3 V% n) v1 e* g2 D0 I' ]
<p>这里考虑让Worker持有<code>Option&lt;JoinHandle&lt;()&gt;&gt;</code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>
2 t0 a$ i7 C# R( _! z换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>
2 i$ h7 a2 ^+ r% M<pre><code>struct Worker where. p% }1 ~: t/ n+ y1 n
{8 F- S0 o, d1 Q( r
    _id: usize,
' c! {2 n, S4 t# g: I0 Z0 ^    t: Option&lt;JoinHandle&lt;()&gt;&gt;,
6 h4 a9 ^. K4 f" O2 }1 d}, S( E9 h" S' H2 d
</code></pre>
: m9 N7 X( i1 M. [<h1 id="要点总结">要点总结</h1>( p: U. F+ j% r, m
<ul>
+ }% _9 E- G& r4 ?<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>) T: m+ k+ D$ Z
<li><code>Vec&lt;Option&lt;T&gt;&gt;</code> 可以解决某些情况下需要T所有权的场景</li>3 C! @2 }9 n6 ?" F
</ul>
9 @( v5 I% @6 I( n6 p0 k; K<h1 id="完整代码">完整代码</h1>% K6 l5 m. K0 P8 w$ u- V1 }' Z
<pre><code>use std::thread::{self, JoinHandle};3 ^/ G; F2 C9 M* M% J8 \, Y
use std::sync::{Arc, mpsc, Mutex};
  m( P0 k2 {/ t. r. f: \0 K, N9 p4 Z* b* l) }& H

; k- v' X  E' @7 H5 [+ @: u5 Etype Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;' o! O7 t* W" U, o( O2 e' Z" S( c6 w
enum Message {+ P; T" A' j# f0 \( x; Z
    ByeBye,$ f5 u/ U; c3 p" Y) d9 T% Q& U; N
    NewJob(Job),
2 X7 a: h' J' i3 G. c  o}: w* [" w/ v1 C3 a. X( t. B
1 A& ?) a5 Z* t8 y) e
struct Worker where
1 B* L+ u# P- J9 @4 g{
* s1 t1 A5 e/ p+ o2 L    _id: usize,
7 ^7 R. N" U& K) N    t: Option&lt;JoinHandle&lt;()&gt;&gt;,
7 j* }% U, W! m) q! g}! w  X+ k& g4 E3 W9 e" T: }3 U

( I& y* U) t) g* cimpl Worker9 @* L7 ~4 F% N4 k- Z0 M( t% R3 t
{
' q" F, v" c# F    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {
$ T& i1 e1 x% ^" j: X$ ^- ]        let t = thread::spawn( move || {
. Y8 Z' r+ |  \8 t! Y5 C; a+ U            loop {
' |7 `8 X* C0 T                let message = receiver.lock().unwrap().recv().unwrap();! X$ {; z/ q) a
                match message {
' }) Y# ]" S& a- ~  Z                    Message::NewJob(job) =&gt; {
/ {" Y+ m0 }& [: o. W. ?                        println!("do job from worker[{}]", id);
$ Y9 }. x2 y3 f2 n( }, H                        job();
1 ^0 d# e  g" \( B$ r                    },
( h2 N. v4 [- L* }. Y                    Message::ByeBye =&gt; {
- }, U8 u$ T: _. e1 g                        println!("ByeBye from worker[{}]", id);2 g# u" C' K" R( j/ w
                        break8 q1 c; g# q9 G& e$ ~1 d* n
                    },9 x, z0 b0 T  N2 e2 [4 c
                }  
0 Y# y) V2 v4 x8 g            }
! _, p* W9 [) X  u        });
- {7 w# s$ `! d* q: v) @7 q$ J+ c/ Y* X- I' w+ R0 L
        Worker {/ H' {3 ^( h3 i' e5 w/ Z
            _id: id,
- B7 k6 m* G3 c/ m* X  v* V4 G: B            t: Some(t),
3 V# S+ u) s4 N$ W" Q; Y4 ]1 U5 E  }        }" ?) A: m+ y. N' V& m, N
    }
+ `8 x7 d! u+ l' W}, {( v& U( \9 z  ^
$ `. R$ d# X9 e
pub struct Pool {
. q" g0 W2 ]' N) o    workers: Vec&lt;Worker&gt;,
9 P) T2 u) e- \6 c: D, R    max_workers: usize,
  k6 Q4 [1 i4 W, ]* \6 z4 s    sender: mpsc::Sender&lt;Message&gt;
+ F) Z5 m0 j- B# `' g( T}
4 y8 D( H( o- E( i7 \8 U3 s, I( I$ C. W. `( s0 o% W3 M
impl Pool where {
. i" j( J4 V) S+ g# N9 N& f    pub fn new(max_workers: usize) -&gt; Pool {
/ G7 @, u' K' h0 g        if max_workers == 0 {3 q: [3 ^. s9 c& Z8 F# H
            panic!("max_workers must be greater than zero!")) @% p& f7 c, `# X
        }
4 W: j: I0 M* {6 \9 n4 i$ E( p5 ~+ Y        let (tx, rx) = mpsc::channel();; I2 e9 l  q3 k- M$ o; C

+ B9 |0 T6 u. T+ x/ E        let mut workers = Vec::with_capacity(max_workers);
- k# M& L4 M7 z# A( N( e- P4 @        let receiver = Arc::new(Mutex::new(rx));/ o8 X8 C4 a! V7 I7 `
        for i in 0..max_workers {
9 t' m( c! _. @9 |            workers.push(Worker::new(i, Arc::clone(&amp;receiver)));
& z- g# n6 D; T" }        }3 q! h8 p+ n7 _# G3 M; B
8 i& z8 y3 L( _7 d
        Pool { workers: workers, max_workers: max_workers, sender: tx }7 x2 C  i  `, B; j9 c" d7 b$ c
    }
& B1 m; W, G7 e0 @4 B: k/ M   
9 J' F4 |- R4 }, q( }) s" f3 N    pub fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send
" g3 a4 _9 X1 K( M    {
9 r( o. l$ ]2 d2 ~8 H1 ?  l5 m) r9 M8 C, C, J* w3 n7 k
        let job = Message::NewJob(Box::new(f));8 X7 m1 D! r6 B' r9 N
        self.sender.send(job).unwrap();+ n" Z9 u( c. ]' Z
    }
" W8 k9 ^6 A' p$ a% P4 t9 e}
/ Y/ c- y+ H& Y' s- t- Z& G: H
) ?7 _* N0 r9 k, E3 cimpl Drop for Pool {3 y$ T5 R5 B. I2 C( `( Y- O# j) p
    fn drop(&amp;mut self) {
/ [0 K& E5 Y/ s* _  i. L) p7 v& Z        for _ in 0..self.max_workers {( ]5 H6 I. O* y$ p. i5 f0 K
            self.sender.send(Message::ByeBye).unwrap();
4 B( D# i2 U# [* t  s  T  [        }+ \% g) R4 H5 b/ x2 S% M2 u* C2 M
        for w in self.workers {
# ~! P2 d2 N6 a+ C% @% @            if let Some(t) = w.t.take() {
3 w( c8 R. ~- l5 Z" Q                t.join().unwrap();" R9 O: ?) X7 H! F" r; a2 c
            }3 A2 w+ o9 k8 E3 ]5 q
        }
3 U* m1 f" j% S+ P    }
/ Z/ b2 @: D  X5 m* u+ W) o}
1 X  O: v' {: f3 {! e# o' u- Y  b+ n9 o, N
( Z; Q0 H% z% }9 I, L
#[cfg(test)]4 \7 i( ?9 Q8 \6 y9 {- ?9 j7 @
mod tests {
" n5 G( e7 ]: ]) J- k    use super::*;
9 Z* j& g, c5 n! v3 T  E( D    #[test]8 f0 v6 W! p4 {3 |; C% j5 X
    fn it_works() {3 M* B$ c1 F& T6 O" X& A( l* j
        let p = Pool::new(4);/ o6 r2 i; W3 h. r$ y' o1 M3 `
        p.execute(|| println!("do new job1"));# x7 |$ A( u# z
        p.execute(|| println!("do new job2"));
  N. _/ b4 ^& c" ~4 L        p.execute(|| println!("do new job3"));
( y' W9 ?1 W5 ?# Q3 A3 o        p.execute(|| println!("do new job4"));
8 \7 X/ w/ l5 S( m    }; x+ [# H; D4 H3 M
}2 J+ T% M' m, Y0 D( K# Z3 R& ~
</code></pre>
  ^' I: R6 }, \" N  Q
7 }/ t- |$ P. {3 N. x% O' Y- P! v
回复

使用道具 举报

懒得打字嘛,点击右侧快捷回复 【右侧内容,后台自定义】
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

手机版|飞雪团队

GMT+8, 2025-6-15 16:54 , Processed in 0.082067 second(s), 21 queries , Gzip On.

Powered by Discuz! X3.4

Copyright © 2001-2021, Tencent Cloud.

快速回复 返回顶部 返回列表