飞雪团队

 找回密码
 立即注册
搜索
热搜: 活动 交友 discuz
查看: 15440|回复: 0

rust 实战 - 实现一个线程工作池 ThreadPool

[复制链接]

8242

主题

8330

帖子

2万

积分

管理员

Rank: 9Rank: 9Rank: 9

积分
27056
发表于 2022-2-12 14:35:42 | 显示全部楼层 |阅读模式

7 B" Q! g, ~# z8 a$ Q) n<h1 id="如何实现一个线程池">如何实现一个线程池</h1>- n: `' \; e$ W5 d9 S
<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>+ h- _; t3 Y0 c/ ^
<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>! p2 W1 `! N3 h7 U, t& Z
<p>线程池Pool</p>
# L9 q/ @5 |# ~' k# h* G<pre><code>pub struct Pool {
- A# {) D' W4 N" q1 w  max_workers: usize, // 定义最大线程数2 F4 n) `0 ]9 w+ O' b
}
- A& I, |- ?- I6 Q1 e% r" G& a& ?2 i, l% \$ f# t
impl Pool {  S3 A. X8 f2 e. {
  fn new(max_workers: usize) -&gt; Pool {}; T' t* T! }, j
  fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send {}8 I! R6 b( d6 r( N1 v
}
" J8 e# K% J5 `0 A4 ]7 X
- J" z- t, ?$ W) W9 Z& \4 X: s7 Q; H</code></pre>
+ r- X' y/ ]1 x  I- B2 D<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>* `8 Q( ^& G  {% z; v  T
<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec&lt;Thread&gt;</code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>' U) J: _. d- s' B8 \# `
可以看作在一个线程里不断执行获取任务并执行的Worker。</p>( F/ c1 m& ~2 M/ a0 k+ F8 d+ z6 a$ Y
<pre><code>struct Worker where4 z- j$ _) g8 {
{# G( s  h8 D8 c/ ^0 |
    _id: usize, // worker 编号
( J; ^% p1 P3 w( P0 K( v1 M/ `( P}. m- I2 V6 y8 N; O1 a# W5 E
</code></pre>
! S. i! `/ e! p. U<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>
. Z8 n- |! o. G3 z3 X: }把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>/ `; O. T3 J' K3 I3 V/ p
<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc&lt;Mutex::&lt;T&gt;&gt;</code>来包裹起来,也就是用锁来解决并发冲突。</p>4 b* q3 Z( r$ b! K
<p>Pool的完整定义</p>$ @" k, l8 w6 R; M$ V! E2 u
<pre><code>pub struct Pool {
" Q" ~* L+ z) v9 U# f# n& M( Y    workers: Vec&lt;Worker&gt;,
  g/ a2 e/ L! T3 x    max_workers: usize,! ]7 r6 f$ M" S$ ^% t/ O
    sender: mpsc::Sender&lt;Message&gt;
! M& z/ }) q" n. l- K' g}
5 n( Y$ z; c- f</code></pre># Z1 Y0 [. e6 ?
<p>该是时候定义我们要发给Worker的消息Message了<br>4 B; {. ^: Q- \" M( a8 |/ t
定义如下的枚举值</p>
4 C, }0 i5 ^' |/ k- ~' x<pre><code>type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;9 U7 `* b- t: W4 B
enum Message {$ J0 Z! ^. b5 c1 X, m) I7 ~( _
    ByeBye,
1 |1 s  M9 y: `. W) M% e( \" U    NewJob(Job),, B$ N1 o( I2 p2 A1 R
}+ Q$ F" Q6 Y7 h4 x8 X  w2 @6 J
</code></pre>2 l" {7 T* d% d7 a% O( N
<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>
) R) j" Z: g; x" {. r<p>只剩下实现Worker和Pool的具体逻辑了。</p>
! T4 M+ [0 c/ L4 u) x) k<p>Worker的实现</p>! f* C4 I* W/ l. }; x% l3 v
<pre><code>impl Worker
; y( x7 W4 r" O; V5 r{5 v& v& o+ n) s; G' O
    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {
0 |& b' G% P- ~0 {, n3 B        let t = thread::spawn( move || {
3 l% J6 Q0 q" t, m            loop {2 b/ Q7 e" J1 _: a5 v" g
                let receiver = receiver.lock().unwrap();- k6 w; Q8 M, G" R# u# s
                let message=  receiver.recv().unwrap();
8 W6 k8 s8 g0 d# g" c. ~* ?                match message {8 t# {- m  C# h* t0 F
                    Message::NewJob(job) =&gt; {
& s! G" k+ U; Y+ k                        println!("do job from worker[{}]", id);
9 v5 ]8 t% X! T- \5 W                        job();
2 [, t2 {3 P+ E* k                    },
9 U; ]; J0 F0 Q                    Message::ByeBye =&gt; {" _6 u# q# e# b7 @9 z/ a
                        println!("ByeBye from worker[{}]", id);
7 }+ `/ U% d6 E) M                        break2 D: [, G* K3 K6 G/ @
                    },
( v8 a! g- X9 [1 |/ J                }  
/ w1 H! D- w& I' z/ E" ?1 T4 T            }
7 |  W2 m: u( Q/ _/ V$ u% p        });" Q$ O" \. \+ d, n6 V4 [4 ^

7 {6 D  H2 B% W) z# r        Worker {' Y1 n& H+ b6 E9 x. r* S) }4 h
            _id: id,
# y! F- x" d& {( K/ T- g$ l            t: Some(t),
9 K1 o% o* n# w6 i1 Y: i        }, z3 e, I; N/ \' h% v
    }
2 Y' ^) |" T( F% ~( T}4 q6 D! n- w$ N+ J( l  K1 R
</code></pre>
2 M; I9 J* u1 d" [<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>. u2 d0 m# E) b
但如果写成</p>
+ z% j3 D/ e. U% e<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {
' X  _9 \* r* N: o+ K' I. |9 k};
; o: Y2 Y2 M3 T' g6 z/ c+ C% C& ~; V</code></pre>
- c& G' j4 S& ~3 \" G( y+ Z- q9 C% b# o<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>
, g+ Z6 b3 j# T% mrust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>0 o& x4 Z4 m9 @  e
<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>4 `( ]# F8 p5 G* T
<pre><code>impl Drop for Pool {
- N% p# M6 N% x. j    fn drop(&amp;mut self) {
2 n/ P- l6 l& W$ W8 n4 ]$ h        for _ in 0..self.max_workers {
. b* ^* {; B  O  ~1 Y3 ^, C' G            self.sender.send(Message::ByeBye).unwrap();8 v0 `) j/ w) x
        }0 `2 L/ D: ^# y: M/ v' E5 l
        for w in self.workers.iter_mut() {  e3 }2 i+ `: Z' F
            if let Some(t) = w.t.take() {
$ C7 E, t, W% @! u5 C                t.join().unwrap();
) ]! ~, \& c* L8 W8 |2 j7 T0 M            }
( F' e- A; a3 u, |, j        }
" R+ p. d% e+ Z! p2 h    }
5 q1 f7 F' t7 x}: [) ]4 m3 ?$ D. |

, U' g. ?2 J: d+ n% H" j</code></pre>8 L; V( Y& H' a" \6 Z/ v; y! h
<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>5 E# }+ P# H9 g: W% V( z
<pre><code>for w in self.workers.iter_mut() {. O* d7 d% {) L4 c4 @
    if let Some(t) = w.t.take() {
7 d6 g  z3 \- D& |% E        self.sender.send(Message::ByeBye).unwrap();
/ c  S- z! x! q) Y; r/ b        t.join().unwrap();
, j! L* Q: k' Z% v, ]/ s    }' ]. Q( k2 r; m$ m9 W
}
( |& H8 t4 c. g! }+ C# \# A8 H& Z9 V0 t
</code></pre>
6 i4 O5 ]3 J8 D; f# `<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>
7 }) o) i/ K2 a5 A( t) d4 {2 s我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>
) J( Y+ b5 A" o0 U<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>; F3 Z! w: C' K  b& C4 d8 J2 [
<ol>$ o8 x" q$ G; l7 M& J  O( R
<li>t.join 需要持有t的所有权</li>. d( B/ w; o  r. |4 H9 p9 d/ C
<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>2 {$ z# A4 }8 R; M5 R0 ?+ f
</ol>
2 E- t, p1 }' q4 ?# C<p>这里考虑让Worker持有<code>Option&lt;JoinHandle&lt;()&gt;&gt;</code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>" s7 p6 j6 }  Q" s4 `
换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>
5 K( j/ _4 G, g% u! W<pre><code>struct Worker where
& g5 B  `: h, Q6 y{
7 D9 K1 u3 M2 C1 r0 A    _id: usize,
+ i" R# H1 J& @: b    t: Option&lt;JoinHandle&lt;()&gt;&gt;,5 r  r7 n! S$ |) f0 w+ C
}
, d  k$ E: G1 p& _5 ~</code></pre>
2 q1 s- ]( C  p$ ~<h1 id="要点总结">要点总结</h1>- Y+ b9 h# _& C7 M! w
<ul>
. G& U: _* n9 F9 Q<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>! s4 m) C* X& P' t/ y3 @) V
<li><code>Vec&lt;Option&lt;T&gt;&gt;</code> 可以解决某些情况下需要T所有权的场景</li>
# A  q) l  U7 [</ul>
4 B( H! [( R, h' n1 d6 c, \) s2 v<h1 id="完整代码">完整代码</h1>+ R/ [% \5 O1 b) h: h* H+ ^6 @
<pre><code>use std::thread::{self, JoinHandle};" y9 R4 A% }& j' l( a7 u
use std::sync::{Arc, mpsc, Mutex};4 ?9 B8 i0 X' z! v
! O3 x3 [! r7 y8 b: f
  b; i/ u% @' V- I8 D/ N
type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;
7 x  V9 H' d3 l5 e4 `enum Message {8 p4 L7 f8 e* p1 c' l3 b
    ByeBye,' |( K8 B) F( w3 Y" U
    NewJob(Job),
, R/ g4 U" S) O( o/ {}$ s6 y$ f' d, F4 K: u+ W8 B

; C/ O# ?$ X: |9 r7 p- O' Vstruct Worker where
  F' O7 @5 N6 G* |* L5 @  M{
, R# V7 g6 `0 _; s  }2 e. m: E    _id: usize,
# y: F! \& y& R; u# N    t: Option&lt;JoinHandle&lt;()&gt;&gt;,
1 ]4 K  }" e, a: n& y% L/ F6 A}
7 m/ A( g7 R1 S" d  ~; [9 j& C" f. m7 f
impl Worker  r8 a  m' x5 `- p
{
, H1 Z! X, Z) q& @    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {
6 K. E$ Y1 e! X0 Y        let t = thread::spawn( move || {# J& J) V& T$ d& h* Y
            loop {
5 T( ^. m, l0 I8 E; H5 C                let message = receiver.lock().unwrap().recv().unwrap();
( }. k- A% r4 c% A: r7 s' E2 y# |                match message {5 t9 Z: ~# I- r$ ]/ Z4 k' R
                    Message::NewJob(job) =&gt; {( P2 G5 P& l' c. F
                        println!("do job from worker[{}]", id);) [, c1 K* Q( e' _+ Q$ E6 n5 i5 ~. Q
                        job();+ I9 Z7 g5 k* _& a8 l: H
                    },& `8 [, {0 [# c
                    Message::ByeBye =&gt; {
& k1 \- s' n9 F                        println!("ByeBye from worker[{}]", id);. O" u! l- z; p# c6 z
                        break
  N4 ~1 v& \  i3 u) Y! X& E/ N9 j                    },# Q/ U4 m8 R2 T+ \" J9 [2 Q1 T
                }  , {8 k8 f8 Q+ a  k5 \. F8 T  y" K' ~7 U
            }5 G# t- a  U9 O2 N) N; S
        });- S% z& Q7 k; ]$ w$ K! K+ V% ?& n3 l

1 |; o( ~1 r- H  K        Worker {
, u2 C1 L0 u  o5 X8 s! \' g            _id: id,5 D0 S% t* ^. o; Y% T+ T1 \- S! W
            t: Some(t),9 W( U2 o: J. Q( A+ L2 A% v
        }
) p  b1 n2 N) u+ u" X1 f    }
7 X+ x8 @2 U5 O) E* O; d, [2 m}
4 ^, H; L$ Z0 x
/ p  s8 r1 _2 W3 O$ xpub struct Pool {
  \+ [+ r. ^, E: F) M8 g, W    workers: Vec&lt;Worker&gt;,
* i, O" C1 }) u. l: T    max_workers: usize,$ s3 I/ p* e" j1 m
    sender: mpsc::Sender&lt;Message&gt;
8 {( \4 ?& d; ^0 o$ j8 K}) k+ P: O) Z7 v" x; q5 i
+ p$ a1 d  ?) J) C* N
impl Pool where {
1 ~+ s: U4 g9 U) L% ^! Q9 D$ G    pub fn new(max_workers: usize) -&gt; Pool {
0 S, v- V3 K) N! j8 O7 e- h        if max_workers == 0 {% B1 \" Q" @- Y9 ^
            panic!("max_workers must be greater than zero!")
! m# w: J5 p8 k7 P7 X        }
9 Q: U/ v6 c  M, n* ?        let (tx, rx) = mpsc::channel();( g3 L; P# z. m5 S

* Z% [3 I3 [7 J3 G) z1 \8 h4 R% x        let mut workers = Vec::with_capacity(max_workers);
( C) q+ P4 x! {. \        let receiver = Arc::new(Mutex::new(rx));
$ L; x. T7 u! T. E/ e' ^$ J        for i in 0..max_workers {
' @8 e% x' W! N- |: J            workers.push(Worker::new(i, Arc::clone(&amp;receiver)));
1 a) G6 I. [, R4 c# ?% R3 n2 x9 R        }0 }; J, W; `, _0 o; ^0 f# d

0 S* _% @0 K; E5 P$ Q        Pool { workers: workers, max_workers: max_workers, sender: tx }
! D8 u" p8 d5 b6 B# Q    }5 g- v9 R1 Y) n4 \( c# |) Y
   
, w8 v' g  ]+ l$ S0 U5 I6 A    pub fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send4 `' I" Q" _! A2 D% S
    {9 z2 r3 {' R' Z; L  O9 N- w
' \! |: B0 p: D9 ~/ v" v
        let job = Message::NewJob(Box::new(f));
3 W2 b' @# f7 o; m( {) [# B        self.sender.send(job).unwrap();9 A1 _3 i' U& h/ N$ s
    }  M& ~' X' b8 l) S4 b  C
}- |& M; _: x3 A  E* `
! C5 f7 g" n* P8 t; }6 |' a/ ]9 g9 H
impl Drop for Pool {  m9 s) ]& r# U# c8 g
    fn drop(&amp;mut self) {- ^* R! F; w( n8 I. ?, N- k
        for _ in 0..self.max_workers {
9 |4 S5 j  G* Z            self.sender.send(Message::ByeBye).unwrap();" _$ {8 W# g4 `! X! }& ]
        }
* s* I5 X3 c+ X$ O4 S. \% z        for w in self.workers {1 `- h8 R1 W0 f/ l# I/ E
            if let Some(t) = w.t.take() {
  V# J, X, a. _- \( G5 c                t.join().unwrap();
% v1 [- ~4 x8 s. p4 n! x            }
; Z2 P9 d" j/ x8 [* C; l        }
' ]# S# F' q7 \. R5 t! m    }5 K7 D6 j8 K' b8 x1 |
}" M* G8 `, d/ l5 V6 x! w
2 t/ B; n% M( m5 [* e( |

. z9 K9 V( a/ M; g7 y#[cfg(test)]
  f/ w' ]. M' t' rmod tests {
2 p$ A6 [4 `2 R* S    use super::*;( a* L, y& L- |, a7 m$ p! ]
    #[test]
3 P; t' j/ p$ Y: o# H3 G$ w8 L    fn it_works() {
% |4 {3 [1 x  N3 P4 E2 \, E        let p = Pool::new(4);8 |6 `  w  m* q; t
        p.execute(|| println!("do new job1"));6 n* R( o$ v2 A7 ?( j- B
        p.execute(|| println!("do new job2"));# a1 c0 b" c; B7 p  a& _( M8 ^8 Z' G6 \
        p.execute(|| println!("do new job3"));7 h$ I; t/ u1 m2 N
        p.execute(|| println!("do new job4"));
% ^) l8 ^$ z+ \4 E7 {, u! a    }2 I* G* [1 R8 z6 O! u; U
}( t: k& |$ ?5 D
</code></pre>
& t* R9 ?$ C% q# o( k3 K- p$ }/ b) G/ J- h
回复

使用道具 举报

懒得打字嘛,点击右侧快捷回复 【右侧内容,后台自定义】
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

手机版|飞雪团队

GMT+8, 2026-2-26 22:48 , Processed in 0.672475 second(s), 22 queries , Gzip On.

Powered by Discuz! X3.4

Copyright © 2001-2021, Tencent Cloud.

快速回复 返回顶部 返回列表