飞雪团队

 找回密码
 立即注册
搜索
热搜: 活动 交友 discuz
查看: 8507|回复: 0

rust 实战 - 实现一个线程工作池 ThreadPool

[复制链接]

6478

主题

6566

帖子

2万

积分

管理员

Rank: 9Rank: 9Rank: 9

积分
21758
发表于 2022-2-12 14:35:42 | 显示全部楼层 |阅读模式
& q# p( v- x% c
<h1 id="如何实现一个线程池">如何实现一个线程池</h1>
  h! u3 p7 h0 U% _" M, C% T: A<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>  o6 G  ^: w! W% d8 Q
<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>
, u  V5 ^; E3 o0 ?! `<p>线程池Pool</p>6 B! T6 x: s  c% J
<pre><code>pub struct Pool {+ \- \% d3 T  F9 E
  max_workers: usize, // 定义最大线程数
1 m" K6 z8 a& V; K, s% G}2 x) [! b. F) I. r# f$ v

$ n7 r- W( ^. N% q5 aimpl Pool {
2 V: [8 z0 G9 f" i  fn new(max_workers: usize) -&gt; Pool {}
1 M, m7 w) Z  }9 u  fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send {}
" g; Q! O! e+ }. V- _/ Z}
: e* D0 W$ U9 h4 N/ b2 @
( T# X( W9 \: [) ?# r</code></pre>
, m7 `5 n( y9 ?8 T3 C' T<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>: P9 K& [% V$ Q8 e# u7 Q8 E4 T2 |
<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec&lt;Thread&gt;</code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>, _0 v; ]; s( w0 M
可以看作在一个线程里不断执行获取任务并执行的Worker。</p>
7 W0 C4 {7 u9 T. [0 _1 x# I# m# c: k<pre><code>struct Worker where
% R% z' H5 Z( D3 C! V{* m- ?7 v" ^6 F7 l6 Q$ i/ M6 C
    _id: usize, // worker 编号: Y) u# ]! |* [0 |5 o
}
8 s8 x' d$ P! `3 Q1 ^</code></pre>8 t- `  U8 J8 B# N1 ]% J) h" y) ?
<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>7 N- i5 g; E5 S0 `+ E; l
把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>; Q( g7 q$ }! M1 p
<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc&lt;Mutex::&lt;T&gt;&gt;</code>来包裹起来,也就是用锁来解决并发冲突。</p>
+ j% O: m" g7 X* A6 z* u, N<p>Pool的完整定义</p>3 @7 Y" C: X9 o2 u- O
<pre><code>pub struct Pool {5 ~4 X  l! c; Z1 g( n, q& d* C5 b
    workers: Vec&lt;Worker&gt;,
: q4 @/ C' ~9 @# W; b    max_workers: usize,
9 ^1 g$ |5 |3 s1 U3 W; }    sender: mpsc::Sender&lt;Message&gt;
) m5 S% T( e4 H; _& O( ]4 ^' k$ w. m9 Z9 [}
. ?5 l% U5 S2 T6 O& K5 A8 x</code></pre>" P0 n, l1 k0 y- i& e3 I- ]) V
<p>该是时候定义我们要发给Worker的消息Message了<br>0 q8 `  Y2 k/ L$ D3 U
定义如下的枚举值</p>
( E8 K' g6 ~  H9 {. H<pre><code>type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;4 @* m$ h6 w7 }7 t1 m( y3 h
enum Message {2 M( |9 O8 E! R- f
    ByeBye,
* A' ?. t6 l' O3 I) X2 F# i    NewJob(Job),
+ ~. ~' O8 d. r$ ~+ f}; r. |+ A0 u7 `. B2 E: S, P
</code></pre>
( U( u# {# _- D/ _4 h<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>4 [4 \  G' C0 q9 u% Z. I, B
<p>只剩下实现Worker和Pool的具体逻辑了。</p>
6 ~4 r4 `# h! T/ {  S5 K1 n- e<p>Worker的实现</p>/ _6 f! ?# V7 W' K
<pre><code>impl Worker
  \! R$ ]; O+ p: a, {{
; G2 `) K: y2 r; A8 I6 `    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {
8 K) i' @- G* t9 K' C) F- O        let t = thread::spawn( move || {. V8 E# G$ g8 j  _( r
            loop {
) n6 n3 I6 Y2 v) `' l  k                let receiver = receiver.lock().unwrap();
, d" F0 r. b: u4 x1 {3 j: U                let message=  receiver.recv().unwrap();
: Y, {4 k: `4 t! l1 w0 J                match message {
/ Z0 j- I8 `# t+ }$ q/ E# c                    Message::NewJob(job) =&gt; {) h1 i% s: g! y7 k  n9 A
                        println!("do job from worker[{}]", id);; ]1 m# I* r$ }  p! V4 z! C$ A
                        job();3 O. H9 ?; Y2 D/ J3 J
                    },
& Z2 p9 h5 r1 P4 m6 O                    Message::ByeBye =&gt; {
7 u; ~) T- O4 t                        println!("ByeBye from worker[{}]", id);) D8 c4 K( P' x' T, n7 t* M8 M
                        break
& U7 ^& g6 f# t! D/ V                    },1 @6 `9 e6 }3 [- V" r. n/ K
                }  2 y( D4 J" P/ P; B
            }
4 K+ x' J* Q$ u2 i& t        });
: M  y/ k: B( p* i( q' |
2 P- W. A/ e5 V7 V! X        Worker {+ r; d: j* {( a! u$ H& H0 ^
            _id: id,% K% G, G& f1 Q5 j, p/ c
            t: Some(t),  t& x+ s. ?2 X9 K2 I# W
        }
! ^" H) N! `  r    }
8 L* R# C: k" ^" S$ t}8 Y* e7 Z+ m7 y/ f+ V7 ^& q% R' H
</code></pre>$ B3 i) N: ]2 q; c9 d: ]
<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>
9 {3 |( o7 [5 X* C但如果写成</p>
4 N$ i9 F/ L; a/ Q& Q. a; W2 x" f8 q! M<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {0 n; P' P2 h1 n% ?
};
$ c0 J, c) t$ h! _# B% A</code></pre>
5 }, w" A( ~8 c( K- ^<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>+ a4 q" _7 z' z+ ?
rust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>' `5 g: M( `/ p+ r3 q
<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>9 T1 w! R0 O8 E8 F$ S5 Z2 X1 M' d
<pre><code>impl Drop for Pool {
/ B1 x( Y0 Q$ D, }, s6 m    fn drop(&amp;mut self) {" e: C6 ?/ _. ?3 `; Z- C4 `
        for _ in 0..self.max_workers {
9 v8 z  i+ @) v8 C! f: Y! J            self.sender.send(Message::ByeBye).unwrap();
( D# T8 m# T8 V6 p/ b        }
9 a. x! Z/ Z. K. F7 B) S        for w in self.workers.iter_mut() {
2 f* h9 F2 p( B6 t' G            if let Some(t) = w.t.take() {
) l: m5 C6 R$ }                t.join().unwrap();
1 C5 ^+ J& r, V  F6 C1 o- P+ Y3 U            }
% G5 g+ G" V1 t5 N* T        }9 p: k$ d! B9 ~& G& L
    }( I4 Q$ _$ G8 X8 J/ n6 E
}
) x- c! X: L3 V! U- Q; w2 O5 U$ x" H& x" q( \; V
</code></pre>
. K  O/ R) j; x0 T, Y9 o<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>3 j. f! `" M  R8 W' m
<pre><code>for w in self.workers.iter_mut() {
# n# P2 ]3 j: V/ P; t7 U% h    if let Some(t) = w.t.take() {, @- \& X! Z) ^% Z3 v+ G" l
        self.sender.send(Message::ByeBye).unwrap();
2 ]3 s! x; z" ?4 W; |- [        t.join().unwrap();" U* T6 X. H1 U, O( `
    }3 x) |9 r; j# j% `
}
0 C" u2 L/ h& O
- ~/ `, E5 F5 Q" w2 l. t2 n</code></pre>
8 ?3 r5 D/ |! X9 r2 w" y<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>0 @5 D& b" Z, m; J% n2 |5 A  |  Z( m
我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>
' e$ `7 U9 e* F& [<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>
  v( \! e% `% b4 W3 g; G- N" w! _<ol>7 Z- c$ }8 c4 }: P
<li>t.join 需要持有t的所有权</li>5 F: l- W6 c, |4 W& \
<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>, ?& k4 d4 e& A$ z; T% O
</ol>
/ o2 e7 \& _% r6 h, ~) h: d6 U% g<p>这里考虑让Worker持有<code>Option&lt;JoinHandle&lt;()&gt;&gt;</code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>; B" y  d! c# Y5 g
换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>
+ G; p) w8 Y5 \% ^3 D<pre><code>struct Worker where/ `3 l, J; U( T; A" C: k1 A- h: Z% X+ h
{2 x8 m" z. \' X( g0 b" Z) b
    _id: usize,
8 x2 E% ]$ C2 {1 {    t: Option&lt;JoinHandle&lt;()&gt;&gt;,
& f/ W+ M$ _# {% o}& A$ l% k$ d, \1 J6 `, U
</code></pre>
, ^+ w/ s! m& G4 A+ M<h1 id="要点总结">要点总结</h1>
8 T1 W' r; w  [. T* \% S7 ?- `<ul>
; h7 W/ z. `; g) W. v<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>
. t% {- i: A8 I6 m& R<li><code>Vec&lt;Option&lt;T&gt;&gt;</code> 可以解决某些情况下需要T所有权的场景</li>
3 ]. z$ D$ ~: s3 j; V</ul>% e0 `$ ?  @! |1 p$ d
<h1 id="完整代码">完整代码</h1>" Q( r) [. y! p4 r3 L
<pre><code>use std::thread::{self, JoinHandle};
& L, u. P  [  Euse std::sync::{Arc, mpsc, Mutex};8 Y" g1 ^! B3 Y& O9 [
  C" {' V1 ?5 L! ~

, s9 g5 q" L. j7 ]1 ]. |type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;! Y0 W0 U' i+ |, h
enum Message {
7 ^' x+ U/ [$ `& P# R1 y4 d    ByeBye,) V: q% R, B0 k9 f
    NewJob(Job),' R+ g! x+ |4 k9 y9 P; S
}
! ?  g% F( C4 O8 i+ V( k) w$ M6 y$ u7 w5 }5 l$ V* r/ t9 _% N
struct Worker where
$ \' _! r6 G5 ?' ^1 K2 A{
! b2 ?& `0 A( u/ `9 T5 u    _id: usize,
4 W; W8 f, [) H& d    t: Option&lt;JoinHandle&lt;()&gt;&gt;,
6 B4 ]+ r" n( T5 D1 ~/ ~}# J  Y5 |1 ]$ q8 k7 O( I
! M8 p; Y) i2 n* ~
impl Worker
8 u4 t4 [0 _  Z1 e7 L{: O6 w" f5 a: ^+ E
    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {
7 |# B( x4 a( c7 K. w/ |        let t = thread::spawn( move || {
, L, N; j6 K& Z" D            loop {9 z6 K+ V! K  b: s
                let message = receiver.lock().unwrap().recv().unwrap();+ U! q: D; |( z" X. d, Q+ c, Y
                match message {
3 R1 f# b7 |1 K9 H$ t6 M                    Message::NewJob(job) =&gt; {
% M  P" x" Z8 C% w# ?: T                        println!("do job from worker[{}]", id);
5 v' M, Y( T/ b! ^5 e2 f( s                        job();
' V/ w$ G+ R. I4 l( a, j, M! Y                    },; |' C" H. ]' a5 \1 \. N0 l
                    Message::ByeBye =&gt; {
9 Z" Y" h' }- S, L8 c; O                        println!("ByeBye from worker[{}]", id);1 `- c( x3 K& F1 O1 M4 l
                        break
2 |0 y7 _% z' P                    },2 l( h0 M+ Y% d! B' l) Y" f
                }  
4 P3 Y% C6 f0 ?, [2 Y            }. f, M6 \* D) m) [5 q
        });' T9 v! x3 G; @' U% R8 G  z
5 U8 w, ^: Z3 G; A9 q2 \0 H, Y2 i
        Worker {
9 \1 ]: ]4 l; R            _id: id,
: t1 `9 q. `$ O% ~. s9 {! |: h+ _            t: Some(t),
/ W+ V$ o. `3 O, t1 q+ z        }
/ o) F" T7 Z- ]4 e    }$ A* _& C+ ?+ `4 d! d; T
}
/ _  c! @# e- T) d; \; }
; s' t0 q) G; R# m) O4 spub struct Pool {
4 o' d! x: W) |# ~5 p# Z/ i% O    workers: Vec&lt;Worker&gt;,9 ?# r% Q) m" N7 F: ^; C
    max_workers: usize,9 _! H( @  e2 @" x
    sender: mpsc::Sender&lt;Message&gt;
2 `8 }! f% |9 A* h! y& j}
+ B& |9 m* C9 l( z! N: [
9 E, H- X3 v2 C) V+ K( @* o+ |impl Pool where {
. h8 g3 ~# c1 R% A! p+ \- |9 Q- _+ P9 M" p    pub fn new(max_workers: usize) -&gt; Pool {
& ~. z$ ]- l, G, U  t        if max_workers == 0 {
# J% Y+ k" T4 J# ]  H            panic!("max_workers must be greater than zero!")
$ z, g$ E' g3 U  r4 N1 v% j0 ?        }
5 N8 ?% n5 H# v. H        let (tx, rx) = mpsc::channel();' e0 c- [2 t4 ~

( G/ S0 z3 P& g! b* q        let mut workers = Vec::with_capacity(max_workers);
5 K6 f" k9 b% h5 G4 F: q# g8 Q        let receiver = Arc::new(Mutex::new(rx));5 {% Y2 Q) S' h; _- y  }4 Z
        for i in 0..max_workers {6 a; q8 Z2 b# b& Z& u2 l4 H
            workers.push(Worker::new(i, Arc::clone(&amp;receiver)));
5 |$ B0 v3 M  u( ?0 E        }: ]- g# v. @# _/ T" ^) {
; h+ V" k; v# F# N, k! |
        Pool { workers: workers, max_workers: max_workers, sender: tx }) v1 O! M1 w  B7 l
    }/ ?5 r, Y. u8 b% Y$ y- K& @" v
   
1 I8 z, ~" l( k+ M% F; L* a  w    pub fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send
" z% g' P, q; S; \/ p; s    {6 S( w/ {7 ?3 T4 }
$ D2 [5 g% U. d: O' L
        let job = Message::NewJob(Box::new(f));
  C' i. ?) o, V1 H" @/ r% C        self.sender.send(job).unwrap();
) m; I4 e- W9 F- X    }$ S" X# Q+ s3 r9 T# ?
}
& E9 V# ~! ~  l! T( g( o. _* f- @- A# G  c" n
impl Drop for Pool {
: h' Q/ Q1 T+ z3 o: r7 F    fn drop(&amp;mut self) {
! A- @' O8 J4 w3 P; b        for _ in 0..self.max_workers {( o1 N/ l+ }" ^  \" z4 x! \
            self.sender.send(Message::ByeBye).unwrap();
( f  u' T4 a% \        }
6 b4 U. F$ O- D' w9 A5 T- J        for w in self.workers {
  p: E# U7 g7 v+ }            if let Some(t) = w.t.take() {: B2 w" a; H% Y% Y5 g5 P
                t.join().unwrap();2 M" e% y2 f- l6 q# D
            }
0 t0 }; e% O! x5 p        }* l' E7 u6 B, B3 [* {+ D) ~
    }
: H- t' G7 w% v) j- m}
. o; E! [4 G- F# \& y' U, s1 l  a% r0 q: n8 Y5 n/ I2 j6 j, C) r
/ W8 d0 C2 i7 E6 ~: v: ?3 l
#[cfg(test)]
- S4 _  y8 b! a% amod tests {$ r6 n. I. J7 P* H# N
    use super::*;
  d0 X5 [  \/ J% Z$ H$ x    #[test]. l% l( w- q+ L% O- n0 C
    fn it_works() {- k2 J$ Y4 ~$ F- x# y
        let p = Pool::new(4);
% t5 a% J" \+ K$ Q. |* X        p.execute(|| println!("do new job1"));* B0 {1 h% O4 |+ n" A' c
        p.execute(|| println!("do new job2"));3 X1 f5 _' u: ^% f
        p.execute(|| println!("do new job3"));
& j' q7 A' j+ O  C: u' x/ d- L% G9 p" b        p.execute(|| println!("do new job4"));
& u) v/ i% u2 T8 o+ j; |% x/ ^    }
( |1 a% Z& R$ |! V, S}( o3 ?* T8 B0 a& W2 e- `/ T$ x
</code></pre>
! D9 I( Q* Y8 h- s! w9 _6 B
" u. z0 r% x( M. X* A3 t- A8 Z5 l$ z1 j
回复

使用道具 举报

懒得打字嘛,点击右侧快捷回复 【右侧内容,后台自定义】
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

手机版|飞雪团队

GMT+8, 2025-5-1 07:24 , Processed in 0.084064 second(s), 21 queries , Gzip On.

Powered by Discuz! X3.4

Copyright © 2001-2021, Tencent Cloud.

快速回复 返回顶部 返回列表