admin 发表于 2022-2-12 14:35:42

第10讲:Flink Side OutPut 分流


<h4 id="flink系列文章">Flink系列文章</h4>
<ol>
<li><ahref="https://www.ikeguang.com/?p=1976">第01讲:Flink 的应用场景和架构模型</a></li>
<li><ahref="https://www.ikeguang.com/?p=1977">第02讲:Flink 入门程序 WordCount 和 SQL 实现</a></li>
<li><ahref="https://www.ikeguang.com/?p=1978">第03讲:Flink 的编程模型与其他框架比较</a></li>
<li><ahref="https://www.ikeguang.com/?p=1982">第04讲:Flink 常用的 DataSet 和 DataStream API</a></li>
<li><ahref="https://www.ikeguang.com/?p=1983">第05讲:Flink SQL &amp; Table 编程和案例</a></li>
<li><ahref="https://www.ikeguang.com/?p=1985">第06讲:Flink 集群安装部署和 HA 配置</a></li>
<li><ahref="https://www.ikeguang.com/?p=1986">第07讲:Flink 常见核心概念分析</a></li>
<li><ahref="https://www.ikeguang.com/?p=1987">第08讲:Flink 窗口、时间和水印</a></li>
<li><ahref="https://www.ikeguang.com/?p=1988">第09讲:Flink 状态与容错</a></li>
</ol>
<blockquote>
<p>关注公众号:<code>大数据技术派</code>,回复<code>资料</code>,领取<code>1024G</code>资料。</p>
</blockquote>
<p>这一课时将介绍 Flink 中提供的一个很重要的功能:旁路分流器。</p>
<h3 id="分流场景">分流场景</h3>
<p>我们在生产实践中经常会遇到这样的场景,需把输入源按照需要进行拆分,比如我期望把订单流按照金额大小进行拆分,或者把用户访问日志按照访问者的地理位置进行拆分等。面对这样的需求该如何操作呢?</p>
<h3 id="分流的方法">分流的方法</h3>
<p>通常来说针对不同的场景,有以下三种办法进行流的拆分。</p>
<h4 id="filter-分流">Filter 分流</h4>
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CAy6ADUaXAACSFUbdpuA911-20210223084827182.png" ></p>
<p>Filter 方法我们在第 04 课时中(Flink 常用的 DataSet 和 DataStream API)讲过,这个算子用来根据用户输入的条件进行过滤,每个元素都会被 filter() 函数处理,如果 filter() 函数返回 true 则保留,否则丢弃。那么用在分流的场景,我们可以做多次 filter,把我们需要的不同数据生成不同的流。</p>
<p>来看下面的例子:</p>
<p>复制代码</p>
<pre><code class="language-java">public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    //获取数据源
    List data = new ArrayList&lt;Tuple3&lt;Integer,Integer,Integer&gt;&gt;();
    data.add(new Tuple3&lt;&gt;(0,1,0));
    data.add(new Tuple3&lt;&gt;(0,1,1));
    data.add(new Tuple3&lt;&gt;(0,2,2));
    data.add(new Tuple3&lt;&gt;(0,1,3));
    data.add(new Tuple3&lt;&gt;(1,2,5));
    data.add(new Tuple3&lt;&gt;(1,2,9));
    data.add(new Tuple3&lt;&gt;(1,2,11));
    data.add(new Tuple3&lt;&gt;(1,2,13));

    DataStreamSource&lt;Tuple3&lt;Integer,Integer,Integer&gt;&gt; items = env.fromCollection(data);



    SingleOutputStreamOperator&lt;Tuple3&lt;Integer, Integer, Integer&gt;&gt; zeroStream = items.filter((FilterFunction&lt;Tuple3&lt;Integer, Integer, Integer&gt;&gt;) value -&gt; value.f0 == 0);

    SingleOutputStreamOperator&lt;Tuple3&lt;Integer, Integer, Integer&gt;&gt; oneStream = items.filter((FilterFunction&lt;Tuple3&lt;Integer, Integer, Integer&gt;&gt;) value -&gt; value.f0 == 1);



    zeroStream.print();

    oneStream.printToErr();





    //打印结果

    String jobName = "user defined streaming source";

    env.execute(jobName);

}
</code></pre>
<p>在上面的例子中我们使用 filter 算子将原始流进行了拆分,输入数据第一个元素为 0 的数据和第一个元素为 1 的数据分别被写入到了 zeroStream 和 oneStream 中,然后把两个流进行了打印。</p>
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/Ciqc1F7CA2WAYbshAAKj494h86s723-20210223084827296.png" ></p>
<p>可以看到 zeroStream 和 oneStream 分别被打印出来。</p>
<p>Filter 的弊端是显而易见的,为了得到我们需要的流数据,需要多次遍历原始流,这样无形中浪费了我们集群的资源。</p>
<h4 id="split-分流">Split 分流</h4>
<p>Split 也是 Flink 提供给我们将流进行切分的方法,需要在 split 算子中定义 OutputSelector,然后重写其中的 select 方法,将不同类型的数据进行标记,最后对返回的 SplitStream 使用 select 方法将对应的数据选择出来。</p>
<p>我们来看下面的例子:</p>
<p>复制代码</p>
<pre><code class="language-java">public static void main(String[] args) throws Exception {



    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    //获取数据源

    List data = new ArrayList&lt;Tuple3&lt;Integer,Integer,Integer&gt;&gt;();

    data.add(new Tuple3&lt;&gt;(0,1,0));

    data.add(new Tuple3&lt;&gt;(0,1,1));

    data.add(new Tuple3&lt;&gt;(0,2,2));

    data.add(new Tuple3&lt;&gt;(0,1,3));

    data.add(new Tuple3&lt;&gt;(1,2,5));

    data.add(new Tuple3&lt;&gt;(1,2,9));

    data.add(new Tuple3&lt;&gt;(1,2,11));

    data.add(new Tuple3&lt;&gt;(1,2,13));





    DataStreamSource&lt;Tuple3&lt;Integer,Integer,Integer&gt;&gt; items = env.fromCollection(data);





    SplitStream&lt;Tuple3&lt;Integer, Integer, Integer&gt;&gt; splitStream = items.split(new OutputSelector&lt;Tuple3&lt;Integer, Integer, Integer&gt;&gt;() {

      @Override

      public Iterable&lt;String&gt; select(Tuple3&lt;Integer, Integer, Integer&gt; value) {

            List&lt;String&gt; tags = new ArrayList&lt;&gt;();

            if (value.f0 == 0) {

                tags.add("zeroStream");

            } else if (value.f0 == 1) {

                tags.add("oneStream");

            }

            return tags;

      }

    });



    splitStream.select("zeroStream").print();

    splitStream.select("oneStream").printToErr();



    //打印结果

    String jobName = "user defined streaming source";

    env.execute(jobName);

}
</code></pre>
<p>同样,我们把来源的数据使用 split 算子进行了切分,并且打印出结果。</p>
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CA4aAbUSJAAG1LWNB3qw627-20210223084827377.png" ></p>
<p>但是要注意,使用 split 算子切分过的流,是不能进行二次切分的,假如把上述切分出来的 zeroStream 和 oneStream 流再次调用 split 切分,控制台会抛出以下异常。</p>
<p>复制代码</p>
<pre><code class="language-java">Exception in thread "main" java.lang.IllegalStateException: Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.
</code></pre>
<p>这是什么原因呢?我们在源码中可以看到注释,该方式已经废弃并且建议使用最新的 SideOutPut 进行分流操作。</p>
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CA6OAJ-JDAAIrh1JSAEo033-20210223084827602.png" ></p>
<h4 id="sideoutput-分流">SideOutPut 分流</h4>
<p>SideOutPut 是 Flink 框架为我们提供的最新的也是最为推荐的分流方法,在使用 SideOutPut 时,需要按照以下步骤进行:</p>
<ul>
<li>定义 OutputTag</li>
<li>调用特定函数进行数据拆分
<ul>
<li>ProcessFunction</li>
<li>KeyedProcessFunction</li>
<li>CoProcessFunction</li>
<li>KeyedCoProcessFunction</li>
<li>ProcessWindowFunction</li>
<li>ProcessAllWindowFunction</li>
</ul>
</li>
</ul>
<p>在这里我们使用 ProcessFunction 来讲解如何使用 SideOutPut:</p>
<p>复制代码</p>
<pre><code class="language-java">public static void main(String[] args) throws Exception {



    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    //获取数据源

    List data = new ArrayList&lt;Tuple3&lt;Integer,Integer,Integer&gt;&gt;();

    data.add(new Tuple3&lt;&gt;(0,1,0));

    data.add(new Tuple3&lt;&gt;(0,1,1));

    data.add(new Tuple3&lt;&gt;(0,2,2));

    data.add(new Tuple3&lt;&gt;(0,1,3));

    data.add(new Tuple3&lt;&gt;(1,2,5));

    data.add(new Tuple3&lt;&gt;(1,2,9));

    data.add(new Tuple3&lt;&gt;(1,2,11));

    data.add(new Tuple3&lt;&gt;(1,2,13));





    DataStreamSource&lt;Tuple3&lt;Integer,Integer,Integer&gt;&gt; items = env.fromCollection(data);



    OutputTag&lt;Tuple3&lt;Integer,Integer,Integer&gt;&gt; zeroStream = new OutputTag&lt;Tuple3&lt;Integer,Integer,Integer&gt;&gt;("zeroStream") {};

    OutputTag&lt;Tuple3&lt;Integer,Integer,Integer&gt;&gt; oneStream = new OutputTag&lt;Tuple3&lt;Integer,Integer,Integer&gt;&gt;("oneStream") {};





    SingleOutputStreamOperator&lt;Tuple3&lt;Integer, Integer, Integer&gt;&gt; processStream= items.process(new ProcessFunction&lt;Tuple3&lt;Integer, Integer, Integer&gt;, Tuple3&lt;Integer, Integer, Integer&gt;&gt;() {

      @Override

      public void processElement(Tuple3&lt;Integer, Integer, Integer&gt; value, Context ctx, Collector&lt;Tuple3&lt;Integer, Integer, Integer&gt;&gt; out) throws Exception {



            if (value.f0 == 0) {

                ctx.output(zeroStream, value);

            } else if (value.f0 == 1) {

                ctx.output(oneStream, value);

            }

      }

    });



    DataStream&lt;Tuple3&lt;Integer, Integer, Integer&gt;&gt; zeroSideOutput = processStream.getSideOutput(zeroStream);

    DataStream&lt;Tuple3&lt;Integer, Integer, Integer&gt;&gt; oneSideOutput = processStream.getSideOutput(oneStream);



    zeroSideOutput.print();

    oneSideOutput.printToErr();





    //打印结果

    String jobName = "user defined streaming source";

    env.execute(jobName);

}
</code></pre>
<p>可以看到,我们将流进行了拆分,并且成功打印出了结果。这里要注意,Flink 最新提供的 SideOutPut 方式拆分流是<strong>可以多次进行拆分</strong>的,无需担心会爆出异常。</p>
<p><img src="https://kingcall.oss-cn-hangzhou.aliyuncs.com/blog/img/CgqCHl7CBMKAGHoUAAM-5UL5geg132-20210223084827698.png" ></p>
<h3 id="总结">总结</h3>
<p>这一课时我们讲解了 Flink 的一个小的知识点,是我们生产实践中经常遇到的场景,Flink 在最新的版本中也推荐我们使用 SideOutPut 进行流的拆分。</p>
<blockquote>
<p>关注公众号:<code>大数据技术派</code>,回复<code>资料</code>,领取<code>1024G</code>资料。</p>
</blockquote>

页: [1]
查看完整版本: 第10讲:Flink Side OutPut 分流