|  | 
 
| ( M% n" x! L/ [# x5 B<h4 id="flink系列文章">Flink系列文章</h4>& X) A7 @; U5 u% b( e3 l+ ?
 <ol>  l* c& T) a+ l- w' {
 <li><a  href="https://www.ikeguang.com/?p=1976">第01讲:Flink 的应用场景和架构模型</a></li>
 ; U4 [6 u# V( M' W: F) [! S<li><a  href="https://www.ikeguang.com/?p=1977">第02讲:Flink 入门程序 WordCount 和 SQL 实现</a></li>2 L) u: o7 E2 b& d# L; W7 G. U
 <li><a  href="https://www.ikeguang.com/?p=1978">第03讲:Flink 的编程模型与其他框架比较</a></li>
 ) L- R' b, `# b% ]; s<li><a  href="https://www.ikeguang.com/?p=1982">第04讲:Flink 常用的 DataSet 和 DataStream API</a></li>
 $ d. I3 C; Q6 Y/ k: M: {9 Q. u<li><a  href="https://www.ikeguang.com/?p=1983">第05讲:Flink SQL & Table 编程和案例</a></li>( y7 B  A8 w, N9 a; W0 E$ R: H5 y+ c) f
 <li><a  href="https://www.ikeguang.com/?p=1985">第06讲:Flink 集群安装部署和 HA 配置</a></li>+ f; ]2 F1 A2 M# Z) |7 h8 o
 <li><a  href="https://www.ikeguang.com/?p=1986">第07讲:Flink 常见核心概念分析</a></li>4 ~* _/ }$ w$ M$ h  ]7 r( i4 R' A! h
 <li><a  href="https://www.ikeguang.com/?p=1987">第08讲:Flink 窗口、时间和水印</a></li>
 : x$ d: ^: R) h- y# S6 i3 n2 T7 @<li><a  href="https://www.ikeguang.com/?p=1988">第09讲:Flink 状态与容错</a></li>% }4 x, z8 K$ U- ~" {3 R, H
 </ol>* f, C9 K% W+ e7 s$ `1 j5 B
 <blockquote>
 5 E/ r) y* f$ o* y3 A( S5 W) H<p>关注公众号:<code>大数据技术派</code>,回复<code>资料</code>,领取<code>1024G</code>资料。</p>, a) [5 N4 i/ G' n
 </blockquote>
 * T+ R& j/ z# X9 i<p>这一课时将介绍 Flink 中提供的一个很重要的功能:旁路分流器。</p>
 ( V6 Q( ^# X6 W8 K1 M; V0 y; k! x<h3 id="分流场景">分流场景</h3>: V$ [; }! x1 y* |; n0 b' x  |8 P
 <p>我们在生产实践中经常会遇到这样的场景,需把输入源按照需要进行拆分,比如我期望把订单流按照金额大小进行拆分,或者把用户访问日志按照访问者的地理位置进行拆分等。面对这样的需求该如何操作呢?</p>- ], H$ O8 {( _: m7 P7 U, G% Y
 <h3 id="分流的方法">分流的方法</h3>
 ) }4 w$ S) U& T/ b* n- B7 g<p>通常来说针对不同的场景,有以下三种办法进行流的拆分。</p>
 - Z) k! D3 Y- j<h4 id="filter-分流">Filter 分流</h4>4 P) k! t5 I2 [8 v5 n* T* w
 <p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CAy6ADUaXAACSFUbdpuA911-20210223084827182.png" ></p>4 _2 A. C# a$ G
 <p>Filter 方法我们在第 04 课时中(Flink 常用的 DataSet 和 DataStream API)讲过,这个算子用来根据用户输入的条件进行过滤,每个元素都会被 filter() 函数处理,如果 filter() 函数返回 true 则保留,否则丢弃。那么用在分流的场景,我们可以做多次 filter,把我们需要的不同数据生成不同的流。</p>
 ; T- J) ?; O: Y! c3 ^8 E3 w<p>来看下面的例子:</p>
 0 h* b6 D: C. o& l/ _$ q2 b; @: z$ U<p>复制代码</p>6 I. Q: P- K- A# M( m. t
 <pre><code class="language-java">public static void main(String[] args) throws Exception {
 3 F* ~) j* c9 i( }5 \    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 I# D" l, u+ t, j; J( u  {    //获取数据源: f; V5 S; b; U3 k
 List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();4 v% [' S* L4 o# R9 H" K
 data.add(new Tuple3<>(0,1,0));3 L. R: t0 z& h8 i9 `
 data.add(new Tuple3<>(0,1,1));
 $ a) c. R" a$ L4 v! |    data.add(new Tuple3<>(0,2,2));$ ^( s2 q$ ?, Y% H+ {
 data.add(new Tuple3<>(0,1,3));
 ) x: k% Q9 o+ Y" F9 `    data.add(new Tuple3<>(1,2,5));% S5 X% L: D# K
 data.add(new Tuple3<>(1,2,9));9 A0 k5 N( I# M1 w
 data.add(new Tuple3<>(1,2,11));
 6 M! I- l" Q: W0 _    data.add(new Tuple3<>(1,2,13));
 4 t" Q0 h9 H" C% Z( Y& _, l$ o1 f( ]* m- F' z
 DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);
 3 I/ ?; f% ^3 [% b: I* @* A  K+ U3 r( r$ \0 ^! L8 w; b, _9 h
 . e% I5 c' O( v4 a% T
 / u& P8 w0 R& J$ A2 M2 W
 SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> zeroStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == 0);; M, j+ h  f5 _9 ?+ {
 3 e- a' P, _2 j5 e
 SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> oneStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == 1);; L, Z3 [- _1 m6 \$ e1 r% Z& C
 
 - c4 T& Z6 x& O5 l' L! x4 n+ k0 z! }: H
 + |2 n7 t9 O/ w8 e* l) ]
 zeroStream.print();: `. Q* s5 [3 _+ O$ s
 
 1 P0 U0 p6 e) j; @) z* ^/ e    oneStream.printToErr();
 4 d# i+ F/ G: _3 D! G+ R8 J; ?& y& I+ j
 
 2 V0 I; e9 Z& L+ t0 U& ^
 - n) ~; t* _& U) m5 q& }8 O$ ~" [+ s
 . C# |" S4 ]5 G" ~4 H9 U8 r8 u
 //打印结果
 & W" \- o& Y7 e, [3 F; k; E& n4 }1 c3 @
 String jobName = "user defined streaming source";
 5 z0 k: E1 g: q" o* I, t+ u' N0 V' |: Z/ N/ V7 ^. C
 env.execute(jobName);
 # e$ z. b7 z6 c& o/ B
 * j! |9 }4 w9 ^5 B}
 0 l5 [1 }, N  w% S2 |</code></pre>
 1 o- x) H3 A# E# l; x<p>在上面的例子中我们使用 filter 算子将原始流进行了拆分,输入数据第一个元素为 0 的数据和第一个元素为 1 的数据分别被写入到了 zeroStream 和 oneStream 中,然后把两个流进行了打印。</p>
 ' k% }6 b7 O" g/ |8 a2 s& C% n<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/Ciqc1F7CA2WAYbshAAKj494h86s723-20210223084827296.png" ></p>$ d9 W# ?6 t3 ]/ C. F5 W0 i5 Z8 j
 <p>可以看到 zeroStream 和 oneStream 分别被打印出来。</p>
 - k" o2 W2 X( M) @. d<p>Filter 的弊端是显而易见的,为了得到我们需要的流数据,需要多次遍历原始流,这样无形中浪费了我们集群的资源。</p>
 ( u- a$ u5 t( k- O) d. _9 S<h4 id="split-分流">Split 分流</h4>
 6 u5 f0 t" g: {: g" L<p>Split 也是 Flink 提供给我们将流进行切分的方法,需要在 split 算子中定义 OutputSelector,然后重写其中的 select 方法,将不同类型的数据进行标记,最后对返回的 SplitStream 使用 select 方法将对应的数据选择出来。</p>$ v2 S6 @: H! Y, W* V' G/ y- F: d2 ^
 <p>我们来看下面的例子:</p>9 h* R  }8 b4 m- A! D/ ]
 <p>复制代码</p>9 w; c. z% ^7 p9 {% t& I; ?
 <pre><code class="language-java">public static void main(String[] args) throws Exception {- [4 B' \3 ^. r$ v7 w8 i
 
 ( R: _% T' U3 [2 r5 h+ t" }/ C$ D% H1 Z3 c/ F/ T, Y5 t! D- Q$ P- o
 
 2 o4 k) c0 Q7 Z- a$ N9 X6 u  p    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 2 O- G' J4 `$ ^; z# B6 ~
 ' w3 D" ^4 O9 N% z6 i    //获取数据源
 % }( G9 ?  _9 N$ K) s3 I! s
 # b! s; U2 V6 o    List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();) W$ d  R- ]4 r; z% ]2 Y
 3 {/ Y/ w2 {* }! K/ q/ p. i- L! f
 data.add(new Tuple3<>(0,1,0));
 2 e: N' u1 {( o$ u; o! l# W7 d4 d1 h6 k! q
 data.add(new Tuple3<>(0,1,1));5 }& m$ Z2 [8 o8 E0 \8 Y
 
 + v, h, F5 n1 a6 P4 `, H8 V! J    data.add(new Tuple3<>(0,2,2));2 `, e- p* G; F1 z( e
 9 X2 G1 W7 m$ A) a1 z
 data.add(new Tuple3<>(0,1,3));
 - d, r6 ?5 k1 U& a' w; Z( H# V$ c! b5 ~6 u! H! L) x
 data.add(new Tuple3<>(1,2,5));
 3 ^- Q3 C! {$ r5 x/ L1 l$ [  c$ Y' A9 _" t1 s
 data.add(new Tuple3<>(1,2,9));! q. n, Y  z  E5 w
 + ^9 f% L8 l- b# J; m
 data.add(new Tuple3<>(1,2,11));$ O# U" R9 t2 b( [; m3 w) T
 # p* t2 W* b% r! C3 a! D
 data.add(new Tuple3<>(1,2,13));$ E  G5 ^# L, N6 z0 q5 N9 A% G+ O1 D% q
 
 - m# H* J7 T9 q4 N! |; e8 x$ f1 E' _7 F  t$ n$ l
 
 $ A" S+ Q+ [; ~, M- E+ f5 H# n  K
 - x# A" `/ e; d+ b, g9 J$ J4 ^& D
 7 Y( q2 z& L) s    DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);
 . p8 d7 J; g( X+ [. V# p) ?
 - n# m$ j  ^) Q. i1 w/ K$ S) B0 G7 L5 W* h
 ( P, E6 s; ?3 _+ j* g% R* X
 : K; V" A2 i- @9 @' K
 
 - Q6 X# Q: l7 k' m. _8 O4 c    SplitStream<Tuple3<Integer, Integer, Integer>> splitStream = items.split(new OutputSelector<Tuple3<Integer, Integer, Integer>>() {
 6 R; T! d; H. Z, c. J7 r  k1 B# ~0 s7 U/ E4 u
 @Override
 / [' v( u4 `2 }2 M6 m5 h
 ' L7 b$ U) [6 _9 S8 O. r1 r2 |        public Iterable<String> select(Tuple3<Integer, Integer, Integer> value) {
 2 d5 \7 S: g; @8 R4 |8 Y1 P1 P6 u4 a2 ~4 O8 X& @
 List<String> tags = new ArrayList<>();% d& q4 f! q- Z. k6 X, [( Y
 
 & j3 Z9 g7 G0 H' _; n            if (value.f0 == 0) {$ K  g1 L" }1 s& ^' o( E
 ( |& H8 \" R" @, {, |5 p) U6 z
 tags.add("zeroStream");8 E% C2 {# [- @# Z1 ?* h' t( n
 8 p: i+ D3 A6 \+ t2 Y+ A5 r
 } else if (value.f0 == 1) {
 " Y9 r0 V6 C; s) W$ K
 % {9 X  x. F. ?6 p5 M2 C* A                tags.add("oneStream");
 ' z" J" S2 K/ u# _, U/ [" F4 N  l1 a
 3 D% Y5 V8 c, B; A# H            }! Z7 @+ @* w' J8 u7 [) z( w8 X' _. B
 
 1 n1 A. Z/ K* a% R: ]# A0 G            return tags;
 & T# U7 ]1 a! N8 U  J, m, n, H
 ! l3 d0 g* u1 j7 K        }$ ]* m) J4 Q$ B7 @
 
 2 w" m& G' i+ ?7 D0 i! s4 m. D    });+ f* d0 z4 |3 c, O( ]9 }0 ^4 F/ ?; U
 
 - E5 \) H5 p" l& S) p6 n# T# X1 u1 X* T9 d" ~& \. V, T
 
 4 l* @" |1 ^+ Q    splitStream.select("zeroStream").print();9 J7 m# c( z- ^& X; b" c
 3 ^" \/ ~  }% v  L6 c; n1 ~" [
 splitStream.select("oneStream").printToErr();
 7 ~' @' G5 V: X! ~: ^: l6 I, Q5 z! N  `% j
 
 + J- g1 j& C" n; U% H$ b1 M# c4 z8 }
 //打印结果
 2 G. X4 ~0 L$ Z$ R3 j; D  s  t& j: N
 String jobName = "user defined streaming source";
 9 r+ \- L0 e) y* r7 P. }
 8 n2 M/ ^0 q& O/ \3 o5 @6 B% n    env.execute(jobName);# `3 S/ A2 ?8 I0 f
 
 % s' S1 Q1 n: p( j8 ^}; T0 L$ N0 S  U( m$ Z
 </code></pre>
 ' v8 w3 t, O0 z/ W5 k7 B<p>同样,我们把来源的数据使用 split 算子进行了切分,并且打印出结果。</p>
 ( k% r# q( c& Q6 s! W) [) N7 ]( d<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CA4aAbUSJAAG1LWNB3qw627-20210223084827377.png" ></p>
 # g8 R5 o4 B% ~7 `* ~<p>但是要注意,使用 split 算子切分过的流,是不能进行二次切分的,假如把上述切分出来的 zeroStream 和 oneStream 流再次调用 split 切分,控制台会抛出以下异常。</p>  ~( f! K5 {+ v" s
 <p>复制代码</p>
 ' E- T% l$ D1 _<pre><code class="language-java">Exception in thread "main" java.lang.IllegalStateException: Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.& ?, N7 j) h4 k
 </code></pre>
 , [7 ]0 A; w9 \6 f: s* t/ d<p>这是什么原因呢?我们在源码中可以看到注释,该方式已经废弃并且建议使用最新的 SideOutPut 进行分流操作。</p>
 # j! w' ?1 {) A<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CA6OAJ-JDAAIrh1JSAEo033-20210223084827602.png" ></p># q4 b; o9 _" }! k4 w* r% f0 h9 n
 <h4 id="sideoutput-分流">SideOutPut 分流</h4>
 & I; T3 [( Z4 r, k<p>SideOutPut 是 Flink 框架为我们提供的最新的也是最为推荐的分流方法,在使用 SideOutPut 时,需要按照以下步骤进行:</p>
 ' X% a; V0 m/ m, y<ul>
 3 U" x1 K! Q5 K% Q: w& k2 E<li>定义 OutputTag</li>4 T! B/ F3 u" D# v( n5 r
 <li>调用特定函数进行数据拆分
 . @1 M- N8 R5 J/ F3 M8 m" S+ G2 `# c, d<ul># |2 D8 J9 n+ ?9 U# [( v
 <li>ProcessFunction</li>+ g8 G9 P2 y# @! k$ O9 @
 <li>KeyedProcessFunction</li>2 F4 L" a+ I8 ?0 c0 ]
 <li>CoProcessFunction</li>2 _- {9 ?  @0 G! j
 <li>KeyedCoProcessFunction</li>+ R2 T, U6 ~! m( |* L' V
 <li>ProcessWindowFunction</li>
 / ?# J$ Q) c6 {7 o) K<li>ProcessAllWindowFunction</li>6 e$ ^2 _2 J- U: U6 L2 j
 </ul>3 m- O0 T* I9 W& d, _+ |' B. a8 M' Q0 x
 </li>
 7 i0 J' F7 M) s" X/ {</ul>
 ; X% ?$ r7 Q5 E% {<p>在这里我们使用 ProcessFunction 来讲解如何使用 SideOutPut:</p>+ v9 Y( I% w$ U, q3 u- l. f
 <p>复制代码</p>" Z# O8 Q1 G# k0 o% W$ Y
 <pre><code class="language-java">public static void main(String[] args) throws Exception {) q3 |1 t6 a4 J6 F" [
 0 |$ t3 o6 @' P. Z
 
 ! ]# y. @  B* U4 k  ]9 M; p7 h8 Y; W& O0 p
 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();4 c( R' ?6 h5 z
 % a! G0 ~, r3 X! }+ `
 //获取数据源4 |9 B) P0 a* e- O4 t  g4 n$ ^- `4 O
 
 3 q8 s: a! H+ U& X9 a3 s  J% G    List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();; x: Y/ f8 o3 V. G3 g
 : K+ m# u/ {$ {! |) m
 data.add(new Tuple3<>(0,1,0));3 {" K2 J2 @6 Y5 ?+ A% ~% a( D  e# T2 A
 , d% H2 e! M- I6 ]3 Q8 W7 c
 data.add(new Tuple3<>(0,1,1));/ u% {7 B# U5 W: `6 l
 
 ( U" {8 [% u1 ~3 E3 m9 A7 J    data.add(new Tuple3<>(0,2,2));2 M( J; t! |  n, O7 E
 
 8 M% O' ]) Z3 _' a% Q    data.add(new Tuple3<>(0,1,3));
 / j# C- `. d/ b; I; Q
 n: g7 d  B4 W' O    data.add(new Tuple3<>(1,2,5));
 + x  y% t& V. W4 _2 u9 ^) {& g+ ?6 h, S1 Q( g& M3 _" \
 data.add(new Tuple3<>(1,2,9));' A. j  h# @: ^! K3 C( w( I
 
 5 v( \! f* a4 ?6 u" ]    data.add(new Tuple3<>(1,2,11));+ k6 s. U: Y9 V4 t4 L
 9 s) \+ x  n$ J! C( `. X
 data.add(new Tuple3<>(1,2,13));- [- S" _6 n/ J7 p! i* i
 . w4 v- S/ @) E& I/ A
 
 8 O3 f8 |8 N: r+ r3 `! `# j7 A+ [# P+ u6 c
 
 ( j$ ]' ^" n: S- U% V# J
 0 q. s' N6 T; @# g5 L2 n    DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);
 : |% I' U/ Q- W
 * n# \( u, X3 Y9 }% P
 1 b& \2 d: j- X( y$ T: m! a1 I) A) Y( u( p) ~
 OutputTag<Tuple3<Integer,Integer,Integer>> zeroStream = new OutputTag<Tuple3<Integer,Integer,Integer>>("zeroStream") {};
 # C4 d% l" Z( M4 c+ i+ C& e& }: l" J- x1 S/ v; G
 OutputTag<Tuple3<Integer,Integer,Integer>> oneStream = new OutputTag<Tuple3<Integer,Integer,Integer>>("oneStream") {};
 , j: H. k& s' u! e4 J* I! E& X+ k/ d# r& W5 Z4 ]  m
 
 3 \# ^. V. ]  a! F" H8 V/ |
 & v+ o+ }  S! ~/ ~) ^- x& f0 n! E- N& r
 + v6 d2 e' q5 d. Y
 SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> processStream= items.process(new ProcessFunction<Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>>() {
 " f" }' L  s! X0 E+ x! f. u$ ]  e5 h, s: A
 @Override# d( e+ Q+ S+ J2 C5 ~8 H5 e
 
 7 B8 Y+ A( t4 s4 }        public void processElement(Tuple3<Integer, Integer, Integer> value, Context ctx, Collector<Tuple3<Integer, Integer, Integer>> out) throws Exception {
 # B3 m8 v* [/ w% e! S. ?
 & n; V* t2 [8 D' x$ g5 T  _* J. G( G% h
 
 ( W. e  I+ u( }            if (value.f0 == 0) {4 f, o. F4 L+ P0 W) S  e3 c" P
 6 p* U3 p7 Z& O; r6 |5 I4 J$ V+ ~8 e
 ctx.output(zeroStream, value);
 7 [- B+ H  ^2 S3 B  `5 r
 1 b, z) Y" H: f  G            } else if (value.f0 == 1) {
 . T# a! B: O8 K  s5 F7 S0 E+ B2 j; n- ^
 ctx.output(oneStream, value);1 M4 a! j7 w4 p2 V. i* k
 " l$ Y+ G" }: b7 h5 ^9 s5 _2 G
 }, ?& m( z5 k  d; _* y5 Q
 
 , s3 ]; ]  o, I( \) q8 H0 F& w        }
 2 P+ \# u1 b8 _; m; A  X# h" T8 W
 3 q; c' j& I5 k3 c- n$ O0 L3 |    });: [# e6 x7 D- \2 y
 9 O5 j' K7 C9 b/ a  g
 , w6 g: f& |" Q
 
 * J8 i1 t1 q+ f9 \    DataStream<Tuple3<Integer, Integer, Integer>> zeroSideOutput = processStream.getSideOutput(zeroStream);1 p( e. A! u' n- G% O" x0 {( J% x7 \
 , p" L- D# v7 G, E9 Z1 Q
 DataStream<Tuple3<Integer, Integer, Integer>> oneSideOutput = processStream.getSideOutput(oneStream);& B  y, b8 z. q; i* M
 ) V! V7 C& W: J5 U9 I) h; `% v: ^
 8 h+ i& d0 `' Z: I2 s- {
 
 ' J% K  N3 s% _: p$ f    zeroSideOutput.print();
 & K1 Y1 Z+ n( C8 O$ D, Z! m# B( v
 6 C0 x) |1 w5 f; ~    oneSideOutput.printToErr();4 _4 w- x" s. y, u
 
 ; G' N0 G6 k1 Q. a# b6 Z' K/ a7 b5 ~8 V0 Z, w! A
 
 & m5 @6 N' a  N, c4 ~; |4 m
 6 M: a! e- @* r3 q. @
 5 D5 G) N- l3 a+ K! V    //打印结果7 B8 _' J- `* c4 f% Z
 
 2 |! o5 O8 l5 `; W6 {$ J* O, x. g; a    String jobName = "user defined streaming source";8 C8 o4 F, |: m$ S
 
 ! Y: U, z$ H7 `2 f! k/ A    env.execute(jobName);
 2 @9 V& J  t' V+ O% }, J1 C
 ( m- ?6 k3 c1 B0 b* e. n}
 % i; o/ B5 A( t4 T+ k</code></pre>5 h1 b! y) L/ V4 Z3 W
 <p>可以看到,我们将流进行了拆分,并且成功打印出了结果。这里要注意,Flink 最新提供的 SideOutPut 方式拆分流是<strong>可以多次进行拆分</strong>的,无需担心会爆出异常。</p>
 3 f2 q/ {; E4 `9 e* B. T3 e! e<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CBMKAGHoUAAM-5UL5geg132-20210223084827698.png" ></p>
 9 `. i( _8 N6 @4 w7 H8 G3 j<h3 id="总结">总结</h3>& U4 B  X" W9 [
 <p>这一课时我们讲解了 Flink 的一个小的知识点,是我们生产实践中经常遇到的场景,Flink 在最新的版本中也推荐我们使用 SideOutPut 进行流的拆分。</p>
 $ u! [3 `& S9 [  T+ S<blockquote>! ^( j7 i* b% m) E1 G( ^" c1 ]
 <p>关注公众号:<code>大数据技术派</code>,回复<code>资料</code>,领取<code>1024G</code>资料。</p>3 k' b  l5 F- v( ^. I
 </blockquote>2 r9 U" g' {/ f+ j- y2 O
 : o! N/ `% {* E
 
 | 
 |