|
8 y7 N+ `$ D: a0 X( Y+ k# X
<h4 id="flink系列文章">Flink系列文章</h4>9 K3 r! |; V( V+ l9 R% g
<ol>9 U' V. A# J% b9 F
<li><a href="https://www.ikeguang.com/?p=1976">第01讲:Flink 的应用场景和架构模型</a></li>
& Y6 b2 k- g# T3 y# c+ p<li><a href="https://www.ikeguang.com/?p=1977">第02讲:Flink 入门程序 WordCount 和 SQL 实现</a></li>
; {: g, O, s. S6 a) X2 B: e<li><a href="https://www.ikeguang.com/?p=1978">第03讲:Flink 的编程模型与其他框架比较</a></li>
* I; H* d8 E0 d" Y5 M$ |8 b7 P; V* I<li><a href="https://www.ikeguang.com/?p=1982">第04讲:Flink 常用的 DataSet 和 DataStream API</a></li>. j2 T8 e4 ]# E, {; U
<li><a href="https://www.ikeguang.com/?p=1983">第05讲:Flink SQL & Table 编程和案例</a></li>
3 _3 G* J/ B. l) P0 O/ T. b3 j<li><a href="https://www.ikeguang.com/?p=1985">第06讲:Flink 集群安装部署和 HA 配置</a></li>* U8 p+ O$ ^7 ?
<li><a href="https://www.ikeguang.com/?p=1986">第07讲:Flink 常见核心概念分析</a></li>+ `7 C5 c3 l( x4 b @
<li><a href="https://www.ikeguang.com/?p=1987">第08讲:Flink 窗口、时间和水印</a></li>" _& X4 c, ?- w+ a2 M
<li><a href="https://www.ikeguang.com/?p=1988">第09讲:Flink 状态与容错</a></li>
7 l2 s4 ^! f' f/ @/ W</ol>
1 h: d2 a# y" S% ^$ c. j<blockquote>" y- ~9 I7 B5 E( W) N* ^( d3 h+ \/ B! Q) q
<p>关注公众号:<code>大数据技术派</code>,回复<code>资料</code>,领取<code>1024G</code>资料。</p>( ]0 G2 X, ?- _% f/ {
</blockquote>
1 c8 y6 o2 x, X' s6 y<p>这一课时将介绍 Flink 中提供的一个很重要的功能:旁路分流器。</p>( }: }% M' ~ d. O* j$ ?
<h3 id="分流场景">分流场景</h3>5 a+ D( ]; c8 g7 E( ~: |# E& o
<p>我们在生产实践中经常会遇到这样的场景,需把输入源按照需要进行拆分,比如我期望把订单流按照金额大小进行拆分,或者把用户访问日志按照访问者的地理位置进行拆分等。面对这样的需求该如何操作呢?</p>
# N$ X; r3 U% t0 G& L1 Q. E<h3 id="分流的方法">分流的方法</h3>
6 i' g4 ^0 Z$ |6 l/ S2 W8 V<p>通常来说针对不同的场景,有以下三种办法进行流的拆分。</p>
2 j, u" X$ a$ j( r( ~1 @% o; P<h4 id="filter-分流">Filter 分流</h4>
, I; A' S* D$ Z; l$ W$ T<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CAy6ADUaXAACSFUbdpuA911-20210223084827182.png" ></p>
- O U) i# L4 Z. \4 Q$ g# B1 t& S<p>Filter 方法我们在第 04 课时中(Flink 常用的 DataSet 和 DataStream API)讲过,这个算子用来根据用户输入的条件进行过滤,每个元素都会被 filter() 函数处理,如果 filter() 函数返回 true 则保留,否则丢弃。那么用在分流的场景,我们可以做多次 filter,把我们需要的不同数据生成不同的流。</p>9 K0 i$ n* `& p# U. p; Y; j
<p>来看下面的例子:</p>
7 r- f. m( ~* T) K6 g' H' c<p>复制代码</p>
( P8 i: i) u" H2 t# p<pre><code class="language-java">public static void main(String[] args) throws Exception {
* ~! Y' D( [/ [7 e StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();1 n& c$ t i3 E' A- B9 N
//获取数据源
3 r* _, S J/ F* T6 M List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();1 u$ ]3 e; }' e! ~8 K3 d
data.add(new Tuple3<>(0,1,0));* I' q2 ^% L: d0 \. u2 k9 i
data.add(new Tuple3<>(0,1,1));9 B$ x8 f! Q% e1 t/ S6 [( E
data.add(new Tuple3<>(0,2,2));% ]- `+ J! W( c! H- T
data.add(new Tuple3<>(0,1,3));
( o# c; `0 f4 S4 b9 D6 S/ \ data.add(new Tuple3<>(1,2,5));
0 p5 x! a; `8 F8 p, k# H data.add(new Tuple3<>(1,2,9));7 @8 }3 T* I- U5 o/ y( S5 _
data.add(new Tuple3<>(1,2,11));# R8 P+ f0 g. \" y9 y3 m. R4 |
data.add(new Tuple3<>(1,2,13));
. r% t. v7 d/ g6 G4 p9 k2 o$ T% X
4 b4 p: j8 H: v- \ R5 R DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);9 I& t3 E2 W* q# ]6 O
6 {2 F1 ], N! F9 B [8 b1 {0 R! `+ Y0 z5 D) x2 k, d
% |6 ^" D9 C) v' E: R6 C% E2 X& z& I SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> zeroStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == 0);
, g* P* Y1 t: O R5 ~6 K# o
/ X+ i* N$ B+ y+ B3 t! H SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> oneStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == 1);4 S3 v$ C, L6 V! q7 _
2 |3 i5 p+ ^: d7 o
; Q- s2 H+ E9 O# R6 K. ~, [# q% V& @& N" v5 D
zeroStream.print();1 ^ c! T' s$ f1 W7 c* {# n! M- u
J2 c: W8 O8 k p+ U
oneStream.printToErr();
+ |* Y0 H& _. p1 I/ y0 T+ d4 [7 b4 L1 `# N1 U
/ p2 Y: ^0 b6 J( V
+ g6 w2 u. K7 H$ K% ]/ R1 |2 j+ G
8 \7 Q' n5 Z o3 o% Z3 | F2 L
0 {' p3 ~3 w, f, M+ {( [5 B' S //打印结果
^$ W6 T: t& r* e2 i) i6 Z/ w( k7 M* i( ?/ y+ M9 Z* `! b7 t& G
String jobName = "user defined streaming source";. y" p6 Y O! Q( v0 _* K$ I* q
- m/ \2 X, k6 [( P0 w# a2 L! R
env.execute(jobName);
2 R' T G, X# k" T5 D8 D* k1 a* s# \8 D
}4 k) C# _& e: d* m1 {* K
</code></pre>
4 y! L5 ?. c8 O) j- s9 o6 K6 y<p>在上面的例子中我们使用 filter 算子将原始流进行了拆分,输入数据第一个元素为 0 的数据和第一个元素为 1 的数据分别被写入到了 zeroStream 和 oneStream 中,然后把两个流进行了打印。</p>6 {; v- X0 ?8 `/ ]8 `7 j& a
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/Ciqc1F7CA2WAYbshAAKj494h86s723-20210223084827296.png" ></p>4 W, i5 {* J9 E
<p>可以看到 zeroStream 和 oneStream 分别被打印出来。</p>( W) I" M5 l4 e/ e( ]# E
<p>Filter 的弊端是显而易见的,为了得到我们需要的流数据,需要多次遍历原始流,这样无形中浪费了我们集群的资源。</p>
9 p+ |1 z8 }* I<h4 id="split-分流">Split 分流</h4>9 `& Z) l" ~6 L0 X, ]
<p>Split 也是 Flink 提供给我们将流进行切分的方法,需要在 split 算子中定义 OutputSelector,然后重写其中的 select 方法,将不同类型的数据进行标记,最后对返回的 SplitStream 使用 select 方法将对应的数据选择出来。</p>9 ^& {( _/ t5 j8 G* {8 R1 U3 p& H: N
<p>我们来看下面的例子:</p>
+ c4 W+ V/ e1 j( Q2 q<p>复制代码</p>6 V$ H% o' f+ E- d/ C8 X/ q
<pre><code class="language-java">public static void main(String[] args) throws Exception {4 z/ e5 w8 G/ m5 F: ?! Z& h
5 X+ h2 M/ V3 Z- V
0 s: }1 P3 W) [5 V$ c8 W1 C
, R7 C8 w; X1 Z5 l$ r6 R StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
, g& X7 |) y! [# o
1 i6 y3 u# a" E8 U" _8 W //获取数据源& l) o3 x7 K3 y- [& a' d
& a% u6 O# h3 u* G6 f7 T List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();
: M, F0 H# V" u4 h4 Z1 Z) L2 |
4 U% n* V; N5 C data.add(new Tuple3<>(0,1,0));) |, f3 s9 q! B5 o$ z& f
P# E1 Q2 R6 Q& w data.add(new Tuple3<>(0,1,1));
" v( ^. g0 f0 E0 `- I( q% B
, E1 g7 `& _- m: q9 d data.add(new Tuple3<>(0,2,2));
9 G2 m+ e9 K( L" x. e' `! N# B4 J0 v
data.add(new Tuple3<>(0,1,3));$ X# N# S9 G E. q% z# i
- c0 \% \4 f& r' l
data.add(new Tuple3<>(1,2,5));+ E3 m2 M3 I! `6 e, I" b7 r
# b, ~: F+ Q X data.add(new Tuple3<>(1,2,9));
1 T- c; E- n8 j% g
% g- n% N3 _! u$ f3 j data.add(new Tuple3<>(1,2,11));3 B; e* D; R8 K! N, [8 N
; m+ m5 l: V( E1 S2 G. a data.add(new Tuple3<>(1,2,13));# d" W/ G9 M- [0 ]# s
' C2 Y/ M/ Z! h6 o/ ?
6 d1 \* Y! k% m
' C) b, }! r' x) A
0 o( K( c4 ^$ ^) N4 Z+ R, @ b1 Y; y& w7 {; C
DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);
3 c# y& h1 p! B4 M& I# p; x) _' a" L3 P1 o5 m# X" Q2 j& D
! c e2 F" u& f" v5 }7 U9 R0 m. A# I% @! N
6 L+ _6 ?6 u, a
: }) \6 W& j& M+ s+ \ SplitStream<Tuple3<Integer, Integer, Integer>> splitStream = items.split(new OutputSelector<Tuple3<Integer, Integer, Integer>>() {
/ L7 D! o' T' a6 M% T" w
$ m d- C$ O% c @Override+ U: x+ {8 |6 R# Q+ a
- E6 R$ R& l- ^; S) c
public Iterable<String> select(Tuple3<Integer, Integer, Integer> value) {+ A7 \; v/ u' @6 u: f4 q f# w
" Q% P% }/ L+ W8 F/ O8 w List<String> tags = new ArrayList<>();
% \- t1 [" z" Q6 Z) S/ N: o# K$ G0 b0 b
if (value.f0 == 0) {
L3 J# o- V0 I; E A1 o& o
) }# S' G+ ~. {0 V tags.add("zeroStream");- S7 Z4 E7 d9 K& P r& {3 Y. d
# k E/ F! w. Z3 i( j } else if (value.f0 == 1) {
3 B& H4 C! b/ U N: S8 `4 O/ g* `4 G0 i0 c3 r3 L+ h
tags.add("oneStream");. c. x8 k' ^/ |0 [0 X/ G
( Y u; G1 ?+ g$ H+ p9 ^ }
2 `; l' `' q% j! ^$ Q/ Z4 C0 [4 F1 T9 R0 Y* ^0 ~! _
return tags;: C1 h$ |( k7 C* ^, M6 _
1 N d# R! a! J5 W4 W( B
}9 x$ W' O d1 T; k! B" j, O
, x5 W- |! m k3 O: j" H });( c! N3 t+ Z+ q* V! {2 F7 S
% K' `9 G- N9 ~+ P& V
% U8 C( O& k9 I+ D" [$ e1 z6 j: {, e0 w7 w. j4 h0 w
splitStream.select("zeroStream").print();: M7 o+ r0 h- w- S" C! s5 _
' k! f& K8 e5 W; X7 [ splitStream.select("oneStream").printToErr();- a9 j- |; @2 m0 P
; S1 w+ P. q3 t' w" s1 b4 g
7 }: l( I0 l; K# ]
4 g# U, B* U' w1 F
//打印结果) g. w4 ]. ~" n Y/ `1 I( x% E9 n
6 b6 w: ]5 n4 R( x1 P& { String jobName = "user defined streaming source";, ]8 `0 e( t% p% d) `4 S* L
2 \9 S; W4 t8 x- G, o" n env.execute(jobName);; K+ }/ l7 c9 J$ b0 R9 E- p+ h
7 ~* J" l7 v. F
}
$ K5 A+ [/ `3 F</code></pre>
" C% t* R/ a9 u<p>同样,我们把来源的数据使用 split 算子进行了切分,并且打印出结果。</p>; |' m3 r- r1 K+ V: V) |6 R
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CA4aAbUSJAAG1LWNB3qw627-20210223084827377.png" ></p>
' x! x8 U' l6 i: c<p>但是要注意,使用 split 算子切分过的流,是不能进行二次切分的,假如把上述切分出来的 zeroStream 和 oneStream 流再次调用 split 切分,控制台会抛出以下异常。</p>
' I l4 T. @/ [4 {6 u<p>复制代码</p>& @: E6 m: o9 y% |; n
<pre><code class="language-java">Exception in thread "main" java.lang.IllegalStateException: Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.
, V3 s, f1 `. ]$ J( F. `</code></pre>. r1 _ X3 L. L8 f5 D
<p>这是什么原因呢?我们在源码中可以看到注释,该方式已经废弃并且建议使用最新的 SideOutPut 进行分流操作。</p>9 r7 f! t. g4 L! u- A+ E9 W
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CA6OAJ-JDAAIrh1JSAEo033-20210223084827602.png" ></p>1 e3 h* v4 n# H
<h4 id="sideoutput-分流">SideOutPut 分流</h4>
- `$ o) U! x( u: p9 h<p>SideOutPut 是 Flink 框架为我们提供的最新的也是最为推荐的分流方法,在使用 SideOutPut 时,需要按照以下步骤进行:</p>
! H6 }0 e+ t' u |) H, V<ul>
K) O* p% ]* t' i<li>定义 OutputTag</li>
& o. g$ b/ b" j) c. A<li>调用特定函数进行数据拆分( v4 P: m. @9 o# J; V( e% `
<ul>3 t" j/ T/ b" A5 G
<li>ProcessFunction</li>1 F& C" B. s) P9 e- k& C* N
<li>KeyedProcessFunction</li>+ E' ^" b/ R0 e0 n/ Q/ {7 O, ]( v
<li>CoProcessFunction</li>9 Q' J/ M$ N7 L- m
<li>KeyedCoProcessFunction</li>
* @& ?+ w. u) w! u0 J( M. Z<li>ProcessWindowFunction</li> Q7 w0 x1 H" {1 p, W0 q+ u7 l
<li>ProcessAllWindowFunction</li>
$ s, G. p6 B4 k+ Q0 _4 G8 v4 [</ul>" s+ _( E$ h7 a; R" U* n& z
</li>$ M) t, j* `4 h
</ul>
4 ~4 z6 h. _8 b/ X<p>在这里我们使用 ProcessFunction 来讲解如何使用 SideOutPut:</p>5 q5 H- V4 Y+ L+ e+ i% x, N$ Z
<p>复制代码</p>
+ l- m( d! `. W2 V1 l5 g8 h<pre><code class="language-java">public static void main(String[] args) throws Exception {3 E. [9 k1 \) d5 S# c
6 t9 Q B" t4 e8 j: `
7 e7 ]: r/ c; }+ u4 f8 z, J7 ?- z' j% K+ ?& R6 p
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
# B. m3 B2 o% e; ~% F- k& P9 k7 W% }8 L$ z6 n4 [& D6 T% Z
//获取数据源
7 z/ J4 l7 R# g7 l2 c3 n8 i! l7 n# }9 l4 V2 X, Y6 E/ S4 R
List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();7 U; H$ p5 `: [' q8 ?2 F
/ z& P9 z& j/ h% n data.add(new Tuple3<>(0,1,0));! r$ z7 O2 F# S& ^' j
, o& D/ s2 q5 C" E/ ^& k data.add(new Tuple3<>(0,1,1));' u6 C# t3 c! J+ t( ]% G
5 b2 ]2 }& `) S& ~& n5 { b% C4 N data.add(new Tuple3<>(0,2,2));. ]* v: a S" ?/ @: i
0 }/ x* b3 N1 Y! t m
data.add(new Tuple3<>(0,1,3));4 g4 n3 _+ l5 c! r# u
3 I) v& Z7 W' J/ j
data.add(new Tuple3<>(1,2,5));# L) T/ w- v! g2 [- a/ h) @! S% q0 v
9 _" c( f7 }6 }. a1 m2 Y data.add(new Tuple3<>(1,2,9));6 n+ A5 \7 P; Q( U& i
3 C+ U5 v, @- E4 a
data.add(new Tuple3<>(1,2,11));1 J+ U4 x! e, [/ }, z6 g" \5 T6 G
$ m2 r6 Z4 z4 w* N
data.add(new Tuple3<>(1,2,13));* B) \" {; Q+ @
6 [% T/ S+ v/ o3 ~5 l- I! |2 F; P; {" E" q
3 c/ N* ?+ k' R, q
/ \3 s2 q3 V/ J
* J7 E- V7 h$ d+ z4 E& n DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);4 p/ e) ^" K$ d/ p' z V2 Y+ k
" t! T+ ]+ T2 l5 f
0 o* o d8 C# e. b( M8 ~' H4 C
2 d3 ?: Z$ r; }3 D OutputTag<Tuple3<Integer,Integer,Integer>> zeroStream = new OutputTag<Tuple3<Integer,Integer,Integer>>("zeroStream") {};
2 W0 H0 p+ B( l5 v% s$ r; R4 u R% ~2 v" L" e7 R
OutputTag<Tuple3<Integer,Integer,Integer>> oneStream = new OutputTag<Tuple3<Integer,Integer,Integer>>("oneStream") {};2 t$ Y7 J- |6 S: W+ K' E8 C
6 K2 ^. X, w* r. v& b* x7 Q0 G, k% b$ S" [+ {; \
2 j; \+ f* u5 o) ?
( ~, v% J1 g# ]- I5 a
5 h* s" K: P2 h" F: o! P# u4 g SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> processStream= items.process(new ProcessFunction<Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>>() {9 n+ U2 V& t4 c
0 Q, }+ N' U0 B @Override0 x0 t) @4 S& O: u2 u
7 L) ]8 a! v9 Y" M
public void processElement(Tuple3<Integer, Integer, Integer> value, Context ctx, Collector<Tuple3<Integer, Integer, Integer>> out) throws Exception {
' c9 N3 ~/ \* \! n1 E# y' I
# N, j" I d q2 ?* R$ u/ f8 ?7 S* `0 e% T& _
0 _% X4 }/ K; G2 |5 _# e
if (value.f0 == 0) {
1 g3 o r; E/ u0 s( Q: _8 ^% G% |$ K& ]0 `$ i. H
ctx.output(zeroStream, value);
C8 [* L) o" |6 b8 F( n2 H- n% d8 J6 K/ l! A1 a
} else if (value.f0 == 1) {
! o# U% q3 x$ G1 P8 U, w4 R, P! \& C; G( F
ctx.output(oneStream, value);3 l7 ?- d0 H( n2 ~+ n
- ^: N" O3 C3 s% [: n
}: m! X$ L P# _- u; f- L& h3 l
! i2 d2 v5 a. N2 E8 v7 F7 T }5 |( R# ~% M1 ^$ B+ a9 p
6 l, Y/ Y, D& c3 ]4 c8 q
});$ W' q3 e8 a5 h9 N# M
0 X- h4 v+ y7 H& J# Q6 q8 v% E6 U
4 y' |- q5 d/ W$ d7 A# E- m# b* w9 s" N" b
DataStream<Tuple3<Integer, Integer, Integer>> zeroSideOutput = processStream.getSideOutput(zeroStream);7 \& U' H$ f- z/ r. P
! E2 d2 ~- I# x7 R& e1 d
DataStream<Tuple3<Integer, Integer, Integer>> oneSideOutput = processStream.getSideOutput(oneStream);, v; `1 T. N! }; i
5 h) n$ z, {$ L& {8 h6 E
4 |- l) w- w5 R$ j# p5 ?: u+ r9 h5 X7 }, Z
zeroSideOutput.print();& N" K* ~2 }7 G; p& M: |. J, K* [
! H2 p# H) }; Z2 Z2 ]8 C J p oneSideOutput.printToErr();
/ o: g0 X% k" o6 r; }& X* A) Y) D4 R- x5 F- H8 Y, |/ P4 C% d
5 C! p+ ~! R5 }, a6 u5 ^$ J! z7 l1 v' J* l& S# N# r8 _
# b; k: `3 w/ s. q+ G. C; K0 g% U$ W+ |7 K* l, n- n7 F3 U' w
//打印结果
$ N* K8 z7 s6 P# P, M3 U& @, \6 a: ^; O/ e2 V4 T! J" W! l$ \7 q
String jobName = "user defined streaming source";! w- f2 x6 _( Z
9 ^$ H$ I. J0 V
env.execute(jobName);- a& Z2 } Y$ E& H+ c
0 o* \) {4 E6 }
}5 a4 G% {/ `9 n. U$ ~
</code></pre> C+ e; m' ~1 e3 [6 d4 m' V
<p>可以看到,我们将流进行了拆分,并且成功打印出了结果。这里要注意,Flink 最新提供的 SideOutPut 方式拆分流是<strong>可以多次进行拆分</strong>的,无需担心会爆出异常。</p>! H2 W4 P. P8 g) H( |
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CBMKAGHoUAAM-5UL5geg132-20210223084827698.png" ></p>5 ]. h- }3 E6 L& Q& w8 U6 m
<h3 id="总结">总结</h3>
/ L9 A2 _$ t" ?9 z+ w* C<p>这一课时我们讲解了 Flink 的一个小的知识点,是我们生产实践中经常遇到的场景,Flink 在最新的版本中也推荐我们使用 SideOutPut 进行流的拆分。</p>
, g% O- f/ y X0 Z2 ~. L/ ^, h, J<blockquote>6 A9 ^3 v' H9 F: K0 V' F
<p>关注公众号:<code>大数据技术派</code>,回复<code>资料</code>,领取<code>1024G</code>资料。</p>$ n6 s) H! d0 h
</blockquote>' A" t0 a& e0 U- [: ]
# X* M$ s% F, R. S. x5 k |
|