|
|
; ]6 _! Y/ n0 ^
<h4 id="flink系列文章">Flink系列文章</h4>, ]2 h! i. @3 m& q- I: V* r
<ol>9 Q) x2 W# n5 L# |
<li><a href="https://www.ikeguang.com/?p=1976">第01讲:Flink 的应用场景和架构模型</a></li>5 F+ j" p+ ]3 [
<li><a href="https://www.ikeguang.com/?p=1977">第02讲:Flink 入门程序 WordCount 和 SQL 实现</a></li>" ^: `! z* z9 B( {! |
<li><a href="https://www.ikeguang.com/?p=1978">第03讲:Flink 的编程模型与其他框架比较</a></li>
% b8 ]5 m' R3 w3 n: y$ d6 c+ @0 A* `<li><a href="https://www.ikeguang.com/?p=1982">第04讲:Flink 常用的 DataSet 和 DataStream API</a></li>
. ~$ n0 `" X. \+ F$ }<li><a href="https://www.ikeguang.com/?p=1983">第05讲:Flink SQL & Table 编程和案例</a></li>
/ ~' Y+ n% w2 w- d' C) R, A<li><a href="https://www.ikeguang.com/?p=1985">第06讲:Flink 集群安装部署和 HA 配置</a></li>
/ N: ^2 f: ?- k& g' a<li><a href="https://www.ikeguang.com/?p=1986">第07讲:Flink 常见核心概念分析</a></li>* m; \/ d f4 F( q9 F% \" [% \, ^
<li><a href="https://www.ikeguang.com/?p=1987">第08讲:Flink 窗口、时间和水印</a></li>7 T% I: ]4 Q; O
<li><a href="https://www.ikeguang.com/?p=1988">第09讲:Flink 状态与容错</a></li>
0 I6 W3 ^8 ?7 A' l. L$ }</ol>
- v. y# b9 B6 M( k; ?: C<blockquote>
R2 |9 M( a" T. F! Y/ k% w<p>关注公众号:<code>大数据技术派</code>,回复<code>资料</code>,领取<code>1024G</code>资料。</p>
! v; e9 Y& B9 Z" G& v9 \5 m0 g+ H</blockquote>
3 E7 b* J1 w; ]5 f0 J6 E8 I<p>这一课时将介绍 Flink 中提供的一个很重要的功能:旁路分流器。</p>* I* G* z( _8 w4 |0 B( R9 [9 i
<h3 id="分流场景">分流场景</h3>( X T; ?/ s. L# Q4 W. X4 y
<p>我们在生产实践中经常会遇到这样的场景,需把输入源按照需要进行拆分,比如我期望把订单流按照金额大小进行拆分,或者把用户访问日志按照访问者的地理位置进行拆分等。面对这样的需求该如何操作呢?</p># P" P& k- L% ~6 j6 _
<h3 id="分流的方法">分流的方法</h3>4 |" G1 t _$ m- @$ l
<p>通常来说针对不同的场景,有以下三种办法进行流的拆分。</p>% ?* |2 H+ g1 |5 T
<h4 id="filter-分流">Filter 分流</h4>: Y( y& C# c* `" P O
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CAy6ADUaXAACSFUbdpuA911-20210223084827182.png" ></p>, L* q6 Q! r w% L6 a2 T3 Q. y0 q
<p>Filter 方法我们在第 04 课时中(Flink 常用的 DataSet 和 DataStream API)讲过,这个算子用来根据用户输入的条件进行过滤,每个元素都会被 filter() 函数处理,如果 filter() 函数返回 true 则保留,否则丢弃。那么用在分流的场景,我们可以做多次 filter,把我们需要的不同数据生成不同的流。</p>
* V1 H: u$ _2 m+ h* S$ Y D+ q<p>来看下面的例子:</p>6 ]- H2 E) `* R$ U! w9 E( ^# n2 C
<p>复制代码</p>
+ q( ?8 E; O& E. m$ q m. H+ I<pre><code class="language-java">public static void main(String[] args) throws Exception {9 @ O8 L5 [3 r
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();- Q) S( w7 W7 v, j
//获取数据源
! X' N' m6 u9 } {( w% \$ j List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();
' w3 z/ ]8 d Z5 p% w: C: F- { data.add(new Tuple3<>(0,1,0));/ ^+ V% ^6 U5 h P$ J
data.add(new Tuple3<>(0,1,1));3 G% F6 J; ]; h' T) }1 O5 \
data.add(new Tuple3<>(0,2,2));
5 N5 @# i8 X" f$ I) p7 Z; s data.add(new Tuple3<>(0,1,3));
+ z& ^( ]/ |( O7 Y& W data.add(new Tuple3<>(1,2,5));% Z2 K: U; U! c- D
data.add(new Tuple3<>(1,2,9));
! S, ^/ o* q3 M1 Q2 Y data.add(new Tuple3<>(1,2,11));/ V$ b* D& x3 V g" x3 F
data.add(new Tuple3<>(1,2,13));& T/ {; w) ^% m
, B! ^/ O, r6 Z- T, F* O3 Y DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);3 C$ `! m- \+ ~, R! q
# X+ F. q3 \5 A7 {- O
( A t! {/ V5 Y$ j( ^& o; {7 O$ c7 l
SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> zeroStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == 0);. Z4 |: v0 I1 z1 {: }
( u7 I1 Z1 n5 Z) o
SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> oneStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == 1);6 {- E6 x5 p4 l6 ~+ J. A
& C( Y: G+ T+ h6 |4 C: Z9 p% a, D4 Z+ ~1 @' o
4 {8 ~: k7 I4 x zeroStream.print();5 q! c2 M" v. @, H3 n
# l; q1 E( E9 |3 g; T5 R& f* d oneStream.printToErr();9 F8 }; \" _5 r& g
5 [9 x: o9 U; S3 `* x' h" C
, o7 R" s9 m( S4 w, H+ }( u) G! F6 V
7 o) v! z2 S3 }5 I& ^% e$ T5 n' }+ r4 @( H4 K, }( D( V" j
1 j3 m: y; G; k! {3 I. X& i* ~( ?
//打印结果
+ o# ?- h2 u+ u2 E1 G ?2 S; i7 j$ b: V
String jobName = "user defined streaming source";: P' B$ Q, b* B; d
( M( _6 U: B/ i+ W# F env.execute(jobName);) N! U2 Y9 R9 C" m1 C
) `" T; h7 [" i R% i}- o+ e, }8 q- W& F6 I
</code></pre>
; G) o) C+ l& K4 j$ k) }<p>在上面的例子中我们使用 filter 算子将原始流进行了拆分,输入数据第一个元素为 0 的数据和第一个元素为 1 的数据分别被写入到了 zeroStream 和 oneStream 中,然后把两个流进行了打印。</p>& r [ k3 Q6 Y9 e; X; N
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/Ciqc1F7CA2WAYbshAAKj494h86s723-20210223084827296.png" ></p>$ U* Y6 J; @( L9 ]# w! r3 l
<p>可以看到 zeroStream 和 oneStream 分别被打印出来。</p>
4 i5 M! N: Y5 I: k7 Z( r<p>Filter 的弊端是显而易见的,为了得到我们需要的流数据,需要多次遍历原始流,这样无形中浪费了我们集群的资源。</p>0 P. n5 W: E+ k
<h4 id="split-分流">Split 分流</h4>9 B' g$ Y4 Q- Q4 I3 A- b
<p>Split 也是 Flink 提供给我们将流进行切分的方法,需要在 split 算子中定义 OutputSelector,然后重写其中的 select 方法,将不同类型的数据进行标记,最后对返回的 SplitStream 使用 select 方法将对应的数据选择出来。</p>
- y+ ^1 ~. @& j. g<p>我们来看下面的例子:</p>) V$ @1 k% Z, |
<p>复制代码</p>& Z1 v8 i5 ~0 G" }4 [5 K$ U3 j; |+ t
<pre><code class="language-java">public static void main(String[] args) throws Exception {
3 [. ]+ ~+ t) S6 t7 m6 F0 H! o, @+ s" E
, b, U6 e2 s4 Z, w4 u. U* n# R/ E8 G
+ t9 R" u. q0 w0 E StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
; ?( l% \* q, c/ H
" I3 E9 Z6 a n# y //获取数据源% G- @9 \& k! w4 e+ ]3 i
" g, j3 x, ?8 P, w6 \" U5 B List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();# X7 }5 d3 q9 v
* D; B1 f) d; g5 d& M8 N1 U- G& ?2 t
data.add(new Tuple3<>(0,1,0));3 Z6 z. E* R$ m2 b0 u7 L; ], M
7 }0 h9 |* G' L: v4 A data.add(new Tuple3<>(0,1,1));
" }9 B2 {" T2 a8 v- i0 }% g% _7 k* U# \- y% e
data.add(new Tuple3<>(0,2,2));
Y3 o9 ]" X# h4 E& u$ L0 z# |5 F- n1 s* z5 C4 o# Y( ^% T- i
data.add(new Tuple3<>(0,1,3));
- y7 O. b& s2 j8 R1 a: m+ S: U5 L4 O/ i/ `# C9 P2 g
data.add(new Tuple3<>(1,2,5));
7 W- B# a6 ]7 K' D% ?+ N5 [/ B/ _* w
data.add(new Tuple3<>(1,2,9));
+ X u; Z4 Y2 [+ ?1 d
! \+ u! @ D {& H7 @ data.add(new Tuple3<>(1,2,11));
% y7 K3 [6 {- Z0 A% w. {& C8 |5 W9 j# D$ w# O
data.add(new Tuple3<>(1,2,13));
' B$ e* f0 F" m
. U0 B2 w( [2 v& ?4 r: D
( v7 K% E' Y# k( G) c8 l% S Y; U. S0 q. {# ~
( }2 A& v2 L; z) V* w( o
0 D/ Q' Y2 a/ n( r. g7 u& p
DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);
0 o9 Z/ o: i+ \. I# ~1 y1 M2 d: h- I' ^5 k* U7 z8 M
* H) E5 J; ?3 `! h) U
2 _4 ~- w1 M, q6 \# Z$ S
' R! E' f( N7 Z, v, x# O, h9 x7 d8 M( g
SplitStream<Tuple3<Integer, Integer, Integer>> splitStream = items.split(new OutputSelector<Tuple3<Integer, Integer, Integer>>() {" v: T, N* Q3 r+ _( N9 F
( d x/ o$ d0 q" ], f
@Override
6 r: Q* b& s' o2 x) ^3 w i; o/ S! X' ~& w M$ I0 C
public Iterable<String> select(Tuple3<Integer, Integer, Integer> value) {
7 ? t) d; B1 t( A! N
: W7 v. k. m U1 M& @- J List<String> tags = new ArrayList<>();
2 f7 o* G+ B; I3 N
* s) @# e ]# m2 W6 y' _ if (value.f0 == 0) {
2 l5 Y3 C$ c% Z* M! N7 u' Y ?( j J* [
tags.add("zeroStream");
7 B% s' M7 o8 z( G2 O. [9 n1 O
3 V3 {+ N& z* x" Y) z } else if (value.f0 == 1) {1 {0 z% o9 y% r9 |" _6 g
* a3 B# e4 T1 [, m tags.add("oneStream");
. T+ b. Q+ n. g& g: r
" H6 K, M1 S9 E7 o; C }
6 W/ q: U/ `! F; V/ y( @9 L
( I' T0 g$ @' U; |! a return tags;1 O8 i& }' S* L# b: z% P6 e
, A3 o+ f+ b3 A3 b
}
5 U: P1 |1 `9 z* Q! A# T( F* w/ L8 E4 s4 l
});
2 ?7 r3 `& i$ r9 q% s q4 e4 s. u; E$ T& h
2 D5 h; V) F' @% X. |6 s
: j; x8 h7 U" ]+ o# c3 o splitStream.select("zeroStream").print();
( {0 T: [$ U# A7 Z) q
! d5 ]! y. Q/ } splitStream.select("oneStream").printToErr();
6 W4 P& @& d! t* U1 ~" ]( d) ]$ C2 d5 i" S% g1 e- q+ }
; o$ H# c# `$ l$ `7 S" y- n7 ]" x, M# w5 z4 P5 ^8 R
//打印结果; w$ X& M$ ^' i+ [* R
6 |2 z- a. h2 v g; s
String jobName = "user defined streaming source";
% o+ I2 u3 z( t& H: j
4 ?7 @; b. H. l$ v: F6 S1 C env.execute(jobName);
3 }$ q- ]3 s0 k' n
& H# r/ @' t6 z$ v8 \5 O4 {}
7 g0 i) N- u, D; N</code></pre>
; }* ?$ ]; N# W. z, n" m9 |<p>同样,我们把来源的数据使用 split 算子进行了切分,并且打印出结果。</p>, ~" l8 T7 O0 @" z
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CA4aAbUSJAAG1LWNB3qw627-20210223084827377.png" ></p>; E# y* Q3 h* h/ t6 l% T7 t
<p>但是要注意,使用 split 算子切分过的流,是不能进行二次切分的,假如把上述切分出来的 zeroStream 和 oneStream 流再次调用 split 切分,控制台会抛出以下异常。</p>: {% `- N' _, f. K+ N8 u) a
<p>复制代码</p>
' j% Q) h {# s9 M<pre><code class="language-java">Exception in thread "main" java.lang.IllegalStateException: Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.
& A' ~0 ~ S4 N$ a6 d" I# H% `</code></pre>3 Q O# B1 F) H" m7 W+ d) O' g
<p>这是什么原因呢?我们在源码中可以看到注释,该方式已经废弃并且建议使用最新的 SideOutPut 进行分流操作。</p>+ k( _2 h0 P2 ~2 d
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CA6OAJ-JDAAIrh1JSAEo033-20210223084827602.png" ></p>' c$ ~# I+ _1 F; V% ]4 M+ q
<h4 id="sideoutput-分流">SideOutPut 分流</h4>
( P; N5 A! o! `/ ^+ q4 E: N' M<p>SideOutPut 是 Flink 框架为我们提供的最新的也是最为推荐的分流方法,在使用 SideOutPut 时,需要按照以下步骤进行:</p>
6 V+ D! t# Z1 S$ n1 B7 X$ V<ul>8 ^3 s" O4 _$ G' v- J) E$ c% `6 Y
<li>定义 OutputTag</li>
" |1 ~4 x# e% \% M9 a9 t<li>调用特定函数进行数据拆分
9 t' W6 L) [; Q) a- [) m8 {/ [<ul>) C0 a4 T) q3 P% j( ~" e
<li>ProcessFunction</li>) k5 t6 `7 U7 g9 P! G- x" H% A$ m; k
<li>KeyedProcessFunction</li>: g3 {, z5 K& t$ `# I
<li>CoProcessFunction</li>- h7 W, X$ j- L7 Y8 R0 ]# y# j8 L
<li>KeyedCoProcessFunction</li>1 b2 o7 `9 B- h$ K5 C% u, [& N5 r
<li>ProcessWindowFunction</li>
2 ~) D0 v* ^5 b1 X<li>ProcessAllWindowFunction</li>* |( w# O4 M1 F- q/ |2 Z4 l! f: K
</ul>
# ?+ Z9 v1 L& }" j' ^! d</li>5 b5 x P- p# ]2 _7 {4 ]3 y% d
</ul>
5 q9 z+ |7 e: H% [+ }<p>在这里我们使用 ProcessFunction 来讲解如何使用 SideOutPut:</p>. @# F) f1 J$ l7 ~& R
<p>复制代码</p>
# E: N ^0 l1 K8 q i# B8 N<pre><code class="language-java">public static void main(String[] args) throws Exception { I( S) X; v6 y o3 \& e* f& P4 [
* E. c; `$ M% P) X* ?5 } R
$ d( G' `( h) r }" y: ^' @6 Y o7 M0 p
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); }, s( f% _' X2 X$ B2 w
. g, O' d- s+ i" Z: E% f //获取数据源. z. h5 i) a& @$ k4 w
' a7 j" d& ~+ C9 B {' t List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();
) G" L, q) [- u' q+ P- y, o( X! p S' `9 m/ @5 E
data.add(new Tuple3<>(0,1,0));2 e! A% @/ H1 S- t. Q* @" D
9 d% O3 y/ n+ `; z6 e4 ^: Z2 Q5 y data.add(new Tuple3<>(0,1,1));
, u4 }8 _% R7 A. U( e0 B4 e6 ]9 [" F8 H( W
data.add(new Tuple3<>(0,2,2)); o: s; \0 R: k" }/ g
' U- r( w+ ^* Q( Z data.add(new Tuple3<>(0,1,3));6 ~- Q4 S# }) L8 M
* f% h w' B, T" ]+ D data.add(new Tuple3<>(1,2,5));
7 A3 o0 x* D/ Z- p. \
L1 A; W& O- E; @; m data.add(new Tuple3<>(1,2,9));/ R3 ^6 s1 x* b# R S
) w$ b, F$ ]8 h
data.add(new Tuple3<>(1,2,11));, z2 w' v9 `8 x) `9 a
" {* E1 @. u$ @3 ]/ x0 ^ data.add(new Tuple3<>(1,2,13));
+ h, B9 X/ K9 K# e( N" U* g) y
1 ?$ ^+ i+ v' }
$ \) ]- v$ @" s0 k/ d/ ]1 T( [/ ~
! w0 b: Z0 C3 J( |8 N2 b$ o: G
+ L0 [/ i4 W# s6 t4 G& K; |1 J8 p8 j3 h: L. x0 Q
DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);
) i3 T" {& k) c+ A& G" A
6 ?4 i) Q$ Z& |1 [
' T+ I8 `' h+ t4 K% {) q9 A, w2 q1 O
OutputTag<Tuple3<Integer,Integer,Integer>> zeroStream = new OutputTag<Tuple3<Integer,Integer,Integer>>("zeroStream") {};0 S( t2 E7 n3 Y( d e7 @
' K" w4 e; f% y. l8 A2 K OutputTag<Tuple3<Integer,Integer,Integer>> oneStream = new OutputTag<Tuple3<Integer,Integer,Integer>>("oneStream") {};
; i( z5 q/ b, l- G, Z( F. G
- d3 ~' M V! r' f
x4 ?* N8 L' R o0 ~9 R( p" y* p0 L( b
* }7 ~) U+ B2 ]( [+ k) G6 Z% B1 E$ R E$ x' W& T
SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> processStream= items.process(new ProcessFunction<Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>>() {7 ]9 | ?1 t. h1 e; M6 {6 h4 q, [
3 t) W' z4 d( Q; X: U K+ V @Override
9 G' A7 K# t- ]: ` C
U3 j: H5 ~3 T2 l' \' S public void processElement(Tuple3<Integer, Integer, Integer> value, Context ctx, Collector<Tuple3<Integer, Integer, Integer>> out) throws Exception {
0 `; X. K" T4 I# s# R' J. L- m6 V/ |- u0 [, r. e, E; ^
, C @& d% Y5 Q4 i# q/ l) b
! e% G) q! @5 N. x# @9 Y0 Y o if (value.f0 == 0) {3 d ]* U0 W- Y3 k# Q ]- {
' d6 c }3 o6 t
ctx.output(zeroStream, value);
" G0 S; d' r* [
1 h: W. p' r# G$ k6 z( `: N( ]* P } else if (value.f0 == 1) {
$ c: a6 z3 o" s; x9 D# c4 l& M/ `- n7 y; U8 K9 d, z/ X+ e
ctx.output(oneStream, value);
8 O, _5 J- T- k* E: {8 k$ T3 k
: M/ r5 E0 E- ]! l: i- ^ }$ b$ k- I0 c3 \
( q; M, F! w* w }$ D8 O2 [ z* q* L6 v! B. F7 e
4 u: H0 U6 \; Y9 Q9 s& p* F/ Z });
! N! K% v2 N! ]" ]1 q
. y3 B" K; x7 b) \6 d
) L: n! }# d7 a4 N$ ^6 y& U/ m; @4 M& T/ z- ?$ V7 a
DataStream<Tuple3<Integer, Integer, Integer>> zeroSideOutput = processStream.getSideOutput(zeroStream);* @/ ~- R6 o- ?" G$ \( U; o
) ~6 ?8 `7 e- z$ t$ }: I/ Z$ i DataStream<Tuple3<Integer, Integer, Integer>> oneSideOutput = processStream.getSideOutput(oneStream);
4 h9 k' p+ r6 D. d; i3 N
2 o/ g7 r3 |: r( r0 T3 A& G' M7 m0 t$ f0 }- E( \0 u% ~: A
' T8 l7 s. a4 C2 B
zeroSideOutput.print();* z! k2 u* W; Y# y- I' g- R
2 y/ F# c# S/ z- I/ z+ e- W
oneSideOutput.printToErr();7 ~! }% l$ I/ t
, u- `( U( |& W/ Q. A* u" J/ B. N% d- p, o
5 B% d5 P+ p- T
C8 O$ B: p, g+ b9 A$ l
8 v: `4 S- I/ @, x //打印结果
1 m H( M- x# [7 Q1 Y: D1 W1 E0 Y4 o6 ~( w
String jobName = "user defined streaming source";
9 N0 X. w% o6 i6 s; R/ \7 L5 L3 \: y M* g( p
env.execute(jobName);
% v4 O" \; c5 U3 c* q0 d% l" w% t2 A% I8 N$ v
} b* ?7 c; K, r* I# |1 ~4 u: H3 C
</code></pre>
& |. ~% F$ z( \. K9 s<p>可以看到,我们将流进行了拆分,并且成功打印出了结果。这里要注意,Flink 最新提供的 SideOutPut 方式拆分流是<strong>可以多次进行拆分</strong>的,无需担心会爆出异常。</p>7 s- a* X" ]5 u. W
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CBMKAGHoUAAM-5UL5geg132-20210223084827698.png" ></p>) w0 E1 Y/ Z+ Y
<h3 id="总结">总结</h3>
6 o: b+ }/ g, w T( g/ P<p>这一课时我们讲解了 Flink 的一个小的知识点,是我们生产实践中经常遇到的场景,Flink 在最新的版本中也推荐我们使用 SideOutPut 进行流的拆分。</p>
; \9 S+ a/ h0 M% Z<blockquote> H* {. s: E- A% L( c2 |$ G w* B
<p>关注公众号:<code>大数据技术派</code>,回复<code>资料</code>,领取<code>1024G</code>资料。</p>, `5 l" O* q$ X" c. H* d& b
</blockquote>" Q, }# w6 R3 S$ P0 }% `- M& Z6 l: a
$ t6 ]1 e. p5 ]& @( Z: V3 \9 j. G |
|