|
|
0 r5 m# V# C0 w% U, {& w7 w<h4 id="flink系列文章">Flink系列文章</h4> W* o- D0 n* ~
<ol>/ s# u6 G4 ?. e
<li><a href="https://www.ikeguang.com/?p=1976">第01讲:Flink 的应用场景和架构模型</a></li>
4 v" P6 q3 s# N# T" V7 ?# Y<li><a href="https://www.ikeguang.com/?p=1977">第02讲:Flink 入门程序 WordCount 和 SQL 实现</a></li>
0 w) Q7 E7 e5 o3 h0 m7 P<li><a href="https://www.ikeguang.com/?p=1978">第03讲:Flink 的编程模型与其他框架比较</a></li>
, k3 W% a. n G2 \& u: x$ T<li><a href="https://www.ikeguang.com/?p=1982">第04讲:Flink 常用的 DataSet 和 DataStream API</a></li>2 D" J/ g2 N' r1 K0 b' R
<li><a href="https://www.ikeguang.com/?p=1983">第05讲:Flink SQL & Table 编程和案例</a></li>
4 q w& s5 T7 d9 b% {& Q; F<li><a href="https://www.ikeguang.com/?p=1985">第06讲:Flink 集群安装部署和 HA 配置</a></li>* r* z! b* N7 R! W8 B
<li><a href="https://www.ikeguang.com/?p=1986">第07讲:Flink 常见核心概念分析</a></li>
# \- W5 D, n2 b<li><a href="https://www.ikeguang.com/?p=1987">第08讲:Flink 窗口、时间和水印</a></li>& O- P8 C9 J% n0 c8 ?/ O
<li><a href="https://www.ikeguang.com/?p=1988">第09讲:Flink 状态与容错</a></li>
0 S; z0 X2 L5 F</ol>' D6 Y/ j& }% y- b; k3 W, s
<blockquote>( _: y! l6 M0 U1 h& t
<p>关注公众号:<code>大数据技术派</code>,回复<code>资料</code>,领取<code>1024G</code>资料。</p>5 A: L2 H1 f1 o5 B9 c
</blockquote>4 k% \3 p& M0 X$ H( `& H
<p>这一课时将介绍 Flink 中提供的一个很重要的功能:旁路分流器。</p>$ H* n. z* M, S3 `1 M; D
<h3 id="分流场景">分流场景</h3>
- ~' s& M: f7 ?: O6 K<p>我们在生产实践中经常会遇到这样的场景,需把输入源按照需要进行拆分,比如我期望把订单流按照金额大小进行拆分,或者把用户访问日志按照访问者的地理位置进行拆分等。面对这样的需求该如何操作呢?</p>+ h* g) ^- D9 s: H0 A) S2 z- U% O) j0 y
<h3 id="分流的方法">分流的方法</h3>/ I- u9 T- n% T( G! X2 G5 S
<p>通常来说针对不同的场景,有以下三种办法进行流的拆分。</p>7 M$ o5 ]- C& |3 H! o/ b4 @
<h4 id="filter-分流">Filter 分流</h4>+ |" x% D k& b, N8 O( v9 @+ r# @
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CAy6ADUaXAACSFUbdpuA911-20210223084827182.png" ></p>
+ j- e8 q) i% V/ h$ O( M9 e. d<p>Filter 方法我们在第 04 课时中(Flink 常用的 DataSet 和 DataStream API)讲过,这个算子用来根据用户输入的条件进行过滤,每个元素都会被 filter() 函数处理,如果 filter() 函数返回 true 则保留,否则丢弃。那么用在分流的场景,我们可以做多次 filter,把我们需要的不同数据生成不同的流。</p>4 ?. V. D! I7 e4 ?# ?9 S0 x
<p>来看下面的例子:</p>9 L8 y( D$ p7 {7 V4 X
<p>复制代码</p>2 e" Z- N2 h/ B" l6 k! @1 r
<pre><code class="language-java">public static void main(String[] args) throws Exception {
% j& p! g" v k, F% G C StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();9 N. S! p+ V! U- l! C+ z
//获取数据源9 }5 Q. r8 |. d4 S1 ~
List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();- d" S9 W0 X+ a5 ?! w" p2 h
data.add(new Tuple3<>(0,1,0));
Y4 w2 p7 `* I/ j' t9 x6 a data.add(new Tuple3<>(0,1,1));8 T6 t6 J& j2 X. |0 G8 K1 K; Y2 Y
data.add(new Tuple3<>(0,2,2));
' ~+ s- Q/ Z1 m& {6 o7 v8 X data.add(new Tuple3<>(0,1,3));, H3 m0 E7 h8 d" U# r' g3 D
data.add(new Tuple3<>(1,2,5));' F! w1 ?, h/ [8 A
data.add(new Tuple3<>(1,2,9));
- ^# m* B0 ?3 C6 d5 P. D data.add(new Tuple3<>(1,2,11));- W6 P" H- h Q6 P
data.add(new Tuple3<>(1,2,13));! ]4 P1 q% a: E; Y3 e) G
/ A' X0 q+ U6 ~) k& Y( X
DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);
l( Y# U+ A: W) v
" Q! W! r% n" u& H; {: }
( k5 y. ?, C" `6 S+ }( H
( k- `7 t0 V5 X1 { [+ l! O% c SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> zeroStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == 0);
5 Y. T! ~4 B! `* \0 a2 M+ v2 |
& r7 p/ a/ y2 a2 b$ y SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> oneStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == 1);# p- [3 U. s0 G4 r1 |
4 @" o1 m+ V/ ?* h2 E
; A) ]4 n6 X8 l( A; P
1 y+ J+ m" E8 q& `" P6 J+ s zeroStream.print();
' ]4 T% T5 N- N
( B6 p' c. w, D: h! b oneStream.printToErr();
2 t# e3 x$ ?0 U
6 k' W/ a3 I. w4 T A% P6 a7 p- W. b% X1 O3 R0 J; e$ d5 O
6 @* G# w* L9 V1 ` e
6 n+ e q6 M7 a6 w" V9 |6 F, W2 _
& T9 c5 s7 s" A2 ]' x' M, S //打印结果/ I& i; e6 }) A. n
% R7 ^' n$ P* j
String jobName = "user defined streaming source";
6 [% A: V9 ^6 G( A( q. |' n* _( Y8 c* \1 N
env.execute(jobName);: U, v3 q9 |' E$ L) m. I0 Y* Y
6 z$ V( I$ @) _ f1 g8 a$ }" [" [! l
}
, _. i+ L( D, t8 ~1 q1 e</code></pre>- _# c r4 i+ o; ~
<p>在上面的例子中我们使用 filter 算子将原始流进行了拆分,输入数据第一个元素为 0 的数据和第一个元素为 1 的数据分别被写入到了 zeroStream 和 oneStream 中,然后把两个流进行了打印。</p>
) o# Z. ^# w% B) u<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/Ciqc1F7CA2WAYbshAAKj494h86s723-20210223084827296.png" ></p>5 n) R$ l+ c5 \- o8 K, k8 i1 B1 g
<p>可以看到 zeroStream 和 oneStream 分别被打印出来。</p>
6 Z2 u+ d5 P4 W* w Y& c/ ~<p>Filter 的弊端是显而易见的,为了得到我们需要的流数据,需要多次遍历原始流,这样无形中浪费了我们集群的资源。</p>5 N6 `2 [% u; V k' d$ r
<h4 id="split-分流">Split 分流</h4>
* k0 z2 S4 L% y1 \- j<p>Split 也是 Flink 提供给我们将流进行切分的方法,需要在 split 算子中定义 OutputSelector,然后重写其中的 select 方法,将不同类型的数据进行标记,最后对返回的 SplitStream 使用 select 方法将对应的数据选择出来。</p>, K' a! ` o( o) `# H# P
<p>我们来看下面的例子:</p>
% s L; U9 I8 r' n: g<p>复制代码</p>( ] R8 F- p" Q$ e+ m
<pre><code class="language-java">public static void main(String[] args) throws Exception {
9 ~) F6 c) m8 x, \' o$ {! E4 K4 U
7 t% i% K% C. Y ?( k1 Y" W' k3 l- G( _5 [1 m5 i
, U7 `' L {& R! G
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();. j) t. u3 ^7 P! t6 w5 w
$ Y3 |; g4 \8 B# y4 t+ W
//获取数据源
: E) t. T) Z, V! w5 v6 L; S8 f9 t. @: I+ U; U6 P
List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();
6 s+ w6 V5 j6 t7 y+ k5 X' D. P% g5 s1 U. M6 P
data.add(new Tuple3<>(0,1,0));: y' P9 W' ^" A% o" v
0 l8 E2 K- y; x* W! `+ h/ p: k data.add(new Tuple3<>(0,1,1));+ b0 J9 G2 s% q( u! |$ f" F2 b* A
1 |, F6 L3 n) f1 n9 T! S
data.add(new Tuple3<>(0,2,2));3 f0 B( F, A4 U- G7 W
* h) i0 Y9 |" `) f4 L% p data.add(new Tuple3<>(0,1,3));& T) E$ \8 Y: B2 g" `: Y9 ^% `
4 t# b* [: r6 C- I( R2 M+ A% b
data.add(new Tuple3<>(1,2,5));: S8 U8 [% ~: o' n$ B7 t) W
, F* D& l z3 C$ v! ~) Z' w data.add(new Tuple3<>(1,2,9));: Y4 J9 q2 D' J- {
+ r# ~) D5 }; v$ r9 x) h0 B
data.add(new Tuple3<>(1,2,11));3 h9 f" \3 t2 W) Y
; e/ g9 b8 j$ ` V. C0 K$ O data.add(new Tuple3<>(1,2,13));
% Q6 D: l) M! g0 i; p. l% m. V* ^$ V# x
1 |/ D3 k, n* E2 {) {- x
5 \: R7 t+ P$ W0 {7 K5 l2 v* n
/ l. q- T9 N( j# s( L& l- p. B0 {! J5 i- v" J. m7 n
DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);
) ^1 y& s$ g. x" b" U0 a% v: q; i+ G: u% ]0 t- t4 O
0 X. _) t6 l1 n7 E0 e& g7 ^+ S3 |+ l/ \
- ~% w4 p) x" K
) K. b# } L6 e2 r2 I V/ t
SplitStream<Tuple3<Integer, Integer, Integer>> splitStream = items.split(new OutputSelector<Tuple3<Integer, Integer, Integer>>() {
6 E0 q. {1 a: O d
$ T" ` o8 c6 J% H% m3 u @Override
. g; F" Q; p2 K1 R6 d
, o) S- _" B( Y: M; t0 @ public Iterable<String> select(Tuple3<Integer, Integer, Integer> value) {
# `7 q/ i9 C8 k" o
$ N- X. V& V' U, I% R& a List<String> tags = new ArrayList<>();
n/ J" o( _3 O( Z- X0 I
% S4 g! z6 s$ s if (value.f0 == 0) {
/ G1 U8 E2 x, u+ ]& @
0 l C& Y/ g! `5 v: w tags.add("zeroStream");+ l6 E$ F3 I7 M# N. }: y
: O" c# ^7 l: i: H% x( P6 b
} else if (value.f0 == 1) {8 |' n3 ]7 ^) s) z4 @/ c) i9 D
: f4 J4 t4 M; B( ]8 w tags.add("oneStream");
1 J# D. |/ T1 P* K# e) P# W, v% P8 Y* s b( f; U' Z
}' g2 Z, w5 y1 _; R
! N; e# O4 _+ t2 P7 u
return tags;8 O' l* ]# l( X3 F, k
1 S8 r. G1 O0 T* [
}
& d) m+ \& H/ P# O0 Q# V( h i/ I
; X# U& N O1 [/ L9 d });
/ J R) R3 a" ~7 Y( S) w2 B8 }
( B1 Z$ b6 l7 U Q# m# X9 J2 H2 _6 ~( Z7 R; U
) G8 I7 k& ^) l splitStream.select("zeroStream").print(); D. R1 @7 X: ~" u
$ w0 b2 Q8 E/ {5 k! ` splitStream.select("oneStream").printToErr();9 l/ L& D7 x3 W, Q
- \1 k% ~& Y2 m& }9 w/ L: n. a
( {- g; H9 m) s3 d* b g
1 ~/ `( D# Y' h1 @1 y1 D3 P //打印结果$ w, @# I( r+ H0 n
8 Y9 j' ^2 Y% I z9 B+ K. L
String jobName = "user defined streaming source";
" }2 w4 x) s c$ A8 u
1 b' O$ `9 g! t2 F env.execute(jobName);
" |% n) Q1 \0 {- `3 S4 S0 ?3 r
. s- m# t, |! M$ q1 f% x8 w}# O- C) l/ z* Y& [
</code></pre>9 F! W# n9 u# I# u% l- |5 i
<p>同样,我们把来源的数据使用 split 算子进行了切分,并且打印出结果。</p>
3 s1 O; |2 d* i8 }& ~<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CA4aAbUSJAAG1LWNB3qw627-20210223084827377.png" ></p>- j" D3 Y ?" ~& l4 {' l
<p>但是要注意,使用 split 算子切分过的流,是不能进行二次切分的,假如把上述切分出来的 zeroStream 和 oneStream 流再次调用 split 切分,控制台会抛出以下异常。</p>5 b2 S4 p+ W+ U. \* H
<p>复制代码</p>* k- b2 ^; ~. \
<pre><code class="language-java">Exception in thread "main" java.lang.IllegalStateException: Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.0 S. x. ^- Q; y3 [2 Q( m& \
</code></pre>
* p/ T2 G6 L( p<p>这是什么原因呢?我们在源码中可以看到注释,该方式已经废弃并且建议使用最新的 SideOutPut 进行分流操作。</p>, v# F9 a* T" ]0 t9 z; ]# L, v
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CA6OAJ-JDAAIrh1JSAEo033-20210223084827602.png" ></p>
# e5 M% N; ^4 B1 [0 @% q<h4 id="sideoutput-分流">SideOutPut 分流</h4>
4 ]7 G* e" m g, u<p>SideOutPut 是 Flink 框架为我们提供的最新的也是最为推荐的分流方法,在使用 SideOutPut 时,需要按照以下步骤进行:</p>/ J) h) j z6 ]
<ul>/ g) k& L9 v5 G* h6 I
<li>定义 OutputTag</li>7 z- R4 Z. Z5 C4 V! F6 v
<li>调用特定函数进行数据拆分: j# u$ I; D Y& G6 y8 C7 T
<ul>
4 V/ I& N/ I8 |9 h# G1 Z<li>ProcessFunction</li>
" I9 R% ^' X$ L7 G% `. x" e<li>KeyedProcessFunction</li>! q7 q4 Y/ V- V0 E2 }
<li>CoProcessFunction</li>
{7 S: D ?, }8 h/ T. w2 ?& ^: l- n- c<li>KeyedCoProcessFunction</li>
' Q' D: Z9 I7 d$ i; x- h<li>ProcessWindowFunction</li>9 c. A/ w7 O/ w' i; M
<li>ProcessAllWindowFunction</li>0 y- l% ?4 f" K2 p% a6 N) h! e: ~
</ul>
! u1 y& D; Q8 m4 M% O* P: z</li>
3 u2 k" I5 G: V" k</ul>
. X* y O( {& n: a& y' r<p>在这里我们使用 ProcessFunction 来讲解如何使用 SideOutPut:</p>
5 {2 b' ^% w) \0 E% b |; U8 n<p>复制代码</p># ?7 w7 s* K4 K' l3 |- A, r9 L
<pre><code class="language-java">public static void main(String[] args) throws Exception {$ A+ x; L. i6 a) X0 P# _
& q. q" i R3 \8 l3 F3 C; u, E% u7 ~1 v- E: D. G
% Y# \. \6 e9 O; y1 i H7 S0 c( Q) x+ i
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
! ^/ N+ L8 n# M5 y e
7 F) G% x0 v/ q4 M! Q* b //获取数据源: p7 S- Q. \6 `
/ o1 w H3 R, v* E& X, | List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();* r5 O* B1 I6 Y" k/ c* ?! w0 W
/ I6 i; d9 m/ q! ^9 y' R data.add(new Tuple3<>(0,1,0));
/ V) G# s: D3 S9 g2 {) E' R
8 i4 w% y |# Q! V data.add(new Tuple3<>(0,1,1));& e7 n6 I }% _# v
# B' i" P, @" a
data.add(new Tuple3<>(0,2,2));
+ _9 t2 l0 a- A Q. v
0 H8 O- E. H/ g- q. | data.add(new Tuple3<>(0,1,3));6 W# J% f' M, A1 I0 a" _) S& \) n
5 O/ ^2 P D: O0 E; S- w5 A6 p
data.add(new Tuple3<>(1,2,5));0 c, Q2 l1 `# w2 Y8 T, {
/ ^3 Q& k) \0 ?: K7 }, R
data.add(new Tuple3<>(1,2,9));8 m' \, ^9 L, l$ n
8 Q7 Q8 c& J1 s% I' W8 T
data.add(new Tuple3<>(1,2,11));$ i1 \# M) j# D* K9 C( F0 O
/ N, g" h( a. {1 E& X: R) ^ data.add(new Tuple3<>(1,2,13));
6 y( O* a- |8 c5 T
8 ~! t, d# B: l. x! h6 V$ N5 [6 X6 i/ R6 ~- c) Q
) N. V% E) ?" F" {; z! v$ H
0 l2 Q# @; k& M5 r, p) ?0 j
- Z$ ?) I" y( p# q7 A( b, r DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);
2 N- p% K1 |; F
9 o9 ?6 v `3 m! L2 n8 J
# ^1 [( i* ~" C: ^4 a d
d7 d3 U+ x s' h3 [6 | OutputTag<Tuple3<Integer,Integer,Integer>> zeroStream = new OutputTag<Tuple3<Integer,Integer,Integer>>("zeroStream") {};
+ h' `% ~' O; h/ g" s# v6 C4 k3 G( a0 E/ l- o, `
OutputTag<Tuple3<Integer,Integer,Integer>> oneStream = new OutputTag<Tuple3<Integer,Integer,Integer>>("oneStream") {};
& ]2 f- {) x% u( P5 t* ^) ^; Q( ]1 { J. T# h( u
; `2 q8 f# G2 L+ k) c: ]$ P, C( g
, `$ S$ K6 o7 t3 a+ l- |
3 ^- Z7 q Q. Z2 |# Y& C I
# V+ Q2 P5 t2 z) ?8 F SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> processStream= items.process(new ProcessFunction<Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>>() {
, k: A6 _6 _4 W. V& T+ Q/ W# s1 |3 }# D' T; ?
@Override
, O1 x' ~3 q1 j) X' x' J- k( B& \) `# ?0 P" b( O9 ]
public void processElement(Tuple3<Integer, Integer, Integer> value, Context ctx, Collector<Tuple3<Integer, Integer, Integer>> out) throws Exception {; B8 t; \% X; `* s( x4 `9 ^3 n
, I7 n: N0 D& E6 d9 F! t
r: T3 w6 t4 ]0 m
6 c! x1 E8 _/ W6 i: z if (value.f0 == 0) {8 B; f8 D, {- p- E( |; n5 C
0 Q; |' J: l5 k. d: |
ctx.output(zeroStream, value);, c" Y% M) L' j Q" ^8 W u# P
) N7 d: N1 K; e
} else if (value.f0 == 1) {; Q1 I u# l8 V2 g* h" @9 O
2 a, v ^, i' q9 o$ j8 Q' R
ctx.output(oneStream, value);# t/ n% t' L* y" r% \6 i ?8 r9 q8 w
) Y* e1 S. |. s' V; K
}9 l( Z$ l' ?( D# W) F
9 X! u& F: Y9 f# }
}' z$ ]( p$ s* \( N" q0 u& u/ o
& P [' c0 ~9 e2 |8 \ });8 d" ^( [) J# x$ J
; w i+ E2 U* S
4 X8 G; o; q8 y
5 \5 B( s C. F DataStream<Tuple3<Integer, Integer, Integer>> zeroSideOutput = processStream.getSideOutput(zeroStream); U: I9 _# i ]3 v& \/ E+ v
9 g3 u* J1 H' {, y5 G/ N
DataStream<Tuple3<Integer, Integer, Integer>> oneSideOutput = processStream.getSideOutput(oneStream);6 Z/ u1 X( T, y! i
- z1 X- h3 C; r' |
/ V) [' s) w/ @1 c( U% L/ V
f, Y& Q1 W7 K5 J1 K c zeroSideOutput.print();
; F& [7 {: M& r, L1 c7 u
+ g7 m3 `* x/ V4 {6 L0 Y oneSideOutput.printToErr();
5 D1 }& r" t8 V9 A. P7 z+ f% m& f" G- l& n
( l+ T$ f6 q) \: }7 g% a
: [4 q( ^$ d4 p- u+ q
# K" Y( y" S k. y: | Y3 a
& ^( e$ q K( ^2 W" S+ K //打印结果' Z2 E# r( e* X) e- r; n$ H6 P7 T
+ t j- t: k" |8 }2 ~6 g String jobName = "user defined streaming source";# s5 d- K+ ^) V' R3 K+ M
+ |% c0 _+ S) ^# i) G6 [& K* W' J$ T env.execute(jobName);
( d$ O4 H& q9 ?; r. i: a0 i' C/ k, F" i. j% Z
}
& _) W6 e% _2 t( D+ {6 |</code></pre>
, C: I. S) V7 }5 `* c<p>可以看到,我们将流进行了拆分,并且成功打印出了结果。这里要注意,Flink 最新提供的 SideOutPut 方式拆分流是<strong>可以多次进行拆分</strong>的,无需担心会爆出异常。</p>
% h; H+ [$ Y/ k) k! G" @<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CBMKAGHoUAAM-5UL5geg132-20210223084827698.png" ></p>
0 Y ]1 K: f" ]) @+ o% T<h3 id="总结">总结</h3>2 ]* S, ?2 w8 r, c, b2 ^* I' e
<p>这一课时我们讲解了 Flink 的一个小的知识点,是我们生产实践中经常遇到的场景,Flink 在最新的版本中也推荐我们使用 SideOutPut 进行流的拆分。</p> z y3 Q. x. `6 o0 D2 ^0 O
<blockquote>
+ O& v" u# \% E# m6 k$ Q6 ]: B<p>关注公众号:<code>大数据技术派</code>,回复<code>资料</code>,领取<code>1024G</code>资料。</p>% w* C: x7 @, D- m6 o
</blockquote>4 I1 T5 M* G% V) a5 A, _3 d, X
, y8 ?0 ^$ P8 J% K* Q. L9 g |
|