飞雪团队

 找回密码
 立即注册
搜索
热搜: 活动 交友 discuz
查看: 17489|回复: 0

rust 实战 - 实现一个线程工作池 ThreadPool

[复制链接]

8822

主题

8910

帖子

2万

积分

管理员

Rank: 9Rank: 9Rank: 9

积分
28796
发表于 2022-2-12 14:35:42 | 显示全部楼层 |阅读模式

( j1 s+ ^' n1 @0 t6 V! Q<h1 id="如何实现一个线程池">如何实现一个线程池</h1>
% J1 S4 W! i3 `1 o4 x7 k9 c3 x<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>
3 d* A9 t, _" {; `7 t<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>/ u7 E6 r' m7 u; A  G# {9 L. q5 g
<p>线程池Pool</p>/ E6 \8 Y* l% b; h1 N* N
<pre><code>pub struct Pool {! u6 h7 v- @. f+ Z
  max_workers: usize, // 定义最大线程数
; \% H2 ?9 X$ @& l  I; I}
' J# m# u! }: |! V/ e
9 ^, s& A3 R' `1 J/ n1 |1 ?impl Pool {' T, b* \: X# [. l5 O4 M
  fn new(max_workers: usize) -&gt; Pool {}
5 t4 N4 Y1 }6 @) g, {/ E  fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send {}8 v$ C. o) d9 {" F" U
}# X0 \+ f$ b- ^/ W
  i! b1 T" a# J5 S9 k; C6 U0 c" B& G
</code></pre>
" R9 I" |0 Z) M4 l<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>. c7 v% R: w/ y, i/ ~& }
<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec&lt;Thread&gt;</code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br># @% A9 N* [! I. D# H: U0 O
可以看作在一个线程里不断执行获取任务并执行的Worker。</p>* E0 e/ h' m+ d8 N. z* |7 n4 ]. P7 P
<pre><code>struct Worker where6 L5 F. I. Q; ?) o8 m* v
{1 A3 _* _9 A& F- ~# ?$ D: U
    _id: usize, // worker 编号& }2 Q' v* T+ j# i
}
! i) c) b. C- Y9 g</code></pre>
" C& B3 e( Z7 e2 {1 Z* Z3 H/ Q<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>
8 y! R9 ]  t2 [0 D7 D把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>
9 ^2 C; X3 A9 {5 k, H<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc&lt;Mutex::&lt;T&gt;&gt;</code>来包裹起来,也就是用锁来解决并发冲突。</p>* o* m( W- x" w  E* I0 D9 H
<p>Pool的完整定义</p>3 A# j1 t) o6 G) s
<pre><code>pub struct Pool {
' I" U1 z& c7 a- w% r3 J  \    workers: Vec&lt;Worker&gt;,
# P7 r5 j# k/ o0 y    max_workers: usize,! |; n4 f4 X9 |2 a. G2 Z" o9 c
    sender: mpsc::Sender&lt;Message&gt;: z* S' u# J, j! u  e( L  K7 U- c
}* m: W$ }+ M! [' H
</code></pre>
7 t1 S. }& O4 F3 Y<p>该是时候定义我们要发给Worker的消息Message了<br>
& `4 m0 D# i. F$ \8 s定义如下的枚举值</p>
1 ?: G; M; b' M; n+ v/ H<pre><code>type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;
4 B. b+ `) F' |% H# N( j' @+ Fenum Message {
: @4 K, L$ B. k* n, B: [4 m) h, o    ByeBye,8 Y0 @# D/ w6 D' N. _1 d- q
    NewJob(Job),8 y( k! H8 `; w
}# h" D! v+ v. ]* l% Q
</code></pre>
6 [8 Z9 M3 v* ?4 p3 [+ Y/ k<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>" n) a: D5 i. G9 j+ h
<p>只剩下实现Worker和Pool的具体逻辑了。</p>2 H4 o$ i1 Q  C! |* Y
<p>Worker的实现</p>& ^/ `6 e8 z4 t) l9 h
<pre><code>impl Worker4 @- Q: i% p" S: p4 D7 a! q, J; K$ U! B1 R
{2 A, ?& t2 T, |) Z; L' |
    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {8 n( u- ]5 m+ a3 b9 n/ v" F
        let t = thread::spawn( move || {
$ V, {0 k; [" x            loop {
7 k. j# `% h. B3 u* v- R                let receiver = receiver.lock().unwrap();
, R2 D/ D! H2 O# I3 m1 C                let message=  receiver.recv().unwrap();
# Q( h7 d2 c7 F1 q                match message {
- T! l9 B5 G' m' H/ r& {                    Message::NewJob(job) =&gt; {
9 h4 P+ V. t! y' K4 w                        println!("do job from worker[{}]", id);. A7 B) V, k8 m9 y' f6 x
                        job();0 \. I( b4 f5 Q) Y+ S7 @
                    },
, P  p8 S! k% s* R' o* s" }* u                    Message::ByeBye =&gt; {
# K4 U  g* a% f- O+ }                        println!("ByeBye from worker[{}]", id);0 \' S# M' U5 v0 T4 y1 H# h
                        break
4 q8 u9 g! P' J8 o                    },/ h' T' H6 H  j1 i
                }  
; V) c! a/ b4 z8 R! X5 V            }4 W% Y4 a! w1 X' y& v
        });- k8 |& R# G$ h) i, k* T- z7 a/ ?

$ W! y: F- G7 s        Worker {
4 S% j8 W+ p) B: Z  d1 `0 g$ g            _id: id,& U' i8 e# ~: c# h7 Y
            t: Some(t),: c. H( |+ L4 o- [! U4 _9 A
        }
4 p+ a2 M7 X. h* t& T) [9 s7 y    }
. ?$ y% C2 t) O1 B0 E}$ o; g. }) Q, n1 _2 P4 H
</code></pre>
; u* A- W# j% W% @- @( j* p<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>' y% A1 ]0 J3 ~5 g
但如果写成</p>
! ?# \  v- M7 }' q6 A( n4 L<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {& k" p5 O. Y3 _3 V8 |+ A1 C! v
};# A* Y+ f  k; T: v- Q$ ~
</code></pre>/ T4 `9 D0 k& m
<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>
* @) Y) M" F' L8 {rust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>
# `) V+ w( h% K5 c9 ]# d6 ]<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>
7 z+ M! u' t$ N+ W9 M; T<pre><code>impl Drop for Pool {! ^* {# I3 x3 ~! p6 f  Y/ ^- R
    fn drop(&amp;mut self) {* K  R4 W: J: Z& H) V6 {
        for _ in 0..self.max_workers {
* e" O3 L9 s- U            self.sender.send(Message::ByeBye).unwrap();, D) m' l. Z/ _9 h
        }" h% j' _, A9 ?
        for w in self.workers.iter_mut() {
5 i( w9 E" K7 ]; t# H            if let Some(t) = w.t.take() {/ k! `1 b. h$ u; H" M- W# |% e
                t.join().unwrap();, ]$ q) i$ g6 |8 M0 |: t; W
            }$ s, g! e; g- c$ m) v( c
        }
! m" {" J* [5 e2 G" W. m    }
5 Q3 ]; X& K* X4 M' z. K}8 h  D5 H* w7 c: b* m
6 M6 T4 t/ \0 K! x2 p) m. {
</code></pre>' T3 S: |# R! I2 ^2 Q0 i8 o5 A
<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>
( @* f3 v, K0 W) |: m<pre><code>for w in self.workers.iter_mut() {. ~5 Y8 e2 N! U. \$ H' V/ M
    if let Some(t) = w.t.take() {
8 Z( K9 w2 Y# y0 {: c        self.sender.send(Message::ByeBye).unwrap();
5 O. T0 n: e6 V7 ]: X" g/ [9 Q/ y        t.join().unwrap();
) ]$ d' v# ]( q4 L. j6 o( d- J    }
- o, P# A9 T4 T+ {! G}8 B. ]6 g; ^- i) j
% C% `3 p0 @$ i
</code></pre>; M' d7 X1 N6 s% x+ d) I
<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>% u( ?7 c( B7 C
我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>- Z7 W6 }& O: d# L4 K( `, o; Z0 O
<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>5 p0 P! j- F) h2 w' {: K# l* ~4 G1 P; d
<ol>. s4 |- O, V* \$ h/ ^1 c3 M
<li>t.join 需要持有t的所有权</li>; y6 S" f$ `4 Q4 p9 S  Y
<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>1 b6 v0 r4 B2 O1 T: r
</ol># w5 v1 m& q2 E% ^4 }% H- ~2 C8 v4 v
<p>这里考虑让Worker持有<code>Option&lt;JoinHandle&lt;()&gt;&gt;</code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>( ]8 B3 l* U# y$ t/ F
换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>5 D5 N8 S8 p( D: r8 z0 s& d
<pre><code>struct Worker where& v) e0 U1 ?$ Q! y) x
{) R: z# c. n- ^/ i
    _id: usize,! u: v# @3 p+ J3 x
    t: Option&lt;JoinHandle&lt;()&gt;&gt;,
! q3 W: E7 [; D+ |- s8 }}
* s. B9 g9 x  {+ \8 T</code></pre>
3 E+ `8 w( w* J0 F4 R* Z<h1 id="要点总结">要点总结</h1>
8 J, a2 l; N. |8 A) {7 v<ul>* x' [  ]$ g: H% p' N0 L
<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>, M$ [2 K7 z- }% W
<li><code>Vec&lt;Option&lt;T&gt;&gt;</code> 可以解决某些情况下需要T所有权的场景</li>
* n. u  h, |0 ~</ul>
/ `  g9 A$ Y4 K4 [1 i% B6 `% M<h1 id="完整代码">完整代码</h1>" Z8 F8 j. Y  B$ ]! F& Y$ y. G
<pre><code>use std::thread::{self, JoinHandle};
+ w: i% t& j4 d1 H( Ouse std::sync::{Arc, mpsc, Mutex};
4 S( ?5 a$ |. N3 x9 {5 p  U2 m% D6 E& R7 P8 ~" j+ w% S6 w
& k; l, R, \7 m) Q
type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;
& z) v. U/ \8 w  z7 B' Benum Message {3 f( f0 v) v2 `/ y
    ByeBye,
/ g; k7 k. ?* _4 m    NewJob(Job),
" e1 x1 i  s! ]! |& R}, ]! X: S. q" X" i( A' q/ d- f7 ?
3 r0 K& g% w( |" f
struct Worker where
* Z) P; m0 J/ v- m{
' O) z% {* W3 h    _id: usize,% t# Y6 e. O) R5 P5 f4 G
    t: Option&lt;JoinHandle&lt;()&gt;&gt;,
6 i5 t, Q" m6 L6 q}
7 r  ]0 ^# p  G* L7 q& }9 p* z5 c1 J5 H- U. E
impl Worker2 h7 b; K2 d" M2 l! V8 ~
{0 ^6 U; L& S" u$ x
    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {
, i% X# e4 P7 E5 r        let t = thread::spawn( move || {! @* s8 @, K; K# {6 {* X$ h7 r
            loop {
* G- f: X- h' G7 E4 T6 ?  K                let message = receiver.lock().unwrap().recv().unwrap();4 x( z5 o" [* x2 i3 V$ S
                match message {
/ Y; |5 ^/ ]) {- S6 D                    Message::NewJob(job) =&gt; {
1 |& ?+ O, @. e& `0 y+ h* S                        println!("do job from worker[{}]", id);
4 T6 A) q; p& i8 f: d6 W                        job();
8 P# \0 i! y7 A* l0 z$ K                    },% K# S* K3 C& C. S) x
                    Message::ByeBye =&gt; {4 p0 p, g4 I+ k7 X
                        println!("ByeBye from worker[{}]", id);( h! [( s' {3 D2 }+ j  c& `- w: I
                        break( n, z7 W! M# [
                    },. |- @% n! p# h7 H8 f
                }  
& _' c# _7 k- Q+ t            }+ I4 X6 k: n0 L9 g4 @9 i- U9 ]9 y
        });6 V/ U- j7 y9 E  w& Y- z
2 Y/ M. u* U' }& m+ ^! t  d
        Worker {* o0 M2 N( `' D- Y, c5 u: ?
            _id: id,
' j! D, e9 [/ O+ \3 b1 Z+ d/ u! U            t: Some(t),0 A4 o8 t8 e8 y' j
        }' s' r, s6 i2 \% y
    }1 [; k& Y' b7 d
}, v& R% g8 L6 d5 ^
, L0 M' j/ G" G4 p( ]/ }6 S& z
pub struct Pool {
6 {$ z. M$ ~' E) X- \9 A) K4 D    workers: Vec&lt;Worker&gt;,
8 U; @2 y: U; S" I2 l# c    max_workers: usize,
1 x# B+ Q% _1 X$ K- C% q" }2 E! D    sender: mpsc::Sender&lt;Message&gt;: }  N9 u" d: t& W
}: I2 U5 K$ z+ c' j- T0 i/ G. H  {
- S& q: m, o1 @5 b3 v
impl Pool where {
6 _! h2 u6 \; z* c. M: u5 A    pub fn new(max_workers: usize) -&gt; Pool {
0 a& O$ h7 z& n7 G; }8 K, Z        if max_workers == 0 {
3 O3 T* j- a7 f9 s) M! f4 i  l            panic!("max_workers must be greater than zero!")/ o, I; ]! g( V- X6 g+ R) y" f. L
        }% v+ |. ?$ p' k7 s) b
        let (tx, rx) = mpsc::channel();7 E0 {; ?- X0 p! ?  d4 u

+ x. P( Q1 b5 K% ^+ ?2 Y        let mut workers = Vec::with_capacity(max_workers);1 Z% {+ E) k. ]
        let receiver = Arc::new(Mutex::new(rx));
6 i5 Z7 B8 _2 f# s( D        for i in 0..max_workers {) I  P6 n( a! K+ P) [+ {
            workers.push(Worker::new(i, Arc::clone(&amp;receiver)));6 h/ E2 Y% Y' k4 c
        }/ E" m8 B4 K3 h3 A; c  S) B9 k2 \5 ]
0 K7 j, \7 b. |7 F
        Pool { workers: workers, max_workers: max_workers, sender: tx }% C" A9 T$ y* j: }4 g
    }
+ u' y0 J* b: F    $ {1 C: S' m8 t+ C8 }2 {3 }1 V
    pub fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send& r3 v: A) i4 s
    {% J; m1 d$ l8 U& C7 X

8 P+ q+ W) b. F4 o% H        let job = Message::NewJob(Box::new(f));& E/ R6 m7 k8 B4 s5 i. Q
        self.sender.send(job).unwrap();
* h4 K3 h1 U( ?* I& D    }
9 \5 M( \) c7 g' `}
* g8 n' Q, m) A6 Y" e
1 Z. ?% v" X2 s2 Kimpl Drop for Pool {' H4 K4 w& u. d7 s2 R/ q' i
    fn drop(&amp;mut self) {
6 w9 B% @0 F  ~$ x* r7 v4 C        for _ in 0..self.max_workers {
. _4 a8 P$ S2 }/ I( n            self.sender.send(Message::ByeBye).unwrap();
- Y4 L4 l) t7 k, K        }1 L5 J) d' r4 f5 c. ?
        for w in self.workers {" D; l. B9 S  d' \: F
            if let Some(t) = w.t.take() {* _9 }" z+ j5 s0 D. V" ]8 M# l8 Q; m
                t.join().unwrap();
% G4 Z# F9 N$ @' z! X# U            }- Y8 M& i, {8 G" E& S% S
        }
  q$ F" k/ R1 M( i' }: i5 ^    }% o) e% q* i# e$ ^2 B
}% T- O. W2 m2 b+ S) U- E- }

( f/ ]8 S2 x* \# b, Y8 b8 E( O3 X# q4 f6 v, W1 E$ q, u8 H) C
#[cfg(test)]. C% w3 L) G* l1 M/ w9 b* u  W2 I
mod tests {
- K0 I7 C; M1 Y3 g    use super::*;
6 O% m' J7 b7 h8 a: t/ P    #[test]; V$ Y/ i0 H% K2 @* Y+ B
    fn it_works() {
3 M7 ^: x* A& k  r* C7 M/ |2 g        let p = Pool::new(4);
4 c6 p$ O- Z: `2 |! Y) U        p.execute(|| println!("do new job1"));
! Q3 _4 U* B" W8 @        p.execute(|| println!("do new job2"));) x/ l9 k$ J) q
        p.execute(|| println!("do new job3"));8 y; @: M$ i- ]- r
        p.execute(|| println!("do new job4"));
4 b$ O. J& l, Z. b: J/ t  A    }
- f' P- u' M, I, B  F  z}
8 a1 `: ]2 ?& Z, I2 u</code></pre>2 s; J( I6 R. G" G

" ]0 G: L/ |  Y4 K& r3 s
回复

使用道具 举报

懒得打字嘛,点击右侧快捷回复 【右侧内容,后台自定义】
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

手机版|飞雪团队

GMT+8, 2026-5-28 03:55 , Processed in 0.344342 second(s), 22 queries , Gzip On.

Powered by Discuz! X3.4

Copyright © 2001-2021, Tencent Cloud.

快速回复 返回顶部 返回列表