|
|
8 F( w, z' j q' B
<h4 id="flink系列文章">Flink系列文章</h4>1 b, w3 V: l8 `6 Q4 M
<ol>" [9 P4 H( I- l8 X
<li><a href="https://www.ikeguang.com/?p=1976">第01讲:Flink 的应用场景和架构模型</a></li>7 @9 _ g+ S5 X/ g7 V) m4 a" R' C1 p
<li><a href="https://www.ikeguang.com/?p=1977">第02讲:Flink 入门程序 WordCount 和 SQL 实现</a></li>
8 N$ r+ m% J. a/ i$ |- t<li><a href="https://www.ikeguang.com/?p=1978">第03讲:Flink 的编程模型与其他框架比较</a></li>) {4 U" L( s( S% w- y' Q
<li><a href="https://www.ikeguang.com/?p=1982">第04讲:Flink 常用的 DataSet 和 DataStream API</a></li>
. Y: ^% B% _3 a& z, I<li><a href="https://www.ikeguang.com/?p=1983">第05讲:Flink SQL & Table 编程和案例</a></li>
0 C1 F. @% O3 v3 Q<li><a href="https://www.ikeguang.com/?p=1985">第06讲:Flink 集群安装部署和 HA 配置</a></li>
2 T+ D4 e! a8 ?% k) d$ l<li><a href="https://www.ikeguang.com/?p=1986">第07讲:Flink 常见核心概念分析</a></li>- S W7 V0 D8 g( o. T( {' a9 r0 u
<li><a href="https://www.ikeguang.com/?p=1987">第08讲:Flink 窗口、时间和水印</a></li>
. H. P( m3 z; G4 W<li><a href="https://www.ikeguang.com/?p=1988">第09讲:Flink 状态与容错</a></li>
; N3 ?. z ^/ q5 y; t F</ol>
$ a( O# e; ^5 \# E% H! G i; w<blockquote>
6 j* t7 d+ ~ h3 O<p>关注公众号:<code>大数据技术派</code>,回复<code>资料</code>,领取<code>1024G</code>资料。</p>
$ C# J0 i& o8 z; L</blockquote># K$ Y, f# _% F/ ]5 _) S0 w+ H
<p>这一课时将介绍 Flink 中提供的一个很重要的功能:旁路分流器。</p>
* G0 v6 w6 U& n; ]7 F- E<h3 id="分流场景">分流场景</h3>
! H& S3 x* G% p* U- Z<p>我们在生产实践中经常会遇到这样的场景,需把输入源按照需要进行拆分,比如我期望把订单流按照金额大小进行拆分,或者把用户访问日志按照访问者的地理位置进行拆分等。面对这样的需求该如何操作呢?</p>
2 E0 C9 l: l1 j" L4 Z4 ^<h3 id="分流的方法">分流的方法</h3>
9 A) _" n6 Q% S! P4 K/ B<p>通常来说针对不同的场景,有以下三种办法进行流的拆分。</p>- u, y" K6 L& b3 z0 ]9 \
<h4 id="filter-分流">Filter 分流</h4>; R# H* J1 `3 @+ v! A3 d; a* N
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CAy6ADUaXAACSFUbdpuA911-20210223084827182.png" ></p>' ?/ X! R& k, g3 }4 n( |2 }' K# T
<p>Filter 方法我们在第 04 课时中(Flink 常用的 DataSet 和 DataStream API)讲过,这个算子用来根据用户输入的条件进行过滤,每个元素都会被 filter() 函数处理,如果 filter() 函数返回 true 则保留,否则丢弃。那么用在分流的场景,我们可以做多次 filter,把我们需要的不同数据生成不同的流。</p>
6 t: C+ d4 T* |2 c4 |! Z<p>来看下面的例子:</p>4 K& a4 ?) O7 d$ m2 s
<p>复制代码</p>
% B! D' v: L- p4 X: _( i( N- j+ [<pre><code class="language-java">public static void main(String[] args) throws Exception {
& ~! e9 [. m% I% D5 o StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
' y; i' ^$ P9 u7 k //获取数据源% L3 W+ V e* D5 |/ _" q! z' Q' T, K
List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();
% C( {: y. q1 Q" d" c; p" @ data.add(new Tuple3<>(0,1,0));
0 P ?9 l9 g8 ~9 s3 j+ ~8 Y% Y data.add(new Tuple3<>(0,1,1));
( v5 k- u& Y0 f( M3 G" H P1 d data.add(new Tuple3<>(0,2,2));6 \$ ]5 E$ ` l# _( k; f1 e0 T
data.add(new Tuple3<>(0,1,3));
7 h( H. f3 O4 U0 D+ I; S; @& ?% ` data.add(new Tuple3<>(1,2,5));
% [9 x- Q( K9 `0 E data.add(new Tuple3<>(1,2,9));: I5 c" o4 x E/ q/ V
data.add(new Tuple3<>(1,2,11));
6 `# u- r0 }8 p; ] data.add(new Tuple3<>(1,2,13));
2 H( r) ]3 }/ q- g
; U% E4 G0 f+ X DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);: t9 B* ^" j4 B8 I
: k+ _+ [* {# N1 N7 \
) z6 x8 a% ]' j- W4 H2 Q ~
% ]3 B: X1 i0 p* s
SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> zeroStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == 0);( K7 `# N% b/ b- h: \
7 q. s t! M3 P+ N7 Z; K SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> oneStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == 1);' s5 z- {" B' H1 j) {
$ Z7 `0 Q# B8 i+ P
) k R/ V& t' D
' @: S: Y0 H* Q2 j/ T zeroStream.print();
# Q1 ?0 R5 r! N& \' K$ W5 P/ @" {- _: l9 O
oneStream.printToErr();
5 H8 X, ?7 E }- E0 i; I7 t$ o: z( Q" d% h8 q- c c
8 R8 }% x' v0 U
5 D/ ]+ i, a% e
* S9 M# O& @) h" ` u! k0 l; I8 H- R( r' u' d
//打印结果
# r9 v2 w- A# W$ C* h9 O$ {' Y0 ^1 w' i, }/ B
String jobName = "user defined streaming source";
- G8 g5 T% K5 G: e. u5 q
0 f1 N3 e+ s+ X5 y3 v' f" a7 ? env.execute(jobName);- _; s6 G$ R- b9 b. p2 l" ` V
: c* \7 Y, y* n/ }/ ^$ z& J
}
" y7 `( F1 {' w+ _</code></pre>- W' i+ d' o/ u& W% j+ y: k: v
<p>在上面的例子中我们使用 filter 算子将原始流进行了拆分,输入数据第一个元素为 0 的数据和第一个元素为 1 的数据分别被写入到了 zeroStream 和 oneStream 中,然后把两个流进行了打印。</p>
0 L# n7 L5 ?3 z# l. V<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/Ciqc1F7CA2WAYbshAAKj494h86s723-20210223084827296.png" ></p>
# H4 j$ h9 _" S9 m- W+ q; K; Q* i<p>可以看到 zeroStream 和 oneStream 分别被打印出来。</p>, y1 e+ x$ ^! J5 k5 l
<p>Filter 的弊端是显而易见的,为了得到我们需要的流数据,需要多次遍历原始流,这样无形中浪费了我们集群的资源。</p>! ?. V q( A2 T( X B$ H' v) I
<h4 id="split-分流">Split 分流</h4>9 u0 j8 P: h3 Z/ t$ j! x' m
<p>Split 也是 Flink 提供给我们将流进行切分的方法,需要在 split 算子中定义 OutputSelector,然后重写其中的 select 方法,将不同类型的数据进行标记,最后对返回的 SplitStream 使用 select 方法将对应的数据选择出来。</p>; U7 U7 j# L6 ] l+ a% u0 U# ?
<p>我们来看下面的例子:</p>
: T" n+ ^- w% G# I<p>复制代码</p>0 w7 y) T3 x& `/ _1 K: k f3 q
<pre><code class="language-java">public static void main(String[] args) throws Exception {
+ u% a$ \! V% w
6 o/ Y5 Z0 l* y8 y, P) w$ o# v0 A+ Z- _1 D
+ i% z2 Y! x' v/ { x
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();$ T$ R3 u' {' x
# a* `$ I. m- e% O0 c4 I
//获取数据源
3 w' `" ~1 J# R
0 J. _# J7 \: s, J8 V9 x List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();
2 v9 A& c2 G1 D- H J5 ?" B+ q0 r2 ?% H
data.add(new Tuple3<>(0,1,0));
. m- ?" j2 T" j* O2 y3 l4 `, Y7 v# ]6 O& z
2 s) W) z: G/ i2 N) i1 d& y* R data.add(new Tuple3<>(0,1,1));& s. F: m& K3 j' @8 f
! a% ^3 t7 n$ f) y3 C, E3 d data.add(new Tuple3<>(0,2,2));
, {; \( p7 z$ z% x2 B/ S M1 v- w7 Q5 W
data.add(new Tuple3<>(0,1,3));
" z# L- g) w3 P' ~8 F! g( h# B! L6 _9 E2 K& z
data.add(new Tuple3<>(1,2,5));
. e+ @" V/ H1 N+ N" U3 S: U* Q# j3 c) E1 c# ^ [# k% a
data.add(new Tuple3<>(1,2,9)); h+ r( G. l9 @9 V
3 [7 X9 a; K2 Z8 m/ X data.add(new Tuple3<>(1,2,11));
/ g7 ^2 ?9 S1 Y( O: \+ P3 n1 ?5 F# p4 c5 D, _
data.add(new Tuple3<>(1,2,13));
! b( Q" O$ X# n% B# N, T- F4 `0 J! s1 W2 _8 m1 i+ E( A' v
7 L& f7 `5 N' n( u/ k% y4 U( c6 M7 Q, @8 m2 V0 `4 Q; U
) X/ \" h" D( [ y
# J( {& p, S" }) ~6 @$ L+ g DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);
1 t8 L& t& u' S: e7 p' ~/ O p5 W4 Q
: Z( T8 p2 ~( @, w/ |( w, V
4 e% }$ h7 p+ u
& S; f8 ]! A- A0 R: K
Z0 Q+ r2 n2 F8 J6 a
SplitStream<Tuple3<Integer, Integer, Integer>> splitStream = items.split(new OutputSelector<Tuple3<Integer, Integer, Integer>>() {
$ G! x8 [" E! A" t6 \7 p* r# u- I% |* e! s( m/ H g
@Override% s; B6 ~9 A* s8 ]+ B2 v
( E# l9 C2 E' ]+ W
public Iterable<String> select(Tuple3<Integer, Integer, Integer> value) {
% Z! }0 R# H. [) \* i, [9 v: x6 `( K% ~8 r
List<String> tags = new ArrayList<>();, g" K; O8 b( u2 o" {: G/ W! }+ }
, _: W6 b8 C7 c7 f& g5 E6 e# U6 t
if (value.f0 == 0) {. x0 s! b7 F. l2 `3 B
! i- W' @7 m* t tags.add("zeroStream");3 ?9 p# y8 q5 |# g( ?% u) T( }
" B- I) n s. |1 X; C
} else if (value.f0 == 1) {4 h. y2 h* C! ]1 }, x
' ]+ b8 A$ z2 U" |# ^, a3 p
tags.add("oneStream");8 u/ N5 t" a6 K$ Y; y% s. k# f
! N' V r2 |9 j; P2 c. u
}$ w W' Y. N, C: q
; S0 ?: A }- ~) S return tags;
& b6 Y( U4 J6 \9 S8 L4 M7 v$ S, ^# |1 y) @9 I
}
/ ^7 k9 H6 _0 V( ]+ E1 Z
3 W: T# D C( p6 K( O });
0 V {' _) i$ n+ U; I1 O
- d* y9 g/ ?2 N! r H) x9 X. Y' ~1 B( e7 b
: c: j4 w, s, L2 c: ]$ ~ splitStream.select("zeroStream").print();: b$ f1 `$ P7 c& e3 D: {+ s5 E l* x
/ y6 x1 b( F0 ?8 [* N splitStream.select("oneStream").printToErr();
8 C" K, Z$ r5 h+ b
P0 K8 `5 ]: o4 G: `8 }
$ y' d9 {2 ]! d7 X- u4 G; ~* T" L
; J0 k; D/ R6 r' T+ J, j3 r //打印结果3 m" W( p2 ~# |, }9 v; p/ }
& V2 N. ?& g% }7 ~% y String jobName = "user defined streaming source";
# b" k9 S7 f* Y: N; j0 H% J% X
1 m% N' _# D. `6 {- l& e7 }* r5 o( i env.execute(jobName);" U" a1 E# K" {1 I; h
% p, w* A S0 a1 B1 u! e# A
}% g2 q- D" t& {; J5 N9 ]4 Q: N
</code></pre>
0 Q, Y7 u& d' \/ g9 d<p>同样,我们把来源的数据使用 split 算子进行了切分,并且打印出结果。</p>
1 Q2 S( M3 i9 `<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CA4aAbUSJAAG1LWNB3qw627-20210223084827377.png" ></p>" _ K Z3 N; R
<p>但是要注意,使用 split 算子切分过的流,是不能进行二次切分的,假如把上述切分出来的 zeroStream 和 oneStream 流再次调用 split 切分,控制台会抛出以下异常。</p>1 _( D3 K) \$ d# K o. F' n
<p>复制代码</p>2 m. b! D- d8 e# W5 N2 {
<pre><code class="language-java">Exception in thread "main" java.lang.IllegalStateException: Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.- A5 X: v2 y- Y" X
</code></pre>
- m' y- x F! D8 m$ n' L; ]8 Z0 _7 l<p>这是什么原因呢?我们在源码中可以看到注释,该方式已经废弃并且建议使用最新的 SideOutPut 进行分流操作。</p>
( f- `4 ~& o% ^: V& L# \<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CA6OAJ-JDAAIrh1JSAEo033-20210223084827602.png" ></p> _0 i% o- Q* \9 A9 [& X8 U2 \5 B$ Y
<h4 id="sideoutput-分流">SideOutPut 分流</h4>
! a0 `' j9 x! a- P- l1 Z( \<p>SideOutPut 是 Flink 框架为我们提供的最新的也是最为推荐的分流方法,在使用 SideOutPut 时,需要按照以下步骤进行:</p>, w4 i7 @* f; S8 |0 N
<ul>6 z" f! M/ Y9 x& v9 _; M
<li>定义 OutputTag</li>
( i; M. R. V9 x<li>调用特定函数进行数据拆分7 `# _6 d' r3 I) {5 C
<ul>
9 e$ \$ d. }2 L<li>ProcessFunction</li>
3 S. m: m8 K* v# O- l<li>KeyedProcessFunction</li>
3 X0 Y2 b- E1 c2 u* p: B4 O1 h<li>CoProcessFunction</li>* D' M0 N8 r. P8 G3 q9 r
<li>KeyedCoProcessFunction</li>
( ]( o N" Z$ @( h<li>ProcessWindowFunction</li>
# r/ Y2 }1 o/ h+ f, k<li>ProcessAllWindowFunction</li>8 g! w& y/ E$ ?8 S( q( g
</ul>" \* {) O7 { w. o
</li>3 Q5 Z' T/ H/ N) Q- \
</ul>
8 t. S( }8 p, I9 v7 X$ U<p>在这里我们使用 ProcessFunction 来讲解如何使用 SideOutPut:</p>
* z6 d6 O% F; X n1 ?<p>复制代码</p>5 j! Z+ Q1 c( u2 F. T) o' S
<pre><code class="language-java">public static void main(String[] args) throws Exception {
4 P$ z( A' z6 q, Z; }& j" U$ k# Y8 D
* I# g. V7 m8 [) c! B
& W( w# Y3 Q" C, V5 m StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
$ H6 I& o u2 c4 w$ a/ e, h* X# {) C3 v; E3 ]! F5 O3 C0 Y
//获取数据源
: D% X9 i8 C. Y) {" K- s+ V
( H3 K" f: J+ D+ b List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();) P! P2 K( e6 t$ Y: B% c
; l% s8 B) t. _' E0 h4 C D
data.add(new Tuple3<>(0,1,0));) \* M8 K7 c5 I0 j
. m9 g. T ^ w; C" a6 [6 N n data.add(new Tuple3<>(0,1,1));
6 {& e e* t+ b" T6 D
4 j4 S" [% m. N+ F data.add(new Tuple3<>(0,2,2));5 L( A0 @2 M" l( j/ |
& \, E8 I9 R. [1 M* N
data.add(new Tuple3<>(0,1,3));# @- F% F9 o6 S& V" B
+ U$ G4 t2 a: S5 o: ^ data.add(new Tuple3<>(1,2,5));
9 d1 r" n+ ]8 G0 g, S7 m# ~8 e1 m0 s9 I& ^
data.add(new Tuple3<>(1,2,9));! F8 G9 M5 \6 a8 q
2 P$ I3 m# F( e$ H! R3 H. { data.add(new Tuple3<>(1,2,11));9 \5 i! E% ^% w+ K
0 B3 V& [+ e& ~ data.add(new Tuple3<>(1,2,13));
' `/ |8 K U; X% F
- j' P' c6 {- e* u* X3 a1 N- V8 H3 T% H
7 y* A$ I$ Q. l7 v, y1 s
8 Z; t9 S( a. G5 b6 G$ Q: [6 Q; y3 {; }3 L; J9 W
DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);: B3 z6 ~* _& P0 j
3 Q4 {# u9 `6 ~( A: k0 v: C
/ U9 l% A3 a" q& u& s* J* _5 i
3 S) C- \. ^6 A, @ OutputTag<Tuple3<Integer,Integer,Integer>> zeroStream = new OutputTag<Tuple3<Integer,Integer,Integer>>("zeroStream") {};( [9 F% A' s! J# k7 x" T/ k
9 f7 T' j$ p6 ^9 b! _& L
OutputTag<Tuple3<Integer,Integer,Integer>> oneStream = new OutputTag<Tuple3<Integer,Integer,Integer>>("oneStream") {};
( Y6 [! o# b4 U' h+ y8 {' y" w# ~& u7 n
: j9 O4 T/ r) k- B! w$ j, b# [( @; b j. V' P$ O
! E. T n2 K1 i+ L2 U- i# T( d, S" N) u, g
SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> processStream= items.process(new ProcessFunction<Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>>() {
& A' ^2 O9 D- ` p/ C
2 i h7 ^; k+ L& S- E @Override
: Q+ W' G2 c3 y. ?# Z0 I* E5 Q( g: w: h( O$ V; @
public void processElement(Tuple3<Integer, Integer, Integer> value, Context ctx, Collector<Tuple3<Integer, Integer, Integer>> out) throws Exception { C/ Z0 I7 b+ ^7 ^# ]
& G( y4 Z K# H' J5 w1 |
9 L+ V. g4 P! @! e& G, u
9 G; p* a. b" i8 S0 H9 g7 o0 | if (value.f0 == 0) {! [3 Y: h! \9 i' y, E
5 p# ?" a# p, x- P& O( s+ D1 J, H ctx.output(zeroStream, value);. @# X: L2 f& g0 H. N
( d- f- }8 Z' `
} else if (value.f0 == 1) {/ n" X$ k0 s& r
6 F$ w5 ]. X! ?0 ~ ctx.output(oneStream, value);# F5 y7 C- q0 } N, b
% r# M; X; W. ?8 ~% i& a- u4 H }% u; A, k; ~8 U; o
7 ?0 W& }6 |" X; T4 n, B3 y8 a7 ~
}
! l# P/ j& |0 u! u/ @/ F0 f8 c6 w7 h! N7 B
});9 D8 s5 C* ~; j: q4 O s' N
" i% q" z& }# S; P2 h0 e: w; y
3 Q9 d* \ N9 i# }! r" ~- h6 }/ N. A
DataStream<Tuple3<Integer, Integer, Integer>> zeroSideOutput = processStream.getSideOutput(zeroStream);0 K% f m& F; M) U( u
; V$ t1 H" c2 k0 X) g# s3 ^( q
DataStream<Tuple3<Integer, Integer, Integer>> oneSideOutput = processStream.getSideOutput(oneStream);# h/ v7 n! V( d; h: ]. N a
. r# J, {. r. S2 c5 k* u, X7 `8 L& X% q* d) z% }
! U M: b x1 ^3 P
zeroSideOutput.print();
4 A- ^) w. e3 \8 R8 P3 ?/ M ]+ }
; ^4 h7 D0 G2 v& w+ G7 S' R" _! N oneSideOutput.printToErr();
- J: w" E( O( U2 Z) `6 f# @ e$ L# S' ^$ s; q
7 y( z( @! x1 a( R
/ \# ~# R8 U# l; E9 T
0 @/ f+ s5 h$ k% s
# a% H/ m" |- Q5 X. V; S* { //打印结果
( C0 e6 N% c6 D2 S- H" Y& x. `
# T8 i) v& h5 \9 j$ E) J String jobName = "user defined streaming source";
3 N- z. }' O* s
2 d7 `: K. r7 C2 s/ l) H7 O env.execute(jobName);
' P5 f) H' \) Z4 } B& S1 O! P3 I1 j# G/ i5 \* R9 g4 B# y: t
}
1 {: e: |, u& r3 r' r$ ]</code></pre>% v1 \+ x$ n5 u3 x% T0 F3 S! V
<p>可以看到,我们将流进行了拆分,并且成功打印出了结果。这里要注意,Flink 最新提供的 SideOutPut 方式拆分流是<strong>可以多次进行拆分</strong>的,无需担心会爆出异常。</p>
2 Q$ ?, } @1 V0 L* Q" }; U! W% Y<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CBMKAGHoUAAM-5UL5geg132-20210223084827698.png" ></p>
/ A/ r r2 S" t$ b+ h+ Z) o) g<h3 id="总结">总结</h3>
* f; J+ f6 M0 }9 A0 _) b# X<p>这一课时我们讲解了 Flink 的一个小的知识点,是我们生产实践中经常遇到的场景,Flink 在最新的版本中也推荐我们使用 SideOutPut 进行流的拆分。</p>, W; G7 @. R8 t Y( _! n* A& b' `
<blockquote>, F6 p4 e* M: B& R- v, S% F
<p>关注公众号:<code>大数据技术派</code>,回复<code>资料</code>,领取<code>1024G</code>资料。</p>
" v/ ]& X1 d. \5 R+ L</blockquote>
0 a$ N; s/ x* n9 z# b! q1 [' s* ~4 a3 g B
|
|