01
业务背景
贝壳整装大家居业务致力于为消费者提供“家的整体解决方案”,通过个性化设计方案、多样化产品组合、标准化履约交付,通过整装、定制、软装、家电及局部翻新五大服务,满足家装前中后的所有需求。2024 年一季度,家装业务合同额达到 33.9 亿元,同比增长 26.1%,净收入 24.1 亿元,同比增长 71.1%;包括定制家具、软装等在内的新零售合同额 9.4 亿,与去年同期相比提升 5.1 个百分点。此外,标准化与科学管理的成果也开始显现,交付能力伴随收入规模同步提升,一季度贝壳家装工期约 104 天,相比去年同期下降约 18 天。
业务的快速发展,需要数据能力的同步支撑。基于业界先进的流式湖仓概念,贝壳家装数仓团队正在逐步构建符合行业特点的家装湖仓。在这一过程中,我们发现 Paimon 的一些特性特别适合解决家装行业数据建设中的一些典型问题。具体来说,Paimon 所提供的变更日志功能,可以作为一种新的数据转换方式,巧妙地解决家装行业数据建设中的业绩流水和历史变更信息的构建问题。
02
基于 Paimon Audit Log 构建业绩流水
装修行业中,项目订单的生命周期是非常长的。从签约到正签,再到合同款支付 20%、40%、60% 等节点,以及之后的开工直至竣工的各个环节,一同构成了装修项目的完整流程。在这一过程中,项目的进展往往伴随着各种可能的变动,如工程变更、合同变更、退款以及部分退款等,这些变动都会对项目的业绩产生直接影响。
所以在计算业绩时,特别是计算某个时间区间内的业绩时,一种常用的方法是根据订单金额的变化来创建业绩流水。通过记录每一笔订单的金额变动,才能清晰地追踪到项目在不同阶段的业绩情况。前期、我们分别建设了天级业绩流水和实时业绩流水来计算业绩。
天级流水,主要通过将 T-1 分区和 T-2 分区进行比较,得到变化的数据后,再将这些数据与 T-2 数据合并,最终生成天级业绩流水记录。这种方式能够较为直观地展示项目每天的业绩变化情况,但也存在一定的缺点。由于作业调度每天才进行一次,数据的消费者只能看到昨天的变化数据,因此存在一定的时效性问题。
图 1:天级业绩流水
为了弥补天级流水的不足,我们建设了实时业绩流水。其原理是利用 MySQL 的 binlog 日志,实时捕获订单金额的变更记录,并计算出差值来生成流水。这种方式能够实时反映项目的业绩变动情况,但是实现成本相对较高。它需要依赖 Kafka、Flink、HBase、Doris 等一系列组件的支持,同时还需要每日使用 Hive 离线任务来纠正历史数据,这无疑增加了开发和维护的难度。
图 2:实时业绩流水
以上数据场景中,都需要计算流水数据。而 Paimon 天然的提供了变化日志,来满足创建流水的场景。在分析业绩过程中,我们利用 Paimon 的聚合能力,结合 Paimon + Flink 的数据湖技术,计算合同业绩。起初,我们设计的方案如下:
图 3:Paimon 计算业绩流程图
ODS 层:业绩表 A 记录了项目的业绩金额。项目表 B 表记录了项目的属性信息。表结构如下:
create table project_amount (
project_id bigint comment '装修项目id'
,amount decimal(19,4) comment '已支付金额'
...
,primary key(project_id) not enforced
)
comment '业绩表'
with
(
'changelog-producer' = 'lookup'
)
;
create table project (
project_id bigint comment '装修项目id'
,address string comment '地址'
,region_code string comment '所属业绩大区编码'
,region_name string comment '所属业绩大区名称'
...
,primary key(project_id) not enforced
)
comment '项目订单表'
with
(
'changelog-producer' = 'lookup'
)
;
DWD 层:利用 Paimon 的局部列更新功能,将业绩表 A 和项目表 B 进行合并成为明细宽表 C。表结构如下:
create table if not exists project_info (
project_id bigint comment '装修项目id'
,address string comment '地址'
,region_code string comment '所属业绩大区编码'
,region_name string comment '所属业绩大区名称'
,amount decimal(19,4) comment '已支付金额'
,primary key(project_id) not enforced
)
comment '项目工期表'
with
(
'changelog-producer' = 'lookup'
,'merge-engine' = 'partial-update'
,'metastore.partitioned-table' = 'true'
,'partial-update.ignore-delete' = 'true'
)
;
DWS 层:利用 Paimon 的聚合模式引擎功能,按照大区汇总业绩数据表 D。表结构如下:
create table if not exists dws_amount
(
region_code string comment '大区编码'
,region_name string comment '大区名称'
,project_cnt bigint comment '项目数'
,amount decimal(19,4) comment '业绩金额'
,primary key(region_code,region_name) not enforced
)
comment '大区业绩汇总'
with
(
'merge-engine'='aggregation'
,'changelog-producer'='lookup'
,'fields.project_cnt.aggregate-function'='sum'
,'fields.amount.aggregate-function'='sum'
,'file.format'='avro'
,'metadata.stats-mode'='none'
)
;
数据 ETL 结果如下:
图 4:Paimon 计算业绩 ETL 过程
但是同一个项目订单,在更改属性或者度量值后,产生的新的变更日志被再次消费时,Paimon 的聚合表就会重复计算,导致数据膨胀。以具体实例来说,假设我们将项目订单 1 的业绩额从 500 变更为 600,理论上,正常的业绩额应该更新为 600。然而,在实际操作中,业绩额竟然变为了 1100(编者注:此处细节详见留言区)。
为了解决这个问题,我们需要生成业绩变化流水,并将其打入 Paimon 聚合模型表中。这样,我们就可以确保每个变更都被准确记录,并避免重复计算的发生。而 Paimon 的系统表 audit_log 恰好为我们提供了这样的条件。audit_log 表维护了每个变更记录,包括变更的时间、变更的内容以及变更前后的数据等详细信息。
使用系统表 audit_log 创建业绩流水记录的 DWS 层 Flink 任务如下:
insert into dws_amount
select region_code
,region_name
,case when rowkind in('+I' ,'+U') then amount
when rowkind in('-D' ,'-U') then - amount
else 0
end as amount
b from project_info$audit_log
;
数据流中,对于新增或者更新后的数据,如 +I(新增)和 +U(更新),我们将其业绩标记为正数,表示这些数据是有效的。如果是 -U、-D 这种更改前或者被删除的数据,我们将其业绩标记为负数,表示它们是无效的。最终得到了准确的计算结果。
图 5:业绩流水计算准确业绩变化
03
基于 Paimon Audit Log 构建历史变更信息
家装行业中的另一个特点,是其业绩并非简单按照合同金额计算,而是需要等到付款比例达到一定的数值之后,才能满足业绩计算的标准。具体而言,当合同的付款比例达到规定的数值后,家装公司便会按照最近一次合同变更的时间作为业绩达标时间,为相应的服务者团队计入增量业绩。
图 6:家装领域业绩时间计算方式
为了准确计算业绩达标的时间,我们需要精准判断本次支付是否刚好触发了业绩达标。这就要求我们密切关注每一份装修合同的变更情况,并构建一个全面而细致的历史变更信息。
在传统的数据加工方式中,我们需要将最新的合同变更记录与最近的合同历史快照进行关联,以便构建出完整的合同历史变更信息。然而,这样的关联操作往往涉及大量的数据,因此会消耗大量的计算资源。
图 7:传统数据加工方式方式
然而,在流式湖仓的框架下,我们可以利用 Paimon 的 Audit Log 功能,以更加优雅和高效的方式实现这一目标。首先,我们基于 Paimon 主键模型构建一张事实表,基于业务数据库的变化,动态更新湖表中的合同记录。随着数据的不断流入,湖表中的记录被不断地更新,呈现出一个又一个不同的版本。
值得注意的是,虽然主键模型能够反映合同的最新状态,但它却无法直接展示合同的变更过程。而 Paimon 的 Audit Log 表为我们提供了数据变更的完整记录。这些记录详细描述了数据从一个状态到另一个状态的转变过程。基于 Audit Log 记录,我们创建了一个 Paimon 局部更新模型来存储每次合同变更的前后两个版本的信息。这张新表包含两组字段:第一组字段用于表示合同变更之前的状态,我们通过不断解析主键模型的 -U 日志来更新这些字段;第二组字段则用于表示合同变更之后的状态,我们利用主键模型的 +U 日志来实时更新这些字段。通过这种方式,我们得以追踪并体现出了合同的版本变更信息。
图 8:基于合同信息的 Audit Log 得到合同的最新变更信息
代码实现方面,可以参考下面这个样例:
CREATE TABLE if not exists contract_change (
contract_id bigint comment '合同id'
,pre_update_time string comment '更改前合同更新时间'
,pre_paid_amount bigint comment '更改前合同已支付金额,分'
,pre_paid_ratio decimal(5,2) comment '更改前合同已支付比例'
...
,next_update_time string comment '更改后合同更新时间'
,next_paid_amount bigint comment '更改后合同已支付金额,分'
,next_paid_ratio decimal(5,2) comment '更改后合同已支付比例'
...
,PRIMARY KEY (`contract_id`) NOT ENFORCED
)
WITH (
'merge-engine' ='partial-update' -- 使用部分更新数据合并机制产生宽表
,'changelog-producer' = 'lookup'
);
INSERT INTO contract_change
select contract_id -- '合同id'
,update_time as pre_update_time -- '更改前合同更新时间'
,paid_amount as pre_paid_amount -- '更改前合同已支付金额,分'
,paid_ratio as pre_paid_ratio -- '更改前合同已支付比例'
...
,cast(null as string ) as next_update_time -- '更改后合同更新时间'
,cast(null as bigint ) as next_paid_amount -- '更改后合同已支付金额,分'
,cast(null as decimal(5,2)) as next_paid_ratio -- '更改后合同已支付比例'
...
from contract$audit_log
where rowkind = '-U'
union all
select contract_id -- '合同id'
,cast(null as string ) as pre_update_time -- '更改前合同更新时间'
,cast(null as bigint ) as pre_paid_amount -- '更改前合同已支付金额,分'
,cast(null as decimal(5,2)) as pre_paid_ratio -- '更改前合同已支付比例'
...
update_time as next_update_time -- '更改后合同更新时间'
paid_amount as next_paid_amount -- '更改后合同已支付金额,分'
paid_ratio as next_paid_ratio -- '更改后合同已支付比例'
...
from contract$audit_log
where rowkind = '+U'
;
现在,我们已经成功记录了合同的版本变更信息。通过其中记录的合同变更前后信息就可以实时判断合同是否达到了业绩标准。然而,在实际应用中,业绩标准并非一成不变,而是随着市场环境和考核策略的变化而不断调整。因此,为了更灵活地应对业绩标准的变化,我们需要能够获取合同的全部历史变更信息。
随着数据流入,当前的这个变更信息模型会不断地更新和覆盖,始终只能呈现出最近一次的版本变更信息。此时,我们需要再次借助 Audit Log 表。Audit Log 表详细记录了合同版本变更的每一个痕迹,包括每个版本的增加(+I)和更新(+U)操作。通过读取这些数据,我们可以追踪合同的每一次变更。
在读取这些数据时,我们还需要进行一定的过滤操作。因为在数据更新的过程中,会出现起止时间相等的版本。这种情况通常是由于 Audit Log 表中下一次的 -U 和上一次 +U 所包含的信息相同所导致的。为了避免冗余信息的干扰,我们需要在读取数据时进行甄别,过滤掉这些起止时间相等的版本。
经过这样的处理,我们就能够获取到合同的全部历史变更信息,并将这些信息平铺呈现出来。
图 9:基于合同最新变更信息的 Audit Log 得到合同的历史变更信息
代码实现方面,可以参考下面这个样例:
CREATE TABLE if not exists contract_change_his (
contract_id bigint comment '合同id'
,pre_update_time string comment '更改前合同更新时间'
,pre_paid_amount bigint comment '更改前合同已支付金额,分'
,pre_paid_ratio decimal(5,2) comment '更改前合同已支付比例'
...
,next_update_time string comment '更改后合同更新时间'
,next_paid_amount bigint comment '更改后合同已支付金额,分'
,next_paid_ratio decimal(5,2) comment '更改后合同已支付比例'
...
,PRIMARY KEY (`project_id`,`pre_update_time`) NOT ENFORCED
)
WITH (
'merge-engine' = 'deduplicate'
,'changelog-producer' = 'lookup'
);
INSERT INTO contract_change_his -- 累计结果拉链表
select contract_id -- '合同id'
,pre_update_time -- '更改前合同更新时间'
,pre_paid_amount -- '更改前合同已支付金额,分'
,pre_paid_ratio -- '更改前合同已支付比例'
...
,next_update_time -- '更改后合同更新时间'
,next_paid_amount -- '更改后合同已支付金额,分'
,next_paid_ratio -- '更改后合同已支付比例'
...
from contract_change$audit_log
where rowkind in ('+I','+U')
and pre_update_time <> next_update_time
;
04
总结与展望
Paimon 的出现为 Flink 生态提供了一种全新的数据承载介质。以往在实时数据处理中,主要都是针对增量数据进行事务型业务过程的统计和分析。而像家装这类生命周期较长的行业,其分析场景更倾向于圈定分析目标之后的持续状态追踪。Paimon 的出现使得我们可以将全量数据保存于湖仓之中,并且进行实时地更新。其强大的 Audit Log 功能更是能够在部分场景彻底颠覆传统的数仓建设思路。
贝壳家装数仓团队将继续完善相关方案,并且探索流式湖仓建设过程中的更多可能,为家装行业带来更多的数据力量。也期待 Paimon 在今后的迭代中越做越好,让流式湖仓赋能更多行业。