飞雪团队

 找回密码
 立即注册
搜索
热搜: 活动 交友 discuz
查看: 16400|回复: 0

rust 实战 - 实现一个线程工作池 ThreadPool

[复制链接]

8560

主题

8648

帖子

2万

积分

管理员

Rank: 9Rank: 9Rank: 9

积分
28010
发表于 2022-2-12 14:35:42 | 显示全部楼层 |阅读模式
7 n+ g0 }* j! R$ y, u' {; d
<h1 id="如何实现一个线程池">如何实现一个线程池</h1>) {* q0 L) Y3 m5 C
<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>
- n2 p) \5 Y  x<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>
1 B# P* F. i$ {5 n3 T7 r* e+ _' W$ b3 [<p>线程池Pool</p>
+ a8 f7 R3 m2 J3 I" \) ~' B<pre><code>pub struct Pool {5 F. y8 E# V/ k6 Z, S) A
  max_workers: usize, // 定义最大线程数  ~$ k4 [0 ?& \- o+ p8 y# E3 b3 G6 s
}3 P4 Q  q# C( {( _

* S+ k: M( \( `! @  l+ Gimpl Pool {
) v( K7 B0 w. ~* p8 S  fn new(max_workers: usize) -&gt; Pool {}
" l/ N: Y+ j1 i* Z7 ?7 K% U  fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send {}2 m# T7 a) }, h8 h
}
8 r2 M. j8 q1 G- O) ]/ p4 a  ]' A( ~. L7 ?, Y& s1 p, E
</code></pre>/ ?! r0 @) H8 b
<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>" r8 g% Y1 n! E, Q; R$ C7 j% q
<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec&lt;Thread&gt;</code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>
8 E* u. M- i7 H7 ]可以看作在一个线程里不断执行获取任务并执行的Worker。</p>
9 A+ Y1 H/ @" ?& _2 R<pre><code>struct Worker where* Q9 H8 M4 ~% U
{
' W, y" [6 f& z7 S* a! h1 E) v# x: F    _id: usize, // worker 编号
5 [* R1 I7 J, A) m3 C1 g}
" c5 P7 I( e% T( x% S</code></pre>3 j; y8 Y: S$ x; @. f2 E
<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>
, v, T( A5 N0 Z- P, w! N把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>
% f" F5 s# Y$ h9 [& y% P<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc&lt;Mutex::&lt;T&gt;&gt;</code>来包裹起来,也就是用锁来解决并发冲突。</p>7 w% K/ k: B2 ^3 s8 B7 A
<p>Pool的完整定义</p>6 A4 j( ]9 ]4 g4 z3 \3 }
<pre><code>pub struct Pool {
+ c0 u  I5 C& G/ q9 s/ o    workers: Vec&lt;Worker&gt;,; D( a; n% i, G3 A5 h
    max_workers: usize,9 x  M: L, e$ c: e- e
    sender: mpsc::Sender&lt;Message&gt;
# K9 f8 u* y4 D) Z0 ~# g: S}
0 y( d' n6 h8 U: y5 a</code></pre>3 f# f4 y' _) e, \$ U4 ^
<p>该是时候定义我们要发给Worker的消息Message了<br>. k1 Z3 D1 o) ?, d+ \% U2 S% @
定义如下的枚举值</p>
" X" p* W" n7 c& [; C<pre><code>type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;
9 M/ I" ^/ p: H+ I1 tenum Message {8 E: Z3 p" p# T4 x6 i+ Y# c. G
    ByeBye,- e, m; E0 Z  O; `6 |9 g
    NewJob(Job),' M) ^' U" ?: K$ j8 K, [
}, e' N, \; a2 W% `
</code></pre>
* O/ q. U+ P" P( a) {* s<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p># F0 d- ]7 O7 g/ z' X9 ^; W
<p>只剩下实现Worker和Pool的具体逻辑了。</p>
% f- S- U( M& O/ Y<p>Worker的实现</p>
3 D* q5 j  x( G; [" [3 Z<pre><code>impl Worker8 ]! o/ C$ [% h7 o) d
{
7 r1 {8 a  B: q! [. I. s    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {
. b# M- S/ M: c        let t = thread::spawn( move || {
" P6 X8 j( I0 P& G5 X            loop {
8 H& S# e' C7 R                let receiver = receiver.lock().unwrap();* a/ C  b7 O" W+ M1 @5 R
                let message=  receiver.recv().unwrap();+ G! ]# J' g5 }1 l1 {) n; j
                match message {
& G$ Q* [  _$ w8 q                    Message::NewJob(job) =&gt; {5 y: J9 v& d% W! _4 w
                        println!("do job from worker[{}]", id);. T8 z) g0 Q$ x5 J2 c
                        job();
5 z3 ]0 ~6 s8 V6 Q* F$ n                    },% }8 I; A7 G" x; |# D
                    Message::ByeBye =&gt; {
/ l3 V9 K3 Z( t1 N                        println!("ByeBye from worker[{}]", id);7 n1 |" C' X$ m4 V# O* Q
                        break$ d& u0 W6 q. M
                    },
2 F2 g7 w- t9 p1 H' i- y6 l                }  
' j. q0 V! S- o& v" b1 d4 k            }
9 t: T! D2 T' x% \        });, ?2 E/ q! c; p+ `( W9 m
2 R2 s0 _( W6 E8 f: l
        Worker {
& |) M. N% o) I4 Q            _id: id," L0 |9 N: q' k5 T, Q$ [! n1 {
            t: Some(t),( g% ^9 h" A( n, m. |
        }
! R: M. C% P, v. A# c    }
# H/ x+ N+ T1 B+ ?/ |% U}7 M: s- g6 Y+ r2 _, C' g
</code></pre>3 F# f9 }+ j" y  m0 ]" F' N
<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>
8 M( C1 z8 J2 _& Y) E8 V5 r但如果写成</p>9 F& n; T: e+ \; W
<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {
9 Q4 ]  U, X" {: q0 C% ^};
- D, e) e* x6 K: c0 A2 u4 p</code></pre>
# n8 D5 Z; B# M! n<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>
! W3 k  k7 v/ r9 \' b2 t2 Nrust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>
8 X* `1 V" a4 x( g" e" W' `/ {9 }<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>
; |( b# }& E% _( m) [8 u<pre><code>impl Drop for Pool {! R' r- G0 H0 [' J9 Z
    fn drop(&amp;mut self) {3 H! S2 J$ h  R7 X0 j6 x
        for _ in 0..self.max_workers {
1 M8 ?! Y8 ?. L7 G: l* @' J! ~            self.sender.send(Message::ByeBye).unwrap();
0 W$ H4 N$ _3 s$ j& B9 w        }
) X! C$ K* V3 ]/ B3 ?& x        for w in self.workers.iter_mut() {$ B% N; ?) e* x; `% @/ t. h
            if let Some(t) = w.t.take() {! ]' J3 G& b( v- B9 z7 C9 `% U( B
                t.join().unwrap();! o4 W% U, |! B, c
            }  d" B) t6 N0 {5 J! {) I
        }3 e9 [9 S& a! I4 Y' K# D
    }
1 o' f$ u  @. a9 y# j8 n}. m: d( e+ x* E8 ^/ |
  o- G7 ]6 T, w5 L$ y
</code></pre>
. z% h" D# ~' Q, }$ ^. w, I<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>3 _" ]$ _9 x# A1 S; e  ~. e
<pre><code>for w in self.workers.iter_mut() {
+ q5 n, q% H; \7 s    if let Some(t) = w.t.take() {. `6 n! E3 O# @. m) n% V- [
        self.sender.send(Message::ByeBye).unwrap();( z$ t4 v5 b+ e; b& P
        t.join().unwrap();
9 p* t/ \  p9 _, y- n& d: r7 G- p# }    }) u% T. }& F: P5 k$ Q0 m
}: r' x/ x  z( n: \8 l, L* w
! l2 v3 {- Z! G' n& J" _) s6 @
</code></pre>
6 ^. p* m( P+ K# ~! q<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>
0 R5 j3 m1 R- X8 X9 H我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>7 t/ t4 |( D% @* o% m* D3 o
<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>
# k7 Y7 P2 m4 ]<ol>
7 T% Y3 L" M- ?: M0 L9 |2 v<li>t.join 需要持有t的所有权</li>4 ]6 C; \8 @5 N7 M+ k* W6 n" H9 t+ j& ~
<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>& i, F5 y! K) q
</ol>
+ ]8 Y/ c& l( u1 L<p>这里考虑让Worker持有<code>Option&lt;JoinHandle&lt;()&gt;&gt;</code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>
, n- [" T& J' @+ W. q; O" j换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>
7 J1 N0 b0 a5 i<pre><code>struct Worker where6 S* l* D6 Z3 r6 k; d
{: L0 k0 Y! T8 l2 P9 O# x
    _id: usize,
( C8 G$ X8 t( d5 C: h    t: Option&lt;JoinHandle&lt;()&gt;&gt;,4 |1 Y9 }+ z6 X8 M. |# Y  L
}; d- S" K- k5 R4 t
</code></pre># u6 c) T% a8 B0 w, v
<h1 id="要点总结">要点总结</h1>
% Y4 O) s* P8 b2 M<ul>
2 V, y% Z! ~* Z- \( u- G+ u! v<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>$ P; C; O' N; M0 @6 z1 B  u2 Y
<li><code>Vec&lt;Option&lt;T&gt;&gt;</code> 可以解决某些情况下需要T所有权的场景</li>
: G3 T# S( e" S- J% b" D' L4 K</ul>
1 u: Y4 I7 l0 E& M<h1 id="完整代码">完整代码</h1>
$ z5 V5 x8 n+ v) b9 V7 \<pre><code>use std::thread::{self, JoinHandle};
" \( v+ g( Q& N$ S4 Fuse std::sync::{Arc, mpsc, Mutex};0 K2 r1 P! \/ c
0 m. S. I! [; l" ]

2 r6 j1 {# v# B# N6 Qtype Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;# @3 O; B0 V9 q
enum Message {  e- f8 ~3 L& W- X
    ByeBye,
3 X% r& k- x0 u2 |0 N/ D6 v    NewJob(Job),
" k; @8 W" h5 R/ v) e$ S}% f  A# F3 W' f1 n7 X! @' U
7 Q* x! T7 u5 i: A
struct Worker where% v  y$ O6 c3 O6 X
{6 t+ b" p- d. V$ E. g- [
    _id: usize,% V: S5 U# i7 x' \
    t: Option&lt;JoinHandle&lt;()&gt;&gt;,: G0 C9 ]9 Y" O3 _
}
! a, p( j" c* X
" W5 e- {/ G, Rimpl Worker
) \0 k  e; \3 T7 W/ a  D% c/ [6 O1 y{
* o7 [8 R2 a, B( ~% u; t    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {2 s0 y4 j7 b7 }. @1 X: ?
        let t = thread::spawn( move || {
. `: P7 U5 d2 s2 G            loop {5 F+ }$ j' C6 L; m# _: Z
                let message = receiver.lock().unwrap().recv().unwrap();! t- \" L/ I8 t0 A  N9 R
                match message {
$ J9 x+ b9 L% }5 v' S! P0 V9 W                    Message::NewJob(job) =&gt; {
3 v7 n4 F2 g5 Y/ F' y                        println!("do job from worker[{}]", id);1 D! d! i( N8 e" a! R0 B
                        job();
2 x6 t8 t5 L0 b1 n0 Y% a/ _7 N                    },, V: W: F& p2 y$ j+ ^2 X' Q
                    Message::ByeBye =&gt; {
, i9 }( N2 o6 q8 Z                        println!("ByeBye from worker[{}]", id);
( T7 T* r* o$ D                        break7 J* x, B9 h) _
                    },7 d, @0 t2 Q( w/ K! e% E4 l
                }  & e. Y- G7 g) e  q& n
            }% e- k* x5 m" c9 l; b, y+ z
        });, I% V4 Y8 {6 k
# o  L1 l1 p% z( i! y8 c' Z' n, t
        Worker {
  S8 A2 [9 n3 Q            _id: id,6 r+ {" _) k3 F5 H3 c
            t: Some(t),  E- M4 h, W( k
        }
1 Z7 t1 u* c% H) Z% V1 [1 F7 a; g    }: p4 `! ~2 D* Z5 i/ I$ T, W7 w
}
% H7 C' e7 J0 l/ a) h# V3 S& F- t: b" [. g  V/ s
pub struct Pool {
( f+ G* h/ P3 K$ o3 F0 `# f    workers: Vec&lt;Worker&gt;,
) l$ p+ |% B* d# A/ d* z" ?8 `    max_workers: usize,
# Y0 J6 Q5 _! ^0 i! `9 }+ w    sender: mpsc::Sender&lt;Message&gt;. `/ S" ~4 E# o7 ~) `& M
}; ~; s' Q6 \  c4 r1 F+ j
5 ?+ l" j; k6 `4 c2 T! C
impl Pool where {3 V5 v4 j8 |* y5 D" O" }
    pub fn new(max_workers: usize) -&gt; Pool {) S# H; c6 e, I. k! b) ^% Z* v- V
        if max_workers == 0 {& ~/ a: E( o6 R& ]" J8 Q
            panic!("max_workers must be greater than zero!")
6 Z$ w6 J7 Z& S1 e        }
* D5 l% M* z: I" {; _# x        let (tx, rx) = mpsc::channel();; o7 A# E" J( I4 J- H% d" X
7 E& K5 I7 {6 y0 g
        let mut workers = Vec::with_capacity(max_workers);
+ ?) i* p6 Z, l- i        let receiver = Arc::new(Mutex::new(rx));
$ d  R& J. w: R6 F- ?! n        for i in 0..max_workers {
* N& Z5 E/ @7 ~. Z4 L+ {! W            workers.push(Worker::new(i, Arc::clone(&amp;receiver)));
8 y8 B* t6 X* ]9 J# {9 g: N        }0 m* u% z: h5 `

$ C# G0 J3 {5 z0 a6 Y, t  n        Pool { workers: workers, max_workers: max_workers, sender: tx }
7 x) b- ?- {9 F% t+ N! M0 |1 X    }
% k% @  Z2 a! G8 x" D   
- f0 ?5 \* f0 K. ~! g* j9 l5 o2 G    pub fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send) S. {& Z/ ~2 u
    {- y# s5 D& u+ q/ v3 y: J3 t3 ?

8 O5 a0 m" }# @! h        let job = Message::NewJob(Box::new(f));8 \8 p7 w3 I  _' w
        self.sender.send(job).unwrap();
  q% s8 e" `2 m) P    }
2 F) Q3 v2 F! l8 W, L* B& ?# Q; Z}
, C" z; w1 d0 {7 x
  o  i- P- l, {/ p, Bimpl Drop for Pool {
) t. l' r# v2 y7 d+ x6 o    fn drop(&amp;mut self) {6 {4 D3 z. X; V; g4 q  b- F# ~
        for _ in 0..self.max_workers {
3 Y, g% M0 w, Y3 z            self.sender.send(Message::ByeBye).unwrap();
5 v/ V& o1 z5 v! Q6 |+ J# H        }
* S' O" w. I9 f* B! C4 ^        for w in self.workers {  O+ h! H6 x+ V
            if let Some(t) = w.t.take() {
4 ~( r8 X- V8 n; A. G7 N                t.join().unwrap();
* M, q. b# ?; W8 ?            }/ j+ T+ p1 X4 e* v* K$ J, B
        }! F% i1 W+ l0 I* s& H" P
    }
7 ^: l4 G1 G8 ~7 A2 f! }8 ^}8 q3 ]; B3 H# E: R6 B$ o; n6 C
/ U- G. a. j5 v8 O
- g0 D; e! d+ @0 R$ ~! {
#[cfg(test)]4 u, c  I4 o- ?- @1 u; |$ q/ E
mod tests {8 }/ k8 f. }7 {
    use super::*;" d6 U& `" b; V2 W( `; U
    #[test]$ [/ T& E( X8 L: S
    fn it_works() {
/ R; I/ W: v# N- v        let p = Pool::new(4);2 V8 Z; R0 t+ j5 L; E. H1 e2 [
        p.execute(|| println!("do new job1"));
& O2 ]3 o( {' Y: x5 _. O; R        p.execute(|| println!("do new job2"));
6 h- U  L. j2 S8 ?        p.execute(|| println!("do new job3"));
6 k" r. c/ G6 r        p.execute(|| println!("do new job4"));
+ d# G$ c- E, ]8 ?( H$ n: O5 F* L    }/ D6 V$ W2 ~4 ?: x/ i1 X* M, w
}9 k, f' V9 }' o# R8 f# J& p8 a
</code></pre>
+ w$ \! b4 s  \1 v5 E
, X* P. h; g: p# M
回复

使用道具 举报

懒得打字嘛,点击右侧快捷回复 【右侧内容,后台自定义】
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

手机版|飞雪团队

GMT+8, 2026-4-13 01:37 , Processed in 0.062902 second(s), 22 queries , Gzip On.

Powered by Discuz! X3.4

Copyright © 2001-2021, Tencent Cloud.

快速回复 返回顶部 返回列表