飞雪团队

 找回密码
 立即注册
搜索
热搜: 活动 交友 discuz
查看: 8505|回复: 0

rust 实战 - 实现一个线程工作池 ThreadPool

[复制链接]

6478

主题

6566

帖子

2万

积分

管理员

Rank: 9Rank: 9Rank: 9

积分
21758
发表于 2022-2-12 14:35:42 | 显示全部楼层 |阅读模式
* r1 }/ r! \+ {1 U, r. s  F: f  |$ W
<h1 id="如何实现一个线程池">如何实现一个线程池</h1>
) o1 ~- v0 c: {# ]. ^3 f<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>. k, e% O7 a2 }. h
<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>
$ F4 Z- P" L9 ^5 n! ?4 Q<p>线程池Pool</p>- l( ^. n1 v4 q( t3 O. \
<pre><code>pub struct Pool {. ?" k( N- e- _4 }8 N
  max_workers: usize, // 定义最大线程数; }, S- g1 d3 w) w, l6 V
}
4 j  H3 h3 ~9 e2 j* z$ v3 s* {/ ]" j7 o
impl Pool {
  T3 Y1 p* D: N9 f3 k$ ]  fn new(max_workers: usize) -&gt; Pool {}
) c# N! ^) i( q' `8 ^  v0 E' b  fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send {}  [$ W/ |6 s! M
}
0 g; h0 |# i1 ~8 M7 @
. i9 h: ~% `+ N</code></pre>6 O. f) Z) D: R6 |9 B
<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>
/ N" f" ]/ M4 [* ~. Z$ c$ ^. I<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec&lt;Thread&gt;</code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>
) z0 C8 X2 a, Z2 V2 s可以看作在一个线程里不断执行获取任务并执行的Worker。</p>
4 w/ b  Z! G2 S+ @4 V8 M+ F. c<pre><code>struct Worker where
/ o/ j) d% z* m" E: c) m( v) p{0 T$ E  h6 m- C
    _id: usize, // worker 编号
; L, O, C: l8 T) e0 m4 f}
5 w& y9 x( E0 G7 ~" z2 c</code></pre>( j; A7 z0 g* h( K6 c# [
<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>, z4 b  \* w% j' p
把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>
) ]% u6 P- g5 M+ q& n<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc&lt;Mutex::&lt;T&gt;&gt;</code>来包裹起来,也就是用锁来解决并发冲突。</p>
4 u( k* `3 @1 X* V4 O$ C<p>Pool的完整定义</p>& g( G$ T' {3 Y' q* E3 b7 F! o  n6 H: e
<pre><code>pub struct Pool {- r5 X- i9 d3 V! q% ?+ |0 j
    workers: Vec&lt;Worker&gt;,* x* U  R7 g$ J3 S4 f
    max_workers: usize,: }7 g+ X& h& O  i
    sender: mpsc::Sender&lt;Message&gt;
; I& ^# L. T! V- m/ o, I" C% z: B}
) }( l+ D) {3 W3 l2 ]$ q1 `</code></pre>
! k/ ?: A0 u: x( I; A<p>该是时候定义我们要发给Worker的消息Message了<br>+ u- _# R. c, m* n8 S7 H$ m1 r' H
定义如下的枚举值</p>! N% O& W9 b/ O' H! r& F" z. p
<pre><code>type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;
, g9 M: J6 [; E3 t; Zenum Message {6 E- S: j4 R- v, C2 U4 o' E. F
    ByeBye,
+ z  Y4 g( D4 C) ^    NewJob(Job),1 o% w9 z( {  r% Q/ H1 B
}) O# f) P$ c% X2 T
</code></pre># H5 ~# j( }, @# M. C4 {
<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>
- z* E4 z- G; G/ H, ]<p>只剩下实现Worker和Pool的具体逻辑了。</p>
. ?' {+ ?* k! A* n6 F" D<p>Worker的实现</p>
0 P" K% U9 ]8 _* `<pre><code>impl Worker
  q# R" }2 d4 z; M) V{. R4 o/ l5 k% g' r. E5 R
    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {' w& I1 W1 S( `6 n- o  ]9 F
        let t = thread::spawn( move || {
( k! Y! d  J4 W) D- C8 k            loop {( k" D0 D5 ~. i! H& x
                let receiver = receiver.lock().unwrap();
; A, A$ f+ N( x7 Z1 s                let message=  receiver.recv().unwrap();
- x9 r' A; K9 f6 x                match message {
7 F9 N) N$ d% a( e7 U9 y' q" R1 o* [/ X                    Message::NewJob(job) =&gt; {
; U, f+ b! B- q8 u6 z) O                        println!("do job from worker[{}]", id);% @$ p- t6 a' x2 k( d
                        job();; @3 b4 ~* \2 e7 Q0 {% W. s/ M* Y
                    },' s5 o$ F+ R3 g* g( d% ?- l
                    Message::ByeBye =&gt; {
- d" Z$ r! Z4 J2 X                        println!("ByeBye from worker[{}]", id);& ?! h  g  g) K% ^4 e" R4 \
                        break
# b0 d" s  s: e: M/ R/ X/ d' ?/ H                    },6 Q; p6 h* g6 Q: m! O" _
                }  
5 a; @( C- |1 }! O; H            }
- Z0 \7 y4 X' c& j/ f- |8 `# {; C        });3 Y0 r: D( o/ _' |+ i: S. o

1 M& |/ i' `2 \* c) @3 t        Worker {1 m  d2 Q7 W# O( y9 n8 k
            _id: id,
  u% r8 c( d; \4 Y            t: Some(t),
$ a: l* [# L4 K        }( f2 c, [* m0 z$ D+ W
    }6 y/ A3 S1 Y! r
}2 L& q9 X% o5 D" r- x
</code></pre>& e' _0 D; f& S) g0 l" C
<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>& @" A2 q* M$ f9 C2 m
但如果写成</p>9 ?) g' W" P8 C; ~
<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {6 ^; }; ^9 c1 O: D$ P
};; U4 b4 s; R5 y$ Y8 a, b+ s# F
</code></pre>
1 t& K1 U. z6 [7 a" }- ^- a$ e<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>" V. }. c; f" m( Q( X2 C$ V
rust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>
" \  ]$ v; }8 @<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>
7 _' E/ n2 P" O; |1 f: A<pre><code>impl Drop for Pool {6 o" b2 _1 p& V/ X( E2 r" c4 K8 J
    fn drop(&amp;mut self) {. V8 d9 d4 i2 u% v% g( [. ?
        for _ in 0..self.max_workers {
4 V: K$ c( L: m  Z            self.sender.send(Message::ByeBye).unwrap();/ h! `" |) ?, n9 g. U& q
        }
9 g6 ?$ l+ ^; `, {6 f( z        for w in self.workers.iter_mut() {
& E& ~* P, l" l            if let Some(t) = w.t.take() {
8 m0 F" p1 t' _; }) X' V                t.join().unwrap();* l4 J( Z2 f% M4 Y
            }
& W; B" d, e' I; X" G  j7 y% d7 e        }& \: j8 y0 e% P; {: }
    }
. u4 o* L& J) K0 Z}1 s" i9 E; j' P7 {$ Q. r, \

- r! [7 R' x* I2 h7 Z7 w, C</code></pre>
# o/ [7 q& h# t* H" b/ g<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>
- x% I2 B5 d( b' p; _% V9 |<pre><code>for w in self.workers.iter_mut() {
3 i! H5 v2 l& x- f2 _( K9 m* N; U5 `    if let Some(t) = w.t.take() {
/ G& r, v/ F0 N        self.sender.send(Message::ByeBye).unwrap();" L: P# N) q3 n) E2 l) C$ L$ m
        t.join().unwrap();
! |5 P- {+ Y: B! {. r    }9 R: R9 X- d5 c
}! k1 p+ f: s+ z1 X$ X  C

% x9 ?% c4 v/ i, C* B</code></pre>4 X1 K+ a8 w, R
<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>
- L' J/ @+ s8 F' f- y我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>
$ U! U: U: q0 K3 i9 X- E: X<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>0 J* E) a8 C6 l
<ol>
0 b: J; ~5 y7 k9 C/ y<li>t.join 需要持有t的所有权</li>/ j1 f% Z5 _8 l2 \; s8 H  l
<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>
' a% i  }' m5 J% S+ a# J3 C</ol>
6 b$ ]( F% e* r; V% B! J, l2 L# s<p>这里考虑让Worker持有<code>Option&lt;JoinHandle&lt;()&gt;&gt;</code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>8 ~$ |& A. k' ^. C4 j9 L2 i
换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>
  e0 J  i9 [$ o' Z: P<pre><code>struct Worker where3 C5 q! W, @4 b: H0 v5 U/ i& K
{0 i+ x2 X, q) L) [
    _id: usize,& m8 S& ^% W! B
    t: Option&lt;JoinHandle&lt;()&gt;&gt;,
, V8 P% q7 X/ v& C6 m* N% L}
0 v- O% z3 Y! A' V* [% {+ P- o</code></pre>
, D7 Q4 u9 e" L$ e2 T. U<h1 id="要点总结">要点总结</h1>
8 |! Z, B$ l6 [/ c7 P* k2 Z<ul>
  H6 }0 \4 W3 x/ c% D8 Q<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>
2 ^9 A5 V3 h3 ~% a% b3 |) H<li><code>Vec&lt;Option&lt;T&gt;&gt;</code> 可以解决某些情况下需要T所有权的场景</li>
6 ~  g1 T! Q3 s% L7 j1 m3 Z</ul>. f, b& g: _- [+ m4 E% {) |
<h1 id="完整代码">完整代码</h1>
' o! n* n% ]! M+ G<pre><code>use std::thread::{self, JoinHandle};- q) J/ |; R& r( \% E- C. G& y, C
use std::sync::{Arc, mpsc, Mutex};
& j) A8 A8 O. Y( G$ z
1 ~5 ?% n$ K  r. F9 o& {
; F1 @: K$ L+ z' vtype Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;
7 G: C/ u4 k7 zenum Message {0 ?* d( `- r2 y8 g) |
    ByeBye,
! B6 y2 v- T/ F( f" n% B8 r    NewJob(Job),$ x# B5 S! F0 U- |2 O+ \- v! T2 e" \
}5 |" P  D6 ~7 m. \- h

2 A4 \+ J. A6 f9 W, j: pstruct Worker where$ i1 M3 G4 G* X
{
3 h0 \& w7 ?* |  S  V    _id: usize,' X: c- K& o+ |
    t: Option&lt;JoinHandle&lt;()&gt;&gt;,/ B: b; O, M2 A; c! J
}5 x! [6 f' {% x# k$ A& v

5 {; |( s4 I" I5 i2 U$ E2 `0 ~impl Worker
7 y& L" M+ C0 P. e% P  {1 v* @{
  N' U; k8 p" s% J- [. E    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {
& o. o$ {1 B$ ~" i8 J+ m        let t = thread::spawn( move || {% \" |$ ?( h$ p' ^3 Y6 [) M6 O( A2 \! l
            loop {  X  @* e' \$ J% o4 {% a# i
                let message = receiver.lock().unwrap().recv().unwrap();' x& {, N# ~" J, x1 P$ t) Q
                match message {
% E+ n: T9 B" ~: `( N0 Q3 }2 D' P                    Message::NewJob(job) =&gt; {
6 R1 r: N6 L: ^6 D4 {                        println!("do job from worker[{}]", id);2 U" `$ A$ B) T0 L4 S% O) G8 w. Z
                        job();
2 _$ B, m) p+ @! Y0 J                    },7 A: r$ l( [; R9 U
                    Message::ByeBye =&gt; {" [5 M+ ^% A( V+ o' L3 ~- L
                        println!("ByeBye from worker[{}]", id);
! N7 Q8 W  x) o. Q                        break
9 `/ I, r( V0 b" a" k! R0 d                    },
  w3 K2 c2 e; J: u6 M                }  7 Y9 g9 W) ?3 _5 Z* N
            }
! ~# k2 \' q7 g; s        });: N* Z" H! ]' j. F/ H
7 s+ p1 h$ f7 T
        Worker {
# K. L9 a1 ]6 t3 p/ i            _id: id,6 N/ r4 F; S9 k6 [* m# U1 O
            t: Some(t),
4 Y, `8 M4 O& l- C7 ?$ }        }
' V# ]4 R; W2 j: V    }) Z4 f% v" f6 P& q; N4 g3 i; E
}
% I- L, t# E4 u
0 {2 r" n- G4 V2 S2 o, Qpub struct Pool {4 T# `2 }: o% h: R7 g2 S
    workers: Vec&lt;Worker&gt;,4 V% Y. U2 F: a% A$ Z- O" ?
    max_workers: usize,0 e) E' C: n$ }
    sender: mpsc::Sender&lt;Message&gt;0 H6 e4 l) Q+ d1 D$ D( n
}
2 s7 q9 f+ ?* }$ t8 O: X. B4 B# ]$ T9 D& G3 e* x' V  W( i5 _8 S! O- L
impl Pool where {% e2 C/ m( H6 [- a9 ]: q+ ?+ K7 ^
    pub fn new(max_workers: usize) -&gt; Pool {# Y# H7 ]3 C  {; ~
        if max_workers == 0 {( Y5 g' b, S6 s4 @
            panic!("max_workers must be greater than zero!")
; o' u$ j7 B" g$ A        }& E# D2 j9 O: f5 s0 \* }
        let (tx, rx) = mpsc::channel();
1 Q. n+ F, b6 n5 ^
& S) X; t- c$ I1 X6 a$ G1 e: }" o! a( b3 [        let mut workers = Vec::with_capacity(max_workers);& b* l1 l' e: s* m5 f3 t
        let receiver = Arc::new(Mutex::new(rx));7 z+ r1 R- L. {7 ~+ l4 r- B' J  {
        for i in 0..max_workers {
, s/ F% `, @; i' F4 j            workers.push(Worker::new(i, Arc::clone(&amp;receiver)));
# N5 F; V' ?3 C5 D' I6 x3 L        }; s2 L+ A- T! v6 W0 N" L* `) B1 \

$ x# _( }0 Y" ^+ E' h/ X5 D        Pool { workers: workers, max_workers: max_workers, sender: tx }
+ @- A; s1 e8 m% s    }
" s5 N% S9 a( j2 z4 t5 ]    6 X1 j9 j0 m6 I# s" W% j
    pub fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send
! H4 t. V) m7 s$ ?, G    {
" P* v; P5 j% v; j3 k; A
! h/ d) S) z; `0 h  c' R9 U        let job = Message::NewJob(Box::new(f));  c9 [- O+ y$ \# n
        self.sender.send(job).unwrap();
2 a  M' q. y3 [& u( {$ z$ o% O1 y    }
% o8 Z4 U: I8 S  s  W4 E2 R}
/ B; I0 e. F% @  U7 y! D, U7 R8 {# g% i% k* R
impl Drop for Pool {
$ e6 O4 g. [7 w) x, J' F    fn drop(&amp;mut self) {
7 K. }, m' F' F+ R6 I        for _ in 0..self.max_workers {% E3 ^! E, `4 ]7 [: V
            self.sender.send(Message::ByeBye).unwrap();
/ ^+ I  A% N4 i+ ]        }
, s( M; A1 A+ J# u1 v/ J& }        for w in self.workers {
- p0 p  z' P! O* N4 u$ w9 A            if let Some(t) = w.t.take() {; Z6 k1 z' d, t/ o
                t.join().unwrap();
  S! u) O$ ^+ x1 _            }: @7 v7 w6 h0 S3 l& o8 U! q
        }" c( A% P2 N: u1 k/ Y. o
    }7 b. n3 b+ l$ H, ^: K
}
% D+ ^, J$ i7 b4 r% Z8 S8 q4 x7 \9 W" e# i& }' @. P7 C
" t+ m* S! s4 j. K7 X% k6 M
#[cfg(test)]( @; N$ o+ q  i- l0 e
mod tests {) P4 B: ^( d5 ^7 Y& A7 ?
    use super::*;
( u4 y6 L+ o- }, D; G    #[test]6 r7 U$ C! t+ `  z% O0 N+ g
    fn it_works() {) G& M% I6 |6 r" n
        let p = Pool::new(4);9 x& x& O# L9 z4 F* }1 O( Q
        p.execute(|| println!("do new job1"));
" d: w% d9 b# Z2 s        p.execute(|| println!("do new job2"));
, x6 a, p  A4 T        p.execute(|| println!("do new job3"));; P' C; B( E* z/ D5 Y# o
        p.execute(|| println!("do new job4"));
! c' C, h4 w2 w1 h    }
) |4 [- ^& x5 h, ]" h  w}
7 i' J7 @, v1 u1 Y! m3 I- Z& C</code></pre>$ Z. S  x( M& V' ^4 j

7 [3 I# P. r- n0 B
回复

使用道具 举报

懒得打字嘛,点击右侧快捷回复 【右侧内容,后台自定义】
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

手机版|飞雪团队

GMT+8, 2025-5-1 06:43 , Processed in 0.139924 second(s), 22 queries , Gzip On.

Powered by Discuz! X3.4

Copyright © 2001-2021, Tencent Cloud.

快速回复 返回顶部 返回列表