kuhuo
kuhuo
发布于 2024-06-30 / 19 阅读
0
0

Flink DataStream API 批处理能力演进之路

最近在和一个朋友闲聊时,他问了一个很有意思的问题:Flink 是如何在流处理引擎上支持批处理能力的?

鉴于 Flink 已经成为了流处理领域的事实标准,可能很多人都不知道,Flink 在诞生的第一天起就是支持批处理的。DataSet API 也是从那时起就被引入的,它被用来支持对有界数据的批处理操作。随着 Flink 社区逐步意识到基于 Pipeline 的架构非常适合流处理,因此发展出了 DataStream API[1],它是为无界的流式应用开发的,引入了状态,事件时间和窗口等特殊概念。

但随着对两套 API 本质的深入思考,Flink 社区逐渐发现:DataStream API 其实完全可以成为 DataSet API 的超集。

  • 概念上:有界数据集只是无限数据流的一种特例。

  • 语义上:DataStream API 覆盖了 DataSet API 的大部分,同时还有针对实时流计算的扩展,只有少数和分区有关的语义暂时没有支持。具体的差异见下图:

同时维护两套 API 也对社区造成了很大的困扰,并且用户开发作业前必须提前在两种 API 中作出选择。对于用户来说,离线和实时作业具有相同的处理逻辑是很常见的。如果只编写一次代码,就能达到分别开发流和批两个作业的相同效果,将会带来极大的便利。鉴于以上诸多原因,Flink 最终走上了基于 DataStream 的流批一体的发展道路。也正是如此,Flink 社区早在 1.12 版本就开始逐步弃用 DataSet API,将会在 Flink 2.0 中完全移除 DataSet 相关代码。同时,不断提升 DataStream 上的批处理能力,以 DataStream 为核心打造流批一体的 API。

流批一体是一个相对宽泛的概念,它包含 API,调度,Shuffle,容错等多个维度,本文主要关注于 API 及其底层算子执行上 DataStream 对批处理所做的工作,其他细节可以参考文章《Flink 执行引擎:流批一体的融合之路》[2]。下面我们将沿着批处理语义和性能优化以及 Batch API 功能增强两个大的方向回顾 Flink DataStream API 批处理能力演进之路。

01

批处理语义和性能优化

DataStream API 虽然理论上可以覆盖绝大多数 DataSet API 上的语义和操作,但在一些细微之处还是存在一些差异。下面我们从几个方面详细介绍一些 Flink 社区在这方面所做的努力。

1.1 输出语义

为了最大化数据的实时性,DataStream 上算子的输出是增量式的。例如:KeyedStream.reduce,它会在每次到来一条新的数据时更新内部维护的状态,并向下游发送当前最新的聚合值。用数据库的术语来说,它产生了一个 Upsert 流作为输出: 如果一个键有 10 个输入元素,那么也会得到 10 条输出记录。

而对于实时性往往没有这么强要求的批作业来说,这些中间的增量输出会极大地增加下游算子的计算负担。由于批作业的算子不需要感知数据的 Changelog, 其更期望的是一种 All-or-Nothing 式的输出语义,即仅仅在每个 Key 最后一条数据到来后,才向下游发送数据。因此,我们需要在批模式下对一些 API(例如:KeyedStream#reduce, sum,min,max,minBy,maxBy) 的行为做出改变,使其仅在输入结束时输出最终结果。

下表描述了 Sum 操作在流和批两种模式下的输入输出情况:

(假设它们具有相同的 Key,4 为该 Key 的最后一条数据)

输入

流模式输出

批模式输出

1

1

2

3

3

6

4

10

10

1.2 状态访问和更新算法

对于有状态算子,DataSet 算子在迭代数据时直接在内存中维护最新的状态值。在 DataStream API 中,状态的访问和更新则是通过与 StateBackend 交互所进行的。实现流批一体的统一架构,就意味着 DataStream API 在流和批模式下要尽可能复用相同的算子实现。但是与 RocksDB 等 StateBackend 交互会带来不小的 IO 开销,站在 Flink 开发者的视角上,该如何解决这个问题呢?让我们更深入地思考一下他们之间的本质差异。

流模式下 DataStream API 上的聚合算法其实可以类比为基于 Hash 的聚合,StateBackend 在这里扮演着哈希表的角色。下图展示了在流模式下一个 KeyedOperator 的输入数据和状态存储的关系:

(绿色部分表示新数据到来后状态存储的更新)

我们可以看到:在流模式下,状态存储必须维持一个哈希表,为每个 Key 存储一条 Item。值得注意的是,该状态并不是完全存储在内存中的,达到一定阈值后需要溢写到磁盘。由于批作业是没有 Checkpoint 的,并且其 Shuffle 的中间数据直接写入到了磁盘中,发生 Failover 后直接从上一个 Stage 的数据重新计算状态即可,因此并不需要对状态进行持久化存储,理论上状态完全可以放在内存中。

接下来要考虑的是内存是否有 OOM 风险:对于单个 Key 来说,其状态不会非常大。由于批作业的数据是有界的,如果我们能对 key 进行分组,就可以在同一时间只追踪单一 Key 的状态。沿着这个思路,我们可以把基于 Hash 的状态访问算法变为基于排序的。因此,Flink 在批执行模式下会对 KeyOperator 的所有输入数据按 Key 进行排序,并且在该模式下使用一种特殊的 StateBackend,它在内存中追踪当前 Key 所对应的状态,当 Key 发生切换时清除上一个 Key 的状态值。

批执行模式下,一个 KeyedOperator 的输入数据和状态存储的关系如下图所示:

需要注意的是,这种方式引入了额外的数据排序开销,当状态访问的频率比较低,状态的数据量比较小时,对性能会有负面影响。但是考虑到绝大多数的批处理作业规模都比较大,其中的有状态算子往往需要 per-record 的访问和更新状态。比如对常见的 Join、Group Agg 等,往往存在很多重复 Key 的数据,该优化带来的收益通常比排序带来的开销要大的多。

1.3 EventTime 和 Watermark

实时数据流中事件可能是乱序的,即时间戳为 T 的事件可能出现在时间戳为 T+1 的事件之后。此外,系统无法确定是否将来还有时间戳为 t < T 的元素到来。因此,Flink 的流处理模式是建立在事件的顺序无法得到保证的前提下的。为了消除这种无序性带来的影响,Flink 引入了一种名为 Watermark 的标记。一个时间戳为 T 的 Watermark 到来,表示不会再收到或者可以直接忽略任何 t < T 的数据。

但在批执行模式下,数据是有界的,我们明确知道每一条数据的时间,因此可以认为不存在无法预知的迟到数据。发送中间的 Watermark 是没有意义的, 反而只会增加网络传输的压力和下游处理这些 Watermark 的复杂度。由于定时器和窗口的闭合都需要 Watermark 来触发,因此我们可以只在输入结束时发送 MAX_WATERMARK,或者在每个 Key 结束时发送 MAX_WATERMARK。这样既不会引入太多开销,又可以统一流批算子对于 EventTime 的处理。

02

Batch API 功能增强

需要注意的是,DataStream API 和 DataSet API 所支持的操作并非完全一一对应。Flink 社区有一个官方迁移文档来专门讲解如何从 DataSet 作业迁移到 DataStream 作业[3]。在该文档中,根据迁移所带来的代码改动和执行效率的差异,把 DataSet API 分成了四大类:

  1. 在 DataStream 有等价的 API,只需要很少的方法名改动就可以完成迁移。

  2. 通过 DataStream 上不等价的其他 API 可以实现同样的行为,迁移虽然需要进行代码改动,但是执行效率和 DataSet 相同。

  3. 通过 DataStream 上不等价的其他 API 可以实现同样的行为,迁移不仅需要进行代码改动,而且执行效率可能会存在一些差异。

  4. 目前 DataStream 没有支持,且没有简单的 Workaround 的 API。

按照目前 DataStream 上对这些操作的支持情况,我们又可以把它们进一步分为下面两大类:

2.1 完美支持或者可以通过 Workaround 支持

上述四类中,第1和第2类都属于可以无痛迁移的,第3类可以通过 Workaround 来实现,但是在执行效率上有比较大的差异。因此,我们主要关注于第三和第四类。

第三类主要有两种操作:全量 Partition 处理以及笛卡尔积。DataStream 上可以通过 Window 机制来支持这类需求,但是其中主要存在以下两个问题:

2. 需要明确知道输入在什么时候结束,在拿到全量数据后才能进行处理。

Flink 目前内置的窗口一般都是随着时间推进到某个具体的点,或者输入数据的量达到某个具体的值来触发的。并没有一种能够感知输入是否结束的窗口实现。文档通过自定义的 WindowAssigner 和 Window Trigger 实现了一种仅在输入结束时才触发计算的窗口。

随着用户作业的迁移,我们看到这种需求其实广泛存在,因此 Flink 社区在 FLIP-331[4] 中提出了 EndOfStreamWindow 的概念,并会在 Flink 1.20 版本中进行支持,你可以通过如下方式来使用:

input.window(GlobalWindows.createWithEndOfStreamTrigger())
                .apply(
                        new WindowFunction<T, R, KEY, GlobalWindow>() {
                            @Override
                            public void apply(
                                    KEY key,
                                    GlobalWindow window,
                                    Iterable<T> input,
                                    Collector<R> out)
                                    throws Exception {
                                // do something with the iterable input, It has all the input data.
                            }
                        },
                        resultType);

1. Non-Keyed Stream 上不支持窗口操作

Flink 中的窗口是基于 State 来实现的,而不同 Key 的 State 是不属于同一个命名空间的,因此窗口只有在能明确定义 Key 的流上才有意义。文档中引入如下函数来给数据附加上当前分区(并行度)的信息,然后以该字段作为数据的 Key。


public static class AddSubtaskIDMapFunction<T> extends RichMapFunction<T, Tuple2<String, T>> {
    @Override
    public Tuple2<String, T> map(T value) {
        return Tuple2.of(String.valueOf(getRuntimeContext().getIndexOfThisSubtask()), value);
    }
}

这种方式虽然可以产生正确结果,但也引入了per-record的额外开销。为了优化这个问题,Flink 社区在 FLIP-380[5] 中引入了对 Non-Keyed 全量分区处理的原生支持。下面一一介绍这几个 API 的使用方式和注意事项:

(1)Map Partition

该 API 用来对一个分区的数据做全量处理,并在获取所有数据后进行输出。

假如我们需要计算每个分区内数据的条数,并输出给下游算子。可以使用如下方式来实现:


inputStream.fullWindowPartition()
                .mapPartition(
         new MapPartitionFunction<Record, Long>() {
                             @Override
                    public void mapPartition(
                            Iterable<Record> values, Collector<Long> out)
                            throws Exception {
                        long counter = 0;
                        for (T value : values) {
                            counter++;
                        }
                        out.collect(counter));
                    }
          })

它与 map 的主要区别如下:

MapPartition

Map

计算触发时机

所有输入结束后触发一次

每条输入数据都会触发一次

输入数据类型

包含所有数据的Iterable对象

每条数据自身

值得注意的是:MapPartition 虽然给调用者提供了一个基于全量数据的 Iterable 对象,但它并不会把全量数据都加载到内存。该 API 的底层实现充分利用了 Flink 执行引擎的反压机制,在对 Iterable 对象进行迭代时只会按需把数据加载到内存。

(2)Reduce/Aggregate Partition

该 API 主要用于对分区内的数据做全量聚合,分别需要传入 ReduceFunction 和 AggregateFunction。ReduceFunction 描述了两条输入数据如何合并产生同样类型的输出数据,而 AggregateFunction 是更通用的 ReduceFunction, 它通过引入一个中间的 Accumulator, 支持产生不同类型的输出。

下面的例子展示了如何在一个双字段的 Tuple 数据流上对其第二个字段做全量聚合

inputStream.fullWindowPartition()
       .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                        @Override
                        public Tuple2<String, Integer> reduce(
                              Tuple2<String, Integer> value1,
                              Tuple2<String, Integer> value2) throws Exception {
                          return Tuple2.of(value1.f0, value1.f1 + value2.f1);
                                    }
                                })

(3)Sort Partition

另一种比较重要的操作就是排序,对分区内数据进行排序的需求在批处理中是广泛存在的。理论上,我们可以通过 MapPartition 来轻松实现全内存的排序,但是在大规模 Batch 作业中,把数据全部加载到内存中往往是不现实的。SortPartition API 支持外部排序,在数据量到达一定阈值后会溢写磁盘,因此无需担心内存的 OOM 问题。

下面是一个对分区内数据做全量升序排列的示例代码:

DataStreamSource<Tuple2<String, Integer>> source = xxxxx
 // 按照 tuple 的第一个字段进行排序
 SingleOutputStreamOperator<Tuple2<String, Integer>> sortedPartition =
                source.fullWindowPartition().sortPartition(1, Order.ASCENDING);

注意:排序算子会使用 Flink Managed Memory。内存的大小会影响排序的效率,过小的内存会导致数据频繁地写入和读出磁盘。如果你的一些排序操作相对较重(数据 Record 比较大,数据量比较多),建议调大“execution.sort-partition.memory”值来提升性能。

2.2 目前还不支持

上述第四类代表目前 Flink DataStream API 还没有支持的操作。主要有两种: RangePartition 和 GroupCombine.

其中 GroupCombine 会把数据分成多个批次,对每个批次的数据进行合并。它并不是用户的业务需求,是引擎为了提高执行效率而对用户的需求,因此Flink 社区暂时没有计划支持该操作。而 RangePartition 基于现有的 DataStream API 可以实现,但是相对复杂(需要用户实现复杂的采样算法),笔者所在的团队已经对此在做 PoC 实现了,未来会在合适的时机贡献回社区。

03

总结

本文回顾了 Flink 在批处理能力上从 DataSet API 到流批一体的 DataStream API 的演进,并从批处理语义&性能优化以及 Batch API 功能增强两大方面分别展示了 Flink 社区是如何思考和提升 DataStream 批处理能力的,相信随着社区的不断努力,Flink Batch 会越来越好。Flink DataStream API 的流批一体能力也将在数据处理领域扮演越来越重要的角色。


评论