编辑推荐: |
本文主要介绍字节跳动在
Bucket 方面的优化。
本文来自微信公众号过往记忆大数据,由火龙果软件Linda编辑、推荐。
|
|
本文主要从以下四个方面介绍:
Spark SQL 在字节跳动的应用
什么是分桶
Spark 分桶的限制
字节跳动在分桶方面的优化

下面是 Spark SQL 在字节跳动的应用。
2016年主要是小规模的测试阶段
2017年用于处理 Ad-hoc 工作负载
2018年在生产环境下处理少量的 ETL 管道工作;
2019年在生产环境下全面部署;
2020年成为 DW 领域的主要计算引擎。

什么是分桶

上面例子展示了创建分桶表的方法。主要关键字是 clustered by (xxx) sorted
by (xxx) into N buckets
如果我们往分桶表里面插入数据,可以如下使用
INSERT INTO order
SELECT order_id, user_id, product, amount
FROM order_staging
可见,这个和正常表的使用并没有什么区别。

如果我们进行一个 ShuffleHashJoin 的时候,首先需要将表的数据按照 on 的条件进行分区,然后才是进行
Join 操作。 
但是如果参与 Join 的表已经实现分桶了,那么在执行 ShuffleHashJoin 的时候省去
Shuffle 的操作。比如上面的例子如果我们对 order 和 user 表按照 user_id
字段进行分桶,那么在 ShuffleHashJoin 的时候就不需要进行 Exchange 操作了。

对于 SortMergeJoin ,需要对 on 里面的条件字段进行 Exchange 操作,然后再进行
Sort 操作,最后才是执行 SortMergeJoin(更多关于 Join 的策略可以参见过往记忆大数据的《每个
Spark 工程师都应该知道的五种 Join 策略》文章)。 
如果参与 Join 的表已经分桶了,那么不需要就行 Exchange 和 Sort 操作了。
Spark 分桶的限制
小文件问题 
执行上面的 SQL,每个 task 最多可能产生 1024 个文件,其中 1024 是分桶的数量。所以如果我们有
M 个 task,那么最多产生的文件个数为 M * 1024。比如上面的 attempt_20200519145628_0014_m_000014_0
目录下产生了 988 个文件。
解决小文件的问题可以加上 DISTRIBUTE BY ,如下:
INSERT INTO order SELECT order_id, user_id, product,
amount
FROM order_staging
DISTRIBUTE BY user_id
如果 1024 是 M 的倍数,那么最多会产生 1024 个文件,其中 M = spark.sql.shuffle.partitions;
如果 M 是 1024 的倍数,那么最多会产生 M 个文件,其中 M = spark.sql.shuffle.partitions。
Spark 分桶和其他 SQL 引擎不兼容 
Spark 的分桶和 Hive 的分桶是不兼容的,同时和 Presto 也是不兼容的;但是 Presto
与 Hive 的分桶是兼容的。
Spark 的分桶和 Hive 不兼容主要原因是以下原因导致的:
Hive 在生成分桶的时候会额外进行一个 Reduce 操作,以保证相同分桶的数据都存储在一个文件中。而
Spark SQL 在写分桶文件时不需要 Shuffle 操作,这样就会导致每个分桶最多产生 M 个文件,这就导致上面说的小文件问题;
Spark 分桶和 Hive 分桶采用不同的 hash 算法。Hive 用的是 HiveHash;而
Spark 用的是 Murmur3,所以数据的分布是不一样的。

因为 Spark 和 Hive 分桶不兼容,所以当 Spark 的分桶表和 Hive 的分桶表进行
SortMergeJoin 的时候是需要进行 Sort 和 Exchange 操作的。
额外的排序操作

因为 Spark SQL 表中的每个分桶里面最少包含一个文件,所以在进行 Join 之前需要进行额外的排序操作。
分桶数不对齐 
如果参与 Join 的表分桶数不一致,那么其中一张表需要进行额外的 Exchange 操作。
参与 Join 的 key 和分桶列不一样需要额外操作

当参与 Join 的 key 和分桶的列不一样时,需要额外的 Exchange 操作。

上面的例子尽管参与 Join 的表都是对 user_id 字段进行分桶,并且分桶数一样,但是还是需要额外的
Exchange 操作。
字节跳动在分桶方面的优化
Spark 分桶和 Hive 分桶对齐

前面介绍了 Spark 和 Hive 分桶不兼容,对于这方面,字节跳动将 Hive 分桶表和 Spark
分桶表进行了对齐,主要包括:

Spark SQL 写 Hive 分桶表的逻辑和 Hive 一致。 重写了 InsertIntoHiveTable#requiredOrdering
和 InsertIntoHiveTable#requiredDistribution,并且也使用了
HiveHash 算法。

对于读方面,重写了 HiveTableScanExec#outputPartitioning 和
HiveTableScanExec#outputOrdering,使用了 HiveHash 算法,并且使用了
Hive 的分桶元数据。 
上面是 Spark 读取 Hive 分桶表改进前和改进后的区别。可以看到,改进后,outputPartitioning
为 HashPartitioning,并且 outputOrdering 为 SortOrder,满足了
requireChildDistribution 为 HashClusteredDistribution的要求以及requireChildOrdering
为 SortOrder,从而在进行 SortMergeJoin 的时候省去了 Exchange 和
Sort 操作。
One to Mange Bucket Join
另一个改进是 One to Merge Bucket Join,比如下面例子 A 表有三个分桶,B
表有六个分桶。

如果我们在 Spark 对上面两张表进行 Join 操作,B 表需要额外的 Sort 操作,因为上面两张表的分桶数不一样。但是在字节公司,由于对性能的要求,需要避免
Sort 操作。
一种方法是将 A 表的分桶 0 和 B 表的分桶 0 、分桶 3 进行关联;将 A 表的分桶 1 和
B 表的分桶 1 、分桶 4 进行关联;将 A 表的分桶 2 和 B 表的分桶 2 、分桶 5 进行关联。我们只需要将
A 表复制一份,这样 A 表也满足 6 个分桶。将 A 表和 A 表进行 Union 可以产生 到
6 个分桶的新表,但是 Spark 自带的 Union 操作之后 outputPartitioning
和 outputOrdering 将被删除,所以字节自己开发出 bucket union,使得 outputPartitioning
和 outputOrdering 被保留,这样就可以省去 Sort 和 Exchange 操作。

不过上面的方面在 B left join A 、B left semi join A、B anti
join A、B inner join A 可以正常工作,但是在 B right join A、B
full outer join A、B cross join A 的时候结果有重复,因为 A 表的数据被扫描了两次。 
为了解决这个问题,在 TableScan 后面加上了 hash(10) % buckets = bucket
id 的过滤条件,比如 bucket 0 将会把 3、9、15 过滤掉,通过这种办法将会消除重复数据。 
字节的另外一个优化是如果 Join 的 Key 不仅仅是分桶的 Key,原生的 Spark 会产生额外的
Exchange 和 Sort 操作。

通过优化后,Exchange 将消除。

|