|
( d7 ^7 @1 ]& ]% H; W7 \<h4 id="flink系列文章">Flink系列文章</h4>
% J0 O* v$ D6 R4 L B- L+ L<ol>& ]1 V9 r3 _' }
<li><a href="https://www.ikeguang.com/?p=1976">第01讲:Flink 的应用场景和架构模型</a></li>
/ q; o; O Y! f8 X4 {, }. p7 i# K1 g& R<li><a href="https://www.ikeguang.com/?p=1977">第02讲:Flink 入门程序 WordCount 和 SQL 实现</a></li>
0 b8 J% ?+ a/ l<li><a href="https://www.ikeguang.com/?p=1978">第03讲:Flink 的编程模型与其他框架比较</a></li>
7 Y$ R- j* A6 c<li><a href="https://www.ikeguang.com/?p=1982">第04讲:Flink 常用的 DataSet 和 DataStream API</a></li>5 ~$ |# N# n5 J9 m
<li><a href="https://www.ikeguang.com/?p=1983">第05讲:Flink SQL & Table 编程和案例</a></li>
! {+ o& ?# B; J" d5 X) G<li><a href="https://www.ikeguang.com/?p=1985">第06讲:Flink 集群安装部署和 HA 配置</a></li>
2 b( m2 b; M! t6 n<li><a href="https://www.ikeguang.com/?p=1986">第07讲:Flink 常见核心概念分析</a></li>
$ B6 J% {3 f4 z7 n @2 O5 Q<li><a href="https://www.ikeguang.com/?p=1987">第08讲:Flink 窗口、时间和水印</a></li>
/ } i) l) K3 ~4 T<li><a href="https://www.ikeguang.com/?p=1988">第09讲:Flink 状态与容错</a></li>4 D5 q4 D7 Q, i7 X# C
</ol>
& P* j$ w% P- M, P0 K<blockquote>
7 A; i% {+ R" K9 K! z: s* U<p>关注公众号:<code>大数据技术派</code>,回复<code>资料</code>,领取<code>1024G</code>资料。</p>5 q/ @& Y0 G6 d* `! B
</blockquote>( Y9 v! e, |# O7 t" N4 ^" B9 {- q
<p>这一课时将介绍 Flink 中提供的一个很重要的功能:旁路分流器。</p>
7 z: J0 w( w- l# N' h<h3 id="分流场景">分流场景</h3>
& B5 X( m: p% M5 W8 A) v% c5 o0 s<p>我们在生产实践中经常会遇到这样的场景,需把输入源按照需要进行拆分,比如我期望把订单流按照金额大小进行拆分,或者把用户访问日志按照访问者的地理位置进行拆分等。面对这样的需求该如何操作呢?</p># ~( y6 q1 |6 D- I
<h3 id="分流的方法">分流的方法</h3>
7 Y, t5 r4 z8 |8 e6 s, `<p>通常来说针对不同的场景,有以下三种办法进行流的拆分。</p>
3 K: P, e, ]+ ?9 ]<h4 id="filter-分流">Filter 分流</h4>
+ |- L' C3 |' {+ R- C<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CAy6ADUaXAACSFUbdpuA911-20210223084827182.png" ></p>0 G$ t9 |8 k; y+ Z \8 R4 \, I& b1 i
<p>Filter 方法我们在第 04 课时中(Flink 常用的 DataSet 和 DataStream API)讲过,这个算子用来根据用户输入的条件进行过滤,每个元素都会被 filter() 函数处理,如果 filter() 函数返回 true 则保留,否则丢弃。那么用在分流的场景,我们可以做多次 filter,把我们需要的不同数据生成不同的流。</p>
1 @* i' M+ j# X P<p>来看下面的例子:</p>+ w1 @# T( Z: k/ i& _+ Z0 K3 a
<p>复制代码</p>
. h+ ^1 j5 f* i0 x+ W g<pre><code class="language-java">public static void main(String[] args) throws Exception {
! v- O( [4 J3 v; k6 e StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
7 x. c9 r% b O$ y/ X& M, b" u //获取数据源
7 G4 c9 r7 m2 c List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();; V$ t+ B0 [8 P6 h6 b
data.add(new Tuple3<>(0,1,0));. R+ q: w& |& w1 [4 M
data.add(new Tuple3<>(0,1,1));! i5 j l! Q* C/ h d
data.add(new Tuple3<>(0,2,2));
8 _1 j, |, ^3 @5 A1 }& Q, i7 L6 k data.add(new Tuple3<>(0,1,3));
$ L: k" P" V! F9 `' |% l2 s' e3 k data.add(new Tuple3<>(1,2,5));
4 R M9 [9 O$ K7 e1 I; @ data.add(new Tuple3<>(1,2,9));0 z* e' q9 q$ g# s
data.add(new Tuple3<>(1,2,11));
% o$ V; o6 u6 g' g data.add(new Tuple3<>(1,2,13)); A5 ^7 u: Q- T7 Z+ T
% {. b* F, ]" O b8 n0 ^+ j1 M$ [- y
DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);
& q* [/ q0 }) F8 i& \0 o, \4 Y5 n9 O% [1 y& C J: {3 M/ ~
5 E: U/ ]( Y9 r* @9 B- F9 t1 u
( G$ ]1 W2 U, N0 k( N0 R
SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> zeroStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == 0);
+ M; E- K5 n* s/ c6 {* X3 h) u- _5 O" k2 N
SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> oneStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == 1);
# v3 v6 D# j7 Y, a5 P. J3 H- y4 r8 z) Z* o
. W: N: y" a+ U+ N0 ~$ Z" P
: p; o% P* ?2 _
zeroStream.print();
" y8 p1 t) d( R! j: k) F& F# E( i% i
oneStream.printToErr();
1 W! B% U5 E- Q3 z( F
! {' b, _7 b: g) U" i4 O6 D" e
6 a I, _# G8 M9 \+ w+ u8 Y' Y+ w' t9 x* H% @/ x4 N
[; u! D( f% e: S* S: I5 l. j
7 A1 l2 W9 w4 P5 N
//打印结果
" Z6 Z- O. F, s) ?& q- s( j# S* b0 | P, t/ H0 J" t4 O7 ?
String jobName = "user defined streaming source";* S- X) X9 }% b# G; J
9 x1 n. x) K$ a5 Y! C1 @3 m
env.execute(jobName);
* Y6 x1 b* r; y. b, t: E
b& T( h) {% v( |+ b}
0 g( B. ~2 m$ _5 C6 r; J- L/ O</code></pre>
4 O, n% p4 F$ d1 e; n5 W<p>在上面的例子中我们使用 filter 算子将原始流进行了拆分,输入数据第一个元素为 0 的数据和第一个元素为 1 的数据分别被写入到了 zeroStream 和 oneStream 中,然后把两个流进行了打印。</p>
4 l: H+ w" a& E* A4 O! z<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/Ciqc1F7CA2WAYbshAAKj494h86s723-20210223084827296.png" ></p>5 ?/ P3 \: R) S
<p>可以看到 zeroStream 和 oneStream 分别被打印出来。</p>7 w* @! X2 x. C- y6 ]5 {
<p>Filter 的弊端是显而易见的,为了得到我们需要的流数据,需要多次遍历原始流,这样无形中浪费了我们集群的资源。</p>
# y$ q2 K+ [/ a<h4 id="split-分流">Split 分流</h4>
5 S$ L+ S. ^) S: N<p>Split 也是 Flink 提供给我们将流进行切分的方法,需要在 split 算子中定义 OutputSelector,然后重写其中的 select 方法,将不同类型的数据进行标记,最后对返回的 SplitStream 使用 select 方法将对应的数据选择出来。</p>" d+ w1 a7 ~6 W2 T9 e. r# }; h
<p>我们来看下面的例子:</p># E5 x8 n" w! B4 G0 p6 F) {
<p>复制代码</p> D+ m* p, K1 B9 S
<pre><code class="language-java">public static void main(String[] args) throws Exception {& V4 a7 F6 p$ e( p
) |" I7 v5 v0 i4 {
7 g$ q8 T2 @+ G. g) @8 K
* C( \6 }6 k/ _$ J( T) B StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();0 ^: l* t$ R( a$ x
: L) m2 J! X# m- h& T4 Q( D
//获取数据源
. N) ~: Z8 ?4 t$ @0 f Y0 F. J4 s( w. P! D% x5 _- w
List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();4 O( s# E7 V! ^
4 K- R; X/ V, c) h
data.add(new Tuple3<>(0,1,0));
# v& K# @% y8 [4 w
. s1 g# `+ d/ X data.add(new Tuple3<>(0,1,1));
0 T' y: |+ j( [+ r' Z5 r
7 k/ b4 f' d/ B2 K; r8 w( F. N data.add(new Tuple3<>(0,2,2));, t6 G& | K% M/ C( J' n: A- e; e+ \6 G
+ k) N- N- e4 z1 V9 e F$ o" T
data.add(new Tuple3<>(0,1,3));, a; |8 \& B) T# h3 M; x
; z* d9 V7 r% b# O2 W
data.add(new Tuple3<>(1,2,5));
; a9 y* f/ Y5 X- r9 ], k1 {5 `7 Q/ u3 Z$ k; t
data.add(new Tuple3<>(1,2,9));9 a: h, [7 b7 p7 G9 x
, N3 r) ?$ G; c! ^; R data.add(new Tuple3<>(1,2,11));
* e2 |# t" ~( @3 G6 K+ q5 A2 D* m+ d; \8 x' V) B, m8 V
data.add(new Tuple3<>(1,2,13));% V6 Y( F* @- g3 v
8 I0 b g' d1 _ K( }/ t7 n
/ \3 K) v$ x: |' X
" A8 L* t: Z! v& H4 n" @+ j
) G6 K' c" Y5 X' N" u
3 v; J8 G, s8 c4 L DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data); T6 D( \9 ?# X2 n, i8 |
: e2 ^ Z4 U6 X# ]4 C/ E3 ]) r
$ G3 y5 c$ |, j( k8 u r" b
" P, G" s3 ~( n5 W$ F1 M$ Y1 t3 D; U# y# A9 E/ }8 @
1 J. k( w3 ]5 |+ ?" q, {
SplitStream<Tuple3<Integer, Integer, Integer>> splitStream = items.split(new OutputSelector<Tuple3<Integer, Integer, Integer>>() {
3 D" K& \' }$ u8 v" Z
, u1 d* R6 f* F, w4 F3 g @Override9 {! T) G0 x/ {5 Z6 {; T8 b' [
1 Q; w* i3 D/ J& C/ m1 I public Iterable<String> select(Tuple3<Integer, Integer, Integer> value) {
! h5 l9 v: A5 g4 W4 z2 ?
! i' j+ P3 }/ g0 D List<String> tags = new ArrayList<>();
5 v+ \2 _" c: T% v. [! m0 A5 u$ H
if (value.f0 == 0) {
! o }6 ]( L" t% g; r n: h% k) r
6 L x3 t3 ~* ]" b tags.add("zeroStream");1 T: [6 C- S4 I; O
* P2 ~7 T! B% \, P. ^, z% ]* U } else if (value.f0 == 1) {4 Z: ]5 r/ Q9 g
4 {/ Y1 v. w4 n3 f: U tags.add("oneStream");
: s7 T0 K, O4 j" z6 F- d4 | [! @5 O' _; R/ ]/ T4 m
}. d" J9 L7 C u) O, B
" |- V" v5 D. K' ?( t7 Z4 n return tags;
; G! L+ c9 x$ c6 @0 l8 b
0 l7 k/ _6 d( F. E/ ?* F5 e }
' W5 X9 h6 `7 e3 B- O7 M- Y5 o' m3 g) p0 {1 m
});
! ^- z$ h4 ?* }- C$ l" D) o( J9 a6 e' P. _; ^# U
. Z1 ]/ [ u6 B& j
% q8 J4 { p! e splitStream.select("zeroStream").print();
0 n- }2 ~5 H$ j) V' s2 A2 t
7 t. C/ z# M. c# d+ L% `6 Y splitStream.select("oneStream").printToErr();
6 p; W- {8 f m- ^
7 d, W7 t6 V: k% n& t0 p) d ~/ b- h- V. K8 n* J/ ^
/ n0 I" O- ?1 J- p
//打印结果; v' W" j: Z. Y2 U7 x5 N! }% X; S0 I
" Y5 |! D/ F* H: W: }: d String jobName = "user defined streaming source";3 v4 r+ O7 \+ f0 k) v' V
' B* }% E, i$ O0 c env.execute(jobName);
, j( m$ c, Y4 S; e
: I, ~% d( u0 [}3 B5 N3 P- z' ~( E
</code></pre>. v4 V' h5 |' u% x6 n
<p>同样,我们把来源的数据使用 split 算子进行了切分,并且打印出结果。</p>
. p, Z1 L' C4 }+ n# k7 M- B<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CA4aAbUSJAAG1LWNB3qw627-20210223084827377.png" ></p>+ @* W3 U1 c0 l$ G% {$ }
<p>但是要注意,使用 split 算子切分过的流,是不能进行二次切分的,假如把上述切分出来的 zeroStream 和 oneStream 流再次调用 split 切分,控制台会抛出以下异常。</p>2 c9 W$ G; G9 a$ Z( i1 D
<p>复制代码</p>
$ H7 H6 L- p" D' P6 J' x& i<pre><code class="language-java">Exception in thread "main" java.lang.IllegalStateException: Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.5 L8 s2 T" t2 I: G$ b4 r
</code></pre>
( D0 s3 _8 D3 T$ O: U$ X$ q- G<p>这是什么原因呢?我们在源码中可以看到注释,该方式已经废弃并且建议使用最新的 SideOutPut 进行分流操作。</p>& x! d% T! _; J, Q. D1 p
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CA6OAJ-JDAAIrh1JSAEo033-20210223084827602.png" ></p>8 u% p; U& c3 `4 D: s; }6 V
<h4 id="sideoutput-分流">SideOutPut 分流</h4>8 s# R- H6 p; `8 S* G( h
<p>SideOutPut 是 Flink 框架为我们提供的最新的也是最为推荐的分流方法,在使用 SideOutPut 时,需要按照以下步骤进行:</p>
" _& H! }3 g' n+ e* p8 w6 a( W( p<ul>
+ Q1 m C- s* M2 z0 d! k<li>定义 OutputTag</li>
! \+ A3 g5 i, ~! _7 H! o0 b& s<li>调用特定函数进行数据拆分) L0 f8 \. T6 @( `6 q& X0 o/ G
<ul>
! {; z$ \. B0 ~, W/ ]& f<li>ProcessFunction</li>; K$ ~' d6 Y5 e) d* [+ X3 P
<li>KeyedProcessFunction</li>: F% h% T9 @; F+ L" H, R& K
<li>CoProcessFunction</li>1 Y9 V* N& `+ f6 h
<li>KeyedCoProcessFunction</li>8 d% G4 J8 u5 Z6 W' `
<li>ProcessWindowFunction</li>
, D5 N7 }& Y( A; N<li>ProcessAllWindowFunction</li>3 o) o" g" a9 S4 M
</ul> P M- `1 S o: O6 N( a
</li>5 ^) b+ S2 z4 _0 B _2 _
</ul>7 Q' o1 |) N) A4 b, E# p6 O4 M
<p>在这里我们使用 ProcessFunction 来讲解如何使用 SideOutPut:</p>
9 \+ a( k+ R4 _& e/ o<p>复制代码</p>6 E1 j( H1 E x. q' t* F) U5 G
<pre><code class="language-java">public static void main(String[] args) throws Exception {
1 y4 ~5 Z0 D7 e: B
) U# b# k) x8 {/ p+ [$ \% t2 `3 o, W! `8 z+ Z
: a2 p# L6 ]% h) A L7 S+ J
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
7 T, Y6 X$ _; }$ K: M* X2 w M7 Y a- }+ ]! h: v" t
//获取数据源2 @9 \3 d1 X, A O7 m
0 [: x; Z7 b x- I4 l' e8 X List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();
$ I# Y& q+ v9 j% b' R3 v/ n
: X0 D; P6 N. A" r2 U data.add(new Tuple3<>(0,1,0));
9 @! i7 Q/ z; g+ g0 c. g/ ?+ w( b9 z! d; ~3 y }
data.add(new Tuple3<>(0,1,1));
8 O4 o1 g1 \. N& [
0 m- m4 G3 q7 S Z' |! N2 w7 w data.add(new Tuple3<>(0,2,2));6 k0 i- N/ S8 c/ f7 `- a: K# C! s
" {% @1 h3 S# R$ I
data.add(new Tuple3<>(0,1,3));2 a. Z4 g( j5 D( i r4 m
- d* S# a7 O! J
data.add(new Tuple3<>(1,2,5));
8 j( _: n; u7 i+ a! Z1 W- y
G4 Z/ b x1 g6 J' ~ data.add(new Tuple3<>(1,2,9));8 } N6 z# U& W. m6 M
# Q& b+ e4 B$ v( J) S data.add(new Tuple3<>(1,2,11));
9 m! o" D( H) [ g' V9 V3 w) g! n% H$ g4 H
data.add(new Tuple3<>(1,2,13));
3 w6 U9 h- \, `/ B& L" ]0 \- y' G+ d# U& U O
8 v- R& E3 f4 }7 f, X3 {& g
/ v9 w2 L3 M: k9 J- D( P3 B
; X3 I( l- k) | i9 D, W! n# m0 `: |* n& C% p1 E6 b
DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);; Q8 N& G0 m/ i" n% v3 O- c
5 I9 a) {; d/ I
5 x) s# e( ^ c f4 ~& n
3 H- U- I/ Z: i7 }4 d, Q& h, Y OutputTag<Tuple3<Integer,Integer,Integer>> zeroStream = new OutputTag<Tuple3<Integer,Integer,Integer>>("zeroStream") {};
, y& l' \6 W4 e! Y6 E$ q
% x. E! \5 [" |/ e# H' U! c6 C) X2 R OutputTag<Tuple3<Integer,Integer,Integer>> oneStream = new OutputTag<Tuple3<Integer,Integer,Integer>>("oneStream") {};
1 _9 O1 C( `+ j6 r" s" z
+ L" v* n) C3 k1 e, w% D
+ y7 l: X! q1 V, v" T3 {* |+ j/ c1 B
- p# a6 m) V7 g. ]& o- q+ F6 K
( y% z% E2 d- c( ?' L SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> processStream= items.process(new ProcessFunction<Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>>() {- o: g$ F5 v( l
9 v5 `0 j" {& O) J& ], v
@Override
, n& z' \8 U+ B( H
0 `5 D1 _. w9 }/ _4 V$ m public void processElement(Tuple3<Integer, Integer, Integer> value, Context ctx, Collector<Tuple3<Integer, Integer, Integer>> out) throws Exception {
; O" f7 B) \' T" y; I; F
; u7 h! R% J/ k' b3 ^$ \( `7 a5 n0 s% C% E- R @; m
2 Y1 D7 Q2 e0 N5 l
if (value.f0 == 0) {5 \4 t) O _+ X. U7 H
0 ?! a3 w' S, _* i* o ctx.output(zeroStream, value);$ x8 B9 w7 t A( n6 B7 G. w
6 c T' ^, n; q% O } else if (value.f0 == 1) { E6 o4 g6 \8 }& Z/ @
/ O( `6 j' H/ I( |1 T- h' \5 e2 i
ctx.output(oneStream, value);4 E z) `) Q, c. ~# Y, W
7 T7 }' w4 x. f+ I o- ~/ u }& d/ U3 r& V5 O1 }. P# c
# V3 A Q8 a& i' p/ j5 L, f. ] }
6 ~; q, U: w! k/ ~7 W% D; o
; }! |9 Q# M% F" w% G });: d- C% S! o3 P: R
3 j# {7 M% ?$ e5 C) o( m* u ^0 _, H9 X2 g+ f
; E& g8 o8 F4 n t6 q( s& C
DataStream<Tuple3<Integer, Integer, Integer>> zeroSideOutput = processStream.getSideOutput(zeroStream);
3 C, i7 c7 Z9 D6 [5 j; b5 R z
, S3 U0 b( n3 A- i$ W# r+ k# ` DataStream<Tuple3<Integer, Integer, Integer>> oneSideOutput = processStream.getSideOutput(oneStream);. v: W3 d7 M1 m0 {. @6 [
1 [4 K. w% [0 S5 y- e& M( t: v% R& v* r' n' _( e! Z
' `4 p$ d9 g0 M9 Y. Y zeroSideOutput.print();7 e' _$ ?5 { w' w. i$ S& j& z
7 G' T7 k3 o m8 E5 t6 S3 { oneSideOutput.printToErr();
9 G9 c- F+ W! H$ G4 j/ ^
" I+ q' J$ n& C' @
4 _7 Y; s6 o: I9 w6 b; Y2 x
' H* f% Z1 p8 a1 g2 U, q I
3 |0 L# X! c8 U. @- d- v/ y" f* z
# G+ O$ L4 o' b/ h, W //打印结果+ L0 W [4 N4 N# ~2 V
5 d5 ~' p: j; A( W- c6 ~& v* B String jobName = "user defined streaming source";- I& s- |3 E0 D2 Z
( a$ A$ |7 d% d3 J1 M
env.execute(jobName);
, f0 p' m! p V. W
" V& L" N' D" K}- ~ s2 q9 }( L5 V5 `
</code></pre>& m k3 B5 g: D- k _6 E
<p>可以看到,我们将流进行了拆分,并且成功打印出了结果。这里要注意,Flink 最新提供的 SideOutPut 方式拆分流是<strong>可以多次进行拆分</strong>的,无需担心会爆出异常。</p>
, D4 l' C- k3 E, A0 F6 N1 T& |<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CBMKAGHoUAAM-5UL5geg132-20210223084827698.png" ></p>
* ]. s" R3 O- x& s<h3 id="总结">总结</h3>
: I; t* p, @ i4 X! K<p>这一课时我们讲解了 Flink 的一个小的知识点,是我们生产实践中经常遇到的场景,Flink 在最新的版本中也推荐我们使用 SideOutPut 进行流的拆分。</p>
0 d( s5 q8 N! _6 s: E7 _ ^3 r<blockquote>
9 _ ^' K, g: i<p>关注公众号:<code>大数据技术派</code>,回复<code>资料</code>,领取<code>1024G</code>资料。</p> x9 z& B- _1 ?* n) e
</blockquote>- ]8 B4 }, w0 h& b3 E8 N
g& t% O) F# G/ m
|
|