|
% \- K! v: M: l# S6 q0 e! `<h4 id="flink系列文章">Flink系列文章</h4>0 r2 r: T* B4 l
<ol>
8 [. [3 w6 c1 ]" i d& z<li><a href="https://www.ikeguang.com/?p=1976">第01讲:Flink 的应用场景和架构模型</a></li>: G0 ]: Z7 v5 k' S0 B, [' r
<li><a href="https://www.ikeguang.com/?p=1977">第02讲:Flink 入门程序 WordCount 和 SQL 实现</a></li>
# g+ S5 K7 a @$ Q<li><a href="https://www.ikeguang.com/?p=1978">第03讲:Flink 的编程模型与其他框架比较</a></li>
/ r( ]3 r! L! B" @<li><a href="https://www.ikeguang.com/?p=1982">第04讲:Flink 常用的 DataSet 和 DataStream API</a></li>2 o' n- S) B n5 N
<li><a href="https://www.ikeguang.com/?p=1983">第05讲:Flink SQL & Table 编程和案例</a></li>4 C- G2 C: u& B6 a5 m8 @) C* C
<li><a href="https://www.ikeguang.com/?p=1985">第06讲:Flink 集群安装部署和 HA 配置</a></li>; N% Z7 J, t+ a4 z) L& @
<li><a href="https://www.ikeguang.com/?p=1986">第07讲:Flink 常见核心概念分析</a></li>) W! B% D b/ x' t7 y: R4 ]
<li><a href="https://www.ikeguang.com/?p=1987">第08讲:Flink 窗口、时间和水印</a></li>9 V, U% K- s% o; ~' ~+ Q3 e% L8 g
<li><a href="https://www.ikeguang.com/?p=1988">第09讲:Flink 状态与容错</a></li>- L5 d p* I6 g5 U8 L; j' c6 J
</ol>
* M8 Y6 N' P4 F7 Z<blockquote>
% n2 G5 s4 S' X' M0 b<p>关注公众号:<code>大数据技术派</code>,回复<code>资料</code>,领取<code>1024G</code>资料。</p>
; T* @+ f8 P) H</blockquote>
+ |# @- \: @* `# f5 U<p>这一课时将介绍 Flink 中提供的一个很重要的功能:旁路分流器。</p>7 f* s+ O7 M f
<h3 id="分流场景">分流场景</h3>6 I. P' D: V5 F; X
<p>我们在生产实践中经常会遇到这样的场景,需把输入源按照需要进行拆分,比如我期望把订单流按照金额大小进行拆分,或者把用户访问日志按照访问者的地理位置进行拆分等。面对这样的需求该如何操作呢?</p>
! n" O' X* Q) z- e) V<h3 id="分流的方法">分流的方法</h3>/ Y& |9 g& W" a0 S! V4 Y% t e' Y
<p>通常来说针对不同的场景,有以下三种办法进行流的拆分。</p>" F+ ]) j0 W# S
<h4 id="filter-分流">Filter 分流</h4>
& X% `* |7 e6 o6 u<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CAy6ADUaXAACSFUbdpuA911-20210223084827182.png" ></p>$ t" X8 F% A6 ^$ y
<p>Filter 方法我们在第 04 课时中(Flink 常用的 DataSet 和 DataStream API)讲过,这个算子用来根据用户输入的条件进行过滤,每个元素都会被 filter() 函数处理,如果 filter() 函数返回 true 则保留,否则丢弃。那么用在分流的场景,我们可以做多次 filter,把我们需要的不同数据生成不同的流。</p>
- d& g( U4 b0 x' K; G! t<p>来看下面的例子:</p>
" h* n4 [* L5 ~2 p6 S4 a% P<p>复制代码</p>
% O% A5 s) z! `: s3 Q& H; y<pre><code class="language-java">public static void main(String[] args) throws Exception {
6 R2 m( J* K$ Q$ c# v$ ~; p' ~ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
' E4 p4 D4 a* O //获取数据源# }8 @+ S- Q, y5 p. O
List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();5 Z6 w* l* }5 ^; M$ k$ p% h: f* d
data.add(new Tuple3<>(0,1,0));, Z; Q* D" W9 u: G3 X6 s
data.add(new Tuple3<>(0,1,1));$ b; L* T+ G2 h/ z6 }/ G2 {& w
data.add(new Tuple3<>(0,2,2));/ @: K4 w: k/ I; k$ l( P
data.add(new Tuple3<>(0,1,3));' {# p3 z8 J7 n) q
data.add(new Tuple3<>(1,2,5));
4 Q. y: o; i' K8 a5 Y3 a+ d$ R; C( U7 a data.add(new Tuple3<>(1,2,9));
& X5 c/ H$ O- Z8 \3 v data.add(new Tuple3<>(1,2,11));
% e o: ?: u& Y+ z4 E2 [ data.add(new Tuple3<>(1,2,13));
9 f# _3 O1 o6 O' E
2 D% ^! }' x' x- ?3 Z3 e DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);
9 N1 g+ _: u5 g, v( l4 i9 O" Q& g
' R6 Z3 R: p3 {) m1 }" g7 `# J l% Q) K/ T
% q; O5 @' s- K. H8 c+ \1 k SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> zeroStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == 0);& B3 Q- f9 _8 T5 M$ y1 e' {
# S1 E- @) z8 D, o) N% @6 E1 Z! q SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> oneStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == 1);% y% z8 l; a1 |) M( u
I' V) C! ^. @+ a2 U. d# d: u, V
* ~$ @3 g; N) y, L a* s
) ^$ x' Q: B( @; f% G# w& W
zeroStream.print();
& w2 l, H' @; r5 r9 b1 T- x" Q# G' c4 b' o
oneStream.printToErr();+ t6 M1 g" `% ?
L: }2 P! T- _, B, N
7 y6 f- O: K. Y9 {4 b8 c$ R
6 M7 @' ~8 ^ v3 H6 i' i: j" n" @* ~3 x& n9 L
- ^7 c/ ?1 `, P# C. X //打印结果
* \. R# M, L% o. Q* M4 g- P' W3 @" M! `* h0 ?/ V. ~: J
String jobName = "user defined streaming source";8 l1 R0 K* q6 `9 _: p/ T4 o
% |# Y- ]' l6 b5 |9 d" g env.execute(jobName);
5 R3 {, c& o [2 s* U8 \1 S \- X) z
}( U6 a, D' X; u8 r) r' s
</code></pre>
( l9 f& V# g) ?/ B* `, E<p>在上面的例子中我们使用 filter 算子将原始流进行了拆分,输入数据第一个元素为 0 的数据和第一个元素为 1 的数据分别被写入到了 zeroStream 和 oneStream 中,然后把两个流进行了打印。</p>" U1 j& j+ W1 V. {- l
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/Ciqc1F7CA2WAYbshAAKj494h86s723-20210223084827296.png" ></p>+ k4 v! {) t+ p
<p>可以看到 zeroStream 和 oneStream 分别被打印出来。</p>9 g: f; N9 ~! Z9 _2 `" o. S$ v
<p>Filter 的弊端是显而易见的,为了得到我们需要的流数据,需要多次遍历原始流,这样无形中浪费了我们集群的资源。</p>9 M2 @ n5 u% z1 P/ ^ [
<h4 id="split-分流">Split 分流</h4>- `8 l+ W! u6 @, y* l! N
<p>Split 也是 Flink 提供给我们将流进行切分的方法,需要在 split 算子中定义 OutputSelector,然后重写其中的 select 方法,将不同类型的数据进行标记,最后对返回的 SplitStream 使用 select 方法将对应的数据选择出来。</p>2 M# Z/ g6 ]* E! d* W$ F
<p>我们来看下面的例子:</p>' c7 @$ O" s5 p' N
<p>复制代码</p>
" R. B* f& X+ Q$ o- n<pre><code class="language-java">public static void main(String[] args) throws Exception {0 K0 r. i3 t" R
. V9 A8 ^4 j* n& I. ^
0 I" {' N- l* t( s
; x9 B5 E+ @9 x3 P5 u& _+ i$ ]& B StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();& X# ]5 Y0 R* T
/ Z: Z8 D, R6 b. `* A! F //获取数据源
/ R1 R. Q! q/ O9 ]" k3 B
" a3 t- R$ ^, t List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();
; J; k& r' N+ F, r& h6 C# S x8 R0 i; F. r% T2 K9 s
data.add(new Tuple3<>(0,1,0));1 i, G, h/ H: ?7 N4 z
8 [3 o# C: `6 f' s( \- n. D5 l
data.add(new Tuple3<>(0,1,1));
# G G4 l" g/ K0 d0 r; c& k( C# x: V$ t8 ~# {% p4 C3 H7 Q
data.add(new Tuple3<>(0,2,2));: E- U7 {. s- u/ }
' `: b2 _% d- d- `1 Q( K5 Z data.add(new Tuple3<>(0,1,3));# t& Y$ ]% n, P; S
. H5 n* U( ]2 S3 ?
data.add(new Tuple3<>(1,2,5));
( d: V5 d k3 Q9 u- T. U+ A* x( G) |+ v4 O/ z( g
data.add(new Tuple3<>(1,2,9));
7 u% B0 B$ k3 S' s
5 V# I" @* O! [1 y4 K9 W data.add(new Tuple3<>(1,2,11));# I( N" }( C- ^7 \/ w
0 z- j* C; q% ?' L! P0 z2 @, b
data.add(new Tuple3<>(1,2,13));
! ^& G u4 c. c
/ r1 O. F0 o f$ A1 i! n$ g( m! s1 d w+ Q0 u
" E4 K) i. _* a7 e' z/ k8 g, v2 ]7 A3 ~9 R6 @( t! L
# D/ E' |7 G; W ~2 @
DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);1 l) A1 V2 k! X% H, @7 j1 C( T" O: c
+ j0 `1 G- r7 ~5 Q- a1 D
+ f2 d/ ~9 p+ t- u) | m) n, X
6 j) N+ e/ j8 ^; h5 B5 b$ N8 P3 T- v
9 e( I: M! Q# P7 i# r
$ r9 R3 N8 t. w5 O SplitStream<Tuple3<Integer, Integer, Integer>> splitStream = items.split(new OutputSelector<Tuple3<Integer, Integer, Integer>>() {
1 q! p, \1 Y. ]2 d$ v/ ?
2 k" \! m0 x! x6 o5 m% D- @ @Override
+ _1 l1 _: \9 t. [
. v* ], w% g# w" t: h1 f public Iterable<String> select(Tuple3<Integer, Integer, Integer> value) {7 N- ?/ d% s' g6 H" D: u1 {
! `" }' c: X- ]9 T) V7 C. J List<String> tags = new ArrayList<>();4 h( d& u1 J9 y
a6 a- y8 C9 p# @( I$ A" V
if (value.f0 == 0) {
& @' G S/ A' v$ I( s, L* k3 |3 ] l$ W( D
tags.add("zeroStream");& K% M/ ]* c. y; w6 s
. L' ~1 z- i6 ` } else if (value.f0 == 1) {
Z* R7 I3 K% O+ M) Z0 _" e; ^! t; ?) o' x" S+ z
tags.add("oneStream");' ^4 z0 u }1 R( D4 i+ G' Y
% |. }' Y1 n& j9 a
}
; l+ v7 b5 i& x6 q! z! D: O a7 U1 n; j# e! C! H j$ o' k
return tags;% M' P3 U6 ^! J
' x' |+ r8 l/ C- {9 z4 c
}1 V, i; ~( y: `3 E9 U
1 M( g' N- M3 q2 K9 ^. D
});
8 F `' ?; W& I
. M& L. R: Z3 [7 z$ W% O; L. D; B
2 n- d: v8 a9 K5 w! N O4 @; x1 s3 J% z
splitStream.select("zeroStream").print();0 v1 n4 j( u/ O' ^5 r0 v# _* M/ O
$ y- M6 \5 H6 J$ I# w. M0 u' B( N& w( }9 w splitStream.select("oneStream").printToErr();+ ]9 S! s4 R& f) N, _
8 R' H8 n! F$ F. t) }' e0 P% B
9 f# S* w; C+ n! J8 x2 ~+ T6 b. d6 ] s7 r
5 [, m0 A" x& {8 |* o& I7 U
//打印结果
( f* g* E8 y! `8 m& v& H; u2 ?) y' h9 N5 b G$ d
String jobName = "user defined streaming source";( u# ?& e7 Z2 @# f6 ?! |2 ]
. M/ Q) a6 | ^/ V
env.execute(jobName);5 Z0 T" C2 y! G' q7 B7 X/ O7 `& T5 b
' w0 e5 [5 S( q r
}: H, W2 M- w; U( J) f$ A8 i1 J+ n
</code></pre>$ o" U8 X% R9 `$ Z6 D! n- V
<p>同样,我们把来源的数据使用 split 算子进行了切分,并且打印出结果。</p>
3 Q( H: M$ c! L, F9 h<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CA4aAbUSJAAG1LWNB3qw627-20210223084827377.png" ></p>( |; q# [2 w5 S5 f' Y
<p>但是要注意,使用 split 算子切分过的流,是不能进行二次切分的,假如把上述切分出来的 zeroStream 和 oneStream 流再次调用 split 切分,控制台会抛出以下异常。</p>1 T, q6 v; g, q
<p>复制代码</p>
) M% z4 I! t! v* T& e<pre><code class="language-java">Exception in thread "main" java.lang.IllegalStateException: Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.
6 C9 Q& y3 b) k& u& r</code></pre>
, X% ?) j* j: Z<p>这是什么原因呢?我们在源码中可以看到注释,该方式已经废弃并且建议使用最新的 SideOutPut 进行分流操作。</p>
" X0 x7 s! w+ t6 }; F) J5 i<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CA6OAJ-JDAAIrh1JSAEo033-20210223084827602.png" ></p>
* s; X5 P* Q' C, b1 l<h4 id="sideoutput-分流">SideOutPut 分流</h4>
# J6 @" w$ H4 t" [<p>SideOutPut 是 Flink 框架为我们提供的最新的也是最为推荐的分流方法,在使用 SideOutPut 时,需要按照以下步骤进行:</p>
: L) _) \8 y, o7 {<ul>
" N4 g& C$ h/ d' G& @<li>定义 OutputTag</li>
7 m- t5 D: U; m% n<li>调用特定函数进行数据拆分
$ N1 m" }1 {) l7 H<ul>/ M% W+ X5 s8 g9 R/ N. V# E: S
<li>ProcessFunction</li>
1 X% ]4 q$ a' T( J) _1 |' H<li>KeyedProcessFunction</li>
( ]' j) K" a1 ^7 d$ |<li>CoProcessFunction</li>2 t1 K5 _5 d' N4 p, L+ l4 O8 H
<li>KeyedCoProcessFunction</li>
9 f8 p3 c! O* z4 J1 U L% Q<li>ProcessWindowFunction</li>
7 |5 u9 Z! W9 u/ r/ `* k' R3 R4 d8 c<li>ProcessAllWindowFunction</li>
: |2 _% J# A8 \% C+ V</ul>/ I# a! E+ s( u: U+ s
</li>; u' f/ w9 J; z; R* I9 I v
</ul>
: E# k7 G1 ]6 ~ v( i* s! H<p>在这里我们使用 ProcessFunction 来讲解如何使用 SideOutPut:</p>
2 P; d6 T0 o. k1 `/ L<p>复制代码</p>
; O1 J' K9 x' |) h9 R<pre><code class="language-java">public static void main(String[] args) throws Exception {( F0 i! u8 @* R! ~/ U7 i( x& R
: `3 ^8 R/ u) O) z* d! K
" c* u" j% \1 v9 ?% o; m3 v1 {; z" u( l& O) _- I4 V
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();) V P3 ^/ u, E) C
4 W2 ~+ x1 F" k5 I/ A9 C //获取数据源
3 b! m8 Y# a8 N5 b1 p
0 V$ D3 M" L& r List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();; `! o3 l9 L3 k6 o1 q
, d8 O* `" L) k# s* ]8 q# q. ~- z data.add(new Tuple3<>(0,1,0));
, N; X) R6 \5 p) _
; P* F6 r5 v" y/ I- D3 R0 C data.add(new Tuple3<>(0,1,1));
, d# g! @9 l" T5 }8 d L
) l+ |5 f+ r. R+ A1 ] data.add(new Tuple3<>(0,2,2));
3 j- L& V5 W4 Q- C
6 K I- n' I9 n3 t, j data.add(new Tuple3<>(0,1,3));
* \$ j$ J( @) Z: C. T0 ?
( n0 C2 h2 S, h5 r3 k, h% @9 t" g data.add(new Tuple3<>(1,2,5));3 r# h- ^+ i# i/ ~% ^2 r
L7 _9 y( F2 g2 a1 N! @
data.add(new Tuple3<>(1,2,9));1 ]9 N- t# ^6 y7 B- i6 u
+ w0 S3 { t* v: [; a, d) p- w
data.add(new Tuple3<>(1,2,11));( d t' Z- i2 _( j) C7 t
; r! Z& B3 }8 ^4 F: v data.add(new Tuple3<>(1,2,13));
$ y R3 X7 v5 M% `! Z& p9 u% n! r v2 j: t, c2 f. K
: d" a, b0 `9 R( m
/ N- N* N/ Y, m; ~, ]# _" i# P
* a" m/ J6 b) w5 y( v5 q! a- N, b9 C
DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);- |+ x7 Y# S+ Z. n* a* j
9 c$ K4 ~1 T! U2 v5 b/ m; D/ m( ~0 x
4 ]# Q o9 v0 c8 y$ Q) a/ j) V7 X$ {+ \5 u h4 {
OutputTag<Tuple3<Integer,Integer,Integer>> zeroStream = new OutputTag<Tuple3<Integer,Integer,Integer>>("zeroStream") {}; _, E0 i4 o9 i% ?
- d/ s7 q4 f# ~; [ OutputTag<Tuple3<Integer,Integer,Integer>> oneStream = new OutputTag<Tuple3<Integer,Integer,Integer>>("oneStream") {};) Q2 b# g% x$ P: c* J
( d7 F" ^2 c1 K
: |' d$ N/ V/ N9 F% @0 s0 {8 x
) B" y0 H7 u z6 h/ D- |$ \. b0 v; }* X: X# N
( `- [. r# [- z8 N" y
SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> processStream= items.process(new ProcessFunction<Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>>() {
" x/ i o6 p' A3 e$ j2 G5 y+ |4 @2 c9 l
@Override" `# P- S+ c* X" o/ K: n, I: |
* N s6 M$ O8 r+ J- k1 ~% H \ public void processElement(Tuple3<Integer, Integer, Integer> value, Context ctx, Collector<Tuple3<Integer, Integer, Integer>> out) throws Exception {/ u) R% }' x1 F* i5 g
3 B; `( V( l& P- U
5 A) W7 T! @1 r$ r
- K3 _0 \1 f# H if (value.f0 == 0) {
6 H& V3 v& f$ E3 ^0 ?' ^* z3 | q
, _( \# o2 \; _7 |; { ctx.output(zeroStream, value);0 N6 x9 d! e) h0 F' X) e0 u- \
* O7 S/ ^/ p5 J9 V. _& L; v
} else if (value.f0 == 1) {
# z* v; L3 O% q$ w# F2 O1 R- Z; w4 O2 A% g: T! T5 @6 ^" [
ctx.output(oneStream, value);$ {; A# T i* u- {% h( [0 W- B7 i
5 \+ _" C5 X1 `4 x
}
! m* W3 D4 O1 G+ r* C. D9 c1 q+ D
}3 E# G8 z. H4 Y* E3 ^
, v6 @+ M# U- L1 Q6 P
});
2 \# e" e' I `/ B s8 i. f1 X5 E1 V& e8 p$ B& P
& T& P0 I0 x* s0 m4 o+ A8 a# b0 b1 i0 n1 h! a) Q G% m# }
DataStream<Tuple3<Integer, Integer, Integer>> zeroSideOutput = processStream.getSideOutput(zeroStream);
) j9 ^, K2 D% i1 a/ d# Z9 B/ p1 r' t3 a4 ~) b
DataStream<Tuple3<Integer, Integer, Integer>> oneSideOutput = processStream.getSideOutput(oneStream);: c/ \6 T7 ~1 t2 R
! z) P! @" {- D6 d2 F
( o# w7 }' h% A3 L3 ^# a1 s2 W7 S$ P3 i- x! I* X7 Z( ~6 W* x
zeroSideOutput.print();6 g/ W5 z2 `- ]9 u( h1 g
1 a& S4 d0 [. v H9 A# U e oneSideOutput.printToErr();* N1 x& v+ Q' {! Z
, ^2 g: l# i+ p- j" R
3 m& W' V6 m; S
9 J; A: F% F/ ?! }! H9 _ X
( k F- v; u" P! o. z% V; g
# p0 K* c& B; Z9 J- l7 q //打印结果 S" Q0 L2 @! B' o8 \
& K1 _# O' ]9 a6 |" ~* X9 X. P String jobName = "user defined streaming source";
" X% s* h1 s* T' P& k. ]& v4 B
+ H( \. ~1 }# M& M3 r K- P' ?" Q env.execute(jobName);
& x2 a3 J \/ m2 a: @3 k4 V: B) S4 _ c& H
}
6 P- H1 c$ ?$ I4 Y& ]' f</code></pre>+ p/ R7 Q" O T, s: s
<p>可以看到,我们将流进行了拆分,并且成功打印出了结果。这里要注意,Flink 最新提供的 SideOutPut 方式拆分流是<strong>可以多次进行拆分</strong>的,无需担心会爆出异常。</p>
$ B# O% v) p U/ B6 [& o<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CBMKAGHoUAAM-5UL5geg132-20210223084827698.png" ></p>
' k6 ?/ E3 J4 Z l9 r/ o<h3 id="总结">总结</h3>
' t* b, m% K0 D0 Z d<p>这一课时我们讲解了 Flink 的一个小的知识点,是我们生产实践中经常遇到的场景,Flink 在最新的版本中也推荐我们使用 SideOutPut 进行流的拆分。</p>
3 I3 b4 m9 V' C3 N, d# e<blockquote>& D5 J6 k' H/ M! |1 z( T( \
<p>关注公众号:<code>大数据技术派</code>,回复<code>资料</code>,领取<code>1024G</code>资料。</p># o6 Z8 n) J- l& u5 Y
</blockquote>
9 Z/ G4 B4 W2 @- X5 `. U) a9 Z, b: ]' `
|
|