飞雪团队

 找回密码
 立即注册
搜索
热搜: 活动 交友 discuz
查看: 12182|回复: 0

rust 实战 - 实现一个线程工作池 ThreadPool

[复制链接]

7556

主题

7644

帖子

2万

积分

管理员

Rank: 9Rank: 9Rank: 9

积分
24998
发表于 2022-2-12 14:35:42 | 显示全部楼层 |阅读模式
6 ?* A$ m$ e7 G0 E1 F$ @! m
<h1 id="如何实现一个线程池">如何实现一个线程池</h1>
: J4 B6 b, \$ w+ p<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>; D  n6 a0 I7 U& H- n3 T( ~5 [6 u
<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>1 r5 Q: y" [- P* _$ d
<p>线程池Pool</p>1 i2 ^) X! c5 |6 R$ t# P
<pre><code>pub struct Pool {
$ O+ K( _4 |6 P7 J$ Z! o) |  max_workers: usize, // 定义最大线程数: P. z4 j2 o4 ?3 A1 N
}/ k/ m0 K$ c7 U! S$ t
$ m7 A1 ]8 d6 H# x2 Y
impl Pool {
, V' B9 q3 R! A; C: Z. N6 X/ y  fn new(max_workers: usize) -&gt; Pool {}6 a1 F9 E. S+ ^+ r3 n( b, v& F
  fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send {}
3 z6 W* Q9 ^9 Q}, t; l' B9 }! r: A
/ U  N% [* T8 d- D/ b, j4 P) X4 f, k
</code></pre>
/ w7 b5 {# X- m4 G( K<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>
5 E. \7 T" g5 b: H<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec&lt;Thread&gt;</code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>
0 Y0 O6 e' X) @可以看作在一个线程里不断执行获取任务并执行的Worker。</p>
1 x' {0 A$ t4 ^/ O0 x  T- ^<pre><code>struct Worker where
$ C; [: S3 x$ c; e, l* A: ~{
4 b7 v2 x: @# B( g    _id: usize, // worker 编号0 K& [4 P2 h2 x+ U
}' t0 ^% ]1 `8 P& B; U. C
</code></pre>
% G# ~+ m: }" k9 h7 R<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>) h8 c2 F- |( ~4 A- j
把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>6 R1 N7 j5 x6 A. w- `
<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc&lt;Mutex::&lt;T&gt;&gt;</code>来包裹起来,也就是用锁来解决并发冲突。</p>
4 t" f/ l$ O# y3 K5 B<p>Pool的完整定义</p>
' D3 E; ]9 p7 W) b5 X# K+ m<pre><code>pub struct Pool {0 y1 {6 U$ g1 o6 f( L
    workers: Vec&lt;Worker&gt;,
  u( D$ e1 J* ], |    max_workers: usize,
$ T) r' r7 i* d( y. w    sender: mpsc::Sender&lt;Message&gt;; X7 y& b1 G4 R  i1 f) h9 p2 S
}8 I5 A+ M; Q/ z) V& S% d
</code></pre>
- Q: u9 X: o/ t' q& V<p>该是时候定义我们要发给Worker的消息Message了<br>+ O$ F  B1 X7 {1 S& m
定义如下的枚举值</p>) l, ?& {! q: C/ {7 j. t
<pre><code>type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;( j9 F+ S7 ?: \8 P
enum Message {; J% o; N- x9 o% \8 Q
    ByeBye,& K7 k4 A( E% R1 J8 e
    NewJob(Job),
0 x4 W1 P/ e: i! G, K9 N3 F}
- X0 h& H6 H0 }+ l) J) n</code></pre>
  u/ B3 r0 k4 X8 |2 @6 g/ y/ k<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>% c9 b+ \* V8 {* H
<p>只剩下实现Worker和Pool的具体逻辑了。</p>
. g7 O3 Q1 F8 Z% r. d<p>Worker的实现</p>
6 X% O/ n8 ^9 i& F4 {<pre><code>impl Worker% @* e1 F( J' F- ^0 E/ N% L% A  x) @
{
0 f; a* y4 W5 e/ R' }% H    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {
7 a; S1 b# @; F, r        let t = thread::spawn( move || {" I9 z3 C3 y3 _1 h& [- B2 H; k
            loop {
. }3 z  l2 {! Y- a" F0 B! b                let receiver = receiver.lock().unwrap();3 a3 `/ i" `# B: f
                let message=  receiver.recv().unwrap();, ?/ T( B6 D( ]4 n9 B
                match message {  ]; A' J- `) S4 n8 [6 @
                    Message::NewJob(job) =&gt; {5 Z! D: v0 D" E, V* i  g  F. ^
                        println!("do job from worker[{}]", id);
# }( n, I& G0 Y% B% N+ m, q* g                        job();% S  l; i9 x* K$ e8 W1 N3 R
                    },2 {9 O7 c) o4 q0 D
                    Message::ByeBye =&gt; {
- K7 D% s3 D* o7 ^                        println!("ByeBye from worker[{}]", id);
+ X& t! ~* Y$ S6 Q                        break
, |0 b/ S: T! a+ v0 r( y! }                    },
. h' k9 M7 I4 U& s' A5 Q: e                }  
/ o; j5 ~4 i% b; ~4 S+ G            }* v! D& ]) B$ n) E: o. }
        });8 b6 R1 w4 E4 F9 h

8 Y0 @! a9 ~' U( S* y3 ^        Worker {
& \- Z: n: ~1 i            _id: id,
1 C3 V: W0 k# O* K1 j0 R5 ~            t: Some(t),
9 Q! j" {6 y% S# }. g7 h        }
  U  M2 }9 N% r! X/ \    }
2 D( v4 ^% {3 W, n( L# A}
1 q* ]5 g8 @5 x# i) K1 b</code></pre>
7 z# Y6 S9 j0 x<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>8 b& E" Y* w, \4 L$ Q; W. I# O
但如果写成</p>  }5 J9 S1 g3 g, Z5 _
<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {
7 g8 t9 e1 C1 J  O};: o) p, K7 L. j
</code></pre>) A) M' L# n- i# ~- s" {7 u9 w9 S. ~
<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>4 U2 @3 U4 n, O, d# E
rust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>9 \& }6 z# s' Q! F( }2 p
<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>5 ?$ `" p$ p: B/ P
<pre><code>impl Drop for Pool {; f; K" s1 u& z4 T  \9 w
    fn drop(&amp;mut self) {
4 m- S* v9 z2 c0 F        for _ in 0..self.max_workers {% ^6 Q. r3 h& d' f) r
            self.sender.send(Message::ByeBye).unwrap();
0 P) s# V) t' X9 O& @9 o! N        }
* g# P# X; R" t5 H* @% ^8 R4 O        for w in self.workers.iter_mut() {5 A* J; v$ f; K; n" v
            if let Some(t) = w.t.take() {
6 X8 d* `# d, ?- Q- w' m                t.join().unwrap();
8 {) T( r$ o, k8 V0 i+ U/ S            }
# m$ o+ ^/ H: @/ o        }1 g8 o7 \8 a8 `" y% _! c2 M" x# g, @
    }
' y" w6 I) N! [" d, J}1 [9 m# B* |8 Z

7 J  I% Z% E. W) [</code></pre>
9 y3 Z9 Y5 R* f4 Q& l1 M8 y) }1 t0 Y<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>
$ a+ J) M+ k! W" X3 g  s: a; d1 d<pre><code>for w in self.workers.iter_mut() {
5 e; h& W" Q  F- t3 h    if let Some(t) = w.t.take() {0 m+ v8 I% |% {& Q4 @" Y$ p
        self.sender.send(Message::ByeBye).unwrap();
! J0 r; `4 t5 k: }$ p+ A; v' J2 F        t.join().unwrap();
5 z: m! b1 J6 ^7 w9 o" ?    }$ @+ L1 S8 L4 h7 |6 T
}0 o9 S0 C" e- |+ }' ?. A
: b( M2 h9 `& `" z* P
</code></pre>
+ E" D3 ~& Q9 i" P$ v<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>8 D; W! ]4 e' K+ P& I
我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>
7 \1 f7 E$ p1 ]6 E! k. D9 b4 s<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>
5 E) G; d5 x( r- S% L. }<ol>
: r4 I/ h, r4 x( y8 g( P1 O3 r' Q: V<li>t.join 需要持有t的所有权</li>3 R! u& D1 D  q" e
<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>
; O* a2 Q- D$ |: u  c8 y+ D</ol>
  x  N- L0 p2 c' _6 B  ~% m0 `<p>这里考虑让Worker持有<code>Option&lt;JoinHandle&lt;()&gt;&gt;</code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>
" l" d% p0 i; h换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>
/ c- L8 S) Y: b! y* e<pre><code>struct Worker where$ N4 v# h9 [' x/ X" t) r; t" s$ r
{
, ?1 t- F+ C$ E" h# ^& |9 {    _id: usize,
' A; \0 V1 h5 Z5 y* q  r2 d' A    t: Option&lt;JoinHandle&lt;()&gt;&gt;,- l* w! G4 P* d" k1 d. |" A
}
7 `" ]+ S1 n8 j( \</code></pre>6 N6 x. g' _; {$ B
<h1 id="要点总结">要点总结</h1># m. C# R# I6 L
<ul>
" K4 O8 B/ E, L. u& `& n<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>
  Q+ Q5 F( n5 k" y- p& o% g, R<li><code>Vec&lt;Option&lt;T&gt;&gt;</code> 可以解决某些情况下需要T所有权的场景</li>  i  P/ P' N9 u  X: ~
</ul>1 F& l+ J* r$ F" o- Y
<h1 id="完整代码">完整代码</h1>* g, k7 x6 _. a2 ?4 X8 t0 w
<pre><code>use std::thread::{self, JoinHandle};1 b. D! `8 b- R+ s+ J* R
use std::sync::{Arc, mpsc, Mutex};
+ G) W+ N+ \$ I2 ^! y# d/ E3 z/ w. z7 m9 O

7 n6 e0 W/ A# U/ S( B5 n# Ltype Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;
2 R" S9 m4 H' Y2 u6 d% G1 }enum Message {
+ k. [) x8 R8 Z: D    ByeBye,# }+ N! v$ ~5 }0 w
    NewJob(Job),: X7 S5 i' B4 e. V) c0 f
}4 r" @3 h  j% i5 ?" \$ A
% G# l$ e. I, W7 F. c) u6 k
struct Worker where
2 y! o, t& W9 {7 m$ D0 n: }{
8 b$ ~3 Z. \1 J, r- t    _id: usize,
, E6 a2 Q, X  `1 V  w    t: Option&lt;JoinHandle&lt;()&gt;&gt;,
) K( ^; ?5 y0 Z7 |! d- B}
0 a" f; j% F9 |3 z8 }1 L
9 ^4 l# T! u8 O) }: Timpl Worker
% E$ _/ {' ~% R# V5 O{  X; V5 `6 X" c3 V  J) b
    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {
7 j9 |2 k" J* m, q4 l% Y1 F4 q        let t = thread::spawn( move || {
& c: L3 c+ K* z/ Q% U) Y            loop {2 c( l* z( w3 F3 Z' Z0 p  W
                let message = receiver.lock().unwrap().recv().unwrap();
8 u7 ?* c. T5 ^! @- L4 H                match message {& k1 M: [5 |: U3 `, ?& y8 G- e% D% U
                    Message::NewJob(job) =&gt; {. b8 L- l3 Y2 S, r
                        println!("do job from worker[{}]", id);9 \0 m1 Y% t6 c  c( _5 P3 b
                        job();
$ T+ e+ V- T) C; Y& M$ x: f                    },
7 g7 o7 Y3 i: }! Q                    Message::ByeBye =&gt; {  Y9 w! m( A% C5 [1 ~  \& E+ z
                        println!("ByeBye from worker[{}]", id);6 X6 c+ M! |/ |) s1 N, R
                        break
& ~& Q. j# @; r, F5 |  }                    },
7 @) q' C) x6 W( p' t                }  - z/ J1 a; a" k- o
            }
0 C( b! m% b6 |" m5 }8 Q% Q, b- S        });
7 }/ _9 ~; p1 |( I/ p/ u
  v9 K  L5 O) f. H" M, d5 ^        Worker {
* N  E% L5 k; E4 N% ?            _id: id,
! i. q/ j! A" i, v( a            t: Some(t),' U/ m% y+ X6 [% d, ^& @$ ]3 O" Q% a
        }4 A8 `- q3 P0 m# z8 |7 G  S! @" e
    }
8 H5 Z, R! T; Z6 A, X}
( i7 T" p4 }' u5 |  O9 y* `4 m9 v7 z% {
pub struct Pool {3 I) P: \! R/ w4 ?# @" b
    workers: Vec&lt;Worker&gt;,1 y3 @5 S( w( Z- S9 f" F
    max_workers: usize,
2 s! j% T* z' z' \$ V% O    sender: mpsc::Sender&lt;Message&gt;
+ S# u# \) u; x}
7 `. e! `4 C6 p+ [$ p! Z( B! k: ^: k7 c9 H, h, x! d! _
impl Pool where {/ M  ?$ |5 n+ ~1 l# I% W4 \
    pub fn new(max_workers: usize) -&gt; Pool {& G5 l" F( K) Q. L
        if max_workers == 0 {
9 _. ^; ?! T6 t: m            panic!("max_workers must be greater than zero!")
; s5 J/ {6 L3 s' M: p        }& g4 Z+ N7 F3 H, E5 D& G2 w
        let (tx, rx) = mpsc::channel();# A" F$ s) r1 [; ^1 [

& m! Z. r; @4 o, m+ _) \2 v        let mut workers = Vec::with_capacity(max_workers);, S' s8 ]- u" {* w' l! C
        let receiver = Arc::new(Mutex::new(rx));
6 b2 Z* g" l  j  b1 C, [1 m2 d. b        for i in 0..max_workers {
8 Q& W0 W* K# t            workers.push(Worker::new(i, Arc::clone(&amp;receiver)));
. P: y$ a, r% U  L' L  @7 w' m        }
% k# t# x; c6 S% l' P7 i7 L" L# b" V
        Pool { workers: workers, max_workers: max_workers, sender: tx }6 V+ r9 n- Z4 _  V9 X( a) ^! S
    }$ p# E5 @; P4 f
    0 |6 g( T( M0 V, _- `# o
    pub fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send6 ^" V8 m3 j8 v5 u
    {& y# D5 \9 T3 G7 ^& G( y
' f8 G0 m9 ~) d# t
        let job = Message::NewJob(Box::new(f));* `+ v4 ]4 M& b  @8 |
        self.sender.send(job).unwrap();7 }. U, a4 \% Y3 p1 u( }
    }! c! k: b1 d2 x4 O# _3 p3 ]: t# b, U( R
}
1 g, ]5 N+ c* T1 g% v" X; o  A# T/ k9 M, U' b1 W3 D
impl Drop for Pool {
& \5 R3 P5 u/ S0 I8 p    fn drop(&amp;mut self) {
) @1 O2 M% G' M, B6 {$ b; a        for _ in 0..self.max_workers {
$ Z; S1 j, G0 B" w) S            self.sender.send(Message::ByeBye).unwrap();& }& _7 K* Y! K& S- k
        }+ G4 u1 |0 m% m9 K& G! x9 i- E8 l
        for w in self.workers {* v' P# D* F. m  }" A/ h8 z
            if let Some(t) = w.t.take() {1 y* Q) Z; K+ F0 ]* G+ T) o
                t.join().unwrap();- N+ s0 a4 _! C* Y6 D! @
            }, G+ l8 Y: c! p- w5 R) O
        }0 J2 c8 i. f% Q0 y! h; n
    }/ r' ?+ \9 |! l1 e" s5 Q$ u
}' Y+ |+ j# }6 _" {. u% ^* t
1 A4 k- V4 B& p& g. G4 @  r
8 C2 [$ ~  M6 E3 r7 P6 s9 V
#[cfg(test)]- f2 N5 \- K  U& [$ l* j9 n! h2 u
mod tests {7 O1 k% E+ J+ U  n' k! o
    use super::*;
" R8 |5 E4 q: o5 i" _+ X2 o    #[test]
4 {1 c0 O8 c2 {5 ?! X7 s: `% G( |5 Q. Q    fn it_works() {  |' Q# j" c: `3 h
        let p = Pool::new(4);' G& Y8 C. O& H+ g& ?
        p.execute(|| println!("do new job1"));
1 _8 c4 D/ [' d% P5 l4 W+ d        p.execute(|| println!("do new job2"));  m) c* e* H3 V  O
        p.execute(|| println!("do new job3"));# K3 ?6 c5 t: p. r
        p.execute(|| println!("do new job4"));
7 [# {$ j( Q* b3 I* S0 [2 y    }
: D% i: v' [+ g' j; [8 t}
/ K, n5 o8 R, Y* V0 Z- Q" ?; [0 [3 ~</code></pre>. ^+ _2 {" n) i& k0 J9 Z

! G9 x7 d1 d- [9 t/ L
回复

使用道具 举报

懒得打字嘛,点击右侧快捷回复 【右侧内容,后台自定义】
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

手机版|飞雪团队

GMT+8, 2025-9-13 23:19 , Processed in 0.359914 second(s), 21 queries , Gzip On.

Powered by Discuz! X3.4

Copyright © 2001-2021, Tencent Cloud.

快速回复 返回顶部 返回列表