您可以捐助,支持我们的公益事业。

1元 10元 50元





认证码:  验证码,看不清楚?请点击刷新验证码 必填



  求知 文章 文库 Lib 视频 iPerson 课程 认证 咨询 工具 讲座 Model Center   Code  
会员   
   
 
     
   
 订阅
  捐助
Spark SQL 分桶表在字节跳动的优化
 
 
   次浏览      
2021-11-29
 
编辑推荐:
本文主要介绍字节跳动在 Bucket 方面的优化。
本文来自微信公众号过往记忆大数据,由火龙果软件Linda编辑、推荐。

本文主要从以下四个方面介绍:

Spark SQL 在字节跳动的应用

什么是分桶

Spark 分桶的限制

字节跳动在分桶方面的优化

下面是 Spark SQL 在字节跳动的应用。

2016年主要是小规模的测试阶段

2017年用于处理 Ad-hoc 工作负载

2018年在生产环境下处理少量的 ETL 管道工作;

2019年在生产环境下全面部署;

2020年成为 DW 领域的主要计算引擎。

什么是分桶

上面例子展示了创建分桶表的方法。主要关键字是 clustered by (xxx) sorted by (xxx) into N buckets

如果我们往分桶表里面插入数据,可以如下使用

INSERT INTO order

SELECT order_id, user_id, product, amount

FROM order_staging

可见,这个和正常表的使用并没有什么区别。

如果我们进行一个 ShuffleHashJoin 的时候,首先需要将表的数据按照 on 的条件进行分区,然后才是进行 Join 操作。

但是如果参与 Join 的表已经实现分桶了,那么在执行 ShuffleHashJoin 的时候省去 Shuffle 的操作。比如上面的例子如果我们对 order 和 user 表按照 user_id 字段进行分桶,那么在 ShuffleHashJoin 的时候就不需要进行 Exchange 操作了。

对于 SortMergeJoin ,需要对 on 里面的条件字段进行 Exchange 操作,然后再进行 Sort 操作,最后才是执行 SortMergeJoin(更多关于 Join 的策略可以参见过往记忆大数据的《每个 Spark 工程师都应该知道的五种 Join 策略》文章)。

如果参与 Join 的表已经分桶了,那么不需要就行 Exchange 和 Sort 操作了。

Spark 分桶的限制

小文件问题

执行上面的 SQL,每个 task 最多可能产生 1024 个文件,其中 1024 是分桶的数量。所以如果我们有 M 个 task,那么最多产生的文件个数为 M * 1024。比如上面的 attempt_20200519145628_0014_m_000014_0 目录下产生了 988 个文件。

解决小文件的问题可以加上 DISTRIBUTE BY ,如下:

INSERT INTO order SELECT order_id, user_id, product, amount

FROM order_staging

DISTRIBUTE BY user_id

如果 1024 是 M 的倍数,那么最多会产生 1024 个文件,其中 M = spark.sql.shuffle.partitions;

如果 M 是 1024 的倍数,那么最多会产生 M 个文件,其中 M = spark.sql.shuffle.partitions。

Spark 分桶和其他 SQL 引擎不兼容

Spark 的分桶和 Hive 的分桶是不兼容的,同时和 Presto 也是不兼容的;但是 Presto 与 Hive 的分桶是兼容的。

Spark 的分桶和 Hive 不兼容主要原因是以下原因导致的:

Hive 在生成分桶的时候会额外进行一个 Reduce 操作,以保证相同分桶的数据都存储在一个文件中。而 Spark SQL 在写分桶文件时不需要 Shuffle 操作,这样就会导致每个分桶最多产生 M 个文件,这就导致上面说的小文件问题;

Spark 分桶和 Hive 分桶采用不同的 hash 算法。Hive 用的是 HiveHash;而 Spark 用的是 Murmur3,所以数据的分布是不一样的。

因为 Spark 和 Hive 分桶不兼容,所以当 Spark 的分桶表和 Hive 的分桶表进行 SortMergeJoin 的时候是需要进行 Sort 和 Exchange 操作的。

额外的排序操作

因为 Spark SQL 表中的每个分桶里面最少包含一个文件,所以在进行 Join 之前需要进行额外的排序操作。

分桶数不对齐

如果参与 Join 的表分桶数不一致,那么其中一张表需要进行额外的 Exchange 操作。

参与 Join 的 key 和分桶列不一样需要额外操作

当参与 Join 的 key 和分桶的列不一样时,需要额外的 Exchange 操作。

上面的例子尽管参与 Join 的表都是对 user_id 字段进行分桶,并且分桶数一样,但是还是需要额外的 Exchange 操作。

字节跳动在分桶方面的优化

Spark 分桶和 Hive 分桶对齐

前面介绍了 Spark 和 Hive 分桶不兼容,对于这方面,字节跳动将 Hive 分桶表和 Spark 分桶表进行了对齐,主要包括:

Spark SQL 写 Hive 分桶表的逻辑和 Hive 一致。 重写了 InsertIntoHiveTable#requiredOrdering 和 InsertIntoHiveTable#requiredDistribution,并且也使用了 HiveHash 算法。

对于读方面,重写了 HiveTableScanExec#outputPartitioning 和 HiveTableScanExec#outputOrdering,使用了 HiveHash 算法,并且使用了 Hive 的分桶元数据。

上面是 Spark 读取 Hive 分桶表改进前和改进后的区别。可以看到,改进后,outputPartitioning 为 HashPartitioning,并且 outputOrdering 为 SortOrder,满足了 requireChildDistribution 为 HashClusteredDistribution的要求以及requireChildOrdering 为 SortOrder,从而在进行 SortMergeJoin 的时候省去了 Exchange 和 Sort 操作。

One to Mange Bucket Join

另一个改进是 One to Merge Bucket Join,比如下面例子 A 表有三个分桶,B 表有六个分桶。

如果我们在 Spark 对上面两张表进行 Join 操作,B 表需要额外的 Sort 操作,因为上面两张表的分桶数不一样。但是在字节公司,由于对性能的要求,需要避免 Sort 操作。

一种方法是将 A 表的分桶 0 和 B 表的分桶 0 、分桶 3 进行关联;将 A 表的分桶 1 和 B 表的分桶 1 、分桶 4 进行关联;将 A 表的分桶 2 和 B 表的分桶 2 、分桶 5 进行关联。我们只需要将 A 表复制一份,这样 A 表也满足 6 个分桶。将 A 表和 A 表进行 Union 可以产生 到 6 个分桶的新表,但是 Spark 自带的 Union 操作之后 outputPartitioning 和 outputOrdering 将被删除,所以字节自己开发出 bucket union,使得 outputPartitioning 和 outputOrdering 被保留,这样就可以省去 Sort 和 Exchange 操作。

不过上面的方面在 B left join A 、B left semi join A、B anti join A、B inner join A 可以正常工作,但是在 B right join A、B full outer join A、B cross join A 的时候结果有重复,因为 A 表的数据被扫描了两次。

为了解决这个问题,在 TableScan 后面加上了 hash(10) % buckets = bucket id 的过滤条件,比如 bucket 0 将会把 3、9、15 过滤掉,通过这种办法将会消除重复数据。

字节的另外一个优化是如果 Join 的 Key 不仅仅是分桶的 Key,原生的 Spark 会产生额外的 Exchange 和 Sort 操作。

通过优化后,Exchange 将消除。

 

 

   
次浏览       
相关文章

基于EA的数据库建模
数据流建模(EA指南)
“数据湖”:概念、特征、架构与案例
在线商城数据库系统设计 思路+效果
 
相关文档

Greenplum数据库基础培训
MySQL5.1性能优化方案
某电商数据中台架构实践
MySQL高扩展架构设计
相关课程

数据治理、数据架构及数据标准
MongoDB实战课程
并发、大容量、高性能数据库设计与优化
PostgreSQL数据库实战培训
最新活动计划
LLM大模型应用与项目构建 12-26[特惠]
QT应用开发 11-21[线上]
C++高级编程 11-27[北京]
业务建模&领域驱动设计 11-15[北京]
用户研究与用户建模 11-21[北京]
SysML和EA进行系统设计建模 11-28[北京]
 
最新文章
InfluxDB概念和基本操作
InfluxDB TSM存储引擎之数据写入
深度漫谈数据系统架构——Lambda architecture
Lambda架构实践
InfluxDB TSM存储引擎之数据读取
最新课程
Oracle数据库性能优化、架构设计和运行维护
并发、大容量、高性能数据库设计与优化
NoSQL数据库(原理、应用、最佳实践)
企业级Hadoop大数据处理最佳实践
Oracle数据库性能优化最佳实践
更多...   
成功案例
某金融公司 Mysql集群与性能优化
北京 并发、大容量、高性能数据库设计与优化
知名某信息通信公司 NoSQL缓存数据库技术
北京 oracle数据库SQL优化
中国移动 IaaS云平台-主流数据库及存储技术
更多...