1、Hive
分区表
在Hive Select查询中一般会扫描整个表内容,会消耗很多时间做没必要的工作。有时候只需要扫描表中关心的一部分数据,因此建表时引入了partition概念。分区表指的是在创建表时指定的partition的分区空间。
Hive可以对数据按照某列或者某些列进行分区管理,所谓分区我们可以拿下面的例子进行解释。
当前互联网应用每天都要存储大量的日志文件,几G、几十G甚至更大都是有可能。存储日志,其中必然有个属性是日志产生的日期。在产生分区时,就可以按照日志产生的日期列进行划分。把每一天的日志当作一个分区。
将数据组织成分区,主要可以提高数据的查询速度。至于用户存储的每一条记录到底放到哪个分区,由用户决定。即用户在加载数据的时候必须显示的指定该部分数据放到哪个分区。
1.1 实现细节
1、一个表可以拥有一个或者多个分区,每个分区以文件夹的形式单独存在表文件夹的目录下。
2、表和列名不区分大小写。
3、分区是以字段的形式在表结构中存在,通过describe table命令可以查看到字段存在, 但是该字段不存放实际的数据内容,仅仅是分区的表示(伪列)
。
1.2 语法
1. 创建一个分区表,以 ds 为分区列:
create table invites (id int, name string) partitioned
by (ds string) row format delimited fields terminated
by 't' stored as textfile;
2. 将数据添加到时间为 2013-08-16 这个分区中:
load data local inpath '/home/hadoop/Desktop/data.txt'
overwrite into table invites partition (ds='2013-08-16');
3. 将数据添加到时间为 2013-08-20 这个分区中:
load data local inpath '/home/hadoop/Desktop/data.txt'
overwrite into table invites partition (ds='2013-08-20');
4. 从一个分区中查询数据:
select * from invites where ds ='2013-08-12';
5. 往一个分区表的某一个分区中添加数据:
insert overwrite table invites partition (ds='2013-08-12')
select id,max(name) from test group by id;
可以查看分区的具体情况,使用命令:
hadoop fs -ls /home/hadoop.hive/warehouse/invites
或者:
show partitions tablename;
2、Hive 桶
对于每一个表(table)或者分区, Hive可以进一步组织成桶,也就是说桶是更为细粒度的数据范围划分。Hive也是
针对某一列进行桶的组织。Hive采用对列值哈希,然后除以桶的个数求余的方式决定该条记录存放在哪个桶当中。
把表(或者分区)组织成桶(Bucket)有两个理由:
(1)获得更高的查询处理效率。桶为表加上了额外的结构,Hive 在处理有些查询时能利用这个结构。具体而言,连接两个在(包含连接列的)相同列上划分了桶的表,可以使用
Map 端连接 (Map-side join)高效的实现。比如JOIN操作。对于JOIN操作两个表有一个相同的列,如果对这两个表都进行了桶操作。那么将保存相同列值的桶进行JOIN操作就可以,可以大大较少JOIN的数据量。
(2)使取样(sampling)更高效。在处理大规模数据集时,在开发和修改查询的阶段,如果能在数据集的一小部分数据上试运行查询,会带来很多方便。
1. 创建带桶的 table :
create table bucketed_user(id int,name string) clustered
by (id) sorted by(name) into 4 buckets row format delimited
fields terminated by '\t' stored as textfile;
首先,我们来看如何告诉Hive—个表应该被划分成桶。我们使用CLUSTERED
BY 子句来指定划分桶所用的列和要划分的桶的个数:
CREATE TABLE bucketed_user (id INT) name STRING) CLUSTERED BY (id) INTO 4 BUCKETS; |
在这里,我们使用用户ID来确定如何划分桶(Hive使用对值进行哈希并将结果除 以桶的个数取余数。这样,任何一桶里都会有一个随机的用户集合(PS:其实也能说是随机,不是吗?)。
对于map端连接的情况,两个表以相同方式划分桶。处理左边表内某个桶的 mapper知道右边表内相匹配的行在对应的桶内。因此,mapper只需要获取那个桶
(这只是右边表内存储数据的一小部分)即可进行连接。这一优化方法并不一定要求 两个表必须桶的个数相同,两个表的桶个数是倍数关系也可以。用HiveQL对两个划分了桶的表进行连接,可参见“map连接”部分(P400)。
桶中的数据可以根据一个或多个列另外进行排序。由于这样对每个桶的连接变成了高效的归并排序(merge-sort),
因此可以进一步提升map端连接的效率。以下语法声明一个表使其使用排序桶:
CREATE TABLE bucketed_users (id INT, name STRING) CLUSTERED BY (id) SORTED BY (id ASC) INTO 4 BUCKETS; |
我们如何保证表中的数据都划分成桶了呢?把在Hive外生成的数据加载到划分成 桶的表中,当然是可以的。其实让Hive来划分桶更容易。这一操作通常针对已有的表。
Hive并不检查数据文件中的桶是否和表定义中的桶一致(无论是对于桶 的数量或用于划分桶的列)。如果两者不匹配,在査询时可能会碰到错
误或未定义的结果。因此,建议让Hive来进行划分桶的操作。
有一个没有划分桶的用户表:
hive> SELECT * FROM users; 0 Nat 2 Doe B Kay 4 Ann |
2. 强制多个 reduce 进行输出:
要向分桶表中填充成员,需要将 hive.enforce.bucketing 属性设置为 true。①这
样,Hive 就知道用表定义中声明的数量来创建桶。然后使用 INSERT 命令即可。需要注意的是: clustered
by和sorted by不会影响数据的导入,这意味着,用户必须自己负责数据如何如何导入,包括数据的分桶和排序。
'set hive.enforce.bucketing = true' 可以自动控制上一轮reduce的数量从而适配bucket的个数,当然,用户也可以自主设置mapred.reduce.tasks去适配bucket个数,推荐使用'set
hive.enforce.bucketing = true'
3. 往表中插入数据:
INSERT OVERWRITE TABLE bucketed_users SELECT * FROM
users;
物理上,每个桶就是表(或分区)目录里的一个文件。它的文件名并不重要,但是桶 n 是按照字典序排列的第
n 个文件。事实上,桶对应于 MapReduce 的输出文件分区:一个作业产生的桶(输出文件)和reduce任务个数相同。我们可以通过查看刚才
创建的bucketd_users表的布局来了解这一情况。运行如下命令:
4. 查看表的结构:
hive> dfs -ls /user/hive/warehouse/bucketed_users;
将显示有4个新建的文件。文件名如下(文件名包含时间戳,由Hive产生,因此
每次运行都会改变):
attempt_201005221636_0016_r_000000_0 attempt_201005221636_0016_r-000001_0 attempt_201005221636_0016_r_000002_0 attempt_201005221636_0016_r_000003_0 |
第一个桶里包括用户IDO和4,因为一个INT的哈希值就是这个整数本身,在这里 除以桶数(4)以后的余数:②
5. 读取数据,看每一个文件的数据:
hive> dfs -cat /user/hive/warehouse/bucketed_users/*0_0; 0 Nat 4 Ann |
用TABLESAMPLE子句对表进行取样,我们可以获得相同的结果。这个子句会将 查询限定在表的一部分桶内,而不是使用整个表:
6. 对桶中的数据进行采样:
hive> SELECT * FROM bucketed_users > TABLESAMPLE(BUCKET 1 OUT OF 4 ON id); 0 Nat 4 Ann |
桶的个数从1开始计数。因此,前面的查询从4个桶的第一个中获取所有的用户。 对于一个大规模的、均匀分布的数据集,这会返回表中约四分之一的数据行。我们
也可以用其他比例对若干个桶进行取样(因为取样并不是一个精确的操作,因此这个 比例不一定要是桶数的整数倍)。例如,下面的查询返回一半的桶:
7. 查询一半返回的桶数:
hive> SELECT * FROM bucketed_users > TABLESAMPLE(BUCKET 1 OUT OF 2 ON id); 0 Nat 4 Ann 2 Joe |
因为查询只需要读取和TABLESAMPLE子句匹配的桶,所以取样分桶表是非常高效
的操作。如果使用rand()函数对没有划分成桶的表进行取样,即使只需要读取很 小一部分样本,也要扫描整个输入数据集:
hive〉 SELECT * FROM users > TABLESAMPLE(BUCKET 1 OUT OF 4 ON rand()); 2 Doe |
①从Hive 0.6.0开始,对以前的版本,必须把mapred.reduce .tasks设为表中要填
充的桶的个数。如果桶是排序的,还需要把hive.enforce.sorting设为true。
②显式原始文件时,因为分隔字符是一个不能打印的控制字符,因此字段都挤在一起。
3、举个完整的小栗子:
(1)建student & student1 表:
create table student(id INT, age INT, name STRING) partitioned by(stat_date STRING) clustered by(id) sorted by(age) into 2 buckets row format delimited fields terminated by ','; create table student1(id INT, age INT, name STRING) partitioned by(stat_date STRING) clustered by(id) sorted by(age) into 2 buckets row format delimited fields terminated by ','; |
(2)设置环境变量:
set hive.enforce.bucketing = true; |
(3)插入数据:
cat bucket.txt 1,20,zxm 2,21,ljz 3,19,cds 4,18,mac 5,22,android 6,23,symbian 7,25,wp LOAD DATA local INPATH '/home/lijun/bucket.txt' OVERWRITE INTO TABLE student partition(stat_date="20120802"); from student insert overwrite table student1 partition(stat_date="20120802") select id,age,name where stat_date="20120802" sort by age; |
(4)查看文件目录:
hadoop fs -ls /hive/warehouse/test.db/student1/stat_date=20120802 Found 2 items -rw-r--r-- 2 lijun supergroup 31 2013-11-24 19:16 /hive/warehouse/test.db/student1/stat_date=20120802/000000_0 -rw-r--r-- 2 lijun supergroup 39 2013-11-24 19:16 /hive/warehouse/test.db/student1/stat_date=20120802/000001_0 |
(5)查看sampling数据:
hive> select * from student1 tablesample(bucket 1 out of 2 on id); Total MapReduce jobs = 1 Launching Job 1 out of 1 ....... OK 4 18 mac 20120802 2 21 ljz 20120802 6 23 symbian 20120802 Time taken: 20.608 seconds |
注:tablesample是抽样语句,语法:TABLESAMPLE(BUCKET x OUT OF y)
y必须是table总bucket数的倍数或者因子。hive根据y的大小,决定抽样的比例。例如,table总共分了64份,当y=32时,抽取(64/32=)2个bucket的数据,当y=128时,抽取(64/128=)1/2个bucket的数据。x表示从哪个bucket开始抽取。例如,table总bucket数为32,tablesample(bucket
3 out of 16),表示总共抽取(32/16=)2个bucket的数据,分别为第3个bucket和第(3+16=)19个bucket的数据。
1、命令行操作
(1)打印查询头,需要显示设置:
set hive.cli.print.header=true;
(2)加"--",其后的都被认为是注释,但 CLI
不解析注释。带有注释的文件只能通过这种方式执行:
hive -f script_name
(3)-e后跟带引号的hive指令或者查询,-S去掉多余的输出:
hive -S -e "select * FROM mytable
LIMIT 3" > /tmp/myquery
(4)遍历所有分区的查询将产生一个巨大的MapReduce作业,如果你的数据集和目录非常多,
因此建议你使用strict模型,也就是你存在分区时,必须指定where语句
hive> set hive.mapred.mode=strict;
(5)显示当前使用数据库
set hive.cli.print.current.db=true;
(6)设置 Hive Job 优先级
set mapred.job.priority=VERY_HIGH | HIGH | NORMAL | LOW | VERY_LOW
(VERY_LOW=1,LOW=2500,NORMAL=5000,HIGH=7500,VERY_HIGH=10000)
set mapred.job.map.capacity=M设置同时最多运行M个map任务
set mapred.job.reduce.capacity=N设置同时最多运行N个reduce任务 |
(7)Hive 中的Mapper个数的是由以下几个参数确定的:
mapred.min.split.size ,mapred.max.split.size ,dfs.block.size splitSize = Math.max(minSize, Math.min(maxSize, blockSize)); |
map个数还与inputfilles的个数有关,如果有2个输入文件,即使总大小小于blocksize,也会产生2个map
mapred.reduce.tasks用来设置reduce个数。
2、表操作
(1)查看某个表所有分区
SHOW PARTITIONS ext_trackflow
查询具体某个分区
SHOW PARTITIONS ext_trackflow PARTITION(statDate='20140529');
(2)查看格式化的完整表结构
desc formatted ext_trackflow; DESCRIBE EXTENDED ext_trackflow; |
(3)删除分区:分区的元数据和数据将被一并删除,但是对于扩展表则只删除元数据
ALTER TABLE ext_trackflow DROP PARTITION
(statDate='20140529');
(4)查询是外部表还是内部表
DESCRIBE EXTENDED tablename
(5)复制表结构
CREATE EXTERNAL TABLE IF NOT EXISTS mydb.employees3 LIKE mydb.employees LOCATION '/path/to/data'; |
Note:如果你忽略关键字EXTERNAL,那么将依据 employees
是外部还是内部,如果加了那么一定是EXTERNAL,并要LOCATION
(6)为内部表某个分区导入数据,Hive将建立目录并拷贝数据到分区当中
LOAD DATA LOCAL INPATH '${env:HOME}/california-employees' INTO TABLE employees PARTITION (country = 'US', state = 'CA'); |
(7)为外部表某个分区添加数据
ALTER TABLE log_messages ADD IF NOT EXISTS PARTITION(year = 2012, month = 1, day = 2) LOCATION 'hdfs://master_server/data/log_messages/2012/01/02'; |
Note:Hive并不关心分区,目录是否存在,是否有数据,这会导致没有查询结果
(8)修改表:在任何时候你都可以修改表,但是你仅仅修改的是表的元数据,都实际数据不会造成任何影响
例如更改分区指定位置,这个命令不会删除旧的数据
ALTER TABLE log_messages PARTITION(year = 2011, month = 12, day = 2) SET LOCATION 's3n://ourbucket/logs/2011/01/02'; |
(9)更改表属性
ALTER TABLE log_messages SET TBLPROPERTIES ( 'notes' = 'The process id is no longer captured; this column is always NULL' ); |
(10)更改存储属性
ALTER TABLE log_messages PARTITION(year = 2012, month = 1, day = 1) SET FILEFORMAT SEQUENCEFILE; |
Note:如果table是分区的话那么partition是必须的
(11)指定新的 SerDe
ALTER TABLE table_using_JSON_storage SET SERDE 'com.example.JSONSerDe' WITH SERDEPROPERTIES ( 'prop1' = 'value1', 'prop2' = 'value2' ); |
Note:SERDEPROPERTIE解释SERDE用的何种模型,属性值和名称都为字符串,方便告诉用户,为自己指定SERDE并且应用于什么模型
为当前SERDE设定
ALTER TABLE table_using_JSON_storage SET SERDEPROPERTIES ( 'prop3' = 'value3', 'prop4' = 'value4' ); |
(12)改变存储属性
ALTER TABLE stocks CLUSTERED BY (exchange, symbol) SORTED BY (symbol) INTO 48 BUCKETS; |
(13)复杂更改表语句:为各种不同的操作添加 hook ALTER TABLE … TOUCH
ALTER TABLE log_messages TOUCH PARTITION(year = 2012, month = 1, day = 1); |
典型的应用场景就是当分区有改动的时候,那么将触发
hive -e 'ALTER TABLE log_messages TOUCH
PARTITION(year = 2012, month = 1, day = 1);'
(14)ALTER TABLE … ARCHIVE PARTITION
捕获分区文件到Hadoop archive file也就是HAR
ALTER TABLE log_messages ARCHIVE PARTITION(year = 2012, month = 1, day = 1);(只可以用在被分区的表) |
(15)保护分区不被删除和查询
ALTER TABLE log_messages PARTITION(year = 2012, month = 1, day = 1) ENABLE NO_DROP;
ALTER TABLE log_messages
PARTITION(year = 2012, month = 1, day = 1) ENABLE
OFFLINE; |
Note:与ENABLE对应的是DISABLE,不能应用在未被分区的表
(16)按正条件(正则表达式)显示表
hive> SHOW TABLES '.*s';
(17)外部表、内部表互转
alter table tablePartition set TBLPROPERTIES ('EXTERNAL'='TRUE'); //内部表转外部表 alter table tablePartition set TBLPROPERTIES ('EXTERNAL'='FALSE'); //外部表转内部表 |
(18)分区与分桶:
partition(分区:按目录保存文件,每个partition对应一个目录)例如:
CREATE EXTERNAL TABLE table1 ( column1 STRING, column2 STRING, column3 STRING, ) PARTITIONED BY (dt STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' STORED AS TEXTFILE; ALTER TABLE table1 ADD IF NOT EXISTS PARTITION (dt=20090105); ALTER TABLE table1 ADD IF NOT EXISTS PARTITION (dt=20090102); ALTER TABLE table1 ADD IF NOT EXISTS PARTITION (dt=20081231); |
bucket(分桶,对指定列作hash,每个bucket对应一个文件)
CREATE TABLE VT_NEW_DATA ( column1 STRING, column2 STRING, column3 STRING, ) CLUSTERED BY (column1) SORTED BY (column1) INTO 48 BUCKETS ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' LINES TERMINATED BY '\n' STORED AS RCFILE; |
3、列操作
(1)重命名列,更改位置,类型和注释
ALTER TABLE log_messages CHANGE COLUMN hms hours_minutes_seconds INT COMMENT 'The hours, minutes, and seconds part of the timestamp' AFTER severity; |
更改名称: new column old column type
comment不是必须的,你可以添加注释
AFTER用于更改字段位置
仅修改了元数据并未对源data做任何改动
(2)添加新列
ALTER TABLE log_messages ADD COLUMNS ( app_name STRING COMMENT 'Application name', session_id LONG COMMENT 'The current session id'); |
(3)删除和替换列:慎用!!!
ALTER TABLE table_name ADD|REPLACE COLUMNS
(col_name data_type [COMMENT col_comment], ...)
ADD是代表新增一字段,字段位置在所有列后面(partition列前)
REPLACE COLUMNS removes all existing columns and adds the new set of columns. REPLACE COLUMNS can also be used to drop columns. For example:
"ALTER TABLE test_change REPLACE COLUMNS (a int, b int);"
will remove column `c' from test_change's schema. Note that this does not delete underlying data,
it just changes the schema. |
(4)REGEX Column Specification
SELECT 语句可以使用正则表达式做列选择,下面的语句查询除了 ds
和 hr 之外的所有列:
SELECT `(ds|hr)?+.+` FROM test
4、查看变量
hive> set; … hive> set-v; … even more output!… |
set’输出 hivevar,hiveconf,system 和 env
命名空间下的所有变量。
‘set -v’包括了输出Hadoop定义的全部变量。
hive> set hivevar:foo=hello; hive> set hivevar:foo; hivevar:foo=hello |
使用变量:
hive> create table toss1(i int, ${hivevar:foo}
string);
5、一个完整的建库、表例子
-- 创建数据库 create database ecdata WITH DBPROPERTIES ('creator' = 'June', 'date' = '2014-06-01'); -- 或者使用 COMMENT 关键字 -- 查看数据库描述 DESCRIBE DATABASE ecdata; DESCRIBE DATABASE EXTENDED ecdata; -- 切库 use ecdata; -- 删除表 drop table ext_trackflow; -- 创建表 create EXTERNAL table IF NOT EXISTS ext_trackflow ( cookieId string COMMENT '05dvOVC6Il6INhYABV6LAg==', cate1 string COMMENT '4', area1 string COMMENT '102', url string COMMENT 'http://cd.ooxx.com/jinshan-mingzhan-1020', trackTime string COMMENT '2014-05-25 23:03:36', trackURLMap map<string,String> COMMENT '{"area":"102","cate":"4,29,14052"}', ) PARTITIONED BY (statDate STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\001' COLLECTION ITEMS TERMINATED BY '\002' MAP KEYS TERMINATED BY '\003' LINES TERMINATED BY '\n' STORED AS TEXTFILE LOCATION '/DataWarehouse/ods/TrackFlowTable' ; --添加分区语句 ALTER TABLE ext_trackflow ADD PARTITION (statDate='20140525') LOCATION '/DataWarehouse/ods/TrackFlowTable/20140525'; --每天建立分区 yesterday=`date -d '1 days ago' +'%Y%m%d'` hive -e "use ecdata; ALTER TABLE ext_trackflow ADD PARTITION
(statDate='$yesterday') LOCATION '/DataWarehouse/ods/TrackFlowTable/$yesterday';" |
|