|
% I0 v, r1 ^% L c<h1 id="如何实现一个线程池">如何实现一个线程池</h1>! @" e* E+ c& R* N* ~
<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>% [. n: U% R8 T, D( b
<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>
# \) f+ f1 b4 f5 p* l0 f<p>线程池Pool</p>; o9 G5 K% `, v! i8 L4 ^- | z
<pre><code>pub struct Pool {
2 {4 E8 B) E1 J8 n M6 | max_workers: usize, // 定义最大线程数
4 u7 j n4 R" ]; N}
* N, B) ~' C- Y" m5 n1 g! d
) ~; o3 s& {1 n3 T, R3 \impl Pool {
5 d! X* M& ~. W% Z% h' ^ t& H fn new(max_workers: usize) -> Pool {}
0 k& m) U* w/ { fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send {}
; s K! K" V; B/ G6 j" d}
\7 x9 D& o4 g; A( J; {$ }3 b
, G, }5 ]' T; I. a3 J</code></pre>
+ v; z2 y4 N" m5 ^% j( A8 P<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p>
! n' ]* i' |- ]- n<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec<Thread></code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>
# u0 A+ ]$ Y% y' j$ A; L6 x5 |可以看作在一个线程里不断执行获取任务并执行的Worker。</p> n5 L' x. N/ |! d$ k# W! R9 J
<pre><code>struct Worker where
1 Z- r7 }2 v `& ^{
6 ]9 m9 i" Q# P/ R1 | _id: usize, // worker 编号1 ~9 E U6 ]8 @& C6 a- _9 n5 w
}
* A) i$ k$ {& I' x8 y+ P</code></pre>
; s3 j% e! u" Q. I<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>
3 e( g* k% [! _把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>
. a9 X1 v6 P2 F<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc<Mutex::<T>></code>来包裹起来,也就是用锁来解决并发冲突。</p>
* c$ {8 G, E# O- `2 m8 j, [<p>Pool的完整定义</p>
8 u! W# P4 j: v) a* `; i<pre><code>pub struct Pool {# |3 V8 n$ Q; n! D
workers: Vec<Worker>, U0 k' {$ E" U5 C
max_workers: usize,
- }7 o* R$ D) s9 H- p6 w/ H sender: mpsc::Sender<Message>1 u- ~5 O: l1 L' R' Q) E. B9 i
}% |- J1 s5 r0 ]: D% ^7 o
</code></pre>
# S; t* S4 _" W/ S<p>该是时候定义我们要发给Worker的消息Message了<br>
8 a7 m+ P3 R1 s" }定义如下的枚举值</p>
8 O9 p, Y. o/ L1 T* x<pre><code>type Job = Box<dyn FnOnce() + 'static + Send>;* T) o' R& C' W; J
enum Message {
8 \4 n+ f7 r: r! O3 ^8 O ByeBye,
& S9 x' @1 H* a: ] s- v NewJob(Job),. \+ |* z# c5 B! M2 J9 t+ B
}$ B7 K4 }) {! y) J
</code></pre>
( R; }: D/ V; w3 k- ]$ x<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>7 e( R' J+ ~9 n a+ W+ @) P
<p>只剩下实现Worker和Pool的具体逻辑了。</p>3 e3 Y( j% [7 s9 D0 f
<p>Worker的实现</p>
! k X2 ?1 `3 V) G1 k<pre><code>impl Worker5 I/ q, ?+ a9 l9 N+ n
{# p4 S+ ]' {+ [, T3 h
fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {
9 n6 w9 q U: ^! W% [4 Q let t = thread::spawn( move || {! @6 D9 E$ l! _* k F
loop {
) A K% j2 |8 Q; S let receiver = receiver.lock().unwrap();
- u" }: h D! [8 s" r let message= receiver.recv().unwrap();
0 u6 Q$ |+ d. F3 C { match message {6 @1 [4 r" P9 s c4 H
Message::NewJob(job) => {
# n `1 q* C7 L9 B5 a println!("do job from worker[{}]", id);
0 I d5 A0 P6 @ job();
' g4 M9 t0 b+ h2 g- n; e. z9 A4 @ },9 Z/ X# L# ~" w+ u; { _; q
Message::ByeBye => {: ~1 V2 t4 f, N
println!("ByeBye from worker[{}]", id);0 R* ?& e! Q0 _8 u8 C
break
7 Q' k) |# ~( o0 A2 S% U% R/ p },
" ^, |! e7 C. |- E( c4 G }
/ [& v- O6 w! k8 _& t }
( a# M! W B0 w4 M! c });9 l& _1 v. T# Y0 F5 f
0 p# W2 b0 P" H
Worker {
, d) F8 V: j4 r _id: id,& S2 S. Z4 c5 P# |/ N8 T( T. X
t: Some(t),
0 F# _" S# X/ n; Y }$ J( t5 |5 V. n/ K
}
( m$ s) P) {1 Q) m" R- p) U}
9 }* D) }7 c0 P9 [ w6 @' X</code></pre># e$ s- U4 J+ j: ] z {
<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>( a' Q Q2 I! T4 D8 A. t5 D
但如果写成</p>3 ~% l. }3 x% o V& [! W2 Z* N
<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {
9 n+ y. o; R0 i1 z};* P# |" [ r" U* @/ _
</code></pre>) C4 G1 E$ Q. Y* u
<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>, @" {$ G1 W2 q/ M! N# {% h
rust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>
) \( x2 \: z4 r2 d<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>
$ X; B( J# n8 F<pre><code>impl Drop for Pool {0 O4 W+ e( s3 L+ A" J4 G4 M
fn drop(&mut self) {
1 C/ T) \9 \1 L3 R* C; s( P for _ in 0..self.max_workers {. B5 q' F: h0 ?3 B7 }
self.sender.send(Message::ByeBye).unwrap();
( }3 s% l) p0 S" Y2 l2 ] }# F- e, A# S3 r
for w in self.workers.iter_mut() { ~; ?8 F, y, o% x( n
if let Some(t) = w.t.take() {
- Z }' L, L9 b9 u# I t.join().unwrap();
. B" H- B! \! W; N5 D }
/ n& l+ N! L$ V0 y% ^$ S4 `$ O }6 N& s0 }7 ]0 O4 K: _) ~+ i# L
}+ g+ v0 b5 f8 G- j1 `( p7 t6 N' M2 S
}
5 b( }# ~: R% `
5 z! r0 R) `* \5 ?4 x: E</code></pre>
6 P& ~3 D$ u* ^7 W7 B7 W7 J x<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p># H/ L0 \9 X9 b. `
<pre><code>for w in self.workers.iter_mut() {
3 _8 s. T R4 N( E if let Some(t) = w.t.take() {
/ \- |4 Q3 s! z$ f5 v self.sender.send(Message::ByeBye).unwrap();
h8 V1 m- r0 ]0 i5 U( ?+ [ t.join().unwrap();
" S- a9 R1 u; X) E5 I8 f) b6 g }
" I" i3 Z* [) n3 \- D}
2 |/ l9 S: I6 J8 Y" o1 Z
$ u" {# l" {7 d8 v3 `2 {+ ?$ a" N# a</code></pre>- g# g; H Z% T" q4 A
<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>+ T6 [/ d+ s. q# p9 ~7 j
我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>8 i. \- G/ l7 w6 `5 X
<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>
- `1 q3 V$ g# v4 Q7 G/ E<ol>
) v& R, H9 w9 b2 a/ K$ L: J<li>t.join 需要持有t的所有权</li>
# e2 ^9 c `+ V<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>7 x6 Y, X' b5 [4 L* T
</ol>
7 v" F3 G$ ]7 g3 n3 | s! y. _- M<p>这里考虑让Worker持有<code>Option<JoinHandle<()>></code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>
, }, d0 m! Q( z5 t换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>
5 w4 e! E; G, u2 \- R<pre><code>struct Worker where
) |% I2 f9 r! n: _: f+ ~{3 {$ ]7 J: C' h4 O7 d
_id: usize, ` r8 E. R8 `$ _# T8 [4 a
t: Option<JoinHandle<()>>,
+ i* O% Q, F* @; v; a" g) U! R}/ E$ J5 H+ C( F9 X# l- O
</code></pre>0 b- T+ {" G2 v6 E3 o7 M6 c
<h1 id="要点总结">要点总结</h1>( e* H$ X! x, p/ [2 z$ X2 d
<ul># Q7 V! G% p9 Q: c% j
<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>
* b( ^% O. O! S" D- X, R<li><code>Vec<Option<T>></code> 可以解决某些情况下需要T所有权的场景</li>
% A9 U9 t& h2 K* b" Q+ y</ul>
( h& g# _/ R( {- B<h1 id="完整代码">完整代码</h1>
- w6 Z4 ]& e5 G; Z+ w) V<pre><code>use std::thread::{self, JoinHandle};
! |4 s+ y: Y( w& c! R" a9 iuse std::sync::{Arc, mpsc, Mutex};
2 p/ F1 s$ y5 E6 S P# q6 J
) t# i) N9 C8 }1 B" G, j& M4 W$ R E* B, Q
type Job = Box<dyn FnOnce() + 'static + Send>;
( f: i2 c X# h9 Oenum Message {
. j, r ]) {0 a! ~ n# U2 p! T ByeBye,
7 i3 v' }2 Y0 Q3 }' s+ k, T NewJob(Job),* Y+ q/ S5 |0 M5 \' @/ ~0 L
}$ q/ s3 a8 f3 |3 \2 J6 c
; ^9 Z5 ?6 o& Mstruct Worker where* R5 t7 r$ o" C& H
{
f: u) a. s$ C! d* l# Z7 P! w _id: usize,0 C, _! m5 H! V7 h
t: Option<JoinHandle<()>>,; [7 i5 B0 }; K' y; V
}
- A4 w8 c0 M1 d( s" Y
; G1 u% _! k+ v7 N$ c6 H6 \impl Worker- [$ H1 q" |2 `' U' Y* u
{
% u' S) n3 z9 Z* q) [& M$ g1 r" m! p fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {
# m; o i, [8 ~' q# K& I let t = thread::spawn( move || {
" c: E6 l" F. `0 o l6 ?! t! R' x loop {
2 B- }( Q+ {0 p# x; W: a! b2 [ let message = receiver.lock().unwrap().recv().unwrap();
7 N' U& x+ |) a match message {/ s5 C, N. B1 Y" I0 k9 e! i. ^1 c
Message::NewJob(job) => {
( |' L* w3 B5 b% p( ]4 d println!("do job from worker[{}]", id);! y7 {) U l* g
job();
) e8 M! ^. G( H4 M, O },
3 Y, `0 F* O( m: F8 k Message::ByeBye => {) [: x2 y1 f% R2 y
println!("ByeBye from worker[{}]", id);) _& B$ Y/ v$ W) E2 m
break( C5 T" [# b# L1 K6 m6 o7 [
},4 U( m5 H9 Z2 J% K' H
} & S9 {" A9 O9 D/ a. C6 I& K+ S/ n
}
8 c# D- U' c: ?5 b- P+ ^. ~& z });
, }, X5 [5 ~' A- d& t$ ?
( {% ~& q8 `1 D8 B1 Y Worker {
' L! Y% ^' r- o. `$ c1 n" G& w5 N1 x _id: id,- V" D5 \ l' H: H# U
t: Some(t),2 ~$ `- i: H3 \; j6 f
}
* g: T7 r' o8 M. H, ^ }
; v' m* m$ t8 Y! C7 A) G; w* j}
& I/ d0 Q5 b' \
9 n/ a( s. f5 p1 O7 J/ gpub struct Pool {
4 W& b! d- C' n6 h z' J5 S U workers: Vec<Worker>,
% K' N9 o3 I0 Y* `; q/ U max_workers: usize,% `: R. L6 [0 _0 I# ~
sender: mpsc::Sender<Message>3 M* b- p7 }7 ^/ I5 j4 Q2 T( @) D( d
}
% T' y1 u0 {' C8 R ~
/ f0 C0 x! C1 j/ Fimpl Pool where {' s( V1 K0 G/ h/ t( ~
pub fn new(max_workers: usize) -> Pool {6 S. w! Y) \/ j' h& z. q
if max_workers == 0 {
" g8 L( [, b; A& S panic!("max_workers must be greater than zero!")# w$ M( k4 G: w: E" Z
}: v( z8 }8 M. s* [* ^
let (tx, rx) = mpsc::channel();
@) z ?9 ^! a) r, [9 r( H& }# Q' g4 A. a0 ]' I4 p& C
let mut workers = Vec::with_capacity(max_workers);7 l$ z9 u+ Z1 X8 U
let receiver = Arc::new(Mutex::new(rx));0 u) b6 @& M/ @) f4 w
for i in 0..max_workers {3 [1 j; |, h1 D' G, ]$ V* }4 n% R
workers.push(Worker::new(i, Arc::clone(&receiver)));
# `1 J0 y( S* E5 x }: L$ f' T- j4 {, j
5 f$ N5 r; m3 c Z2 f
Pool { workers: workers, max_workers: max_workers, sender: tx }! h% ?0 m8 j3 |' z2 B
}8 \$ ]4 p2 ^& x8 ]
0 u) t) v) \0 J* O0 j, @
pub fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send
' n& s3 z( y3 {9 Y8 Y {
o( d/ K) W9 B! D5 P) a; ^% r* M. Q6 L& r+ k# C: Y8 ?
let job = Message::NewJob(Box::new(f));
; v* N; u( ~% u self.sender.send(job).unwrap();
K V" x7 u: n0 b, J! k }' |5 D g! v9 N/ p, y8 i! [
}" P' t/ Q/ Z) |: j
' A4 m* f6 i* K# } cimpl Drop for Pool {
- G/ C! E9 d' I( t' q fn drop(&mut self) {4 Y0 Z2 `0 D& A3 x) C
for _ in 0..self.max_workers {
& R1 D( k3 y" w& Z( a self.sender.send(Message::ByeBye).unwrap();' p# y7 K) r2 W" F3 ]
}8 O) {2 g! W8 b
for w in self.workers {$ O7 C& X' @6 G* @ l5 N
if let Some(t) = w.t.take() {( y7 D7 r5 \% @6 U4 ^0 Y7 h
t.join().unwrap();' {# O6 k0 u" e m$ s) c. W
}0 K% c8 {, S2 K4 v/ Z
}
% q3 n+ o' [/ n9 \ }# f+ ~- C8 G2 F. x n# @1 g
} F5 W3 n$ |. y' Q. d7 [
0 `" [* ^4 ]$ P9 p% C' Z5 x
% u! q6 F0 M, M, S& D2 v
#[cfg(test)]
$ Q( i- E3 r' G& p0 I+ Z" rmod tests {' s% c0 R. ~1 u& J
use super::*;
' C/ N1 X! X& { #[test]& V. m) G; g. g! m3 d2 G
fn it_works() {- m% `7 J& @9 d
let p = Pool::new(4);
7 y+ ?" c6 Q$ F& d- E p.execute(|| println!("do new job1"));2 b1 R% B3 [! O, w4 g7 Z0 P
p.execute(|| println!("do new job2"));" q% G! w1 [" n. E$ W
p.execute(|| println!("do new job3"));
0 U; J; F4 K7 K, n% j p.execute(|| println!("do new job4"));' g1 I% ?+ F6 q. V! d. r' p
}
( `: q& R. ]" d% j& q8 Y}$ }. U. C9 E9 l/ l2 B, |: f
</code></pre>4 W5 ?% K5 G5 h% `* M
* f9 z$ A$ r( v' e) A( Q- V' @& Q |
|