|
|
' c2 V6 ?* p5 p; e
<h1 id="如何实现一个线程池">如何实现一个线程池</h1>1 y4 M G& {! p0 c2 e
<p>线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。</p>
4 W8 c) u3 _, M& z- W6 U. E<p>如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。</p>
( I3 g# K8 \* l# t4 }0 R% ^: K6 l5 r<p>线程池Pool</p>
% U1 E" [" s2 E; A& z4 ~<pre><code>pub struct Pool {& ]+ N) j& A$ K+ }: w) h
max_workers: usize, // 定义最大线程数
* ], L4 L+ t6 K# h4 P0 {" l}
# \7 u4 E8 @9 ]$ X& h% @- W, O4 T- v; J
impl Pool {
4 @6 q$ D! E/ j: E( ^ fn new(max_workers: usize) -> Pool {}
# C+ \- x1 ]& M' W, I. U+ d fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send {}3 s! k7 |. u' I
}; N5 C3 Z( ^" B
8 Z; G0 o7 y) K$ E' F6 ]- k {7 g</code></pre>
& a! ^! u" C. f& M, h<p>用<code>execute</code>来执行任务,<code>F: FnOnce() + 'static + Send</code> 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。</p> J2 j9 _. f3 |# C! l' p% m+ P
<p>另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如<code>Vec<Thread></code> balabala。这里的线程是活的,是一个个不断接受任务然后执行的实体。<br>6 g/ k- {! Q7 f# v( E& L
可以看作在一个线程里不断执行获取任务并执行的Worker。</p>6 N0 ]+ E8 X/ k A# A
<pre><code>struct Worker where3 B5 n- Q+ a+ j; Q! x. K) I m* t
{
# U) I2 D) d1 Q1 S, x _id: usize, // worker 编号
6 |5 i: Y+ {$ L$ {5 e- S0 O& R4 N}
- m# Y6 d$ P% F' Z _! }</code></pre>
+ B, j: s' }$ U: |! O# [<p>要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,<code>let (tx, rx) = mpsc::channel()</code> 可以获取到一对发送端和接收端。<br>0 U7 J d+ W- \9 a: J L0 r0 f
把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。</p>
" D( a0 J+ C7 a& L( G2 c4 K1 \+ l<p><strong>这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享</strong>,因此需要用<code>Arc<Mutex::<T>></code>来包裹起来,也就是用锁来解决并发冲突。</p>/ j0 {$ }4 y7 P
<p>Pool的完整定义</p>
! x% _! N9 t9 I% A I, c, I<pre><code>pub struct Pool {
3 d& K( p9 S- J% u: W; e workers: Vec<Worker>,1 r6 S/ M% o( \7 |
max_workers: usize,0 N. C' J" v9 U. Y$ a8 j) m
sender: mpsc::Sender<Message>
* D& p* ^( s& q- _/ P& `}' x, J$ Z. X2 {: s2 c, F
</code></pre> C0 w( x0 C. F J. L. w Y
<p>该是时候定义我们要发给Worker的消息Message了<br>" N. A6 X n) M9 t' V6 U& [
定义如下的枚举值</p>
5 v. O/ {6 S; c" H O7 r7 Z<pre><code>type Job = Box<dyn FnOnce() + 'static + Send>;) k/ K' P* Y: `* Z1 N
enum Message {
* {5 ~. A2 J/ k' ]) q; Y9 y" [ ByeBye,
, Y9 z* e3 [2 x9 B0 h NewJob(Job),2 S7 R' |* @ k, Z0 U. b4 o
}
1 |$ K7 w& a! n% Z, M( C</code></pre>
/ k' z: k5 u, @9 Q7 c<p>Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。</p>3 H4 \4 J5 y& f% j1 f4 u$ B3 _
<p>只剩下实现Worker和Pool的具体逻辑了。</p>' K* T% H2 ?; J4 B
<p>Worker的实现</p>5 e5 m3 k6 U1 j1 i1 _
<pre><code>impl Worker
6 p- H+ z5 [+ N* R{, U3 W* u" O/ j6 q" M+ |5 s# z
fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {
4 u/ O" k$ _5 h" O. N, B let t = thread::spawn( move || {) B5 J/ h7 q! I4 @; ]0 ?2 ]+ `
loop {" Y+ f! p+ @! r" m7 E
let receiver = receiver.lock().unwrap(); _$ q9 _6 {* g. ^+ a2 M
let message= receiver.recv().unwrap();
% l8 C# c0 o$ f9 ? match message {
; g6 t7 R$ |2 `1 y/ @ Message::NewJob(job) => {9 } }, ^2 e1 Q1 ]
println!("do job from worker[{}]", id);
$ p+ I2 P: E3 I s- P job();
" D0 ^. |/ j, t8 ` },
# j Z" }( o, A6 { Message::ByeBye => {0 r3 V! \* B: [* \
println!("ByeBye from worker[{}]", id);+ i- E% O+ h2 m
break" c3 I) |, v1 u/ ?4 O
},
8 P2 c# i8 q3 B6 O) W4 i } . {" {0 Y+ B% O& k6 {
}
$ t& }( V4 R1 G3 ^2 y5 e8 t });/ x8 _1 l9 \' H _7 X. v0 s9 B6 i
& p5 Y/ ]# G: t+ i/ V6 _
Worker {1 z1 @6 l; G; h$ ^
_id: id,/ ]& O* T9 w% ]3 d* p% E
t: Some(t),
; x8 J9 N: s" Q3 ?. U }5 I+ }1 v& i* a1 u: q3 \! Q
}6 }5 x0 \! P1 O% _& T$ G6 p; l
}8 E6 O! k" k; Q9 J" s
</code></pre>$ o" b6 b1 a# D) |, C6 j
<p><strong>let message = receiver.lock().unwrap().recv().unwrap();</strong> 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。<br>
- ?6 s6 {/ U% n* A) a- r7 L但如果写成</p>0 h9 ~) M5 k2 j" ~$ E$ z
<pre><code>while let message = receiver.lock().unwrap().recv().unwrap() {
. F" @; U4 T% h1 ?};3 v& a4 r0 E# C* A- I3 u
</code></pre>
' l2 L* J' A6 f" ~8 }<p>while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。<br>* J5 }2 h# g8 \1 `' ?
rust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。</p>/ ?) N6 D4 ~% [7 V u) _9 K
<p>我们给Pool实现<code>Drop</code> trait, 让Pool被销毁时,自动暂停掉worker线程的执行。</p>
3 X8 C7 F) O8 s6 q' n1 T<pre><code>impl Drop for Pool {8 S! R. A/ p: [ L$ u E
fn drop(&mut self) {
2 W! T2 p7 J1 N. {9 Y, y7 D0 u for _ in 0..self.max_workers {
3 ]3 f+ A% {1 Y" D6 R& L: V self.sender.send(Message::ByeBye).unwrap();& U \1 u! ?4 s) C1 X' ~
}) Q5 Q& A; h- t0 I2 Q2 N
for w in self.workers.iter_mut() {; T) g) M9 L" K3 g7 x, f
if let Some(t) = w.t.take() {
% K; I- U/ R, h: S t.join().unwrap();* ^- c4 n- i8 B" V4 @9 r
}
& p* `/ r+ K7 N0 M! [9 S }
[9 r. x, v; W8 q2 S Y2 H }; I m- f) _9 A' i- r Y" [
}8 I2 T7 k9 u* C7 |, Y
; G5 J- K4 K8 D# @1 ]
</code></pre>$ Z: T2 [2 S& m. d4 V6 { g
<p><strong>drop方法里面用了两个循环</strong>,而不是在一个循环里做完两件事?</p>3 Z$ R1 y1 m8 X' l
<pre><code>for w in self.workers.iter_mut() {: b$ H6 ?$ R3 Z
if let Some(t) = w.t.take() {$ M( d- u$ r: K* l" I
self.sender.send(Message::ByeBye).unwrap();
- h! b) Z) R/ L& f$ s4 L- f) m t.join().unwrap();
1 Q, W, C$ l0 t4 |8 M }8 Z' e) D9 e0 b0 x" A
}+ t4 c& U+ I3 O9 j1 q; e
8 O& k8 _+ N1 D. R
</code></pre>
6 k+ E# } |7 f! u<p>这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,<br>
7 |4 e3 [- Q$ D2 K" M$ Z我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。</p>
/ f2 S' W# p/ a3 p) _2 M5 J0 q<p><strong>注意到没有,Worker是被包装在Option内的</strong>,这里有两个点需要注意</p>8 |2 s& ^" M+ ~- Z8 t) a
<ol>
0 S, M' }6 p: `9 Z<li>t.join 需要持有t的所有权</li>0 w/ [" N2 k+ G* t6 M! V5 j
<li>在我们这种情况下,self.workers只能作为引用被for循环迭代。</li>
8 }+ S/ @7 J5 b4 l0 x</ol>
, s4 K0 f" g! z" @& E<p>这里考虑让Worker持有<code>Option<JoinHandle<()>></code>,后续可以通过在Option上调用take方法将Some变体的值移出来,并在原来的位置留下None变体。<br>
[% @# u: X" ~5 @. G8 w换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程</p>
8 J: _. s7 |! M. h<pre><code>struct Worker where
) B7 q- V' G0 f, b$ I9 V{
" l# Y) }. v, C( u E( I9 Z _id: usize,
, E( k f9 U( ? x" O/ z t: Option<JoinHandle<()>>,
( Y* G, s; w1 I4 N}: d8 h$ x2 w y- j" C( s5 q
</code></pre>
* d+ o; o7 ?6 K6 T6 d E<h1 id="要点总结">要点总结</h1>; J3 g, o7 E. P2 [
<ul>! T' b, d6 g* ?) l
<li>Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁</li>" ~% E- v+ _2 w" f- `! t
<li><code>Vec<Option<T>></code> 可以解决某些情况下需要T所有权的场景</li>, P) C, u( |' T4 h
</ul>2 l) N7 Z9 j4 p8 Y7 Z0 @
<h1 id="完整代码">完整代码</h1>" ^& [% f! @# e4 E" a3 J
<pre><code>use std::thread::{self, JoinHandle};) N6 R" f4 K) ?
use std::sync::{Arc, mpsc, Mutex};
/ I& K3 W8 w7 _% Z, R- C3 z) v, v6 G. p6 {/ P& Q# @5 T
. o) T2 b% v/ G. Z& ]& |/ S# F2 q# Z
type Job = Box<dyn FnOnce() + 'static + Send>;
9 M4 T* @4 G' p- _: venum Message {& z6 {4 T% I% V4 C/ t' h
ByeBye,/ n2 W7 a. l, a) j3 m* Y' V) [
NewJob(Job),5 G% x' l2 S1 K8 o0 H
}, S* `) q; o! q2 e* l8 a) V0 n
* e) {, m1 E' j9 d
struct Worker where0 A0 _$ Z& h- d
{
* ]5 i1 H c! f Z) h; { _id: usize,
) c) }/ k# D& H1 {+ x5 S. W/ ? t: Option<JoinHandle<()>>,
: s# p+ U3 ^6 c/ n}9 O5 g. X& Z1 ?
) O: A6 u; n6 }+ K8 m- K
impl Worker" T7 |) W3 T4 o6 ?
{
2 k# D/ s: v+ |/ u5 l! G; q& M, S1 k fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {: p! Z, I" i9 y# ^3 L7 p
let t = thread::spawn( move || {' d* B9 v+ D# j! n
loop {
1 P& F3 y' u$ h4 X2 Z( Q let message = receiver.lock().unwrap().recv().unwrap();
1 f8 O8 o# Q- z; Z4 u, `+ P6 @ match message {. b+ u; K6 O0 W- l4 l
Message::NewJob(job) => {
) q5 T$ e) O% }0 s D8 w) ?( ~ println!("do job from worker[{}]", id);
0 o3 H9 o9 k' g7 | job();2 |/ s3 ]" g9 s @
},0 v1 N: q8 O2 |+ E
Message::ByeBye => {
8 v' ?* {. y5 d% b4 t println!("ByeBye from worker[{}]", id);
" \5 v! X' y+ |3 ~7 s, ? break! l8 {; W% S y! |5 }
},
1 j$ ^2 I$ M+ o$ q% y n2 o9 {7 O3 { }
5 m/ ?, [# [+ N }/ i$ b0 l! {2 D# X
});" X0 _5 J3 `1 _2 E3 w, m) n
' n" @) n( O3 h
Worker {) d5 P( U ~6 w
_id: id,' a7 _, v1 b2 {8 y
t: Some(t),
7 u6 \3 W+ q& K! g% l( \ }5 |2 |: j0 f7 |* S4 o
} h/ {6 L! j. [) S3 Y) |
}
N2 l, o7 ^2 B+ K2 f& w
$ u3 s5 D* {- L E5 s2 gpub struct Pool {
6 W% u5 v$ E) v6 p workers: Vec<Worker>,( R* T; Q d. p7 N, J; O! B
max_workers: usize,
% s$ x$ J% C2 q; N: p: X$ T sender: mpsc::Sender<Message>) B. b& v/ p7 e6 a2 o
}$ h9 W% b8 y5 N$ z4 u% x s F
7 M# t/ S4 P+ k, @, L1 j
impl Pool where {/ w8 T4 h- t% q* l6 a$ u4 F3 n& Y _
pub fn new(max_workers: usize) -> Pool {6 g, H7 i8 Y8 e5 i
if max_workers == 0 {
/ B/ T- a- H5 a/ G% w8 } panic!("max_workers must be greater than zero!")# c# \/ P1 q, J G0 z
}
" m2 d/ k: y5 d3 o7 X: a' | let (tx, rx) = mpsc::channel();( B8 A4 [( X/ l# }2 z
/ w ~- l$ P# Q2 M- D5 \
let mut workers = Vec::with_capacity(max_workers);# g$ m' O( b' b+ ~0 U0 g
let receiver = Arc::new(Mutex::new(rx));( [6 S9 ^# b- C+ z5 `! G
for i in 0..max_workers {! l7 b0 G/ G+ ~0 s# ?
workers.push(Worker::new(i, Arc::clone(&receiver)));. R2 X+ N# \! |
}. e% g8 V* a& D0 }. j: A1 Y
: j3 V* y1 w" J! v! H: e
Pool { workers: workers, max_workers: max_workers, sender: tx }2 o1 |4 S# m3 o; G! t. b
}# f1 ^ D I7 n4 }8 U/ d# ?8 f
3 D. f& x! R: X, ?2 X; S0 I7 g pub fn execute<F>(&self, f:F) where F: FnOnce() + 'static + Send$ ]- v T& t/ o$ M
{+ x& [6 }# c8 E2 F( m# B
. c$ _# E& Y, e) n let job = Message::NewJob(Box::new(f));( [( C' y! Z) O& \8 [& H
self.sender.send(job).unwrap();3 K2 q& o( }+ V& q7 M$ w9 L2 @( E
}
{$ x1 O' m0 A$ T3 b' @}
0 J% a5 ]* \3 s) j1 j2 _
`# q$ s* X# i4 f* Jimpl Drop for Pool {
{2 q2 y6 F% z5 s" }, k fn drop(&mut self) {
, K5 V, O/ y, ?' u3 C8 B for _ in 0..self.max_workers {
0 p9 y0 @4 v- u6 m+ q% O5 Q! Y self.sender.send(Message::ByeBye).unwrap();2 V5 x0 S( B) [! {, Z( W
}
) e. b) F3 E8 W9 D% u8 H for w in self.workers {( e" h9 t5 I. F, {$ j: c
if let Some(t) = w.t.take() {0 Y D) K/ Y* b5 t' c
t.join().unwrap();8 K- ]% q0 _2 o5 S8 z
}, k# n4 o' U& n% m
}4 h. f% c# g3 x; v% q. @$ c+ U0 [1 _
}
. R) H" C- E% `+ x% M; c}
6 i% H6 x( z! b! }+ p8 l# T! ]0 C, L; e4 n& S; I
' a* Q% C4 A8 f1 S0 B: i/ Q
#[cfg(test)]
" N N J) U& {! kmod tests {1 L7 O, h- I; `0 H
use super::*;
2 Z, @1 F/ ^: Q4 s #[test]
z9 e+ x# R6 O2 X! z0 j: ? fn it_works() {
; y% R9 Z& c6 ?2 v4 d% e0 s let p = Pool::new(4);# y( P. Y! x% x! U8 z& C
p.execute(|| println!("do new job1"));
7 ?# p$ Y7 B' K p.execute(|| println!("do new job2"));" a0 E7 u/ Y; l7 `
p.execute(|| println!("do new job3"));) L4 j: q" b9 H) X6 b
p.execute(|| println!("do new job4"));0 E8 ?' B2 ^$ q2 M2 H7 ?3 A
}
) o7 u. I% E4 h9 p$ k5 y5 z+ |}, T5 r2 P$ g0 K4 B3 Z3 _- e8 e
</code></pre>2 J% \: {6 w/ k: X
; ?3 R/ Z% k% U4 B, S" q. W$ r% q |
|