|
9 h8 r9 g+ r6 i) B<h4 id="flink系列文章">Flink系列文章</h4>
1 ^! Y2 E/ T6 o<ol>* c4 v8 Y1 A" d1 a: |
<li><a href="https://www.ikeguang.com/?p=1976">第01讲:Flink 的应用场景和架构模型</a></li>
. I- `6 m# ]' e, ~$ t) |: j$ Z<li><a href="https://www.ikeguang.com/?p=1977">第02讲:Flink 入门程序 WordCount 和 SQL 实现</a></li>: R# L# n' \$ q( i" E) V
<li><a href="https://www.ikeguang.com/?p=1978">第03讲:Flink 的编程模型与其他框架比较</a></li>% Y8 E, F" Y1 j* _/ I
<li><a href="https://www.ikeguang.com/?p=1982">第04讲:Flink 常用的 DataSet 和 DataStream API</a></li>
; |5 u; R: u( W5 x: q. i# v<li><a href="https://www.ikeguang.com/?p=1983">第05讲:Flink SQL & Table 编程和案例</a></li>
/ o2 _) t; c1 a/ i<li><a href="https://www.ikeguang.com/?p=1985">第06讲:Flink 集群安装部署和 HA 配置</a></li>
% X1 u4 s( S2 X, p<li><a href="https://www.ikeguang.com/?p=1986">第07讲:Flink 常见核心概念分析</a></li>5 p3 r* d4 r0 u# J: L+ l
<li><a href="https://www.ikeguang.com/?p=1987">第08讲:Flink 窗口、时间和水印</a></li>
) e; B# W% ?3 a2 B* q7 |9 e<li><a href="https://www.ikeguang.com/?p=1988">第09讲:Flink 状态与容错</a></li>4 o- V1 H" t0 S- I7 z9 X+ s
</ol>: {% R. [. o/ P5 ~ |
<blockquote>
% z- j0 ~, {( o9 T; X; [0 e- Y<p>关注公众号:<code>大数据技术派</code>,回复<code>资料</code>,领取<code>1024G</code>资料。</p>+ q, \' W# k" a7 D& w7 D
</blockquote>
7 I2 v0 f5 d3 |/ c' \<p>这一课时将介绍 Flink 中提供的一个很重要的功能:旁路分流器。</p> a/ q$ N& d+ z* J- ^# p
<h3 id="分流场景">分流场景</h3>* w: n; ] p- O2 [; l) k2 i( e
<p>我们在生产实践中经常会遇到这样的场景,需把输入源按照需要进行拆分,比如我期望把订单流按照金额大小进行拆分,或者把用户访问日志按照访问者的地理位置进行拆分等。面对这样的需求该如何操作呢?</p>
* L- c& u5 S9 V9 b. D. t<h3 id="分流的方法">分流的方法</h3>
# f3 J" [1 M, h<p>通常来说针对不同的场景,有以下三种办法进行流的拆分。</p>
) p* H( D' a- v1 x; ~<h4 id="filter-分流">Filter 分流</h4>/ k) Z( Q$ T3 ~% M! X4 t/ ~" H
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CAy6ADUaXAACSFUbdpuA911-20210223084827182.png" ></p>* Y$ ~/ Q" U5 R. B1 F) T$ Q# |
<p>Filter 方法我们在第 04 课时中(Flink 常用的 DataSet 和 DataStream API)讲过,这个算子用来根据用户输入的条件进行过滤,每个元素都会被 filter() 函数处理,如果 filter() 函数返回 true 则保留,否则丢弃。那么用在分流的场景,我们可以做多次 filter,把我们需要的不同数据生成不同的流。</p>
. x4 D. X1 W; f, X- K: o, B" {6 H* s9 L<p>来看下面的例子:</p>
5 Z% t, _/ z" S- l3 M<p>复制代码</p>
7 k# {7 N! n; h<pre><code class="language-java">public static void main(String[] args) throws Exception {& \& c1 b+ B- m0 j, v
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
0 S. [; `8 `/ V //获取数据源
3 l9 |6 F) [6 t* @! h List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();
' ?% d4 V( J; _# v9 L; M data.add(new Tuple3<>(0,1,0));
% A- m/ s" F$ e7 t+ w5 U8 `/ t+ ~0 C data.add(new Tuple3<>(0,1,1));
' @" v, J, m2 I; X; h( i6 Z) f data.add(new Tuple3<>(0,2,2));0 w: K# l+ d% p& t) E* O+ |% x
data.add(new Tuple3<>(0,1,3));
~0 ^7 H8 H1 E- v data.add(new Tuple3<>(1,2,5));
- I0 Q# _' V: U! \: n; C( [( z data.add(new Tuple3<>(1,2,9));# B7 X) n$ g Y4 U/ x/ J) r& V q
data.add(new Tuple3<>(1,2,11)); |/ g r; Z( o3 f
data.add(new Tuple3<>(1,2,13));- J8 M$ ?# J {6 a: Z
. V! N& X9 {8 @- F6 F M
DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);% O a4 `' u+ W; A4 ]) X% _
, T# P: s; N! N, `' U. p$ A! U; c% K& F, ~. _+ @. s
: T% l' B8 T3 A- S* ~2 T SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> zeroStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == 0);" O8 t: O5 r# I$ g8 k2 n
8 |6 r5 M" y' b! w" O ^3 t$ e
SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> oneStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == 1);/ j0 H; Q* L% R, ^
% ]; q2 b& H; L' F4 U4 e% _& s# k: X6 Q. a! Z ^
0 ^2 ]; x( }4 \7 L2 m zeroStream.print();
& @4 @% Z1 W4 s2 w/ c
4 s o: }/ H% z; m2 Y- C- R oneStream.printToErr();5 b& d* u' s& Y- h; m
, X7 _% g7 |( U! ^# y
7 n$ i) W2 F: J3 G S! E* x
. V* a# S" `; W# h" G1 s4 @8 f/ |! P) G N/ \* U
- T2 a) j* l) U //打印结果
6 i7 h* F( s4 z; y! @/ f+ I: A2 {* L
String jobName = "user defined streaming source";3 \" A6 R, k5 }/ |7 @. n d
. x' m1 M) x7 C: O: K5 \# t* Y env.execute(jobName);5 E8 x5 V7 x! u! e; b3 Y& O1 c9 i. G
6 N# ?0 ^- Y6 t- q" N& _2 M
}
. K5 J* O' d; f6 ]. }# b</code></pre>: [6 k/ h. l/ m5 W! W2 \$ ]
<p>在上面的例子中我们使用 filter 算子将原始流进行了拆分,输入数据第一个元素为 0 的数据和第一个元素为 1 的数据分别被写入到了 zeroStream 和 oneStream 中,然后把两个流进行了打印。</p>
/ R4 }1 h. z# l9 T<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/Ciqc1F7CA2WAYbshAAKj494h86s723-20210223084827296.png" ></p>
5 v4 V. I! `# q' U& c<p>可以看到 zeroStream 和 oneStream 分别被打印出来。</p>
& x* T7 D/ Q! e<p>Filter 的弊端是显而易见的,为了得到我们需要的流数据,需要多次遍历原始流,这样无形中浪费了我们集群的资源。</p>
" p5 c; Z( c8 D4 t5 D0 F8 w' C, C<h4 id="split-分流">Split 分流</h4>5 u" P/ ~' v- f) V9 m( X/ I& {' J9 Y
<p>Split 也是 Flink 提供给我们将流进行切分的方法,需要在 split 算子中定义 OutputSelector,然后重写其中的 select 方法,将不同类型的数据进行标记,最后对返回的 SplitStream 使用 select 方法将对应的数据选择出来。</p>
& ~% U7 {' J A# M<p>我们来看下面的例子:</p>
& H5 F/ a1 a1 e+ l<p>复制代码</p>
' n. g6 Y. S& _& {9 w<pre><code class="language-java">public static void main(String[] args) throws Exception {
3 ^) k( R5 B3 U& K: T& I/ a' s7 R8 ?# S6 S6 Z; j: ]
" m9 r# f" @3 ]1 L; t( ]
; I; G& j$ e7 m1 n V6 D1 r2 f StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
# m) Z" D9 `; T3 |2 ]5 b. r6 Y, I1 k# Z" L* b5 P
//获取数据源
' ^9 S$ W2 L/ u/ X; A9 p X+ B7 N9 `, d
List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();0 H _/ b* l# o8 ?; I+ ]6 q
1 k5 ~2 ~7 E6 R" W$ y
data.add(new Tuple3<>(0,1,0));
: q4 `1 Q( c4 J N* o6 r) K1 e( [! H/ h1 C' A' i& B
data.add(new Tuple3<>(0,1,1));! T/ _& a+ @5 n
8 x) @9 u' ?0 K! g1 x5 B
data.add(new Tuple3<>(0,2,2));
, b O& Y7 C' W/ y2 j' u1 z9 a' C' W8 o% u
data.add(new Tuple3<>(0,1,3));
7 |2 E/ L9 t) d* p7 u. W) s
+ g% V) p, f! l# `+ u data.add(new Tuple3<>(1,2,5));% R* a( o; n2 f0 _. s$ M. k
- h, B: E' j& V
data.add(new Tuple3<>(1,2,9)); q6 h* r% ^! r' z/ T) v
% s) |% @9 n) i- U! v0 D data.add(new Tuple3<>(1,2,11));
$ T4 ~8 Y+ {' T9 f, h
# }4 J. U) s {# t0 A data.add(new Tuple3<>(1,2,13));/ Z2 \0 T& }1 h! k" j) e% I- }% v
% y% Y# j% m3 `4 G2 N; r% i# |: v% i- K I7 i& l; v
. h/ l: ^- Z, a d9 x) d0 P
" g \! D' F1 r4 a+ c6 E, m4 n
* [: M! M9 e* `# S& l1 Q DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);% L3 |5 d i5 M2 _' K1 @; i: p f% d
: E( l0 z; W. V& i. u @, u
" c: k3 P/ q f+ c& L; V
% |7 R$ u! B9 c9 H/ X* n
, o3 y" X" ~* x% v) |# P) i% T9 Y: @6 U! A7 x2 R5 K
SplitStream<Tuple3<Integer, Integer, Integer>> splitStream = items.split(new OutputSelector<Tuple3<Integer, Integer, Integer>>() {" Y8 P: T$ v# J6 i
* s! y' v, l: i3 e2 z
@Override
3 |. T! }) x6 \. i# Y: I% L' i6 M; T% ^1 ], d" B: f+ d
public Iterable<String> select(Tuple3<Integer, Integer, Integer> value) {9 B' ]0 J$ O, y# f
" O( `/ Z7 T5 T+ _+ V
List<String> tags = new ArrayList<>();
! X* m' y+ U) c! X$ o' v J& v: P+ z1 A6 ]6 ~% A* A: j+ q" [; X
if (value.f0 == 0) {4 L, w' X) Q! y8 A+ n1 b
C2 R" O9 `& N1 y7 Z6 H: ]
tags.add("zeroStream");# K1 b# K; O. H' G
' d* [$ _. p7 Y2 b5 R, l
} else if (value.f0 == 1) {
( U" `0 N" `; y, d* W& F2 N
4 N9 k4 ^& s+ |* t6 R" r tags.add("oneStream");
0 F- \5 J+ V/ A! {3 ]4 F8 V" R J; m: h: f l
}9 h9 Q( ~8 t& e F& K
" A E' ]+ | u( n! }5 `! j3 D. ? return tags;6 V" m2 F( E W9 G
8 I2 p- ]0 n! I" k- W$ b }
" Z2 Z; @4 r1 J+ |: X& P
# f' c1 D5 `, W8 w });1 s/ k& h# s' W. l0 [# A& T/ i5 Q2 F
2 y# b+ |, e% T& ?' F' I' B2 a
1 |: o2 `5 R, Y: @1 H/ t; O# u. b
( A( v6 L- e5 y: x2 r! ~& M3 } splitStream.select("zeroStream").print();( c' ~5 \ ^" }/ x
2 c/ D4 s8 m/ J* H! k) I% x
splitStream.select("oneStream").printToErr();- J3 w' A A* B: r# \3 I( y0 c
4 N4 w4 ], u4 d3 ^9 s2 t; I
$ K& Q6 S5 r0 [+ ?- v
, H3 w) i! v4 v) u1 J //打印结果' Q4 d( I: @: S6 w
' u3 w4 ^+ R7 Y1 T! F& |0 X, Y
String jobName = "user defined streaming source";. L" \) s9 g! R; f: i, u* P2 t
3 [# `& G, j2 r) v. N( n2 h, a/ Z& l
env.execute(jobName);) g4 J. \+ K2 ]3 M* H; E" g
X$ F Y0 l" O6 {* Y! v5 M% d6 C}
- v" F: |+ C, n f</code></pre>
0 B* o3 D4 h% k" {3 G5 l<p>同样,我们把来源的数据使用 split 算子进行了切分,并且打印出结果。</p>
( `( D+ g [) b% o6 R4 O4 m<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CA4aAbUSJAAG1LWNB3qw627-20210223084827377.png" ></p>) y& ?1 W @; L8 b: y% A) E: m' @
<p>但是要注意,使用 split 算子切分过的流,是不能进行二次切分的,假如把上述切分出来的 zeroStream 和 oneStream 流再次调用 split 切分,控制台会抛出以下异常。</p>
$ {% m2 F0 B+ C; B- N. F<p>复制代码</p>, P* W& m. W5 D7 W& U" R& V
<pre><code class="language-java">Exception in thread "main" java.lang.IllegalStateException: Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.8 N% Q v) T+ M* h
</code></pre>+ z% G- H+ _ t5 Z& @
<p>这是什么原因呢?我们在源码中可以看到注释,该方式已经废弃并且建议使用最新的 SideOutPut 进行分流操作。</p>
5 M# T: p1 N4 |3 v0 ]5 z" W<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CA6OAJ-JDAAIrh1JSAEo033-20210223084827602.png" ></p>
2 ]$ c& I7 [% }9 n G, e5 V' e) n8 h, M<h4 id="sideoutput-分流">SideOutPut 分流</h4>
; k4 `9 C) l& t3 z<p>SideOutPut 是 Flink 框架为我们提供的最新的也是最为推荐的分流方法,在使用 SideOutPut 时,需要按照以下步骤进行:</p>
# D5 l. ^0 z# i& g: B<ul> ^6 J7 \4 o; v/ j2 ?6 ]* A2 q
<li>定义 OutputTag</li>1 P- |7 `- {3 ?( h2 H3 \/ B
<li>调用特定函数进行数据拆分( m' D/ q; d: L: P4 k
<ul>; G# o. ^, Z$ c" R$ i
<li>ProcessFunction</li>
" z: b2 Y- z7 r6 z1 ~- A<li>KeyedProcessFunction</li>1 `" m# _2 ?. A% h7 Y) h6 O; R
<li>CoProcessFunction</li># d6 ?. n" v) H8 E2 U h$ e4 Z2 w' v* Z
<li>KeyedCoProcessFunction</li>! L2 {5 `8 Q+ V9 v( d
<li>ProcessWindowFunction</li>5 v5 y* [, V1 V& U. d( |
<li>ProcessAllWindowFunction</li>) A3 S1 G- _) P
</ul>
* f' C- }( z* i c0 v B: M" Z</li>
7 I9 R2 |" Q$ d5 h* f</ul>- k/ n: u8 L5 F3 X' w" J& P& T
<p>在这里我们使用 ProcessFunction 来讲解如何使用 SideOutPut:</p>6 @: E* |% U% {3 H ~
<p>复制代码</p>* L+ C8 x3 r* O8 m
<pre><code class="language-java">public static void main(String[] args) throws Exception {
9 }7 W6 S6 B- J, r! H
/ X, z8 @( K( A9 r
& D l3 ~6 I3 J) a3 \5 n) ]# Y" h! r+ [4 o
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();5 O2 @; Y) N+ u. h* |( P
7 G+ r$ G/ y6 p* c8 x% Y
//获取数据源
: P: U' k) j1 b, Z5 ^: w3 C7 a* q1 l& e
List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();8 I( t) {" N* }: \0 m
. _; F0 }3 {) l7 m3 i
data.add(new Tuple3<>(0,1,0));
( B+ o# @8 K) e7 [! H2 ^7 L
! G3 _3 Y0 L$ ~% E$ Q, G) e- y r data.add(new Tuple3<>(0,1,1));, Q* F4 K- B) k$ K
' d( A' Z6 @$ h4 M/ |* x data.add(new Tuple3<>(0,2,2));+ | W- v2 I: E0 Z h6 a
4 ^0 u- O( W9 Z2 G; y+ X data.add(new Tuple3<>(0,1,3));. k, N! Y) X3 a) a
! N( [7 Q! Q @& |( l4 D* V+ p
data.add(new Tuple3<>(1,2,5));, G$ S/ z& O2 N1 b
2 M* a6 Z& A) @4 Z- ]! B5 D data.add(new Tuple3<>(1,2,9));
8 H( N' v6 j! V! R0 a. b! x1 n- e! b5 y
data.add(new Tuple3<>(1,2,11));
. p/ p0 c) X2 O2 a/ x* D- c1 ?; d7 z+ w
data.add(new Tuple3<>(1,2,13));
4 x# ]4 H) g( ~ ~; {
0 g. `2 E) ^) C4 J
* c! h9 p7 S/ X2 S2 N5 z/ s4 P
# v$ O7 R+ T! V5 K* L, o) B% S2 x: ]" S4 b9 {
3 i5 @$ m* m$ T6 u1 H% i4 g
DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);+ `* l, b+ A% M# z% _" f
( @5 c* I, W5 L% i& A% s
. F/ }% L. ]9 P. u0 m
) O, p% \, j* D# p1 s( M- w
OutputTag<Tuple3<Integer,Integer,Integer>> zeroStream = new OutputTag<Tuple3<Integer,Integer,Integer>>("zeroStream") {};
i/ u3 g" @1 [6 _' \/ E
$ y# o/ D+ H: e/ m5 U OutputTag<Tuple3<Integer,Integer,Integer>> oneStream = new OutputTag<Tuple3<Integer,Integer,Integer>>("oneStream") {};
3 f+ p$ ^/ t" j% J2 I9 ?1 ]- x3 r# i) Q% }* s" I
( w: m: _* i# _1 j& E2 f' G+ S
C* S L S$ X* v7 t
) r9 g' l; f7 L; ` Q6 ~5 S/ P$ Q7 s- g7 \
: k4 i T, H' k* L9 g" Q SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> processStream= items.process(new ProcessFunction<Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>>() {
% z1 t, r2 N% X. g/ l2 S
: o5 Q3 r1 i2 X0 H @Override
, k3 b5 I( g4 \+ e, X$ I! s% N! [1 f: F; Y+ U# `+ A3 D" c
public void processElement(Tuple3<Integer, Integer, Integer> value, Context ctx, Collector<Tuple3<Integer, Integer, Integer>> out) throws Exception {
% P* @( C3 y' u$ K5 M" ^! H/ d' b# d. d+ `
3 n) P8 b& n! f, K2 G. F' f- c* q
( U3 m. E# S* w5 p" v& K
if (value.f0 == 0) {
0 t$ u* w% X9 K: C5 }* w* o: x
6 [! p( C9 a3 I ctx.output(zeroStream, value);( Q/ b7 e( ?) ~( T7 [! d
( ~7 a) T6 n7 T } else if (value.f0 == 1) {* |1 c3 M9 i: U% w2 |4 v Y$ z
E0 D! F/ h7 m! N" K i0 }6 N* c ctx.output(oneStream, value);0 m. v* C+ w3 F& ~8 @$ s% J
1 O# i t& C# Z( x0 ]3 E
}9 c7 X |) e, I; y& l
0 J3 s8 c% ^! O. F6 h
}
/ F6 x1 c) X+ N/ I
3 q& z; h) D) D, I });
' W& I" N+ j' {) n" w3 ?; X0 O" q0 E8 {
! a2 |7 I* M8 D2 [/ c! a. d+ v
2 e1 j' H+ g8 Z$ X; o0 ? DataStream<Tuple3<Integer, Integer, Integer>> zeroSideOutput = processStream.getSideOutput(zeroStream);0 P2 D( O4 i8 Z$ D
' G( ^! v- ?3 R- H
DataStream<Tuple3<Integer, Integer, Integer>> oneSideOutput = processStream.getSideOutput(oneStream);
9 r4 Q, G, W* \5 p/ g; A/ h' r' j- f; t& p5 S2 r. X8 q/ G
& N- g3 R8 l$ Q- ~
# }8 r. |/ D/ N3 k' U# L' l% F1 k zeroSideOutput.print();
5 @. j% A6 }; A2 Z
& z) K l. T5 D7 W oneSideOutput.printToErr();
& ?, U% i, N, ]& V7 w
, u8 Z0 \( e& m& Y6 ]4 O& H- S i, q
, J7 X' _, o& W
1 o: } Z: a3 j' ~$ r3 p/ J
9 }' R$ \2 U1 E; s" A //打印结果
% F2 q7 E) D9 Y6 K* b9 W$ `4 ?( V% S4 T3 [8 R: Z& b" J9 u$ |* q( F
String jobName = "user defined streaming source";7 D: H3 U5 v9 J5 Q
1 F: ?3 {. j# T
env.execute(jobName);: W6 t0 S4 g0 n* a4 Q7 @
: [0 ^& a. o( f8 x$ c' Y/ f}
6 \% Z- E4 O% K( V3 U/ r; h+ v2 u0 I</code></pre>% @/ b1 t: N) T
<p>可以看到,我们将流进行了拆分,并且成功打印出了结果。这里要注意,Flink 最新提供的 SideOutPut 方式拆分流是<strong>可以多次进行拆分</strong>的,无需担心会爆出异常。</p>
; b. Y- x/ [0 `; P2 c5 c<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CBMKAGHoUAAM-5UL5geg132-20210223084827698.png" ></p>7 ]6 @/ B( O0 M
<h3 id="总结">总结</h3>8 J& R, r8 ^$ D) T% ^; o
<p>这一课时我们讲解了 Flink 的一个小的知识点,是我们生产实践中经常遇到的场景,Flink 在最新的版本中也推荐我们使用 SideOutPut 进行流的拆分。</p>
& s: `' P6 w2 H7 r4 T<blockquote>
# }$ s# Q- g7 R<p>关注公众号:<code>大数据技术派</code>,回复<code>资料</code>,领取<code>1024G</code>资料。</p>9 G: r- Z/ T1 K
</blockquote>, s) }6 T$ J& G' u. \$ z+ `
% B' Q; y# @7 `4 |5 n! p$ \
|
|