|
|
" a2 J4 g' @/ a7 m8 k/ j<h4 id="flink系列文章">Flink系列文章</h4>% r2 ], q4 ^/ C D
<ol>- A0 o1 U7 \& r7 ?& e( l2 h
<li><a href="https://www.ikeguang.com/?p=1976">第01讲:Flink 的应用场景和架构模型</a></li>
" g- |8 E# H+ A m9 D<li><a href="https://www.ikeguang.com/?p=1977">第02讲:Flink 入门程序 WordCount 和 SQL 实现</a></li>, |4 q4 z( u0 I1 E u6 B. C$ S
<li><a href="https://www.ikeguang.com/?p=1978">第03讲:Flink 的编程模型与其他框架比较</a></li>
% y, ~' F; o& ~; P. [<li><a href="https://www.ikeguang.com/?p=1982">第04讲:Flink 常用的 DataSet 和 DataStream API</a></li>
; A# L4 Y) \0 c0 Q$ u2 f<li><a href="https://www.ikeguang.com/?p=1983">第05讲:Flink SQL & Table 编程和案例</a></li>
# d L3 h3 ^6 G: @, D<li><a href="https://www.ikeguang.com/?p=1985">第06讲:Flink 集群安装部署和 HA 配置</a></li>
7 L$ ~7 v7 b+ \' S9 } ]) r<li><a href="https://www.ikeguang.com/?p=1986">第07讲:Flink 常见核心概念分析</a></li>* i P' v' {6 n) ~3 a
<li><a href="https://www.ikeguang.com/?p=1987">第08讲:Flink 窗口、时间和水印</a></li>
4 G/ d4 B1 B6 y) Z<li><a href="https://www.ikeguang.com/?p=1988">第09讲:Flink 状态与容错</a></li>) x9 h3 I7 n4 Y2 U6 y; p9 h1 o0 X
</ol>
: l- ^. {! _/ D! j<blockquote>0 e& O% U" E7 Z) p# l5 S
<p>关注公众号:<code>大数据技术派</code>,回复<code>资料</code>,领取<code>1024G</code>资料。</p>2 J, G+ k% U/ j/ H
</blockquote>
3 {' u+ ]/ a6 q<p>这一课时将介绍 Flink 中提供的一个很重要的功能:旁路分流器。</p>' W2 f( _7 W1 g, o9 s
<h3 id="分流场景">分流场景</h3>
# o5 l9 w. y( @<p>我们在生产实践中经常会遇到这样的场景,需把输入源按照需要进行拆分,比如我期望把订单流按照金额大小进行拆分,或者把用户访问日志按照访问者的地理位置进行拆分等。面对这样的需求该如何操作呢?</p>7 [8 h8 Q0 R! \( A! i# h
<h3 id="分流的方法">分流的方法</h3>7 F( e) {! c4 E( c, c( c& H* K7 H* q
<p>通常来说针对不同的场景,有以下三种办法进行流的拆分。</p>
0 v3 b( Q0 Y' N3 R7 p8 {/ E3 @3 }<h4 id="filter-分流">Filter 分流</h4>$ Y: ]2 m. Z8 ^4 G
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CAy6ADUaXAACSFUbdpuA911-20210223084827182.png" ></p>
# Q+ u6 f0 X' Q7 ~; M2 x( m<p>Filter 方法我们在第 04 课时中(Flink 常用的 DataSet 和 DataStream API)讲过,这个算子用来根据用户输入的条件进行过滤,每个元素都会被 filter() 函数处理,如果 filter() 函数返回 true 则保留,否则丢弃。那么用在分流的场景,我们可以做多次 filter,把我们需要的不同数据生成不同的流。</p>
' V$ `, C. R; l1 D<p>来看下面的例子:</p>4 x! i: ~2 ~. F1 K* Y
<p>复制代码</p>+ G( H8 w% g0 e7 L4 S- O& _* S4 h
<pre><code class="language-java">public static void main(String[] args) throws Exception {: U) S" e" R4 S5 _& }
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();( A, L- C7 D+ S8 @0 {- X' N0 D
//获取数据源5 z3 I5 f% H6 c: e) x) M
List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();& k! T: H1 m I+ P! l. v0 d
data.add(new Tuple3<>(0,1,0)); Q2 o1 U8 O' u% \: x
data.add(new Tuple3<>(0,1,1));
O- s' o& X5 ?9 O' O6 P data.add(new Tuple3<>(0,2,2));" B0 q) G! k# j. q* F4 m3 q
data.add(new Tuple3<>(0,1,3));
" L" ^' P1 [5 n/ n* n data.add(new Tuple3<>(1,2,5));
: p/ q' N; T, c# }& f9 ] data.add(new Tuple3<>(1,2,9));
) D$ o* b; X! d, z9 S- @ data.add(new Tuple3<>(1,2,11));
, v: R) ?# F% l \2 v7 Z( ` data.add(new Tuple3<>(1,2,13));
0 l* `' p7 z0 v2 E
N( r7 H: P) l* ~/ i DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);6 |* w7 b; \) Q
4 g. s8 c* U+ L* }6 N* A
0 ~4 q$ Y6 I4 f0 T8 H9 ]
5 N9 | [% b) v SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> zeroStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == 0);' s; @* R7 o' W. h# p% W9 G' h9 h
7 A1 t$ G2 Y- S+ H6 d, h
SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> oneStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == 1);
! U: ^ p. _ {) i, g; r$ ^' g# P+ Z( B `1 g1 d: d# Y/ I
; p: i8 |& `' m6 Z' G0 [5 l; q6 Y! X( o+ r% Q+ j
zeroStream.print();
$ O: Y, M ~( c4 X' [9 g8 ^9 P& U6 J3 J- f5 D) e' d6 }
oneStream.printToErr();: p2 o' @7 R6 @/ X
8 g5 x$ U6 J% G; r* o: a
, P: g& V: _9 D/ B4 v. _
/ T, z3 w7 P: _# i- c
6 x( X2 Y9 k9 l1 y0 w3 b1 T+ H! C9 o, z9 K' Y( d, Q
//打印结果* o; M4 E% {' [! i# j
6 z4 L: N0 w- o w8 J
String jobName = "user defined streaming source";
; Z% \8 X! `& Y( Q4 M5 f/ ?% O& l$ C0 W) ~, G% `
env.execute(jobName);
3 s7 ~2 g# y0 V/ E' Y, ^! {% S; S. n/ {) c$ h% b0 D# n) V1 s7 g
}; U, e0 H% z4 k" ^/ L
</code></pre>
( W5 J( {* r- s<p>在上面的例子中我们使用 filter 算子将原始流进行了拆分,输入数据第一个元素为 0 的数据和第一个元素为 1 的数据分别被写入到了 zeroStream 和 oneStream 中,然后把两个流进行了打印。</p>
+ o1 p3 M1 Q, W- c<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/Ciqc1F7CA2WAYbshAAKj494h86s723-20210223084827296.png" ></p>" ^$ L; M+ ~1 ]' y4 h0 A5 r
<p>可以看到 zeroStream 和 oneStream 分别被打印出来。</p>
* D x2 q4 v8 s; j. q+ u6 O<p>Filter 的弊端是显而易见的,为了得到我们需要的流数据,需要多次遍历原始流,这样无形中浪费了我们集群的资源。</p>
& q8 g8 X7 F0 j9 G# c<h4 id="split-分流">Split 分流</h4>
. f9 m' e# i; s% G7 J" @& b1 q3 l) ^<p>Split 也是 Flink 提供给我们将流进行切分的方法,需要在 split 算子中定义 OutputSelector,然后重写其中的 select 方法,将不同类型的数据进行标记,最后对返回的 SplitStream 使用 select 方法将对应的数据选择出来。</p>( D* V1 d& [3 C7 P( E* h1 A; s/ b
<p>我们来看下面的例子:</p>6 f; u7 o8 a0 x: e
<p>复制代码</p>9 W6 {1 a% d7 B) i- v
<pre><code class="language-java">public static void main(String[] args) throws Exception {
' e6 h9 }7 G: M2 t3 A6 V8 J% I2 n% I9 y: P1 R
% W- m) P( t' D/ T, L- ]/ K3 n9 }
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
/ a E1 d1 X& z9 S4 |( s0 k! K: b: F/ s3 s- u5 E/ H$ `# p
//获取数据源% g0 F# @7 H& |) ]! k' U
/ D; w7 S8 B+ K6 G/ ` List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();
3 m3 y% J9 j, t% {3 D$ K2 G& e+ e* B$ {4 Y6 V3 ]
data.add(new Tuple3<>(0,1,0));1 _+ [# H% ?; E! h% P8 X7 R3 @& X
/ L8 B! J( g* J! v O' ? data.add(new Tuple3<>(0,1,1));
" W1 G# _0 E5 B% G$ {/ _
5 o6 i; a$ q# j- s7 o data.add(new Tuple3<>(0,2,2)); B+ K; l- E" \! z4 I+ B+ B
4 `* f& e5 e T" n data.add(new Tuple3<>(0,1,3));7 a0 F) {! D; @; W9 P) A
5 p2 W/ s- a" u0 N/ ?9 g V data.add(new Tuple3<>(1,2,5));
& @& H: X& D1 p( {7 v* R* ` n4 Y; r
data.add(new Tuple3<>(1,2,9));
' A' I/ t$ u2 f- L2 s, @3 w) ~4 C3 C8 D& _3 B; X
data.add(new Tuple3<>(1,2,11));
" j2 T4 c8 s( B$ v0 j7 j/ q; d- w! ~7 |1 e' i* B
data.add(new Tuple3<>(1,2,13));
* |. L* p6 G0 E! v [ C n6 r% w( J9 H! E- ?! P5 l
- M7 y. V; K1 d$ [8 T$ h) n! @& I" t! j" x* S
- T: i- }7 a# Q6 D
! n& w) T6 p! l3 x. i; t. D DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);. W' }9 A1 m6 Y" V# b
# d! S! ^0 B8 I* M" H w: N* j5 Q8 f: Q
& o1 u8 L' t' C8 }. Z; \9 p5 V, E" q3 k9 ^! s* k
7 E8 n0 f0 ?, X- W SplitStream<Tuple3<Integer, Integer, Integer>> splitStream = items.split(new OutputSelector<Tuple3<Integer, Integer, Integer>>() {
- x v! t' W2 w
1 q, a+ U1 n( S* ~; e* V5 A @Override0 Z" }8 H0 m/ V
" c& g+ o0 B8 b public Iterable<String> select(Tuple3<Integer, Integer, Integer> value) {
2 H- n5 U& H2 |6 t+ _* X% I a) a0 @
0 D/ [2 m+ ~- Y3 h! V List<String> tags = new ArrayList<>();7 d2 _7 W: [1 i% P5 o, L
+ ]# e" {, ]5 h( ? if (value.f0 == 0) {+ C4 r& c. n( O& C( e) a
0 J" A. K0 z( E# ~2 v tags.add("zeroStream");$ ^& {7 F, `& q1 y- _: a
/ t+ t# e* s I/ T5 A, f2 K2 x3 J
} else if (value.f0 == 1) {
( l2 c W0 b/ f( u0 \/ k; R2 m
7 p! V. X5 Z: _6 I7 x tags.add("oneStream");# x9 ]3 T3 O( H) Y. ^
2 c3 n9 K$ y3 V5 A" B
}( ]: S9 m$ S ^
* X: l, Y- _" `/ J O return tags;% J) G: p- y" H5 E+ h
; T4 ]* J+ [( U; O! t }0 R* z, y4 J; A- t ~
" W" g( u! a5 h0 C" D* X9 ~ D. X });
& {6 j7 ?. b6 i8 |" i
. }9 j: h7 H* I$ a# H7 a5 r" x
: V( G+ {& z+ O$ |- a9 @1 w6 O. J
7 t5 N; V% F* \3 L/ D# s" l splitStream.select("zeroStream").print();7 _+ ~3 ?$ E, f/ g' _4 \
$ l6 s5 ]% j6 L3 U# E
splitStream.select("oneStream").printToErr();+ c! q" K) |+ v# @$ {
1 ?7 Q1 G" k: `" G$ n9 B, n
) A+ B5 R- I6 @- I5 Y7 n" y
' h1 k% N, Z" B4 D; b //打印结果
! F" Z5 O" h$ L4 R }6 ~3 Z& I0 t4 x9 \+ B8 |7 J3 J0 X# j8 n
String jobName = "user defined streaming source";
: H' T2 u( B4 ]7 s6 h
0 R" j W3 s, W: F env.execute(jobName);/ j# h. Q( x# V& ^8 |
g: u1 ]( l$ s. E8 N$ b& S1 |% q p
}# I$ j6 x @0 Q
</code></pre>: u) {( k+ k, I7 F
<p>同样,我们把来源的数据使用 split 算子进行了切分,并且打印出结果。</p>
6 Y' U& g( P5 i( d# I- G<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CA4aAbUSJAAG1LWNB3qw627-20210223084827377.png" ></p>/ ]1 K l3 f$ ^; d
<p>但是要注意,使用 split 算子切分过的流,是不能进行二次切分的,假如把上述切分出来的 zeroStream 和 oneStream 流再次调用 split 切分,控制台会抛出以下异常。</p>! e, q& w4 R0 W: W! U. F
<p>复制代码</p>7 j; a8 |8 E" w" b" _2 t' T
<pre><code class="language-java">Exception in thread "main" java.lang.IllegalStateException: Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.1 ^7 E- e/ W& @0 v+ q( Q
</code></pre>0 ?! I2 X8 B' J9 x* x2 \4 w& U' s+ l
<p>这是什么原因呢?我们在源码中可以看到注释,该方式已经废弃并且建议使用最新的 SideOutPut 进行分流操作。</p>- m+ O+ A Q8 _8 _$ K0 O; m1 h
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CA6OAJ-JDAAIrh1JSAEo033-20210223084827602.png" ></p>
: |( X" T; Q0 G<h4 id="sideoutput-分流">SideOutPut 分流</h4>
6 c' f& W4 w2 w8 ? F8 [3 [7 l<p>SideOutPut 是 Flink 框架为我们提供的最新的也是最为推荐的分流方法,在使用 SideOutPut 时,需要按照以下步骤进行:</p>
5 D% @4 h& x6 l1 Q: E! B; m0 v<ul>
) o+ O, B, B7 K5 X' z<li>定义 OutputTag</li>
. A( a- K Z! ~9 E<li>调用特定函数进行数据拆分
8 K. P" R2 b5 u8 W<ul>
3 w( M9 z; U- @0 [0 L' T<li>ProcessFunction</li>
% `2 _5 T" A: Z+ D2 t/ ?# ~<li>KeyedProcessFunction</li>! t/ o4 {5 s. g# @3 C+ P
<li>CoProcessFunction</li>
! m4 E9 I5 c: M) P1 Y \0 m<li>KeyedCoProcessFunction</li>
4 |1 g+ d% b* r9 o" v) {9 D<li>ProcessWindowFunction</li>
" B2 o. S; k% i/ w3 @' q3 V/ J<li>ProcessAllWindowFunction</li>
; E+ \# ^3 S4 u n; P</ul>
. e9 F2 F. a: c, m</li>
. O' \5 V3 [0 k' O( J4 Y</ul>
9 H+ T7 n8 Z' W<p>在这里我们使用 ProcessFunction 来讲解如何使用 SideOutPut:</p>- A8 m" f% C3 h* X* h
<p>复制代码</p>$ k% T$ y) V$ @7 X9 L ~
<pre><code class="language-java">public static void main(String[] args) throws Exception {
% R- W1 z, c/ a4 G9 g! e1 ?8 s8 z) K6 x. L- _- Q% K- i
) i" l3 I1 v8 j9 c% n7 {1 b. Z0 m
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();5 Y' b! R# E0 Q
2 g6 a. F: b+ O8 ~: F6 f; u; }
//获取数据源
" s9 n+ D* v4 t- {1 q5 S7 u* H4 n
List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();
* L5 x7 B! z t' t; y" ` [* T1 T9 L" P2 ^8 G6 A/ _6 Q, T9 z: e
data.add(new Tuple3<>(0,1,0));/ b, A1 y6 Y. O2 U6 L- }! t, a8 U
( }/ Z6 y+ q' K+ {# y# a7 @1 T
data.add(new Tuple3<>(0,1,1));
; W( `0 m$ ]3 I/ \/ Z- {: O+ B, n; N( i
data.add(new Tuple3<>(0,2,2));
& W+ N# A0 a& R" D: ]; W. I& C' _* E5 K& h/ L4 a8 b; G8 r- Z
data.add(new Tuple3<>(0,1,3));& t, K) @* v1 o* V
5 K; M% X+ C% Y4 P6 r* j6 t* [
data.add(new Tuple3<>(1,2,5));
# Y3 i [$ H4 \* i4 t: Q7 N2 v( y2 i9 j3 O, @4 t6 j4 A* R
data.add(new Tuple3<>(1,2,9));
1 K+ D+ ^0 m) S6 s0 B) l. h6 c! |1 E+ B# f# @3 G
data.add(new Tuple3<>(1,2,11));! U% Y* I/ Z0 g3 D
# x2 A3 `, e1 u& v& p
data.add(new Tuple3<>(1,2,13));- v0 C8 V5 \ ]; _2 d |; E
5 _/ v: I% I- f( w* Y% @2 H
$ L" b9 ~: ~& h n6 p2 g
2 m: Y7 L; Z( `$ _& `
7 h/ M: w7 U& q+ j& t* H* ^ Y$ ^
" \5 @3 o" I _9 ~ DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);
* F; F0 \- E8 i1 X
) |2 k5 T0 [. J3 @! b5 b5 F' r" `/ V/ s; o2 G/ q D1 [# R, X
( X2 b% [0 A( F2 O- L* f7 p, C; K/ R OutputTag<Tuple3<Integer,Integer,Integer>> zeroStream = new OutputTag<Tuple3<Integer,Integer,Integer>>("zeroStream") {};
, p) L. |2 t- m7 l6 W4 H+ D) P
OutputTag<Tuple3<Integer,Integer,Integer>> oneStream = new OutputTag<Tuple3<Integer,Integer,Integer>>("oneStream") {};. r. `6 e$ F7 v/ N2 `
! m: P! W, s! {; B+ v# @
' j% I( x, D% r8 |/ h) v2 f
5 P; Q$ C, H. u7 L4 {3 E3 k
/ t( A7 F/ o5 \2 P$ x3 @% O( M. M
2 R! f5 d6 m) H+ x
SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> processStream= items.process(new ProcessFunction<Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>>() {
& y* }- Q3 f" l. _/ Z6 b
. G/ M( \* B# K. ~6 j+ N @Override6 d4 [7 }6 G" b
- R2 q1 d" O% f/ g5 u ?: t
public void processElement(Tuple3<Integer, Integer, Integer> value, Context ctx, Collector<Tuple3<Integer, Integer, Integer>> out) throws Exception {! [ d: {4 Z$ Z
- K2 t9 q9 ~- R" I" f6 P0 o( p( T* ?& ]" B* j; D/ b1 k
) o+ C8 a! I2 F3 u; q& W) L( e if (value.f0 == 0) {
- L" d% }8 x- m' Y" k/ G, ?- i. Z2 z& b
ctx.output(zeroStream, value);' c) f+ [2 {/ s* U1 s' A; `
* Q- G; d |3 r) t
} else if (value.f0 == 1) {
! ^' r: r" s3 S
# M5 k% k+ {. {7 l ctx.output(oneStream, value);
0 y4 v U6 b9 L: l( t
' D# C2 M. I/ m m/ J/ j6 ? }0 W2 ]8 M6 S2 R3 f
; [1 I% v) @" X4 s+ y: L
}: P$ g/ d' u, I2 w6 _, J1 P% H
z/ Y) O. G: o' w# A* C; q });2 h% N* o' Q7 }. Y* o. S
$ _; w: E' f) m1 a1 B0 i& n% P3 w: ?( ~
U$ q( Y. }. ^, n& }( J9 T* b DataStream<Tuple3<Integer, Integer, Integer>> zeroSideOutput = processStream.getSideOutput(zeroStream);
5 _7 _; |5 i: E4 K/ F
% k& B! q) _5 D8 D0 f DataStream<Tuple3<Integer, Integer, Integer>> oneSideOutput = processStream.getSideOutput(oneStream);
$ X$ w8 r: n- f% W k" ]* i4 y$ `- T9 ~
/ A# O& f) B( C
6 l/ ~0 u( D9 |6 i% z+ @ zeroSideOutput.print();
; _6 G# j/ a% u. R" K% k
/ q. N& `: N( S) E6 p V1 p5 _- V oneSideOutput.printToErr();0 t- p+ C6 r+ W' P- M: r+ m
3 V, Y0 J* f; g6 l
5 K4 V1 t( p9 g6 v2 r4 B0 ~2 n% i' g+ c- a$ o1 Y
0 B9 W8 Y( H1 j" m7 Q& c0 H# `; L+ {" k1 X: i
//打印结果
0 n1 ^) O1 ?6 C* [# E% p
: W ?5 j6 F. O M+ B! _: v: L String jobName = "user defined streaming source";
+ H. G5 c3 L9 j' Z$ M- w3 W- C1 ~% Q: u( D2 M
env.execute(jobName);3 ~; E' p* |* Y. q: Z
, l- O- Z" ?6 c: t- u7 Z% o}
) H0 W, w1 z' i# ]</code></pre>
: z) l# x; d2 D0 j; o<p>可以看到,我们将流进行了拆分,并且成功打印出了结果。这里要注意,Flink 最新提供的 SideOutPut 方式拆分流是<strong>可以多次进行拆分</strong>的,无需担心会爆出异常。</p>
' l$ g1 P8 Q- d" E* O<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CBMKAGHoUAAM-5UL5geg132-20210223084827698.png" ></p>+ }# l3 Q# N: V$ F$ l" f
<h3 id="总结">总结</h3>, ~5 J5 j( l8 R0 i/ @8 ` r6 }
<p>这一课时我们讲解了 Flink 的一个小的知识点,是我们生产实践中经常遇到的场景,Flink 在最新的版本中也推荐我们使用 SideOutPut 进行流的拆分。</p>
+ l. z' w6 v! w. `<blockquote>
: B; [' Z, ?9 }4 ~7 Z<p>关注公众号:<code>大数据技术派</code>,回复<code>资料</code>,领取<code>1024G</code>资料。</p>
, _/ ^8 g5 F9 b) F9 b$ U</blockquote>
3 B) o3 v H8 c% _& c+ ^' O8 W! I$ B H, ]1 J1 z5 E7 _2 q6 f
|
|