飞雪团队

 找回密码
 立即注册
搜索
热搜: 活动 交友 discuz
查看: 14871|回复: 0

rust 实战 - 实现一个线程工作池 ThreadPool

[复制链接]

8061

主题

8149

帖子

2万

积分

管理员

Rank: 9Rank: 9Rank: 9

积分
26513
发表于 2022-2-12 14:35:42 | 显示全部楼层 |阅读模式
7 ?$ M6 y( H* d, w' O- |& J
<h1 id="如何实现一个线程池">如何实现一个线程池</h1>2 f  ]: R. u6 b; B9 K4 R" }+ v1 y
<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>
! b; y- m6 [  c5 }0 F0 T$ U# B; w<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>+ N# V1 m% _$ ]# Y2 S
<p>线程池Pool</p>
! W8 I, Y0 D* M* K$ Y# D. L<pre><code>pub struct Pool {5 W0 C' h. M# a& E* t
  max_workers: usize, // 定义最大线程数
1 L6 }3 ?/ f- u& O/ |# I4 h6 d0 c}
3 S- I& K! M6 m' C
, w) b" m5 R: R9 I% n7 D1 }impl Pool {
! x& \) f. M& N: A, S2 P  fn new(max_workers: usize) -&gt; Pool {}+ k, Y2 Q7 d& m& a
  fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send {}1 B' Y, K" V' a
}. a4 P' I8 _; O8 L6 P4 ~

4 ~# W- N  M5 P/ }2 v</code></pre>- A% E( ]# S. Q; v
<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>
+ p- z8 ?- ^# P$ X<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec&lt;Thread&gt;</code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>
: |& y5 d! l8 i7 o+ P可以看作在一个线程里不断执行获取任务并执行的Worker。</p>  F( n- r) n4 b5 ]7 w0 A$ `
<pre><code>struct Worker where) y- y5 O7 I: K+ l. t' T% N- |4 m
{
" N& w5 h6 V  U, R2 s    _id: usize, // worker 编号
- s4 [4 @- v# ^}4 ~. ?* p  ]6 }9 `9 V6 `
</code></pre>
3 a( x4 F1 q1 ~+ N/ x1 f<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>
) \4 S7 Z' v1 J9 f9 L; e$ x+ f+ c. T把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>( r; {% A: }& N, C2 d
<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc&lt;Mutex::&lt;T&gt;&gt;</code>来包裹起来,也就是用锁来解决并发冲突。</p>8 \( N- h  L+ E$ p
<p>Pool的完整定义</p>
$ |: c) {' t# |1 Y7 u/ K/ G: \<pre><code>pub struct Pool {
6 l) A- Q1 Z0 a0 s    workers: Vec&lt;Worker&gt;,- S, ~* r. [0 x4 G( g
    max_workers: usize,
4 ?) N+ i! c: ]! r    sender: mpsc::Sender&lt;Message&gt;
+ q/ A5 y: a! u, @2 Y; m5 Y}+ X; o" L9 b- ~+ \  O
</code></pre>
2 E( o2 p/ T  B0 |7 f<p>该是时候定义我们要发给Worker的消息Message了<br>/ q) {) p: ?; K1 q
定义如下的枚举值</p>
3 G+ f' Z; n/ P; v8 [4 F2 u% |3 j<pre><code>type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;' k* ]5 u$ T5 G# `; E
enum Message {$ y$ m- N8 R( v7 \2 G- j. ?7 c
    ByeBye,
$ }7 |5 i9 P2 K    NewJob(Job),
; M5 f- @7 d8 ~, }}7 F% W9 Y8 X' `6 n
</code></pre>$ U" L& A( Z/ S) h! W& ^
<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>
  ^6 B8 Y! J8 i3 Z- @" ]+ @<p>只剩下实现Worker和Pool的具体逻辑了。</p>
/ f( B$ ^* R$ p) j$ \- a4 _5 W1 b! Q<p>Worker的实现</p>' m. ]$ Q" z7 c: H! J
<pre><code>impl Worker
7 l% A+ _5 P; a4 L3 Y6 @; z{6 `: e+ W4 L. @
    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {, E! \3 u. P9 L  |8 L: d" i6 i
        let t = thread::spawn( move || {
+ G0 J+ {7 C# G/ `3 H% K            loop {
! c( Y; ?6 h: T. i                let receiver = receiver.lock().unwrap();
$ f8 f3 a- ^! S  ?                let message=  receiver.recv().unwrap();7 R: S( S2 [" n* w
                match message {9 d  R. V# p; {) E# W! }* D0 X
                    Message::NewJob(job) =&gt; {
3 z$ _  X- y$ T' ^                        println!("do job from worker[{}]", id);
# U' @0 ?# N& R/ x; T* \/ v7 A                        job();" A, g. c! ~3 T6 @! L: I
                    },
8 v. I+ V% N2 N& B/ c                    Message::ByeBye =&gt; {
- D, j1 e! }' o8 f9 Q                        println!("ByeBye from worker[{}]", id);
6 O1 T1 m1 f# w) q3 Y( a                        break9 d$ P1 k5 I  Y% E7 v* p) L
                    },
. s5 f+ N2 C4 f5 G. d6 G$ L                }  
- Z$ Y0 n/ D& p* a3 b& O9 x$ ~9 }- Z! x            }$ g' `; b) ]5 B! X: T  k  w1 h
        });
1 f3 A" j' e% c; `) A; G
0 e6 d+ \4 V7 U4 X3 u9 ~7 t$ y        Worker {, e9 _2 l& m; N; ]& Y
            _id: id,
+ F' k' q! Q- X. T0 t/ E+ k/ b            t: Some(t),& \# Y" S+ n- |" S$ p0 u, G" ~
        }  I- J& A% A5 t, {% J
    }/ b: P1 J1 Q. ]+ V4 G, J7 t' J
}
. o' k9 F- o4 J1 Y0 ]& ]4 W" y; k</code></pre>% p! e2 i3 n- r( r9 T" q* D& \7 q/ ?
<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>
$ G0 ]4 }  l% `# g' Y. E但如果写成</p>, ~$ p: d2 }; c6 `, R
<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {
/ K; \( v! Y, E0 @2 A};
8 v' h' h8 q/ h- u- f7 d</code></pre>
! o% g: a, L' b7 j8 [<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>: |5 C$ R" B6 l" T( f# n; _
rust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>$ u1 z( [4 C- P/ V
<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>
, K( p. M, u- k<pre><code>impl Drop for Pool {
; K2 T8 e3 N& ^0 w3 V( j4 y; V    fn drop(&amp;mut self) {
* U( @1 q$ x7 l2 z) d! i        for _ in 0..self.max_workers {
. u4 T3 a+ e8 z  k6 K: l            self.sender.send(Message::ByeBye).unwrap();& f) |6 v7 J9 `+ Y/ }9 l# c
        }
+ `5 l: I9 A$ ?$ w3 H& [5 o        for w in self.workers.iter_mut() {
$ ]5 `2 @, w- \5 P1 B! \1 c            if let Some(t) = w.t.take() {9 q7 Z0 ^. }$ q$ c  x2 z' Q
                t.join().unwrap();
) I  H, f" y4 A. D            }
' T) \$ o3 w" A# v, y        }
, \3 z9 v. s( ]1 n    }( t- j+ X1 ~& p$ E; C
}
3 L6 u% f7 d8 {% D- r0 B& u% {3 g5 B+ q, F3 N
</code></pre>8 G9 U0 k. P, ?* V1 a  ?. G# x4 ]( i; s
<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>. E- L) r# y; N& E) |
<pre><code>for w in self.workers.iter_mut() {$ _- y  ~' Q- f- T
    if let Some(t) = w.t.take() {6 V3 L- |7 b/ B5 f4 s( @0 Z& \/ y& ^
        self.sender.send(Message::ByeBye).unwrap();
: N" Q* G' t# ]* v/ h        t.join().unwrap();
8 l/ s( O9 {% y8 b2 }    }
0 _  ?3 @( g! `- D8 B' Q: `}8 F, I2 E8 }. W& _2 w2 [8 p
1 ?+ |7 i, Y1 t% l7 g
</code></pre>
7 i( q  G! N% T# Q+ G<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>$ A0 q  ]" S: V; h( o' ]# a7 `! f
我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>) v8 c$ |  f6 f4 J5 _, X: r
<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>2 p! R* t& X" @$ `
<ol>
8 h  K8 A8 k. E<li>t.join 需要持有t的所有权</li>
: K5 p+ Q8 h5 N8 n6 `- x<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>
& m: V# w% K) A* @</ol>
# _, u% Q4 N/ B6 ?. G<p>这里考虑让Worker持有<code>Option&lt;JoinHandle&lt;()&gt;&gt;</code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>0 w: v$ ?) ?" O* ~- }7 z/ G* ~/ b
换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p># y% [  x  c2 p* M1 E3 z* C
<pre><code>struct Worker where
# G# D" o' \2 b* J. u; A! p( D{
  C: V1 Y/ R$ \$ C" c) C$ y    _id: usize,
. t. a+ m( z, w2 b    t: Option&lt;JoinHandle&lt;()&gt;&gt;,. V0 R3 z0 i6 K4 ?5 O
}0 r8 d3 R8 x# T8 T9 o# x. {( l
</code></pre>
7 D1 r" W+ p$ }<h1 id="要点总结">要点总结</h1>- k7 O/ f* `+ A. X
<ul>
3 e; m' o( U: ?<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>9 ~8 C0 |* G( }& R" X" a
<li><code>Vec&lt;Option&lt;T&gt;&gt;</code> 可以解决某些情况下需要T所有权的场景</li>- J: _  t1 ?% k0 t: Y: V( x
</ul>! W  K+ _* j5 D
<h1 id="完整代码">完整代码</h1>
' @/ j8 n: R. h6 F6 W; j( [<pre><code>use std::thread::{self, JoinHandle};. T( f  Z$ @$ a0 N( E+ ^0 T- v& d
use std::sync::{Arc, mpsc, Mutex};
5 V/ N3 J( [8 C* Z4 j( T
& {$ [$ J5 u. ^, K5 k) {4 \( k9 g3 \. v! R$ B+ r. c
type Job = Box&lt;dyn FnOnce() + 'static + Send&gt;;
; r; h* N: n+ O. {- S4 Denum Message {
3 d  r, ^* _3 R) D& `& J0 z' Y    ByeBye,
6 I/ J& n7 _6 {6 h) s2 @8 Z    NewJob(Job),/ {0 S2 x+ A* m5 Y8 c. Y
}4 s7 j+ ]6 ]) _5 W. L6 G
7 U; D' y2 y7 o
struct Worker where1 m# q" M4 s4 k! m4 d5 D: R
{
3 C3 l% z, U6 u) c) q    _id: usize,
" h# w% ]  X; l1 e& e# z5 }) ]    t: Option&lt;JoinHandle&lt;()&gt;&gt;,3 @, X0 L9 ]: O7 n( \; x: l9 J& W: r
}# [' e' N8 t, J# L

3 I2 c3 Z2 L/ P  w1 n- h# aimpl Worker
% v% b# y: ^3 P8 p! e/ o) `{7 u3 J6 M! h2 b3 r1 G5 j0 o& ^3 ^
    fn new(id: usize, receiver: Arc::&lt;Mutex&lt;mpsc::Receiver&lt;Message&gt;&gt;&gt;) -&gt; Worker {
: F/ e0 L! w- t3 k$ ~- O        let t = thread::spawn( move || {
5 `3 t* G( `0 |            loop {% D/ _9 H5 Q3 s' e" |
                let message = receiver.lock().unwrap().recv().unwrap();$ M' d7 b# N6 z, x) g* h6 u
                match message {
( d, Z  [& h/ m& v- X) B                    Message::NewJob(job) =&gt; {
6 ~3 C9 D* V1 \: S' C                        println!("do job from worker[{}]", id);+ f4 {$ f' M& ?6 m7 s$ \# Y& k3 B
                        job();5 W4 I7 X  L' y1 A7 P( m& ]5 f
                    },9 x0 I) m" Z- B# u8 C; M6 k8 q
                    Message::ByeBye =&gt; {
& h! ^* H; T: y9 l! _4 n6 o4 J                        println!("ByeBye from worker[{}]", id);
$ |4 D& P/ c  }. O                        break/ K7 ^6 O, |2 F( F  G- L& P
                    },1 P, \* [. k) R- R# h# G. A$ x1 o
                }  
& e8 v" }! c8 \5 I' V            }
: A: o0 ?% i2 |( a1 W        });
* x( w0 Q1 m+ Y! Q
3 e2 N& H% {9 N3 }1 {        Worker {
* s( Q" A  x$ j( j) H6 t% `            _id: id,! e# t/ z) X! ~3 r+ P" A& N4 }
            t: Some(t),
- |/ }8 v7 W  U2 X. z        }
6 y8 P. N+ B# H7 Z: Z: ]1 X    }) [4 A: l2 h3 X
}
( A3 |% _0 ~/ ~3 g' R( z
1 z+ e$ Z# @* H5 Npub struct Pool {
* V0 T4 x' g# D0 g4 z    workers: Vec&lt;Worker&gt;,7 v7 |$ I$ m# G
    max_workers: usize,1 ?- g5 X% [- y9 c2 ~2 {
    sender: mpsc::Sender&lt;Message&gt;& @) R3 M9 N5 d
}
& U; S/ U# v, o$ U" `9 f+ Q1 p
1 M! S/ T' y- B: Simpl Pool where {( S) J) j8 `2 G' H# Z/ z
    pub fn new(max_workers: usize) -&gt; Pool {; @  ?# b! G4 r* A
        if max_workers == 0 {
; s  Z& r" S( _. s( ]" O, \            panic!("max_workers must be greater than zero!")
! X2 A# V4 z6 W' n/ m  x* V6 r% g        }
3 P1 y* C0 |2 M1 M        let (tx, rx) = mpsc::channel();( d. r+ Z* Z" }7 Z  t- F" q
' F4 s/ Y" G2 ]. `3 i! L7 ?# S3 Z# [
        let mut workers = Vec::with_capacity(max_workers);
* ^- R* U8 u4 z2 Q9 b) n        let receiver = Arc::new(Mutex::new(rx));
- R) T" k8 ^7 w" L$ O        for i in 0..max_workers {
! K  b' J4 y- |; y  M            workers.push(Worker::new(i, Arc::clone(&amp;receiver)));: _5 Y) d2 u$ a% g: E
        }
& T5 h5 U; l( ^$ v
) S6 v' F9 ^% W# C6 n0 F9 ^& s" T7 L3 T        Pool { workers: workers, max_workers: max_workers, sender: tx }3 t% u0 c# x( s  t. D: R# }+ k* v
    }
% U8 I! Z; f- {9 I' N   
5 r& j& O8 \1 k; i    pub fn execute&lt;F&gt;(&amp;self, f:F) where F: FnOnce() + 'static + Send' v6 b/ c" g3 S- l, r; p% O
    {
* ]% l, `7 L9 w4 t9 n% O' F/ \+ [! n
( b: U4 e' q- o& k3 G        let job = Message::NewJob(Box::new(f));  }4 d1 m% Y. G  b
        self.sender.send(job).unwrap();8 k5 Y/ R& \3 G  L2 b' g
    }) }2 I$ J9 h' t  Z% O! y. z5 D
}
3 g, }5 I; u( ?/ Y9 e7 J2 z1 U* p; F5 z+ f" }" m' g! Z
impl Drop for Pool {$ G1 u& C6 u4 B% L( Q' H
    fn drop(&amp;mut self) {# C2 _$ \( @' ^& i7 p: X" B
        for _ in 0..self.max_workers {! K7 }6 g5 p  g' z
            self.sender.send(Message::ByeBye).unwrap();
/ ?6 E$ v7 O& x( l) e' e& W$ v        }8 p. `5 g( ~. x0 ^; B, i% _0 ?7 q
        for w in self.workers {- J9 N8 I, j0 `- e4 m2 j& y
            if let Some(t) = w.t.take() {
2 ^, b# I0 J5 i1 \) j4 }, l1 T1 K                t.join().unwrap();0 d  C! E) ^  V1 ?2 j, m
            }
  G, a1 f3 A3 a: m  T" b8 X        }
, u9 ?2 c2 C# b/ _( X- \    }& y* \5 w- M* N& B( ^
}' N1 |0 h# \/ k' S- d& U6 ]1 I

3 E1 F4 a9 f. B, t! _5 m8 W+ i& o) O
#[cfg(test)]
6 u) }/ U9 `4 m* b# D4 }) k8 Dmod tests {1 n& H+ x$ Y* M5 W9 U  A
    use super::*;
, |- k) Z* h$ h. ~* f, g$ K6 _. Q* T2 x    #[test]
' n3 a" a! y' s0 u7 C2 Z0 S    fn it_works() {
: S+ ~0 }5 @0 `6 C        let p = Pool::new(4);! @; `0 A9 y8 |1 Y5 {; |
        p.execute(|| println!("do new job1"));
4 t/ \( G9 E8 R/ z* L        p.execute(|| println!("do new job2"));: {- x% L4 U0 ?7 u! ^; W
        p.execute(|| println!("do new job3"));4 d' x7 q5 _! d# \
        p.execute(|| println!("do new job4"));% S: }  L4 s) l8 R( i9 F2 R% E. i
    }4 |- D5 K9 M4 M1 U/ N. R+ v1 c
}
% f% y. Y* K, J, c0 _( r1 Y</code></pre># I2 R' _" ^5 E
2 ]  Z. n3 C, J
回复

使用道具 举报

懒得打字嘛,点击右侧快捷回复 【右侧内容,后台自定义】
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

手机版|飞雪团队

GMT+8, 2025-12-15 13:02 , Processed in 0.290931 second(s), 22 queries , Gzip On.

Powered by Discuz! X3.4

Copyright © 2001-2021, Tencent Cloud.

快速回复 返回顶部 返回列表