|
|
: Y# E& x' J3 C5 _- P
<h1 id="如何实现一个线程池">如何实现一个线程池</h1>
+ F- d$ U1 N5 D9 F! a) n<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>! [$ e6 ]+ w! Y" V
<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>- \2 \; P0 J h; \
<p>线程池Pool</p>. Y! ?! Q+ y2 P3 n) Z
<pre><code>pub struct Pool {7 r2 t$ A& m2 O& x$ b
max_workers: usize, // 定义最大线程数4 t* E0 j9 B6 Y- u0 {/ B1 ~
}
{7 Y( B6 B+ X) N0 D7 m2 A
. m+ F9 @0 C" H* Ximpl Pool {2 j& L" T. W- W, [
fn new(max_workers: usize) -> Pool {}
. ?+ u, z! D; e9 x fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send {}
a6 f' ?2 W9 O1 m}8 d7 A7 g8 ]7 ~; i
* l; j5 k" n, c8 Q: z5 f/ ?$ V
</code></pre>
" I! M7 R9 w. P, u7 d& |4 l<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>
1 m8 x3 c4 `# {' Y- b* i<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec<Thread></code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>5 U$ B3 r/ `/ I$ W
可以看作在一个线程里不断执行获取任务并执行的Worker。</p>( N8 `6 j6 ^& i
<pre><code>struct Worker where
% G0 }$ ^( ~" F; L4 c; G$ I{
$ T, R& F0 h# K* o _id: usize, // worker 编号 A2 q! l" D3 ^8 }" A
}
+ r& W! e! d1 `</code></pre>0 ^; M) a" m' L8 |7 k0 Z
<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>( v8 N- q( X7 I/ i& [% _0 z6 ]/ u
把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>/ V9 u& k/ L/ ^" A0 o
<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc<Mutex::<T>></code>来包裹起来,也就是用锁来解决并发冲突。</p>
' i$ Q0 \ `1 R3 p<p>Pool的完整定义</p>
1 e% q( T) u- Y/ ]1 X( B) d<pre><code>pub struct Pool {
, X$ J9 x4 D& H; ? workers: Vec<Worker>,% S) a9 c6 P8 P f' p
max_workers: usize,
, r3 i" f% J, {& x sender: mpsc::Sender<Message>
$ h/ R# V* Z* T, R6 X7 I}! E* e2 i+ E& K% O) Z3 x/ ~
</code></pre>
$ b3 J& N( V7 P1 `0 }% D# O3 E<p>该是时候定义我们要发给Worker的消息Message了<br>2 A& L! m1 T" G$ @% A
定义如下的枚举值</p>( x: O' K8 O' K! B/ W* j a
<pre><code>type Job = Box<dyn FnOnce() + 'static + Send>;1 p# s$ b# P/ z
enum Message {: a7 r# T7 H/ C, O5 z- A4 a5 \
ByeBye,
7 Q1 K" W! j6 {& l NewJob(Job),
% G+ H0 G0 |4 l Y}4 M1 n i, u4 r4 v. w$ c
</code></pre>/ U1 C( d, t# w/ n4 R/ I
<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>
4 m3 h, g/ K c% n<p>只剩下实现Worker和Pool的具体逻辑了。</p>
. @( w e i! d7 A. q' i<p>Worker的实现</p>
! k4 w' G" [% v' I1 g8 `- ~, K/ c<pre><code>impl Worker
- A6 p8 M7 O4 a4 Q' V7 }; L{
* W2 |; U3 w3 G$ ] H& r+ O fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {
: }, q: R! J: o9 } _8 p let t = thread::spawn( move || {- ?) T5 P6 |/ [1 A% T9 @
loop {
9 I+ W- R7 P% D8 h" \ let receiver = receiver.lock().unwrap();
$ b" O& [% X! [/ [6 Y4 n* [' P let message= receiver.recv().unwrap();
5 P7 {' a, z& K, \: w0 n match message {' s; o5 l2 H) |9 Z; c5 Z
Message::NewJob(job) => {
: V5 _. L3 ~" x$ J% | println!("do job from worker[{}]", id);0 [4 p- I6 I/ O7 h- V
job();
1 t4 d3 f0 i7 R I% l6 n },
/ @ h5 e) c* ^4 z* N! t5 ] Message::ByeBye => {
# t0 [6 M* S, A; W& z println!("ByeBye from worker[{}]", id);
/ c! W1 F( S5 |1 g! h) o6 l7 w break
! o# |8 R9 l( h) [/ a, ?4 c3 Z },
3 x, \5 X+ h3 N7 `9 }5 i/ k. f } & @: |1 \) r# F2 _
}
! g- h0 C+ W5 M });$ p; X1 t2 t, _0 _) D, u
. {' S1 X5 K! H Worker {
/ H0 O+ \0 z" D+ o/ A _id: id,
; D3 x) e0 g2 z4 z: ~6 i t: Some(t),
V( K+ ~2 @5 L% h% i }* F v1 O, Y7 } J4 C3 J7 m( i& G
}3 w" v) F; c. k. H8 X0 w* w/ f7 ?
}; i% i0 n' _6 m" O
</code></pre>
) U1 q9 ^$ j( v" t, \0 d! i<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>
* H3 n* X/ A# p# H9 v0 W5 ` p( Z但如果写成</p>
6 S$ Y3 E% |6 B, v( l0 @$ w% J: u<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {
0 v) Y* v( f. {$ d+ u% Q; r6 u T};
" p% O* v7 u, L5 T# b! z; f</code></pre>
6 U6 p3 t1 t% Q+ [8 i, v0 f<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>) Y! E9 ]+ {. \: ?8 l
rust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>& b5 c. Y z5 i$ b
<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>: o) j4 O. }. \
<pre><code>impl Drop for Pool {7 g# J. W8 X5 C1 E
fn drop(&mut self) {& s0 E' `5 v# y) A. u$ h4 ?
for _ in 0..self.max_workers {/ a) V+ t4 I/ s& E+ w# i
self.sender.send(Message::ByeBye).unwrap();
: E* A7 n5 t: f9 O }& ^/ f* |3 g2 n+ n
for w in self.workers.iter_mut() {! ]+ |8 d$ ` B2 [$ j( n
if let Some(t) = w.t.take() {
, o* d* M) f& o! D4 g/ h t.join().unwrap();4 ?( h' g, {" s F- V
}
, S. J9 b6 `8 ^: j }
8 m4 q1 Y/ m4 q/ K }
4 z7 a% q9 G0 Z, a3 ?0 v* M}
# J+ w5 j$ ~( x7 m8 C5 V* I; c D7 m( P i# ~3 Q8 [& ?
</code></pre>! m0 L3 _3 M* c1 q1 }) I/ w T
<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>
, d0 V f: c# g3 v<pre><code>for w in self.workers.iter_mut() {
) b) a* g5 Y1 C' d4 L if let Some(t) = w.t.take() {, d* E4 s! l8 g6 q3 }0 A% r; c
self.sender.send(Message::ByeBye).unwrap();
; a$ S( U! W& t. F" P t.join().unwrap();
; T: i5 s: i: L+ L& E, [ } ~8 Y5 Q" [- W* f3 A4 Q$ X
}
5 O0 a. F, L) S
3 l: _; e7 [0 R. H4 p: o</code></pre>
. j1 ]& _2 W# b Y2 i/ ^/ @<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>1 [5 E, j3 K6 J% B0 o
我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>4 d2 B9 }/ I. {$ _. k
<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>
' f# f2 E: O5 J( [<ol>
. I0 V# ^9 w1 [% X: I- j/ }<li>t.join 需要持有t的所有权</li>
8 ?8 c' ]5 z( Z<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>
8 p# X8 x: \1 ]; _3 n1 d</ol>
% Y9 x1 M4 G, q7 E* {& A<p>这里考虑让Worker持有<code>Option<JoinHandle<()>></code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>3 h/ ~% N. |2 ^. ^7 X. f
换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>. V) }; H2 C! Z# Q3 j1 i" Z
<pre><code>struct Worker where
" x& w' n3 n2 H k. u- E{5 q; m! B7 ?1 S# u# S N* Q
_id: usize,7 G. n/ |3 _/ c( t4 n
t: Option<JoinHandle<()>>,+ N( A$ w; ^, d9 x
}7 u: h9 X# L' I0 q
</code></pre>
* L4 `8 a2 ^8 f! t. k<h1 id="要点总结">要点总结</h1>
) y; Q2 ~: C( I: D8 p% }/ n/ h! U<ul>
# T+ F4 T, q0 l/ P* ?6 m, H6 f& N<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>
, r7 P" y# A: V- q0 u- D6 t2 ~<li><code>Vec<Option<T>></code> 可以解决某些情况下需要T所有权的场景</li>
% [, P7 B6 L) f/ x" }- u</ul>
# Y9 G3 d. e1 v/ \) q2 A<h1 id="完整代码">完整代码</h1>$ k. F1 v p8 W
<pre><code>use std::thread::{self, JoinHandle};
3 j: x: t) |! O8 E5 J- kuse std::sync::{Arc, mpsc, Mutex};
- C8 e: i: P# y: [3 u. g e% n% t4 ?- n, y
. G# m# ?9 a& k
type Job = Box<dyn FnOnce() + 'static + Send>;
0 P* R* B! _! s2 venum Message {
# L! ?+ \5 b( l ByeBye,
& }1 k2 t, D5 L' J/ v NewJob(Job),# J0 P, l" l9 {" n
}
! K! x( P( f, ]' y! H# z
+ ~. e. k% r6 C5 \struct Worker where
# R& D* p2 h/ [2 D. H; d; f& h{3 i& L& S/ j$ }1 b
_id: usize,5 }1 ?' i# e9 J
t: Option<JoinHandle<()>>," }1 _* G$ d7 W1 T( f' m9 w9 e
}
; w z& y( D; o3 D2 b
2 }2 N7 l9 K/ [4 X6 m7 O% |impl Worker
" N O0 b- s" b6 ?{
& j0 \. _& ]3 ?' D: { fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {9 F# M" [/ Z* u( s: W- @) l
let t = thread::spawn( move || {7 t5 P/ }+ N, Z- h
loop {. X- u' ?4 }' z" k& p
let message = receiver.lock().unwrap().recv().unwrap();
" ?1 D# j2 N: t8 l6 ]/ w' a match message {8 Q4 s4 v$ g2 e2 M
Message::NewJob(job) => {& H% ~( a0 C A/ i7 Q
println!("do job from worker[{}]", id);
8 P- t, |% ^+ O' c: | ^$ x job();
3 O' }3 P( ^' R+ @. ]2 @% G },
6 Y' D/ r- d" y# m& [ c Message::ByeBye => {
2 }( g" k3 q/ a& _, e println!("ByeBye from worker[{}]", id);4 f6 x: w) t( V1 v( p
break! o! q, q6 F; ^4 F, g! f
},
" @4 w! u% u! ~4 |. y; @ }
7 {% O4 Y `2 P+ ?, Q f k9 `# p }
1 B, |- e! s* ^; }* h! L }); ?. V6 K6 p B! [6 V) ~ {
' s# \5 G3 Y8 f+ E/ ?( ^' c p Worker {/ f; W; s3 `; G5 w
_id: id,' A' T/ \3 q3 z1 f1 e& v4 p
t: Some(t),. M8 d+ L @4 T' E
}
5 M& j$ f7 O. Z" @. y4 } }
& k* |+ k x* [}
3 ]4 g9 F+ q* ?& U6 y2 f* O
7 P& l3 A: T. i0 Vpub struct Pool {$ q: A" E" k& B) G( W1 V% k' n
workers: Vec<Worker>,
* l1 ~2 y, g1 Y" G1 E$ b8 ` max_workers: usize,
* `7 c% U9 G9 V1 Y$ ] sender: mpsc::Sender<Message>" }" @6 k* m9 D+ f5 l2 H
}
0 c8 U" n$ \ g3 s
8 [# g) V S- R0 P( O5 yimpl Pool where {
. p2 Q+ J7 A2 S( ]! I) b pub fn new(max_workers: usize) -> Pool {- p& h4 L' y' o
if max_workers == 0 {
( h1 \9 c! u0 n3 d5 l panic!("max_workers must be greater than zero!")7 c6 a+ p0 s, E
}
/ c( a) s2 s; N let (tx, rx) = mpsc::channel();
8 W" z+ z) F: U, _
! r6 n8 }/ T; K+ A$ ?# s let mut workers = Vec::with_capacity(max_workers);% @5 G- E' O. ^4 ?& H! u9 h; \
let receiver = Arc::new(Mutex::new(rx));
/ a4 U/ Q& e5 K; K5 S( B+ [ for i in 0..max_workers {
8 {/ T% N. ^, J- |& q) \3 [3 J workers.push(Worker::new(i, Arc::clone(&receiver)));
0 ^+ a; `3 s9 \4 P* M6 G/ @2 M }, o1 w/ q$ ]3 |/ r; F1 M
2 }- X' \8 p) S: V Pool { workers: workers, max_workers: max_workers, sender: tx }
9 M- ~9 {% H; n$ m }
) U# f8 K, g, X! d, x8 U6 \
& c1 E8 M: ^+ ?% ` pub fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send* G8 ~6 k3 t( E. d1 c( z% a3 M) {
{
0 a7 |' O; A5 ]& n
4 Y1 e n; ]3 e% a" Z let job = Message::NewJob(Box::new(f));
" @" g4 ]7 i- q: d self.sender.send(job).unwrap();
% Z. v$ m7 _; ?- q. g }% y# I3 E6 w$ {9 ~ S b
}- d0 f5 G7 O1 D. S
9 q" n! X/ I$ ~" |( \' r6 L/ D( ~impl Drop for Pool {
) i1 |# W" W u" r8 u fn drop(&mut self) {
/ J1 B. i. J) S' G# b2 p- L' F# r for _ in 0..self.max_workers {
: s7 R# G, @1 Z5 r self.sender.send(Message::ByeBye).unwrap();5 N# U/ r# ^+ C
}
( W; j' x- ]6 {$ f6 c: F4 x4 n- u for w in self.workers {
X5 X4 d1 D% m# l if let Some(t) = w.t.take() {
* m0 o( B0 t. C: Q% m t.join().unwrap();
4 G$ i# g" H* ~! i2 q }" m" O+ q& p& \) O# T: b1 [/ A
}
6 X% H* V o0 [& V4 _# `1 x1 K: c }
& [! m$ P! c8 K% o. n}% U0 J5 S0 a; f' B
5 `# l; m; S0 m9 B9 g
+ K1 q4 ~' b4 h# ~( P9 T#[cfg(test)]
& l. Q1 }, C) w" b+ P% Qmod tests {
3 h. N6 f% d1 H. B8 O ~3 W use super::*;4 C* }0 q5 v+ W! c
#[test]
5 E4 B3 Y7 u% f5 w+ u: L; Y2 F4 v) C fn it_works() {
9 e: |' E# L3 w( C+ H" L let p = Pool::new(4);% V" v1 X1 h. M' W
p.execute(|| println!("do new job1"));
5 V8 E9 H) n/ Q/ l& E+ n3 Y p.execute(|| println!("do new job2"));
8 |4 O9 g: S/ T7 u1 L" r( Z p.execute(|| println!("do new job3"));
( n* l; z# t' P, u3 t, ~4 q p.execute(|| println!("do new job4"));
+ I3 ^* N- E* i4 i. Q( q: f; _ }
}. `. W0 y9 I! w}
: X" v9 k* j9 ]/ t( ?</code></pre>& h& m! u( r6 z+ q% j8 A
6 C) E7 g' d8 p9 I2 G7 k# N# l' @
|
|