您可以捐助,支持我们的公益事业。

1元 10元 50元





认证码:  验证码,看不清楚?请点击刷新验证码 必填



  求知 文章 文库 Lib 视频 iPerson 课程 认证 咨询 工具 讲座 Model Center   Code  
会员   
   
 
     
   
 
 订阅
Apache hudi在腾讯的落地与实践
 
作者:Forward Xu(徐前进)
   次浏览      
 2023-5-31
 
编辑推荐:
本文首先介绍 Apache Hudi 在腾讯的落地与实践。希望对你的学习有帮助。
本文来自于微信公众号DataFunSummit,由Linda编辑、推荐。

导读 这里放置导读部分的内容随着 Apache Parquet 和 Apache ORC 等存储格式以及 Presto 和 Apache Impala 等查询引擎的发展,Hadoop 生态系统有潜力作为面向分钟级延时场景的通用统一服务层。然而,为了实现这一点,Hadoop 生态系统需要在 Hadoop 分布式文件系统(HDFS)中进行高效、低延迟的数据摄取和数据准备。为了解决这个问题,我们在Uber内部构建了 Hudi。Apache Hudi 是新一代流式数据湖平台,支持插入、更新、删除以及增量数据处理;可助力构建高效的企业级数据湖,目前已经在国内外多个大型公司生产落地。今天会和大家分享 Apache Hudi 在腾讯的落地与实践。

01 Hudi的功能与构成

1. HUDI 的能力和定位

Apache Hudi是一个基于数据库内核的流式数据湖平台,支持流式工作负载,事务,并发控制,Schema演进与约束;同时支持 Spark/Presto/Trino/Hive 等生态对接,在数据库内核侧支持可插拔索引的更新,删除,同时会自动管理文件大小,数据 Clustering,Compaction,Cleaning 等。

2. HUDI 的整体结构

可以基于云存储/HDFS 构建基于 Hudi 的 Lakehouse,Hudi 支持 Parquet、ORC、HFile、Avro 格式,同时提供了非常丰富的 API,如 Spark DF、RDD、FlinkSQL、Flink DataStream API,利用这些 API 可以非常方便地对 Hudi 表进行操作,同时 Hudi 也集成了其他生态,如 MPP 引擎 StarRocks,Doris 等。

02 Hudi的基本概念介绍

基本概念介绍:

Hudi 的基本概念由 Timeline 和 File Layout 组成

Timeline 由一个个 commit 组成,包括 DELTA_COMMIT,COMMITS,CLEANS,ROLLBACK,REPLACECOMMIT 等,同时每个 commit 都包含对应的状态,如 requested,inflight,completed 三种状态,分别代表请求开始处理,正在处理,处理完成。

File Layout 主要由 FileGroup 构成,FileGroup 由 FileSlice 组成,每个 FileSlice 相当于一个版本,包含一个 Base 文件和多个 Log 文件。

Hudi支持MOR和COW两种类型,MOR表对流式写入更友好,延迟更低,对于更新的log文件支持同步和异步两种模式进行 Compaction,生成新的Base文件,以加速查询,支持 Snapshot,Read Optimized,Incremental 读取。

而对于COW表,每次写入需要重写文件,写放大相对严重,延迟相对MOR较高,更适合写少读多的场景。

为了加速数据的更新,Hudi 支持多种索引,如分区级别的索引以及全表索引,分区级别的索引可以保证数据在分区内的唯一性,全表索引保证数据在表级的唯一性(开销较大)。Hudi 支持了多种类型的索引实现,典型的如 BLOOM、BUCKET 索引,以及自定义索引等方式。

另外一个核心的概念是Hudi的Table Service,包含Compaction操作,Compaction针对FileSlice进行操作,会将Base文件和其对应的Log文件进行合并,产生新的Base文件;可以通过指定NUM_COMMITS或TIME_ELAPSED两种策略调度执行Compaction,对于调度执行而言,Hudi为不影响主链路的写入,支持了异步调度与执行,以及同步调度与执行,同步调度异步执行方式,满足不同的需求。

另外一个Table Service是Clean,Clean用于删除过期的文件,同样与Compaction类型也提供了多种策略以及调度执行策略,值得注意的是对于做了Savepoint的时间点,其对应的文件不会被删除。

接下来分析对于COW表的不同查询的实现,如在instant 0 时刻写入一部分数据(ABCDE),在instant 1时刻更新A -> A',D -> D',在instant 2时刻更新A' -> A'',E -> E',并插入F 那么对于快照查询(Snapshot Query)每次都是读取的最新的FileSlice,增量查询(Incremental Query)读取指定commit之间的Parquet文件,然后再将时间范围下推至Parquet文件进行过滤,只读取符合条件的变更的数据。

对于MOR表,快照查询(SNAPSHOT Query)读取的是Base文件与Log合并后的最新结果;而增量查询读取指定commit之间的Parquet以及Log文件,然后再对Log文件进行Block级别的过滤(根据Commit时间),合并重复key后返回结果。

03 Hudi 的应用场景

1. CDC 数据入湖

这个场景主要是DB数据入湖入仓,把原来T + 1的数据新鲜度提升到分钟级别。数据新鲜度通过目前比较火的以Debezium、Maxwell为代表的CDC(change Data Capture)技术实现。以Streaming近实时的方式同步到数仓中。在传统的Hive数仓中想保证实时是非常困难的,尤其是文件更新,湖表实时写入更新,基本不可能实现。

CDC技术对数仓本身存储是有要求的,首先是更新效率得足够高,能够支持以Streaming方式写入,并且能够非常高效的更新。尤其是CDC log在更新过程中还可能会乱序,如何保证这种乱序更新的ACID语义,是有很高要求的,当前能满足乱序更新的湖格式只有Hudi能做到,而且Hudi还考虑到了更新的效率问题,是目前比较先进的架构。

图中方案3相比上面的方案,比较适合体量比较大(每天增量能达到亿级别)、数据平台比较健全的公司,中间有一套统一的数据同步方案(汇总不同源表数据同步至消息队列),消息队列承担了数据的容错、容灾、缓存功能。同时,这套方案的扩展性也更好。通过Kafka的topic subscribe方式,可以比较灵活地分发数据。通过以上三种方式入湖Hudi,以某数据中台为例已经有6000多张源表写入Hudi,日增几十亿数据入湖。

2. 分钟级实时数仓

第二个场景是构造分钟级别的实时数仓,分钟级别的端到端数据新鲜度,同时又非常开放的OLAP查询引擎可以适配。其实是对Kappa架构或者是原先Streaming数仓架构的一套新解法。在没有这套架构之前,实时分析会跳过Hudi直接把数据双写到OLAP系统中,比如ClickHouse、ES、MongoDB等。当数仓存储已经可以支持高效率分级别更新,能够对接OLAP引擎,那么这套架构就被大大简化,首先不用双写,一份数据就可以保证only one truth语义,避免双写带来数据完整性的问题。其次因为湖格式本身是非常开放的,在查询端引擎可以有更多选择,比如Hudi就支持Presto、Trino、Spark、StarRocks,以及云厂商的Redshift引擎,会有非常高的灵活度。多层数据可见性也从T+1 小时或天,缩短到分钟级别。

3. 流式计算PV/UV

Apache Hudi 的Payload是一种可扩展的数据处理机制,通过不同的Payload我们可以实现复杂场景的定制化数据写入方式,大大增加了数据处理的灵活性。Hudi Payload 在写入和读取 Hudi 表时对数据进行去重、过滤、合并等操作的工具类,通过使用参数"hoodie.datasource.write.payload.class"指定我们需要使用的Payload class。为了实现 pv/uv计算,我们实现了 RecordCountAvroPayload ,它可以在对数据去重的时候,将重复数据的数量记录下来,这里的重复指的是HoodieKey(primary key + partition path)相同。以往处理方式是通过Flink + window 聚合实现,该方式有延迟数据丢弃和state爆掉风险,Hudi Payload机制则没有这些风险。

4. 多流拼接(大宽表)

上图是一个典型的非常复杂的业务落地, 消息流1由Kafka写入Hudi商品销售明细表,消息流2由Kafka写入Hudi用户基本属性表,然后结合Hudi商品标签表和Hve用户扩展属性表进行实时和离线拼接大宽表。

在实现多流拼接功能前有三个前置条件需要满足:

基于乐观锁的Timeline

基于marker的早期冲突检测

启用occ(乐观并发控制)

这里主要描述基于时间线服务器的标记机制,该机制优化了存储标记的相关延迟。Hudi 中的时间线服务器用作提供文件系统和时间线视图。如下图所示,新的基于时间线服务器的标记机制将标记创建和其他标记相关操作从各个执行器委托给时间线服务器进行集中处理。时间线服务器在内存中为相应的标记请求维护创建的标记,时间线服务器通过定期将内存标记刷新到存储中有限数量的底层文件来实现一致性。通过这种方式,即使数据文件数量庞大,也可以显著减少与标记相关的实际文件操作次数和延迟,从而提高写入性能。

实现的原理基本上就是通过自定义的 Payload class 来实现相同 key 不同源数据的合并逻辑,写端会在批次内做多源的合并,并写入 log,读端在读时合并时也会调用相同的逻辑来处理跨批次的情况。这里需要注意的是乱序和迟到数据(out-of-order and late events)的问题。如果不做处理,在下游经常会导致旧数据覆盖新数据,或者列更新不完整的情况。针对乱序和迟到数据,我们对 Hudi 做了 Multiple ordering value 的增强,保证每个源只能更新属于自己那部分列的数据,并且可以根据设置的 event time (ordering value) 列,确保只会让新数据覆盖旧数据。最后结合 lock less multiple writers 来实现多 Job 多源的并发写入。

介绍多流拼接场景下 Snapshot Query 的核心过程,即先对 LogFile 进行去重合并,然后再合并 BaseFile 和去重后的 LogFile 中的数据。上图显示了整个数据合并的过程,具体可以拆分成以下两个过程:

(1)Merge LogFile

Hudi 现有逻辑是将 LogFile 中的数据读出来存放在 Map 中,对于 LogFile 中每条 Record,如果 Key 不存在 Map 中,则直接放入 Map,如果 Key 已经存在于 Map 中,则需要更新操作。

在多流拼接中,因为 LogFile 中存在不同数据流写入的数据,即每条数据的列可能不相同,所以在更新的时候需要判断相同 Key 的两个 Record 是否来自同一个流,是则做更新,不是则做拼接。如图 3 所示,读到 LogFile2 中的主键是 key1 的 Record 时,key1 对应的 Record 在 Map 中已经存在,但这两个 Record 来自不同流,则需要拼接形成一条新的 Record (key1,b0_new,c0_new,d0_new) 放入 Map 中。

(2)Merge BaseFile and LogFile

Hudi 现有默认逻辑是对于每一条存在于 BaseFile 中的 Record,查看 Map 中是否存在 key 相同的 Record,如果存在,则用 Map 中的 Record 覆盖 BaseFile 中的 Record。在多流拼接中,Map 中的 Record 不会完整覆盖 BaseFile 中对应的 Record,可能只会更新部分列的值,即 Map 中的 Record 对应的列。

如上图所示,以最简单的覆盖逻辑为例,当读到 BaseFile 中的主键是 key1 的 Record 时,发现 key1 在 Map 中已经存在并且对应的 Record 有 BCD 三列的值,则更新 BaseFile 中的 BCD 列,得到新的 Record(key1,b0_new,c0_new,d0_new,e0),注意 E 列没有被更新,所以保持原来的值 e0。对于新增的 Key 如 Key3 对应的 Record,则需要将 BCE 三列补上默认值形成一条完整的 Record。

5. 批流探索-广告归因

广告归因是指在用户在广告行为链路中,使用科学的匹配模型两两匹配各环节的行为数据点,可用于判断用户从何渠道下载应用(或打开小程序),通过匹配用户广告行为,分析是何原因促使用户产生转化。广告归因的数据结果是衡量广告效果、评估渠道质量的重要依据,可帮助广告主合理优化广告素材,高效开展拉新、促活营销推广,而实时广告归因则能更及时地应用到优化广告投放的过程中。

在增长买量业务场景中,买量团队在快手、百度、字节等渠道上投放广告,比如某云游戏广告素材,吸引潜在用户点击广告,进入业务开始玩云游戏,也可以下载游戏的APK安装包,从而实现将用户转化成业务新增用户和游戏新增用户的目的。如下图所示,渠道方可以获取用户的点击数据,业务可以获取新增用户的数据,在点击归因链路中,就是将业务新增用户匹配到用户在某渠道上近N天的最后一次广告点击,在正常的业务过程中,先有用户点击广告数据,后有业务新增用户数据,根据离线数据统计经验,点击转化成新增用户的窗口时间最长不超过3天,也就是N=3。

数据流一,Flink SQL消费点击数据,并通过Upsert方式(row-level update)写入数据湖Hudi点击表,MOR特性取最后一次点击数据。

数据流二,Flink SQL消费应用宝新增数据,通过Append方式写入数据湖Hudi新增表。

批处理三,Super SQL读Hudi新增表(当日)、Hudi点击表(近N天)关联,通过Merge Into语法(row-level update)写入归因结果Hudi表。Super SQL底层计算引擎是Spark3,该任务通过US系统每10分钟调度一次。

数据流四,Flink SQL通过snapshot-id方式(流式读取)将归因结果表实时出湖到CDMQ,保持数据应用接口和方案一致。

基于Hudi方案优势如下:

准实时和离线数据统一存储,归因率和T+1保持一致,Hudi归因率从原来的80%提升至 85%。

•Flink SQL、Super SQL开发简化了编码过程,降低了开发成本。

稳定性高,一般情况的数据延迟通过US在下个定时周期自动修复,维护成本低。

时效性是10分钟调度+3分钟运行 <15分钟。

6. 批流探索:流转批

在某些业务场景下,我们需要一个标志来衡量Hudi数据写入的进度,比如:Flink 实时向 Hudi 表写入数据,然后使用这个 Hudi 表来支持批量计算并通过一个 flag 来评估它的分区数据是否完整从而进一步写入分区数据进行分区级别的ETL,这也就是我们通常说的流转批。

上左图中Flink Sink包含了两个算子。第一个writer 算子,它负责把数据写入文件,writer在checkpoint触发时,会把自己写入的最大的一个时间传到commit算子中,然后commit算子从多个上游传过来的时间中选取一个最小值作为这一批提交数据的时间,并写入HUDI表的元数据中。

我们的方案是将这个进度值(EventTime)存储为 Hudi 提交(版本)元数据的属性里,然后通过访问这个元数据属性获取这个进度值。在下游的批处理任务之前加一个监控任务去监控最新快照元数据。如果它的时间已经超过了当前的分区时间,就认为这个表的数据已经完备了,这个监控任务就会成功触发下游的批处理任务进行计算,这样可以防止在异常场景下数据管道或者批处理任务空跑的情况。

上右图是一个Flink 1分钟级别入库到HUDI ODS表, 然后通过流转批计算写入HUDI DWD表的执行过程。

如何解决乱序到来问题, 我们可以通过设置SpedGapTime来设置允许延迟到来的范围默认是0 不会延迟到来。我们同时也为社区做了Call Command的命令,其主要有50多个功能,主要覆盖在运维表、优化表、快照管理和元数据管理,可用其进行数据读取、文件合并、数据优化、统计表信息等具体功能,便于集成与平台化。

04 Hudi的未来展望

Hudi前期support Change-Data-Capture,这张图大致描述了Hudi在Upsert、Delete所产生的更新的数据和底热数据。它会产生before和after的数据,我们从得到的数据再得到更新或删除操作的数据,从而还原出用户不能写入的数据。

我们在内部会有一个计算引擎,进行Parse+Query Optimizer和Physical Plan+Query Execution的功能,我们以SQL为统一入口,基于Hudi的Flink ETL,将我们所需要的数据在ODS层、DWD层、ADS层实时物化下来,统一通过Query改写的手段,让用户在查询时,直接读取出物化的结果,达到更低时效的写入物化结果,还能得到若干倍的查询性能的提升。该方案目前正在进行中,未来将不断进行探索。

05 问答环节

Q1:流转批的check脚本需要自己实现吗?是否需要和调度系统进行集成?

A1:可以的,直接可以SQL的方式实现。但带来的问题是你用spark去提交和执行这样一个命令的话,可能会有一个申请资源的消耗。比如执行命令一秒钟,申请资源两分钟,这样就有一个开销了。假如你用一个Java client的方式,然后集成到你的平台的话,那么你仍然可以保留快速获取未解信息的逻辑。所以两种方式都可以,不一定要写脚本。

Q2:Call Command未来会支持Flink吗?

A2:在Flink这块还没有看到类似Call Command的语法,它是一个来源于存储过程的语法, Flink这一块要加的话会晚一些,至少需要先把语法的部分实现。

   
次浏览       
相关文章

基于EA的数据库建模
数据流建模(EA指南)
“数据湖”:概念、特征、架构与案例
在线商城数据库系统设计 思路+效果
 
相关文档

Greenplum数据库基础培训
MySQL5.1性能优化方案
某电商数据中台架构实践
MySQL高扩展架构设计
相关课程

数据治理、数据架构及数据标准
MongoDB实战课程
并发、大容量、高性能数据库设计与优化
PostgreSQL数据库实战培训

最新活动计划
C++高级编程 12-25 [线上]
白盒测试技术与工具实践 12-24[线上]
LLM大模型应用与项目构建 12-26[特惠]
需求分析最佳实践与沙盘演练 1-6[线上]
SysML建模专家 1-16[北京]
UAF架构体系与实践 1-22[北京]
 
 
最新文章
大数据平台下的数据治理
如何设计实时数据平台(技术篇)
大数据资产管理总体框架概述
Kafka架构和原理
ELK多种架构及优劣
最新课程
大数据平台搭建与高性能计算
大数据平台架构与应用实战
大数据系统运维
大数据分析与管理
Python及数据分析
更多...   
成功案例
某通信设备企业 Python数据分析与挖掘
某银行 人工智能+Python+大数据
北京 Python及数据分析
神龙汽车 大数据技术平台-Hadoop
中国电信 大数据时代与现代企业的数据化运营实践
更多...