飞雪团队

 找回密码
 立即注册
搜索
热搜: 活动 交友 discuz
查看: 11369|回复: 0

rust 实战 - 实现一个线程工作池 ThreadPool

[复制链接]

7327

主题

7415

帖子

2万

积分

管理员

Rank: 9Rank: 9Rank: 9

积分
24311
发表于 2022-2-12 14:35:42 | 显示全部楼层 |阅读模式
  E7 z! q3 O" z# ?" p
<h1 id="如何实现一个线程池">如何实现一个线程池</h1>
+ U- n; S( L9 W<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>0 E- d) B7 u; r9 r/ R9 ?8 p
<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>* n1 ~6 I; a# U
<p>线程池Pool</p>
  t2 [5 e% Q; j<pre><code>pub struct Pool {& G, q. s7 Q' O
  max_workers: usize, // 定义最大线程数# _4 l9 ~$ s3 B. X  V  R% U
}2 J+ L  Z  |7 C6 ]4 J
( H7 V( S: J2 E5 i7 x
impl Pool {7 W$ u" ?3 \7 F; I, O* s& B1 x
  fn new(max_workers: usize) -&gt; Pool {}
' ^5 H% `- k3 ^% F- I  fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send {}. t$ v9 V- m/ M7 S" e
}! G/ z4 n% K( S3 j8 l9 M
6 i5 X' M$ V  y: ~+ X! ]% C
</code></pre>
6 j+ d. l6 |* {. w<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>" B+ {: y8 E3 I4 }
<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec&lt;Thread&gt;</code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>0 \+ v1 R( ?& D( D4 G7 V. }5 P
可以看作在一个线程里不断执行获取任务并执行的Worker。</p>; A2 s" M6 `/ z: f  I
<pre><code>struct Worker where
. D; T( Y1 ^4 f- ?8 ^7 x8 F  D) r{7 B- W/ H6 ^* z' R9 d3 n1 U/ L* \
    _id: usize, // worker 编号! @1 C1 L% T9 x3 }, ~) b
}
1 Z5 `( _& }1 Q</code></pre>
# r8 {+ W0 G. |0 U8 w/ ?: L& B3 b<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>$ \& n; {: p( ~
把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>
- p+ d' j+ x+ @4 W: f$ ~. G- b<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc&lt;Mutex::&lt;T&gt;&gt;</code>来包裹起来,也就是用锁来解决并发冲突。</p>" c4 t2 z" [, }: ?
<p>Pool的完整定义</p>3 Z" q6 }* l7 K3 L" Q, L/ B
<pre><code>pub struct Pool {
/ r9 x" e. W$ K, |8 @    workers: Vec&lt;Worker&gt;,
2 n* P9 ~$ g# U# n- v/ z$ [$ R/ y    max_workers: usize," q* K( G; M" ~' A4 C
    sender: mpsc::Sender&lt;Message&gt;$ ]% E; s- O' y+ C9 B' ?/ t
}
+ j0 m- R2 A% G/ N; u& [' j" x</code></pre>5 X/ U, ~2 f9 ~2 T
<p>该是时候定义我们要发给Worker的消息Message了<br>$ I$ E4 V% u8 h: ~' @" I7 o- c
定义如下的枚举值</p>" `# c+ L9 U! G$ W7 j* N
<pre><code>type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;( Q" Z7 h6 o6 s$ a  o
enum Message {
  z* ?$ J- }0 k8 y/ P$ |8 h    ByeBye,
: a  v1 P: @% z+ `    NewJob(Job),* f: J3 F1 i! u& f9 O
}
( e7 x7 w/ q6 C2 D8 ]3 O# ~; r</code></pre>2 `; H( Z/ w+ M* I7 ^3 s: J- h
<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>
" J! E; a- e( W4 b! M<p>只剩下实现Worker和Pool的具体逻辑了。</p>  G9 Y* P; W! Y& l) o; \' k# N  W# A
<p>Worker的实现</p>! k/ z3 Y  {4 x; o9 E+ H
<pre><code>impl Worker5 H* B/ w, u0 ~% P* |+ v
{& }: |8 W" b1 n4 s7 k- `
    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {7 ~4 B4 H/ Z7 p8 N0 `6 r* h
        let t = thread::spawn( move || {
; A7 e& h1 V# L! g            loop {3 U4 K) J5 n2 k0 Q/ y
                let receiver = receiver.lock().unwrap();! ]2 \- P5 v5 l3 B
                let message=  receiver.recv().unwrap();6 A  \' Z. ]  f( q
                match message {
) q0 D- H# h7 F                    Message::NewJob(job) =&gt; {/ e/ m+ r4 {% _
                        println!("do job from worker[{}]", id);9 i! |. F: f6 D+ a2 b
                        job();0 t# J' f, n$ E+ k$ J5 E
                    },0 x8 J6 b* v/ c1 b9 T7 |$ _
                    Message::ByeBye =&gt; {
/ I! M- J* U. _( L# [( ?* ~                        println!("ByeBye from worker[{}]", id);2 b, h' T. W& d( m
                        break
! d3 Z6 a- D- j" v9 N5 ]/ e- X                    },. ]0 ~3 Q) r* V, w7 W. u0 s
                }  # a, U& B8 R' U
            }& o6 y' Z% x& m/ u( _
        });
5 ~: z& ~' |" _$ ]2 u. k* N2 C( \. v! B$ R4 m
        Worker {
, _) T( i2 g7 v9 T            _id: id,$ _' U, O* Y4 n# l1 Q
            t: Some(t),4 V5 p2 U6 U* k6 R
        }
9 }0 F: ^$ ]+ s4 R) d    }5 x0 K1 O! H' B; H1 |8 l& [
}3 b) o1 x: N7 [  p# q8 J
</code></pre>
; {' B, Y( U" Z4 O! d<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>
5 l$ _& H; e* h但如果写成</p>2 j; A( R& I+ b& J6 D
<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {( Z5 I/ d) s+ v, S0 ^- y( v" M
};
5 {" V1 b0 D. ~$ U</code></pre>+ k0 d$ n* _* M% N$ l  _
<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>
; t. d+ g# v" \) z4 R8 lrust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>6 e1 L. O+ q! @7 g1 @, o5 N7 U
<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>7 q% C, Z- X! L) \8 V
<pre><code>impl Drop for Pool {! P3 s. T+ b8 b0 b1 d
    fn drop(&amp;mut self) {
' |; `4 o, D; X7 B        for _ in 0..self.max_workers {
! X8 s/ n0 J. }/ ]4 V+ c- J7 e            self.sender.send(Message::ByeBye).unwrap();
5 H0 O0 i; u+ a/ N9 U) |# U% d2 v        }
% _0 l' i* y$ v: B        for w in self.workers.iter_mut() {% e% l$ t5 _" z$ |3 I& }: ]
            if let Some(t) = w.t.take() {
* [$ @, E3 ~. k7 r                t.join().unwrap();* M! E9 i# C! ~7 c
            }
' i. A4 Q, r1 A! o        }
" m) E+ m8 i( {- f8 m9 K    }( l9 ]  X: V6 O" Z
}
" C% J4 v& p) q# z& o1 O6 j  c6 k- W% R3 y
</code></pre>
- l! b' m* S" r. B& i<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>9 v. K9 B0 ?- X, s  A
<pre><code>for w in self.workers.iter_mut() {
: s3 H( s7 v# `- ?2 x    if let Some(t) = w.t.take() {* Y3 E# t, n; O5 S9 s4 h, u  y/ ?
        self.sender.send(Message::ByeBye).unwrap();
7 k& K4 P9 H% u9 l! a/ a        t.join().unwrap();
1 x: D. j" b# a  H% h7 E    }
3 d: f8 n8 q; M7 T}
, Y/ }: K0 f# ]' V- U
; m' a! I5 z' j3 i8 i9 c</code></pre>9 q6 ~; Z4 W  ~
<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>
& A9 A# {+ e% |; E/ P我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>
, d, @7 F( E5 @1 f* i<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>
/ K6 m6 i# ^: L0 e* M# E<ol>
! N5 B! _5 |: C( f& U<li>t.join 需要持有t的所有权</li>
- E, X) u! v" k. m* p4 q- [<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>6 \6 ^" o4 t3 t& t. O
</ol>
; J' D3 d+ l; O0 A& L) z<p>这里考虑让Worker持有<code>Option&lt;JoinHandle&lt;()&gt;&gt;</code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>
) ?, n% I9 i" \; b3 y4 T换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>" ?  U/ }5 Y* o( D3 L( E0 m, }* v7 Y4 F
<pre><code>struct Worker where) T* Q5 ^  P- u/ R0 l
{
1 l& \) z  j/ u7 f/ t; g    _id: usize,
+ Q9 w; R6 @/ c% V& C8 Y+ C- p    t: Option&lt;JoinHandle&lt;()&gt;&gt;,- t, \2 i- H* m/ q% o/ a$ m) X
}: T4 Z' |" h$ z" ~  }( M
</code></pre>: ]7 c( @* I. j
<h1 id="要点总结">要点总结</h1>) ?5 Z' P  u- Y) \# C; M
<ul>; C" k0 b& {# n% I
<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>
$ G% r$ h  }6 B<li><code>Vec&lt;Option&lt;T&gt;&gt;</code> 可以解决某些情况下需要T所有权的场景</li>
! @3 W* M1 f9 x" F</ul>
. E1 p$ }% f# ^% P6 ~( K# y0 U<h1 id="完整代码">完整代码</h1>
9 n4 @& k2 l9 c% y* P0 ]* ?<pre><code>use std::thread::{self, JoinHandle};
8 W9 D$ l. b4 ^! Y  o# I6 {/ E/ n) Ause std::sync::{Arc, mpsc, Mutex};
* O0 S) b. N8 b6 K' C0 E0 L7 C5 C: p- S3 B& Q8 c

& v$ G5 J/ `  j" `type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;
: @4 [- s: B8 e' ienum Message {
1 S1 U3 U' e! ?8 y# D& I    ByeBye,
+ q0 \* M" G% x( c    NewJob(Job),
( y; N+ B2 i2 [' }}
& b+ M! D6 ]6 B1 M# E$ Z
5 l( S% G' v9 j) T7 x6 pstruct Worker where9 J4 z# Z9 a2 E2 [
{1 G4 A" A. z; Q" N9 f9 B/ Q! |
    _id: usize,
$ x! G  }; l* u* h. S- R3 U6 N    t: Option&lt;JoinHandle&lt;()&gt;&gt;,
, d. a1 ]: t+ C/ R" F}* L" t' z6 c0 q) z
& X. l6 x& c. s# |; ~9 e7 w
impl Worker
/ W4 ]0 k) r, u" ?( t9 L( ]' ?{
5 w" A4 `- {2 A4 E3 h( b    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {: ?; S& s2 x  S: h, I+ L( {
        let t = thread::spawn( move || {
  u) b# b; }' o6 m* n            loop {1 q# h6 q" t$ o
                let message = receiver.lock().unwrap().recv().unwrap();( F& T9 k  C6 G& Y2 e; `4 B8 \1 r) a
                match message {
. X0 h) j: G* q6 c' p3 C+ A* O1 u  C                    Message::NewJob(job) =&gt; {
" E- G! Q0 K$ E" ]- u/ D/ {+ m/ {) P                        println!("do job from worker[{}]", id);, J2 Z- p3 P' v( d3 {: a& R
                        job();( M. U) v! S1 A- s
                    },& w, |" V* t; N( F- s8 A
                    Message::ByeBye =&gt; {
3 n  w- n$ c1 F* e& n/ |7 |7 p3 I                        println!("ByeBye from worker[{}]", id);
7 g2 I$ m9 P5 ]7 F1 X- {* n' K                        break
* O% ?: f$ p# L/ M' \                    },
% m/ b) W; `# ^( D5 x7 Q5 N                }  
1 N5 A% P) ]' d, [! k            }
; O9 q: C. K$ ~$ c9 _! _( i+ S        });9 _- V7 L- B5 Y1 N) [! |# n

: Q" ?! y8 O. X! H* [$ G% Q7 [        Worker {
0 p9 }% n! {6 ?/ e3 k5 s. @            _id: id,( q0 o' A' j8 k
            t: Some(t),
( G. q6 p. u, d        }% z; g6 Q9 K- r
    }
  |; y2 ^1 v. W# v: O}9 ^1 [% z  V% q  ^# i1 e/ _8 t

8 J7 C" _' v9 G* }  |; s" apub struct Pool {
2 V4 B2 I  O0 Y. a) V% x    workers: Vec&lt;Worker&gt;,9 n- e( H" P( z% s; b
    max_workers: usize,5 W: S# j' {, C5 v
    sender: mpsc::Sender&lt;Message&gt;
/ }8 k( H) z; H. U7 V% [* c6 v  K}
0 x" x3 {0 p0 L$ n  ^$ m! D8 J/ O9 A3 b7 L- W0 [
impl Pool where {
2 S" ]& [2 M+ m) \  L    pub fn new(max_workers: usize) -&gt; Pool {& L; g2 r7 l' B' C4 J* q5 @
        if max_workers == 0 {
: U4 ?/ s6 \8 F, Q' C  k4 x            panic!("max_workers must be greater than zero!")
) Z% c( ]2 l: I! A' V        }7 @/ {: v; Z# a& N1 Q
        let (tx, rx) = mpsc::channel();) I+ Q6 @4 w/ K
9 ?) b, J5 e) o/ v0 h
        let mut workers = Vec::with_capacity(max_workers);
4 J: |6 e- U3 m" e& @        let receiver = Arc::new(Mutex::new(rx));
/ ]. ~! l: i  O& X( g$ L0 v        for i in 0..max_workers {6 J- {+ F4 f9 E; C  K! b
            workers.push(Worker::new(i, Arc::clone(&amp;receiver)));
( A. e' ?7 d0 f" c        }% }- T& C9 B0 H2 ?5 c8 s$ D

5 M$ \7 f" Q3 o# z4 r$ G$ h9 \        Pool { workers: workers, max_workers: max_workers, sender: tx }. n: C- ]7 u8 @9 R  F
    }
. g& i8 K7 m* D+ |  }" G3 i    + U4 q0 H, N8 E8 S. o* C+ X6 _
    pub fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send
7 A, h) ]; y: B# N0 v, ^1 m    {- Q6 x4 z& G1 f' x
4 G2 W* L* |1 U& c! X" I" K
        let job = Message::NewJob(Box::new(f));
. N* J4 C, N. S4 P        self.sender.send(job).unwrap();
) U* R0 z! S2 ?- r8 Z$ A, B7 N. q    }
' R6 p! i+ J. Q}
* R$ m- A1 ?- J+ j8 m9 b' G" e. H) R& m
impl Drop for Pool {
, c9 C- s$ V# l2 i/ w7 W# V    fn drop(&amp;mut self) {8 B3 E) `; O7 N! v- P$ f: |; F# d
        for _ in 0..self.max_workers {
" `8 m1 d! [2 i+ W7 _8 [            self.sender.send(Message::ByeBye).unwrap();  |# w6 v. d) @, \# o
        }
$ }" b8 N2 @* B. _* _        for w in self.workers {
! s, v7 A7 E4 @3 _            if let Some(t) = w.t.take() {, _7 v. t2 e" v0 `
                t.join().unwrap();
2 f! u! V) ?6 ]' x: ]3 ^            }7 t+ b6 ~$ y: c' z" u
        }* I! y0 u$ B. t0 I4 V7 y
    }
3 k/ G. u2 `  }& B8 s}
0 e8 L' o; R% y5 P6 K
6 i1 E+ B% [7 S4 t9 q' M* p. T; [  e% L7 P2 F/ S
#[cfg(test)]5 Y" J' g9 z  k( K, g+ r
mod tests {1 ^6 d% o% H2 g* g
    use super::*;$ ?. ~( R$ k2 @1 |
    #[test]  K/ s( p9 @8 J5 B, K  b3 d
    fn it_works() {
$ ]7 _+ t; F5 z; R& U        let p = Pool::new(4);* W- X3 x- R: @+ f
        p.execute(|| println!("do new job1"));, y# F/ e: g& R
        p.execute(|| println!("do new job2"));
$ p& ?7 K, r  D3 G4 l, K6 V% r        p.execute(|| println!("do new job3"));
% K$ l$ C5 h  }* Y5 t' e        p.execute(|| println!("do new job4"));
2 I$ S! l7 `$ d' D6 c    }
& f7 Q: [) t& x. Z$ x, E7 W}
: W% E% Q( H! n0 D( ~</code></pre>
, Z2 g1 ~) ]5 g2 \3 z/ Z# t$ ^( D2 H/ k, Y: p: Q2 Z" D
回复

使用道具 举报

懒得打字嘛,点击右侧快捷回复 【右侧内容,后台自定义】
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

手机版|飞雪团队

GMT+8, 2025-8-23 15:48 , Processed in 0.062374 second(s), 22 queries , Gzip On.

Powered by Discuz! X3.4

Copyright © 2001-2021, Tencent Cloud.

快速回复 返回顶部 返回列表