|
j! l8 k9 `' b2 v: i2 ^
<h1 id="如何实现一个线程池">如何实现一个线程池</h1>( Q, r6 s- Q4 w/ a: X
<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>- R6 F& ~! |6 k2 I% m5 m, a1 g
<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>
' j5 R& [$ j3 v+ E9 u+ G<p>线程池Pool</p>+ `, o( I7 A7 u" L8 v$ O0 {
<pre><code>pub struct Pool {
; p$ {5 ?0 s2 { T" P+ f) x6 f3 N+ } max_workers: usize, // 定义最大线程数
# v1 j. `$ N" k; t+ s; y- ]$ ^7 r" _" o}; L' n; ^; |* g% s
% w! c/ B9 P: o0 M* x/ n- _# _: w
impl Pool {
+ b: p" T/ }' b: o3 ~6 \ fn new(max_workers: usize) -> Pool {}
! ]3 Y" n* t: {* H+ h. [1 P- i fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send {}! O4 \. q/ k0 h% A4 L
}8 M. V1 m+ d r. I
( l& p- i# A$ B
</code></pre>0 r3 u! |+ [. ?: f7 a
<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>
# N6 o5 i! t* g8 y2 D E<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec<Thread></code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>
+ A1 o/ W9 C7 Q# Z5 g可以看作在一个线程里不断执行获取任务并执行的Worker。</p># k* N4 N$ K h! Z. G$ N6 L
<pre><code>struct Worker where
4 W4 y" @0 R$ e# s{
' E1 \. F& |' O- {9 c _id: usize, // worker 编号
& ~' y5 p+ ?) a. W}
& ~9 `7 _& ]/ T4 p* \# C" H7 X</code></pre>
" q" o ~' N: ~<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>
% u9 w2 T% B) n6 Y( m把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>
# h5 }/ l9 y& G; t<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc<Mutex::<T>></code>来包裹起来,也就是用锁来解决并发冲突。</p>
: H1 i. l: m# |9 g# k<p>Pool的完整定义</p>
9 o) ]' x& s9 h5 N! n T* p5 y- I<pre><code>pub struct Pool {
' z. C/ @4 f& {- W: m workers: Vec<Worker>,
0 t! [& i) u( ~) c: F" u: G; I max_workers: usize,
1 l# [& n/ Z S2 y1 }* k sender: mpsc::Sender<Message>
- d& x9 w- V+ `5 |}7 M |6 P$ M" C) Y- G h% @$ p: a
</code></pre>6 y6 q5 H3 Y/ D& d: b
<p>该是时候定义我们要发给Worker的消息Message了<br>5 k8 p% B0 r: z9 C4 w% w. Z
定义如下的枚举值</p>
3 |( ]$ o! s2 b: W<pre><code>type Job = Box<dyn FnOnce() + 'static + Send>;
1 R. K2 i7 L6 D; {, ^2 M. Uenum Message {& D2 D6 i8 M, i' _$ D" F9 W7 F# f
ByeBye,
4 V8 E9 z+ g6 @% r' ]! f& g9 z NewJob(Job),- L2 K# B1 f9 G* i7 T
}
K1 ]7 O$ U! U5 O' z</code></pre>
4 w; b( D! F9 }1 }5 B0 T8 S) s<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>, r; j r& c5 a, O; Z5 p3 k6 w W2 i
<p>只剩下实现Worker和Pool的具体逻辑了。</p>; ~7 J/ q( H, x i! ^
<p>Worker的实现</p>
" N& Q5 J5 x8 f+ @% R0 [<pre><code>impl Worker
8 D1 P9 |# u8 f{& s% s! K: n' {. U @% x/ i( _8 |
fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {) _! P3 ?. `7 c+ d# n
let t = thread::spawn( move || {
" l) M- m5 K" ~/ I1 e# [6 B loop {, f8 P% J$ b4 b8 S
let receiver = receiver.lock().unwrap();
- d; T" z3 }, ~4 B6 b" b1 s let message= receiver.recv().unwrap();) ~3 E2 C1 P, t# M# T, p) {+ L: c
match message {7 F+ M" ?# \( f/ ?8 u8 v; i
Message::NewJob(job) => {( C$ l; ^* f! z( ?
println!("do job from worker[{}]", id);& }" C, p0 s; ?+ u9 b
job();
0 C" `. [! U! W },# f0 J/ g, Y7 z! G3 h" z
Message::ByeBye => {/ O/ X; w" t& X0 d
println!("ByeBye from worker[{}]", id);4 i+ g/ ^5 ~; b: S/ E
break
* f" n! P: J. u- Y },) e1 v. I% K3 Z5 x: B
} 6 y/ \7 V' X' H2 ?4 y
}! b- N b% K; w0 [. B# \' S
});
* J* I# O5 K* q9 T" A: ]( E3 s7 N% q8 ?% I& n6 o
Worker {
# w5 u& i; c' G, |5 H0 L& r _id: id,
9 w: |6 g( h8 f/ a0 R- y% m; G t: Some(t),
& \) K, ?" w4 J& r3 l* P }
1 s' n; d5 m0 K* s( l }1 h5 }% f" T2 ~ O% e: C0 j7 {
}
- d8 p" c5 x# E3 `% `: e</code></pre>
4 M" _! C7 u: e( W5 C<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>6 L9 I& q7 O% D) J" A
但如果写成</p>
, Q# t. m+ K. Y. W7 q. R% b, @<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {
) E/ s' @: K( ?* \6 e/ \1 \};8 C- W; B* `$ V
</code></pre>
, o* s( D7 g" r( [0 ~) k<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>( ?6 L5 F. F" N+ ^
rust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>) Q1 \$ |" w5 }" _3 i6 _2 i
<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>
7 L/ D3 \2 O+ o; b: G<pre><code>impl Drop for Pool {
: ?1 m. E4 G: F+ a& D fn drop(&mut self) {( {. ]7 ^! t, u& P
for _ in 0..self.max_workers {
) [+ c, C; N, E4 B5 {& y self.sender.send(Message::ByeBye).unwrap();( r* N: ^! E+ c! }
}
4 t: \9 h/ C+ B2 Y# W! w) z9 }/ N for w in self.workers.iter_mut() {/ I) B6 h% J# Y8 E
if let Some(t) = w.t.take() {, z, n# \8 t( X5 F1 z
t.join().unwrap();! n" Q% v* o g' T) h( l
}0 y; \! ?# L9 W0 K% E8 s2 A2 ]
}
' B" g: p x; C, R' k% u( U& C }
' \# D- K, m2 Y1 g# \6 p3 t( `}
7 w- ~1 ]( n# `; Y q: N+ g% g; P0 ?0 }2 x2 Y
</code></pre> ~. z, i9 K4 J9 ?4 a9 j0 S
<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>* r! O+ H: G, d/ z9 ^% H; G
<pre><code>for w in self.workers.iter_mut() {0 r! D$ E1 L/ s2 t8 d7 B7 [
if let Some(t) = w.t.take() {
& t2 m: Y2 U3 Q$ b3 x self.sender.send(Message::ByeBye).unwrap();6 h9 h; K& m% ^3 G4 w1 T
t.join().unwrap();# S" L! ~1 S1 a4 D, a5 R2 J( s
}
5 z/ o4 i6 \5 Z, H}
8 `7 `: |0 |/ l+ U6 C/ K m5 V) d
( F, B* s, @: y6 D! Q</code></pre>
" F2 I6 s0 N7 N: F! t; {; ~<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>: n' D- Y# {% e4 A( V# O. d
我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>
0 `0 b5 x; \; ]* [: H( e" C: C<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>
5 `/ H& I) j. Q2 n" F, s# `( Y<ol>
3 q) g. a2 n, E ]4 \. u: v9 g<li>t.join 需要持有t的所有权</li>" s) ? r0 J9 j% @
<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>: j( @( c) S4 v6 d) j% J
</ol>: n- d; n" i' f: s
<p>这里考虑让Worker持有<code>Option<JoinHandle<()>></code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>
7 y2 R) u) ~1 `+ y) W# P7 ^换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>8 i, f: p1 d2 T9 `6 S
<pre><code>struct Worker where
$ V d( J( O U" _: J* b6 j{$ T. k9 v$ `8 P4 Y9 t( X
_id: usize,, \3 L5 L* }) ^/ x0 E1 q) ^
t: Option<JoinHandle<()>>,! R2 O1 A" [+ y9 J' f' O( t
}
! s |$ O+ K# p8 z</code></pre>
7 H1 ?: D& m0 N) |& z: R3 A: q+ T<h1 id="要点总结">要点总结</h1>
: b8 Q' e& H& L8 E9 k- D, R<ul>
- @ i( d8 q. N<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>
) f) P0 b' y, t; G z1 ]<li><code>Vec<Option<T>></code> 可以解决某些情况下需要T所有权的场景</li>) R0 m' Y0 u! S& o# t. A% `5 I8 v
</ul>
3 m) @0 [; Q7 G' V, n. f<h1 id="完整代码">完整代码</h1>! b( U+ b* [% a2 g2 F
<pre><code>use std::thread::{self, JoinHandle};
* Y" j' o7 L' h& huse std::sync::{Arc, mpsc, Mutex};) U$ f5 v$ F9 R6 r
, |' I; w( { D6 d. E, A* R [% j# k. T! _4 e4 y
type Job = Box<dyn FnOnce() + 'static + Send>;! G4 Q1 o5 m* p1 q) s+ U2 V6 o
enum Message {" h" G$ O n8 }
ByeBye,
+ d, u- j4 t, T4 j6 T0 z3 n NewJob(Job),
0 w! U k7 G5 V2 v1 k k}
9 R' C- _& U9 V6 K7 G$ i3 p; q& k3 r
struct Worker where% c/ O. Q+ a3 I7 l m* F& F
{
: |! k$ m+ Y* P- J7 _) t _id: usize,1 I; e) Y- P# |$ ?+ B4 v% ^3 [
t: Option<JoinHandle<()>>,* I# Y! w+ Q' T2 J
}
- F- D0 t2 A# }1 a, K
: l. m6 `: s3 R" Q% f3 I1 N Gimpl Worker$ Q3 ~1 t1 e4 {; k8 S# m
{
; ~; V% d/ V3 `) o fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {
: w) M& u: I) c* k" Y7 q1 m5 @ let t = thread::spawn( move || {
) ?: q3 M1 q& [3 \5 T; Y loop {
( |5 C. f# w; Y6 c: G; z6 y let message = receiver.lock().unwrap().recv().unwrap();
' Y0 |% h* D; e7 {0 A# h9 l match message {' T" V7 a/ J$ K* h/ g, x2 r
Message::NewJob(job) => {
1 s: b: o2 l+ \! [) Y$ C println!("do job from worker[{}]", id);
' |3 l0 a8 G( X job();
- v/ ]) v2 A$ b; w },
[( T1 r8 V2 W9 W) o n# w Message::ByeBye => {
( ]" Z* O6 E f! U& I( C println!("ByeBye from worker[{}]", id);4 G' }; X6 K/ C% N" o' P; I! I
break
# j {7 h# M2 @) G; ` },, E& O! h# I& I8 L! I7 Q. K
}
8 Q4 o9 K* {& E# O$ c9 R0 I- v }
. G+ Q; c. t0 V& y* x6 D* g+ A. V( K });
9 s9 v4 D+ b7 A
" b9 Q. A$ N+ H1 o- ^6 x: K Worker {
( T0 b) X6 j0 R5 }, r _id: id,
8 U7 d- f2 [) e& y0 @0 y t: Some(t),9 `& K2 N" T. ^ m7 y
}
8 q& F" n+ F$ x; } }
* w# C- j `( b7 w) m* I5 q f; @}9 B* L, Y. D; V6 P, q4 V C
7 B* T# }1 ?9 l- u* Cpub struct Pool {; o( c3 c# f) E$ s" S0 o
workers: Vec<Worker>,5 I# |2 _$ q; G5 w
max_workers: usize,% g2 v5 Z: m$ q0 a/ G( U9 n
sender: mpsc::Sender<Message>
9 W; e0 Q- J9 g# h1 T$ `6 |}* \4 ~/ k4 a' J" A
1 ^0 ?. [% M! Y& v+ Pimpl Pool where {
. }+ ]0 F) A9 c* Q. w2 n pub fn new(max_workers: usize) -> Pool {1 H( n! L* ]' o4 _
if max_workers == 0 {
+ G! ?2 f. M3 X- r9 J% ]( y% [ panic!("max_workers must be greater than zero!")( o( ~* T v" }- k3 ~+ ?0 G
}1 \" G9 ] f- H% f
let (tx, rx) = mpsc::channel();2 G! F. d W; }+ D6 I8 x* ^
; n$ E" ~: ^4 \1 l; M% x3 H C let mut workers = Vec::with_capacity(max_workers);8 `7 i* `8 U( x
let receiver = Arc::new(Mutex::new(rx));
( b/ S3 L8 C# |# |, d6 l* T! ^7 P ` for i in 0..max_workers {. g9 E# m+ k+ n$ {& L1 p% b, I3 U
workers.push(Worker::new(i, Arc::clone(&receiver)));
+ t, _& @9 m- l2 N$ G }, @% k) |8 S+ B0 I
6 K( R1 H _! r M
Pool { workers: workers, max_workers: max_workers, sender: tx }# S R& V" y$ W2 Q' N, A: Q
}4 H/ N+ y2 B6 W: G6 M4 C8 o1 j, S
1 s; g% T/ d- p' b; N7 Q0 S. X1 g( o
pub fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send7 `+ ~. I/ c p$ B: O- S5 W7 E
{
9 I. {! V' A8 O; B) a, e
# L( P* e8 l! Z+ | O9 [# X( x; m let job = Message::NewJob(Box::new(f));0 A1 r) q# `4 u
self.sender.send(job).unwrap();. ]) h) u; U: k1 t/ B9 r# f/ _) t
}
0 \4 P: r3 k. b s7 G: o}
, q. \% h9 Z/ G: q* ?3 b/ _* r5 a5 @& y- r6 O
impl Drop for Pool {: U: ~7 H: v+ \9 r( Q
fn drop(&mut self) {
3 G5 B$ |! N; ^% x for _ in 0..self.max_workers {
`4 Y) C0 v3 O8 _ self.sender.send(Message::ByeBye).unwrap();( ~+ m( \& y& n" J. O- R4 A
}: N+ m* {' t- X+ N
for w in self.workers {
: D& X% ~+ j5 h* T; H if let Some(t) = w.t.take() {; \% o. M' B1 ?" ^) p
t.join().unwrap();. [+ x8 y5 @/ C5 b# c1 p
}" D5 q7 `+ x* e G, F
}
4 r6 S% ^2 R' B/ ~/ w: \2 U3 _ }' G p% k" \- `8 a# I. y6 H) ~) z
}
( h7 f6 f: Z* a- `1 i% @0 P# `
1 N# N% \6 `+ ?9 l, p, g6 B, v! R: m% u
#[cfg(test)]
* ~) t1 B- @4 C% |. |" J/ }mod tests {
}( E4 [1 e- w5 S use super::*;
+ l) t4 i1 A T) s( a- R9 \ #[test]# w2 ~2 u4 N, y7 b; l( X- u7 H
fn it_works() {
6 k) M$ z5 X7 I ~. l9 K) F let p = Pool::new(4);8 y D. x' E% W$ C# C
p.execute(|| println!("do new job1"));" w( S* Z# ?7 X! _6 `
p.execute(|| println!("do new job2"));# I% f S# S* |& e1 M
p.execute(|| println!("do new job3"));
8 ]( w3 @0 K4 ?5 ^/ U p.execute(|| println!("do new job4"));5 R# ?' H* _6 [) t3 e3 l- w) ~! k
}& a& Z* A3 B N: e Z* v6 r9 b' D
}7 y5 D* {# k' |! ~5 Y
</code></pre>
7 \8 m$ a. ~0 L# y2 v# n& o' f& R2 `6 ]& R# O% ?' N
|
|