编辑推荐: |
本文主要介绍了数据湖概念及相关知识。希望对你的学习有帮助。
本文来自于CSDN,由Linda编辑、推荐。 |
|
数据湖技术
概述
什么是数据湖?
数据湖是一个集中式的存储库,允许你以任意规模存储多个来源、所有结构化和非结构化数据,可以按照原样存储数据,无需对数据进行结构化处理,并运行不同类型的分析,对数据进行加工,例如:大数据处理、实时分析、机器学习,以指导做出更好地决策。
大数据为什么需要数据湖?
当前基于 Hive 的离线数据仓库已经非常成熟,在传统的离线数据仓库中对记录级别的数据进行更新是非常麻烦的,需要对待更新的数据所属的整个分区,甚至是整个表进行全面覆盖才行,由于离线数仓多级逐层加工的架构设计,数据更新时也需要从贴源层开始逐层反应到后续的派生表中去。
随着实时计算引擎的不断发展以及业务对于实时报表的产出需求不断膨胀,业界最近几年就一直聚焦并探索于实时数仓建设。根据数仓架构演变过程,在
Lambda 架构中含有离线处理与实时处理两条链路,其架构图如下: 正是由于两条链路处理数据导致数据不一致等一些列问题所以才有了
Kappa 架构,Kappa 架构如下: Kappa 架构可以称为真正的实时数仓,目前在业界最常用实现就是
Kafka + Flink,然而基于 Kafka + Flink 的实时数仓方案也有几个非常明显的缺陷,所以在目前很多企业中实时数仓构建中经常使用混合架构,没有实现所有业务都采用
Kappa 架构中实时处理实现。Kappa架构缺陷如下:
Kafka 无法支持海量数据存储:对于海量数据量的业务线来说,Kafka 一般只能存储非常短时间的数据,比如最近一周,甚至最近一天。
Kafka无法支持高效的OLAP查询:大多数业务都希望能在 DWD、DWS 层支持即席查询的,但是
Kafka 无法非常友好地支持这样的需求。
无法复用目前已经非常成熟的基于离线数仓的数据血缘、数据质量管理体系:需要重新实现一套数据血缘、数据质量管理体系。
Kafka不支持update/upsert,目前Kafka仅支持append。
为了解决 Kappa 架构的痛点问题,业界最主流是采用“批流一体”方式,这里批流一体可以理解为批和流使用
SQL 同一处理,也可以理解为处理框架的统一,例如:Spark、Flink。但这里更重要指的是存储层上的统一,只要存储层面上做到“批流一体”就可以解决以上
Kappa 遇到的各种问题。数据湖技术可以很好的实现存储层面上的“批流一体”,这就是为什么大数据中需要数据湖的原因。
数据湖技术之Iceberg
Iceberg概念及特点
Apache Iceberg是一种用于大型数据分析场景的开放表格式(Table Format)。Iceberg使用一种类似于SQL表的高性能表格式,Iceberg格式表单表可以存储数十PB数据,适配Spark、Trino、PrestoDB、Flink和Hive等计算引擎提供高性能的读写和元数据管理功能,Iceberg是一种数据湖解决方案。
注意:Trino 就是原来的 PrestoSQL,2020 年 12 月 27 日,PrestoSQL
项目更名为 Trino,Presto 分成两大分支:PrestoDB、PrestorSQL。
Iceberg 非常轻量级,可以作为 lib 与 Spark、Flink 进行集成,Iceberg
官网:https://iceberg.apache.org/,Iceberg具备以下特点:
Iceberg 支持实时/批量数据写入和读取,支持 Spark、Flink 计算引擎。
Iceberg 支持事务 ACID,支持添加、删除、更新数据。
不绑定任务底层存储,支持 Parquet、ORC、Avro 格式兼容行存储和列存储。
Iceberg 支持隐藏分区和分区变更,方便业务进行数据区分策略。
Iceberg 支持快照数据重复查询,具备版本回滚功能。
Iceberg 扫描计划很快,读取表或者查询文件可以不需要分布式 SQL 引擎。
Iceberg 通过元数据来对查询进行高效过滤。
基于乐观锁的并发支持,提供多线程并发写入能力,并保证数据线性一致。
Iceberg数据存储格式
1. Iceberg术语
data files(数据文件):
数据文件是 Apache Iceberg 表真实存储数据的文件,一般是在表的数据存储目录的 data
目录下,如果我们的文件格式选择的是 parquet,那么文件是以“.parquet”结尾,例如:
00000-0-root_20211212192602_8036d31b-9598-4e30-8e67-ce6c39f034da-job_1639237002345_0025-00001.parquet
就是一个数据文件。
Iceberg 每次更新会产生多个数据文件(data files)。
Snapshot(快快照):
快照代表一张表在某个时刻的状态。每个快照里面会列出表在某个时刻的所有 data files 列表。data
files是存储在不同的 manifest files 里面,manifest files 是存储在一个
manifest list 文件里面,而一个Manifest list 文件代表一个快照。
Manifest list(清单列表):
manifest list 是一个元数据文件,它列出构建表快照(Snapshot)的清单(Manifest
file)。这个元数据文件中存储的是 manifest file 列表,每个 manifest file
占据一行。每行中存储了 manifest file 的路径、其存储的数据文件(data files)的分区范围,增加了几个数文件、删除了几个数据文件等信息,这些信息可以用来在查询时提供过滤,加快速度。
Manifest file(清单文件):
manifest file也是一个元数据文件,它列出组成快照(Snapshot)的数据文件(data
files)的列表信息。每行都是每个数据文件的详细描述,包括数据文件的状态、文件路径、分区信息、列级别的统计信息(比如每列的最大最小值、空值数等)、文件的大小以及文件里面数据行数等信息。其中列级别的统计信息可以在扫描表数据时过滤掉不必要的文件。
manifest file 是以 avro 格式进行存储的,以“.avro”后缀结尾,例如:8138fce4-40f7-41d7-82a5-922274d2abba-m0.avro。
2. 表格式
Apache Iceberg 作为一款数据湖解决方案,是一种用于大型分析数据集的开放表格式(Table
Format),表格式可以理解为元数据及数据文件的一种组织方式。Iceberg 底层数据存储可以对接
HDFS,S3 文件系统,并支持多种文件格式,处于计算框架(Spark、Flink)之下,数据文件之上。 下面介绍下
Iceberg 底层文件组织方式,下图是 Iceberg 中表格式,s0、s1 代表的是表 Snapshot
信息,每个表示当前操作的一个快照,每次 commit 都会生成一个快照 Snapshot,每个 Snapshot
快照对应一个manifest list 元数据文件,每个 manifest list 中包含多个 manifest
元数据文件,manifest 中记录了当前操作生成数据所对应的文件地址,也就是 data file
的地址。
基于 snapshot 的管理方式,Iceberg 能够获取表历史版本数据、对表增量读取操作,data
files 存储支持不同的文件格式,目前支持 parquet、ORC、Avro 格式。 Iceberg特点详述
1. Iceberg分区与隐藏分区(Hidden Partition)
Iceberg 支持分区来加快数据查询,在 Iceberg 中设置分区后,可以在写入数据时将相似的行分组,在查询时加快查询速度。Iceberg
中可以按照年、月、日和小时粒度划分时间戳组织分区。
在 Hive 中也支持分区,但是要想使分区能加快查询速度,需要在写 SQL 时指定对应的分区条件过滤数据,在
Iceberg 中写 SQL 查询时不需要再在 SQL 中特别指定分区过滤条件,Iceberg 会自动分区,过滤掉不需要的数据。
在 Iceberg 中分区信息可以被隐藏起来,Iceberg 的分区字段可以通过一个字段计算出来,在建表或者修改分区策略之后,新的数据会自动计算所属的分区,在查询时同样不用关心表的分区是什么字段,只需要关心业务逻辑,Iceberg
会自动过滤不需要的分区数据。
正是由于 Iceberg 的分区信息和表数据存储目录是独立的,使得 Iceberg 的表分区可以被修改,而且不会涉及到数据迁移。
2. Iceberg表演化(Table Evolution)
在 Hive 分区中,如果把一个按照天分区的表改成按小时分区,那么没有办法在原有表上进行修改,需要创建一个按照小时分区的表,然后把数据加载到此表中。
Iceberg 支持就地表演化,可以通过 SQL 的方式进行表级别模式演化。例如:更改表分区布局。Iceberg
进行以上操作时,代价极低,不存在读出数据重新写入或者迁移数据这种费时费力的操作。
3. 模式演化(Schema Evolution)
Iceberg 支持一些几种 Schema 的演化:
Add:向表或者嵌套结构增加列
Drop:从表或者嵌套结构中移除列
Rename:重命名表中或者嵌套结构中的列
Update:将复杂结构(Struct、Map<Key, Value>, list) 中的基本类型扩展类型长度,比如:tinyint
修改成 int。
Reorder:改变列的顺序,也可以改变嵌套结构中字段的排列顺序。
注意:
Iceberg Schema 的改变只是元数据的操作改变,不会涉及到重写数据文件。Map 结构类型不支持
Add 和 Drop 字段。
Iceberg 保证 Schema 演化是没有副作用的独立操作,不会涉及到重写数据文件,具体如下:
增加列时不会从另一个列中读取已存在的数据;
删除列或者嵌套结构中的字段时,不会改变任何其他列的值;
更新列或者嵌套结构中的字段时,不会改变任何其他列的值;
改变列或者嵌套结构中的字段顺序时,不会改变相关联的值。
Iceberg 实现以上的原因使用唯一的 id 来追踪表中的每一列,当添加一个列时,会分配新的 ID,因此列对应的数据不会被错误使用。
4. 分区演化(Partition Evolution)
Iceberg 分区可以在现有表中更新,因为 Iceberg 查询流程并不和分区信息直接关联。
当我们改变一个表的分区策略时,对应修改分区之前的数据不会改变,依然会采用老的分区策略,新的数据会采用新的分区策略,也就是说同一个表会有两种分区策略,旧数据采用旧分区策略,新数据采用新分区策略,在元数据里两个分区策略相互独立,不重合。
因此,在我们写 SQL 进行数据查询时,如果存在跨分区策略的情况,则会解析成两个不同执行计划,如 Iceberg
官网提供图所示: 图中 booking_table 表 2008 年按月分区,进入 2009 年后改为按天分区,这两种分区策略共存于该表中。得益于
Iceberg 的隐藏分区(Hidden Partition),针对上图中的 SQL 查询,不需要在
SQL 中特别指定指定分区过滤条件(是按照月还是按照天),Iceberg 会自动分区,过滤掉不需要的数据。
5. 列顺序演化(Sort Order Evolution)
Iceberg 可以在一个已存在的表上修改排序策略。修改了排序策略之后,旧数据依旧采用老排序策略不变。往
Iceberg 里写数据的计算引擎总是会选择最新的排序策略,但是当排序的代价极其高昂的时候,就不进行排序了。
Iceberg数据类型
Iceberg表支持以下数据类型:
Hive 与 Iceberg 整合
Iceberg 就是一种表格式,支持使用 Hive 对 Iceberg 进行读写操作,但是对 Hive
的版本有要求,如下:
这里基于 Hive 3.1.2 版本进行 Hive 操作 Iceberg 表。
1. 开启Hive支持Iceberg
下载 iceberg-hive-runtime.jar:想要使用Hive支持查询Iceberg表,首先需要下载“iceberg-hive-runtime.jar”,Hive
通过该 Jar 可以加载 Hive 或者更新 Iceberg 表元数据信息。下载地址:https://iceberg.apache.org/releases/: 将以上
jar 包下载,并上传到 Hive 服务端和客户端对应的 lib 目录下。另外再向 Hive 中 Iceberg
格式表插入数据时需要用到“libfb303-0.9.3.jar”,将此包也上传到 Hive 服务端和客户端
lib 目录下。
② 配置hive-site.xml文件:添加如下配置
<property>
<name>iceberg.engine.hive.enabled</name>
<value>true</value>
</property>
|
2. Hive中操作Iceberg表格式
在 node03 启动 Hive 的 MetaStore 服务
hive --service metastore &
|
从 Hive 引擎的角度来看,在运行环境中有 Catalog 概念(catalog主要描述了数据集的位置信息,就是元数据),Hive
与 Iceberg 整合时,Iceberg 支持多种不同的 Catalog 类型,例如:Hive、Hadoop、第三方厂商的
AWS Glue 和自定义 Catalog。
在实际应用场景中,Hive 可能使用上述任意 Catalog,甚至跨不同 Catalog 类型 join
数据,为此 Hive 提供了 org.apache.iceberg.mr.hive.HiveIcebergStorageHandler(位于包
iceberg-hive-runtime.jar)来支持读写 Iceberg 表,并通过在 Hive
中设置 “iceberg.catalog.<catalog_name>.type” 属性来决定加载
Iceberg 表的方式,该属性可以配置:hive、hadoop,其中“<catalog_name>”是自己随便定义的名称,主要是在
hive 中创建 Iceberg 格式表时配置 iceberg.catalog 属性使用。
在 Hive 中创建 Iceberg 格式表时,根据创建 Iceberg 格式表时是否指定 iceberg.catalog
属性值,有以下三种方式决定 Iceberg 格式表如何加载(数据存储在什么位置)。
① 如果没有设置 iceberg.catalog 属性,默认使用 HiveCatalog 来加载:这种方式就是说如果在Hive中创建Iceberg格式表时,不指定iceberg.catalog属性,那么数据存储在对应的hive
warehouse路径下。在 Hive 客户端node3节点进入Hive,操作如下:
create table test_iceberg_tbl1(
id int,
name string,
age int
) partitioned by (dt string)
stored by 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler';
hive (datalake)> add jar /bigdata/install/hive-3.1.2/lib/iceberg-hive-runtime-0.13.2.jar;
hive (datalake)> add jar /bigdata/install/hive-3.1.2/lib/libfb303-0.9.3.jar;
hive (datalake)> insert into test_iceberg_tbl1 values (1, 'zhangsan', 32, '20220706');
hive (datalake)> select * from test_iceberg_tbl1;
OK
test_iceberg_tbl1.id test_iceberg_tbl1.name test_iceberg_tbl1.age test_iceberg_tbl1.dt
1 zhangsan 32 20220706
Time taken: 0.707 seconds, Fetched: 1 row(s)
|
在 Hive 默认的 warehouse 目录下可以看到创建的表目录: ② 如果设置了 iceberg.catalog
对应的 catalog 名字,就用对应类型的 catalog 加载:
这种情况就是说在 Hive 中创建 Iceberg 格式表时,如果指定了 iceberg.catalog
属性值,那么数据存储在指定的 catalog 名称对应配置的目录下。在 Hive 客户端node3节点进入Hive,操作如下:
hive (datalake)> set iceberg.catalog.another_hive.type = hive;
create table test_iceberg_tbl2(
id int,
name string,
age int
) partitioned by (dt string)
stored by 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
tblproperties('iceberg.catalog'='another_hive');
hive (datalake)> insert into test_iceberg_tbl2 values(2, 'lisi', 14, '20220706');
hive (datalake)> select * from test_iceberg_tbl2;
OK
test_iceberg_tbl2.id test_iceberg_tbl2.name test_iceberg_tbl2.age test_iceberg_tbl2.dt
2 lisi 14 20220706
Time taken: 0.371 seconds, Fetched: 1 row(s)
|
以上方式指定 “iceberg.catalog.another_hive.type=hive” 后,实际上就是使用的
hive 的 catalog,这种方式与第一种方式不设置效果一样,创建后的表存储在 hive 默认的
warehouse 目录下。也可以在建表时指定 location 写上路径,将数据存储在自定义对应路径上。 除了可以将
catalog 类型指定成 hive 之外,还可以指定成 hadoop,在 Hive 中创建对应的
iceberg 格式表时需要指定 location 来指定 iceberg 数据存储的具体位置,这个位置是
具有一定格式规范 的自定义路径。在 Hive 客户端node3节点进入Hive,操作如下:
hive (datalake)> set iceberg.catalog.hadoop.type = hadoop;
hive (datalake)> set iceberg.catalog.hadoop.warehouse=hdfs:
create external table test_iceberg_tbl3(
id int,
name string,
age int
) partitioned by (dt string)
stored by 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
location 'hdfs://node01:8020/iceberg_data/datalake/test_iceberg_tbl3'
tblproperties('iceberg.catalog'='hadoop');
|
注意:以上 location 指定的路径必须是 “iceberg.catalog.hadoop.warehouse”
指定路径的子路径,格式必须是 ${iceberg.catalog.hadoop.warehouse}/${当前建表使用的hive库}/${创建的当前iceberg表名}。
hive (datalake)> insert into test_iceberg_tbl3 values (3, "wangwu", 35, "20220706");
hive (datalake)> select * from test_iceberg_tbl3;
OK
test_iceberg_tbl3.id test_iceberg_tbl3.name test_iceberg_tbl3.age test_iceberg_tbl3.dt
3 wangwu 35 20220706
Time taken: 0.402 seconds, Fetched: 1 row(s)
|
在指定的“iceberg.catalog.hadoop.warehouse”路径下可以看到创建的表目录: 如果
iceberg.catalog 属性设置为 “location_based_table”,可以从指定的根路径下加载
Iceberg 表。这种情况就是说如果 HDFS 中已经存在 iceberg 格式表,我们可以通过在
Hive 中创建 Icerberg 格式表指定对应的 location 路径映射数据。在Hive客户端中操作如下:
create table test_iceberg_tbl4(
id int,
name string,
age int,
dt string
) stored by 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
location 'hdfs://node01:8020/spark/person'
tblproperties('iceberg.catalog'='location_based_table');
|
注意:指定的 location 路径下必须是 iceberg 格式表数据,并且需要有元数据目录才可以。不能将其他数据映射到
Hive iceberg 格式表。
由于 Hive 建表语句分区语法 “Partitioned by” 的限制,如果使用 Hive 创建
Iceberg 格式表,目前只能按照 Hive 语法来写,底层转换成 Iceberg 标识分区,这种情况下不能使用
Iceberge 的分区转换,例如:days(timestamp),如果想要使用 Iceberg 格式表的分区转换标识分区,需要使用
Spark 或者 Flink 引擎创建表。
Iceberg表数据组织结构
1. 在 Hive 中创建 Iceberg 表并插入数据
create table test_iceberg_tbl(
id int,
name string,
age int
) partitioned by (dt string)
stored by 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler';
hive (datalake)> insert into test_iceberg_tbl values (1, 'zs', 13, '20220706');
hive (datalake)> insert into test_iceberg_tbl values (2, 'ls', 24, '20220706');
hive (datalake)> insert into test_iceberg_tbl values (3, 'ww', 35, '20220707');
hive (datalake)> insert into test_iceberg_tbl values (4, 'zl', 66, '20220707');
hive (datalake)> insert into test_iceberg_tbl values (5, 'tq', 47, '20220707');
hive (datalake)> select * from test_iceberg_tbl;
OK
test_iceberg_tbl.id test_iceberg_tbl.name test_iceberg_tbl.age test_iceberg_tbl.dt
5 tq 47 20220707
1 zs 13 20220706
3 ww 35 20220707
4 zl 66 20220707
2 ls 24 20220706
Time taken: 0.382 seconds, Fetched: 5 row(s)
|
2. 查看 Iceberg 底层数据存储
从 HDFS 下载上述目录文件并查看
hdfs dfs -get /user/hive/warehouse/datalake.db/test_iceberg_tbl
tree test_iceberg_tbl/
|
通过上图我们可以看到有 5 个 Snapshot 快照,以上 5 个 Snapshot 实际上就是对应了 5 个 Manifest
list 清单列表。
Iceberg表数据查询
我们可以通过 avro-tools.jar 来查看 avro 数据内容,下载地址:https://mvnrepository.com/artifact/org.apache.avro/avro-tools,这里选择
avro-tools-1.11.0.jar。
使用:
java -jar /bigdata/soft/avro-tools-1.11.0.jar tojson snap-*.avro
|
1. 查询最新快照数据
查询最新快照数据原理如下图所示: 查询 Iceberg 表数据时,首先获取最新的 metadata 信息,这里先获取到
00005-0e6ad893-fd11-49e2-9dce-fd1d1d9091ea.metadata.json
元数据信息,解析当前元数据文件可以拿到当前表的快照id:“3196813301730135187”
以及这张表的所有快照信息,也就是 json 信息中 snapshots 数组对应的值。 根据当前表的快照
id 值可以获取对应的 snapshot 对应的 avro 文件信息:snap-3196813301730135187-1-1a71733b-232d-409d-9846-a626cb44593f.avro。我们可以找到当前快照对应的路径,看到其包含的Manifest
清单文件有5个: 读取该Iceberg格式表最新数据就是读取这几个文件中描述对应的parquet数据文件即可。 我们可以看到快照文件中不仅包含了
manifest 路径信息,还包含以 added_data_files_count、existing_data_files_count、deleted_data_files_count
等属性信息,Iceberg 根据 deleted_data_files_count 大于 0 来判断对应的
manifest 清单文件里面是不是被删除的数据,如果一个 manifest 清单文件该值大于 0
代表数据删除,读数据时就无需读这个 manifest 清单文件对应的数据文件。
根据 Manifest list 找到了各个对应的 manifest 清单文件,每个文件中描述了对应
parquet 文件存储的位置信息,可以看到在对应的avro文件中有“status”属性,该属性为
1 代表对应的 parquet 文件为新增文件,需要读取,为 2 代表parquet文件被删除。
2. 查询指定快照数据
Apache Iceberg 支持查询历史上任何时刻的快照,在查询时需要指定 snapshot-id
属性即可,这个只能通过 Spark/Flink 来查询实现,例如在 Spark 中查询某个快照数据如下:
spark.read.option("snapshot-id", 4259769591722561320L).format("iceberg").load("path")
|
查询指定快照数据的原理如下图所示: 通过上图可以看出,实际上读取历史快照数据和读取最新数据不同之处就是找到的
snapshot-id 不同而已,原理都是一样。
3. 根据时间戳查看某个快照的数据
Apache iceberg 还支持通过 as-of-timestamp 参数执行时间戳来读取某个快照的数据,同样也是通过
Spark/Flink 来读取,Spark 读取代码如下:
spark.read.option("as-of-timestamp", "时间戳").format("iceberg").load("path")
|
实际上通过时间戳找到对应数据文件的原理与通过 snapshot-id 找到数据文件原理一样,在 *.metadata.json
文件中,除了有“current-snapshot-id”、“snapshots”属性外还有“snapshot-log”属性,该属性对应的值如下: 我们可以看到其中有个
timestamp-ms 属性和 snapshot-id 属性,并且是按照 timestamp-ms
升序的。在 Iceberg 内部实现中,它会将 as-of-timestamp 指定的时间和 snapshot-log
数组里面每个元素的 timestamp-ms 进行比较,找出最后一个满足 timestamp-ms
<= as-of-timestamp 对应的 snapshot-id,原理同上,通过snapshot-id再找到要读取的数据文件。
|