01
实时湖仓
这是一个非常常见的图,展示了数据架构的时效性演进。
目前在企业中典型的数据架构大致分为两种,一种是批式数仓,传统的 Hive 表加上 Hive 或 Spark 的计算,然后可能后面再对接一些 OLAP 引擎,包括 Doris 或 StarRocks。这套架构的主要问题在于时效性。这里的时效性分为两个含义:
第一个含义是 ETL 的时效性,也就是数据流入数仓中,什么时候能够全部处理完毕,准备好查询,这个 ETL 的时间在批处理计算中通常是按天或小时计算的,根据分区来定义。
另一方面是查询的时效性,会因使用的查询引擎而异。例如,使用 Spark 或 Hive 进行查询通常需要分钟级的时间,而使用 Doris 进行查询则可能达到秒级。
近年来,Flink 在中国的实时数据仓库领域得到了广泛应用,其架构包括 Flink 流计算、Kafka 作为中间数据流转,以及将结果表直接存储到 OLAP 系统中,这种纯流式的架构在许多企业中得到了推广。其 ETL 的时效性可以达到秒级,当然,这也取决于整个处理链路的不同,有些可能还是分钟级。查询的时效性可以更快,比如当数据最终存储到 ADS 层时,如果是 MySQL,可以提供毫秒级的查询;如果存储到 OLAP 系统,也可以提供毫秒级或秒级的查询。这两种架构在各个企业中都很常见。
作为一个早期从事流计算的研发人员,我一直在思考如何让实时数据处理在更广泛的范围内推广,让更多的数据能够进入实时处理领域,而不是所有数据都必须等到ETL(提取、转换、加载)的次日才能被查看。
随着近年来的推广,几乎所有企业都建立了实时架构和实时数据仓库架构,尤其是 Flink 架构。然而,在企业中,大部分数据仍然存储在批处理系统中。实际上,人们只是将大约 10% 的数据转换到实时数据仓库中,以实现秒级的事务和操作(ETL)。
因此,我们进行了许多尝试,比如第一个尝试是采用 Kappa 架构,将所有数据都导入实时数据仓库。但这样做整体的复杂性非常高,开发一条实时链路并不容易,中间结果不可查询,开发过程也很复杂。此外,最核心的问题是成本非常高,与传统的批式数据仓库相比,成本可能高出十倍、几十倍甚至上百倍。
第二个尝试是流批一体的架构。利用 Flink 的流计算能力,在满足实时数据流计算的同时,也利用 Flink 批式 ETL 的能力,至少在周期性计算层,让数据仓库开发人员能够编写一套周期性工作,这套工作可以在两种架构中通用。但随着这几年的推广,我们发现实现这一目标的难度非常大。一方面,Flink 的批处理功能还不够成熟。另一方面,还有一个非常核心的问题,即两套架构的存储方式不同。一边是 Kafka 或者 OLAP 系统如 Doris 和 StarRocks,另一边是 Hive 这种表存储的格式,它们的操作方式完全不同。如果只是用一套计算引擎的SQL 来统一它们,会发现在业务上根本无法使用。
因此近年来聚焦于实时湖仓的架构。我们分析了之前架构的问题,主要是两种架构太过分离,只有实时链路,即实时数据仓库这条链路,才能实现数据时效性的提升。
批处理架构最大的问题在于存储能力不足。比如使用 Hive 存储,仅仅是将文件放置在一个文件夹中,至于文件如何组织、如何处理,它一概不负责,只能通过写一个大的 insert overwrite 语句来更新分区,因此其能力极其有限。而湖格式的能力在于管理每一个文件,能够完成 ACID(原子性、一致性、隔离性、持久性)操作,甚至能够完成流式读取和流式写入,实现分钟级的更新。这就能够提升数据的时效性,或者使表向流式处理和实时处理方向发展。
这样我们就得出了这样一套架构——实时湖仓。它不再局限于批处理计算,而是既能进行批处理计算也能进行流计算,各种计算都可以在上面进行,存储是完全统一的。基于这种统一的计算,结合湖格式和 OLAP,能够达到分钟级的时效性。这样一套时效性的提升都是在原有的批式数据湖仓架构中完成的。
因此,实时湖仓是批式数据仓库的原地升级,它并不是一个替代关系,而是批式数据仓库的直接升级。在原有的批式数据仓库的时效性为 T+1 的情况下,通过流计算和实时更新技术,能够将时效性提升到分钟级。在原有读写操作非常粗粒度的情况下,能够实现流式更新、批式更新能够实现流式读取,甚至能够实现对查询文件的过滤,以实现高性能查询的效果。
简而言之,实时湖仓解锁了完整的大数据全生态,在一套存储架构上,能够实现流批一体的计算,也能完成典型的 OLAP 查询。
原有的批式数据仓库可以被认为是经典的绿皮车,运行缓慢,但吞吐量大,成本低。而实时数据仓库则像是飞机,非常快,但成本很高,也容易出问题,就像下雨天常常有飞机晚点。而实时湖仓想要达到的效果是传统火车的升级版,仍然在地上,仍然是传统火车的成本,但可以变得更快,像高铁一样。
02
Apache Paimon
介绍完实时湖仓是什么之后,在介绍 Paimon 之前,需要先谈谈 Iceberg。Apache Iceberg 在国外使用相当广泛,在最近一次峰会中,其创始人表示,Iceberg 是 Shared Database Storage for Big Data,即共享数据库存储。如何理解这句话呢?在传统的大数据中,共享的不仅仅是 Iceberg,Hive 也是共享的,几乎所有的计算引擎都可以访问 Hive,包括 Doris、Spark、Flink 等。但为什么 Hive 的存储方式不行呢?因为它只是一个共享的文件存储,而不是共享的数据库存储,它缺少了太多能力。所以,Iceberg 在国外的定位就是 Hive 存储格式的升级。
Iceberg 增加的能力主要包括:
对象存储友好
ACID transactions
INSERT & UPDATE & DELETE
Time Travel and rollback
Schema Evolution
Tag & Branch
Paimon 则是站在 Iceberg 这个巨人的肩膀上做了全新的设计。Paimon 核心的创新点就是原生支持了在一张表上对它定义主键,定义主键之后就可以对于这张表进行流式的更新。举个例子,针对 MySQL,定义主键之后,可以对它进行一些 update 的更新,同一个主键不用先去删再去增,直接去写 insert 即可。这样就解锁了流式处理的能力。可以在 Flink 中挂载一个 Sink,直接进行流式更新这张表,然后基于它的主键进行一次更新。在更新的过程中,也可以像 MySQL 一样产生对应的 changlog,让流处理更加简单。
那么 Paimon 是如何进行主键更新的呢?主键更新的底层核心结构是 LSM,也就是 Log-Structured Merge-Tree。这种结构已经得到了广泛验证,更适合更新或偏实时的领域。Paimon 在这方面的创新是将 LSM 结构引入湖格式中,将实时更新、实时消费带入了湖格式。
实际上 LSM 结构很简单,它是一个排好序的层次结构。它给湖格式更新带来的最大好处是,在进行压缩(compaction)时,不需要全部重写一遍。从图中可以看到,它实际上是一个三角形,越底层的数据量越大。LSM 结构只需要维护几层数据,这意味着新来的数据只需要与最上层的数据进行合并(merge),进行小规模的压缩(minor compaction),这样整体的写磁盘(write amplification)就非常小,因此压缩的效率要高得多。至于读取操作,LSM 结构也是排好序的,可以进行读取时合并(merge on read),对每一层已经排好序的数据进行合并读取,其成本也不会太大。
上图展示了 Paimon 从过去到现在再到未来的发展历程和方向。Paimon 最初作为 Flink 的一个子项目在 Flink 社区中发展,最初的名字叫做 Flink Table Store。随着我们的发展,随着一些业务的落地,我们发现实际上大家需要的是一个共享的湖格式,而不是一个简单的 Flink 组件。Spark 和 OLAP 引擎等都需要读取 Paimon 的数据,并希望与 Paimon 进行更深层次的集成。因此,我们决定将 Paimon 从 Flink 社区中独立出来,成为一个全新的项目。经过一年的孵化期,发布了通用可用(GA)版本,并且许多企业都在不断优化这个方案,直到 2024 年 3 月,Paimon 正式毕业,成为 Apache 的一个顶级项目。
这次毕业实际上也标志着 Paimon 不再是 Flink 的一个子项目,它不仅与 Flink,还与 Spark 和其他引擎,包括 Doris、StarRocks 等 OLAP 引擎都有了非常好的集成。预计在 2024 年下半年会正式发布 1.0 版本,这意味着 Paimon 在整个大数据引擎中的 OLAP 领域,已经实现了非常好的集成。
03
应用场景
第一个场景是 Paimon 最初开始应用的场景,2023 年的主流应用是这样的简单场景:数据库 CDC 入湖,Paimon 可以使 CDC 入湖变得更简单、更高效、更自动化,链路也更简洁。你可以直接启动一个 Flink 作业写入 Paimon,然后用 Spark 来查询,其它的清理、compaction(压缩)等工作都为你自动完成。
在这个基础上,Paimon 社区也提供了一套工具,可以帮助你进行 schema evolution,将 MySQL 的数据,甚至 Kafka 的数据同步到 Paimon 中。上游增加列,Paimon 也会跟着增加列。还有一些整库同步的功能,通过一个 Paimon 作业就可以同步成百上千张这样的小表。
这里分享一张阿里智能引擎实践的示例图。智能引擎的核心问题是下游有各种各样的需求来读取业务库的表,可能需要将业务库的表发送到 Kafka 中,或者并行读取的需求,许多请求直接打到业务的备库上,可能导致业务库在很多时候不够稳定,整体的并发也受到限制。因此,业务库偶尔有挂掉的风险,而且只能安排在晚上处理,白天直接处理可能会导致系统崩溃。
这里进行的一个改变就是通过 Paimon,将 Paimon 作为整个业务数据库的统一镜像表。Paimon 相比 Hive 的优势在于,可以通过 schema 离线地将 MySQL 表同步到 Paimon 中。Paimon 的下游可以支持分钟级的流计算,可以进行流式读取,也可以批量查询 Paimon 表,批量查询的时效性是分钟级的。因此,其核心是将流和批处理都统一到了 Paimon 这张表上,所有下游业务都通过 Paimon 的统一入口来消费业务库的数据。因此,整体的吞吐量没有上限,因为众所周知,Paimon 是建立在文件系统上的,全天 24 小时都可以进行数据拉取,对业务库的压力小了很多。
在这个场景中,Paimon 提供了很多更新的能力,不仅仅是更新,保留最后一条记录,也可以在更新时定义部分更新,还可以在 Paimon 上定义聚合引擎,在湖上完成一个自动聚合的能力,或是通过 Paimon 的 change log producer 来实时产生 change log 给下游消费。因此可以基于 Flink 加 Paimon 构建出完整的一套流的这样一个 ETL,这条链路当中几乎没有 state 的存在,所有的数据都是基于分钟级的批量更新,因此成本很低。查询可以通过 Doris、StarRocks 来查询。
另一个要分享的是蚂蚁的一个应用实践。需要说明的一点是,这里讲的并不是要替代实时链路,而是许多离线链路希望变得更加实时,但由于实时处理的成本太高,所以很难迁移过来。
在蚂蚁计算 UV 指标的例子中,之前是使用 Flink 的全状态链路来实现的,但后来发现大量业务难以迁移到这种模式,因此将其替换为 Paimon。利用 Paimon 的 upsert(更新或插入)更新机制来进行去重,并且利用 Paimon 的轻量级日志 changlog 来消费数据,为下游提供实时的 PV(Page View,页面浏览量)和 UV 计算。
在整体资源消耗方面,Paimon 方案使得整体 CPU 使用率下降了 60%,同时 checkpoint 的稳定性也得到了显著提升。此外,由于 Paimon 支持 point-to-point(端到端)写入,任务的回滚和重置时间也大幅减少。整体架构因为变得更加简单,因此在业务研发成本上也实现了降低。
接下来分享的是偏向 OLAP(在线分析处理)的应用场景。首先,Spark 与 Paimon 的集成非常好,不逊于 Spark 的内表。通过 Spark 或 Flink 进行一些 ETL(提取、转换、加载)操作,将数据写入 Paimon 中。基于 Paimon 进行 z-order 排序、聚簇,甚至构建文件级索引,然后通过 Doris 或 StarRocks 进行 OLAP 查询,这样就可以达到全链路 OLAP 的效果。
内部对阿里旗下的饿了么进行了评测。当然也可以将所有数据写入 OLAP 类型的表,但 OLAP 系统的问题主要是其存储是基于 SSD 的,它与计算紧密结合,为了达到 OLAP 性能,其成本非常高,导致大量数据无法实时化。
而将数据直接写入 Paimon,因为 Paimon 背后是 OSS 这类对象存储,其整体成本非常低,但时效性只有 1 到 5 分钟,所以这里需要权衡,对于某些对时效性要求不高的数据,可以直接写入 Paimon,通过 Paimon 的一些排序或数据聚簇手段,使数据更利于 OLAP 查询。然后使用 StarRocks 或 Doris 直接进行 OLAP 查询,其查询延迟在大多数时候与 OLAP 内表相差不大。但其成本能降到直接进入 OLAP 系统成本的 1/10,这样做的效果可以加速更多更大量的业务数据。
04
Paimon 前沿技术
最后来分享一下 Paimon 相关的一些前沿技术。
在数据湖格式中,大家听得最多的可能是 merge on read 或 copy on write。Merge on read 即在更新时保留大量 Delta 数据,查询时会比较慢。Copy on write 即在更新时直接对数据进行重写,写入成本非常大,但读取数据非常高效。所以 merge on read 和 copy on write 是两个极端。Merge on write 想做的是,比如在上图的主键表定义一个主键,定义一个 deletion vectors 模式。它要做的是在写入时,流式生成对原有数据的 deletion vectors,这样不是只写增量,而是先删除之前的数据再写增量,读取时只需读取之前的文件,再基于 deleting vector 直接进行高效的 OLAP 查询。所以大家把这种模式定义为 merge on write,即在写入时进行一定 merge,带来的效果是写入慢一些,但读取快很多,因此这是一个更新和极速查询兼得的方案。
Paimon 在最新版本中完全支持了标签(tag)和分支(branch)的功能。不仅支持标签,最新版本还支持了标签的自动 TTL(Time-To-Live,生存时间)管理。当你将标签和分支结合起来使用,会觉得整个 Paimon 的数据操作可以像 Git 一样。这在很多情况下非常有用,比如进行工程验证或测试时,使用这些分支和标签会非常方便。
另外,我们内部目前正在进行的一件事是基于 branch 能力来实现完整的流批一体的概念。比如,有一个分支是用于流处理,正在进行流式读取和写入,还有另一个 branch 是批处理的,我可以同时进行批处理的写操作。这样,基于同一张表,从业务角度来看,它能够实现流和批完全隔离的流批一体效果。
关于通用索引的支持,正如刚才提到的 OLAP 场景,deletion vector 模式也是一种面向 OLAP 的技术手段,通用索引的支持也是向优秀的 OLAP 引擎看齐的一种手段。例如,像 Doris、StarRocks 这样的 OLAP 引擎,它们不仅支持 Min/Max 索引,还有 bitmap、Bloom Filter、倒排索引等能力。湖格式(Lake Format)也可以拥有这样丰富的索引能力,并且可以在低廉的对象存储基础上,实现非常高效的基于过滤条件的数据跳过(data skipping),达到高效的 OLAP 查询能力。所以 Paimon 的最新版本也在研发 bitmap 索引,后续也会探索倒排索引的实现。大家都知道,一旦命中 bloomfilter,可能会有 10 到 100 倍的性能提升,命中 bitmap 索引也可能有数倍的性能提升。