您可以捐助,支持我们的公益事业。

1元 10元 50元





认证码:  验证码,看不清楚?请点击刷新验证码 必填



  求知 文章 文库 Lib 视频 iPerson 课程 认证 咨询 工具 讲座 Modeler   Code  
会员   
 
   
 
 
     
   
 订阅
  捐助
Hive 基础
 
作者:xrzs 来源:开源项目 发布于: 2015-8-31
   次浏览      
 

1、Hive 分区表

在Hive Select查询中一般会扫描整个表内容,会消耗很多时间做没必要的工作。有时候只需要扫描表中关心的一部分数据,因此建表时引入了partition概念。分区表指的是在创建表时指定的partition的分区空间。

Hive可以对数据按照某列或者某些列进行分区管理,所谓分区我们可以拿下面的例子进行解释。

当前互联网应用每天都要存储大量的日志文件,几G、几十G甚至更大都是有可能。存储日志,其中必然有个属性是日志产生的日期。在产生分区时,就可以按照日志产生的日期列进行划分。把每一天的日志当作一个分区。

将数据组织成分区,主要可以提高数据的查询速度。至于用户存储的每一条记录到底放到哪个分区,由用户决定。即用户在加载数据的时候必须显示的指定该部分数据放到哪个分区。

1.1 实现细节

1、一个表可以拥有一个或者多个分区,每个分区以文件夹的形式单独存在表文件夹的目录下。

2、表和列名不区分大小写。

3、分区是以字段的形式在表结构中存在,通过describe table命令可以查看到字段存在, 但是该字段不存放实际的数据内容,仅仅是分区的表示(伪列) 。

1.2 语法

1. 创建一个分区表,以 ds 为分区列:

create table invites (id int, name string) partitioned by (ds string) row format delimited fields terminated by 't' stored as textfile;

2. 将数据添加到时间为 2013-08-16 这个分区中:

load data local inpath '/home/hadoop/Desktop/data.txt' overwrite into table invites partition (ds='2013-08-16');

3. 将数据添加到时间为 2013-08-20 这个分区中:

load data local inpath '/home/hadoop/Desktop/data.txt' overwrite into table invites partition (ds='2013-08-20');

4. 从一个分区中查询数据:

select * from invites where ds ='2013-08-12';

5. 往一个分区表的某一个分区中添加数据:

insert overwrite table invites partition (ds='2013-08-12') select id,max(name) from test group by id;

可以查看分区的具体情况,使用命令:

hadoop fs -ls /home/hadoop.hive/warehouse/invites

或者:

show partitions tablename;

2、Hive 桶

对于每一个表(table)或者分区, Hive可以进一步组织成桶,也就是说桶是更为细粒度的数据范围划分。Hive也是 针对某一列进行桶的组织。Hive采用对列值哈希,然后除以桶的个数求余的方式决定该条记录存放在哪个桶当中。

把表(或者分区)组织成桶(Bucket)有两个理由:

(1)获得更高的查询处理效率。桶为表加上了额外的结构,Hive 在处理有些查询时能利用这个结构。具体而言,连接两个在(包含连接列的)相同列上划分了桶的表,可以使用 Map 端连接 (Map-side join)高效的实现。比如JOIN操作。对于JOIN操作两个表有一个相同的列,如果对这两个表都进行了桶操作。那么将保存相同列值的桶进行JOIN操作就可以,可以大大较少JOIN的数据量。

(2)使取样(sampling)更高效。在处理大规模数据集时,在开发和修改查询的阶段,如果能在数据集的一小部分数据上试运行查询,会带来很多方便。

1. 创建带桶的 table :

create table bucketed_user(id int,name string) clustered by (id) sorted by(name) into 4 buckets row format delimited fields terminated by '\t' stored as textfile;

首先,我们来看如何告诉Hive—个表应该被划分成桶。我们使用CLUSTERED BY 子句来指定划分桶所用的列和要划分的桶的个数:

CREATE TABLE bucketed_user (id INT) name STRING) 
CLUSTERED BY (id) INTO 4 BUCKETS;

在这里,我们使用用户ID来确定如何划分桶(Hive使用对值进行哈希并将结果除 以桶的个数取余数。这样,任何一桶里都会有一个随机的用户集合(PS:其实也能说是随机,不是吗?)。

对于map端连接的情况,两个表以相同方式划分桶。处理左边表内某个桶的 mapper知道右边表内相匹配的行在对应的桶内。因此,mapper只需要获取那个桶 (这只是右边表内存储数据的一小部分)即可进行连接。这一优化方法并不一定要求 两个表必须桶的个数相同,两个表的桶个数是倍数关系也可以。用HiveQL对两个划分了桶的表进行连接,可参见“map连接”部分(P400)。

桶中的数据可以根据一个或多个列另外进行排序。由于这样对每个桶的连接变成了高效的归并排序(merge-sort), 因此可以进一步提升map端连接的效率。以下语法声明一个表使其使用排序桶:

CREATE TABLE bucketed_users (id INT, name STRING) 
CLUSTERED BY (id) SORTED BY (id ASC) INTO 4 BUCKETS;

我们如何保证表中的数据都划分成桶了呢?把在Hive外生成的数据加载到划分成 桶的表中,当然是可以的。其实让Hive来划分桶更容易。这一操作通常针对已有的表。

Hive并不检查数据文件中的桶是否和表定义中的桶一致(无论是对于桶 的数量或用于划分桶的列)。如果两者不匹配,在査询时可能会碰到错 误或未定义的结果。因此,建议让Hive来进行划分桶的操作。

有一个没有划分桶的用户表:

hive> SELECT * FROM users; 
0 Nat
2 Doe
B Kay
4 Ann

2. 强制多个 reduce 进行输出:

要向分桶表中填充成员,需要将 hive.enforce.bucketing 属性设置为 true。①这 样,Hive 就知道用表定义中声明的数量来创建桶。然后使用 INSERT 命令即可。需要注意的是: clustered by和sorted by不会影响数据的导入,这意味着,用户必须自己负责数据如何如何导入,包括数据的分桶和排序。

'set hive.enforce.bucketing = true' 可以自动控制上一轮reduce的数量从而适配bucket的个数,当然,用户也可以自主设置mapred.reduce.tasks去适配bucket个数,推荐使用'set hive.enforce.bucketing = true'

3. 往表中插入数据:

INSERT OVERWRITE TABLE bucketed_users SELECT * FROM users;

物理上,每个桶就是表(或分区)目录里的一个文件。它的文件名并不重要,但是桶 n 是按照字典序排列的第 n 个文件。事实上,桶对应于 MapReduce 的输出文件分区:一个作业产生的桶(输出文件)和reduce任务个数相同。我们可以通过查看刚才 创建的bucketd_users表的布局来了解这一情况。运行如下命令:

4. 查看表的结构:

hive> dfs -ls /user/hive/warehouse/bucketed_users;

将显示有4个新建的文件。文件名如下(文件名包含时间戳,由Hive产生,因此 每次运行都会改变):

attempt_201005221636_0016_r_000000_0 
attempt_201005221636_0016_r-000001_0
attempt_201005221636_0016_r_000002_0
attempt_201005221636_0016_r_000003_0

第一个桶里包括用户IDO和4,因为一个INT的哈希值就是这个整数本身,在这里 除以桶数(4)以后的余数:②

5. 读取数据,看每一个文件的数据:

hive> dfs -cat /user/hive/warehouse/bucketed_users/*0_0; 
0 Nat
4 Ann

用TABLESAMPLE子句对表进行取样,我们可以获得相同的结果。这个子句会将 查询限定在表的一部分桶内,而不是使用整个表:

6. 对桶中的数据进行采样:

hive> SELECT * FROM bucketed_users 
> TABLESAMPLE(BUCKET 1 OUT OF 4 ON id);
0 Nat
4 Ann

桶的个数从1开始计数。因此,前面的查询从4个桶的第一个中获取所有的用户。 对于一个大规模的、均匀分布的数据集,这会返回表中约四分之一的数据行。我们 也可以用其他比例对若干个桶进行取样(因为取样并不是一个精确的操作,因此这个 比例不一定要是桶数的整数倍)。例如,下面的查询返回一半的桶:

7. 查询一半返回的桶数:

hive> SELECT * FROM bucketed_users 
> TABLESAMPLE(BUCKET 1 OUT OF 2 ON id);
0 Nat
4 Ann
2 Joe

因为查询只需要读取和TABLESAMPLE子句匹配的桶,所以取样分桶表是非常高效 的操作。如果使用rand()函数对没有划分成桶的表进行取样,即使只需要读取很 小一部分样本,也要扫描整个输入数据集:

hive〉 SELECT * FROM users 
> TABLESAMPLE(BUCKET 1 OUT OF 4 ON rand());
2 Doe

①从Hive 0.6.0开始,对以前的版本,必须把mapred.reduce .tasks设为表中要填 充的桶的个数。如果桶是排序的,还需要把hive.enforce.sorting设为true。

②显式原始文件时,因为分隔字符是一个不能打印的控制字符,因此字段都挤在一起。

3、举个完整的小栗子:

(1)建student & student1 表:

create table student(id INT, age INT, name STRING)
partitioned by(stat_date STRING)
clustered by(id) sorted by(age) into 2 buckets
row format delimited fields terminated by ',';

create table student1(id INT, age INT, name STRING)
partitioned by(stat_date STRING)
clustered by(id) sorted by(age) into 2 buckets
row format delimited fields terminated by ',';

(2)设置环境变量:

set hive.enforce.bucketing = true; 

(3)插入数据:

cat bucket.txt

1,20,zxm
2,21,ljz
3,19,cds
4,18,mac
5,22,android
6,23,symbian
7,25,wp

LOAD DATA local INPATH '/home/lijun/bucket.txt' OVERWRITE INTO TABLE student partition(stat_date="20120802");

from student
insert overwrite table student1 partition(stat_date="20120802")
select id,age,name where stat_date="20120802" sort by age;

(4)查看文件目录:

hadoop fs -ls /hive/warehouse/test.db/student1/stat_date=20120802 
Found 2 items
-rw-r--r-- 2 lijun supergroup 31 2013-11-24 19:16 /hive/warehouse/test.db/student1/stat_date=20120802/000000_0
-rw-r--r-- 2 lijun supergroup 39 2013-11-24 19:16 /hive/warehouse/test.db/student1/stat_date=20120802/000001_0

(5)查看sampling数据:

hive> select * from student1 tablesample(bucket 1 out of 2 on id);
Total MapReduce jobs = 1
Launching Job 1 out of 1
.......
OK
4 18 mac 20120802
2 21 ljz 20120802
6 23 symbian 20120802
Time taken: 20.608 seconds

注:tablesample是抽样语句,语法:TABLESAMPLE(BUCKET x OUT OF y)

y必须是table总bucket数的倍数或者因子。hive根据y的大小,决定抽样的比例。例如,table总共分了64份,当y=32时,抽取(64/32=)2个bucket的数据,当y=128时,抽取(64/128=)1/2个bucket的数据。x表示从哪个bucket开始抽取。例如,table总bucket数为32,tablesample(bucket 3 out of 16),表示总共抽取(32/16=)2个bucket的数据,分别为第3个bucket和第(3+16=)19个bucket的数据。

1、命令行操作

(1)打印查询头,需要显示设置:

set hive.cli.print.header=true;

(2)加"--",其后的都被认为是注释,但 CLI 不解析注释。带有注释的文件只能通过这种方式执行:

hive -f script_name

(3)-e后跟带引号的hive指令或者查询,-S去掉多余的输出:

hive -S -e "select * FROM mytable LIMIT 3" > /tmp/myquery

(4)遍历所有分区的查询将产生一个巨大的MapReduce作业,如果你的数据集和目录非常多, 因此建议你使用strict模型,也就是你存在分区时,必须指定where语句

hive> set hive.mapred.mode=strict;

(5)显示当前使用数据库

set hive.cli.print.current.db=true;

(6)设置 Hive Job 优先级

set mapred.job.priority=VERY_HIGH | HIGH | NORMAL | LOW | VERY_LOW

(VERY_LOW=1,LOW=2500,NORMAL=5000,HIGH=7500,VERY_HIGH=10000)
set mapred.job.map.capacity=M设置同时最多运行M个map任务
set mapred.job.reduce.capacity=N设置同时最多运行N个reduce任务

(7)Hive 中的Mapper个数的是由以下几个参数确定的:

mapred.min.split.size ,mapred.max.split.size ,dfs.block.size
splitSize = Math.max(minSize, Math.min(maxSize, blockSize));

map个数还与inputfilles的个数有关,如果有2个输入文件,即使总大小小于blocksize,也会产生2个map

mapred.reduce.tasks用来设置reduce个数。

2、表操作

(1)查看某个表所有分区

SHOW PARTITIONS ext_trackflow

查询具体某个分区

SHOW PARTITIONS ext_trackflow PARTITION(statDate='20140529');

(2)查看格式化的完整表结构

desc formatted ext_trackflow; 
DESCRIBE EXTENDED ext_trackflow;

(3)删除分区:分区的元数据和数据将被一并删除,但是对于扩展表则只删除元数据

ALTER TABLE ext_trackflow DROP PARTITION (statDate='20140529');

(4)查询是外部表还是内部表

DESCRIBE EXTENDED tablename

(5)复制表结构

CREATE EXTERNAL TABLE IF NOT EXISTS mydb.employees3  
LIKE mydb.employees
LOCATION '/path/to/data';

Note:如果你忽略关键字EXTERNAL,那么将依据 employees 是外部还是内部,如果加了那么一定是EXTERNAL,并要LOCATION

(6)为内部表某个分区导入数据,Hive将建立目录并拷贝数据到分区当中

LOAD DATA LOCAL INPATH '${env:HOME}/california-employees' 
INTO TABLE employees
PARTITION (country = 'US', state = 'CA');

(7)为外部表某个分区添加数据

ALTER TABLE log_messages ADD  IF NOT EXISTS PARTITION(year = 2012, month = 1, day = 2) 
LOCATION 'hdfs://master_server/data/log_messages/2012/01/02';

Note:Hive并不关心分区,目录是否存在,是否有数据,这会导致没有查询结果

(8)修改表:在任何时候你都可以修改表,但是你仅仅修改的是表的元数据,都实际数据不会造成任何影响

例如更改分区指定位置,这个命令不会删除旧的数据

ALTER TABLE log_messages PARTITION(year = 2011, month = 12, day = 2) 
SET LOCATION 's3n://ourbucket/logs/2011/01/02';

(9)更改表属性

ALTER TABLE log_messages SET TBLPROPERTIES (
'notes' = 'The process id is no longer captured; this column is always NULL'
);

(10)更改存储属性

ALTER TABLE log_messages
PARTITION(year = 2012, month = 1, day = 1)
SET FILEFORMAT SEQUENCEFILE;

Note:如果table是分区的话那么partition是必须的

(11)指定新的 SerDe

ALTER TABLE table_using_JSON_storage
SET SERDE 'com.example.JSONSerDe'
WITH SERDEPROPERTIES (
'prop1' = 'value1',
'prop2' = 'value2'
);

Note:SERDEPROPERTIE解释SERDE用的何种模型,属性值和名称都为字符串,方便告诉用户,为自己指定SERDE并且应用于什么模型
为当前SERDE设定

ALTER TABLE table_using_JSON_storage
SET SERDEPROPERTIES (
'prop3' = 'value3',
'prop4' = 'value4'
);

(12)改变存储属性

ALTER TABLE stocks
CLUSTERED BY (exchange, symbol)
SORTED BY (symbol)
INTO 48 BUCKETS;

(13)复杂更改表语句:为各种不同的操作添加 hook ALTER TABLE … TOUCH

ALTER TABLE log_messages TOUCH
PARTITION(year = 2012, month = 1, day = 1);

典型的应用场景就是当分区有改动的时候,那么将触发

hive -e 'ALTER TABLE log_messages TOUCH PARTITION(year = 2012, month = 1, day = 1);'

(14)ALTER TABLE … ARCHIVE PARTITION 捕获分区文件到Hadoop archive file也就是HAR

ALTER TABLE log_messages ARCHIVE
PARTITION(year = 2012, month = 1, day = 1);(只可以用在被分区的表)

(15)保护分区不被删除和查询

ALTER TABLE log_messages
PARTITION(year = 2012, month = 1, day = 1) ENABLE NO_DROP;

ALTER TABLE log_messages
PARTITION(year = 2012, month = 1, day = 1) ENABLE OFFLINE;

Note:与ENABLE对应的是DISABLE,不能应用在未被分区的表

(16)按正条件(正则表达式)显示表

hive> SHOW TABLES '.*s';

(17)外部表、内部表互转

alter table tablePartition set TBLPROPERTIES ('EXTERNAL'='TRUE');  //内部表转外部表 
alter table tablePartition set TBLPROPERTIES ('EXTERNAL'='FALSE'); //外部表转内部表

(18)分区与分桶:

partition(分区:按目录保存文件,每个partition对应一个目录)例如:

CREATE EXTERNAL TABLE table1 (
column1 STRING,
column2 STRING,
column3 STRING,
)
PARTITIONED BY (dt STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'
STORED AS TEXTFILE;
ALTER TABLE table1 ADD IF NOT EXISTS PARTITION (dt=20090105);
ALTER TABLE table1 ADD IF NOT EXISTS PARTITION (dt=20090102);
ALTER TABLE table1 ADD IF NOT EXISTS PARTITION (dt=20081231);

bucket(分桶,对指定列作hash,每个bucket对应一个文件)

CREATE TABLE VT_NEW_DATA
(
column1 STRING,
column2 STRING,
column3 STRING,
)
CLUSTERED BY (column1)
SORTED BY (column1)
INTO 48 BUCKETS
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
LINES TERMINATED BY '\n'
STORED AS RCFILE;

3、列操作

(1)重命名列,更改位置,类型和注释

ALTER TABLE log_messages 
CHANGE COLUMN hms hours_minutes_seconds INT
COMMENT 'The hours, minutes, and seconds part of the timestamp'
AFTER severity;

更改名称: new column old column type

comment不是必须的,你可以添加注释

AFTER用于更改字段位置

仅修改了元数据并未对源data做任何改动

(2)添加新列

ALTER TABLE log_messages ADD COLUMNS ( 
app_name STRING COMMENT 'Application name',
session_id LONG COMMENT 'The current session id');

(3)删除和替换列:慎用!!!

ALTER TABLE table_name ADD|REPLACE COLUMNS (col_name data_type [COMMENT col_comment], ...)

ADD是代表新增一字段,字段位置在所有列后面(partition列前)

REPLACE COLUMNS removes all existing columns and adds the new set of columns.  
REPLACE COLUMNS can also be used to drop columns. For example: "ALTER TABLE test_change REPLACE COLUMNS (a int, b int);" will remove column `c' from test_change's schema. Note that this does not delete underlying data, it just changes the schema.

(4)REGEX Column Specification

SELECT 语句可以使用正则表达式做列选择,下面的语句查询除了 ds 和 hr 之外的所有列:

SELECT `(ds|hr)?+.+` FROM test

4、查看变量

hive> set; 

hive> set-v;
… even more output!…

set’输出 hivevar,hiveconf,system 和 env 命名空间下的所有变量。

‘set -v’包括了输出Hadoop定义的全部变量。

hive> set hivevar:foo=hello; 
hive> set hivevar:foo;
hivevar:foo=hello

使用变量:

hive> create table toss1(i int, ${hivevar:foo} string);

5、一个完整的建库、表例子

-- 创建数据库
create database ecdata WITH DBPROPERTIES ('creator' = 'June', 'date' = '2014-06-01');
-- 或者使用 COMMENT 关键字
-- 查看数据库描述
DESCRIBE DATABASE ecdata;
DESCRIBE DATABASE EXTENDED ecdata;
-- 切库
use ecdata;

-- 删除表
drop table ext_trackflow;

-- 创建表
create EXTERNAL table IF NOT EXISTS ext_trackflow (
cookieId string COMMENT '05dvOVC6Il6INhYABV6LAg==',
cate1 string COMMENT '4',
area1 string COMMENT '102',
url string COMMENT 'http://cd.ooxx.com/jinshan-mingzhan-1020',
trackTime string COMMENT '2014-05-25 23:03:36',
trackURLMap map<string,String> COMMENT '{"area":"102","cate":"4,29,14052"}',
)
PARTITIONED BY (statDate STRING)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\001'
COLLECTION ITEMS TERMINATED BY '\002'
MAP KEYS TERMINATED BY '\003'
LINES TERMINATED BY '\n'
STORED AS TEXTFILE
LOCATION '/DataWarehouse/ods/TrackFlowTable'
;

--添加分区语句
ALTER TABLE ext_trackflow ADD PARTITION (statDate='20140525')
LOCATION '/DataWarehouse/ods/TrackFlowTable/20140525';


--每天建立分区
yesterday=`date -d '1 days ago' +'%Y%m%d'`
hive -e "use ecdata; ALTER TABLE ext_trackflow ADD PARTITION (statDate='$yesterday') LOCATION '/DataWarehouse/ods/TrackFlowTable/$yesterday';"
   
次浏览       
相关文章

基于EA的数据库建模
数据流建模(EA指南)
“数据湖”:概念、特征、架构与案例
在线商城数据库系统设计 思路+效果
 
相关文档

Greenplum数据库基础培训
MySQL5.1性能优化方案
某电商数据中台架构实践
MySQL高扩展架构设计
相关课程

数据治理、数据架构及数据标准
MongoDB实战课程
并发、大容量、高性能数据库设计与优化
PostgreSQL数据库实战培训
最新活动计划
LLM大模型应用与项目构建 12-26[特惠]
QT应用开发 11-21[线上]
C++高级编程 11-27[北京]
业务建模&领域驱动设计 11-15[北京]
用户研究与用户建模 11-21[北京]
SysML和EA进行系统设计建模 11-28[北京]

MySQL索引背后的数据结构
MySQL性能调优与架构设计
SQL Server数据库备份与恢复
让数据库飞起来 10大DB2优化
oracle的临时表空间写满磁盘
数据库的跨平台设计
更多...   


并发、大容量、高性能数据库
高级数据库架构设计师
Hadoop原理与实践
Oracle 数据仓库
数据仓库和数据挖掘
Oracle数据库开发与管理


GE 区块链技术与实现培训
航天科工某子公司 Nodejs高级应用开发
中盛益华 卓越管理者必须具备的五项能力
某信息技术公司 Python培训
某博彩IT系统厂商 易用性测试与评估
中国邮储银行 测试成熟度模型集成(TMMI)
中物院 产品经理与产品管理
更多...