飞雪团队

 找回密码
 立即注册
搜索
热搜: 活动 交友 discuz
查看: 9810|回复: 0

rust 实战 - 实现一个线程工作池 ThreadPool

[复制链接]

6806

主题

6894

帖子

2万

积分

管理员

Rank: 9Rank: 9Rank: 9

积分
22748
发表于 2022-2-12 14:35:42 | 显示全部楼层 |阅读模式

% I0 v, r1 ^% L  c<h1 id="如何实现一个线程池">如何实现一个线程池</h1>! @" e* E+ c& R* N* ~
<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>% [. n: U% R8 T, D( b
<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>
# \) f+ f1 b4 f5 p* l0 f<p>线程池Pool</p>; o9 G5 K% `, v! i8 L4 ^- |  z
<pre><code>pub struct Pool {
2 {4 E8 B) E1 J8 n  M6 |  max_workers: usize, // 定义最大线程数
4 u7 j  n4 R" ]; N}
* N, B) ~' C- Y" m5 n1 g! d
) ~; o3 s& {1 n3 T, R3 \impl Pool {
5 d! X* M& ~. W% Z% h' ^  t& H  fn new(max_workers: usize) -&gt; Pool {}
0 k& m) U* w/ {  fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send {}
; s  K! K" V; B/ G6 j" d}
  \7 x9 D& o4 g; A( J; {$ }3 b
, G, }5 ]' T; I. a3 J</code></pre>
+ v; z2 y4 N" m5 ^% j( A8 P<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>
! n' ]* i' |- ]- n<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec&lt;Thread&gt;</code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>
# u0 A+ ]$ Y% y' j$ A; L6 x5 |可以看作在一个线程里不断执行获取任务并执行的Worker。</p>  n5 L' x. N/ |! d$ k# W! R9 J
<pre><code>struct Worker where
1 Z- r7 }2 v  `& ^{
6 ]9 m9 i" Q# P/ R1 |    _id: usize, // worker 编号1 ~9 E  U6 ]8 @& C6 a- _9 n5 w
}
* A) i$ k$ {& I' x8 y+ P</code></pre>
; s3 j% e! u" Q. I<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>
3 e( g* k% [! _把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>
. a9 X1 v6 P2 F<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc&lt;Mutex::&lt;T&gt;&gt;</code>来包裹起来,也就是用锁来解决并发冲突。</p>
* c$ {8 G, E# O- `2 m8 j, [<p>Pool的完整定义</p>
8 u! W# P4 j: v) a* `; i<pre><code>pub struct Pool {# |3 V8 n$ Q; n! D
    workers: Vec&lt;Worker&gt;,  U0 k' {$ E" U5 C
    max_workers: usize,
- }7 o* R$ D) s9 H- p6 w/ H    sender: mpsc::Sender&lt;Message&gt;1 u- ~5 O: l1 L' R' Q) E. B9 i
}% |- J1 s5 r0 ]: D% ^7 o
</code></pre>
# S; t* S4 _" W/ S<p>该是时候定义我们要发给Worker的消息Message了<br>
8 a7 m+ P3 R1 s" }定义如下的枚举值</p>
8 O9 p, Y. o/ L1 T* x<pre><code>type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;* T) o' R& C' W; J
enum Message {
8 \4 n+ f7 r: r! O3 ^8 O    ByeBye,
& S9 x' @1 H* a: ]  s- v    NewJob(Job),. \+ |* z# c5 B! M2 J9 t+ B
}$ B7 K4 }) {! y) J
</code></pre>
( R; }: D/ V; w3 k- ]$ x<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>7 e( R' J+ ~9 n  a+ W+ @) P
<p>只剩下实现Worker和Pool的具体逻辑了。</p>3 e3 Y( j% [7 s9 D0 f
<p>Worker的实现</p>
! k  X2 ?1 `3 V) G1 k<pre><code>impl Worker5 I/ q, ?+ a9 l9 N+ n
{# p4 S+ ]' {+ [, T3 h
    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {
9 n6 w9 q  U: ^! W% [4 Q        let t = thread::spawn( move || {! @6 D9 E$ l! _* k  F
            loop {
) A  K% j2 |8 Q; S                let receiver = receiver.lock().unwrap();
- u" }: h  D! [8 s" r                let message=  receiver.recv().unwrap();
0 u6 Q$ |+ d. F3 C  {                match message {6 @1 [4 r" P9 s  c4 H
                    Message::NewJob(job) =&gt; {
# n  `1 q* C7 L9 B5 a                        println!("do job from worker[{}]", id);
0 I  d5 A0 P6 @                        job();
' g4 M9 t0 b+ h2 g- n; e. z9 A4 @                    },9 Z/ X# L# ~" w+ u; {  _; q
                    Message::ByeBye =&gt; {: ~1 V2 t4 f, N
                        println!("ByeBye from worker[{}]", id);0 R* ?& e! Q0 _8 u8 C
                        break
7 Q' k) |# ~( o0 A2 S% U% R/ p                    },
" ^, |! e7 C. |- E( c4 G                }  
/ [& v- O6 w! k8 _& t            }
( a# M! W  B0 w4 M! c        });9 l& _1 v. T# Y0 F5 f
0 p# W2 b0 P" H
        Worker {
, d) F8 V: j4 r            _id: id,& S2 S. Z4 c5 P# |/ N8 T( T. X
            t: Some(t),
0 F# _" S# X/ n; Y        }$ J( t5 |5 V. n/ K
    }
( m$ s) P) {1 Q) m" R- p) U}
9 }* D) }7 c0 P9 [  w6 @' X</code></pre># e$ s- U4 J+ j: ]  z  {
<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>( a' Q  Q2 I! T4 D8 A. t5 D
但如果写成</p>3 ~% l. }3 x% o  V& [! W2 Z* N
<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {
9 n+ y. o; R0 i1 z};* P# |" [  r" U* @/ _
</code></pre>) C4 G1 E$ Q. Y* u
<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>, @" {$ G1 W2 q/ M! N# {% h
rust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>
) \( x2 \: z4 r2 d<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>
$ X; B( J# n8 F<pre><code>impl Drop for Pool {0 O4 W+ e( s3 L+ A" J4 G4 M
    fn drop(&amp;mut self) {
1 C/ T) \9 \1 L3 R* C; s( P        for _ in 0..self.max_workers {. B5 q' F: h0 ?3 B7 }
            self.sender.send(Message::ByeBye).unwrap();
( }3 s% l) p0 S" Y2 l2 ]        }# F- e, A# S3 r
        for w in self.workers.iter_mut() {  ~; ?8 F, y, o% x( n
            if let Some(t) = w.t.take() {
- Z  }' L, L9 b9 u# I                t.join().unwrap();
. B" H- B! \! W; N5 D            }
/ n& l+ N! L$ V0 y% ^$ S4 `$ O        }6 N& s0 }7 ]0 O4 K: _) ~+ i# L
    }+ g+ v0 b5 f8 G- j1 `( p7 t6 N' M2 S
}
5 b( }# ~: R% `
5 z! r0 R) `* \5 ?4 x: E</code></pre>
6 P& ~3 D$ u* ^7 W7 B7 W7 J  x<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p># H/ L0 \9 X9 b. `
<pre><code>for w in self.workers.iter_mut() {
3 _8 s. T  R4 N( E    if let Some(t) = w.t.take() {
/ \- |4 Q3 s! z$ f5 v        self.sender.send(Message::ByeBye).unwrap();
  h8 V1 m- r0 ]0 i5 U( ?+ [        t.join().unwrap();
" S- a9 R1 u; X) E5 I8 f) b6 g    }
" I" i3 Z* [) n3 \- D}
2 |/ l9 S: I6 J8 Y" o1 Z
$ u" {# l" {7 d8 v3 `2 {+ ?$ a" N# a</code></pre>- g# g; H  Z% T" q4 A
<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>+ T6 [/ d+ s. q# p9 ~7 j
我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>8 i. \- G/ l7 w6 `5 X
<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>
- `1 q3 V$ g# v4 Q7 G/ E<ol>
) v& R, H9 w9 b2 a/ K$ L: J<li>t.join 需要持有t的所有权</li>
# e2 ^9 c  `+ V<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>7 x6 Y, X' b5 [4 L* T
</ol>
7 v" F3 G$ ]7 g3 n3 |  s! y. _- M<p>这里考虑让Worker持有<code>Option&lt;JoinHandle&lt;()&gt;&gt;</code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>
, }, d0 m! Q( z5 t换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>
5 w4 e! E; G, u2 \- R<pre><code>struct Worker where
) |% I2 f9 r! n: _: f+ ~{3 {$ ]7 J: C' h4 O7 d
    _id: usize,  `  r8 E. R8 `$ _# T8 [4 a
    t: Option&lt;JoinHandle&lt;()&gt;&gt;,
+ i* O% Q, F* @; v; a" g) U! R}/ E$ J5 H+ C( F9 X# l- O
</code></pre>0 b- T+ {" G2 v6 E3 o7 M6 c
<h1 id="要点总结">要点总结</h1>( e* H$ X! x, p/ [2 z$ X2 d
<ul># Q7 V! G% p9 Q: c% j
<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>
* b( ^% O. O! S" D- X, R<li><code>Vec&lt;Option&lt;T&gt;&gt;</code> 可以解决某些情况下需要T所有权的场景</li>
% A9 U9 t& h2 K* b" Q+ y</ul>
( h& g# _/ R( {- B<h1 id="完整代码">完整代码</h1>
- w6 Z4 ]& e5 G; Z+ w) V<pre><code>use std::thread::{self, JoinHandle};
! |4 s+ y: Y( w& c! R" a9 iuse std::sync::{Arc, mpsc, Mutex};
2 p/ F1 s$ y5 E6 S  P# q6 J
) t# i) N9 C8 }1 B" G, j& M4 W$ R  E* B, Q
type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;
( f: i2 c  X# h9 Oenum Message {
. j, r  ]) {0 a! ~  n# U2 p! T    ByeBye,
7 i3 v' }2 Y0 Q3 }' s+ k, T    NewJob(Job),* Y+ q/ S5 |0 M5 \' @/ ~0 L
}$ q/ s3 a8 f3 |3 \2 J6 c

; ^9 Z5 ?6 o& Mstruct Worker where* R5 t7 r$ o" C& H
{
  f: u) a. s$ C! d* l# Z7 P! w    _id: usize,0 C, _! m5 H! V7 h
    t: Option&lt;JoinHandle&lt;()&gt;&gt;,; [7 i5 B0 }; K' y; V
}
- A4 w8 c0 M1 d( s" Y
; G1 u% _! k+ v7 N$ c6 H6 \impl Worker- [$ H1 q" |2 `' U' Y* u
{
% u' S) n3 z9 Z* q) [& M$ g1 r" m! p    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {
# m; o  i, [8 ~' q# K& I        let t = thread::spawn( move || {
" c: E6 l" F. `0 o  l6 ?! t! R' x            loop {
2 B- }( Q+ {0 p# x; W: a! b2 [                let message = receiver.lock().unwrap().recv().unwrap();
7 N' U& x+ |) a                match message {/ s5 C, N. B1 Y" I0 k9 e! i. ^1 c
                    Message::NewJob(job) =&gt; {
( |' L* w3 B5 b% p( ]4 d                        println!("do job from worker[{}]", id);! y7 {) U  l* g
                        job();
) e8 M! ^. G( H4 M, O                    },
3 Y, `0 F* O( m: F8 k                    Message::ByeBye =&gt; {) [: x2 y1 f% R2 y
                        println!("ByeBye from worker[{}]", id);) _& B$ Y/ v$ W) E2 m
                        break( C5 T" [# b# L1 K6 m6 o7 [
                    },4 U( m5 H9 Z2 J% K' H
                }  & S9 {" A9 O9 D/ a. C6 I& K+ S/ n
            }
8 c# D- U' c: ?5 b- P+ ^. ~& z        });
, }, X5 [5 ~' A- d& t$ ?
( {% ~& q8 `1 D8 B1 Y        Worker {
' L! Y% ^' r- o. `$ c1 n" G& w5 N1 x            _id: id,- V" D5 \  l' H: H# U
            t: Some(t),2 ~$ `- i: H3 \; j6 f
        }
* g: T7 r' o8 M. H, ^    }
; v' m* m$ t8 Y! C7 A) G; w* j}
& I/ d0 Q5 b' \
9 n/ a( s. f5 p1 O7 J/ gpub struct Pool {
4 W& b! d- C' n6 h  z' J5 S  U    workers: Vec&lt;Worker&gt;,
% K' N9 o3 I0 Y* `; q/ U    max_workers: usize,% `: R. L6 [0 _0 I# ~
    sender: mpsc::Sender&lt;Message&gt;3 M* b- p7 }7 ^/ I5 j4 Q2 T( @) D( d
}
% T' y1 u0 {' C8 R  ~
/ f0 C0 x! C1 j/ Fimpl Pool where {' s( V1 K0 G/ h/ t( ~
    pub fn new(max_workers: usize) -&gt; Pool {6 S. w! Y) \/ j' h& z. q
        if max_workers == 0 {
" g8 L( [, b; A& S            panic!("max_workers must be greater than zero!")# w$ M( k4 G: w: E" Z
        }: v( z8 }8 M. s* [* ^
        let (tx, rx) = mpsc::channel();
  @) z  ?9 ^! a) r, [9 r( H& }# Q' g4 A. a0 ]' I4 p& C
        let mut workers = Vec::with_capacity(max_workers);7 l$ z9 u+ Z1 X8 U
        let receiver = Arc::new(Mutex::new(rx));0 u) b6 @& M/ @) f4 w
        for i in 0..max_workers {3 [1 j; |, h1 D' G, ]$ V* }4 n% R
            workers.push(Worker::new(i, Arc::clone(&amp;receiver)));
# `1 J0 y( S* E5 x        }: L$ f' T- j4 {, j
5 f$ N5 r; m3 c  Z2 f
        Pool { workers: workers, max_workers: max_workers, sender: tx }! h% ?0 m8 j3 |' z2 B
    }8 \$ ]4 p2 ^& x8 ]
    0 u) t) v) \0 J* O0 j, @
    pub fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send
' n& s3 z( y3 {9 Y8 Y    {
  o( d/ K) W9 B! D5 P) a; ^% r* M. Q6 L& r+ k# C: Y8 ?
        let job = Message::NewJob(Box::new(f));
; v* N; u( ~% u        self.sender.send(job).unwrap();
  K  V" x7 u: n0 b, J! k    }' |5 D  g! v9 N/ p, y8 i! [
}" P' t/ Q/ Z) |: j

' A4 m* f6 i* K# }  cimpl Drop for Pool {
- G/ C! E9 d' I( t' q    fn drop(&amp;mut self) {4 Y0 Z2 `0 D& A3 x) C
        for _ in 0..self.max_workers {
& R1 D( k3 y" w& Z( a            self.sender.send(Message::ByeBye).unwrap();' p# y7 K) r2 W" F3 ]
        }8 O) {2 g! W8 b
        for w in self.workers {$ O7 C& X' @6 G* @  l5 N
            if let Some(t) = w.t.take() {( y7 D7 r5 \% @6 U4 ^0 Y7 h
                t.join().unwrap();' {# O6 k0 u" e  m$ s) c. W
            }0 K% c8 {, S2 K4 v/ Z
        }
% q3 n+ o' [/ n9 \    }# f+ ~- C8 G2 F. x  n# @1 g
}  F5 W3 n$ |. y' Q. d7 [
0 `" [* ^4 ]$ P9 p% C' Z5 x
% u! q6 F0 M, M, S& D2 v
#[cfg(test)]
$ Q( i- E3 r' G& p0 I+ Z" rmod tests {' s% c0 R. ~1 u& J
    use super::*;
' C/ N1 X! X& {    #[test]& V. m) G; g. g! m3 d2 G
    fn it_works() {- m% `7 J& @9 d
        let p = Pool::new(4);
7 y+ ?" c6 Q$ F& d- E        p.execute(|| println!("do new job1"));2 b1 R% B3 [! O, w4 g7 Z0 P
        p.execute(|| println!("do new job2"));" q% G! w1 [" n. E$ W
        p.execute(|| println!("do new job3"));
0 U; J; F4 K7 K, n% j        p.execute(|| println!("do new job4"));' g1 I% ?+ F6 q. V! d. r' p
    }
( `: q& R. ]" d% j& q8 Y}$ }. U. C9 E9 l/ l2 B, |: f
</code></pre>4 W5 ?% K5 G5 h% `* M

* f9 z$ A$ r( v' e) A( Q- V' @& Q
回复

使用道具 举报

懒得打字嘛,点击右侧快捷回复 【右侧内容,后台自定义】
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

手机版|飞雪团队

GMT+8, 2025-6-29 18:31 , Processed in 0.068246 second(s), 21 queries , Gzip On.

Powered by Discuz! X3.4

Copyright © 2001-2021, Tencent Cloud.

快速回复 返回顶部 返回列表