|
|
' U3 J4 }; e. \$ m. @
<h4 id="flink系列文章">Flink系列文章</h4>9 x6 E2 b! H4 B/ u% A& I
<ol>. X& t/ p- g9 ?8 M: L/ w
<li><a href="https://www.ikeguang.com/?p=1976">第01讲:Flink 的应用场景和架构模型</a></li>& h! q' P8 Q6 y! G- V f- q' P
<li><a href="https://www.ikeguang.com/?p=1977">第02讲:Flink 入门程序 WordCount 和 SQL 实现</a></li>
1 V/ [8 f) {4 W# ]* D: P<li><a href="https://www.ikeguang.com/?p=1978">第03讲:Flink 的编程模型与其他框架比较</a></li> K0 c& R$ k5 k* s) V- `& P
<li><a href="https://www.ikeguang.com/?p=1982">第04讲:Flink 常用的 DataSet 和 DataStream API</a></li>
6 ^: ^* X8 _3 `$ a<li><a href="https://www.ikeguang.com/?p=1983">第05讲:Flink SQL & Table 编程和案例</a></li>) Y q) k3 i. Z9 r" a) t2 O
<li><a href="https://www.ikeguang.com/?p=1985">第06讲:Flink 集群安装部署和 HA 配置</a></li>0 `: o0 f8 w# u8 h
<li><a href="https://www.ikeguang.com/?p=1986">第07讲:Flink 常见核心概念分析</a></li>; C8 g8 e9 h6 L$ o" [4 p# t$ C
<li><a href="https://www.ikeguang.com/?p=1987">第08讲:Flink 窗口、时间和水印</a></li>4 I. } x5 ~5 G0 x( u/ M3 P& Y
<li><a href="https://www.ikeguang.com/?p=1988">第09讲:Flink 状态与容错</a></li>. X+ w5 i+ O0 @' v* d# j
</ol>
% ?0 ]. L- j( Q s0 M" ^5 M<blockquote> V* q! O5 |( h) C# v
<p>关注公众号:<code>大数据技术派</code>,回复<code>资料</code>,领取<code>1024G</code>资料。</p>
0 V4 j8 k. I9 K7 j. m</blockquote>
9 L/ l* L# Q, b. C% J<p>这一课时将介绍 Flink 中提供的一个很重要的功能:旁路分流器。</p>
6 @2 w* s l, n& }3 F5 h<h3 id="分流场景">分流场景</h3>
5 r- b, s2 w; `% i; R<p>我们在生产实践中经常会遇到这样的场景,需把输入源按照需要进行拆分,比如我期望把订单流按照金额大小进行拆分,或者把用户访问日志按照访问者的地理位置进行拆分等。面对这样的需求该如何操作呢?</p>
, L j6 B- g7 ^1 V8 R<h3 id="分流的方法">分流的方法</h3>3 t2 u- }6 ?% ?- M" D* |2 ]
<p>通常来说针对不同的场景,有以下三种办法进行流的拆分。</p>1 i1 s/ m, M( y k2 D2 k1 _
<h4 id="filter-分流">Filter 分流</h4>
8 e) d. I# {8 V( @1 W<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CAy6ADUaXAACSFUbdpuA911-20210223084827182.png" ></p>
& q F! v5 @- E: p, t- m" {% E: Q4 l% ~<p>Filter 方法我们在第 04 课时中(Flink 常用的 DataSet 和 DataStream API)讲过,这个算子用来根据用户输入的条件进行过滤,每个元素都会被 filter() 函数处理,如果 filter() 函数返回 true 则保留,否则丢弃。那么用在分流的场景,我们可以做多次 filter,把我们需要的不同数据生成不同的流。</p>( E+ ]8 |3 e- D1 P. |8 g
<p>来看下面的例子:</p>
. Y7 J# r5 l/ K<p>复制代码</p>+ Y2 A. W; z+ K9 H5 P
<pre><code class="language-java">public static void main(String[] args) throws Exception {
& f, G# u/ D2 ~6 Q StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();0 k9 m2 c! K$ l1 I. E8 J( `
//获取数据源
g# C2 B9 ^6 W" R List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();
0 D# z0 h6 x- p0 R data.add(new Tuple3<>(0,1,0));% q* M# a+ X2 b% D+ s+ H# \! T
data.add(new Tuple3<>(0,1,1));" B, B* D ^0 j; l m% s
data.add(new Tuple3<>(0,2,2));
3 I' t# B. u! M1 @) S1 _- } data.add(new Tuple3<>(0,1,3));0 h2 ?" W' A6 ^& L6 t' m
data.add(new Tuple3<>(1,2,5));! z, R2 u; L5 u) d' I# {+ y
data.add(new Tuple3<>(1,2,9));
, Y% r7 ]6 K$ n T2 J data.add(new Tuple3<>(1,2,11));
) q6 `# r5 o& y6 A* T, S5 X data.add(new Tuple3<>(1,2,13));, |1 u5 o6 h- n& j2 k! O
( F z0 F4 g2 a8 Y( F
DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);
+ W- _( ^% s8 q" ~
( j- l+ N* j/ D' m5 _- M9 s: a+ w; E+ a e' A. y6 R
! i1 g2 ]" F, m, K1 J2 b& I SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> zeroStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == 0);6 K& C8 e9 A- ^$ l( [' n# c
! ~+ r) b6 e+ S6 Z6 Y, u2 _
SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> oneStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == 1);
" z( ]' s6 b8 \( `% x% I
n- `1 _0 K$ e! S* y, e% A; S% n8 Z
8 W! L3 n9 e; F
zeroStream.print();
( i; d# U7 Q/ T
2 b6 ?- }5 w9 d+ B3 r5 K6 A- Q oneStream.printToErr();
: r n; M' H# H5 o$ B" @# \/ o
8 ], D* y3 a3 K' H: N
b5 }3 n/ O3 R: ]6 a
# }3 P; v$ w* F9 G' Y" m3 Z* n
9 ~, N, ^0 L& M. l0 }4 g3 K
0 S# o# E% t+ T7 {+ h' R //打印结果* l1 i/ T# v6 L% O4 n
6 q$ Q# z d7 l$ E; d, T n( ^ String jobName = "user defined streaming source";
# |& l$ k t7 l; i
4 v; y* \% m' A7 M& y) X env.execute(jobName);
0 Y( g% r8 h& C! T# } {( K" I; [ v: k, M" c7 M3 o# p8 Q: R
}6 z. M- M# V- M" d2 h
</code></pre>& \; R( h4 K6 L0 R$ } c
<p>在上面的例子中我们使用 filter 算子将原始流进行了拆分,输入数据第一个元素为 0 的数据和第一个元素为 1 的数据分别被写入到了 zeroStream 和 oneStream 中,然后把两个流进行了打印。</p>9 m! _% T; X( ?* \$ M
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/Ciqc1F7CA2WAYbshAAKj494h86s723-20210223084827296.png" ></p>
( |& | Q9 E+ V" S! w<p>可以看到 zeroStream 和 oneStream 分别被打印出来。</p>
8 e% r5 f4 p3 P/ ]<p>Filter 的弊端是显而易见的,为了得到我们需要的流数据,需要多次遍历原始流,这样无形中浪费了我们集群的资源。</p>8 E# s2 r, R2 ^, u* Q
<h4 id="split-分流">Split 分流</h4>5 [- `* w/ K2 j t& Q1 g3 E( Q
<p>Split 也是 Flink 提供给我们将流进行切分的方法,需要在 split 算子中定义 OutputSelector,然后重写其中的 select 方法,将不同类型的数据进行标记,最后对返回的 SplitStream 使用 select 方法将对应的数据选择出来。</p>: [' a$ Q1 G4 e4 P _+ H. O
<p>我们来看下面的例子:</p>1 w" V& a+ h9 U: X
<p>复制代码</p>7 ?8 ]5 \/ m$ D: T9 Y! E" t
<pre><code class="language-java">public static void main(String[] args) throws Exception {
+ T; j* v7 N6 t! D& S0 R9 S6 c) Y$ P I6 t
7 p" {* u1 p/ G! k' R: s
; N; c9 ?" v4 q2 \9 Y
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();& `% q" F! t) F* L7 m1 X
) q7 }4 g U: o) A
//获取数据源
$ |8 o5 h; a+ m" M! I3 R
' g0 X5 S7 |; B- N" V( j List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();
0 j; F' K L" K& A: Q; o5 J1 x8 y$ `4 J9 b# h& g0 H
data.add(new Tuple3<>(0,1,0));
2 D; ^7 L5 X! A1 Z [3 S
2 e2 J) i/ U! m6 Z1 { data.add(new Tuple3<>(0,1,1));9 D9 c7 i* f: Z' e2 c: t$ D
5 v, i0 S: U2 ]% g. E! x A data.add(new Tuple3<>(0,2,2));% o1 a8 r3 I R7 o# |3 G E
2 i2 m! [; [4 Y; h& j0 c ] s( ?3 E
data.add(new Tuple3<>(0,1,3));
6 @9 O. o1 u5 D) d q K) E$ l; s8 y2 I5 f$ W, k. l# ?
data.add(new Tuple3<>(1,2,5));) p5 k2 w. v7 l- |0 W
4 v2 L9 }5 n3 S! G6 d data.add(new Tuple3<>(1,2,9));& z; {' Z6 ^) _( @3 g; L9 h
* C" A! k( T4 I) \8 g
data.add(new Tuple3<>(1,2,11));
1 S: E5 j2 D; R7 E4 {
: e9 {9 R* k* F) ^2 T data.add(new Tuple3<>(1,2,13));6 Y0 \, p6 d4 F% n2 H; y3 ]
5 d% K0 S. u) P1 I$ F8 Q6 i% q! h
/ J" u+ x- i5 K S8 N9 j# _9 p( }* ~) {3 p
2 R6 I" h- x: Q- B7 ?: c% O
* u1 e0 j: j- M# N# r9 u
DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);
5 c6 `$ y4 ?( L+ g5 t2 u$ A4 `8 B8 f/ B
' ~3 i L' F* g$ N4 r3 o
, u2 x( Z. U$ m/ h/ }& c
! C( b T% O+ ]! y8 z' |
8 e7 K1 J6 }' d9 L SplitStream<Tuple3<Integer, Integer, Integer>> splitStream = items.split(new OutputSelector<Tuple3<Integer, Integer, Integer>>() {
( r9 q5 |5 ~4 Z' k& Z; f5 G, i, n, \% i; [
@Override7 V) M1 S" u) z$ F- {( E
) R7 K) `! W: H! L% Z, C3 _% h9 f public Iterable<String> select(Tuple3<Integer, Integer, Integer> value) {: f: Q, L9 Y6 @5 f, f
: f$ E& Q% t9 m2 U6 m
List<String> tags = new ArrayList<>();
( U" A9 F0 r- |2 m
' d8 t0 D# j" [4 S" ? if (value.f0 == 0) {9 ~# o( u2 l* S( U1 P
7 Q8 f+ ~0 u! F d tags.add("zeroStream");9 m+ ^ a* n, [. k) e8 n7 q6 _/ k
9 T1 M3 ?5 u. C9 H- r } else if (value.f0 == 1) {
1 A8 M8 `8 v k' U7 r! v3 ~' J; J: H# F- _0 t
tags.add("oneStream");1 n U I6 I8 S( Z3 n' @0 D9 K5 |
% J+ w+ i! W1 r% w% c }6 M2 G% G4 d% |# J) y
5 P5 x' c0 @$ e) {( F: u e# c return tags;
" |( O c' A* B- K$ \* Z0 R3 [+ U4 I/ d% x( w1 G6 J! a
}
* u: l" ]$ X h! z' e4 n8 I8 M' \4 M* K( a+ e
});( K% ` v* M* e1 x" p9 q- L9 k }
c3 W, k' A6 D! t+ m- h1 w( F; x; t
- j) Y9 m& P5 m R3 G, r# D* @/ S6 }" w- U, g& a
splitStream.select("zeroStream").print();1 a: z* K3 _8 ^, \1 t3 Y- c
0 Z& _: C4 W, S- Y splitStream.select("oneStream").printToErr();
* j" S1 Y8 g3 Q! n) S. R, Q
" g% v: D; J: ?9 d; Q2 P1 f: p( K2 `- G: p: O1 H3 v
- _5 k) g* o& J4 Y' p //打印结果, }5 r9 r/ @, |, r2 ~6 ~& k
) G# Q! i0 y. j: P9 \- s) R String jobName = "user defined streaming source";
6 m- ]) | |( d4 I y |
5 j0 K3 i, v# V1 M7 L env.execute(jobName);/ f" y: a- R; u$ R! }# q
7 q/ j0 L, ?, x. h6 z1 M}
1 j7 t- h8 d) R+ W+ _</code></pre>
- \7 V& U- `5 S" z6 m/ {<p>同样,我们把来源的数据使用 split 算子进行了切分,并且打印出结果。</p>0 K5 [- S/ ~: E0 |4 Y! A
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CA4aAbUSJAAG1LWNB3qw627-20210223084827377.png" ></p>
2 \& X: z6 A9 g1 Q! u1 z<p>但是要注意,使用 split 算子切分过的流,是不能进行二次切分的,假如把上述切分出来的 zeroStream 和 oneStream 流再次调用 split 切分,控制台会抛出以下异常。</p>
& ]" `$ x; J" T/ W9 P9 r<p>复制代码</p>
% Q$ }, Z" p& K) V/ W<pre><code class="language-java">Exception in thread "main" java.lang.IllegalStateException: Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.
6 ?( X/ A6 J; a/ G+ v</code></pre>
( D0 q0 B5 J0 U) k2 j& j, m- f2 ?<p>这是什么原因呢?我们在源码中可以看到注释,该方式已经废弃并且建议使用最新的 SideOutPut 进行分流操作。</p>
) c& g4 H6 V+ m2 ?* v7 W<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CA6OAJ-JDAAIrh1JSAEo033-20210223084827602.png" ></p>
9 m# q8 X8 p' f& v5 Y2 G* A<h4 id="sideoutput-分流">SideOutPut 分流</h4>
?+ _( r3 a' P# y<p>SideOutPut 是 Flink 框架为我们提供的最新的也是最为推荐的分流方法,在使用 SideOutPut 时,需要按照以下步骤进行:</p>5 Z$ R7 N P3 I) Q7 D
<ul>+ ~& e2 A \ E9 x% f. V! ^' |' y
<li>定义 OutputTag</li>
9 l9 ~) {( D- h; N<li>调用特定函数进行数据拆分
) [- C7 {# l& O3 p<ul>
' K0 r& N! p& }2 _<li>ProcessFunction</li>
/ F" L2 B* T2 k<li>KeyedProcessFunction</li>
. D, X7 {) V2 `$ I5 e<li>CoProcessFunction</li>* D" Z4 ?" C$ i1 g+ y
<li>KeyedCoProcessFunction</li>
) j* X% c& m! r# {/ a<li>ProcessWindowFunction</li>% K, ^$ g- n4 p0 H) }, A* R" L6 t- Z
<li>ProcessAllWindowFunction</li>( o- @) i3 }9 ~: l- ?+ }' R' L
</ul>% d3 c' X+ D4 h8 @ e1 c
</li>
4 G8 @' _- s2 U: G* M: ~+ o* i</ul>; I% W! X Q7 o4 m, W6 H
<p>在这里我们使用 ProcessFunction 来讲解如何使用 SideOutPut:</p>$ [/ P$ s8 n4 Y% C* i
<p>复制代码</p>$ a: h9 K9 X; |+ C% Q
<pre><code class="language-java">public static void main(String[] args) throws Exception {
+ a/ H5 E# \$ v8 X' A4 H3 Q
# h. `3 x% G2 e& X
; Z. a. o; A9 }1 x! i8 p) O8 Z0 ~8 l4 j3 v# ]2 s
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
) c% b1 k0 ~0 V! c# w1 ~. y# X1 k$ h: E/ b2 d
//获取数据源
9 c, z1 B! @ s6 w- K' a' k) u) L4 t0 U7 t0 d
List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();6 t' ?" L/ \+ Q S3 ?
& m3 G) W* | k+ x data.add(new Tuple3<>(0,1,0));
5 [" j# S3 V7 o$ Y/ Y+ p* I5 \ J: l; w3 {, i$ `
data.add(new Tuple3<>(0,1,1));
% @6 l( v$ B& U! S) `7 t
5 B- S) Q( k' { data.add(new Tuple3<>(0,2,2));. N. O( m/ \8 M1 X7 O
" @' O5 l4 a+ f/ W* m
data.add(new Tuple3<>(0,1,3));
8 z, A* x8 L9 `, r) c, G1 u& D0 ~# A/ f" ?4 y7 v
data.add(new Tuple3<>(1,2,5));
: G1 y$ I n- o* e* B' j- R, a
data.add(new Tuple3<>(1,2,9));! T2 d; [: @3 s, p% a6 Q
9 u2 E+ O. }2 U0 o% n data.add(new Tuple3<>(1,2,11));+ k1 C% x; }: V4 h; ?/ W& t1 _
8 g" x% B: a& {' ~# t) e data.add(new Tuple3<>(1,2,13));) n% l) d! f+ @1 p
; c0 X; O7 j1 f$ j; k" H
( ` S$ @1 N$ k6 w- H8 z: ]+ z+ C% R) c% @8 `. Q( ]1 u
5 I# \) ~7 O/ {9 U+ V6 G
% Z5 B6 l8 n$ T% G U
DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);
; t6 N4 ~6 [% p" I! p# w$ ?" ]$ G
" O+ X' I" |3 F' I$ K
* l& O. {/ v( S OutputTag<Tuple3<Integer,Integer,Integer>> zeroStream = new OutputTag<Tuple3<Integer,Integer,Integer>>("zeroStream") {};
% z9 ]2 Q' Z- `( B5 o' v/ }0 f' ?% X7 ^8 C4 t j
OutputTag<Tuple3<Integer,Integer,Integer>> oneStream = new OutputTag<Tuple3<Integer,Integer,Integer>>("oneStream") {};2 X" c! b: W( f
9 L% C/ H& [0 ~
; V5 ]6 }6 b! E8 d4 ^/ {* p
( u: N+ T l1 D A
# w2 c- k6 L1 N- D5 \9 a7 C" i5 d
. P: K7 m. w' ]3 e7 o* B6 q SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> processStream= items.process(new ProcessFunction<Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>>() {% \& }0 n8 i3 v$ q- W
M2 m, s& O/ T! C" _0 N" l3 M @Override$ n/ C% h- B( n
0 H, W0 a/ y; h3 `+ f. H& [ public void processElement(Tuple3<Integer, Integer, Integer> value, Context ctx, Collector<Tuple3<Integer, Integer, Integer>> out) throws Exception {
) ?5 k; u$ y. x& \+ h0 |; m1 R! w( s1 j
& \0 e7 n9 [& s7 J; z' b" `
( k: e& p. r+ l- V
if (value.f0 == 0) {
6 O1 A" C, Q8 J
1 @( g* _2 Y) G* {' W ctx.output(zeroStream, value);, e& L1 \- P" }, z& C
/ P1 k$ c( a9 I; q( V } else if (value.f0 == 1) {5 E; }9 r, P8 F( P3 z
( k3 D1 ]8 X0 F6 j* Y( F ctx.output(oneStream, value);
7 ^1 R6 Q9 Q: Z& P
" `$ u( D/ e% t0 f$ S) H, W }
+ y/ L8 n( P: i5 H
5 b# s" G4 n2 u% \$ B3 E" r }
* T- y( w7 [4 }2 |8 e& Y; y( M# ~* S
7 p4 {& O8 |, I0 m });
6 z* C3 u) N( @- d% N
2 w1 M( N. T$ a- x) k/ F
7 u" K# X! }& e' ?
9 k+ l; l6 c0 X; ]1 K3 \. e# c DataStream<Tuple3<Integer, Integer, Integer>> zeroSideOutput = processStream.getSideOutput(zeroStream);7 G9 h* n! u0 R( b6 m
! R# F" ~/ y% S9 Q
DataStream<Tuple3<Integer, Integer, Integer>> oneSideOutput = processStream.getSideOutput(oneStream);! J6 d- [$ ~9 e5 W8 M
) R) h, }4 T" r- a$ j) z) P! i& j( k! ]- `+ u1 U F4 T
! b0 u& y' S5 c4 Z, g4 n4 S0 C zeroSideOutput.print();9 c. H t- @$ B* ]" G0 e; L
7 o$ [7 h- Q5 j4 V* m( a+ q7 _
oneSideOutput.printToErr();
9 o6 I$ h* z" [; V; x$ _0 V( |2 O1 Q. v+ K4 }$ T" R* Z
, ~& s0 i2 A C+ v) ?: r+ R' d& y; Z' S8 z( s2 \
/ |* o- i }8 p. B! |4 x; h) H7 ^
9 {5 G6 Y4 a! R; [; _ //打印结果
/ V# t7 O! t* ~5 ~, Q; w4 m$ I4 L
String jobName = "user defined streaming source";
; |9 y7 G* v2 E3 B& q! |4 y
- k* R9 F& e9 c" j5 Q env.execute(jobName);
) O& h5 R; ?$ h' [3 n8 ^
* `# y2 n s/ t9 ?9 O/ \}4 P; \( d0 Q- h- J- h! k9 B
</code></pre>5 O9 f0 p/ n" N( r
<p>可以看到,我们将流进行了拆分,并且成功打印出了结果。这里要注意,Flink 最新提供的 SideOutPut 方式拆分流是<strong>可以多次进行拆分</strong>的,无需担心会爆出异常。</p>( L# g9 t& e: A- _8 E& d
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CBMKAGHoUAAM-5UL5geg132-20210223084827698.png" ></p>% x) e& X; T3 r
<h3 id="总结">总结</h3>
9 g$ m( {/ K3 H' j3 M* e<p>这一课时我们讲解了 Flink 的一个小的知识点,是我们生产实践中经常遇到的场景,Flink 在最新的版本中也推荐我们使用 SideOutPut 进行流的拆分。</p>
0 k2 }8 z- q8 ]" \" J0 A5 k) |; f9 o<blockquote>
8 s% d2 b/ ], e. D& x<p>关注公众号:<code>大数据技术派</code>,回复<code>资料</code>,领取<code>1024G</code>资料。</p>- ?# P6 q2 q' y8 b& p' {
</blockquote>
& A* Y/ y' e2 h/ G' g
C6 P& q" l6 F$ N; _6 o; q; f |
|