编辑推荐: |
工作中使用到了hive,mysql等数据库,不同的数据库有不同的应用场景,该如何正确的选择数据存储与处理方式,需要了解底层原理,才能少走弯路,本文主要是记录一下hive的实现原理以及一些对应的概念。
本文来自知乎,由火龙果软件Anna编辑、推荐。 |
|
Front
在开始了解hive之前,需要了解一些知识或者概念,可以更好的理解hive实现原理
MapReduce
Google MapReduce是Google基于函数式编程map(映射),reduce(化简)提出的一种分布式编程模型,在模型中隐藏了分布式集群的实现细节,交由框架底层进行实现,能够使程序员在不了解分布式并行编程的情况下,将自己书写的程序在分布式系统上运行
编程模型
Map: 将输入的一对键值对转换为一组中间键值对 (k1,v1) -> list(k2,v2)
Reduce: 将所有键相同的中间键值对合并,得到关于那个键的结果 (k2,list(v2)) ->
(k2,v3)
举个栗子
以一个很简单的WordCount为例子,假设给定大量数据的文档,计算其中每个单词出现的次数,下面是伪代码
map(String key,String
value,Context context){
// key: document name
// value: document contents
String[] words = value.split(separator);
for(String word:words){
context.write(word,1);
}
}
reduce(String word,Iterable<Integer>
values,Context context){
int sum = 0;
for(Integer value:values){
sum += value;
}
con.write(word,sum);
} |
更多的栗子
计算 URL 访问频率:Map 函数处理日志中 web 页面请求的记录,然后输出(URL,1)。Reduce
函数把相同URL 的 value 值都累加起来,产生(URL,记录总数)结果。
倒转网络链接图:Map 函数在源页面(source)中搜索所有的链接目标(target)并输出为(target,source)。
Reduce 函数把给定链接目标(target)的链接组合成一个列表,输出(target,list(source))。
倒排索引:Map 函数分析每个文档输出一个(词,文档号)的列表,Reduce 函数的输入是一个给定词的所有
(词,文档号),排序所有的文档号,输出(词,list(文档号))。所有的输出集合形成一个简单的倒排索引,它
以一种简单的算法跟踪词在文档中的位置。
每个主机的检索词向量:检索词向量用一个(词,频率)列表来概述出现在文档或文档集中的最重要的一些 词。Map
函数为每一个输入文档输出(主机名,检索词向量),其中主机名来自文档的 URL。Reduce 函数接收给
定主机的所有文档的检索词向量,并把这些检索词向量加在一起,丢弃掉低频的检索词,输出一个最终的(主机名,检索词向量)。
系统实现
首先,用户通过 MapReduce 客户端指定 Map 函数和 Reduce 函数,以及此次 MapReduce
计算的配置,包括中间结果键值对的 Partition 数量 R 以及用于切分中间结果的哈希函数 hash
。 用户开始 MapReduce 计算后,整个 MapReduce 计算的流程可总结如下:
作为输入的文件会被分为 M 个 Split,每个 Split 的大小通常在 16~64 MB 之间
如此,整个 MapReduce 计算包含 M 个Map 任务和 R 个 Reduce 任务。Master
结点会从空闲的 Worker 结点中进行选取并为其分配 Map 任务和 Reduce 任务
收到 Map 任务的 Worker 们(又称 Mapper)开始读入自己对应的 Split,将读入的内容解析为输入键值对并调用由用户定义的
Map 函数。由 Map 函数产生的中间结果键值对会被暂时存放在缓冲内存区中
在 Map 阶段进行的同时,Mapper 们周期性地将放置在缓冲区中的中间结果存入到自己的本地磁盘中,同时根据用户指定的
Partition 函数(默认为 hash(key) mod R)将产生的中间结果分为 R 个部分。任务完成时,Mapper
便会将中间结果在其本地磁盘上的存放位置报告给 Master
Mapper 上报的中间结果存放位置会被 Master 转发给 Reducer。当 Reducer
接收到这些信息后便会通过 RPC 读取存储在 Mapper 本地磁盘上属于对应 Partition
的中间结果。在读取完毕后,Reducer 会对读取到的数据进行排序以令拥有相同键的键值对能够连续分布
之后,Reducer 会为每个键收集与其关联的值的集合,并以之调用用户定义的 Reduce 函数。Reduce
函数的结果会被放入到对应的 Reduce Partition 结果文件
实际上,在一个 MapReduce 集群中,Master 会记录每一个 Map 和 Reduce 任务的当前完成状态,以及所分配的
Worker。除此之外,Master 还负责将 Mapper 产生的中间结果文件的位置和大小转发给
Reducer。
值得注意的是,每次 MapReduce 任务执行时, M 和 R 的值都应比集群中的 Worker
数量要高得多,以达成集群内负载均衡的效果。
列式存储
什么是列式存储
传统事务型数据库通常采用行式存储。以下图为例,所有的列依次排列构成一行,以行为单位存储,再配合以 B+
树作为索引,就能快速通过主键找到相应的行数据。
行式存储对于 OLTP(联机事务处理) 场景是很自然的:大多数操作都以实体(entity)为单位,即大多为增删改查一整行记录,显然把一行数据存在物理上相邻的位置是个很好的选择。
然而,对于 OLAP (联机分析处理)场景,一个典型的查询需要遍历整个表,进行分组、排序、聚合等操作,这样一来按行存储的优势就不复存在了。更糟糕的是,分析型
SQL 常常不会用到所有的列,而仅仅对其中某些感兴趣的列做运算,那一行中那些无关的列也不得不参与扫描。
列式存储就是为这样的需求设计的。如下图所示,同一列的数据被一个接一个紧挨着存放在一起,表的每列构成一个长数组。
显然,列式存储对于 OLTP 不友好,一行数据的写入需要同时修改多个列。但对 OLAP 场景有着很大的优势:
当查询语句只涉及部分列时,只需要扫描相关的列
每一列的数据都是相同类型的,彼此间相关性更大,对列数据压缩的效率较高
列式存储与分布式文件系统
在现代的大数据架构中,GFS、HDFS 等分布式文件系统已经成为存放大规模数据集的主流方式。分布式文件系统相比单机上的磁盘,具备多副本高可用、容量大、成本低等诸多优势,但也带来了一些单机架构所没有的问题:
读写均要经过网络,吞吐量可以追平甚至超过硬盘,但是延迟要比硬盘大得多,且受网络环境影响很大。
可以进行大吞吐量的顺序读写,但随机访问性能很差,大多不支持随机写入。为了抵消网络的 overhead,通常写入都以几十
MB 为单位。 上述缺点对于重度依赖随机读写的 OLTP 场景来说是致命的。所以我们看到,很多定位于
OLAP 的列式存储选择放弃 OLTP 能力,从而能构建在分布式文件系统之上。
要想将分布式文件系统的性能发挥到极致,无非有几种方法:按块(分片)读取数据、流式读取、追加写入等。我们在后面会看到一些开源界流行的列式存储模型,将这些优化方法体现在存储格式的设计中。
列式存统储系案例
Apache ORC
Apache ORC 最初是为支持 Hive 上的 OLAP 查询开发的一种文件格式,如今在 Hadoop
生态系统中有广泛的应用。ORC 支持各种格式的字段,包括常见的 int、string 等,也包括 struct、list、map
等组合字段;字段的 meta 信息就放在 ORC 文件的尾部(这被称为自描述的)。
数据结构及索引
为分区构造索引是一种常见的优化方案,ORC 的数据结构分成以下 3 个层级,在每个层级上都有索引信息来加速查询
File Level:即一个 ORC 文件,Footer 中保存了数据的 meta 信息,还有文件数据的索引信息,例如各列数据的最大最小值(范围)、NULL
值分布、布隆过滤器等,这些信息可用来快速确定该文件是否包含要查询的数据。每个 ORC 文件中包含多个
Stripe。
Stripe Level 对应原表的一个范围分区,里面包含该分区内各列的值。每个 Stripe 也有自己的一个索引放在
footer 里,和 file-level 索引类似。
Row-Group Level :一列中的每 10000 行数据构成一个 row-group,每个
row-group 拥有自己的 row-level 索引,信息同上
ORC 里的 Stripe 就像传统数据库的页,它是 ORC 文件批量读写的基本单位。这是由于分布式储存系统的读写延迟较大,一次
IO 操作只有批量读取一定量的数据才划算。这和按页读写磁盘的思路也有共通之处。
像其他很多储存格式一样,ORC 选择将统计数据和 Metadata 放在 File 和 Stripe
的尾部而不是头部。但 ORC 在 Stripe 的读写上还有一点优化,那就是把分区粒度小于 Stripe
的结构(如 Column 和 Row-Group)的索引统一抽取出来放到 Stripe 的头部。这是因为在批处理计算中一般是把整个
Stripe 读入批量处理的,将这些索引抽取出来可以减少在批处理场景下需要的 IO(批处理读取可以跳过这一部分)。
Dremel (2010) / Apache Parquet
Dremel 是 Google 研发的用于大规模只读数据的查询系统,用于进行快速的 ad-hoc 查询,弥补
MapReduce 交互式查询能力的不足。为了避免对数据的二次拷贝,Dremel 的数据就放在原处,通常是
GFS 这样的分布式文件系统,为此需要设计一种通用的文件格式。
Dremel 的系统设计和大多 OLAP 的列式数据库并无太多创新点,但是其精巧的存储格式却变得流行起来,Apache
Parquet 就是它的开源复刻版。注意 Parquet 和 ORC 一样都是一种存储格式,而非完整的系统。
嵌套数据模型
Google 内部大量使用 Protobuf 作为跨平台、跨语言的数据序列化格式,相比 JSON 要更紧凑并具有更强的表达能力。Protobuf
不仅允许用户定义必须(required)和可选(optinal)字段,还允许用户定义 repeated
字段,意味着该字段可以出现 0~N 次,类似变长数组。
Dremel 格式的设计目的就是按列来存储 Protobuf 的数据。由于 repeated 字段的存在,这要比按列存储关系型的数据困难一些。一般的思路可能是用终止符表示每个
repeat 结束,但是考虑到数据可能很稀疏,Dremel 引入了一种更为紧凑的格式。
作为例子,下图左半边展示了数据的 schema 和 2 个 Document 的实例,右半边是序列化之后的各个列。序列化之后的列多出了
R、D 两列,分别代表 Repetition Level 和 Definition Level,通过这两个值就能确保唯一地反序列化出原本的数据。
Repetition Level 表示当前值在哪一个级别上重复。对于非 repeated 字段只要填上
trivial 值 0 即可;否则,只要这个字段可能出现重复(无论本身是 repeated 还是外层结构是
repeated),应当为 R 填上当前值在哪一层上 repeat。
举个例子说明:对于 Name.Language.Code 我们一共有三条非 NULL 的记录。
第一个是 en-us,出现在第一个 Name 的第一个 Lanuage 的第一个 Code 里面。在此之前,这三个元素是没有重复过的,都是第一次出现。所以其
R=0
第二个是 en,出现在下一个 Language 里面。也就是说 Language 是重复的元素。Name.Language.Code
中Language 排第二个,所以其 R=2
第三个是 en-gb,出现在下一个 Name 中,Name 是重复元素,排第一个,所以其 R=1
注意到 en-gb是属于第3个 Name 的而非第2个Name,为了表达这个事实,我们在 en 和
en-gb中间放了一个 R=1 的 NULL。
Definition Level 是为了说明 NULL 被定义在哪一层,也就宣告那一层的 repeat
到此为止。对于非 NULL 字段只要填上 trivial 值,即数据本身所在的 level 即可。
同样举个例子,对于 Name.Language.Country 列
us 非 NULL 值填上 Country 字段的 level 即 D=3
NULL 在 R1 内部,表示当前 Name 之内、后续所有 Language 都不含有 Country
字段。所以D为2。
NULL 在 R1 内部,表示当前 Document 之内、后续所有 Name 都不含有 Country
字段。所以D为1。
gb 非 NULL 值填上 Country 字段的 level 即 D=3
NULL 在 R2 内部,表示后续所有 Document 都不含有 Country 字段。所以D为0。
可以证明,结合 R、D 两个数值一定能唯一构建出原始数据。为了高效编解码,Dremel 在执行时首先构建出状态机,之后利用状态机处理列数据。不仅如此,状态机还会结合查询需求和数据的
structure 直接跳过无关的数据。
Hive
hive实际上是以HDFS作为存储,MapReduce作为计算引擎,YARN作为资源分配及容错机制,依托于hadoop生态系统实现的一种OLAP数据仓库,具体描述如下
Hive是一个SQL解析引擎,将SQL语句转译成MapReduce Job,然后再Hadoop平台上运行,达到快速开发的目的。
Hive中的表是纯逻辑表,就只是表的定义等,即表的元数据。本质就是Hadoop的目录/文件,达到了元数据与数据存储分离的目的
Hive本身不存储数据,它完全依赖HDFS和MapReduce。
Hive的内容是读多写少,默认不支持对数据的update和delete
Hive架构
Hive 由外部CLI,Hive Thrift Server或者Web UI提交SQL,提交至Driver中,Driver将sql解析成MapReduce执行计划,并进行逻辑优化及物理优化后提交至MapReduce进行执行,如果有需要写入的数据就写入HDFS文件中,并且记录下Metadata至Metastore中
Hive的存储文件格式
Hive所有存储都是以文件格式区分目录存放在hdfs上的,储存格式的不同及特点区分于各个文件格式的特点,Hive支持在建表时使用STORED
AS (TextFile|RCFile|SequenceFile|AVRO|ORC|Parquet)来指定存储格式
以下是每种格式的特点
TextFile: 行式存储,每一行都是一条记录,每行都以换行符(\ n)结尾。数据不做压缩,磁盘开销大,数据解析开销大。可结合Gzip、Bzip2使用(系统自动检查,执行查询时自动解压),但使用这种方式,hive不会对数据进行切分,从而无法对数据进行并行操作。
SequenceFile: 行式存储,是Hadoop API提供的一种二进制文件支持,其具有使用方便、可分割、可压缩的特点。支持三种压缩选择:NONE,
RECORD, BLOCK。 Record压缩率低,一般建议使用BLOCK压缩。
RCFile:行列存储相结合,首先,其将数据按行分块,保证同一个record在一个块上,避免读一个记录需要读取多个block。其次,块数据列式存储,有利于数据压缩和快速的列存取。
AVRO:开源项目,为Hadoop提供数据序列化和数据交换服务。您可以在Hadoop生态系统和以任何编程语言编写的程序之间交换数据。Avro是基于大数据Hadoop的应用程序中流行的文件格式之一。
ORC: 列式存储,Hive从大型表读取,写入和处理数据时,使用ORC文件可以提高性能。
Parquet: 列式存储,面向列的二进制文件格式,不可以直接读取
我们在读多写少并且只使用hive的情况下,应该尽量使用orc以提高性能
hql解析流程
Hive会将Hive sql解析为MapReduce,在了解SQL如何编译为MapReduce之前,先看看MapReduce框架实现SQL操作的基础原理
MapReduce实现基本SQL操作的原理
Join的实现原理
select u.name,
o.orderid from order o join user u on o.uid =
u.uid;
|
在map的输出value中为不同表的数据打上tag标记,在reduce阶段根据tag判断数据来源。MapReduce的过程如下(这里只是说明最基本的Join的实现,还有其他的实现方式)
Group By的实现原理
select rank,
isonline, count(*) from city group by rank, isonline; |
将GroupBy的字段组合为map的输出key值,利用MapReduce的排序,在reduce阶段保存LastKey区分不同的key。MapReduce的过程如下(当然这里只是说明Reduce端的非Hash聚合过程)
Distinct的实现原理
select dealid,
count(distinct uid) num from order group by dealid; |
当只有一个distinct字段时,如果不考虑Map阶段的Hash GroupBy,只需要将GroupBy字段和Distinct字段组合为map输出key,利用mapreduce的排序,同时将GroupBy字段作为reduce的key,在reduce阶段保存LastKey即可完成去重
HQL转化为MapReduce的过程
将HQL转为MapReduce执行的主要流程如下
语法解析 将HQL解析为AST(AbstractSyntaxTree,抽象语法树),入口为ParseDriver.run()方法。该步骤主要借助于Antlr3实现SQL的词法和语法解析,这里不详细介绍Antlr,只需要了解使用Antlr构造特定的语言只需要编写一个语法文件,定义词法和语法替换规则即可,Antlr完成了词法分析、语法分析、语义分析、中间代码生成的过程。
语义分析第一阶段 AST Tree仍然非常复杂,不够结构化,不方便直接翻译为MapReduce程序,AST
Tree转化为QueryBlock就是将SQL进一部抽象和结构化。QueryBlock是一条SQL最基本的组成单元,包括三个部分:输入源,计算过程,输出。简单来讲一个QueryBlock就是一个子查询。
生成查询计划 将QueryBlock解析成Operator树,Hive最终生成的MapReduce任务,Map阶段和Reduce阶段均由OperatorTree组成。Operator,就是在Map阶段或者Reduce阶段完成单一特定的操作。
逻辑优化 优化已生成的Operator树,合并操作符,达到减少MapReduce Job,减少shuffle数据量的目的
生成MR任务 将Operator树解析为Task有向无环图
物理优化 改写Task有向无环图,将某些结点的普通Task改写为可在运行时进行分枝选择的ConditionalTask
执行 将Task有向无环图交由框架进行执行
总结
Hive是属于OLAP型的数据仓库,适用场景为执行时效性不高的,数据量大的离线分析型计算,例如报表分析,如果有大量事务性操作,请使用OLTP型数据库,如mysql等
Hive存储文件格式有多种,默认为行式存储的TextFile,占用空间较大,并且计算读取性能不高,在使用hive时,尽量选用orc格式存储,压缩比例较大,切读取性能很高
Hive SQL底层为MapReduce,需提交至yarn上执行,yarn分配资源给MapReduce任务,大量hive
sql同时提交可能会非常损耗master结点分配任务的资源,如果需要在程序中调用hive sql insert时,请使用批量插入的sql或者通过其它方式(如编写用户自定义函数读取mysql)进行实现
|