飞雪团队

 找回密码
 立即注册
搜索
热搜: 活动 交友 discuz
查看: 13714|回复: 0

rust 实战 - 实现一个线程工作池 ThreadPool

[复制链接]

7726

主题

7814

帖子

2万

积分

管理员

Rank: 9Rank: 9Rank: 9

积分
25508
发表于 2022-2-12 14:35:42 | 显示全部楼层 |阅读模式

  C3 }" Q! E1 M& F- H/ v<h1 id="如何实现一个线程池">如何实现一个线程池</h1># ?3 s) e9 j. p' e3 h( j# V
<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>
* s$ f: m2 I; D% f<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>5 k* D7 Q) O- {' z, W
<p>线程池Pool</p>
  i9 L2 D+ W* \& J; c<pre><code>pub struct Pool {9 V2 T2 e# l% U% ^
  max_workers: usize, // 定义最大线程数
+ k& H; H7 u) e7 x# Q}
5 P/ K8 K7 H! Z$ O6 q+ J0 L% O1 s( f$ P) E7 A7 ?
impl Pool {
( k6 [  y9 z& n( I  fn new(max_workers: usize) -&gt; Pool {}# _) V, t) ~2 i( b: C: D7 X
  fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send {}
* W/ n& h) L  ?}! a) Z  Z; A% c% R

, L$ v3 W5 T) d( @+ e; e3 M  B</code></pre>
6 e" C' C0 C9 K<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>
1 N" b  w( {/ E7 ~2 @<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec&lt;Thread&gt;</code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>
4 ]& D6 f5 X" |4 x$ U! ^1 q% B可以看作在一个线程里不断执行获取任务并执行的Worker。</p>
' b1 D8 ]" b+ F  ~( C( k9 k3 g<pre><code>struct Worker where# R5 u6 o6 k) n9 m. ?6 T
{/ G6 z( Y1 p( q% y( F( e
    _id: usize, // worker 编号
) ^4 S3 m5 }& \2 u2 C}
6 R" G  o) M7 j; J% _  ?6 \</code></pre>
! G; O, W* X. [& f1 Q<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>
) v& n3 y6 A( C5 r把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>
! i& W2 u9 n6 a4 f+ F: F<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc&lt;Mutex::&lt;T&gt;&gt;</code>来包裹起来,也就是用锁来解决并发冲突。</p>
! z& ]+ Z6 U! Q7 l7 S% W) R1 V& J<p>Pool的完整定义</p>
9 n( Q% c& p) ]1 W$ i% ]<pre><code>pub struct Pool {
+ d7 M: V" ~, o2 C8 J/ c    workers: Vec&lt;Worker&gt;,
$ X* U; Z9 r6 {) I    max_workers: usize,
1 F2 S! _4 p0 B. m9 g3 e( d( _    sender: mpsc::Sender&lt;Message&gt;& q" n+ @* t+ z( d1 Z& ?1 [
}
9 V5 q8 ^+ z9 b+ n. W( F7 P7 y8 w</code></pre>1 D( o- g/ y; u
<p>该是时候定义我们要发给Worker的消息Message了<br>
3 u, j* P+ {8 i: }7 C1 x定义如下的枚举值</p>
- a: Q, W( X. t( U<pre><code>type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;
; Z( t; h9 j& m7 P! c9 m0 Lenum Message {
5 c0 G2 o! M! f# q6 ]6 G    ByeBye,
8 t; M9 R) a& ]/ {" a    NewJob(Job),
" L% V  d% R! t; k! j- a4 {}8 Z  Y/ J9 j5 g5 V$ W
</code></pre>+ O# n% F4 ~1 P- |0 m# z
<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>/ S" O% Z# L, B( |4 N  f1 L" A) d6 d
<p>只剩下实现Worker和Pool的具体逻辑了。</p>
, K/ W1 L  c) ?2 Y( M* ~<p>Worker的实现</p>3 k+ N; P$ _: w4 m. Y  D: Z1 C
<pre><code>impl Worker
6 N' `' y9 c0 p! |+ D2 y3 x{) J5 P5 l3 k, G! n) @$ n
    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {
& {4 _( E* s; ]        let t = thread::spawn( move || {5 R# X- N5 \8 ?8 u3 e
            loop {
4 M) X% {/ b, h$ U                let receiver = receiver.lock().unwrap();; @1 A! I3 t/ m! ?! {* c
                let message=  receiver.recv().unwrap();( a1 b* u2 l; f$ m
                match message {
& w6 f7 Q: U# _. x                    Message::NewJob(job) =&gt; {- o+ ]# Q3 [% ^6 C0 ~* t
                        println!("do job from worker[{}]", id);/ i* T& ]( W3 e8 J. L5 D' `& E
                        job();
8 o6 \  B; a+ ^: K                    },
' |1 f2 W* C* r. x                    Message::ByeBye =&gt; {# Q: J) s/ P6 g
                        println!("ByeBye from worker[{}]", id);4 u5 x; N1 N, f  H
                        break. X2 v. H+ o/ w+ V: h& C/ I* r
                    },7 S8 A' o( I6 I* {% S) m* D, z
                }  
% M( U$ e$ _6 B& V9 |* C            }0 N! ~! u: V; l
        });
- C& S1 P3 k. k! s' l+ p6 Y
$ R" o6 I" e) m2 P% N+ [        Worker {! ^* v1 C+ r5 B: H5 O! O
            _id: id,
" X# n6 j, m  A& B. V! ^5 u            t: Some(t),) e" |5 j9 N7 r+ }$ u
        }
. C( I  k# v, w    }
  v$ Z) w! m' O, k}$ M8 M( v8 \9 J% G  P0 R6 C
</code></pre>3 o& ]& m% b* l
<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>
  k& p5 L8 y5 ?8 M但如果写成</p>6 E  S4 N& u' ?6 g2 @+ J# r$ A
<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {
* t! v: ^6 x- A4 ]; n! X};
" [) `$ m. L3 Z! i, z. J3 d</code></pre>( e* }% f7 {& D% x5 H; T5 C
<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>
1 ^0 z$ w: t( v4 M; _- prust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>
4 m  A7 O: V' n; O" D. F! G<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>
0 |# x4 q1 t2 ~( B8 e% E4 _( r2 X6 Q- |<pre><code>impl Drop for Pool {9 H+ g* }4 \7 `' m
    fn drop(&amp;mut self) {+ r/ T$ O7 |) P: V# G/ R0 K
        for _ in 0..self.max_workers {6 h) F0 H% C8 G: n5 E  [- {
            self.sender.send(Message::ByeBye).unwrap();
$ U: h9 M6 |* o- {& A$ c& E        }
9 I* R" g% F& n: u& S/ w/ T% U& D        for w in self.workers.iter_mut() {% a7 x9 {8 L* }. S
            if let Some(t) = w.t.take() {. G' Z5 C3 t7 w/ t8 E
                t.join().unwrap();
, v& |# m# |. K4 [  r            }3 u% k: T+ q4 C7 q
        }  {! c3 C/ E& y% n# K& y
    }
' s3 V  J4 ?* W3 I}
( g, w2 n$ G& X9 ~4 k
2 R; N  ~* G" {* c8 ?9 [</code></pre># |5 k. Z0 k7 o3 I1 F- Q0 ]
<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>" s2 @0 E- D. i, v& W; H# f, {
<pre><code>for w in self.workers.iter_mut() {
. ?' s) Y2 x' B) W% B    if let Some(t) = w.t.take() {' Z9 i7 e% D( |
        self.sender.send(Message::ByeBye).unwrap();' I7 T6 S5 ]8 y
        t.join().unwrap();
2 U' U1 v: Y7 Y! x9 F# A+ y- l8 L    }" a. V( f- _7 l
}( }6 ]4 z! U# T- L' J- s8 J
  u) \$ @# j  L- \. t
</code></pre>; z  Y; \" [& l" r2 c
<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>
. h3 }1 p2 O3 t我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>
/ v- ]5 C- T; X+ g; X<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>1 u! P. F1 K8 p/ L% O
<ol>
/ r( J- q) k3 y4 X6 O<li>t.join 需要持有t的所有权</li>
! P# x  c' i, `! W4 M7 B<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>
3 T$ B: C% W, v+ G! N1 ]% c/ p</ol>. N* H+ w/ M: I; q- X# ?9 j' [
<p>这里考虑让Worker持有<code>Option&lt;JoinHandle&lt;()&gt;&gt;</code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>9 x1 I/ n0 L/ K
换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>
/ b0 R. Q; E& x<pre><code>struct Worker where
! ~6 u. [& v+ P, B0 P/ T; S  W{  I7 ]# ~( O4 s" Z* u
    _id: usize,% u6 j: z# i, Q4 N. i
    t: Option&lt;JoinHandle&lt;()&gt;&gt;,6 K& |/ F4 j  e9 d7 A" c3 H
}
- t2 ?% _( t/ y. v5 O</code></pre>. h+ ^9 O- g) @+ }7 e
<h1 id="要点总结">要点总结</h1>
4 K5 O) W, X! Q1 I" H7 ~<ul>* L& h/ o: }( ]9 B
<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>- N1 l3 Z2 I6 J8 \
<li><code>Vec&lt;Option&lt;T&gt;&gt;</code> 可以解决某些情况下需要T所有权的场景</li>- y9 N6 D& }  u; B/ A9 L2 T+ E9 W
</ul>
3 w( q  A1 g- v) |( A# M/ k" |<h1 id="完整代码">完整代码</h1>9 M0 M' F# w% {: U3 G( n
<pre><code>use std::thread::{self, JoinHandle};
- ?, Z' I% |) Z1 T" W) B0 Vuse std::sync::{Arc, mpsc, Mutex};1 t1 \+ u: x: n% S) x. u
3 Y: |/ ^: k) w' d8 d- e' k0 c. y
/ s8 O0 ^/ e  K  S6 Y
type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;
& z& K" S" Y6 k8 }enum Message {
( \: p+ V9 Z' ~+ ]0 C) I5 H    ByeBye,' H& r1 A4 m# }9 K! f" F7 g
    NewJob(Job),0 o% D; T1 t: t" n+ L
}+ E! u) _$ m9 {

! ~& d, h* d' C9 B/ Lstruct Worker where" w$ F3 i5 n; N, |7 a' C& x; g
{6 D9 c% _, I/ y7 _' Y8 P
    _id: usize,# x# C) A. u) I6 k: d
    t: Option&lt;JoinHandle&lt;()&gt;&gt;,. a1 W7 z* ~8 V/ K' a) }1 p5 Y  {
}) z! G2 I" e4 x
8 v0 m; ]' P4 \, y8 ~. f2 O9 L
impl Worker0 c; Z- Z' W9 q
{
! G5 H: a% F% n' W- c! j! X+ ?    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {
7 \( a* y/ K( N) z/ w        let t = thread::spawn( move || {
. \6 h2 E4 h6 F7 t6 c            loop {! k3 _: y! x, [  ?
                let message = receiver.lock().unwrap().recv().unwrap();
0 x+ \. r# d' A* G. S                match message {/ Q6 S5 ^( H$ b+ H
                    Message::NewJob(job) =&gt; {/ J6 E7 x, v$ D( R" w: M
                        println!("do job from worker[{}]", id);
8 L' _( A8 k* }. w6 R* b                        job();
9 \) |' U4 O* A5 [4 n& ?+ E                    },
! I$ e1 f1 _$ \                    Message::ByeBye =&gt; {* n+ N( m) l! j5 s( g5 g' x
                        println!("ByeBye from worker[{}]", id);
& e9 E. |1 P  |* S. D9 ?; |) z" w                        break
6 j7 Z" K7 _. U( K9 O6 V0 i                    },
5 U+ C$ p' @5 \) `" p                }  , A8 [( ?; W- b7 k" R8 Y& E
            }( z8 ^. E2 L; N! @# a! B$ P
        });7 W3 H, T- N- k  F
: C5 N/ a' Y, `+ G* h
        Worker {6 }- @5 W8 F7 m9 T& c$ G# t- w- N
            _id: id,0 a' V3 y8 `. V7 B8 d2 a
            t: Some(t),0 V7 T+ F% p" H3 m: \+ c
        }5 [) a6 j# ^6 T) z5 Z
    }
8 |2 M( g: C/ `% V% f}
  X$ @" h4 A8 L: c8 W4 r) b, @. P) h0 Q  u, I" e
pub struct Pool {! j, B( M! A( I: D) N! @2 D
    workers: Vec&lt;Worker&gt;,
% r( W* P0 P  l& ^4 C1 ~    max_workers: usize,
1 {9 `; o2 g) {8 |: t    sender: mpsc::Sender&lt;Message&gt;3 c* ~, x' a+ r$ D) b9 b
}
5 T  ?/ i/ v$ M5 v: I
0 r: X( e2 k5 _; kimpl Pool where {2 J& o( e. }5 Q' o
    pub fn new(max_workers: usize) -&gt; Pool {
3 m6 R' V9 k8 h5 Y) z% G' _        if max_workers == 0 {
0 e1 `0 E  ?* d  A- _+ W# y: z            panic!("max_workers must be greater than zero!")) ~% `  u, H  x* b: w
        }
  u+ Z5 G, k7 Q  h( _8 u        let (tx, rx) = mpsc::channel();8 g) `+ l9 d! S; D8 G
# T; z1 W% o; g3 R; |. ?. I
        let mut workers = Vec::with_capacity(max_workers);6 c( F1 e% y- k- Z: E2 K8 |
        let receiver = Arc::new(Mutex::new(rx));
: S! d! D2 a6 ]        for i in 0..max_workers {
3 n; x# h# t* [' L            workers.push(Worker::new(i, Arc::clone(&amp;receiver)));
0 A) k' H+ U5 \4 v( `0 Q( ]5 g( }! ]        }
% ^( D/ M5 k- F! b3 z* c1 j/ m
( U  g' T' {& x6 G9 W% {1 ?        Pool { workers: workers, max_workers: max_workers, sender: tx }
2 H0 [1 X7 }! A' r    }
# J3 I# L" Q4 P; w4 `    9 E. H* }4 R3 ?3 {2 V" e
    pub fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send6 X( ]$ V3 W3 P0 \: A
    {9 Z1 ?6 s* ~! M

5 x6 N! B" D4 c2 x9 g1 R& G/ d0 U        let job = Message::NewJob(Box::new(f));
4 W3 D  i6 q& A1 c4 |        self.sender.send(job).unwrap();8 \) @3 Z& U% M; h( ?- o8 o0 |' c
    }& N0 F  ~1 ^/ Z
}
4 s1 u; ?5 b* n
0 \+ K# O* ^. ?, I8 ~impl Drop for Pool {; ^* {9 e* X% a( {/ G6 J4 |( v
    fn drop(&amp;mut self) {, ^/ _6 c" I. z. `
        for _ in 0..self.max_workers {
" ]* t0 W6 \" o5 v" u* a* H# q            self.sender.send(Message::ByeBye).unwrap();; U" d3 h) R4 D
        }
6 D1 w  [9 a9 A7 T        for w in self.workers {
! S5 w( J* Z. I( u7 i            if let Some(t) = w.t.take() {$ Z1 S' Z& Q/ a( p6 M2 J+ x
                t.join().unwrap();# H% S4 `6 L6 O: b& _$ T' I0 d
            }2 @5 a  R" k# G# R1 F3 x. ?1 F
        }0 T7 K8 V. }5 ^
    }/ X8 D2 o$ I6 v- R6 u3 \. o
}
9 `3 H9 x9 k  a/ K/ s6 x% J" v# t  D; u+ B2 S- }& d) T& I2 P
& w) U3 j. c6 R; b/ \. p7 G. y
#[cfg(test)]
- ]* S9 z' R& j6 X6 vmod tests {' b8 O# K, m3 \) M5 O# Y  s; a
    use super::*;
4 b3 m" V3 x3 N' Q7 g0 H    #[test], g. K" G+ w- x/ ~( g2 v' q
    fn it_works() {* u' j2 Y# J; Z/ |) L8 `: R
        let p = Pool::new(4);
/ |  V8 z. ?- k4 e6 u# F* u        p.execute(|| println!("do new job1"));. G/ j7 d: f0 l; y4 p5 W7 s$ M
        p.execute(|| println!("do new job2"));
3 T- D7 ]( f+ z' Y$ N: q        p.execute(|| println!("do new job3"));( Z7 \+ N  X1 `* L* M
        p.execute(|| println!("do new job4"));
. d; I* }  y/ H6 V* V    }
+ U# O2 w7 {( d/ O0 o}; W" A: V8 T3 Q; W- ^& Z
</code></pre>
+ \9 c2 m3 c* I; q4 F5 V; |6 i' l: V# n$ F
回复

使用道具 举报

懒得打字嘛,点击右侧快捷回复 【右侧内容,后台自定义】
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

手机版|飞雪团队

GMT+8, 2025-10-31 04:25 , Processed in 0.751491 second(s), 22 queries , Gzip On.

Powered by Discuz! X3.4

Copyright © 2001-2021, Tencent Cloud.

快速回复 返回顶部 返回列表