飞雪团队

 找回密码
 立即注册
搜索
热搜: 活动 交友 discuz
查看: 15441|回复: 0

rust 实战 - 实现一个线程工作池 ThreadPool

[复制链接]

8242

主题

8330

帖子

2万

积分

管理员

Rank: 9Rank: 9Rank: 9

积分
27056
发表于 2022-2-12 14:35:42 | 显示全部楼层 |阅读模式

5 }4 E; a7 L" v) x6 _* I<h1 id="如何实现一个线程池">如何实现一个线程池</h1>
8 P' w6 {& \3 @8 Y! i<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>
+ u( |' w& m& d/ Q2 Q( S' P<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>
9 v  @  `( c4 O- w+ @) _<p>线程池Pool</p>3 ^0 k1 R# s+ e0 k: I
<pre><code>pub struct Pool {
+ t4 |* F5 x, ~- G! Z4 Z  max_workers: usize, // 定义最大线程数
& c" I* V' [4 Z+ C( W' g}
- ?! b0 ?2 L. I) D' E( f
$ C, ]& X- X# {; P2 R/ simpl Pool {
( H! \" w0 o  B/ s7 x1 R) S/ Z$ P  fn new(max_workers: usize) -&gt; Pool {}
8 t2 m* z4 M2 ?  fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send {}
8 G- t* H1 q3 r3 N}6 h8 h6 |: x$ h5 f
* [2 r! `$ J' Y% J6 k* e  l" {9 j
</code></pre>' t) T* U! h$ ]& C- P4 ^, m; o
<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>
5 P) I8 I( y& I( O8 K<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec&lt;Thread&gt;</code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>
! w  i3 t6 d. W8 `" L可以看作在一个线程里不断执行获取任务并执行的Worker。</p>
. [. T) A* ], C  ~: Y<pre><code>struct Worker where* d5 [2 b0 I0 K7 N2 Y8 F! p+ {
{
# s1 R, f+ Z1 H' T" T+ M; {" h* t% t    _id: usize, // worker 编号
' O4 q  g' ~( T) l}
: q" {  R" T1 C+ T9 \/ W, q3 Z</code></pre>
, g4 i0 Q* A: T8 {5 H  I8 i' u<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>
% ^( g  \2 T/ N- f* M把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>
4 H+ ^" `) J. q6 X: L$ N) Y( \<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc&lt;Mutex::&lt;T&gt;&gt;</code>来包裹起来,也就是用锁来解决并发冲突。</p>
2 M5 r3 z+ }' `- _/ [& M  H! H0 V<p>Pool的完整定义</p>7 }: R4 j( h; R4 c
<pre><code>pub struct Pool {8 q, e; k9 Z7 l' u, u$ ]
    workers: Vec&lt;Worker&gt;,
( Y. c/ [; j6 J0 s# R# }% u, n& A    max_workers: usize,
6 U5 M0 _4 |3 ?  }    sender: mpsc::Sender&lt;Message&gt;
# @+ X$ [1 A8 ]9 B; m% E4 b% ~( k}
( L7 w) |- m5 ?</code></pre>6 E! A1 m% ?5 D1 `3 P
<p>该是时候定义我们要发给Worker的消息Message了<br>9 X' Z$ H6 S) m& [
定义如下的枚举值</p># S5 o$ X9 Z  O; z0 {3 W. _0 I
<pre><code>type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;( l; l2 e9 G7 w. d0 G' Z& D
enum Message {0 p; o- n: h9 |. B/ }
    ByeBye,
$ u+ D2 E" Z; q, O# j    NewJob(Job),9 {7 H7 T0 s$ }5 N# l. B0 M7 p' Q
}  ^& N0 ^' _& k/ N5 W
</code></pre>; p. T, r* b% Y2 n/ \  a
<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>
# ?) s& h5 @* O<p>只剩下实现Worker和Pool的具体逻辑了。</p>' g. B; {0 U6 e8 ?
<p>Worker的实现</p>5 I; n3 Z9 w* x0 }8 |
<pre><code>impl Worker
; C! }6 _+ `- I, |. W% J- B{/ N5 z: d5 j# s! Y7 e
    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {
+ F* H7 D8 `$ ]2 w& C$ X4 q        let t = thread::spawn( move || {
% o* b+ T8 ^5 @! A+ }            loop {! m8 }/ |+ R' c  j+ O
                let receiver = receiver.lock().unwrap();
) ?2 z; Y% i% V1 r. F0 j) Z                let message=  receiver.recv().unwrap();  {* v5 V- z, v0 {* s
                match message {" i+ ^& S, G8 N( M9 t
                    Message::NewJob(job) =&gt; {+ S7 \! i1 K# |" `* ?# b
                        println!("do job from worker[{}]", id);
- `( t) l6 Z$ J$ V% X7 |' n1 i, }                        job();
6 L7 \/ k- b1 C8 T1 U( t                    },' y! E: K7 n( a3 g' [& W( t
                    Message::ByeBye =&gt; {
" q, P0 b- b* x# i1 }4 s                        println!("ByeBye from worker[{}]", id);! A5 `. Y1 h% j* h; w6 Y
                        break' I+ \" s) n0 p! s7 v1 s3 h0 l. s
                    },. d8 s' Q4 n" _/ f6 }4 O
                }  
* \( i/ c! F# B9 Z& [0 v- m            }# L& a3 p/ P- s+ V, C$ o
        });
8 I5 N( t7 Y+ e6 S$ M9 k( g, q6 v
6 }( j4 g4 f- @4 A+ W/ J        Worker {
; }5 A# C6 E' ?, w            _id: id,, G+ V& \. v" v$ j6 U$ H
            t: Some(t),
" f$ q. ~! c+ f/ z: U        }
; _+ A' U0 J4 c; n: Q    }" A1 n/ X! U2 `9 j6 l8 ~2 x
}
/ i4 S& f( H" C9 e" B9 l' w</code></pre>4 K1 D# {( A0 U4 u7 Y$ @
<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>
- K0 y9 \/ p3 }: O但如果写成</p>
" }! u( U* Q" Y' b9 d' @<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {& y' E1 p* B8 ]1 s% p4 x
};5 w* V( @3 C1 K* [# a2 H+ j
</code></pre>: x4 `! d% c1 j/ ?4 w" W4 v* U
<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>
8 z" l( n5 {$ k; g2 _rust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>
7 `; F4 x; J6 N<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>
* h: D& @  @8 {( C<pre><code>impl Drop for Pool {
* @( u# X" L( m  Y2 x/ L9 E. s    fn drop(&amp;mut self) {
2 }3 ]4 q4 I% x        for _ in 0..self.max_workers {
, N9 X' e* L- e  i            self.sender.send(Message::ByeBye).unwrap();) T2 _+ X% G- O( I: N& b# ?
        }' _) U% W) M& Q
        for w in self.workers.iter_mut() {
; S- g1 u# y+ `9 ~* \            if let Some(t) = w.t.take() {
8 J: [/ _& `" ?, ?/ P: Y+ Z# }& f) ^                t.join().unwrap();6 F3 x$ ~8 N' `8 v9 S
            }
2 {1 _8 X6 A: b3 B        }
% K3 V5 c: L. [  ^& a    }" M, M7 ?* `& h( m
}
3 p4 I1 O7 s1 ~* l
7 W1 V( L$ E; @5 Z- {</code></pre>
" s1 ^7 w& w2 k* v& {<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>( T) R' t7 j% I" ~
<pre><code>for w in self.workers.iter_mut() {
! L7 v+ K- T! Y) ]/ e& c' N    if let Some(t) = w.t.take() {
! S- \) X8 Z# ^4 A$ I8 y        self.sender.send(Message::ByeBye).unwrap();
, w4 R& n4 C* B8 C  W$ s! B$ |) i) |        t.join().unwrap();5 e# j5 t( c1 v$ K5 ?9 O7 x. p+ s
    }  s+ D# ~2 M' R8 n
}$ _- k7 }& ~% d8 y
) n  Z, q; g/ D* {! R+ a
</code></pre>/ [& }1 o# o, f2 V3 m9 L2 S
<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>+ f( C; V; x, m& b$ w1 ~
我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>" z# G* F2 }, M$ m
<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>
4 j; p  N4 o- H8 j0 d<ol>/ W  Z; k' P+ B
<li>t.join 需要持有t的所有权</li>5 q: J# K4 e" w  l" L
<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>
& g3 e8 V1 x3 j/ g3 y/ r1 |! G</ol>
7 a+ S' A, I. i. m& ]<p>这里考虑让Worker持有<code>Option&lt;JoinHandle&lt;()&gt;&gt;</code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>
& d: b; r4 V3 Q( e( M3 _换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>& C6 B- H. |2 |( V; Z* W+ I
<pre><code>struct Worker where
4 U+ P; c2 _) {* F, \+ y9 P3 x{
- C$ a; [8 ]" {    _id: usize,
% E9 h1 m2 i0 m0 {& ~    t: Option&lt;JoinHandle&lt;()&gt;&gt;,
1 N8 n' Q4 m! I+ U, [+ c$ _}$ }; m1 O& p8 ~2 D. w
</code></pre>4 [/ `0 |# y) T: L0 ~
<h1 id="要点总结">要点总结</h1>
8 y- u2 c- L8 Z$ n" b<ul>0 z9 {# e& v7 H1 N; I$ h! F
<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>- N( |2 `3 f( x1 C& K1 r* X
<li><code>Vec&lt;Option&lt;T&gt;&gt;</code> 可以解决某些情况下需要T所有权的场景</li>
8 a2 d+ A1 J9 L5 t2 p</ul>* S# |. `/ f5 w1 n
<h1 id="完整代码">完整代码</h1>& C5 X" m. B. ^  |1 g
<pre><code>use std::thread::{self, JoinHandle};
2 O' p) R& R2 Z( F" z5 Uuse std::sync::{Arc, mpsc, Mutex};
8 J1 l, p  f8 ~: |% E7 F; q' X7 ?3 P# Y  i
3 s9 y3 w7 {. m6 |$ y
type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;
/ Y: C, f2 p# N# Z; c$ O7 ^1 Penum Message {& K" A! f% V, u7 L9 @% |4 X9 f- O
    ByeBye,
" W1 [: S( `$ V% v  @& r    NewJob(Job),. p7 c8 s4 _* X5 W! ^; z
}1 X# x# D; b2 M6 H0 u( d
3 X- l" `* N& f6 i
struct Worker where# g. e( `3 W5 Q  T
{
2 o& x" \; t* p    _id: usize,
( I% R' w: j" R* A) C  W8 e& A    t: Option&lt;JoinHandle&lt;()&gt;&gt;,7 x2 C: i4 V! Z  C
}7 ^- t9 y4 K0 N  X7 m7 m
: F9 {' c- R' E: k9 I
impl Worker
+ X2 s/ n- }! [2 ?0 b4 r{* S- _' I- D) b( c+ H4 Y5 L
    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {
& l) |1 i5 F+ s# s7 W( [7 \        let t = thread::spawn( move || {
1 F1 m  k: Y9 R, w" P' W4 U7 _            loop {
2 x# `% @: [/ c$ s. \/ O9 b* ~( @                let message = receiver.lock().unwrap().recv().unwrap();- L! S/ p% n8 C$ w0 D& x' O2 g. v8 m6 A
                match message {
( F3 O3 G6 C! ~3 a' i                    Message::NewJob(job) =&gt; {
( F, X8 m  \# ?( A' Y$ u                        println!("do job from worker[{}]", id);
, |% w6 L1 c) |& @" S$ R                        job();" v) _2 `- S) u: T$ I& g
                    },7 x1 b$ b: h& o* K$ s/ B  o) T& D! F
                    Message::ByeBye =&gt; {7 A: v( e% a; A3 j3 h0 }/ c7 w
                        println!("ByeBye from worker[{}]", id);3 w. K* P, N: l4 v9 N2 R
                        break
6 U2 K! T$ B' j                    },
# M1 @5 z3 m$ X6 |1 ?! n7 V                }  4 K2 r8 d' L; S& r' p4 E2 j5 ^; ?
            }8 N) ^7 A4 Y5 g; {# E
        });* O, W2 m+ H4 G. P  `% t- l

2 n1 [5 m. t: K  ~- Q        Worker {
% @$ s* H3 f) C2 u            _id: id,
2 p7 u& }) d3 q# I" q+ m+ n            t: Some(t),! o2 Q" \* |2 y( ^
        }, K$ ~  @' N- x* p9 E2 {1 c5 @5 W
    }
4 u4 _% \0 y4 i2 W$ r5 X; v6 q- ^}
8 }/ Z0 Y' b* E0 p6 D9 q, U. ]' b) P; E
pub struct Pool {& v& ]  {' \9 B! v% ^8 ?/ ^
    workers: Vec&lt;Worker&gt;,
3 m5 G* |, r* e- ~0 `2 F    max_workers: usize,
3 C' g7 t! L5 f: z$ ^6 g1 U    sender: mpsc::Sender&lt;Message&gt;
. q# ^/ b- X! K/ z; u. b}
* K& v7 }* ~, B/ Q0 N
4 |# a( {# o8 j2 b- J1 `impl Pool where {0 V3 f& @% l2 q8 _+ f+ b
    pub fn new(max_workers: usize) -&gt; Pool {
% U4 e+ I' M( r0 C        if max_workers == 0 {
9 _; v" n$ h6 t* p) q            panic!("max_workers must be greater than zero!")
) ~, d5 o' ~, K6 f        }8 O8 f/ `# t- M$ ^7 H  _* w* j
        let (tx, rx) = mpsc::channel();
" Q" N0 I. R& w) H7 X/ W: q( D4 q) z. Q$ Z+ l9 V0 }1 e6 U
        let mut workers = Vec::with_capacity(max_workers);
+ u$ c: p$ @6 O# R8 M1 o, f        let receiver = Arc::new(Mutex::new(rx));8 h6 z  p' V: `7 }7 m" B; ?7 P
        for i in 0..max_workers {7 }$ a; X1 G9 C
            workers.push(Worker::new(i, Arc::clone(&amp;receiver)));! i1 v, [* }# p) H  U- X
        }. X; ~- }) h' Q- |' B
4 K, V: u& x" r: N( P1 w* \% A3 s
        Pool { workers: workers, max_workers: max_workers, sender: tx }: Y- v9 _9 o) Y3 i1 S
    }8 F" s9 ?0 n6 z3 A9 {
   
1 N0 w2 t# [' {: A! C7 R    pub fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send
& b( E* U. d) }- ^- v- \% s9 t8 V3 m2 q    {
% c. U5 Y, a8 I; j6 U- ]' d4 P: G( h! K; [
        let job = Message::NewJob(Box::new(f));
2 |' d: I+ }) U9 }2 A  @        self.sender.send(job).unwrap();' c, D0 L) a, U' v% {
    }
1 Q' Z$ z. t/ S2 a& x}6 g7 a$ K3 R+ f6 q! i1 ^

0 N- W, Z5 J1 }5 iimpl Drop for Pool {
' V! _8 c% i% j6 }    fn drop(&amp;mut self) {# l6 z$ a) y% Z
        for _ in 0..self.max_workers {9 M( b* n! f. E8 E
            self.sender.send(Message::ByeBye).unwrap();+ l- f, C, h. T) L
        }8 l3 `; o2 p) C: ?+ ?) j6 R
        for w in self.workers {
5 T4 u" q4 Z  B% b            if let Some(t) = w.t.take() {, a, w- }" @! P. B5 [1 t
                t.join().unwrap();
. v% ~+ y1 [$ K& z% q* k) l2 Z4 f            }' [8 ?- J* Y3 z* ^! I4 \, {
        }0 s  z( d4 G/ d% x3 r
    }8 R. {9 }3 a( u, f3 q
}6 V  s9 E9 C+ n% _, x
6 x8 W( t! h/ e
1 d# P( @7 u+ T4 L
#[cfg(test)]* J& }8 D0 C, s  N0 B0 q
mod tests {; T9 }) T4 _1 u3 P* t
    use super::*;+ f+ t; g0 Q; L& N$ G
    #[test]
' E8 w$ O7 p3 b( L0 q" R5 V  A    fn it_works() {5 ]' T+ k$ F5 H4 I
        let p = Pool::new(4);$ y5 a& ^: j" ]
        p.execute(|| println!("do new job1"));" _* c% @# p8 J
        p.execute(|| println!("do new job2"));, z+ G/ P7 U$ m2 m
        p.execute(|| println!("do new job3"));
+ b3 B, w# I/ R; R        p.execute(|| println!("do new job4"));
6 x$ E% H" Z3 _3 \& q! b    }6 f7 e7 t" Z, E& \
}
) r8 g4 P( G/ D</code></pre>
# Y0 q6 }1 \& g: m0 ?5 m6 j6 N- T, X8 J$ ~3 m, P
回复

使用道具 举报

懒得打字嘛,点击右侧快捷回复 【右侧内容,后台自定义】
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

手机版|飞雪团队

GMT+8, 2026-2-26 23:45 , Processed in 0.279681 second(s), 22 queries , Gzip On.

Powered by Discuz! X3.4

Copyright © 2001-2021, Tencent Cloud.

快速回复 返回顶部 返回列表