飞雪团队

 找回密码
 立即注册
搜索
热搜: 活动 交友 discuz
查看: 14914|回复: 0

rust 实战 - 实现一个线程工作池 ThreadPool

[复制链接]

8087

主题

8175

帖子

2万

积分

管理员

Rank: 9Rank: 9Rank: 9

积分
26591
发表于 2022-2-12 14:35:42 | 显示全部楼层 |阅读模式

- A0 |) o; d# S) J3 @<h1 id="如何实现一个线程池">如何实现一个线程池</h1>
  {/ E" M/ f( z2 W6 [' Z& {+ q<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>
: Q5 g2 f; @5 k- D5 _<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>
% I5 t% R7 E) A* e/ j* x0 {0 c<p>线程池Pool</p>
! }3 R" `& M1 m. V$ b<pre><code>pub struct Pool {0 k* ]% O' D+ U" h' y3 r
  max_workers: usize, // 定义最大线程数( F  w4 B8 _( C4 B5 p
}% ^& g% d% f" j% t: N8 z

  {# `1 X2 J" J, vimpl Pool {
4 F8 W2 ?! f6 F/ Y5 W  fn new(max_workers: usize) -&gt; Pool {}" T* g) u) Q/ G5 H' ~. G" I
  fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send {}
( N* ~. H: m, D0 g, P}9 A* `( M; O1 e! Z4 E2 U
, w, y4 R/ u' O2 s) H1 m
</code></pre>
8 H+ a# V+ v" T6 ]' d+ {. o; G<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p># p. Z( D/ C0 C8 ~# H  @
<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec&lt;Thread&gt;</code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>
& C4 B! {+ u$ ?' ^5 O# X, f可以看作在一个线程里不断执行获取任务并执行的Worker。</p>
% I8 e& d1 W, j7 f<pre><code>struct Worker where
. n+ T, @8 o1 {. q% `4 i4 Y1 c4 X{6 h1 K# L! z1 E6 L
    _id: usize, // worker 编号
! f5 L1 _: a6 }7 h# X! e2 ^}, F  B2 T$ j6 u
</code></pre>- ~# [) }5 `: l* u9 x* B. F
<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>
, b- V% W* z, ]" B把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>; X  P; M/ t4 @  c
<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc&lt;Mutex::&lt;T&gt;&gt;</code>来包裹起来,也就是用锁来解决并发冲突。</p>- o! w2 g$ x. y3 j
<p>Pool的完整定义</p>5 j6 u0 T' P% x7 H5 O
<pre><code>pub struct Pool {
5 I/ _! q7 f8 Y8 A1 b$ t+ X. K    workers: Vec&lt;Worker&gt;,9 p2 H8 Y: X$ b3 t& |& _6 Y
    max_workers: usize,2 N6 ~0 t( Z' ]/ ^8 `8 P
    sender: mpsc::Sender&lt;Message&gt;
+ f. G/ T0 q5 z; Z' T}
  v4 w- `: z. G1 l</code></pre>8 [9 ^! C% @1 q: h* k. W
<p>该是时候定义我们要发给Worker的消息Message了<br>; `6 N0 T% k* I0 M
定义如下的枚举值</p>
( X, X* x9 V- H2 B; N$ \/ w% K<pre><code>type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;
. q) I+ b% J/ Z- ~! ^9 a, Q" uenum Message {
; J& o& f: r: Y& t, W! C; x    ByeBye,6 U& v+ f# X6 w; l9 ]* z( _2 K
    NewJob(Job),
0 Y1 ~0 b& o/ V# e- c, R3 v}7 _' M' R! W, Q) W5 {
</code></pre>
% a; C) g. b/ L<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>- ^$ R8 z* v8 u! G
<p>只剩下实现Worker和Pool的具体逻辑了。</p>
; }9 J: F" \* t6 J! ?<p>Worker的实现</p>
. }0 J/ ^- F' R4 o' g! N4 P<pre><code>impl Worker
# ]' p) o" i' m0 @% a! I' I# C% K{
' x" q) i$ a$ U' P! S6 F    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {* s  T+ |- _2 O+ ~
        let t = thread::spawn( move || {$ {' c( t' P) I- O9 a2 b" U, I
            loop {2 h/ K+ S* v4 Q$ h$ s& Y+ n
                let receiver = receiver.lock().unwrap();/ [2 X' w, H6 d
                let message=  receiver.recv().unwrap();
! D* a' y9 z# F2 ?7 b* F6 e                match message {9 L) y( a% k: S; t
                    Message::NewJob(job) =&gt; {- [4 @1 J( `" L" {' |) H
                        println!("do job from worker[{}]", id);2 }5 `# O- ]# H
                        job();3 T" C* a4 H9 e+ P$ J- y( c
                    },
2 t) H& H  j4 i, R2 @$ s' \                    Message::ByeBye =&gt; {  [9 u! T* ]3 A  Z3 h' ?8 F
                        println!("ByeBye from worker[{}]", id);
  S: Z6 u  l1 H9 o2 A                        break
2 u( h' P( x( k  b% N) ~9 H                    },1 t. b/ N8 ?3 j  G4 p3 ^4 k9 J. c
                }  $ e  w% }2 k$ s
            }
7 h  K) w" E2 X) i8 d- k3 u        });
4 V$ @" \  m, B/ S
3 s, ^# T8 D( l# z, a% p        Worker {( f4 b$ E3 _4 n% I0 P8 l
            _id: id,. {3 m& x' S3 t+ |4 M* l4 @
            t: Some(t),$ F, d: B+ }5 ?
        }2 q' h0 W' k+ _9 \0 k
    }8 g, z4 w1 M/ {) }" c! m+ H
}, A/ U# e" b& A+ j! e6 T7 m
</code></pre>
6 k0 m8 c! m6 h% t<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>. e) `7 X5 Y( M0 D8 ]6 G3 z* _: i9 v
但如果写成</p>
, C6 Q: D* L6 t( [5 h# E0 v<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {
7 V4 q+ j1 @- C9 R4 o8 ~$ k. e};
. \9 y) }% x- K6 y! O+ C- Y: ?; ]/ g</code></pre>" C4 R# o. E8 y# w- S9 O! \# N0 t5 E
<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>
( G5 b; I2 P- s! m' grust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>) z# Y1 v. a- D, r
<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>
2 j: }* n+ j% `<pre><code>impl Drop for Pool {
# b; a. \7 Y" u* D  T2 C- K, i- @    fn drop(&amp;mut self) {
% ~8 `  K! |" a8 \  M% Z, }- V5 X        for _ in 0..self.max_workers {4 D, ~2 X2 p. r
            self.sender.send(Message::ByeBye).unwrap();) A$ t4 }2 Q; I+ L% B) `0 S
        }
+ |) P. T% z0 _) g" X        for w in self.workers.iter_mut() {) h5 F5 d8 d4 w$ }
            if let Some(t) = w.t.take() {, s5 q1 y- ^4 m: ~% ?6 P$ H5 d
                t.join().unwrap();
" Q! T( O- F2 H5 [& E6 G. G            }) q, g# m+ I2 x- t, W$ l
        }1 F9 B5 C: W7 C& L/ _7 d
    }
( @5 |% m9 \, \& a+ T8 I- o}
2 \% c& E; P& E; u8 F6 i  j* K& H3 k; O! N$ d
</code></pre>' l! M. X7 k. j+ O# U, d6 n
<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>
, o0 O7 v' f; |  W<pre><code>for w in self.workers.iter_mut() {, U: a2 ]5 h4 P# S* D' x
    if let Some(t) = w.t.take() {5 J( `8 ?+ D$ n5 T5 d, ~
        self.sender.send(Message::ByeBye).unwrap();
% d8 w8 {  x& {' K: c7 h) r( D        t.join().unwrap();
# s( @2 U& i0 x* q. _    }
8 I1 Y5 s5 W7 s. ~}
! u' H( z. x; u9 ]. L3 l- }& M
7 i& B2 l* z+ V# l6 d</code></pre>
9 `, t7 W6 g; L2 n0 W1 n( J<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>
. d& i+ V, S2 J! b; C" k7 w. a- z我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>: Q7 C. Z$ ]3 e1 [1 D
<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>
9 P3 D8 N' t7 _<ol>
; ?; l/ j  t* d& T<li>t.join 需要持有t的所有权</li>
8 [  T. t; e6 Q% ?0 r6 i<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>
2 ~4 N6 e& s% g# \</ol>
8 U* p" {+ L2 x4 F8 F; B<p>这里考虑让Worker持有<code>Option&lt;JoinHandle&lt;()&gt;&gt;</code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>/ J4 K5 v; b1 @" l, ]$ s$ E; V, W
换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>
* c- n0 F$ t9 U% k. J/ O6 K<pre><code>struct Worker where9 T: T8 E( B, I, W& v
{
5 {4 n" l' q# {* |0 ?/ g" h    _id: usize,
3 ]5 l! Y) k. w5 S  D7 o1 \' X    t: Option&lt;JoinHandle&lt;()&gt;&gt;,4 [" L! T  v, |! ~" W  G) G
}8 P* v3 q* w/ s- u  j+ z
</code></pre>
$ G% z3 P% m' g( h6 ?<h1 id="要点总结">要点总结</h1>7 y$ p3 d" r, {' G. t/ Q# f
<ul>" a2 N; R: [& Q+ s
<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>
" l9 _- S% ]( p# y8 z  s1 u; w; ?% Y, `<li><code>Vec&lt;Option&lt;T&gt;&gt;</code> 可以解决某些情况下需要T所有权的场景</li>
# e  [1 w: k: D$ c$ s$ y</ul>) z3 d* U6 ^: x" G
<h1 id="完整代码">完整代码</h1>
' e7 s& @- N, O! W6 _<pre><code>use std::thread::{self, JoinHandle};) [/ P5 _8 y. E: f+ R
use std::sync::{Arc, mpsc, Mutex};4 H% V4 g9 @5 _# x, q
5 r, b4 K5 i' [% Z3 ?
8 G: ^" E! n0 a  \( P
type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;* I- e4 Q; {( Z. U& b
enum Message {
. H, w. K4 j3 p% v. t7 |    ByeBye,  j) h! v% \" z
    NewJob(Job),
( D% K8 {- b2 X# K9 F6 s}5 U+ b8 |6 f5 }( Y$ e* M# U0 S
0 B1 Q0 H! a* G! x7 c, F
struct Worker where
8 U2 ^; V" g: T7 B; a{' ?; ]$ s1 R' P5 t$ k7 P) ^
    _id: usize,* `" R9 E" h" w6 x- ]+ L3 s5 O) |
    t: Option&lt;JoinHandle&lt;()&gt;&gt;,- N6 t5 s# c3 a
}! Z. v) N" J3 m1 H& g
* M8 u" B3 ~2 y" x, I, V
impl Worker) {+ ]' G0 x6 ^" X8 G$ w" j
{# Y2 x  x" w7 N% ]# U) f
    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {% z" E8 b2 ^' E$ ^( [# v
        let t = thread::spawn( move || {
" X4 w# B* e- x" m1 e1 k            loop {# d! Z, W5 r. L- a" S4 x
                let message = receiver.lock().unwrap().recv().unwrap();
2 o5 i4 r- u5 L% K  [                match message {, u8 [5 \4 h3 t& F
                    Message::NewJob(job) =&gt; {5 ~, k2 N; T" f; @% i: c* P7 f' v' x
                        println!("do job from worker[{}]", id);! @5 Y' i( N3 {* G5 [  ?- u- m, i4 {
                        job();
& N6 y7 p& h2 s' C4 ]* k                    },
- e7 l- R3 G, a2 m- S$ m                    Message::ByeBye =&gt; {' X" j( P  s  L4 \
                        println!("ByeBye from worker[{}]", id);2 |6 ^( z, u+ r" e7 g" Z- F7 ?! m
                        break; ^0 C/ @% W1 }8 Z( d2 ]* C; F, E, [$ l
                    },
: n7 o+ x/ Z2 L5 Q4 c! d; X. L3 z                }  / D/ m: m( a! I8 [
            }
! g' ]8 X  L" I) D  y- I5 G        });1 p! G- F4 ^  z5 Y- v

8 g" X: _' h/ v. ^, o        Worker {
2 n& g7 e# q! I4 }8 V/ N, u            _id: id,3 `) m7 t9 U" X. B7 \' e) A
            t: Some(t),: t* W  Y0 L9 F& X( u
        }8 C$ {) n: o4 ^7 v- X3 [2 J& @
    }
# B5 D# b1 q* s}8 Q! @7 h1 y5 z

4 a& }! h5 c& Fpub struct Pool {- _4 k3 {: X5 t! ^7 U4 z
    workers: Vec&lt;Worker&gt;,( }  Q$ w4 A5 B: Y) Y
    max_workers: usize,
3 a, S) w4 [5 H& q    sender: mpsc::Sender&lt;Message&gt;
: u' G: `+ b  T4 O}* t' O4 N5 f& L# H, D/ u" ]
5 R( @" C9 i; \# W
impl Pool where {
* x2 d, ?$ L! V. P7 G    pub fn new(max_workers: usize) -&gt; Pool {
4 N6 z; x8 D& C+ a        if max_workers == 0 {. J1 \  L; I8 V! I4 a5 T
            panic!("max_workers must be greater than zero!")8 V5 \& m4 i/ ]8 {1 m# G' {% M
        }
/ I& t( n) J6 i9 Z! [9 \        let (tx, rx) = mpsc::channel();* |# H& y# O6 w: d9 T6 W8 u
4 l( F! N7 k1 @, }+ l# C
        let mut workers = Vec::with_capacity(max_workers);0 y1 D5 T) L2 _7 K; a% r! G
        let receiver = Arc::new(Mutex::new(rx));8 }9 G: U, K: a! t+ ]6 x# d/ Y# H
        for i in 0..max_workers {) Z: E5 [" q0 Z, n8 o
            workers.push(Worker::new(i, Arc::clone(&amp;receiver)));
4 `! j; F0 u+ Q1 C' s" r7 G6 h        }4 ~6 ~8 w5 G& z4 d$ j' m2 u+ }& D$ _

) U* D; ~; H2 |$ q9 x( v1 K        Pool { workers: workers, max_workers: max_workers, sender: tx }
% H2 C+ s7 {" M4 ?/ O    }
2 T8 D+ v2 o. }    1 G# }3 \( Y  S+ h( Z/ p; `
    pub fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send
1 S! B  w# H# Y, E. ^* }5 [/ i- q: ~    {# b, z. m4 [; V: C8 `2 d5 s' k
1 I+ {9 I  m/ P. `# {( x0 m: T" @
        let job = Message::NewJob(Box::new(f));9 l* M: q% R8 i2 q7 q
        self.sender.send(job).unwrap();
1 o. u/ X9 Z. H    }
3 N) q' X2 H4 e- ~, o( X. G8 X3 x}  i4 F# }- I! T2 z+ M2 a% i
3 S3 H* Z6 h+ S' d# j6 d
impl Drop for Pool {$ l) b. K$ I- d
    fn drop(&amp;mut self) {
3 E- V0 z5 G) U  p        for _ in 0..self.max_workers {/ Y) H# b0 W# I2 b" Y+ A4 ^$ T
            self.sender.send(Message::ByeBye).unwrap();( U' B" c; V7 j/ w! ]4 G: L
        }
/ K1 V2 N) S. N1 s% b' b        for w in self.workers {: }+ W. p! O8 o! M
            if let Some(t) = w.t.take() {
. V' S; a0 p4 O$ S1 f' q) W                t.join().unwrap();
6 M5 f. B0 O/ V% w% g/ t" R8 V+ L            }
+ C( c! ?' @% A6 j        }
  g! D/ a7 P( [1 H    }, l; u$ f2 }; I- g
}3 ]9 L& ^6 K7 o  w! L1 f, K7 ]/ Y6 o* Y
* }9 B: W7 d  ]2 Z' S. Y

7 V! o% e( ?0 S9 T" |#[cfg(test)]4 ~: z  d7 q; ]
mod tests {
* c& n! B6 N* o# o$ Q" {/ C; p7 W- e    use super::*;
- e) w) {/ @' W    #[test]" h) e, q* t+ ?$ s% S  R& }. |! m
    fn it_works() {/ t- X$ d1 G# M1 }) ]$ ?, V% M
        let p = Pool::new(4);1 N: Y9 x6 p2 T7 A* c
        p.execute(|| println!("do new job1"));
: T+ d* O- U1 t6 `) w$ y6 A        p.execute(|| println!("do new job2"));
1 [8 @; |; i# V( g2 e        p.execute(|| println!("do new job3"));
0 g; X, j7 G( b* `, o) w7 h        p.execute(|| println!("do new job4"));% [& o. K8 z9 [+ ]# @4 b
    }
; V$ Y9 @: D4 i& a2 R4 l}: Q% M) X0 q# E* E
</code></pre>$ I) h% H+ ~$ N$ C- T0 S
- `/ j- j. G& n2 ~9 b" `
回复

使用道具 举报

懒得打字嘛,点击右侧快捷回复 【右侧内容,后台自定义】
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

手机版|飞雪团队

GMT+8, 2025-12-17 16:43 , Processed in 0.100213 second(s), 21 queries , Gzip On.

Powered by Discuz! X3.4

Copyright © 2001-2021, Tencent Cloud.

快速回复 返回顶部 返回列表