|
9 e, `1 C6 \+ g
<h4 id="flink系列文章">Flink系列文章</h4>) Y1 Y+ Z2 f. h9 H; K
<ol># {' n/ y4 f8 }. ~: ?
<li><a href="https://www.ikeguang.com/?p=1976">第01讲:Flink 的应用场景和架构模型</a></li>
% |9 A: l3 [/ p3 W0 T+ l9 B<li><a href="https://www.ikeguang.com/?p=1977">第02讲:Flink 入门程序 WordCount 和 SQL 实现</a></li>
& c7 k$ e* ], d$ N<li><a href="https://www.ikeguang.com/?p=1978">第03讲:Flink 的编程模型与其他框架比较</a></li>$ Z/ W1 [5 @- L# W f, ]' E
<li><a href="https://www.ikeguang.com/?p=1982">第04讲:Flink 常用的 DataSet 和 DataStream API</a></li>
: G1 z- h& r+ m7 O1 q# f" |, L<li><a href="https://www.ikeguang.com/?p=1983">第05讲:Flink SQL & Table 编程和案例</a></li>
) }: b0 W. ^0 Y3 J, |<li><a href="https://www.ikeguang.com/?p=1985">第06讲:Flink 集群安装部署和 HA 配置</a></li>7 A; U0 ~! N9 V" g
<li><a href="https://www.ikeguang.com/?p=1986">第07讲:Flink 常见核心概念分析</a></li>
9 m% b, w+ W, C" v5 r3 L<li><a href="https://www.ikeguang.com/?p=1987">第08讲:Flink 窗口、时间和水印</a></li>5 c- S5 N. B1 Z2 u7 _$ E4 G
<li><a href="https://www.ikeguang.com/?p=1988">第09讲:Flink 状态与容错</a></li>, R1 t2 F" |0 K0 j9 E( I- u, A
</ol>
2 m% r" Y/ h5 I6 B* m<blockquote># X: D3 M, z- m" `) X K" d H( C
<p>关注公众号:<code>大数据技术派</code>,回复<code>资料</code>,领取<code>1024G</code>资料。</p>
/ i* z5 Z( M* h" d" T3 M</blockquote>
$ G8 B! C/ K2 q, u0 g+ w/ q7 K<p>这一课时将介绍 Flink 中提供的一个很重要的功能:旁路分流器。</p>$ [& E% h: R1 i# \' u$ e ` c
<h3 id="分流场景">分流场景</h3>; |# T0 _: g3 k3 n" L% j" J
<p>我们在生产实践中经常会遇到这样的场景,需把输入源按照需要进行拆分,比如我期望把订单流按照金额大小进行拆分,或者把用户访问日志按照访问者的地理位置进行拆分等。面对这样的需求该如何操作呢?</p>
3 z/ Z0 p w- }. [<h3 id="分流的方法">分流的方法</h3>
/ z( n# }2 B5 ^& C<p>通常来说针对不同的场景,有以下三种办法进行流的拆分。</p>6 H! R h: Z* x) o& Y$ w6 C
<h4 id="filter-分流">Filter 分流</h4> g9 s+ ~2 {; Z2 s: K N3 z
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CAy6ADUaXAACSFUbdpuA911-20210223084827182.png" ></p>9 ^( @1 c# w0 _+ Z; E9 d* r
<p>Filter 方法我们在第 04 课时中(Flink 常用的 DataSet 和 DataStream API)讲过,这个算子用来根据用户输入的条件进行过滤,每个元素都会被 filter() 函数处理,如果 filter() 函数返回 true 则保留,否则丢弃。那么用在分流的场景,我们可以做多次 filter,把我们需要的不同数据生成不同的流。</p>
+ \; U- _8 P: k5 R6 F<p>来看下面的例子:</p>3 p. A* z) ]1 C, T- A* f
<p>复制代码</p>
% V& M7 }! f: N<pre><code class="language-java">public static void main(String[] args) throws Exception {8 A$ N, B* t5 X: t
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();2 u; C' ?% L" o1 {0 b4 U
//获取数据源$ m4 r, i- A- u6 A4 [7 T& V, Q8 h
List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();
3 b+ [0 }- \* c2 Y! Z; a7 V data.add(new Tuple3<>(0,1,0));4 S2 E ~! q: G4 a% [
data.add(new Tuple3<>(0,1,1));
1 U% p1 q; Q- i$ `2 V data.add(new Tuple3<>(0,2,2));
" V, S. u4 Y+ k data.add(new Tuple3<>(0,1,3));
: K: ?+ l) K, D8 C data.add(new Tuple3<>(1,2,5));
6 i4 n/ |; l& V- F6 \2 i data.add(new Tuple3<>(1,2,9));) m, m9 u! D6 K# N
data.add(new Tuple3<>(1,2,11));4 L( x: K- b5 P8 m7 t3 `
data.add(new Tuple3<>(1,2,13));
# _+ z2 n' q2 F6 V0 R4 g @% H& r6 v( |/ I* Y; J
DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);$ l0 T* i& Z, ?0 p
8 ` A* \/ f3 e$ S* }: ^2 @' t
O5 c8 n$ u4 c0 O
9 e$ I6 ~( o1 c, P7 c$ Z
SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> zeroStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == 0);
J8 q$ F# C* o1 ?9 T( ]) c" @0 S2 m8 b# T/ _1 f
SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> oneStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == 1);# P; K. k" L5 K8 R3 v7 D
2 Y/ p6 U7 W1 L
9 ~( v4 S. v- l3 E6 c6 k2 v8 w7 P% C3 a
zeroStream.print();
- G0 V4 B( K4 ]& f4 u
$ K8 h+ V1 |6 v, k5 T* F oneStream.printToErr();
/ S: K0 y' O% l9 f6 k# B$ h4 _- S! W
9 o( S7 e$ Z& w5 }- g
2 Y8 R4 Z" A$ D' k* S% a
3 W5 R/ ]( G. g0 F: x) X1 L$ x9 X3 n y2 u
//打印结果& ~3 G4 f0 ^1 J( e W/ o( l
, i+ Q) M: r( [$ C3 M String jobName = "user defined streaming source";
- s+ |( w4 b+ V2 L8 G: v$ h5 D7 N: q, U* P3 `
env.execute(jobName);
8 U8 W2 d7 \2 p* f
7 U ]5 ?. ^6 A! B& d6 E; l}: J/ Y# ]) V1 C5 L: Y
</code></pre>' p2 A6 Z; ` o; {1 K
<p>在上面的例子中我们使用 filter 算子将原始流进行了拆分,输入数据第一个元素为 0 的数据和第一个元素为 1 的数据分别被写入到了 zeroStream 和 oneStream 中,然后把两个流进行了打印。</p>4 W0 L, T) K" z- V8 g# w4 w
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/Ciqc1F7CA2WAYbshAAKj494h86s723-20210223084827296.png" ></p>- H' r/ J C3 C7 I
<p>可以看到 zeroStream 和 oneStream 分别被打印出来。</p>3 {( v# Q" p v
<p>Filter 的弊端是显而易见的,为了得到我们需要的流数据,需要多次遍历原始流,这样无形中浪费了我们集群的资源。</p>
* E3 p- c) W" r% S9 f5 p& M<h4 id="split-分流">Split 分流</h4>6 I& F/ k8 i+ h- f
<p>Split 也是 Flink 提供给我们将流进行切分的方法,需要在 split 算子中定义 OutputSelector,然后重写其中的 select 方法,将不同类型的数据进行标记,最后对返回的 SplitStream 使用 select 方法将对应的数据选择出来。</p>
8 F+ M+ C$ J% u7 d+ Z1 u$ n<p>我们来看下面的例子:</p>
/ x: V/ T# o6 A, A; W<p>复制代码</p>
6 W* K1 g2 G4 k D4 I6 b! ?3 [<pre><code class="language-java">public static void main(String[] args) throws Exception {6 o( h5 ? z3 v3 e/ k2 n0 @6 s
1 ]- Y0 {9 Q0 @& v. T
5 g9 q b' E. _' }
( @+ ^* V) `6 V7 h* }7 a9 ` StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
, ?* e* f$ A( Y$ c# O
: q. t" O7 L: ^2 E //获取数据源
1 u9 ^4 }0 x. q5 d) z( ]: [/ w/ C0 D# D' D+ A* |& [
List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();& J# h1 O' x! b. s/ d: c2 \/ D
! i+ V2 c; I% Y \: j data.add(new Tuple3<>(0,1,0));
* y3 J2 @" J! Q; `; ~/ e
# K* W% {; r) @0 P5 e- c data.add(new Tuple3<>(0,1,1));- l; ]# Z) X$ o8 \3 k
3 G. I5 c9 `' g8 U7 e5 D9 j
data.add(new Tuple3<>(0,2,2));( e/ L3 R4 e1 C7 D* D
8 s/ [ l/ B' T. R; D data.add(new Tuple3<>(0,1,3));; M- J* _. I; }
/ b9 V4 [# m" b; C {' `: X8 f' O data.add(new Tuple3<>(1,2,5));$ a& t: } L; {6 t
% z! A- c" }- ]( C
data.add(new Tuple3<>(1,2,9));# B V7 A) T, R) z7 s- H' f
7 O" _- W6 V5 U$ G j7 D J0 x% z3 U# F
data.add(new Tuple3<>(1,2,11));
: n( g. S! z' g+ y7 H
7 p# E0 G3 N: C N8 w data.add(new Tuple3<>(1,2,13));
3 i2 o4 Q7 z M7 o5 p8 \6 D& x1 K" @4 h; P6 T: m
5 ], O. f5 t: X4 t
3 {" A, Z, s5 E9 |1 t
c6 U# h- V( R9 s
8 B. l# `. T, j% Q: C DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);
, u" w. n9 S* N3 r( Q# ]) F3 D4 W- S' T% K6 H
7 W2 X$ }+ M! h% ]8 I
" L3 s/ _3 H* [( e0 i: z3 J+ Q' k! z- K6 B
' K! I! h# Q2 T* r; M, G n! T0 o% u3 ?
SplitStream<Tuple3<Integer, Integer, Integer>> splitStream = items.split(new OutputSelector<Tuple3<Integer, Integer, Integer>>() {& J7 V: o3 a: b0 p* P- Z7 C. W
% y' x: q/ m. O2 R
@Override |8 V0 ^! Q& b0 }
, A! V2 ]7 _! N6 i( w# i
public Iterable<String> select(Tuple3<Integer, Integer, Integer> value) {/ L# Q; c- L' I* T; |- \5 P
8 R5 W# `; U# a List<String> tags = new ArrayList<>();9 N9 e( ~1 F5 F' O
- z- R' l: Y" F( P
if (value.f0 == 0) {
3 M* U& i9 L; Z% u; P$ C! _ u4 m% F1 v b# N$ o4 n& x3 {
tags.add("zeroStream");
: @: E) J3 R: _. s! W/ p4 w7 J& k, v: n1 v' O
} else if (value.f0 == 1) {
' f# H7 Z0 ?8 h# x( ~
/ d n, R6 Q4 h& v tags.add("oneStream");$ T. X8 p& A' Q" V, c
C4 @$ w) ?0 S8 @2 \7 B' D' V6 R/ a }
0 c( J" M( X' f: x* F% ^* P) [* \% S
return tags;* g8 A' r6 L' _) X( {3 Y
3 `7 \3 U& F3 I. ]
}, p: Q) a6 M% Y+ A2 [
/ z& M7 b- @/ g
});/ T ]; I# u. N/ l, {" Y' A
* j% C( t' f+ y1 C; c N" }$ X
1 `4 p" z) G) q4 P d) n- h9 F& j6 Q6 P' F" I) o2 S2 I
splitStream.select("zeroStream").print();/ b4 J9 a% N2 Q6 f, ~0 e- A' i$ N
; ^" c$ l7 j1 D$ u
splitStream.select("oneStream").printToErr();) V0 D5 @" L! z
. `9 N7 [- x* Y& k. p; H0 A
, }8 h( g: V9 ^9 D- T& s/ e0 x, S5 v9 F" R0 r4 ~! X1 t9 }
//打印结果. B4 K# J4 |. y# V& w ]6 A: r
5 ~1 N1 D+ f: R% I
String jobName = "user defined streaming source";
3 G2 h; }" E0 Z
5 A! A& I% M) R9 l3 E% g8 m5 Y( m env.execute(jobName);
1 {" P! G( w# x ~& r8 U+ {; o9 @* K8 I8 P1 h; h
}
& U; T6 M2 ^0 h6 d4 S$ V5 R4 Q9 d c: d</code></pre>7 [1 T9 `4 ~1 {" N2 |
<p>同样,我们把来源的数据使用 split 算子进行了切分,并且打印出结果。</p>9 @3 v; `- |% u& v& |
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CA4aAbUSJAAG1LWNB3qw627-20210223084827377.png" ></p>
# V: m+ J1 _6 a<p>但是要注意,使用 split 算子切分过的流,是不能进行二次切分的,假如把上述切分出来的 zeroStream 和 oneStream 流再次调用 split 切分,控制台会抛出以下异常。</p>2 V7 i5 K' c7 F" M) I. U
<p>复制代码</p>
$ h: U( y& a% {3 w1 L: `<pre><code class="language-java">Exception in thread "main" java.lang.IllegalStateException: Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.( w- Q4 z- W3 e
</code></pre>
3 O9 {5 H) [9 I( r& h8 k/ l<p>这是什么原因呢?我们在源码中可以看到注释,该方式已经废弃并且建议使用最新的 SideOutPut 进行分流操作。</p>
* N+ m. i ~9 A: W8 N7 h5 k! M0 S4 I<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CA6OAJ-JDAAIrh1JSAEo033-20210223084827602.png" ></p>
' |# E5 b1 T" v<h4 id="sideoutput-分流">SideOutPut 分流</h4>
. a, r! \' O D) Y<p>SideOutPut 是 Flink 框架为我们提供的最新的也是最为推荐的分流方法,在使用 SideOutPut 时,需要按照以下步骤进行:</p>
1 i. T: D b4 @+ t0 n5 g<ul>+ _- K9 \6 E/ X- U! Z! b
<li>定义 OutputTag</li>
; m9 h/ S& z/ R( f& b I8 Z1 [/ T<li>调用特定函数进行数据拆分1 E# d+ t9 A5 E! T
<ul>0 ?" o1 W5 \8 o$ I1 C+ I
<li>ProcessFunction</li>
1 s1 F' {- A/ K8 l5 V<li>KeyedProcessFunction</li>- l5 c& m$ R) ]& ^) ^ y
<li>CoProcessFunction</li>0 ?2 i, Q4 `6 | E/ Q: i
<li>KeyedCoProcessFunction</li>* Y4 \+ h7 i! w
<li>ProcessWindowFunction</li>; u! O' G2 N7 ^# s' E `
<li>ProcessAllWindowFunction</li>) K- E4 _/ N, k1 k, q6 M* h
</ul>
5 V4 N8 v v9 z: I' p</li>
: n3 g0 p, n+ {* j& ^2 O</ul>5 L' }& v5 Q6 U7 g2 O' v: N% P( K
<p>在这里我们使用 ProcessFunction 来讲解如何使用 SideOutPut:</p>- j% K7 @+ _- \' ]$ G1 g' a" ^5 y
<p>复制代码</p>
: Z* O* z3 G% b" t( g' y9 j<pre><code class="language-java">public static void main(String[] args) throws Exception {$ e! ]: ` j, I' G+ [, H
( [0 N2 F/ s l+ K1 C' v$ G4 P/ i/ d- m. Q4 S$ X
' G! `0 D1 }0 @- A% f+ n StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
# E: ^5 i$ K6 n, Q* V- J# D0 g1 C2 N1 r$ C! Y- o3 W" y' h
//获取数据源/ `+ g' i9 h8 @0 U! M! }
% [2 d) E5 e- H \ List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();8 P D. s7 N2 F* \ t! r
2 |: h& |. ~( V! t. L0 I data.add(new Tuple3<>(0,1,0));, A7 n0 P% _6 C$ y0 p/ y
2 g/ y7 o; c2 f- I data.add(new Tuple3<>(0,1,1));
/ g" b: z) O! L, A
6 g- o* X2 j7 ?" W: d! t data.add(new Tuple3<>(0,2,2));# Z: T* u" S6 n' ~3 D
$ t9 a" |9 l7 u, [$ O+ _; h9 n- }+ n+ W& n data.add(new Tuple3<>(0,1,3));
9 D( d t& U# o' k# y0 }1 I/ l, h( ?( l$ Y
data.add(new Tuple3<>(1,2,5)); o2 O& V" d6 X- |: [
. U+ h% d" d4 I+ |' d9 e data.add(new Tuple3<>(1,2,9));& R( |- F* k1 L7 Q8 @. d( A
- ~, S$ n. u _' i
data.add(new Tuple3<>(1,2,11));+ K* C' |- d; J0 h, ~5 G
! x2 p; t8 q; r: _0 l$ U data.add(new Tuple3<>(1,2,13));5 e1 l. p9 m/ }4 _+ c: P
$ e, N4 v0 p" `% ]4 \: e9 o$ [' h$ @+ s/ j8 V' M5 E
8 f# D( W+ E1 z6 u6 ~5 ^& i$ C' v& u# \+ E9 X/ x
5 o; l8 w9 ~+ q/ B. \: | DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);
9 H$ W/ m. T! Y/ ~8 q) ?6 P" @/ [3 d2 `" V' d
+ D( B- ~, N G: y' D
3 L6 ^# f- _( W6 ~5 P1 j OutputTag<Tuple3<Integer,Integer,Integer>> zeroStream = new OutputTag<Tuple3<Integer,Integer,Integer>>("zeroStream") {};
+ m1 E' \, V' x" Y s; e8 ^# W6 C. o& x. q% ~" Y; N2 l
OutputTag<Tuple3<Integer,Integer,Integer>> oneStream = new OutputTag<Tuple3<Integer,Integer,Integer>>("oneStream") {};
3 l1 H4 ?4 ^- a( R) ?: Q: U R
6 n( G1 y2 I c) w5 B4 h: E" `0 m: H& o x; T, Y! o; k
/ t* v3 c( Y9 N- R
2 ~6 J$ |7 f: u3 L
! K) P, f" L# {
SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> processStream= items.process(new ProcessFunction<Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>>() {
2 F8 D. h* Z5 K% X Q+ i# G: I8 z1 \( |/ q4 ^: e
@Override
" W2 A* z: _. t
. g5 @, w2 D2 _2 | public void processElement(Tuple3<Integer, Integer, Integer> value, Context ctx, Collector<Tuple3<Integer, Integer, Integer>> out) throws Exception {" \7 f* }- Z2 L d
4 R1 H1 Q4 f8 p4 `5 B
0 o6 k% c+ ?/ k1 ~: S
* Z" Z, P9 `2 S( ~( J if (value.f0 == 0) {8 w! C) b4 O) ~. F
1 r7 a* ]0 A. g1 g
ctx.output(zeroStream, value);
8 i, L6 B$ @+ ]5 q" v% l0 k% s' H, F4 }& r+ G
} else if (value.f0 == 1) {
6 u' f( _2 M3 J; _: I# |8 f! w; F9 y8 Y6 o
ctx.output(oneStream, value);1 U$ O6 M/ ^$ b* ?3 A
* Q5 P) s& z# ~, a/ ?9 R }. S! [8 f& }4 Z, b
( o" {2 _, n) O }6 h6 o6 p2 N4 x& h! l9 H! g
! E: X: P/ ]1 i7 A! T' E4 K });
2 o2 e4 ]/ T3 S% d$ t
" {% T$ a7 _% E6 Z7 h& b7 J& \0 W; x* Y/ h4 F0 V
) b+ t- k1 D% p; c8 W7 q+ {( Z
DataStream<Tuple3<Integer, Integer, Integer>> zeroSideOutput = processStream.getSideOutput(zeroStream);
8 {% m& \7 c4 I$ J8 w" K* f0 L; _% s6 Q A1 E1 V" K- |& o: k' B* N
DataStream<Tuple3<Integer, Integer, Integer>> oneSideOutput = processStream.getSideOutput(oneStream);
4 J. W5 b- I; b; D1 q- U0 [& F% d* w# A5 H
' I9 [* s0 `) H$ @. B* d& R$ e
5 f# T9 b. w& N3 t5 k# D; Y zeroSideOutput.print();. E4 s/ k w" y% \4 t% b
0 D r; ]/ s4 x8 ~7 G: Q+ I' W
oneSideOutput.printToErr();+ d$ u9 f, Q n- p5 L4 F
* v/ `6 ]2 v5 t+ F
; r i5 P5 Q8 _# ]3 l3 K, \/ i2 q* q+ Q* K, a0 z4 B' ~
' W; J( {# {3 g+ \4 c; a. q
1 p' Q9 }; a$ Z( ~ //打印结果
+ W3 N4 z0 |5 d; p; ^6 D) W' F* Y5 Z# G6 u3 f
String jobName = "user defined streaming source";
" n0 p+ b$ ]4 C0 Z
) Y: q* s/ d: i) C% x env.execute(jobName);
# K0 `5 b$ v0 U2 q, N. n! }% M: d% f9 o
}
& J6 ?; C$ L+ K$ p8 J</code></pre>0 ^9 \# k4 i/ u$ P! z2 J- i- b- a
<p>可以看到,我们将流进行了拆分,并且成功打印出了结果。这里要注意,Flink 最新提供的 SideOutPut 方式拆分流是<strong>可以多次进行拆分</strong>的,无需担心会爆出异常。</p>
3 H- J. P, I. t% z% o( f<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CBMKAGHoUAAM-5UL5geg132-20210223084827698.png" ></p>& s- G) V4 _4 \
<h3 id="总结">总结</h3>
9 @7 t4 Q; y6 A+ g5 Z! H. F<p>这一课时我们讲解了 Flink 的一个小的知识点,是我们生产实践中经常遇到的场景,Flink 在最新的版本中也推荐我们使用 SideOutPut 进行流的拆分。</p>) _! Z' D) W' u3 ~" w0 E* l
<blockquote>+ Z: Y9 h0 Z/ i) U: e3 f. ?
<p>关注公众号:<code>大数据技术派</code>,回复<code>资料</code>,领取<code>1024G</code>资料。</p>
# b9 G- c' O. a7 g" B5 ]</blockquote>* u9 W5 W1 O1 w; B5 O
) U, i& ]& H) ]+ U+ Z/ x7 [ |
|