编辑推荐: |
本文来自于infoq,介绍了使用Appache
Arrow进行向量化队列处理,Dremio系统中的Arrow查询处理等。 |
|
要点
列式数据库有助于减少联机分析处理(OLAP)的负载,因为查询会涉及到列的一个子集,但这些列都有大量的行数。
列式存储格式使我们可以采用一些基于每列的轻量级压缩算法(lightweight compression
algorithms) 。
向量化的数据处理通过有效使用CPU缓冲机制的方法,来开发更快速的分析查询引擎。
Arrow的列式数据结构允许使用轻量级方案,如字典编码(dictionary encoding)、位压缩(bit
packing ),或是运行长度编码(run length),这样在压缩比例一定时,可以提高查询性能。
列式数据库组织磁盘或内存中给定的连续列数据。基于列的存储方式,有助于减少联机分析处理(OLAP)的负载,因为查询会涉及到列的一个子集,但这些列都有大量的行数。对于这类查询,使用列数据格式可以大大减少从磁盘到内存和从内存到寄存器的数据转换。这样可以有效地提高整个存储体系的吞吐量。而且,列式格式让我们可以使用一些基于每列的轻量级压缩算法。这种情况下,压缩算法性能会更好,因为压缩引擎的输入数据是同一类型数据,能够压缩的更好更快。
向量化处理自MonerDB-X100(Vectorwise)系统开始流行,现在已成为在现代硬件条件下构建高效分析查询引擎,进而加速数据处理的标准。这种模式需要按列表示的数据来编写高效优化的查询处理算法。向量化的过程和传统的基于元组的查询过程模式有着显著区别。
两种方法最主要的不同是,前者是基于列而不是基于行/元组来重写查询处理算法。连续存储的一列数据,在内存中可以表示为一个向量,这个向量包含了该列中固定数目的一些值。
向量模式和传统模式的第二个不同是,我们可以添加一个块,而不是在查询计划树顶部一次添加一个元组。一个块由固定的一组元组(记录)组成,它代表一组向量,这些向量和列/字段有一一对应的关系。向量块是数据的基本单元,它经由执行计划树,从一个操作符流向另一个操作符。
图1:传统的一次添加一个元组的处理和向量化处理比较
在图1中,左侧图为传统的一次操作一个元组的处理流程。扫描运算符开始获取输入数据,并通过过滤运算符开始推动元组的处理。接下来,过滤运算符传递符合条件的元组到聚合运算符。运算符不停调用查询计划树下层的下一个运算符。其结果就是位于树下层的运算符把元组推向位于树顶的运算符。这就是查询的执行过程。
现在,由于有大量的函数调用,且每次函数调用时从一个运算符到另一个运算符需要处理或传输的数据不多,在这个执行过程中,性能开销很大。其次,当仅需要处理元组里列的一个子集时,需要传递整个元组。
右侧的为一个向量模型,往该模型中添加一个向量块,每个向量有一组记录或列值。在这个数据集合中,有多少列,就有多少向量。不断往查询计划树上面压入一批向量,它们就是查询计划中不同操作符的输入与输出。这种方法远比其它方法有效,因为这种方法在不同的操作符间平摊了函数调用的开销,其次,操作基于列而不再基于行或元组。
向量化的代码可以充分利用CPU的缓存。例如,有10列的一行数据和只需操作一列的查询计划。在基于行的查询处理模式中,9列数据会不必要的占用缓存,限制了可以进入缓存的数据数量。在基于列的处理中,只会读入感兴趣的列数据,这样可以一起处理更多的值,同时有效使用了CPU的内存带宽。
向量处理背后的主要思想是,按列(或列式数据数据)工作,并把从多列到元组(行)的实际转化推迟到查询计划的很靠后的位置进行处理,大部分情况是需要展示结果给用户时。这正是为什么查询执行算法通常会重写,来做基于列的处理。如果我们以列式方式存储数据,但是处理代码是基于行处理编写的,那么当读到列时,就不得不组合很多列的数据来构成一个元组,并将该元组传递给查询操作符来进行传统的逐行处理。执行过程中的元组构建迟早会影响对列式数据进行高度优化的查询处理。
现代处理器有扩展的指令集,在单独的一个指令中,该指令集可以扩展向量执行概念到多列的值。单指令多数据流(SIMD)指令在20世纪90年代成为桌面运算主流,提高了多媒体应用如游戏的性能。
对多个值做相同改变,比如调整一个图像的亮度时,SIMD尤其有用。每个像素点的亮度由红色(R),绿色(G)和蓝色(B)的值确定。改变亮度,要从内存读取R,G和B的值,并进行调整,调整后的值要重写回内存。不使用SIMD,像素的RGB值会依次单个读入内存。使用SIMD,像素的RGB数据块可以在一个指令中一起进行处理。这样就极大地提高了有效性。
这些概念在分析学的数据处理中非常适用。SIMD采用和并发无关的数据级并行。SIMD指令允许在同一时钟周期内,对不同的列数据执行相同的指令,
实际上执行吞吐量(throughput of execution)可以提高4倍或更多。列式数据可以遵循SIMD处理,这样可以存储列值到内存中的有序排列且字节对齐的密集数组中,这些数据会载入到固定宽度的SIMD寄存器中。现在的Intel编译器配置了AVX-512(高级矢量扩展)指令集,该指令集增加SIMD寄存器的宽度到512比特。换而言之,可以并行运算16个4字节的整数列值。其它的SIMD指令集还有SSE,SSE2,AVX,AVX2.
要使用向量处理和SIMD,很重要的一点是正确组织数据以最大化收益。现在有一种内存处理的开源框架叫Apache
Arrow。Arrow确保内存中的数据是正确对齐的,这样能最大限度地利用向量化和SIMD指令的优势。
Apache Arrow
Arrow项目是Apache软件基金会顶级的开源项目。Arrow定义了标准的方式来表示可有效处理的内存数据,同时支持多种流行的编程语言中,包括Java,C++和Python。
Arrow项目两年发发布,取得了快速的发展。自发布后,十多个主要的开源项目中的开发者都为Arrow社区做出了贡献。Arrow使很多不同类型的项目都能受益,并简化了过程之间的数据交换。使用Arrow的好处非常显著。
使用Appache Arrow进行向量化队列处理
Arrow的主要关注点在CPU和GPU的效率上。Arrow对列式数据(扁平的或嵌套的)提供与语言无关的标准格式和相应的库,这样可以在先进的硬件上有效的运行分析任务。
数据仓库工作和分析查询从列式数据中获益匪浅,因为查询通常涉及到列的子集,在这些列间,会涉及到大量的行。分析工作的查询包括大量的回归,扫描和复杂的连接。
可以用列式数据格式来编写简单而有效的查询过程代码,以加快分析操作。紧凑的for循环代码能快速运行在列值上,并执行必需的操作,如FILTER,COUNT,
SUM和MIN等等。这种方法对CPU友好,因为Cache line中填充的为相关数据,即一系列从列获取的值,它们都需要进行处理。类似的,我们从磁盘将所需列式数据读入内存时,只需要读取所需列即可。因此,在磁盘I/O和CPU内存带宽占用方面,基于列式格式数据编写的查询算法,效率远比基于行的其它算法高。
而且,在编译中,如果有优化机会,编辑器会将紧凑循环代码自动转换为向量指令。当编写基于行数据的查询处理算法时,这种优化机会是没有的。
Arrow列式数据的内存格式实际上让CPU使用率压缩设计(CPU-efficient compression
schemes),这种设计是轻量级的,更关注实际的查询过程性能而不是实际的压缩率,后者会严重影响CPU效率。
Dremio系统中的Arrow查询处理
Dremio是自服务数据的开源平台。核心的引擎名为Sabot,它完全基于Arrow库的顶层进行构建。
首先来讨论Dremio里Arrow相关的内存管理。Arrow实际包含一个基于Chunk管理的分配器,该分配器完全构建在Netty的JEMallloc的顶层实现中。主要的内存管理模式或分配模式是一种基于树的模式,从根分配器开始进行分配。这样,可以在根分配器下创建多个子分配器。每个分配器有一个初始预留(创建分配器时触发)和最大的分配限额。预留不是指预先分配,它意味着在分配器整个生命周期中,可用于分配操作符对的预留的内存数量。
图2: 树状分配系统
在运行引擎中,可以将堆外内存缓冲用做内存列式数据结构的底层内存(underlying memory)。在Java的垃圾回收中,应避免使用JVM堆,以减少开销。
现在,来讨论怎样使用树状分配模式,以及Dremio中的初始预留和内存限制两个概念。查询计划树的每个运算符得到它自己的分配器(父分配器)。这样,每个运算符会为其中的每个独立任务创建一个或多个分配器(带初始预留和内存限制)。
以外部sort运算符为例。这个运算符负责在内存不足时,很好的处理排序查询。在任何低内存的情况下,它都可以溢出数据。
图3:对外部sort运算符进行基于树的分配
在运算符的顶层,有一个该运算符的根分配器。查询开始运行前,会先建立该运算符自己的分配器。排序运算符有两个主要的子构件。一个是运行内存,另一个是运行磁盘,每个子构件创建独立于运算符分配器的子分配器。
进入运算符的批量数据,由运行内存负责其获取和排序。当所有的输入都被处理后,所以的数据都已排序,运算符可以开始从运行内存的子构件中输出数据。
磁盘运行负责管理溢出。如果内存不足,需要溢出(一次或多次)内存中已排序的数据。一旦数据溢出,需要重新对一些处理进行排序,从磁盘往内存中加载多个已排序的数据流,在内存中进行其合并来完成处理,然后将数据从运算符抽取出来。处理溢出数据的代码需要保证有足够的空间,可以加载2组或多组(或批)溢出记录到内存里,来继续内存中的合并处理。磁盘运行部件会一直监测多个异常周期(或循环)和每个周期内最大溢出批次的大小。子分配器预留足够的内存空间,可以加载每次溢出循环产生的溢出批次。
在Dremio中,数据作为一组向量,通过管道从一个运算符流向另一个运算符。这叫做记录批次。一个记录批次由固定个数的列向量(列式描述的行数据结构)组成。记录批次是Dremio运行引擎的任务单位。
图4: 从一个查询运算符到另一个的管道数据流(不需经过拷贝)
在这个例子中,有两个运算符:scan和aggregation。数据有三列,每个列有个Arrow向量,总共有三个向量。上图表示scan运算符的输出(记录向量)正好作为aggregation运算符的输入。
在某些运算中,比如有些类型的join和aggregation,可能需要把基于列的数据转换为基于行的数据。通过性能实验,可以发现列式数据对于hash表的插入,hash
join和hash aggregation算法中的lookup不够有效。对aggregation和join,在加入到hash表之前,需要把主要的列从输入记录批次转换到到相应的行表示数据里,因此这些算法实现部分(尤其hash表的代码)运行基于行式表达。
图5:向量化的Hash Aggregation,及其主要列数据的行式表达
下面是向量化代码的概述,用来执行从列式数据到行式数据的转化:
编写一段代码将向量列式数据转换为行式表达,可以在hash表中有效的进行insertion/lookup的操作(向量化的hash
aggregation和join也会用到该表)。对于hash aggregation和hash join,可以通过对主要的列进行GROUP
BY或join来实现。
static void
pivot4Bytes(
VectorPivotDef def ,
FixedBlockVector fixedBlock,
final int count) {
/* source column vector to pivot */
final FieldVector field =def.getIncomingVector();
/* source column vector buffers */
final List <ArrowBuf> buffers = field.getFieldBuffers();
final int blockLength = fixedBlock.getBlockWidth();
final int bitOffset = def.getNullBitOffset();
/* validity buffer of source vector */
long srcBitsAddr = buffers.get(0).memoryAddress();
/* data buffer of source vector */
long srcDataAddr = buffers.get(1).memoryAddress();
/* target memory region to store pivoted (row-wise)
representation */
long targetAddr = fixedBlock.getMemoryAddress();
/* determine number of null values to work
through a word at a time */
final int remainCount = count % WORD_BITS;
final int wordCount = (count - remainCount)
/ WORD_BITS;
final long finalWordAddr = srcDataAddr + (wordCount
* WORD_BITS * FOUR_BYTE);
long bitTargetAddr = targetAddr + def. getNullByte
Offset ();
long valueTargetAddr = targetAddr + def.getOffset();
// decode word at a time -- 64 column values
while (srcDataAddr < finalWordAddr) {
final long bitValues = PlatformDependent .getLong
(srcBitsAddr);
if (bitValues == NONE_SET) {
// noop (all nulls).
bitTargetAddr += (WORD_BITS * blockLength);
valueTargetAddr += (WORD_BITS * blockLength);
srcDataAddr += (WORD_BITS * FOUR_BYTE);
} else if (bitValues == ALL_SET) {
// all set, set the bit values using a constant
AND.
// Independently set the data values without
transformation.
final int bitVal = 1 << bitOffset;
for (int i = 0; i < WORD_BITS; i++, bitTargetAddr
+= blockLength ) {
PlatformDependent.putInt(bitTargetAddr,
PlatformDependent.getInt(bitTargetAddr) | bitVal);
}
for (int i = 0; i < WORD_BITS; i++, valueTargetAddr
+= blockLength , srcDataAddr += FOUR_BYTE) {
PlatformDependent .putInt(valueTargetAddr, PlatformDependent
.getInt(srcDataAddr));
}
} else {
// some nulls, some not, update each value to
zero or the value, depending on the null bit.
for (int i = 0; i < WORD_BITS; i++, bitTargetAddr
+= blockLength , valueTargetAddr += blockLength,
srcDataAddr += FOUR_BYTE) {
final int bitVal = ((int) (bitValues >>>
i)) & 1;
PlatformDependent.putInt (bitTargetAddr,
PlatformDependent.getInt (bitTargetAddr) | (bitVal
<< bitOffset ));
PlatformDependent.putInt(valueTargetAddr, PlatformDependent
.getInt(srcDataAddr) * bitVal);
}
}
srcBitsAddr += WORD_BYTES;
}
if(remainCount > 0) {
// do the remaining bits..
} |
代码实例:从一列向量(源)到另一列向量(目标)做向量拷贝,使用2字节选择向量。高效C/C++风格的紧凑循序代码,直接作用于underlying
memory(实例适合固定宽度为4字节的列):
用例:SELECT C1 from FOO where C2 > 1000;
首先,在紧凑for循环向量代码中,对C2做有效的过滤处理,构建选择向量,存储通过该过滤器的列值偏移。现在开始运行另一个循环,使用这些偏移量来索引从C1传递出来的值。
static class
FourByteCopier extends FieldBufferCopier {
private static final int SIZE = 4;
private final FieldVector source;
private final FieldVector target;
private final FixedWidthVector targetAlt;
public FourByteCopier(FieldVector source,
FieldVector target) {
this.source = source;
this.target = target;
this.targetAlt = (FixedWidthVector) target;
}
@Override
public void copy(long offsetAddr, int count)
{
targetAlt.allocateNew(count);
final long max = offsetAddr + count * 2;
final long srcAddr = source.getDataBufferAddress();
long dstAddr = target.getDataBufferAddress();
for(long addr = offsetAddr; addr < max; addr
+= STEP _ SIZE, dstAddr += SIZE){
PlatformDependent.putInt(dstAddr,
PlatformDependent.getInt(srcAddr + ((char)PlatformDependent.getShort(addr))
* SIZE));
}
} |
确定向量的批次大小
现在来讨论实际怎么确定向量大小。Dremio中的任务单元叫记录批次或数据批次或集,由Arrow内存向量组成。每个向量代表了数据集中的一个域或一个列,由固定数量的记录构成。
图6:在查询运算符中传递的记录批次
假设有一个百万的记录数据集合。在同一时间需要处理的是约4800个记录的记录批次。记录批次是在运算符之间管道中流动的数据单元。现在,关于如何固定一个数据批次的记录总数,有很多不同的方案,该数可能多达64,000。
可以看到,大的批次容量如8000或16,000实际上可以提升效率,因为单位任务增加了,需要重复执行一个过程的次数就会降低。但是,大的批次也会引起管道问题,因为运算符之间实际传递的数据量也增加了。然而使用小的批次大小,如128或256,虽然单个批次的处理会变快,运算符之间传递的数据会慢很多,但因为处理过程重复的绝对次数和要创建的对象容量大,会很快到达查询的堆顶。这也是为什么在Dremio中,使用到的标准记录批次大小是可配置的,大多数情况下,这个大小为4096个记录。
通过批次大小,可以实际控制为向量分配的内存数量。在external sort,aggregation和join等运算中,一定要意识到运算符需要工作的内存数量,实际上不能依赖Arrow
API提供的给向量分配的缺省内存。
在内存受限的情况下,仔细配置向量的批次大小,进而正确地给这些记录分配内存,这样可以写出能很好工作的健壮算法。
通常传统意义的压缩在数据库和其它系统如LZO,ZLIB等中的使用是重量级的,而压缩列式格式可以平衡使用上的轻量级和CPU有效压缩方案。传统压缩算法提供更好的压缩比例,但会影响CPU效率,因为压缩和解压缩增加了查询的总时间消耗。
Arrow的纵列格式让我们可以使用轻量级方案,如字典编码(dictionary encoding)、位压缩(bit
packing),或是长度编码,后者可根据压缩比例调整查询性能。其次,可以直接操作压缩后的列式数据,这样在开始处理之前,不需要对所有的列式数据进行解压碎,查询性能就可以提高一个数量级。
下面举例说明怎么对可变宽度列值进行使用字典编码。
图7: 使用带SIMD的字典编码来有效断言Strings的估值
列COUNTRY有United States,China,India,France和United
Kingdom等国家,其长度是可变的,需要编写一个基于COUNTRY的过滤的查询。对该列进行字典编码,像固定宽度的字典编码值的过滤器一样,重写可变长度字符串的过滤器,这样可以有效的进行过滤处理。
SELECT C1, C2 FROM FOO WHERE COUNTRY=’FRANCE’
首先查阅字典,得到“FRANCE”的字典编码值为4。加载字典值4到SIMF寄存器,然后加载的所有已编码值,同时比较编码后的值和4,从而找出该单元相对于COUNTRY列值为“FRANCE”单元的位置(或索引)。
这就是字典编码的强有力的地方。实际上,可以压缩可变长度列宽的值到固定长度的字典值数组中,然后从写查询处理算法,该算法可以非常有效的在这些压缩后的列值间依次通过。
数据反射
为加快查询速度,Dremio使用名为数据反射(Data Reflection)的特性来进行数据优化。数据反射在磁盘中存储,它使用列式数据格式,采用了Parquet列式存储格式(Apache
Parquet)。当从数据反射中读取数据时,会从Parquet把数据加载为相应的列格式到Arrow的内存,以便在执行引擎中进行处理。
数据反射的最初读取实际是基于行格式的。它基本是行导向的读取,完全没有利用源(磁盘)和目标(内存)数据格式都是列式的情况。
因此我们重写读取为完全的向量化读取,该过程中,为基于列的处理。这提高了两方面的代码效率,即从磁盘读取Parquet页(压缩过或没有压缩过的)和重构Arrow列向量。
我们也提供支持Parquet扫描的过滤器压入。在查询中的断言能直接压入到Parquet扫描代码中,因此在重构Arrow内存向量时,只需加载内存中需要的列数据。
总结
Dremio是基于Apache Arrow的开源数据处理框架,具有向量化特性。在本文中,原文作者讨论了这些特性的主要方面,细节的技术讨论在今年San
Jose的Strata Conference中已给出。在Drmio中,扩展了Arrow在整个内存运行引擎的使用。最近,原文作者所在团队重写了Arrow中的大部分Java实现,提高其性能和堆使用。一些TPCH查询显示延迟降低了60%。他们计划做更多的改进,包括本地SIMD加速库,并认为它在处理效率上可以有极大的改进。 |