飞雪团队

 找回密码
 立即注册
搜索
热搜: 活动 交友 discuz
查看: 15442|回复: 0

rust 实战 - 实现一个线程工作池 ThreadPool

[复制链接]

8242

主题

8330

帖子

2万

积分

管理员

Rank: 9Rank: 9Rank: 9

积分
27056
发表于 2022-2-12 14:35:42 | 显示全部楼层 |阅读模式
' c2 V6 ?* p5 p; e
<h1 id="如何实现一个线程池">如何实现一个线程池</h1>1 y4 M  G& {! p0 c2 e
<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>
4 W8 c) u3 _, M& z- W6 U. E<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>
( I3 g# K8 \* l# t4 }0 R% ^: K6 l5 r<p>线程池Pool</p>
% U1 E" [" s2 E; A& z4 ~<pre><code>pub struct Pool {& ]+ N) j& A$ K+ }: w) h
  max_workers: usize, // 定义最大线程数
* ], L4 L+ t6 K# h4 P0 {" l}
# \7 u4 E8 @9 ]$ X& h% @- W, O4 T- v; J
impl Pool {
4 @6 q$ D! E/ j: E( ^  fn new(max_workers: usize) -&gt; Pool {}
# C+ \- x1 ]& M' W, I. U+ d  fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send {}3 s! k7 |. u' I
}; N5 C3 Z( ^" B

8 Z; G0 o7 y) K$ E' F6 ]- k  {7 g</code></pre>
& a! ^! u" C. f& M, h<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>  J2 j9 _. f3 |# C! l' p% m+ P
<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec&lt;Thread&gt;</code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>6 g/ k- {! Q7 f# v( E& L
可以看作在一个线程里不断执行获取任务并执行的Worker。</p>6 N0 ]+ E8 X/ k  A# A
<pre><code>struct Worker where3 B5 n- Q+ a+ j; Q! x. K) I  m* t
{
# U) I2 D) d1 Q1 S, x    _id: usize, // worker 编号
6 |5 i: Y+ {$ L$ {5 e- S0 O& R4 N}
- m# Y6 d$ P% F' Z  _! }</code></pre>
+ B, j: s' }$ U: |! O# [<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>0 U7 J  d+ W- \9 a: J  L0 r0 f
把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>
" D( a0 J+ C7 a& L( G2 c4 K1 \+ l<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc&lt;Mutex::&lt;T&gt;&gt;</code>来包裹起来,也就是用锁来解决并发冲突。</p>/ j0 {$ }4 y7 P
<p>Pool的完整定义</p>
! x% _! N9 t9 I% A  I, c, I<pre><code>pub struct Pool {
3 d& K( p9 S- J% u: W; e    workers: Vec&lt;Worker&gt;,1 r6 S/ M% o( \7 |
    max_workers: usize,0 N. C' J" v9 U. Y$ a8 j) m
    sender: mpsc::Sender&lt;Message&gt;
* D& p* ^( s& q- _/ P& `}' x, J$ Z. X2 {: s2 c, F
</code></pre>  C0 w( x0 C. F  J. L. w  Y
<p>该是时候定义我们要发给Worker的消息Message了<br>" N. A6 X  n) M9 t' V6 U& [
定义如下的枚举值</p>
5 v. O/ {6 S; c" H  O7 r7 Z<pre><code>type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;) k/ K' P* Y: `* Z1 N
enum Message {
* {5 ~. A2 J/ k' ]) q; Y9 y" [    ByeBye,
, Y9 z* e3 [2 x9 B0 h    NewJob(Job),2 S7 R' |* @  k, Z0 U. b4 o
}
1 |$ K7 w& a! n% Z, M( C</code></pre>
/ k' z: k5 u, @9 Q7 c<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>3 H4 \4 J5 y& f% j1 f4 u$ B3 _
<p>只剩下实现Worker和Pool的具体逻辑了。</p>' K* T% H2 ?; J4 B
<p>Worker的实现</p>5 e5 m3 k6 U1 j1 i1 _
<pre><code>impl Worker
6 p- H+ z5 [+ N* R{, U3 W* u" O/ j6 q" M+ |5 s# z
    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {
4 u/ O" k$ _5 h" O. N, B        let t = thread::spawn( move || {) B5 J/ h7 q! I4 @; ]0 ?2 ]+ `
            loop {" Y+ f! p+ @! r" m7 E
                let receiver = receiver.lock().unwrap();  _$ q9 _6 {* g. ^+ a2 M
                let message=  receiver.recv().unwrap();
% l8 C# c0 o$ f9 ?                match message {
; g6 t7 R$ |2 `1 y/ @                    Message::NewJob(job) =&gt; {9 }  }, ^2 e1 Q1 ]
                        println!("do job from worker[{}]", id);
$ p+ I2 P: E3 I  s- P                        job();
" D0 ^. |/ j, t8 `                    },
# j  Z" }( o, A6 {                    Message::ByeBye =&gt; {0 r3 V! \* B: [* \
                        println!("ByeBye from worker[{}]", id);+ i- E% O+ h2 m
                        break" c3 I) |, v1 u/ ?4 O
                    },
8 P2 c# i8 q3 B6 O) W4 i                }  . {" {0 Y+ B% O& k6 {
            }
$ t& }( V4 R1 G3 ^2 y5 e8 t        });/ x8 _1 l9 \' H  _7 X. v0 s9 B6 i
& p5 Y/ ]# G: t+ i/ V6 _
        Worker {1 z1 @6 l; G; h$ ^
            _id: id,/ ]& O* T9 w% ]3 d* p% E
            t: Some(t),
; x8 J9 N: s" Q3 ?. U        }5 I+ }1 v& i* a1 u: q3 \! Q
    }6 }5 x0 \! P1 O% _& T$ G6 p; l
}8 E6 O! k" k; Q9 J" s
</code></pre>$ o" b6 b1 a# D) |, C6 j
<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>
- ?6 s6 {/ U% n* A) a- r7 L但如果写成</p>0 h9 ~) M5 k2 j" ~$ E$ z
<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {
. F" @; U4 T% h1 ?};3 v& a4 r0 E# C* A- I3 u
</code></pre>
' l2 L* J' A6 f" ~8 }<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>* J5 }2 h# g8 \1 `' ?
rust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>/ ?) N6 D4 ~% [7 V  u) _9 K
<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>
3 X8 C7 F) O8 s6 q' n1 T<pre><code>impl Drop for Pool {8 S! R. A/ p: [  L$ u  E
    fn drop(&amp;mut self) {
2 W! T2 p7 J1 N. {9 Y, y7 D0 u        for _ in 0..self.max_workers {
3 ]3 f+ A% {1 Y" D6 R& L: V            self.sender.send(Message::ByeBye).unwrap();& U  \1 u! ?4 s) C1 X' ~
        }) Q5 Q& A; h- t0 I2 Q2 N
        for w in self.workers.iter_mut() {; T) g) M9 L" K3 g7 x, f
            if let Some(t) = w.t.take() {
% K; I- U/ R, h: S                t.join().unwrap();* ^- c4 n- i8 B" V4 @9 r
            }
& p* `/ r+ K7 N0 M! [9 S        }
  [9 r. x, v; W8 q2 S  Y2 H    }; I  m- f) _9 A' i- r  Y" [
}8 I2 T7 k9 u* C7 |, Y
; G5 J- K4 K8 D# @1 ]
</code></pre>$ Z: T2 [2 S& m. d4 V6 {  g
<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>3 Z$ R1 y1 m8 X' l
<pre><code>for w in self.workers.iter_mut() {: b$ H6 ?$ R3 Z
    if let Some(t) = w.t.take() {$ M( d- u$ r: K* l" I
        self.sender.send(Message::ByeBye).unwrap();
- h! b) Z) R/ L& f$ s4 L- f) m        t.join().unwrap();
1 Q, W, C$ l0 t4 |8 M    }8 Z' e) D9 e0 b0 x" A
}+ t4 c& U+ I3 O9 j1 q; e
8 O& k8 _+ N1 D. R
</code></pre>
6 k+ E# }  |7 f! u<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>
7 |4 e3 [- Q$ D2 K" M$ Z我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>
/ f2 S' W# p/ a3 p) _2 M5 J0 q<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>8 |2 s& ^" M+ ~- Z8 t) a
<ol>
0 S, M' }6 p: `9 Z<li>t.join 需要持有t的所有权</li>0 w/ [" N2 k+ G* t6 M! V5 j
<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>
8 }+ S/ @7 J5 b4 l0 x</ol>
, s4 K0 f" g! z" @& E<p>这里考虑让Worker持有<code>Option&lt;JoinHandle&lt;()&gt;&gt;</code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>
  [% @# u: X" ~5 @. G8 w换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>
8 J: _. s7 |! M. h<pre><code>struct Worker where
) B7 q- V' G0 f, b$ I9 V{
" l# Y) }. v, C( u  E( I9 Z    _id: usize,
, E( k  f9 U( ?  x" O/ z    t: Option&lt;JoinHandle&lt;()&gt;&gt;,
( Y* G, s; w1 I4 N}: d8 h$ x2 w  y- j" C( s5 q
</code></pre>
* d+ o; o7 ?6 K6 T6 d  E<h1 id="要点总结">要点总结</h1>; J3 g, o7 E. P2 [
<ul>! T' b, d6 g* ?) l
<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>" ~% E- v+ _2 w" f- `! t
<li><code>Vec&lt;Option&lt;T&gt;&gt;</code> 可以解决某些情况下需要T所有权的场景</li>, P) C, u( |' T4 h
</ul>2 l) N7 Z9 j4 p8 Y7 Z0 @
<h1 id="完整代码">完整代码</h1>" ^& [% f! @# e4 E" a3 J
<pre><code>use std::thread::{self, JoinHandle};) N6 R" f4 K) ?
use std::sync::{Arc, mpsc, Mutex};
/ I& K3 W8 w7 _% Z, R- C3 z) v, v6 G. p6 {/ P& Q# @5 T
. o) T2 b% v/ G. Z& ]& |/ S# F2 q# Z
type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;
9 M4 T* @4 G' p- _: venum Message {& z6 {4 T% I% V4 C/ t' h
    ByeBye,/ n2 W7 a. l, a) j3 m* Y' V) [
    NewJob(Job),5 G% x' l2 S1 K8 o0 H
}, S* `) q; o! q2 e* l8 a) V0 n
* e) {, m1 E' j9 d
struct Worker where0 A0 _$ Z& h- d
{
* ]5 i1 H  c! f  Z) h; {    _id: usize,
) c) }/ k# D& H1 {+ x5 S. W/ ?    t: Option&lt;JoinHandle&lt;()&gt;&gt;,
: s# p+ U3 ^6 c/ n}9 O5 g. X& Z1 ?
) O: A6 u; n6 }+ K8 m- K
impl Worker" T7 |) W3 T4 o6 ?
{
2 k# D/ s: v+ |/ u5 l! G; q& M, S1 k    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {: p! Z, I" i9 y# ^3 L7 p
        let t = thread::spawn( move || {' d* B9 v+ D# j! n
            loop {
1 P& F3 y' u$ h4 X2 Z( Q                let message = receiver.lock().unwrap().recv().unwrap();
1 f8 O8 o# Q- z; Z4 u, `+ P6 @                match message {. b+ u; K6 O0 W- l4 l
                    Message::NewJob(job) =&gt; {
) q5 T$ e) O% }0 s  D8 w) ?( ~                        println!("do job from worker[{}]", id);
0 o3 H9 o9 k' g7 |                        job();2 |/ s3 ]" g9 s  @
                    },0 v1 N: q8 O2 |+ E
                    Message::ByeBye =&gt; {
8 v' ?* {. y5 d% b4 t                        println!("ByeBye from worker[{}]", id);
" \5 v! X' y+ |3 ~7 s, ?                        break! l8 {; W% S  y! |5 }
                    },
1 j$ ^2 I$ M+ o$ q% y  n2 o9 {7 O3 {                }  
5 m/ ?, [# [+ N            }/ i$ b0 l! {2 D# X
        });" X0 _5 J3 `1 _2 E3 w, m) n
' n" @) n( O3 h
        Worker {) d5 P( U  ~6 w
            _id: id,' a7 _, v1 b2 {8 y
            t: Some(t),
7 u6 \3 W+ q& K! g% l( \        }5 |2 |: j0 f7 |* S4 o
    }  h/ {6 L! j. [) S3 Y) |
}
  N2 l, o7 ^2 B+ K2 f& w
$ u3 s5 D* {- L  E5 s2 gpub struct Pool {
6 W% u5 v$ E) v6 p    workers: Vec&lt;Worker&gt;,( R* T; Q  d. p7 N, J; O! B
    max_workers: usize,
% s$ x$ J% C2 q; N: p: X$ T    sender: mpsc::Sender&lt;Message&gt;) B. b& v/ p7 e6 a2 o
}$ h9 W% b8 y5 N$ z4 u% x  s  F
7 M# t/ S4 P+ k, @, L1 j
impl Pool where {/ w8 T4 h- t% q* l6 a$ u4 F3 n& Y  _
    pub fn new(max_workers: usize) -&gt; Pool {6 g, H7 i8 Y8 e5 i
        if max_workers == 0 {
/ B/ T- a- H5 a/ G% w8 }            panic!("max_workers must be greater than zero!")# c# \/ P1 q, J  G0 z
        }
" m2 d/ k: y5 d3 o7 X: a' |        let (tx, rx) = mpsc::channel();( B8 A4 [( X/ l# }2 z
/ w  ~- l$ P# Q2 M- D5 \
        let mut workers = Vec::with_capacity(max_workers);# g$ m' O( b' b+ ~0 U0 g
        let receiver = Arc::new(Mutex::new(rx));( [6 S9 ^# b- C+ z5 `! G
        for i in 0..max_workers {! l7 b0 G/ G+ ~0 s# ?
            workers.push(Worker::new(i, Arc::clone(&amp;receiver)));. R2 X+ N# \! |
        }. e% g8 V* a& D0 }. j: A1 Y
: j3 V* y1 w" J! v! H: e
        Pool { workers: workers, max_workers: max_workers, sender: tx }2 o1 |4 S# m3 o; G! t. b
    }# f1 ^  D  I7 n4 }8 U/ d# ?8 f
   
3 D. f& x! R: X, ?2 X; S0 I7 g    pub fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send$ ]- v  T& t/ o$ M
    {+ x& [6 }# c8 E2 F( m# B

. c$ _# E& Y, e) n        let job = Message::NewJob(Box::new(f));( [( C' y! Z) O& \8 [& H
        self.sender.send(job).unwrap();3 K2 q& o( }+ V& q7 M$ w9 L2 @( E
    }
  {$ x1 O' m0 A$ T3 b' @}
0 J% a5 ]* \3 s) j1 j2 _
  `# q$ s* X# i4 f* Jimpl Drop for Pool {
  {2 q2 y6 F% z5 s" }, k    fn drop(&amp;mut self) {
, K5 V, O/ y, ?' u3 C8 B        for _ in 0..self.max_workers {
0 p9 y0 @4 v- u6 m+ q% O5 Q! Y            self.sender.send(Message::ByeBye).unwrap();2 V5 x0 S( B) [! {, Z( W
        }
) e. b) F3 E8 W9 D% u8 H        for w in self.workers {( e" h9 t5 I. F, {$ j: c
            if let Some(t) = w.t.take() {0 Y  D) K/ Y* b5 t' c
                t.join().unwrap();8 K- ]% q0 _2 o5 S8 z
            }, k# n4 o' U& n% m
        }4 h. f% c# g3 x; v% q. @$ c+ U0 [1 _
    }
. R) H" C- E% `+ x% M; c}
6 i% H6 x( z! b! }+ p8 l# T! ]0 C, L; e4 n& S; I
' a* Q% C4 A8 f1 S0 B: i/ Q
#[cfg(test)]
" N  N  J) U& {! kmod tests {1 L7 O, h- I; `0 H
    use super::*;
2 Z, @1 F/ ^: Q4 s    #[test]
  z9 e+ x# R6 O2 X! z0 j: ?    fn it_works() {
; y% R9 Z& c6 ?2 v4 d% e0 s        let p = Pool::new(4);# y( P. Y! x% x! U8 z& C
        p.execute(|| println!("do new job1"));
7 ?# p$ Y7 B' K        p.execute(|| println!("do new job2"));" a0 E7 u/ Y; l7 `
        p.execute(|| println!("do new job3"));) L4 j: q" b9 H) X6 b
        p.execute(|| println!("do new job4"));0 E8 ?' B2 ^$ q2 M2 H7 ?3 A
    }
) o7 u. I% E4 h9 p$ k5 y5 z+ |}, T5 r2 P$ g0 K4 B3 Z3 _- e8 e
</code></pre>2 J% \: {6 w/ k: X

; ?3 R/ Z% k% U4 B, S" q. W$ r% q
回复

使用道具 举报

懒得打字嘛,点击右侧快捷回复 【右侧内容,后台自定义】
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

手机版|飞雪团队

GMT+8, 2026-2-27 07:22 , Processed in 0.066370 second(s), 21 queries , Gzip On.

Powered by Discuz! X3.4

Copyright © 2001-2021, Tencent Cloud.

快速回复 返回顶部 返回列表