|
|
8 C% [2 }' E4 J3 `# `" V9 U<h4 id="flink系列文章">Flink系列文章</h4>
3 d r/ S) N6 |<ol>/ I2 S) i, G, e2 W7 t' y, i1 ~
<li><a href="https://www.ikeguang.com/?p=1976">第01讲:Flink 的应用场景和架构模型</a></li>5 n" }! V+ i* N
<li><a href="https://www.ikeguang.com/?p=1977">第02讲:Flink 入门程序 WordCount 和 SQL 实现</a></li># n5 [! Y# f ?( b M3 H0 F3 t
<li><a href="https://www.ikeguang.com/?p=1978">第03讲:Flink 的编程模型与其他框架比较</a></li>9 d+ n1 I1 X5 `+ }; h, ?
<li><a href="https://www.ikeguang.com/?p=1982">第04讲:Flink 常用的 DataSet 和 DataStream API</a></li>, Y5 o9 `! c) k1 e
<li><a href="https://www.ikeguang.com/?p=1983">第05讲:Flink SQL & Table 编程和案例</a></li>
( \- ]% z1 L- l3 m8 L7 K3 A3 `<li><a href="https://www.ikeguang.com/?p=1985">第06讲:Flink 集群安装部署和 HA 配置</a></li>( Q# a& k u$ r# b( b* ^
<li><a href="https://www.ikeguang.com/?p=1986">第07讲:Flink 常见核心概念分析</a></li>! t* N1 K# ~; A; ~9 B" H
<li><a href="https://www.ikeguang.com/?p=1987">第08讲:Flink 窗口、时间和水印</a></li>
7 |# j" [/ V6 v( G: g, t<li><a href="https://www.ikeguang.com/?p=1988">第09讲:Flink 状态与容错</a></li>
9 |6 F9 n! l* J! d' r( `+ Z</ol>
; x; T6 v: X. M* ]5 b<blockquote>
3 C& ^2 G6 \! J! {; `) a* v: Q# l<p>关注公众号:<code>大数据技术派</code>,回复<code>资料</code>,领取<code>1024G</code>资料。</p>/ N( `2 T) ~) {: ]
</blockquote>
+ k+ q; \% d+ T- D( q<p>这一课时将介绍 Flink 中提供的一个很重要的功能:旁路分流器。</p>
p4 ^ g6 _) S<h3 id="分流场景">分流场景</h3>
* g& f9 |8 Z- N6 |5 w8 }<p>我们在生产实践中经常会遇到这样的场景,需把输入源按照需要进行拆分,比如我期望把订单流按照金额大小进行拆分,或者把用户访问日志按照访问者的地理位置进行拆分等。面对这样的需求该如何操作呢?</p>
2 f( U6 S& p! ~: N" }" w<h3 id="分流的方法">分流的方法</h3>. P2 j& K3 z/ p5 C2 z h; ~
<p>通常来说针对不同的场景,有以下三种办法进行流的拆分。</p>: j4 ?) V% j$ Q; {. m) ^0 u6 p9 \
<h4 id="filter-分流">Filter 分流</h4>5 \7 U! _- k; h0 k* n1 E& I
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CAy6ADUaXAACSFUbdpuA911-20210223084827182.png" ></p>" ]1 B* s$ C1 |. v6 c
<p>Filter 方法我们在第 04 课时中(Flink 常用的 DataSet 和 DataStream API)讲过,这个算子用来根据用户输入的条件进行过滤,每个元素都会被 filter() 函数处理,如果 filter() 函数返回 true 则保留,否则丢弃。那么用在分流的场景,我们可以做多次 filter,把我们需要的不同数据生成不同的流。</p>
& u9 w- D% A) O" m4 Z( H<p>来看下面的例子:</p>! C( P* s. i0 m# _* b9 s r5 ?$ e
<p>复制代码</p>
+ H4 K- g$ n9 }) h4 f: \<pre><code class="language-java">public static void main(String[] args) throws Exception {3 g% ?6 ]2 ]7 R6 Q& m
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
& m) ^- x0 F" T* j3 v1 _0 O //获取数据源
0 w* r H5 l3 J4 n; K List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();
4 _) q' c) c; R( i data.add(new Tuple3<>(0,1,0)); k3 h" ]+ M# Y0 p, P
data.add(new Tuple3<>(0,1,1));
, B' c, O! s3 T Y( T data.add(new Tuple3<>(0,2,2));4 D$ M2 ?# |0 i" \: ]* B8 I: W+ Y
data.add(new Tuple3<>(0,1,3));, s& [# B$ Y H; R8 k
data.add(new Tuple3<>(1,2,5));
5 l. i1 j; o; L& f7 \9 C( n) H8 [* g data.add(new Tuple3<>(1,2,9));
1 ]) s( q; T) B data.add(new Tuple3<>(1,2,11));* A7 x7 l+ O4 g0 Q! V. A
data.add(new Tuple3<>(1,2,13));9 C; ^' {: H8 Y: ~' j" S
7 M) D4 C$ \" l8 n" Y& v
DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);
& v; v6 Z" s2 Q6 r: f' s& A( x2 B/ t* h: O! U
9 e, F2 {8 N& D" v+ w I2 P4 `$ s N/ h v6 u
SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> zeroStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == 0);
% c$ m" ]- r! Q$ u0 b* `) M7 z i: W g3 b% D/ U" x- {; Z# M& W
SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> oneStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == 1);
# U8 Q( b5 p" Q5 K" {# b6 |. P. [0 g- `4 u
+ h6 x( m; ?' U0 K
7 l, S- p. T% u6 W, i! U) G zeroStream.print();# I: t, r: N$ q; E# a. D
, J) l6 Y8 V- Z oneStream.printToErr();
9 L+ M c' |3 w% V, i2 Q
* M4 i* @: d3 x( H# U
% P) Z7 ~- p( c) C+ g# A9 o1 V% s5 {6 a
% G9 n" `% k: z/ b
' a4 y e' G, O2 j" Z
//打印结果
0 B1 q7 X2 M* P% F/ q# I* V
5 z% H _- a2 A6 c String jobName = "user defined streaming source";
5 w5 t- O+ ]: `2 r6 `8 m6 G* ]5 t7 c) d8 k
env.execute(jobName);& [* b! M/ f' O K$ e9 Y' l
/ y8 [5 X, F. t' j2 ~}
" R- i: D+ c& A% P9 Z! m4 h</code></pre> _2 z7 a/ i+ I! R9 j+ z
<p>在上面的例子中我们使用 filter 算子将原始流进行了拆分,输入数据第一个元素为 0 的数据和第一个元素为 1 的数据分别被写入到了 zeroStream 和 oneStream 中,然后把两个流进行了打印。</p>, [& b+ }8 F5 u
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/Ciqc1F7CA2WAYbshAAKj494h86s723-20210223084827296.png" ></p>
5 `5 Z. G$ h( Q. s<p>可以看到 zeroStream 和 oneStream 分别被打印出来。</p>7 S; b! K/ @0 |4 K8 T( H
<p>Filter 的弊端是显而易见的,为了得到我们需要的流数据,需要多次遍历原始流,这样无形中浪费了我们集群的资源。</p>$ ]1 D+ S- y2 r; n. Q
<h4 id="split-分流">Split 分流</h4>
1 I8 ]0 Z6 u) c1 s, J# {* B<p>Split 也是 Flink 提供给我们将流进行切分的方法,需要在 split 算子中定义 OutputSelector,然后重写其中的 select 方法,将不同类型的数据进行标记,最后对返回的 SplitStream 使用 select 方法将对应的数据选择出来。</p>
' k, o& O y5 k, Q; ]! X3 y+ L<p>我们来看下面的例子:</p>1 U+ X: e, v! `
<p>复制代码</p>$ E9 q! U6 [ k* b) _
<pre><code class="language-java">public static void main(String[] args) throws Exception {
9 p0 \: _9 g3 N6 a- [. z$ {/ G. `. B, ~' U( E5 g! {3 f- p% ]
$ o9 r% L8 M) t% R4 O$ I) D
# [! E/ q) z. h) d$ r) }# T) p StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
4 V( R+ R$ _5 z+ f& N
2 \# j8 e9 s5 R2 ^# W //获取数据源
9 Q- N+ N& q) s0 @ {/ ~( Y7 ^' ~1 t0 e6 Y0 Z }; F
List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();
6 X8 ]( s0 P0 p/ V. I) G2 y2 C
/ ~6 @2 F+ Y6 k4 G, ~$ x( t data.add(new Tuple3<>(0,1,0));
% r Z, p! p. ?' s: @9 L5 n# j# V7 F. T
data.add(new Tuple3<>(0,1,1));% R" m# U' @9 p! M
, c5 {$ {% }9 e3 e0 ]( I! h data.add(new Tuple3<>(0,2,2));
; H6 }* {8 I! w' {
" a2 v8 T9 d$ {1 W: z data.add(new Tuple3<>(0,1,3));6 _/ h; C5 L$ U$ a& [4 ^
/ w) s' _! |2 A* w3 X m7 [ data.add(new Tuple3<>(1,2,5));# {8 T( f7 }7 w% P; K" u
) p9 z) ]# ~1 F8 H data.add(new Tuple3<>(1,2,9));
' N0 l/ L) w4 o3 u: W* a6 `$ F* T( D$ |3 G) m; M
data.add(new Tuple3<>(1,2,11));4 G' X5 U9 U- r; @
1 a% j! ]7 B9 X; p4 d data.add(new Tuple3<>(1,2,13));
& A; S2 a& ?, x; _& ^* l, L+ G$ f" v, ?3 |' l. _3 B( T+ Q
# I2 {0 R. _5 E1 q( ` U$ Z. k0 d
8 S8 S( v: f/ j. {# h
( F6 g7 M I; R8 {3 S! r
DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);
3 Q' @. r6 E! a% D& z
, H8 J. V2 b6 J0 P6 g. M2 p$ m/ Q% x9 S, N
N0 n- ~; \/ K: N
: C5 k6 P: ^! {& H' V" g& x& H! I* Y' T1 i& \& q
SplitStream<Tuple3<Integer, Integer, Integer>> splitStream = items.split(new OutputSelector<Tuple3<Integer, Integer, Integer>>() {
: ]4 h* \0 e: f( }, R
4 U* g8 }, c" l0 J* K% } @Override
, l+ i( x# R$ ?2 b9 C/ e) z) n6 N5 l
public Iterable<String> select(Tuple3<Integer, Integer, Integer> value) {
/ U) y- j4 w% `* B# |2 b" y( L* Y/ Y& j4 n4 G# e3 Z
List<String> tags = new ArrayList<>();
+ T5 y) q+ h& V; }
4 a1 [; T" ]% \6 @: G- e: D if (value.f0 == 0) {
2 w/ C- Q" x" C2 \. K+ Z( \
3 T7 Q% o$ U5 v* n6 w. h tags.add("zeroStream");- s1 @+ z0 I0 C
! n2 `4 U2 \+ V } else if (value.f0 == 1) {. |# K# ?# j( t R- [/ [
7 _, h. y7 J& v; ?5 F
tags.add("oneStream");
3 q: L* E: E& Y) V. P
4 C, G+ G% `: _, F }' G- M0 F7 a* ~1 Z9 q$ m, P' a
* k, }6 v* o, U4 f7 z7 A" x( e return tags;
6 J$ b$ [5 {* X
& P; {7 Q5 Y1 C$ F }
0 j! D6 L; e/ O+ K9 \$ x
7 }1 M0 c7 {% Y/ u: l0 \ });1 M" {) w0 s8 y3 O
( R* w9 @, v7 u1 m2 {
9 `% d0 z/ S4 v3 r/ E4 k" z9 X7 c) Y( K5 r# w4 p
splitStream.select("zeroStream").print();& z5 X# u6 z% z
% U3 y% o; Y+ H, W# N splitStream.select("oneStream").printToErr();; i( f3 s7 r2 d. t* r' u0 f
6 y+ u& F$ ^7 w$ D9 C: H: |
8 T. I* Q9 D3 W! z" n2 u4 p- q
5 a* l% g( ?4 E5 f- S. q' S0 j4 k //打印结果
' A% p" l) q4 _/ V3 z8 L# j" N! f3 E% y# p& S
String jobName = "user defined streaming source";1 C5 s" } k3 E; J/ O/ s* K& i6 k
, w+ H; D' ^* D2 J; D0 Z
env.execute(jobName);
& M* H" b( y" s1 D5 |( D& P2 r; c0 u4 L1 E# V
}
% J3 x4 J& R2 R- \7 V: c; _</code></pre>4 F' m% H' B0 q0 {/ @
<p>同样,我们把来源的数据使用 split 算子进行了切分,并且打印出结果。</p>- }% w' v9 p) R7 _* o6 l- \7 {
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CA4aAbUSJAAG1LWNB3qw627-20210223084827377.png" ></p>, ^1 r" b$ }2 F1 Z9 N# ~+ v: ~3 A
<p>但是要注意,使用 split 算子切分过的流,是不能进行二次切分的,假如把上述切分出来的 zeroStream 和 oneStream 流再次调用 split 切分,控制台会抛出以下异常。</p>
% R" N3 r/ @" C1 R$ E& A# T" t; f( g<p>复制代码</p>
; p+ j9 @+ [1 i<pre><code class="language-java">Exception in thread "main" java.lang.IllegalStateException: Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.$ L( _% t& } K, r
</code></pre>
" q6 R% X+ w& k) Q! D S<p>这是什么原因呢?我们在源码中可以看到注释,该方式已经废弃并且建议使用最新的 SideOutPut 进行分流操作。</p>' g, u" X& D; ^9 m
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CA6OAJ-JDAAIrh1JSAEo033-20210223084827602.png" ></p>$ m1 l9 {- C, I7 i" S
<h4 id="sideoutput-分流">SideOutPut 分流</h4>
4 F: I& Z1 M4 M1 K I/ S: f<p>SideOutPut 是 Flink 框架为我们提供的最新的也是最为推荐的分流方法,在使用 SideOutPut 时,需要按照以下步骤进行:</p>
* ]* j4 W8 U$ G" h9 G7 a8 ]8 c<ul>8 G* E9 C+ n2 X J% Z. I; V
<li>定义 OutputTag</li>
7 Q* P$ l, V( A<li>调用特定函数进行数据拆分
/ Q' a! r5 T0 y, D( v6 l! _0 s: h8 M( Y<ul>
0 e- _3 B( e, b9 q. @<li>ProcessFunction</li>& S$ j& [, g8 |
<li>KeyedProcessFunction</li>6 w) x+ k* W' v. x) i- q( o" t f3 }
<li>CoProcessFunction</li>- b( ?8 w( U$ h h! V: _9 ] J
<li>KeyedCoProcessFunction</li>* p9 b Z+ q$ o- q' W9 M
<li>ProcessWindowFunction</li>
7 G: S4 y% X8 }8 a: s; a<li>ProcessAllWindowFunction</li>
8 w4 G) G$ l' f- b/ n2 F</ul>9 i4 `3 {' U0 t/ t% p
</li>
2 A' \% R6 d) `. a. W2 T* J</ul>) F& D- C; l9 m) ~ R1 A
<p>在这里我们使用 ProcessFunction 来讲解如何使用 SideOutPut:</p>
, B- R7 P' g: f8 U5 y0 Y- a/ b<p>复制代码</p>
( b% {- a- u. Q7 Q: M Y<pre><code class="language-java">public static void main(String[] args) throws Exception {6 P1 f* s2 ^* [& M! t
. }% t3 t0 c4 e. l2 U5 B% Q* [& C% V; b7 \. P1 L. K( p6 B
* n# k5 A5 c9 o/ N
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
& l+ X& Y1 d& h" H0 A# K+ u3 i, u( n8 s! k& P2 ?3 ^( j9 S
//获取数据源5 T; d# k& S( i" U
% u* k- C9 n% }& w- `$ X/ W
List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();/ [9 x4 c3 V9 y9 z/ c: J
% |) L& m* B) S4 ] data.add(new Tuple3<>(0,1,0)); F' C" q9 g6 q& h- l# P, a. U
3 h* G+ Y0 ^& k& c4 g5 @
data.add(new Tuple3<>(0,1,1));! c0 i# u) ^* @ S9 J1 S- a
- A9 }; t' Q3 t9 [7 J8 {- T; L data.add(new Tuple3<>(0,2,2));
1 r" `4 f2 V, N6 t
! N9 k2 ?; a9 Y9 ^9 @7 g6 B data.add(new Tuple3<>(0,1,3));
. z! u% g1 p" g; f
1 H$ ~5 r# g+ a data.add(new Tuple3<>(1,2,5));
! `! M' b, m* V- f8 K" s1 y- D0 v
. H6 O7 e* Q, v& p data.add(new Tuple3<>(1,2,9));
2 j9 f) b$ I$ H# n; ^: @% @8 X/ f: ? d+ G9 |8 B( o
data.add(new Tuple3<>(1,2,11));5 C& v+ Q2 i0 ~2 J$ m& h1 r
9 Y8 X9 k$ `/ c4 H6 u
data.add(new Tuple3<>(1,2,13));2 Q2 j; q& U5 |9 c
& z8 P D' e* m+ g0 S* n/ z2 m
/ O8 l. V; A& ~1 N8 E Q
* b2 x0 t1 u8 G0 p/ Q3 H3 V! e! N) o+ q1 [4 f
- K! J, X! [ k: L! h# _( ~ DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);
9 Y; ], Q$ |/ P
5 M2 Z/ O0 x* a5 L( s: y" ^; s ^; i4 C+ z6 I
0 j |" T i; o( _( I6 y+ S u: w OutputTag<Tuple3<Integer,Integer,Integer>> zeroStream = new OutputTag<Tuple3<Integer,Integer,Integer>>("zeroStream") {};
9 f& y7 L) S# Q. Y5 I. Z" W, B
% W/ R6 ~2 X7 l' l OutputTag<Tuple3<Integer,Integer,Integer>> oneStream = new OutputTag<Tuple3<Integer,Integer,Integer>>("oneStream") {};
& f" j* P W7 j+ x: h; T
0 J. \5 T$ u( t0 X
+ f2 u. U3 u8 r" K; [7 l p6 m# A: ^* V) ~
) O6 ~+ w$ H) O
( u4 z k: K# i9 j9 Q0 J( B SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> processStream= items.process(new ProcessFunction<Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>>() {: u4 k2 p! @& K: d0 c
- Q- T; x) ]* J, ]: |6 C" t
@Override
3 n& p) x/ w4 G2 \
) f0 X( `, U5 B y0 r: } public void processElement(Tuple3<Integer, Integer, Integer> value, Context ctx, Collector<Tuple3<Integer, Integer, Integer>> out) throws Exception {# Q4 G, }3 u/ B3 J6 Y$ q' H1 L
~% W2 x% y4 j6 a# Y' b
- @ {! f. [+ E
: ?' W+ K2 Q' n, Y3 w if (value.f0 == 0) {5 |" e7 l Y" y7 Z8 n2 c+ N
4 \6 n3 n! D5 Z4 \ ctx.output(zeroStream, value);& o! _0 T3 P. k: o+ f9 Q
( ~# S" x0 P9 x9 X8 S4 k } else if (value.f0 == 1) {4 p8 @! ]3 k. |
- |6 M4 D+ h i* ]5 n ctx.output(oneStream, value);' I+ @1 g" z0 D% C+ ]2 j
0 }; J+ }" ]$ R, J4 F! B
}7 t# @3 m/ Q6 S$ ?9 w0 ^
+ X' X4 s% k, ~# v# W |- x/ k }+ w/ F b( R; j) ^! q3 d) p
( |0 D1 Q1 x6 X( {/ b$ G
});
& e0 Y; T5 a. l2 o6 E: A4 e4 B9 x$ G/ g, R! T+ W+ }8 ?4 N
# ~: X: p7 X$ @. q J. e0 K+ @2 i. E4 O1 G3 B! P- ~* o
DataStream<Tuple3<Integer, Integer, Integer>> zeroSideOutput = processStream.getSideOutput(zeroStream);
, M! J% d) ]( X
7 t" X7 a: b) S! M DataStream<Tuple3<Integer, Integer, Integer>> oneSideOutput = processStream.getSideOutput(oneStream);
0 ^1 R/ n! v6 y; m! i) E3 o3 t: c* K8 E! `
3 Q* V' ~! E( ~
5 y) Z* ~5 i. J" c zeroSideOutput.print();
" E8 o/ `& X3 Q$ Z7 Y; w
: D$ \- u1 y. r1 P. B. u$ H p oneSideOutput.printToErr();
# Q/ H0 W s8 u3 W8 m7 N0 N- q& A* M
& u# e5 i9 {! T3 k( k0 \
( N9 m2 Y; V6 v5 U: j
]& m# x n, I7 l6 k9 F2 q4 Y' K+ }) X N
//打印结果8 k6 E" I( V& Q4 a `+ ^
0 D8 m p3 E& w) D7 N% v
String jobName = "user defined streaming source";8 f; d, E1 S2 o. x
; l$ T+ J0 z: X$ `/ ^8 V, Q/ W
env.execute(jobName);! E w! N( S$ {( j" K
$ ^) ?) }/ U& e' U$ f}
6 z3 C# N5 M$ u; k& ^7 I</code></pre>
6 x' ^9 T" \/ ~3 J: t( a3 P<p>可以看到,我们将流进行了拆分,并且成功打印出了结果。这里要注意,Flink 最新提供的 SideOutPut 方式拆分流是<strong>可以多次进行拆分</strong>的,无需担心会爆出异常。</p>
. ^& l/ @! `: Q<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CBMKAGHoUAAM-5UL5geg132-20210223084827698.png" ></p># ]; S; k/ ]' |" ?
<h3 id="总结">总结</h3>: Z- K8 K% m3 A6 a, r/ H& F
<p>这一课时我们讲解了 Flink 的一个小的知识点,是我们生产实践中经常遇到的场景,Flink 在最新的版本中也推荐我们使用 SideOutPut 进行流的拆分。</p>
. c* b' D# J. g* h6 B<blockquote>
9 y3 \' ?( Z I4 w<p>关注公众号:<code>大数据技术派</code>,回复<code>资料</code>,领取<code>1024G</code>资料。</p>
. d! k6 x1 m) R+ v, h: w</blockquote>0 g5 k7 u [/ n& `2 Y2 c8 x) E
4 F: q$ O1 s3 m1 s* G
|
|