|
& O1 U/ z4 p, l0 E, b5 a' E9 S
<h4 id="flink系列文章">Flink系列文章</h4>5 P% ~1 T: Z$ Z: X7 `) I, d& |
<ol>
2 s7 l! d+ |8 r# v0 z<li><a href="https://www.ikeguang.com/?p=1976">第01讲:Flink 的应用场景和架构模型</a></li>- B; B) M4 c. [- q7 h w
<li><a href="https://www.ikeguang.com/?p=1977">第02讲:Flink 入门程序 WordCount 和 SQL 实现</a></li>4 T+ o% a, _% t( Z$ u
<li><a href="https://www.ikeguang.com/?p=1978">第03讲:Flink 的编程模型与其他框架比较</a></li>
' U( v2 Y% k) k P6 e( B<li><a href="https://www.ikeguang.com/?p=1982">第04讲:Flink 常用的 DataSet 和 DataStream API</a></li>( d$ e7 i1 T4 K
<li><a href="https://www.ikeguang.com/?p=1983">第05讲:Flink SQL & Table 编程和案例</a></li>. |" B0 |! E( I( e9 n/ R* W& F
<li><a href="https://www.ikeguang.com/?p=1985">第06讲:Flink 集群安装部署和 HA 配置</a></li>' }% }9 i0 L2 o, u
<li><a href="https://www.ikeguang.com/?p=1986">第07讲:Flink 常见核心概念分析</a></li> W3 k8 E- E5 J! j# T
<li><a href="https://www.ikeguang.com/?p=1987">第08讲:Flink 窗口、时间和水印</a></li>* J, j5 j% M$ }! G$ G
<li><a href="https://www.ikeguang.com/?p=1988">第09讲:Flink 状态与容错</a></li>
+ X& R$ W9 t+ [- }8 ^; \5 o</ol>' c3 @. A4 B, S+ F
<blockquote>
7 i- m* V. W3 u5 j# A V" K+ ^<p>关注公众号:<code>大数据技术派</code>,回复<code>资料</code>,领取<code>1024G</code>资料。</p>
( ~* Q* D5 o4 j3 \2 t</blockquote>
: b; h ?( Z- q9 G<p>这一课时将介绍 Flink 中提供的一个很重要的功能:旁路分流器。</p>
8 a2 s5 N9 s' v$ R3 L4 C- t3 I, i<h3 id="分流场景">分流场景</h3>
& P( t4 Z7 I/ k- R" `<p>我们在生产实践中经常会遇到这样的场景,需把输入源按照需要进行拆分,比如我期望把订单流按照金额大小进行拆分,或者把用户访问日志按照访问者的地理位置进行拆分等。面对这样的需求该如何操作呢?</p>
9 }3 ]) C( i0 S7 D& g. C<h3 id="分流的方法">分流的方法</h3>+ l9 C6 f9 D2 H$ }
<p>通常来说针对不同的场景,有以下三种办法进行流的拆分。</p>- l) U5 J3 c; A* _& t7 j+ S: O# M
<h4 id="filter-分流">Filter 分流</h4>
2 ^9 B: A. L$ X! v<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CAy6ADUaXAACSFUbdpuA911-20210223084827182.png" ></p>" \; B7 d; f( T3 C7 s
<p>Filter 方法我们在第 04 课时中(Flink 常用的 DataSet 和 DataStream API)讲过,这个算子用来根据用户输入的条件进行过滤,每个元素都会被 filter() 函数处理,如果 filter() 函数返回 true 则保留,否则丢弃。那么用在分流的场景,我们可以做多次 filter,把我们需要的不同数据生成不同的流。</p>, ^4 X9 O: L( v1 _# i
<p>来看下面的例子:</p>& A# E% Z" q# w; A' D% v! L
<p>复制代码</p>) w3 q: t5 Y" J- x' h; {3 i+ X( I4 z
<pre><code class="language-java">public static void main(String[] args) throws Exception {. V& E, C- j. M$ h4 B5 r! f
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();) ` D% A4 M3 Q9 W, R
//获取数据源0 V& q' N+ K, f1 _4 g& x2 S
List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();1 A* L8 d1 V- Q: q! [
data.add(new Tuple3<>(0,1,0));
# |( z% D2 k- p* f( k6 A6 P data.add(new Tuple3<>(0,1,1));/ I9 |0 i4 V& v& F' p/ X
data.add(new Tuple3<>(0,2,2));7 g7 ~1 M; A6 `3 J8 U+ V6 T
data.add(new Tuple3<>(0,1,3));
( d2 f: M* Y! i+ f5 w data.add(new Tuple3<>(1,2,5));1 E6 n+ j P" {1 I8 V! h) O/ X/ T
data.add(new Tuple3<>(1,2,9));8 f; n7 [( i" o; x8 b% ]3 Q) K
data.add(new Tuple3<>(1,2,11));
) B6 H8 X. `5 |$ ~2 N- `# @' o6 v data.add(new Tuple3<>(1,2,13));
- x/ ?( a1 q R6 t& u, ^2 T2 E$ h- [. T1 d6 l) a& @2 J
DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);$ v8 J: d& e3 K
( d7 K$ C* H& `# b, W5 O( c& G- t! H# J# N
! h% G( E; Z0 p5 t0 V SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> zeroStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == 0);+ m. W, T$ x5 H- h1 W, e( ~, Z
3 u# ]$ u% y$ s2 H# Y SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> oneStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == 1);7 G* X/ R6 c+ V5 k
5 A* G. E8 O8 u9 X+ H
* A; y2 H$ m. y( g( q, f. b1 {# O) p3 B7 ^ {0 K3 C& }
zeroStream.print();! |- E4 w7 P/ L9 w3 f# q0 K6 n
, I. w. g3 Q. S! K oneStream.printToErr();
4 L% A) O2 d5 x! p( S& y( \; v4 s o/ _( w, n* K: ~# Z
& R7 w% g2 o0 P6 b) b5 b# p+ ~4 L9 _2 Y: O j z# x6 D8 w8 ]
7 _. |2 U! a" D! ?3 A! h, r
2 o2 ?' V( ?8 l6 e: f3 ?, ^ //打印结果
/ u3 A& @( ]; h' ~# x, h/ ^4 v ?4 ^4 ^8 g
String jobName = "user defined streaming source";) H& V8 Y9 R: S/ A# v' R: x" y% o
: J" U+ U4 v: P' l1 w/ d" V0 u
env.execute(jobName);
. ]/ s' ?* d; Y6 }; E M# C& |0 s9 B5 G" @4 F' _, z* U
}! Y3 a5 h/ n; w$ e. j! N
</code></pre>2 h1 Q7 s9 \+ m- I* R
<p>在上面的例子中我们使用 filter 算子将原始流进行了拆分,输入数据第一个元素为 0 的数据和第一个元素为 1 的数据分别被写入到了 zeroStream 和 oneStream 中,然后把两个流进行了打印。</p>
+ e7 y: }6 ~- I<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/Ciqc1F7CA2WAYbshAAKj494h86s723-20210223084827296.png" ></p>: y2 ? v" v! I X7 T' o! u
<p>可以看到 zeroStream 和 oneStream 分别被打印出来。</p>" g6 r* l8 W2 N+ | g6 l0 u
<p>Filter 的弊端是显而易见的,为了得到我们需要的流数据,需要多次遍历原始流,这样无形中浪费了我们集群的资源。</p>
0 r5 L5 d5 r1 j; i9 q0 X<h4 id="split-分流">Split 分流</h4>
]$ l2 u- j; A+ ^1 o<p>Split 也是 Flink 提供给我们将流进行切分的方法,需要在 split 算子中定义 OutputSelector,然后重写其中的 select 方法,将不同类型的数据进行标记,最后对返回的 SplitStream 使用 select 方法将对应的数据选择出来。</p>! C4 b+ [: R4 j
<p>我们来看下面的例子:</p>' c+ a K+ @8 W+ O8 L. E5 l. |
<p>复制代码</p>
, l3 K' x5 j% [( Y) J# q s9 R<pre><code class="language-java">public static void main(String[] args) throws Exception {: `9 o$ |. Y. U6 }) o, r" \
g8 W' y( g. Q& t: k$ B, `
! s3 u; }; I- `( v; A2 {5 N
% u N, N! t5 ~' y" j9 o StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
% M9 S: M! Q$ Q4 z% d5 j7 y. u- `
//获取数据源, `. C0 y( K; Z6 D7 [* o/ k% h
' `7 z! f) t7 y2 j& A
List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();
( `0 J; h2 `; h# k
. }+ O4 L* P1 l, d2 \ data.add(new Tuple3<>(0,1,0));
/ H2 T7 |/ x- ]$ ]: e0 Z% z) K ?2 T$ J9 _+ y$ O6 I1 W
data.add(new Tuple3<>(0,1,1));0 l6 ~8 t5 H( E6 D: N4 L
2 @& x" h4 g( S# F; V: W8 {
data.add(new Tuple3<>(0,2,2));
* `9 j# f9 J% o% B5 A/ `! u. r! N- T0 }
data.add(new Tuple3<>(0,1,3));0 `& L0 X1 ^" k
+ i3 _) F7 j) \: | data.add(new Tuple3<>(1,2,5));
7 N4 D! D# E7 ]
& l3 E" u0 T a: C5 j4 Q% ] data.add(new Tuple3<>(1,2,9));; Z# c* V' N* Z& M/ f6 E
$ E7 q) p6 r* a3 Z1 d) | data.add(new Tuple3<>(1,2,11));: P# q: k2 L% |1 A9 m
- y) \& K; f/ d8 I2 e
data.add(new Tuple3<>(1,2,13));8 e: W0 K" V+ F
; h9 B' D! v- o% L2 u1 y" Z. x' h% y! x1 C
& H! @, R9 Q+ o) u6 ~! L
* v6 I$ G f# \7 O: @$ {: X+ D5 {5 z' S& O/ `" o2 W6 @" W
DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);8 n+ K y. [: m8 W) C/ G! g- \
/ C- r7 a4 [- m9 K
8 _; y8 Q1 ~7 d! Y, t
" v9 J" D7 U# k7 x5 t2 p8 I3 J" n8 p0 }: D/ E6 H% w! d
! ~: {% u6 x6 K2 m& v& x SplitStream<Tuple3<Integer, Integer, Integer>> splitStream = items.split(new OutputSelector<Tuple3<Integer, Integer, Integer>>() {
" V. {( H3 b1 Q% T |( w: C9 Z- [& n5 C4 _( K/ f
@Override
" Z1 X- ~% b# K) I$ p- W0 F* R1 j0 w0 u2 Y
public Iterable<String> select(Tuple3<Integer, Integer, Integer> value) {
2 P! _ \% H- j H' f6 I: @! C; A- p8 a
List<String> tags = new ArrayList<>();
( c- k1 f+ U' F% i7 g5 [: j* W$ k) b, ~* y- d) d% O, L& O3 c
if (value.f0 == 0) {
1 @. a) X! K+ y. ^/ f5 \/ |2 S# n& |/ V' O- l6 W
tags.add("zeroStream");2 Q# V c" ], q7 B7 j
' ?, b* U7 C1 m% k- ?/ J1 R } else if (value.f0 == 1) {4 ~: i a5 q; z8 J4 V4 Z
% i# [ E5 [' p+ N; ?
tags.add("oneStream");0 d1 W% Z8 t% A4 N1 V5 ~ Q q
) U( Z6 J* E! ^& }% E3 v! T }
6 E3 X' c$ H3 u7 u) R# e$ s! v' L: Y% }! B4 f
return tags;2 w/ \. k, e1 O; L" R5 W7 d
$ r0 m$ h5 [, Y0 {+ v- {7 a0 |1 Z" W }
9 d! h7 @/ X* z" ], \7 u" z+ u4 l! e% ?2 ]% F
});
2 j# N9 j( L" \. n( X( x
) T- `% p8 \0 \- t* \9 R
& ^! t$ {8 z% j3 z8 U6 a/ G& i$ Y9 d, o7 p
splitStream.select("zeroStream").print();! [4 b: ~" n6 d" W1 j, u! i, y3 E: v
! N! \4 J6 W$ {+ s1 I
splitStream.select("oneStream").printToErr(); ~! U0 R5 v1 p8 z' ?
$ f2 R( t z: J" R
& m E0 v \. X
8 Y. `! X8 T( C' ] //打印结果& J' K# t$ J9 q; z
6 P9 I% d" ?7 ]& U0 y# ]
String jobName = "user defined streaming source";
4 ^ T8 Z6 }1 q) w: S& n* n8 o+ d, L( R4 L* S; ~5 }3 D% M5 U
env.execute(jobName);
; \% n# [$ C1 }, B' e
( }. |! v* s$ D}& z0 ^: m9 B" R
</code></pre>
9 W! _: H% n( C( h<p>同样,我们把来源的数据使用 split 算子进行了切分,并且打印出结果。</p>
: m: L9 i9 g9 F; O5 m: U. p<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CA4aAbUSJAAG1LWNB3qw627-20210223084827377.png" ></p>
2 q) J" S" K& Z& Z# T" Y<p>但是要注意,使用 split 算子切分过的流,是不能进行二次切分的,假如把上述切分出来的 zeroStream 和 oneStream 流再次调用 split 切分,控制台会抛出以下异常。</p>
0 O0 A6 J; Y/ d" J; I4 y<p>复制代码</p>3 r. Q. T5 F6 Y& n; M6 n) d; K
<pre><code class="language-java">Exception in thread "main" java.lang.IllegalStateException: Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.
# j! W& D1 C, {% Y1 [. G/ Y1 w</code></pre>
C7 H# h3 g4 t<p>这是什么原因呢?我们在源码中可以看到注释,该方式已经废弃并且建议使用最新的 SideOutPut 进行分流操作。</p>
- o- n. B/ s7 {6 k k% K4 J; z( w<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CA6OAJ-JDAAIrh1JSAEo033-20210223084827602.png" ></p>
^6 j4 j) \' T% h5 P<h4 id="sideoutput-分流">SideOutPut 分流</h4>
* G, \/ K" M7 g8 U$ u) U<p>SideOutPut 是 Flink 框架为我们提供的最新的也是最为推荐的分流方法,在使用 SideOutPut 时,需要按照以下步骤进行:</p>
) k8 x& x6 d( u3 c# u: `<ul>
b" L8 v3 W* B8 Q1 Y<li>定义 OutputTag</li>
# P0 H/ Y3 k5 f% w* M$ {4 V% R* }+ }<li>调用特定函数进行数据拆分
9 f9 U( F" s' h j<ul>& S5 r" @8 o; z, i: V% m3 G
<li>ProcessFunction</li>
$ o% q. I8 {% y<li>KeyedProcessFunction</li>; \( F" n7 v9 S2 a3 d
<li>CoProcessFunction</li>/ G: }: v' i2 L0 i
<li>KeyedCoProcessFunction</li>/ y: ` E* w! p' v
<li>ProcessWindowFunction</li>
9 g4 L- ?9 E: z4 i8 b" B<li>ProcessAllWindowFunction</li>
8 N- z2 \0 w& i' D</ul>
, z' C; f" K# q3 E</li>
) P2 H H; `5 W, M4 T% a3 Z</ul>1 ^: }9 M% \* Z! W+ }! j
<p>在这里我们使用 ProcessFunction 来讲解如何使用 SideOutPut:</p>
3 n3 F2 t0 L8 w$ W<p>复制代码</p>9 [+ I: V9 `4 s; _* S
<pre><code class="language-java">public static void main(String[] args) throws Exception { a0 I- }) J" s% q! A
$ I! z+ r, [) h% y J. n
! {, ]7 R7 [, c7 M9 C7 Z0 s
; a3 E+ V8 w' \2 \* M1 U StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
" D+ A2 ^" q6 }9 Y% w2 V i1 a; g3 t0 }2 X0 I" |& \" a9 h' w
//获取数据源5 v" E0 ^1 b4 n$ L7 \6 V
5 y8 U- }4 q, |3 J List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();
8 ]% I% k1 f! I1 _/ `, G) f; z! U2 o0 K D, { d( N& h. E
data.add(new Tuple3<>(0,1,0));- T' q3 U2 A/ d B
8 Q" m4 f; R' G& H3 p data.add(new Tuple3<>(0,1,1));7 X8 E5 N) f& ~5 G' n- d0 O8 H% M
4 L- V5 [+ |% b7 C4 A5 f, `2 } data.add(new Tuple3<>(0,2,2));
1 ], @+ A* W+ m" H
" r7 _7 M9 F$ w# n0 C data.add(new Tuple3<>(0,1,3));
/ h* S' i9 U/ X+ [& t
5 J4 B$ o9 C* ] data.add(new Tuple3<>(1,2,5));
2 R+ T* N. L$ q( I2 x% h) E
1 u! Z$ X3 F% R* ]# W0 m; i data.add(new Tuple3<>(1,2,9));
' t! E/ _- F& X$ X5 ?! N' O2 ~3 E* Y6 ~: T
data.add(new Tuple3<>(1,2,11));
$ f7 r4 u, ^2 y" w4 G2 d' ^
, T) R* L2 O* z- `- Q. D3 D7 g+ K data.add(new Tuple3<>(1,2,13));# G0 ~( f$ I: U6 q, i
3 B( d! Y4 ?9 x% U: y
6 I! {, B$ w7 p7 n* z! ?* a/ W6 S* ?7 q2 @5 R4 u/ o
6 h" L" h8 ^; X, J( g
: E# ]/ f E, l7 B, h- K DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);
9 O3 P4 Q+ a' g/ [3 s, F4 y" T( I
/ {& e+ j: r# H5 I5 Y9 w/ D
/ e& [) y1 i) ^1 ^& [. _; T) U$ y' g6 b$ i/ U
OutputTag<Tuple3<Integer,Integer,Integer>> zeroStream = new OutputTag<Tuple3<Integer,Integer,Integer>>("zeroStream") {};
, Q9 P# ]2 T% H) T0 {
9 D9 ~7 L9 j Z& I' c OutputTag<Tuple3<Integer,Integer,Integer>> oneStream = new OutputTag<Tuple3<Integer,Integer,Integer>>("oneStream") {};" C' }6 Z8 Q% J" a- u
# h. u" ^. G* P. o% T; J
- `' O; m/ O. `; O
" J( `- [, {$ T5 n4 M- \6 H# @/ w O' Z$ J/ g7 n
& K- u2 i6 g& {* k- H2 t SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> processStream= items.process(new ProcessFunction<Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>>() {" V: @* W% o* o
2 a; M& Q$ L/ L0 L6 P7 G @Override
7 Z! E1 c& w0 [2 I7 C/ \! q O$ K* X( }
public void processElement(Tuple3<Integer, Integer, Integer> value, Context ctx, Collector<Tuple3<Integer, Integer, Integer>> out) throws Exception {" q- l, N6 ?) J# l$ m( Z5 N( j. ~
) P+ `& _' U4 g3 E- s: G9 x$ l7 |/ S
2 I. z7 n) c$ K. p) Q
7 N1 F0 ?5 n& T7 w if (value.f0 == 0) {
6 d5 c) ^$ u& U! i! w" |5 c5 X6 i9 |- `& U
ctx.output(zeroStream, value);- |: P# B+ ^& h. }8 L6 y, T
) ^7 k% W# [ x! A
} else if (value.f0 == 1) {
& A2 @" Y6 {, E5 V& F* f* p) f* F% @# I# f4 f. a
ctx.output(oneStream, value);5 ^4 J: u3 U2 S. |
, a V% x: M& C! s/ e
}+ q2 ]2 G+ y; r+ _$ C4 k# p" m
# ^' `4 E E$ a. `% G( N
}
" Q! X! m4 o: X s p6 F5 y; F0 ]! l
});
% {1 _/ b3 I- l1 b6 I" U/ E2 S- w4 t- t% q8 k6 N1 g
/ A' K* [; Q& Z' I( l4 l2 z0 J8 q
3 P/ }/ ~. a' }, m8 X2 i& g
DataStream<Tuple3<Integer, Integer, Integer>> zeroSideOutput = processStream.getSideOutput(zeroStream);: f* V, q4 F) a2 R! t& n
# Z4 c0 G1 H$ a: H3 U' g4 b! i O7 ^ DataStream<Tuple3<Integer, Integer, Integer>> oneSideOutput = processStream.getSideOutput(oneStream);/ ]8 q7 A& W7 Z5 c ]' @6 R, S5 C
1 s; y/ L) \; m# D# ?
) T* q- r' g" ^5 O T2 N/ V9 o' s. p7 G" D* I; x
zeroSideOutput.print();' ` \% r+ ^9 g* h. |! @' |
8 g& O5 I. d3 P7 [" o
oneSideOutput.printToErr();
, i! p5 E2 ]- k- X8 ~8 t: E0 }' B* {) s+ ?' B. k9 O4 Z' @
( U0 D" }; B5 P) A, }: q2 n0 _2 @
. u& T" @* Y* T% Y+ Y
: [" _# A6 T2 F7 a$ {
$ r0 X+ v9 n& h
//打印结果
8 A" r5 p. S0 `2 ]$ b. C
* t0 |( x" r y2 y9 Q String jobName = "user defined streaming source";. x* E/ J9 x: k
' P5 _4 \" b) x env.execute(jobName);. s% s% R# {! p4 s
3 h7 m, O( m1 }
}
0 y4 r$ J: ]: R2 }</code></pre>4 z- S2 Q! ^4 i
<p>可以看到,我们将流进行了拆分,并且成功打印出了结果。这里要注意,Flink 最新提供的 SideOutPut 方式拆分流是<strong>可以多次进行拆分</strong>的,无需担心会爆出异常。</p>
7 I) I5 z, |% X$ C5 Z<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CBMKAGHoUAAM-5UL5geg132-20210223084827698.png" ></p>1 q" O. f: Y( M0 L" w
<h3 id="总结">总结</h3>
: d5 @) ^/ u$ h* K; Y W+ U<p>这一课时我们讲解了 Flink 的一个小的知识点,是我们生产实践中经常遇到的场景,Flink 在最新的版本中也推荐我们使用 SideOutPut 进行流的拆分。</p>
; v8 S% J' J. g9 `# `5 W# [<blockquote>4 l; g8 k) n; c- i+ l& d" `
<p>关注公众号:<code>大数据技术派</code>,回复<code>资料</code>,领取<code>1024G</code>资料。</p>" `6 V/ a2 L- R- S( o
</blockquote>& Q* u( @" g! b- W1 K
3 q5 {, R9 |4 Q3 F
|
|