飞雪团队

 找回密码
 立即注册
搜索
热搜: 活动 交友 discuz
查看: 9291|回复: 0

第10讲:Flink Side OutPut 分流

[复制链接]

6738

主题

6826

帖子

2万

积分

管理员

Rank: 9Rank: 9Rank: 9

积分
22542
发表于 2022-2-12 14:35:42 | 显示全部楼层 |阅读模式
$ r4 f) d9 I$ F. f
<h4 id="flink系列文章">Flink系列文章</h4>
9 @5 l: O6 O* s<ol>
. d5 F1 j" q0 O- C% o<li><a  href="https://www.ikeguang.com/?p=1976">第01讲:Flink 的应用场景和架构模型</a></li>
, `; u( j) R/ ^0 \0 k<li><a  href="https://www.ikeguang.com/?p=1977">第02讲:Flink 入门程序 WordCount 和 SQL 实现</a></li>- S6 O' N! R: m
<li><a  href="https://www.ikeguang.com/?p=1978">第03讲:Flink 的编程模型与其他框架比较</a></li>
2 {' ^7 D) L  l0 s  ?<li><a  href="https://www.ikeguang.com/?p=1982">第04讲:Flink 常用的 DataSet 和 DataStream API</a></li>* t- s2 O' X6 G  p
<li><a  href="https://www.ikeguang.com/?p=1983">第05讲:Flink SQL &amp; Table 编程和案例</a></li>" |1 `' ~  p& `1 q* I( M
<li><a  href="https://www.ikeguang.com/?p=1985">第06讲:Flink 集群安装部署和 HA 配置</a></li>" b; }& I& k* X- T( o
<li><a  href="https://www.ikeguang.com/?p=1986">第07讲:Flink 常见核心概念分析</a></li>
4 L- R2 h# U3 }! I. f<li><a  href="https://www.ikeguang.com/?p=1987">第08讲:Flink 窗口、时间和水印</a></li>
3 ]9 i' D! G) R8 S<li><a  href="https://www.ikeguang.com/?p=1988">第09讲:Flink 状态与容错</a></li>
: w1 C+ T: R! U, ^9 E! q% |" f</ol>
9 J9 o3 U* H8 W+ {4 Z; t<blockquote>
6 B0 r" {) f! V- K9 V7 ~- f<p>关注公众号:<code>大数据技术派</code>,回复<code>资料</code>,领取<code>1024G</code>资料。</p>% Z0 s% t% k2 Y
</blockquote>
/ [/ _. B+ T( o<p>这一课时将介绍 Flink 中提供的一个很重要的功能:旁路分流器。</p>
8 q0 A; q, r, ^; x* ?$ w0 M* G0 b<h3 id="分流场景">分流场景</h3>5 d9 ~; u  d; Z4 I5 p% b
<p>我们在生产实践中经常会遇到这样的场景,需把输入源按照需要进行拆分,比如我期望把订单流按照金额大小进行拆分,或者把用户访问日志按照访问者的地理位置进行拆分等。面对这样的需求该如何操作呢?</p>
8 ^' z1 j0 }6 K- n" m<h3 id="分流的方法">分流的方法</h3>1 z% |+ t/ @9 m$ W! ]; x4 Y
<p>通常来说针对不同的场景,有以下三种办法进行流的拆分。</p>
& I$ b$ G8 }! e1 j# o) w<h4 id="filter-分流">Filter 分流</h4>: ]- \) j% d( i+ k0 G2 W
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CAy6ADUaXAACSFUbdpuA911-20210223084827182.png" ></p>
* K, y& W& }% O  {8 v: `' s6 A<p>Filter 方法我们在第 04 课时中(Flink 常用的 DataSet 和 DataStream API)讲过,这个算子用来根据用户输入的条件进行过滤,每个元素都会被 filter() 函数处理,如果 filter() 函数返回 true 则保留,否则丢弃。那么用在分流的场景,我们可以做多次 filter,把我们需要的不同数据生成不同的流。</p>) W" T& [: p; Y, A
<p>来看下面的例子:</p>. v9 ?3 z9 \# u
<p>复制代码</p>
+ r( S+ H2 x. J* Y# J* g. J8 }6 ~<pre><code class="language-java">public static void main(String[] args) throws Exception {) e/ X) ~8 m) ]2 z6 e
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();! Z9 p' j4 `& [2 m
    //获取数据源
% p7 T% l1 s, [  i  C- I) I    List data = new ArrayList&lt;Tuple3&lt;Integer,Integer,Integer&gt;&gt;();7 [4 u; d+ a7 }; k1 L6 o, n8 F
    data.add(new Tuple3&lt;&gt;(0,1,0));6 v4 d' m" V2 ]) N! U
    data.add(new Tuple3&lt;&gt;(0,1,1));
* V2 H0 P, V- z# X" y8 e: G! Y( Z    data.add(new Tuple3&lt;&gt;(0,2,2));# P( J4 L$ `6 @
    data.add(new Tuple3&lt;&gt;(0,1,3));
3 f9 f: G  w5 s3 y, Y* k( x    data.add(new Tuple3&lt;&gt;(1,2,5));
. R! g/ O( l" P& v3 ?    data.add(new Tuple3&lt;&gt;(1,2,9));4 z/ Y, P( X' ~
    data.add(new Tuple3&lt;&gt;(1,2,11));
1 {6 j0 A; @5 e: N$ F0 Q    data.add(new Tuple3&lt;&gt;(1,2,13));
+ N/ s  y7 u( o' u" M$ m6 {3 _
$ F4 P. R! n" N$ q  n6 I& P    DataStreamSource&lt;Tuple3&lt;Integer,Integer,Integer&gt;&gt; items = env.fromCollection(data);
1 ?8 {8 F+ x. U; ~& m( Y7 W- b, n, {; P! K
  I5 k; O2 W% D5 y2 t. E/ @; w

- |- V6 Q1 m1 O* [& ]* w    SingleOutputStreamOperator&lt;Tuple3&lt;Integer, Integer, Integer&gt;&gt; zeroStream = items.filter((FilterFunction&lt;Tuple3&lt;Integer, Integer, Integer&gt;&gt;) value -&gt; value.f0 == 0);
0 {' w* {  v% h% W" O, F0 H( f; k* x8 f, }7 n% C
    SingleOutputStreamOperator&lt;Tuple3&lt;Integer, Integer, Integer&gt;&gt; oneStream = items.filter((FilterFunction&lt;Tuple3&lt;Integer, Integer, Integer&gt;&gt;) value -&gt; value.f0 == 1);
4 x% E; e6 \* c4 e. s7 A3 {, O6 |: u3 Z6 s

. j7 V, A. u/ i8 j7 ]5 Y1 J2 {# ?
8 b# X4 b2 N5 n& N! ]; o; I7 g4 {    zeroStream.print();# k9 c' l  i0 m+ J- K9 K

9 s! i, n' I8 P$ U9 |    oneStream.printToErr();
7 h5 s! E/ r! X; p, H( X: [
- J. h  @; h/ s+ `" ~$ D/ C  S
2 ^( h# Z* B, Y% D) v0 }& i9 X4 q1 K: n8 `* X) S
7 s- L& ^( ^& K) T, |6 E

4 I( [# x5 M7 l/ U0 D9 f  p    //打印结果  Y* S  |. p' n6 h
, @- S" g  ~+ {$ |- p+ v) M
    String jobName = "user defined streaming source";
0 t- T% m. W3 v# ^# z  j
2 L) i0 c0 K4 c    env.execute(jobName);" q0 _" i( S: {& k- l6 Z

; _- z' T8 M6 K# q* e}0 l" _# N, b/ y. k) g$ h& M6 ^5 z
</code></pre>. y2 S' p* g1 {  x# k/ J0 h+ b
<p>在上面的例子中我们使用 filter 算子将原始流进行了拆分,输入数据第一个元素为 0 的数据和第一个元素为 1 的数据分别被写入到了 zeroStream 和 oneStream 中,然后把两个流进行了打印。</p>6 q- e$ @' G% q1 Y# W# ^- E
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/Ciqc1F7CA2WAYbshAAKj494h86s723-20210223084827296.png" ></p>
) \' L* @# x' Q  i6 o% y<p>可以看到 zeroStream 和 oneStream 分别被打印出来。</p>
& n: c) s* J5 o5 d( G  w<p>Filter 的弊端是显而易见的,为了得到我们需要的流数据,需要多次遍历原始流,这样无形中浪费了我们集群的资源。</p>
6 f2 H) _8 f6 A  v& }<h4 id="split-分流">Split 分流</h4>4 @8 z" b/ o: n& z2 V
<p>Split 也是 Flink 提供给我们将流进行切分的方法,需要在 split 算子中定义 OutputSelector,然后重写其中的 select 方法,将不同类型的数据进行标记,最后对返回的 SplitStream 使用 select 方法将对应的数据选择出来。</p>6 A! g, G% B5 `  p3 `/ V# L7 }5 S
<p>我们来看下面的例子:</p>& G1 e& J, ]( U
<p>复制代码</p>9 u* L: O  `# J8 `- C% m! i
<pre><code class="language-java">public static void main(String[] args) throws Exception {
0 x4 @1 `. Q, s( {( s$ {; r) s7 C& O% [, ~0 e

7 t7 Z2 o" O; y3 B/ G2 h$ Y& @( a* o* q' [: C6 p
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
! e  V3 [; d1 M0 Z* t7 Y
& Z* U" V3 E, P1 E7 V    //获取数据源
1 s; H  v; C) h3 j( v' v( }- y2 Y4 C) L. i
    List data = new ArrayList&lt;Tuple3&lt;Integer,Integer,Integer&gt;&gt;();
. b( z& w# r9 G( ~- h: Y  Y- T* W
9 n8 R. B6 t. k    data.add(new Tuple3&lt;&gt;(0,1,0));4 |8 ^1 A/ n+ q8 i" ]' p
0 k5 F- C; p, t' S* N; c
    data.add(new Tuple3&lt;&gt;(0,1,1));
* C. k! T' ]6 A  ?. x% G* h$ y% s" R0 d" E/ ~) c: B
    data.add(new Tuple3&lt;&gt;(0,2,2));
) }$ @& }- c/ M+ a
+ [0 G/ M9 |) A/ j0 u* W  Q    data.add(new Tuple3&lt;&gt;(0,1,3));
, a, H2 L5 |" l$ X1 ~0 ?5 b$ p/ U+ q. h2 V2 M, q1 h- Z# E
    data.add(new Tuple3&lt;&gt;(1,2,5));
* K( {* j- |% n7 f* _4 ~
( L* E. e0 ]( J" x' b' [9 I" N+ G    data.add(new Tuple3&lt;&gt;(1,2,9));) L. B4 V: ?( r  O2 G; O8 X

5 o6 L" ~# R7 y& q    data.add(new Tuple3&lt;&gt;(1,2,11));
5 s/ h5 ^6 `$ c- ]8 I3 M( Y+ o5 B  i. f7 J1 K% O
    data.add(new Tuple3&lt;&gt;(1,2,13));; t7 |9 {% o! Y% E
6 x$ X! O6 J- o! p, }5 c
$ o) g* r9 {; N  L5 N, w( i

- p) K. F3 a7 j6 [. i! A6 x) E# P# O
+ g4 g& M  l& N; r$ J2 j  u/ N: e8 k) w
    DataStreamSource&lt;Tuple3&lt;Integer,Integer,Integer&gt;&gt; items = env.fromCollection(data);
( P5 |  C( d& D6 S6 v. u# v  B0 n
6 z: Y2 b2 n2 A% q; Q& j" E
5 R8 a: p2 F5 W+ E& k+ X) i9 F
/ R% z5 n+ ~  j
5 Y, ~: `  f2 f( K; Z: l1 S: \- o, \/ F6 E$ W  c4 g( g
    SplitStream&lt;Tuple3&lt;Integer, Integer, Integer&gt;&gt; splitStream = items.split(new OutputSelector&lt;Tuple3&lt;Integer, Integer, Integer&gt;&gt;() {  T# i! P* c  T. d
# N7 i$ k; Z) c
        @Override
  w- B0 E% [8 F: ?3 ]& I. h
: d6 T: m+ X; `0 A. W% [0 C        public Iterable&lt;String&gt; select(Tuple3&lt;Integer, Integer, Integer&gt; value) {( d  d1 C- p5 o/ |

( i. ^' U! l# q0 p- F! @" w            List&lt;String&gt; tags = new ArrayList&lt;&gt;();3 g) z8 R( `* _4 ]: T
/ G% ?3 G. R4 o& h( m0 B, X5 q
            if (value.f0 == 0) {
" n7 Y; A: x- s4 g# J* \! o2 o
9 E" A3 P$ i/ n/ h4 g0 K  [+ l0 i+ U( N                tags.add("zeroStream");
7 v8 T$ c+ `1 M/ N8 U' T: H/ q5 M9 D
" l: r5 Y+ O( a* ^            } else if (value.f0 == 1) {
& p9 z. f; |7 V
; T9 a: J+ r. f( l; ^3 P                tags.add("oneStream");
# Y2 a+ Q) L4 A# ~
- N( T9 P* i) F$ q9 O7 h  T            }9 _& \( \& b& k6 n

& h1 D2 Z$ T  l/ j: }            return tags;
5 s' `& i+ f- Z% d, H; Z4 d( I3 Y! p. _+ g- e4 D
        }
9 Y5 r0 r- A. s) ^% N& ^1 C0 l6 Z% S/ F9 }
    });* k  |+ H- `4 G# ?& V8 p. W. h
0 T: E( u6 `9 d; ^6 R  O: s

3 K- n4 b7 F/ u! ~1 }, T; S9 k8 j) s2 j1 @! [! ]
    splitStream.select("zeroStream").print();. J6 Z6 P8 `9 G5 ]7 Q
8 `& {" F# Q3 G4 x7 `; }
    splitStream.select("oneStream").printToErr();$ T  b, k2 ~; P
9 R! D& \( w4 g: G2 m
0 e  W; e: t# q$ d* ?
( G1 ~4 P1 R3 `1 w
    //打印结果
! s8 ?! D8 N' C/ O/ V% w% b! @/ O& n; I  |0 J. G
    String jobName = "user defined streaming source";
% r0 W6 Z8 K4 X/ }/ i  U1 c4 D8 F) Q. ^8 H* j' H
    env.execute(jobName);' f9 k8 w8 q# q
% H( m6 ^; u* t5 {* z
}
2 L7 n! Q; o) b( L/ H</code></pre>
  p8 ?2 H# L4 q<p>同样,我们把来源的数据使用 split 算子进行了切分,并且打印出结果。</p>4 G: y; R  a2 ^+ V6 X2 K
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CA4aAbUSJAAG1LWNB3qw627-20210223084827377.png" ></p>0 j5 K1 n# q5 ]
<p>但是要注意,使用 split 算子切分过的流,是不能进行二次切分的,假如把上述切分出来的 zeroStream 和 oneStream 流再次调用 split 切分,控制台会抛出以下异常。</p>8 E. q' i% N1 y- ~
<p>复制代码</p># O+ O9 E4 j: t: \2 y
<pre><code class="language-java">Exception in thread "main" java.lang.IllegalStateException: Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.2 e( p2 i+ F7 s4 O4 M
</code></pre>( ?6 Q& V: D2 z& _# f4 c" u+ _. V
<p>这是什么原因呢?我们在源码中可以看到注释,该方式已经废弃并且建议使用最新的 SideOutPut 进行分流操作。</p>
# n9 x) ?2 S3 _& B! n' r<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CA6OAJ-JDAAIrh1JSAEo033-20210223084827602.png" ></p>" N' }9 j# j( l4 M4 h
<h4 id="sideoutput-分流">SideOutPut 分流</h4>) H, U9 o# L0 M; l* U; }
<p>SideOutPut 是 Flink 框架为我们提供的最新的也是最为推荐的分流方法,在使用 SideOutPut 时,需要按照以下步骤进行:</p>
5 b( C0 S# }) k1 N0 T<ul>
/ R# O7 L* P. ~) f! v0 X/ B/ S<li>定义 OutputTag</li>2 S" r. I0 W8 T- T: g9 E* P
<li>调用特定函数进行数据拆分$ n4 q  J* S* w* P# V5 T
<ul>
+ b: H3 F+ q, Y% h* H0 G) i<li>ProcessFunction</li>! i3 p+ I* z. k) O) U" _) k2 G
<li>KeyedProcessFunction</li>* p' E6 j7 T5 w0 U( V* m; ]
<li>CoProcessFunction</li>
# j) i: Q" H6 b+ p& O( ~9 t<li>KeyedCoProcessFunction</li>
; S) C  ~4 l3 U' F- z! g) I/ u3 L- u<li>ProcessWindowFunction</li>5 x# S0 |3 q$ S/ f& V( K
<li>ProcessAllWindowFunction</li>
# a2 M/ ]" I- R, h+ |7 J</ul>
# e) e& S2 a: S' |# ^</li>
7 S8 M0 H5 v- @$ W! i) ^" A</ul>
/ {4 b# M" {+ U2 y: W5 u<p>在这里我们使用 ProcessFunction 来讲解如何使用 SideOutPut:</p>
3 i' S. T& r3 b  Q2 H' U5 a<p>复制代码</p>& j7 m2 B/ G) F  Y, c
<pre><code class="language-java">public static void main(String[] args) throws Exception {
& M- Y: H" ^9 q4 |/ M
: D# A/ W: O: P+ V1 h! N
0 H, H# v$ ^3 g
$ `; {. R! L1 R  X/ `    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();2 T2 G8 L3 |( V: e! c2 V  k4 D
; |' R0 g! h) c/ }# |' B( |+ c
    //获取数据源
9 }! f% v; B+ d* m" O; }0 X$ S# }% M& O8 N7 L4 d
    List data = new ArrayList&lt;Tuple3&lt;Integer,Integer,Integer&gt;&gt;();
" `5 V$ ?8 U0 W, H: j9 u9 A. C
# x' x8 Y  b* Z    data.add(new Tuple3&lt;&gt;(0,1,0));+ e7 m. h3 y4 D1 o& r+ G: c

- `" z; G1 Q2 f1 s& a    data.add(new Tuple3&lt;&gt;(0,1,1));
9 N" @" m2 x% d+ Q
( I3 c3 k- [5 T1 Y/ u9 ]. H    data.add(new Tuple3&lt;&gt;(0,2,2));
' ]& i5 H% w5 C7 t( K1 c1 b* {$ W) a6 Q" S4 m5 `% t
    data.add(new Tuple3&lt;&gt;(0,1,3));  K( C; `! w4 x6 M1 D4 F

/ _! x4 z3 P- U$ v. q    data.add(new Tuple3&lt;&gt;(1,2,5));
0 E9 i& M3 P% o0 G9 d8 x
& J2 g. o  Z! a  n: }$ h9 S& o# {9 A    data.add(new Tuple3&lt;&gt;(1,2,9));5 D: h: _2 V/ }! u2 U; ^( a

, K7 U8 g# ~& v    data.add(new Tuple3&lt;&gt;(1,2,11));7 m; z& v( X( Y  D

9 B5 G$ w( M+ T' W, Q    data.add(new Tuple3&lt;&gt;(1,2,13));# C: M( ]$ l# H3 d3 M4 X9 e
6 q# I% d! o' U# B

, ~/ L. p. f" a
& J& L1 d! |9 x( R) j/ x; I! T$ Y5 \

3 O& l, g6 y/ l  f/ e    DataStreamSource&lt;Tuple3&lt;Integer,Integer,Integer&gt;&gt; items = env.fromCollection(data);( {, d! Y1 X$ @$ L

2 @# |( ~- e& b$ F6 d9 ?( M) @0 \! r& ]# H. k

- C- U1 k& s  U' |    OutputTag&lt;Tuple3&lt;Integer,Integer,Integer&gt;&gt; zeroStream = new OutputTag&lt;Tuple3&lt;Integer,Integer,Integer&gt;&gt;("zeroStream") {};/ o8 j6 f) G; }, |  B( v
: b' K) n( N' D" L6 R! j0 Y3 y
    OutputTag&lt;Tuple3&lt;Integer,Integer,Integer&gt;&gt; oneStream = new OutputTag&lt;Tuple3&lt;Integer,Integer,Integer&gt;&gt;("oneStream") {};% m1 P- a% C) W' Z0 [) d, Y+ Z9 z, y
% l3 D! t. h$ n2 s, Y6 \6 g
* d) W6 W" {! L" T- t- V5 L; _* F" ~

! O3 {0 ~4 A- c* L; V( O: l& m" c# c! W* w$ F( C/ C
" j% y3 Q# B( r% h$ y- V
    SingleOutputStreamOperator&lt;Tuple3&lt;Integer, Integer, Integer&gt;&gt; processStream= items.process(new ProcessFunction&lt;Tuple3&lt;Integer, Integer, Integer&gt;, Tuple3&lt;Integer, Integer, Integer&gt;&gt;() {, F% P; t& ]) M  t: V+ u7 h
, l  G! P  `& e3 _, ]
        @Override
) |9 b5 ]' t, p3 E
# |+ W( S# [/ X5 q' [        public void processElement(Tuple3&lt;Integer, Integer, Integer&gt; value, Context ctx, Collector&lt;Tuple3&lt;Integer, Integer, Integer&gt;&gt; out) throws Exception {
/ e3 P( b: n; q5 _
- m4 S* K8 x# E3 }
4 o7 b. m& ?. \/ m0 {; k6 v2 P  C. j- r9 I8 s$ A9 i, u  N* C
            if (value.f0 == 0) {
6 r7 t: E2 [2 f4 V9 l& c! p$ d2 ^) q( H# l& Z7 `0 r. s8 f4 {
                ctx.output(zeroStream, value);
" Z2 ^" k; }* Q4 {4 ^! k% t! l) w1 P2 V
            } else if (value.f0 == 1) {: h! `) k. r3 Q3 V
; [: [# g% }+ W: }2 F: Z3 n  T
                ctx.output(oneStream, value);* ^% l4 [, i% O% O- p
% V. G" q3 ~. T$ E
            }
7 P% j4 u% v, ]" C1 G
$ p* u/ V7 m2 t0 {% O% [        }
( U6 v& P3 {: }6 V8 Z2 m
! f7 t, d" o0 R4 m+ k, W: Y1 G    });
; J$ d- k+ w' w" |
/ O6 H$ H2 g' b4 ~) y% d, I8 X
( f5 E& G9 i8 t' ]+ W, i2 T/ `9 s
- H  A. V' n2 }! V/ r0 @& d6 u, u8 M    DataStream&lt;Tuple3&lt;Integer, Integer, Integer&gt;&gt; zeroSideOutput = processStream.getSideOutput(zeroStream);% L0 g1 ?7 r7 r& V7 ?6 O" c* t
0 e/ D$ C, ?4 ?, @6 t2 Y9 V
    DataStream&lt;Tuple3&lt;Integer, Integer, Integer&gt;&gt; oneSideOutput = processStream.getSideOutput(oneStream);
$ \1 @/ }5 J+ g5 c5 |
* q# _7 l, P; T" E3 E3 }. L. X2 h" V
& Y9 J+ {) X( k9 Z* @/ C% l: I$ @: E/ r3 g/ t3 d
    zeroSideOutput.print();7 P1 |6 t1 U% ]1 K2 ]* n
6 \6 U. I! E' B, a) A( f9 _
    oneSideOutput.printToErr();/ ^# p  C5 d( ~( G0 f, f3 V* e

* \- y! A8 U+ I, R3 D
" P' f- Z$ |7 s% O+ P( H6 b+ l
- }4 @' [% L+ \' X! O& a; f1 I# t, \( f

! d# A6 k: |& y0 L5 ~! }    //打印结果. [1 ?+ c' j- D2 Y; g- y: ]( s
% Q' T# `+ p2 A- P3 ~) b
    String jobName = "user defined streaming source";$ G4 L+ c4 ]- a. W0 W1 t

' o8 z+ s& b8 A) z" d2 E    env.execute(jobName);
9 O) E$ F: |& H+ R
0 i5 x# G  P/ R}
2 N8 I* N+ f- |3 G</code></pre>
! s$ L" r% i: a" }<p>可以看到,我们将流进行了拆分,并且成功打印出了结果。这里要注意,Flink 最新提供的 SideOutPut 方式拆分流是<strong>可以多次进行拆分</strong>的,无需担心会爆出异常。</p>
- r$ W3 f) ~2 o. t: C<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CBMKAGHoUAAM-5UL5geg132-20210223084827698.png" ></p>
  ]  F, n7 g% p2 `5 A<h3 id="总结">总结</h3>2 m9 P* p( W# x$ Z# [
<p>这一课时我们讲解了 Flink 的一个小的知识点,是我们生产实践中经常遇到的场景,Flink 在最新的版本中也推荐我们使用 SideOutPut 进行流的拆分。</p>
) f2 {" V& t# Q! C<blockquote>( {9 `5 N7 z1 @9 m
<p>关注公众号:<code>大数据技术派</code>,回复<code>资料</code>,领取<code>1024G</code>资料。</p>) c) j# w; ~: l( {
</blockquote>3 _, k' L' z3 F3 ]8 p
/ r# W7 z1 \# X( }2 A" O! V; |# K
回复

使用道具 举报

懒得打字嘛,点击右侧快捷回复 【右侧内容,后台自定义】
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

手机版|飞雪团队

GMT+8, 2025-6-15 16:09 , Processed in 0.065468 second(s), 21 queries , Gzip On.

Powered by Discuz! X3.4

Copyright © 2001-2021, Tencent Cloud.

快速回复 返回顶部 返回列表