飞雪团队

 找回密码
 立即注册
搜索
热搜: 活动 交友 discuz
查看: 13751|回复: 0

rust 实战 - 实现一个线程工作池 ThreadPool

[复制链接]

7726

主题

7814

帖子

2万

积分

管理员

Rank: 9Rank: 9Rank: 9

积分
25508
发表于 2022-2-12 14:35:42 | 显示全部楼层 |阅读模式

$ }$ k9 L& ]6 d: Y4 h2 A0 T/ R<h1 id="如何实现一个线程池">如何实现一个线程池</h1>
( b5 {3 ?. X7 R: u1 V<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>
9 q5 b) U6 t" Q. M0 o9 r<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>
5 ]% e0 @# E/ G0 o2 C+ c  E3 e<p>线程池Pool</p>
" ]2 l+ u: G" G1 p8 |<pre><code>pub struct Pool {% j0 Y! T- n4 Z: c$ H6 G
  max_workers: usize, // 定义最大线程数
8 T, F' f# B% `" t* _}
; W5 n# n/ W4 ]' J' t9 ^3 w
6 X: R% r1 N/ T1 `impl Pool {8 T8 b% j  [( Q1 @/ |& E
  fn new(max_workers: usize) -&gt; Pool {}0 L' O. [6 a# c& }6 s: K# K1 `
  fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send {}' @; J* m1 `2 e
}
7 {& D: x6 q* ?: w/ `
& L8 I3 H6 O4 q7 o5 N( e7 [</code></pre>
0 |) X, Q# m9 G1 x4 H<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>
3 e7 k* i& s- ]. p<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec&lt;Thread&gt;</code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>
, z) V1 J% H" L. I可以看作在一个线程里不断执行获取任务并执行的Worker。</p>" k- y" O( q5 o4 U
<pre><code>struct Worker where, Z) ?  G: T+ p+ |0 N0 r1 w
{7 u6 f; }8 |" W0 }
    _id: usize, // worker 编号% Z& ~, U/ T: s" n
}
# N( T3 _4 I" v" L! l) p" H</code></pre>, x6 Q8 U5 {( H% A& l
<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>$ H* J6 F1 w/ x- l
把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>
9 L% L" P. p  H* X! W: i<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc&lt;Mutex::&lt;T&gt;&gt;</code>来包裹起来,也就是用锁来解决并发冲突。</p>2 ?+ N5 z& b" C: `- Z) K
<p>Pool的完整定义</p>: f" d& v) c1 @% L. f
<pre><code>pub struct Pool {
3 \0 K3 O# s# ]* D7 b& D4 S& e6 g    workers: Vec&lt;Worker&gt;,
$ O% C9 Z: D/ i: Q7 z# ~    max_workers: usize,( E! c2 ~6 ~+ j) A* J# U
    sender: mpsc::Sender&lt;Message&gt;
' G! v' k  |- i5 t. T' s# u}
  ]3 ]% }- a: N! \3 S</code></pre>
, d7 a  `8 S/ S4 u9 z# ]% }<p>该是时候定义我们要发给Worker的消息Message了<br>8 M" t, D7 r, ]  i3 v& \1 y
定义如下的枚举值</p>
5 k, r% q- u3 O1 M) A  w* G<pre><code>type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;
  [  I* N$ e+ Venum Message {
( y1 B8 Z+ [; Q    ByeBye,# \! u/ P9 K3 W' w1 X2 ?4 x
    NewJob(Job),
( o9 @  z1 }  I) E! y8 Q# L}
5 q5 ~9 K3 x+ D- }0 h& D</code></pre>
% [6 s6 |: S( t) a1 F. W- a, F0 v2 u<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>
3 D5 p9 F" o8 G0 _' j, j<p>只剩下实现Worker和Pool的具体逻辑了。</p>
2 @" {% T# {8 M: _1 P( c- H/ T3 k<p>Worker的实现</p>
# Y# r2 k. w! A% B0 i<pre><code>impl Worker
/ t, O( f5 e8 i( }0 h7 @+ {{1 I5 Z) v1 G' E9 Q5 x0 V
    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {
( m  b% N7 R- z5 z! ]4 t3 m        let t = thread::spawn( move || {
5 P2 I, |3 e$ q1 D            loop {
1 K8 {3 W' I  P; K3 @  V3 N                let receiver = receiver.lock().unwrap();0 d1 `0 f; j! i
                let message=  receiver.recv().unwrap();0 |' k8 V1 B( f3 d! u; ^, v1 S
                match message {
/ h3 n' z1 c$ b$ I" `                    Message::NewJob(job) =&gt; {
$ h3 ~# ^$ e( P( ]& a( L2 ^                        println!("do job from worker[{}]", id);3 E+ h# M4 {$ @) D
                        job();: E8 {8 P/ V  `5 M5 k
                    },
/ D" c* x. O) H4 }8 n                    Message::ByeBye =&gt; {# n& p4 m  _8 y. J! `$ w0 @) ^
                        println!("ByeBye from worker[{}]", id);
$ {$ o' W- L6 G- m" Q                        break
: R* c9 C% ?( K) {' x$ J                    },
/ P5 \7 d# v( r) `# }* [                }  " B& a# F0 s/ Y  @2 I6 l
            }
4 ]- i8 u5 |0 V- f  F' `        });  [" y6 S& Y) X' e9 X$ z+ ]. d

* g) g! c7 s* R' c% z        Worker {
! f. h! h# C7 [            _id: id,
/ g5 _$ c) A- p  C) r% x4 H            t: Some(t),* n) T7 i& j. k4 p! b6 P
        }
1 w/ ^2 i+ Y$ O6 d( x2 G  s    }5 H/ {! ?- p' j* |, ?. \  d- t
}$ ]4 Q' q$ j2 J, D# R+ g
</code></pre>
( {8 B) ]) M0 w' c( Q, }<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>/ R* G8 {3 f% }) h
但如果写成</p>
+ I1 t6 f* I. Z% r' o4 e0 o" j<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {  A6 p2 y/ p4 c6 E6 o5 w/ f
};
4 N3 @4 Q! L' B9 b% A0 _3 N</code></pre>7 ~  S! F# }( j: w8 R, Z
<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>
; m* q' C# j& S% n2 k0 I/ b/ frust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>
% n) @* @2 [6 U8 ], L0 M& n<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>
1 y4 S( v8 e+ ?- ]. P& e; V4 V7 x! L<pre><code>impl Drop for Pool {; N5 a, `2 C! P
    fn drop(&amp;mut self) {9 |7 Q( x7 {2 w( Z2 z
        for _ in 0..self.max_workers {4 e% ^3 U, O  B. N* z3 L7 C% }
            self.sender.send(Message::ByeBye).unwrap();, C0 W! M% m$ ~, l7 O
        }% ]! q) A4 E4 T$ }
        for w in self.workers.iter_mut() {
3 W6 [# K% V1 x" v8 e            if let Some(t) = w.t.take() {
9 z* m2 a; c; E4 q3 U# O% Y                t.join().unwrap();
' U# T* m7 m, g# \) N: i# H$ L            }4 c' C. l9 L! Q. K% g9 Z4 H# Q
        }
. f6 }+ i! s9 C) [  G0 Q    }
: X# u* L4 G# y/ F2 s}0 {9 }6 g) r1 M/ A$ ~

- s" U! u5 |. o. x  q</code></pre>2 F6 V( A1 e& J4 Q1 T6 T  m1 l
<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>
" X; z1 s. r; Q& T<pre><code>for w in self.workers.iter_mut() {
+ q, w0 B% \+ T& b    if let Some(t) = w.t.take() {
: {  V& {5 C3 T: o" S        self.sender.send(Message::ByeBye).unwrap();! Y, r! \$ _  f1 W
        t.join().unwrap();3 R( J; D4 _3 g/ f
    }
" T# e* i  V$ ?$ N) b0 D  {& X}
3 a6 A0 X' r4 R% o$ s* _0 v
2 h. B' D1 s6 ~3 _: }$ z</code></pre>; l& p6 p% X/ }7 d
<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>
0 L$ Y2 u2 S4 z: i我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>
9 s: l: ]. E: p3 B( e<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>7 p) D& v6 {7 Q7 B5 T1 G' I5 g( z
<ol>. ~. t0 V) C. L8 P' Y( p
<li>t.join 需要持有t的所有权</li>
7 w6 g  ]  z' \' m. L<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>" x8 j3 ~3 K8 Q, q9 i- F! [
</ol>
  Z/ f  P# J: i1 Y: b' c' _+ _<p>这里考虑让Worker持有<code>Option&lt;JoinHandle&lt;()&gt;&gt;</code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>( O9 y$ `6 x% X7 p* U& n& E
换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>
3 k% H" e$ g, ~' i  R' z<pre><code>struct Worker where* F4 X5 X+ Y0 y& ]5 {. c* N
{' h; @, v9 @, D
    _id: usize,
4 `+ W/ j8 ^) u' @* J    t: Option&lt;JoinHandle&lt;()&gt;&gt;,
8 B5 f$ l. Z+ S+ Z% D% b}
$ C( e3 ?7 ]( B4 R$ j</code></pre>
3 X, ^' `. O% z/ c, ?<h1 id="要点总结">要点总结</h1>: K# y5 E1 |+ G7 H
<ul>
6 q+ @$ }0 j- U& e3 B<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>6 Z% N2 |* |) H$ P4 D
<li><code>Vec&lt;Option&lt;T&gt;&gt;</code> 可以解决某些情况下需要T所有权的场景</li>7 @7 }. Y& b- ~. I
</ul>
6 o# A) r+ e; z% V" w1 O) H5 I<h1 id="完整代码">完整代码</h1>* n/ r5 t! B' c/ E
<pre><code>use std::thread::{self, JoinHandle};3 K, \* X/ f  u: K  q: j
use std::sync::{Arc, mpsc, Mutex};& s2 n$ i4 f% y6 ]1 x1 ~$ _: W

" v+ _. O* y4 d0 ^- ]# b# m- O$ r4 c1 T, g, y+ ]
type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;
, Z) K/ r' z7 W; ]$ B: G6 Kenum Message {- H: d' r+ J8 t5 |* O
    ByeBye,
- f/ G3 G4 I, M0 y" D) k# A% p6 M    NewJob(Job),' K- P' ]# k% w6 v7 r* F* P
}
( F% ^% `- U" t) h- [
0 N5 x" r+ }& P9 }7 C* }& D4 O, zstruct Worker where0 W' i0 w4 r5 [7 v9 l& E
{: ]( S* b0 N5 C, Z9 D- c5 J: A
    _id: usize,9 X) |* E% K- B- ?; b" u+ C
    t: Option&lt;JoinHandle&lt;()&gt;&gt;,/ u  I$ _! l) c! e3 q) f' r
}! Q8 P4 R! n# e0 V* }/ @' x

! X6 z, B. N/ w+ |6 x/ J5 oimpl Worker
& u3 |! W2 `5 s- W7 }/ j{3 r. C9 W; I5 x5 h9 s+ r9 O
    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {5 Y: J! U- ?' _1 k
        let t = thread::spawn( move || {7 x- S, }9 [8 O
            loop {
. \4 D0 E8 b% o& v0 h6 c                let message = receiver.lock().unwrap().recv().unwrap();6 @5 Y: j, ?# _) ?& }! H
                match message {
" H% P$ K3 Y0 b# n2 t( p                    Message::NewJob(job) =&gt; {$ ^* d4 g1 i' Q) k' Y. \
                        println!("do job from worker[{}]", id);/ O6 {# t& y1 f3 k# d5 U* {- x
                        job();
' x* ?9 Z) \/ ?! s% [                    },( O* N' D7 q, {0 \
                    Message::ByeBye =&gt; {
1 l) {3 |, a# \                        println!("ByeBye from worker[{}]", id);
  }; N1 W/ n6 w( Q, i) A/ F5 Z) _                        break
9 _9 I0 j5 l* o9 \! m3 i; p                    },
' T+ }, v4 ?! S                }  1 m8 t: p( N' s2 n; P2 o2 G
            }
- h0 u8 Y, d" Z3 Q9 G0 d/ `8 [! G        });
; D5 u  I  [& }! h& _, d7 |% H
8 i% R: d, x6 q5 `        Worker {
; ~3 c9 s6 W9 P' v% @8 I            _id: id,9 j9 Y5 T9 l8 Q  Y- F8 J
            t: Some(t),- l& x# x" j- W4 g
        }1 ?8 ]4 U  X- f: V' C; ~1 O: ]! \1 B0 t
    }/ G8 d: r7 N5 v, u
}! q9 g2 [+ _% t* x1 G; T

  o3 e2 I! d/ }pub struct Pool {
+ o7 W. E. P' d, i5 a, m  ~    workers: Vec&lt;Worker&gt;,
  R. F6 z  p7 S    max_workers: usize,
' O" P3 @' t4 y7 o    sender: mpsc::Sender&lt;Message&gt;
$ q- x- B) A, D( [: c}4 f/ ~' N! F, S5 I0 T& e

2 T3 z! J8 O- ~0 ^; u- |) Jimpl Pool where {
5 N$ ^4 \7 I0 K# k# x    pub fn new(max_workers: usize) -&gt; Pool {
) u+ M5 L+ V- `, E        if max_workers == 0 {
8 a$ q2 q0 N3 y  i  m6 Z            panic!("max_workers must be greater than zero!")7 J1 r+ F6 ]/ B
        }
% @7 w0 t3 _# L& b        let (tx, rx) = mpsc::channel();
% ^+ l* r, s8 e. A) L' k- E/ @; ]0 X% K  }- f$ ~4 f* j( [
        let mut workers = Vec::with_capacity(max_workers);
6 Y- Q1 T2 l! V, ~        let receiver = Arc::new(Mutex::new(rx));0 r5 z: y3 G! _# d
        for i in 0..max_workers {: v: f3 R3 w  ^( g
            workers.push(Worker::new(i, Arc::clone(&amp;receiver)));+ ?! b# \! a2 K! B5 I8 T
        }# H& Y5 Y) H+ z8 C" F" i
* T3 `/ H- G" H
        Pool { workers: workers, max_workers: max_workers, sender: tx }
$ h* H; H0 W0 w* O+ [) z- c    }
" y( {$ u4 v/ [( Z, E   
# n/ \; `6 N6 h3 D5 M  u    pub fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send
, J9 y( v$ L' D+ M/ \    {
$ z1 T1 @) a: O* [
3 t* D. S0 f8 L- r" D        let job = Message::NewJob(Box::new(f));
/ B" }  w4 Q  ^' Z  p/ n        self.sender.send(job).unwrap();. P% J* k0 F% ^4 b
    }6 W- l1 A4 j. a
}
% `' A* b3 [. R4 c' N+ J( h3 \& P+ y- C$ e& k0 t  t4 C
impl Drop for Pool {. h7 X8 H8 j8 \& p. J0 A
    fn drop(&amp;mut self) {3 k: i3 {8 ?- W% c7 r% B" b
        for _ in 0..self.max_workers {
! a' O; b$ Z. k& @6 y* o: @! s            self.sender.send(Message::ByeBye).unwrap();
" |9 \3 V5 N1 L; g: {        }
/ {* Z6 y' j: x7 R; s6 S$ @. w0 j        for w in self.workers {
1 q% {8 f5 D8 Y% b            if let Some(t) = w.t.take() {6 W9 c( M. f4 x5 E! x
                t.join().unwrap();
* H7 y1 P3 o; I! R" _$ ]            }  X4 u' ~: y8 e6 i2 {0 V
        }* L( P+ d; Z* u5 z8 s4 o
    }! _: j! W2 u  R& R; b% C1 l# L. s
}
8 Y; c& x" D9 ?; X9 w+ {. l3 N* S8 l5 B- m; R: c
. Y/ w" I4 v7 f# {! s1 c: R; F
#[cfg(test)]- H0 i) Y. b: h# K' `0 }! J9 {
mod tests {7 ]5 G8 W, a0 W6 Y$ P1 M% `; ]
    use super::*;
$ I2 }) q' \1 c0 k2 ?    #[test]
3 x; P% X$ ~2 L4 I+ V% W- Q    fn it_works() {9 b1 U; E. _1 l) v; a
        let p = Pool::new(4);7 f' A) J7 s4 \. u
        p.execute(|| println!("do new job1"));" r. v6 F3 p: G2 ]! R% i" g2 o1 d
        p.execute(|| println!("do new job2"));
% z3 l6 P- J# l2 K5 u. ]2 r        p.execute(|| println!("do new job3"));
1 R: v/ o2 Z9 C( H; a& Y$ W. `/ a0 V        p.execute(|| println!("do new job4"));
+ ]. U, z$ a" X: w3 w8 Q    }5 w- X9 N0 y5 o5 a: U
}
, O8 G( a7 J. v: Q0 o9 L/ g! B  B" I</code></pre>! R6 H, ?* `; Y7 w9 W
5 h) F1 |" ^" g4 \( V
回复

使用道具 举报

懒得打字嘛,点击右侧快捷回复 【右侧内容,后台自定义】
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

手机版|飞雪团队

GMT+8, 2025-11-1 06:11 , Processed in 0.082325 second(s), 21 queries , Gzip On.

Powered by Discuz! X3.4

Copyright © 2001-2021, Tencent Cloud.

快速回复 返回顶部 返回列表