飞雪团队

 找回密码
 立即注册
搜索
热搜: 活动 交友 discuz
查看: 10693|回复: 0

rust 实战 - 实现一个线程工作池 ThreadPool

[复制链接]

7100

主题

7188

帖子

2万

积分

管理员

Rank: 9Rank: 9Rank: 9

积分
23630
发表于 2022-2-12 14:35:42 | 显示全部楼层 |阅读模式

# Y. s0 s- G$ ?% R# F. D* k( V<h1 id="如何实现一个线程池">如何实现一个线程池</h1>
# b+ C9 A; }0 z3 E<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>8 Z5 b2 l% Y; [; I
<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p># k0 p$ y5 c- g  b! v
<p>线程池Pool</p>: N5 A2 ^0 j% ?+ [, N, M
<pre><code>pub struct Pool {
' R' @, T6 C, O) Q/ g) Z1 T  max_workers: usize, // 定义最大线程数/ o, z" o' M! Y9 h; V, k3 B% A1 ~! ~
}- O- x% P3 _0 S4 d* {! B
! h" q7 \6 T2 d0 x# r7 l: F- m
impl Pool {
/ _4 \- p- s, s  fn new(max_workers: usize) -&gt; Pool {}
7 J/ G& h8 B" i2 z7 B1 U  fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send {}
6 I6 |' l+ T) ~* u% E}
6 U5 R" t) D, Y9 e5 x9 b! k- `/ _( s" J* j& Y# K5 A6 }" J+ ~
</code></pre>7 ~7 M. Z% c- f2 o0 D* ^3 u
<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>! x$ T$ _! _& j# o% _$ m3 b
<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec&lt;Thread&gt;</code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>  N1 c; o( I: U2 _, M: @
可以看作在一个线程里不断执行获取任务并执行的Worker。</p>+ \' n* X; z0 J: _( R
<pre><code>struct Worker where
; |! V$ M0 ]% [{) n/ {/ i5 R: f' ]4 A6 Q
    _id: usize, // worker 编号- e! u( _4 J' E; `2 L
}8 {$ B9 p0 K. s
</code></pre>1 f2 b2 s$ O& {6 O
<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>4 W- |" F) f0 h9 _  a) i
把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>+ b7 i/ g6 ]+ g& x6 G* A
<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc&lt;Mutex::&lt;T&gt;&gt;</code>来包裹起来,也就是用锁来解决并发冲突。</p># R% g& }) T' K: O* r) d7 @
<p>Pool的完整定义</p>
0 D  T+ P1 C& q! D3 z<pre><code>pub struct Pool {$ q% J) c1 w7 ~/ T& }" B" M
    workers: Vec&lt;Worker&gt;,- A7 M$ ~! }& P+ ~5 M( i$ e
    max_workers: usize,
9 w1 Z, N! F6 I( W# ?: a    sender: mpsc::Sender&lt;Message&gt;
9 w# \' K% N. F% y+ M; ]}; F% q9 [" S! ~8 u  i# ^% f
</code></pre>* f$ M* G  Y! M+ x$ t+ t
<p>该是时候定义我们要发给Worker的消息Message了<br>
5 m2 W# ?  |# J1 P$ }0 t1 U  O定义如下的枚举值</p>
1 Z8 {' ~1 j9 `) m. D/ B<pre><code>type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;3 j. w4 r+ y, U6 }3 t2 T
enum Message {+ }2 ]6 R4 ^3 G2 q+ ~
    ByeBye,
) X3 C3 ~; g  f$ X2 A    NewJob(Job),6 m, K3 m& T* c) [0 Z- o. f
}
3 `3 L2 B/ c/ S2 w3 u0 N3 T</code></pre>
3 \! e1 t: a8 h! F3 l<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>+ P* t& u* }  M; }" @
<p>只剩下实现Worker和Pool的具体逻辑了。</p>* F( u* K# z2 b  p0 J+ d8 l
<p>Worker的实现</p>
" ]+ b6 t8 N/ P# L4 H! j/ @0 j9 X% Y<pre><code>impl Worker
) w3 _2 J  L+ R: Y( P* E9 ~{6 \. Q* y" I2 T, c4 ^
    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {; z* h! p1 X" M3 [, B! n
        let t = thread::spawn( move || {9 N* k5 H+ ~: V) F* M
            loop {, q% {) S% M! v: ~( J
                let receiver = receiver.lock().unwrap();
* s4 m, \* K1 H0 E# g4 V# y; V                let message=  receiver.recv().unwrap();' [9 r4 S8 i0 b: L' n
                match message {
2 ]; x! D6 s, a0 q                    Message::NewJob(job) =&gt; {! d3 d" x) X* n" `: p0 o
                        println!("do job from worker[{}]", id);; }/ v- Q4 e2 P, O4 F7 T: t
                        job();
6 |' Y# J. c3 f5 W% P& W                    },
2 r8 @6 s. T' _8 ]4 ?. Q! I: [& q) i) K                    Message::ByeBye =&gt; {4 G% c5 P/ D' @! e  w2 {- v
                        println!("ByeBye from worker[{}]", id);/ L8 H* \9 O" R
                        break5 B" ~# Z5 X- _
                    }," V7 L, Z1 A5 L
                }  
, L7 m! E( q' i& x' B2 c            }. ^: e& V& w* q* u9 D# h  W0 N
        });; G8 X! R& w* ]7 e! H

% r. u4 z; i5 f" E% W! {        Worker {
' z: w5 |( x) g* C3 y            _id: id,
/ P; {6 X- H, Z. g" u: k            t: Some(t)," c2 Q( C/ T. Y  p, N6 f  j0 C
        }  c+ U: |: K( \5 Q3 v
    }
/ g  S$ P  n' X- J, \- D. ~}
6 H7 Q( g2 e/ b6 d1 d, u</code></pre>
9 F1 ~  s- [3 \4 g% |) S<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>
. {' i( ~1 @! {# N% T但如果写成</p>
& K# g# Q+ f/ ~) y3 d<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {
; F' u0 V5 p. Y" ]1 r! c) z};) U) _1 |1 Z* m  f
</code></pre>
$ b! Q- q# t+ \. \) h  v<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>. ]- l: l5 ?5 s. T" W) n% n
rust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>5 {. l  m' B, f
<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>4 D- g% o, W- p7 _. W
<pre><code>impl Drop for Pool {2 c# \9 Z9 r  e/ s9 }6 W2 M9 e
    fn drop(&amp;mut self) {9 q! ^" @4 t+ Z3 s0 D
        for _ in 0..self.max_workers {
  ^& N  Q& t" r+ z& F. U5 p            self.sender.send(Message::ByeBye).unwrap();
3 s3 H5 U$ L0 P9 w        }3 u1 W8 i& f. ?5 Y+ n9 T# w( x: b
        for w in self.workers.iter_mut() {
) b$ {2 L! S* `& |            if let Some(t) = w.t.take() {
3 F+ d) L% C3 c9 [                t.join().unwrap();
! S( i* ~/ j. Y6 X4 {# M            }2 T* i0 l- w" v2 d! H0 }5 D& Q2 Y
        }" c  d% u+ P8 s
    }. }5 ^/ X2 z. w8 o" |7 E$ G
}5 a) L$ h  c7 J/ W
2 X' ]( ?7 q+ ^; {9 z! F7 n
</code></pre>" Y9 h! _! s' A" @4 B( D
<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>. Q! l9 h+ L2 ~
<pre><code>for w in self.workers.iter_mut() {( }9 J9 P, R, v- e) n0 K
    if let Some(t) = w.t.take() {
. Q* @* T. U' e4 S        self.sender.send(Message::ByeBye).unwrap();
6 Y4 j4 S: \" H        t.join().unwrap();% M% r2 q3 T7 ~- v
    }
8 ~4 _3 E# O( Z& e4 G}
1 L' `: v1 z! Y9 R" S" h  p4 H6 l  U% q0 j3 Q
</code></pre>
$ }: _1 L# ^) u" k" d' R( ]+ J<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>6 E" {! e; c# u: X* |/ N
我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>5 c- Y, l$ R  p( ^2 z: B& Q
<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>4 S; o  D6 R4 i3 Y* @9 w
<ol>" P, P) p' t& y7 C, @& K
<li>t.join 需要持有t的所有权</li>+ u/ a( @( m% V& T+ H
<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>8 q  M! w, G3 I
</ol>
" O& v- d* I3 `5 n: z<p>这里考虑让Worker持有<code>Option&lt;JoinHandle&lt;()&gt;&gt;</code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>
% ^7 y' I/ n1 a1 t; X2 x4 Q换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>
2 V1 o9 a: @* u* K9 g: j<pre><code>struct Worker where+ V7 ~+ `# j) b; V
{- [1 [) J* ~/ o  V3 Q3 j0 K- _
    _id: usize,
: c1 Q& H3 l8 _) b    t: Option&lt;JoinHandle&lt;()&gt;&gt;,
! A! }) N9 J. s& \}" ~6 Z0 W) ~' h. U7 _( P! x& `
</code></pre>
: U. n, Y  l' w1 k6 w<h1 id="要点总结">要点总结</h1>
% S; A' ^+ J' V4 D3 b<ul>2 ^  W7 Z4 [- X+ e$ q
<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li># z6 U; z2 s/ Q! k% ^* d  B
<li><code>Vec&lt;Option&lt;T&gt;&gt;</code> 可以解决某些情况下需要T所有权的场景</li>
1 I6 b; ?, |% C, o  h1 j/ v/ E& F5 i</ul>
/ _7 V& o* A) M3 _8 R; i<h1 id="完整代码">完整代码</h1>
# ~2 g9 A1 y8 u" F, t<pre><code>use std::thread::{self, JoinHandle};
# L. \* c* ^; Suse std::sync::{Arc, mpsc, Mutex};! n% ~+ b# J; [& x0 T$ W" z9 H6 E

9 ?: G. C: t- d  L& V
- ~( R4 F% O, }type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;
/ M2 l2 @$ L8 u: g* I1 H: Menum Message {) y' C* Y' q* V  Q% A; w
    ByeBye,
  ~- M1 [. k& s5 h5 h: L    NewJob(Job),
& h3 s; }8 F- y0 G- ^2 L5 n}9 d& ]! |" r% i0 r" ?. K% ]
: H7 G% R& e- H* M0 m
struct Worker where, m8 P$ b1 D/ l  ~! Z& O; q7 _
{' ~+ e1 O7 f1 G
    _id: usize,
9 \/ U6 g$ X# v. K; C    t: Option&lt;JoinHandle&lt;()&gt;&gt;,0 d8 x+ s2 q7 A* e
}
$ W. O, V" g  ?) H( K" T5 a0 {" K. c. c; G) ]
impl Worker) U1 W% v  X$ j% W
{, i" [1 i0 _, d" a- f' R* K1 @, @
    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {
3 U2 g4 |" N: R! F3 p        let t = thread::spawn( move || {4 f1 \: x$ [8 z; ?) Z) l
            loop {7 ]: w" v7 S! T! s' i) _
                let message = receiver.lock().unwrap().recv().unwrap();3 Q2 b9 s+ |, `; J4 U. t
                match message {
( S: M8 W: e9 e, Y4 l( r4 {7 I0 A1 x                    Message::NewJob(job) =&gt; {7 G6 j) @, \3 _6 @' t, [( k
                        println!("do job from worker[{}]", id);
, b' z# q. O, s1 a  y                        job();
5 H2 D8 [' h6 e# b( c! L                    },
( e# D) ?( }, Y' R                    Message::ByeBye =&gt; {+ H0 S# }+ J# S! b+ M
                        println!("ByeBye from worker[{}]", id);7 ~8 B. ]$ m1 P# |
                        break, p5 w% ]& G: W# P5 ]! I6 J
                    },
% Y: s& c! L" n& s# |& R                }  
& W* ^( s# q+ x: x            }
! F5 d. T0 R7 g        });7 u  g7 C" J' I0 n9 ~

% a* i3 Y2 Z: ]9 L/ Y6 k        Worker {. I+ _. l; q% `" s2 Z0 J
            _id: id,% h3 |1 R% a. D6 ]8 L# i
            t: Some(t),1 W3 f7 }6 ~4 C3 s! i, N" W
        }
3 n* J2 c6 k4 f    }
3 D8 |( w  B, L4 i7 r0 w}  N, h! a) c4 F
! I; U  _+ T8 p
pub struct Pool {
5 `; p  C- \% w/ z8 t    workers: Vec&lt;Worker&gt;,
# ]( N$ v0 G# S9 B( ?8 h    max_workers: usize,# [4 K5 Z) X. _, y; j& W8 t5 q7 Y
    sender: mpsc::Sender&lt;Message&gt;
5 E0 y( z" y/ n% X! \}: a7 c7 `! {! E: S$ r* n# ^

! f8 v4 V, O+ N9 T3 Gimpl Pool where {* V+ u" O7 y4 j, ?
    pub fn new(max_workers: usize) -&gt; Pool {
3 Y* w( x. S+ L: W- G        if max_workers == 0 {( v) ~3 {+ s- t) E' N; n* [
            panic!("max_workers must be greater than zero!")
9 T; v+ o  d. U+ i( B        }6 D/ J! y$ I- x2 J2 ^
        let (tx, rx) = mpsc::channel();; \, v% |& s1 Y- m' e( m9 r
6 x: @5 o2 D- d  l6 r
        let mut workers = Vec::with_capacity(max_workers);+ f+ U8 c2 t) `; c) K2 f
        let receiver = Arc::new(Mutex::new(rx));
, y6 O- a8 w9 m3 C        for i in 0..max_workers {5 [' b6 y/ T: `! n2 h' w
            workers.push(Worker::new(i, Arc::clone(&amp;receiver)));
9 M; c" `( ]7 y5 G        }
/ y% ^3 d, o8 Q0 Z8 v4 j9 m1 o
. j9 D- F9 B# a        Pool { workers: workers, max_workers: max_workers, sender: tx }
( {' E* N$ {: Q# Q& W0 b    }
* G: ?+ \1 o! O; v    / y4 r* u5 d' t/ G/ e. ^
    pub fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send, D# `: A- i- e
    {% g" r' Y  x$ Q! A
: _6 G1 m: _8 w1 g% l
        let job = Message::NewJob(Box::new(f));1 a+ m6 [0 {" o
        self.sender.send(job).unwrap();% Z+ l- j' E9 h3 N
    }
2 Q) i3 v% \$ L& y8 e& Q+ R  R}
" D  O5 Q: x0 n; K7 n$ C. Y( o2 |. Z9 S' G: b
impl Drop for Pool {# B4 C5 ]& @3 s
    fn drop(&amp;mut self) {
0 {) u3 \& a% P& }5 u        for _ in 0..self.max_workers {% o" S. K" a3 Q3 \1 D; i5 [8 |9 L
            self.sender.send(Message::ByeBye).unwrap();$ }4 _1 g0 W4 T9 [, C
        }
3 }0 [0 r, {4 q3 X& U. x- Y        for w in self.workers {
! l( H8 @! N! i( q  C            if let Some(t) = w.t.take() {* J; }/ f8 I. D; H- }9 {' {
                t.join().unwrap();/ i( q. z- k, s) X( `
            }
$ X" @, o/ x3 M" E7 n: d$ Q        }( Q& s8 S- @( T* W# v
    }
4 d. b: ?+ n9 Y}
' q! K7 q$ }& j5 T- j! Q
( U; G4 }# ^( b" l. E/ ?5 g: y, T) t2 d. b% N
#[cfg(test)]( _: w! g6 ]3 C8 m3 ]& X
mod tests {
" C, k$ a7 w; m$ m- E    use super::*;$ \; ^9 b( ~. m
    #[test]
$ I; i: n6 F! S) X" k    fn it_works() {4 }0 `* Z. }% V- W) s
        let p = Pool::new(4);
4 _! O& H: h6 T7 _2 f        p.execute(|| println!("do new job1"));
* Z$ B; y5 D3 K$ x) U6 r* J        p.execute(|| println!("do new job2"));
8 f& W/ N& h+ Z6 q7 z* K        p.execute(|| println!("do new job3"));
! z: @' d# v* a3 Y) J  j  y        p.execute(|| println!("do new job4"));
( Z$ k7 x- j  E+ P    }" q" K3 [8 F. k( K, S6 t, b
}
8 t- G" c( x! l) Q  Z7 k& F</code></pre>
( Z# e5 I. ^& j7 n
" n1 R; E" m. b: F
回复

使用道具 举报

懒得打字嘛,点击右侧快捷回复 【右侧内容,后台自定义】
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

手机版|飞雪团队

GMT+8, 2025-7-30 18:33 , Processed in 0.078021 second(s), 21 queries , Gzip On.

Powered by Discuz! X3.4

Copyright © 2001-2021, Tencent Cloud.

快速回复 返回顶部 返回列表