|
|
- A0 |) o; d# S) J3 @<h1 id="如何实现一个线程池">如何实现一个线程池</h1>
{/ E" M/ f( z2 W6 [' Z& {+ q<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>
: Q5 g2 f; @5 k- D5 _<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>
% I5 t% R7 E) A* e/ j* x0 {0 c<p>线程池Pool</p>
! }3 R" `& M1 m. V$ b<pre><code>pub struct Pool {0 k* ]% O' D+ U" h' y3 r
max_workers: usize, // 定义最大线程数( F w4 B8 _( C4 B5 p
}% ^& g% d% f" j% t: N8 z
{# `1 X2 J" J, vimpl Pool {
4 F8 W2 ?! f6 F/ Y5 W fn new(max_workers: usize) -> Pool {}" T* g) u) Q/ G5 H' ~. G" I
fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send {}
( N* ~. H: m, D0 g, P}9 A* `( M; O1 e! Z4 E2 U
, w, y4 R/ u' O2 s) H1 m
</code></pre>
8 H+ a# V+ v" T6 ]' d+ {. o; G<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p># p. Z( D/ C0 C8 ~# H @
<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec<Thread></code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>
& C4 B! {+ u$ ?' ^5 O# X, f可以看作在一个线程里不断执行获取任务并执行的Worker。</p>
% I8 e& d1 W, j7 f<pre><code>struct Worker where
. n+ T, @8 o1 {. q% `4 i4 Y1 c4 X{6 h1 K# L! z1 E6 L
_id: usize, // worker 编号
! f5 L1 _: a6 }7 h# X! e2 ^}, F B2 T$ j6 u
</code></pre>- ~# [) }5 `: l* u9 x* B. F
<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>
, b- V% W* z, ]" B把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>; X P; M/ t4 @ c
<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc<Mutex::<T>></code>来包裹起来,也就是用锁来解决并发冲突。</p>- o! w2 g$ x. y3 j
<p>Pool的完整定义</p>5 j6 u0 T' P% x7 H5 O
<pre><code>pub struct Pool {
5 I/ _! q7 f8 Y8 A1 b$ t+ X. K workers: Vec<Worker>,9 p2 H8 Y: X$ b3 t& |& _6 Y
max_workers: usize,2 N6 ~0 t( Z' ]/ ^8 `8 P
sender: mpsc::Sender<Message>
+ f. G/ T0 q5 z; Z' T}
v4 w- `: z. G1 l</code></pre>8 [9 ^! C% @1 q: h* k. W
<p>该是时候定义我们要发给Worker的消息Message了<br>; `6 N0 T% k* I0 M
定义如下的枚举值</p>
( X, X* x9 V- H2 B; N$ \/ w% K<pre><code>type Job = Box<dyn FnOnce() + 'static + Send>;
. q) I+ b% J/ Z- ~! ^9 a, Q" uenum Message {
; J& o& f: r: Y& t, W! C; x ByeBye,6 U& v+ f# X6 w; l9 ]* z( _2 K
NewJob(Job),
0 Y1 ~0 b& o/ V# e- c, R3 v}7 _' M' R! W, Q) W5 {
</code></pre>
% a; C) g. b/ L<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>- ^$ R8 z* v8 u! G
<p>只剩下实现Worker和Pool的具体逻辑了。</p>
; }9 J: F" \* t6 J! ?<p>Worker的实现</p>
. }0 J/ ^- F' R4 o' g! N4 P<pre><code>impl Worker
# ]' p) o" i' m0 @% a! I' I# C% K{
' x" q) i$ a$ U' P! S6 F fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {* s T+ |- _2 O+ ~
let t = thread::spawn( move || {$ {' c( t' P) I- O9 a2 b" U, I
loop {2 h/ K+ S* v4 Q$ h$ s& Y+ n
let receiver = receiver.lock().unwrap();/ [2 X' w, H6 d
let message= receiver.recv().unwrap();
! D* a' y9 z# F2 ?7 b* F6 e match message {9 L) y( a% k: S; t
Message::NewJob(job) => {- [4 @1 J( `" L" {' |) H
println!("do job from worker[{}]", id);2 }5 `# O- ]# H
job();3 T" C* a4 H9 e+ P$ J- y( c
},
2 t) H& H j4 i, R2 @$ s' \ Message::ByeBye => { [9 u! T* ]3 A Z3 h' ?8 F
println!("ByeBye from worker[{}]", id);
S: Z6 u l1 H9 o2 A break
2 u( h' P( x( k b% N) ~9 H },1 t. b/ N8 ?3 j G4 p3 ^4 k9 J. c
} $ e w% }2 k$ s
}
7 h K) w" E2 X) i8 d- k3 u });
4 V$ @" \ m, B/ S
3 s, ^# T8 D( l# z, a% p Worker {( f4 b$ E3 _4 n% I0 P8 l
_id: id,. {3 m& x' S3 t+ |4 M* l4 @
t: Some(t),$ F, d: B+ }5 ?
}2 q' h0 W' k+ _9 \0 k
}8 g, z4 w1 M/ {) }" c! m+ H
}, A/ U# e" b& A+ j! e6 T7 m
</code></pre>
6 k0 m8 c! m6 h% t<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>. e) `7 X5 Y( M0 D8 ]6 G3 z* _: i9 v
但如果写成</p>
, C6 Q: D* L6 t( [5 h# E0 v<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {
7 V4 q+ j1 @- C9 R4 o8 ~$ k. e};
. \9 y) }% x- K6 y! O+ C- Y: ?; ]/ g</code></pre>" C4 R# o. E8 y# w- S9 O! \# N0 t5 E
<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>
( G5 b; I2 P- s! m' grust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>) z# Y1 v. a- D, r
<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>
2 j: }* n+ j% `<pre><code>impl Drop for Pool {
# b; a. \7 Y" u* D T2 C- K, i- @ fn drop(&mut self) {
% ~8 ` K! |" a8 \ M% Z, }- V5 X for _ in 0..self.max_workers {4 D, ~2 X2 p. r
self.sender.send(Message::ByeBye).unwrap();) A$ t4 }2 Q; I+ L% B) `0 S
}
+ |) P. T% z0 _) g" X for w in self.workers.iter_mut() {) h5 F5 d8 d4 w$ }
if let Some(t) = w.t.take() {, s5 q1 y- ^4 m: ~% ?6 P$ H5 d
t.join().unwrap();
" Q! T( O- F2 H5 [& E6 G. G }) q, g# m+ I2 x- t, W$ l
}1 F9 B5 C: W7 C& L/ _7 d
}
( @5 |% m9 \, \& a+ T8 I- o}
2 \% c& E; P& E; u8 F6 i j* K& H3 k; O! N$ d
</code></pre>' l! M. X7 k. j+ O# U, d6 n
<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>
, o0 O7 v' f; | W<pre><code>for w in self.workers.iter_mut() {, U: a2 ]5 h4 P# S* D' x
if let Some(t) = w.t.take() {5 J( `8 ?+ D$ n5 T5 d, ~
self.sender.send(Message::ByeBye).unwrap();
% d8 w8 { x& {' K: c7 h) r( D t.join().unwrap();
# s( @2 U& i0 x* q. _ }
8 I1 Y5 s5 W7 s. ~}
! u' H( z. x; u9 ]. L3 l- }& M
7 i& B2 l* z+ V# l6 d</code></pre>
9 `, t7 W6 g; L2 n0 W1 n( J<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>
. d& i+ V, S2 J! b; C" k7 w. a- z我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>: Q7 C. Z$ ]3 e1 [1 D
<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>
9 P3 D8 N' t7 _<ol>
; ?; l/ j t* d& T<li>t.join 需要持有t的所有权</li>
8 [ T. t; e6 Q% ?0 r6 i<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>
2 ~4 N6 e& s% g# \</ol>
8 U* p" {+ L2 x4 F8 F; B<p>这里考虑让Worker持有<code>Option<JoinHandle<()>></code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>/ J4 K5 v; b1 @" l, ]$ s$ E; V, W
换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>
* c- n0 F$ t9 U% k. J/ O6 K<pre><code>struct Worker where9 T: T8 E( B, I, W& v
{
5 {4 n" l' q# {* |0 ?/ g" h _id: usize,
3 ]5 l! Y) k. w5 S D7 o1 \' X t: Option<JoinHandle<()>>,4 [" L! T v, |! ~" W G) G
}8 P* v3 q* w/ s- u j+ z
</code></pre>
$ G% z3 P% m' g( h6 ?<h1 id="要点总结">要点总结</h1>7 y$ p3 d" r, {' G. t/ Q# f
<ul>" a2 N; R: [& Q+ s
<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>
" l9 _- S% ]( p# y8 z s1 u; w; ?% Y, `<li><code>Vec<Option<T>></code> 可以解决某些情况下需要T所有权的场景</li>
# e [1 w: k: D$ c$ s$ y</ul>) z3 d* U6 ^: x" G
<h1 id="完整代码">完整代码</h1>
' e7 s& @- N, O! W6 _<pre><code>use std::thread::{self, JoinHandle};) [/ P5 _8 y. E: f+ R
use std::sync::{Arc, mpsc, Mutex};4 H% V4 g9 @5 _# x, q
5 r, b4 K5 i' [% Z3 ?
8 G: ^" E! n0 a \( P
type Job = Box<dyn FnOnce() + 'static + Send>;* I- e4 Q; {( Z. U& b
enum Message {
. H, w. K4 j3 p% v. t7 | ByeBye, j) h! v% \" z
NewJob(Job),
( D% K8 {- b2 X# K9 F6 s}5 U+ b8 |6 f5 }( Y$ e* M# U0 S
0 B1 Q0 H! a* G! x7 c, F
struct Worker where
8 U2 ^; V" g: T7 B; a{' ?; ]$ s1 R' P5 t$ k7 P) ^
_id: usize,* `" R9 E" h" w6 x- ]+ L3 s5 O) |
t: Option<JoinHandle<()>>,- N6 t5 s# c3 a
}! Z. v) N" J3 m1 H& g
* M8 u" B3 ~2 y" x, I, V
impl Worker) {+ ]' G0 x6 ^" X8 G$ w" j
{# Y2 x x" w7 N% ]# U) f
fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {% z" E8 b2 ^' E$ ^( [# v
let t = thread::spawn( move || {
" X4 w# B* e- x" m1 e1 k loop {# d! Z, W5 r. L- a" S4 x
let message = receiver.lock().unwrap().recv().unwrap();
2 o5 i4 r- u5 L% K [ match message {, u8 [5 \4 h3 t& F
Message::NewJob(job) => {5 ~, k2 N; T" f; @% i: c* P7 f' v' x
println!("do job from worker[{}]", id);! @5 Y' i( N3 {* G5 [ ?- u- m, i4 {
job();
& N6 y7 p& h2 s' C4 ]* k },
- e7 l- R3 G, a2 m- S$ m Message::ByeBye => {' X" j( P s L4 \
println!("ByeBye from worker[{}]", id);2 |6 ^( z, u+ r" e7 g" Z- F7 ?! m
break; ^0 C/ @% W1 }8 Z( d2 ]* C; F, E, [$ l
},
: n7 o+ x/ Z2 L5 Q4 c! d; X. L3 z } / D/ m: m( a! I8 [
}
! g' ]8 X L" I) D y- I5 G });1 p! G- F4 ^ z5 Y- v
8 g" X: _' h/ v. ^, o Worker {
2 n& g7 e# q! I4 }8 V/ N, u _id: id,3 `) m7 t9 U" X. B7 \' e) A
t: Some(t),: t* W Y0 L9 F& X( u
}8 C$ {) n: o4 ^7 v- X3 [2 J& @
}
# B5 D# b1 q* s}8 Q! @7 h1 y5 z
4 a& }! h5 c& Fpub struct Pool {- _4 k3 {: X5 t! ^7 U4 z
workers: Vec<Worker>,( } Q$ w4 A5 B: Y) Y
max_workers: usize,
3 a, S) w4 [5 H& q sender: mpsc::Sender<Message>
: u' G: `+ b T4 O}* t' O4 N5 f& L# H, D/ u" ]
5 R( @" C9 i; \# W
impl Pool where {
* x2 d, ?$ L! V. P7 G pub fn new(max_workers: usize) -> Pool {
4 N6 z; x8 D& C+ a if max_workers == 0 {. J1 \ L; I8 V! I4 a5 T
panic!("max_workers must be greater than zero!")8 V5 \& m4 i/ ]8 {1 m# G' {% M
}
/ I& t( n) J6 i9 Z! [9 \ let (tx, rx) = mpsc::channel();* |# H& y# O6 w: d9 T6 W8 u
4 l( F! N7 k1 @, }+ l# C
let mut workers = Vec::with_capacity(max_workers);0 y1 D5 T) L2 _7 K; a% r! G
let receiver = Arc::new(Mutex::new(rx));8 }9 G: U, K: a! t+ ]6 x# d/ Y# H
for i in 0..max_workers {) Z: E5 [" q0 Z, n8 o
workers.push(Worker::new(i, Arc::clone(&receiver)));
4 `! j; F0 u+ Q1 C' s" r7 G6 h }4 ~6 ~8 w5 G& z4 d$ j' m2 u+ }& D$ _
) U* D; ~; H2 |$ q9 x( v1 K Pool { workers: workers, max_workers: max_workers, sender: tx }
% H2 C+ s7 {" M4 ?/ O }
2 T8 D+ v2 o. } 1 G# }3 \( Y S+ h( Z/ p; `
pub fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send
1 S! B w# H# Y, E. ^* }5 [/ i- q: ~ {# b, z. m4 [; V: C8 `2 d5 s' k
1 I+ {9 I m/ P. `# {( x0 m: T" @
let job = Message::NewJob(Box::new(f));9 l* M: q% R8 i2 q7 q
self.sender.send(job).unwrap();
1 o. u/ X9 Z. H }
3 N) q' X2 H4 e- ~, o( X. G8 X3 x} i4 F# }- I! T2 z+ M2 a% i
3 S3 H* Z6 h+ S' d# j6 d
impl Drop for Pool {$ l) b. K$ I- d
fn drop(&mut self) {
3 E- V0 z5 G) U p for _ in 0..self.max_workers {/ Y) H# b0 W# I2 b" Y+ A4 ^$ T
self.sender.send(Message::ByeBye).unwrap();( U' B" c; V7 j/ w! ]4 G: L
}
/ K1 V2 N) S. N1 s% b' b for w in self.workers {: }+ W. p! O8 o! M
if let Some(t) = w.t.take() {
. V' S; a0 p4 O$ S1 f' q) W t.join().unwrap();
6 M5 f. B0 O/ V% w% g/ t" R8 V+ L }
+ C( c! ?' @% A6 j }
g! D/ a7 P( [1 H }, l; u$ f2 }; I- g
}3 ]9 L& ^6 K7 o w! L1 f, K7 ]/ Y6 o* Y
* }9 B: W7 d ]2 Z' S. Y
7 V! o% e( ?0 S9 T" |#[cfg(test)]4 ~: z d7 q; ]
mod tests {
* c& n! B6 N* o# o$ Q" {/ C; p7 W- e use super::*;
- e) w) {/ @' W #[test]" h) e, q* t+ ?$ s% S R& }. |! m
fn it_works() {/ t- X$ d1 G# M1 }) ]$ ?, V% M
let p = Pool::new(4);1 N: Y9 x6 p2 T7 A* c
p.execute(|| println!("do new job1"));
: T+ d* O- U1 t6 `) w$ y6 A p.execute(|| println!("do new job2"));
1 [8 @; |; i# V( g2 e p.execute(|| println!("do new job3"));
0 g; X, j7 G( b* `, o) w7 h p.execute(|| println!("do new job4"));% [& o. K8 z9 [+ ]# @4 b
}
; V$ Y9 @: D4 i& a2 R4 l}: Q% M) X0 q# E* E
</code></pre>$ I) h% H+ ~$ N$ C- T0 S
- `/ j- j. G& n2 ~9 b" `
|
|