哔哩哔哩技术 2024年11月26日
​B 站基于 Iceberg 的流批一体的探索和实践
index_new5.html
../../../zaker_core/zaker_tpl_static/wap/tpl_guoji1.html

 

B站利用Iceberg构建了流批一体的数据处理架构,涵盖了用户行为数据传输、商业和AI在线训练、DB数据同步以及Iceberg维表Join等场景。通过将Hive替换为Iceberg,优化了数据存储和计算流程,提升了数据新鲜度和查询性能,降低了成本并提高了效率。同时,B站还自研了Iceberg表智能数据优化服务Magnus,进一步提升了Iceberg的查询性能。本次分享详细介绍了B站基于Iceberg的实践经验和技术细节,以及未来规划和展望。

🤔**海量用户行为数据传输优化**: B站将离线链路的数据存储从Hive替换为Iceberg,并使用Flink Streaming进行数据计算,将数据新鲜度提升至分钟级,同时优化了数据重复和异步传输问题,并通过Magnus服务进行数据优化,降低存储成本和提升查询性能。

🚀**商业和AI在线训练统一**: B站将实时和离线训练的特征计算结果存储统一到Iceberg,并使用统一API,减少了Kafka存储资源消耗,降低了计算成本,并对Iceberg数据进行冷热分离,加速实时训练。

📊**DB数据同步实时化**: B站利用Iceberg MOR表实现了DB数据的实时同步,并提供了增量流读Change Log能力,提升了数据同步的时效性,并通过优化Equality Delete Files和增量流读策略,优化了增量流读的速率。

🔗**Iceberg维表Join优化**: B站基于Iceberg的增量流读能力,构建了新的Flink维表Join算子,将维表全量数据存储在State中,并支持增量流读更新State数据,有效降低了对远端DB的压力,实现了流批一体的维表Join场景。

🔄**Magnus与Flink冲突解决**: B站通过取消Magnus对CDC写入的MOR表做托管数据合并操作,并由Flink JobManager主动控制触发compaction操作,解决了Magnus和Flink写入冲突问题,保证了数据同步和MOR表产出的时效性。

张陈毅 2024-11-26 12:02 上海

张陈毅 哔哩哔哩 资深开发工程师

导读 大家好,我是来自哔哩哔哩的张陈毅,今天给大家分享的 topic 是B 站基于 Iceberg 的流批一体的探索和实践。

本次的分享主要分为五个部分:

1. 海量用户行为数据传输

2. 商业和 AI 的在线训练

3. DB 数据同步

4. Iceberg 维表 Join

5. Q&A

分享嘉宾|张陈毅 哔哩哔哩 资深开发工程师

编辑整理|杨维旭

内容校对|李瑶

出品社区|DataFun


01

海量用户行为数据传输

1. 实时数据传输概览

如上图所示,B 站的数据众多,大致可以划分为三类:日志数据、用户行为数据和业务库数据。每个业务方上报的数据以 log id 进行标识,log id 是数据在传输和集成过程中的源信息标识,其中日志数据和用户行为数据会通过 log-agent 和 bfe-agent 上报给到数据网关,然后数据网关会根据 log id 分别路由到内部的 Kafka 数据缓冲层,业务库数据则会通过 Flink CDC connector 将全量数据和增量的 binlog 数据分发到数据缓冲层 Kafka。下游的数据分发会根据用户的实际场景,可以写入到 Kafka、Hive 和 Iceberg。

2. 海量用户行为数据传输(背景)

上图是迭代前的用户行为数据传输架构。APP 移动端行为数据和 Web 服务器行为数据会统一传输给 Kafka 缓冲层。接着分为实时和离线两条链路,实时链路中,数据通过 Lancer 数据集成分发到 Kafka ODS 层,然后使用 Flink Bsql 流式计算写入到 Kafka DWD 层,供下游应用层做实时计算。离线链路中,通过 Lancer 数据集成之后,分发到 Hive ODS 层,一般是小时或天级别分区表,然后内部的调度系统会拉起 Spark 离线任务,产出 Hive 的 DWD 层数据,供下游应用层做离线计算。

对比实时链路和离线链路,它们的特征差异也比较明显。主要有以下几点:

3. 海量用户行为数据传输(迭代)

我们结合数据湖 Iceberg,对整个用户行为数据链路做了一定的迭代,在数据进入 Kafka 缓冲层之前,会保持不变。在下游的数据分发中,将离线链路的 ODS 层至 DWD 层的存储从 Hive 替换成了 Iceberg。其中 ODS 到 DWD 的计算,由原先的 Spark 离线变成了 Flink Streaming,这主要得益于 Iceberg 可以根据 snapshot 做增量的流读消费,可以让 DWD 数据的新鲜度基本保持在分钟级,相比原先的 Hive 的小时级缩短了不少。

如果在数据传输过程中发现数据异常,可以借助平台工具,直接将 Flink 流计算的 SQL 根据业务时间诉求自动格式化为相应的 SQL,转化成 Flink Batch SQL,直接跑在 Batch 任务中,无需做多引擎的 SQL 开发,并且可以直接复用流计算的 UDF。除此之外,相比于 Kafka 链路,如果用户需要对 Kafka DWD 数据做 Batch 相关的分析,用户原先一般会将 Kafka DWD 数据 dump 到 Hive 中,然后再做分析,而使用 Iceberg 后,可以直接使用 FlinkBatch 去做 query。

在此次迭代中,我们也同步优化了用户行为数据场景中数据重复的问题,在之前的离线链路中,同一个用户行为数据可能会在多个任务中同时存在。改为 Iceberg 后,我们在下游数据应用层做了 Iceberg 的视图,上游的数据会按照 BU 来划分,整体减少了数据重复存储的浪费。

最后一点是对数据异步的优化,Iceberg 会借助内部服务做异步优化数据,可以整体加速 Batch query 的速率。

4. 海量用户行为数据传输(数据优化)

Magnus 是 B 站自研的一个 Iceberg 表智能数据优化服务,它的一大功能是做数据优化。

上图中展示了优化后的数据流程图。可以看到,Iceberg 的数据由 Spark、Flink 和 SDK 任务写入,当任务提交一个 snapshot 时,会发送一个 Commit Event 给到 Magnus 服务,然后 Magnus 服务会根据不同的事件属性以及不同的表优化策略生成对应的 OptimizeJob,提交到内部的队列当中等待,Job 提交器会根据集群资源的空闲情况,依次提交队列中的 Optimize Job。其中 Flink 流计算写入的数据生成的 Optimize Job 的优先级相对较高,会优先执行。

其中的优化点主要包括:

通过以上五个优化策略,在 Batch 场景中可以有效提升 Iceberg 的查询性能,减少文件 IO,优化数据存储结构,提升整体的数据访问效率。

5. 海量用户行为数据传输(收益)

通过上述改造,带来了降本和提效两方面的收益。

在成本方面,一年的计算和存储成本节省约 355 万,其中 CPU 使用率降低了 20%,内存消耗降低了 22%。性能方面,通过新技术迭代,经过多个实际迁移任务的前后分析,发现平均性能提升了 48.9%。

02

商业和 AI 的在线训练

1. 商业和 AI 的在线训练(背景)

在商业和 AI 的在线训练场景中,展现流和点击流会通过 Flink 双流 join 进行行为归因,生成展点流的 Base 数据,写入 Kafka,然后展点流数据会通过另一个 Flink 作业进行特征计算,会有本地的 Jni 的调用,其依赖于离线计算好的用户画像、视频基础信息、UP 主挖掘信息等数据。特征计算的结果会写到 Kafka,得到一个实时流训练样本。实时流训练样本会通过两种训练方式进入到训练平台:第一种是通过实时训练读取 Kafka 实时流数据输入;第二种是将 Kafka 的数据通过 Flink SQL Dump 到 Hive 小时表或天表中供离线训练。除了发布到线上的这条链路外,还有多个实验链路会从 Kafka 读取 base 展点流数据并写入到训练平台的存储中供下一步训练使用。

这一架构存在诸多痛点:

2. 商业和 AI 的在线训练(迭代)

基于上述背景,我们首先将后半段特征计算的输出结果由原先的 Kafka 改为了 Iceberg,实时训练和离线训练会使用统一的 API,这样在存储和 API 层面就达到了统一的结果。与之前的双存储相比,节省了 Kafka 的存储资源, 对写入 Iceberg 的数据我们也做了压缩,并在 HDFS 文件上进行了优化,进一步降低了整体的存储成本。与此同时,相比之前的模式,我们节省了 dump 任务流计算的 CPU 消耗,整体计算成本降低 20%。对于算法 online 发布链路,我们还对 Iceberg 数据做了冷热分离,将 3 天内的新鲜数据放在高存储成本介质 SSD 上,用于加速下游实时训练。

3. 商业和 AI 的在线训练(规划)

对于上述场景,我们也在规划后续升级 Flink 1.20 版本,并做一些适配,计划将 Kafka base 展点流数据替换为 Iceberg。目前我们内部的 Flink 主要是 1.11 和 1.15 两个版本,即使开启了 Unaligned Checkpoint,也没办法满足用户一分钟的时效性要求。因为展现流和点击流做双流 join 使用的是社区的 interval join 的定制版,即便把我们数据延迟发送时的双流 join 时间窗口设置为 1 小时,在流量较大的背景下,依然会导致其 keyed state 处在很大的量级,大致在 10 TB 级别以上。当降低 checkpoint 时间的时候,我们发现 RocksDB 的 CPU 共振会导致 checkpoint 失败率上升,最后影响下游数据的正常消费。Iceberg 的数据可读是严格依赖于 snapshot commit 的成功生成的,我们期望借助于高版本的 GIC 的能力,降低 checkpoint 的时间来满足双流 join 生成 Iceberg 数据时效性。

03

DB 数据同步

1. DB 数据同步(背景)

上图展示了迭代前的 DB 数据同步流程。

首先 Flink CDC 会将全量数据和增量的 binlog 数据同步到 Kafka 缓冲层,然后使用 Flink 流,将 binlog 完整写出到 Iceberg Append 表中,其中会附带完整的 binlog 记录,依据流计算的 watermark 推进进度,通知下游 Spark merge 任务,数据 merge 合并会依赖于 binlog 的 timestamp 和主键去做排序,然后实现数据更新、新增以及删除操作的同步。由于迭代的原因,我们目前暂未将 Iceberg 的数据给到用户,而是将 Iceberg Dump 的 Hive 表给到用户。整个 DB 同步是通过实时加离线的组合同步产出 Hive 表给到用户,每个分区为全量的 DB 快照数据。

除此之外,在高流量场景下,我们需要对 DB 做维表 join 的情况下,我们也会建议用户直接使用 Hive 作为 MySQL 维表 join 的一个替代方案,用以缓解 DB query 的压力。但是缺点也比较明显,就是 Hive 维表 join 的时效性会比较低,至少为小时级。

2. DB 数据同步(迭代)

基于上述背景,首先我们保留了原有实时加离线组合链路上产出 Hive 表的这条链路,原因是用户使用基数比较庞大。在此基础上面我们增加了一条实时链路,去做用户实时场景的补强。我们内部的 Iceberg 升到 1.4 版本之后做了一些改造,将 Flink CDC 的数据直接写到 Iceberg MOR 表中,这样 Iceberg 表可以作为 MySQL 的一个镜像来做实时查询,时效性可以提升到分钟级。并且通过改造,提供了 Iceberg 增量流读 change log 的能力。

3. DB 数据同步(流读)

Changelog 增量流读的具体改造如上图所示。

Iceberg 中数据文件分为三类:

在这里,我们对 Equality Delete Files 的内容做了一定的改造,将其做了扩展,记录了整个需要删除的 row data 值用于适配 CDC 同步场景,当 DB 数据同步更新的时候,binlog 会记录 update before 的结果,在删数据的时候,binlog 也会记录整个删除的数据值,这样在相同的 snapshot 下若有一条行级删除数据需要写入 Equality Delete Files,直接将整个删除的完整记录写入此文件即可,这样可以方便后面后续增量流读 changelog,无需查询历史 snapshot 的 Data File,从而优化增量流读 change log 的速率。

第二个优化点是在增量流读 MOR 表方面,为了保证数据能够同步有序的回放 binlog 的记录,在增量流读时,我们首先会根据 snapshot 的 timestamp 时间去做排序,然后在相同的 snapshot 下,会先读 Equality Delete Files,然后依次去读 Data File 和 Position Delete Files。

在写入侧,我们也针对自身场景,额外在 Write Operator 引入了 keyed state 记录主键值。前文提到,我们内部的 DB 数据同步会首先写入到 Kafka 中,然后再消费,而写入 Kafka 的流计算任务是会存在重启的,因此我们增加 keyed state 来防止数据重放。

最后一点是借助 Magnus 服务,对 MOR 表做异步的 compaction 合并操作,防止 Equality Delete Files 过多影响 Batch 查询。

4. DB 数据同步(展望)

对 DB 同步场景,我们期望将两条链路合并成一条单链路,并提供下面三个能力:

以上前两点在上述新增链路中已经实现,而第三点合入到此链路中需要做以下几个优化事项:

04

Iceberg 维表 Join

1. Iceberg 维表 Join(背景)

接下来我们看一下 Iceberg 维表 join 的场景。AI 和商业的流计算任务都有一个显著特性,就是数据流量特别大。当使用双流 join 来关联远端 DB 进行维表 join 时,就会存在以下痛点:

2. Iceberg 维表 Join(迭代)

针对上述问题,我们发现可以基于 Iceberg 的 change log 增量流读能力,提供 lookup join 的替代品,构建一个新的 Flink 维表 join,此维表 join 需要满足如下几个特性:

通过上面能力的迭代,Iceberg 满足了新 join 对于 source 所需特性的诉求,基于此我们也做了相应的适配,构建新的维表 join 算子。

3. Iceberg 维表 Join(改造)

为了构建新的 Lookup join,我们做了如下改造:

4. Iceberg 维表 Join(收益)

改造前实时链路的主流会做 LookupJoin 关联 DB 写到 Kafka 中,结果数据一般会 dump 到 Hive 用于离线分析。如果要对离线分析的结果从上游做数据的回刷,需要将上游 topic 的数据先做 dump 到 Hive,然后使用 Spark SQL 去关联 Hive 的维表,写入到 Hive 表。

改造后,可以直接用 Iceberg 主流维表 Join 关联 Iceberg 维表,写出到下游的 Sink Iceberg,如果要用 batch 的话,可以直接使用平台工具将流任务转化成批任务,指定好相对应的时间即可。

通过改造,Flink 流超大流量的维表 join,对远端 DB 的压力可以直接降为 0,离线回刷可以整体复用 Flink 流计算的逻辑,UDF 也可以得到相应的复用。

05

Q&A

Q:Magnus 和 Flink 都会对文件做写入,冲突是如何解决的?

A:Magnus 会对 MOR 的合并也会做 compaction 的操作,我们之前也遇到了冲突的问题,后面是取消了 Magnus 对 CDC 写入的 MOR 表做托管数据合并操作,而是让 Flink 的 JobManager 去主动控制触发,也就相当于是  JobManager 会根据当前表的流量,配置不同的属性,配置不同的调度步长,比如一个小时或者两个小时做一次请求 Magnus 的 compaction 操作。这一过程中,整个主流会 block 住,然后等到 Magnus 数据 compaction 完成之后,再将主流数据放下去。优化后 Magnus 能够在分钟级完成整个 compaction,因此对于整个 DB 同步以及数据 MOR 表产出的时效性不会有太大的影响。

以上就是本次分享的内容,谢谢大家。


分享嘉宾

INTRODUCTION


张陈毅

哔哩哔哩

资深开发工程师

专注于 Flink SQL/State 和流批一体的工作,为内部提供 Flink 引擎相关的技术支撑



跳转微信打开

Fish AI Reader

Fish AI Reader

AI辅助创作,多种专业模板,深度分析,高质量内容生成。从观点提取到深度思考,FishAI为您提供全方位的创作支持。新版本引入自定义参数,让您的创作更加个性化和精准。

FishAI

FishAI

鱼阅,AI 时代的下一个智能信息助手,助你摆脱信息焦虑

联系邮箱 441953276@qq.com

相关标签

Iceberg 流批一体 Flink 数据湖 B站
相关文章