大象教程
首页
Spark
Hadoop
HDFS
MapReduce
Hive
Hive 教程
Hive 教程
Hive 安装(基于Ubuntu系统)
Hive 架构
Hive 内置函数
Hive UDF 简介
Hive DDL 命令
Hive 视图
Hive 索引
Hive Metastore 的三种配置方式
Hive 数据模型
Hive 数据类型
Hive 操作符
Hive SerDe(序列化与反序列化)
Hive 数据分区
Hive 分桶
Hive 分区与分桶的比较
Hive Join 的原理与机制
Hive map Join
Hive bucket map join
#Hive map Join Apache hive 中的 map join 就是之前提到的 Map 端的 join 或者叫广播 join。还有一种 join 是 common join 或者 sort merge join。common join 和 sort merge join 在执行的时候消耗的资源比较多,运行较慢。为了提高 join 的运行效率,我们可能需要用到 hive 中的 map join。使用 map join 的前提是两个表做关联时需要有一个表是可以加载到内存的小表。这样 join 可以在一个 mapper 里面完成,而不需要 reduce。 ![hive map join 原理](/media/editor/file_1574952851000_20191128225413910000.png "hive map join 原理") 尽管查询经常依赖于小表连接,但是 map join 的使用加快了查询的执行速度。 在 MapReduce 任务中,第一步就是创建一个 MapReduce 本地任务,然后该 map / reduce 任务从 HDFS 读取小表的数据,然后把数据保存到内存中的哈希表,然后再存储到一个哈希表文件。接着,当 MapReduce join 任务启动的时候,它会把哈希表文件移到 Hadoop 分布式内存中,并把哈希表文件存储到每个 mapper 的本地磁盘上。所有的 mapper 都能把这个哈希表文件加载到内存,然后在 map 阶段做 join 操作。 用一个例子来理解一下上面的过程,假如有两个表,一个大表 A 和一个小表 B。对于每个大表 A 的 mapper 来说,table B 都能完全读取。因为小表已经被加载到内存了。然后,join 会在 MapReduce 作业的 map 阶段被执行,而不需要 reducer 任务,即 reducer 任务会被跳过不执行。因此,hive 中的 map join 比普通 join 要高效。 ##Map join 相关参数 下面详细讨论一下 Map join 涉及的相关配置参数。 ###hive.auto.convert.join 该参数表示是否自动把任务转为 map join。默认该配置为 true。当一个表的大小小于 25 MB(配置参数:hive.mapjoin.smalltable.filesize)时,表之间的 join 会被自动转成 map join。 ###hive.auto.convert.join.noconditionaltask 是否将多个 map join 合并为一个。默认为 true。多个 mapjoin 转换成 1 个时,所有小表的文件大小总和的最大值由 `hive.auto.convert.join.noconditionaltask.size` 配置项控制。 例如,一个大表顺序关联3个小表 a(10M),b(8M),c(12M),如果`hive.auto.convert.join.noconditionaltask.size` 的值: 1. 小于18M,则无法合并 mapjoin,必须执行3个 mapjoin。 2. 大于18M小于30M,则可以合并 a 和 b 表的 mapjoin,所以只需要执行2个 mapjoin。 3. 大于30M,则可以将3个 mapjoin 都合并为1个。 为什么要合并 mapjoin 呢?因为每个 mapjoin 都要执行一次 map,需要读写一次数据,所以多个 mapjoin 就要做多次的数据读写,合并 mapjoin 后只用读写一次,这样就能大大加快速度。但是执行 map 是内存大小是有限制的,在一次 map 里对多个小表做 mapjoin 就必须把多个小表都加入内存,为了防止内存溢出,所以加了 `hive.auto.convert.join.noconditionaltask.size` 参数来做限制。不过,这个值只是限制输入的表文件的大小,并不代表实际 mapjoin 时 hashtable 的大小。 ###hive.mapjoin.localtask.max.memory.usage 将小表转成 hashtable 的本地任务的最大内存使用率,默认0.9。 ###hive.mapjoin.followby.gby.localtask.max.memory.usage 如果 mapjoin 后面紧跟着一个 group by 任务,这种情况下本地任务的最大内存使用率,默认是0.55。 ###hive.mapjoin.check.memory.rows localtask 每处理完多少行,就执行内存检查。默认为 100000。 ##Mapjoin 的局限性 下面是 mapjoin 的一些局限: - 不能把全连接(full outer join)转换成 mapjoin。 - 可以把左连接转换成 mapjoin,但是右边的表的大小必须小于 25MB。 - 可以把右连接转换成 mapjoin,但是左边的表的大小必须小于 25MB。 ##如何知道 Hive 使用了 mapjoin 可以使用 `explain` 命令查看 HiveQL 代码的执行计划,在 Map 操作树我们可以看到 `Map Side Join Operator` 关键字,就说明 join 是 mapjoin。 ```sql hive> explain select a.* from passwords a,passwords3 b where a.col0=b.col0; OK STAGE DEPENDENCIES: Stage-4 is a root stage Stage-3 depends on stages: Stage-4 Stage-0 is a root stage STAGE PLANS: Stage: Stage-4 Map Reduce Local Work Alias -> Map Local Tables: b Fetch Operator limit: -1 Alias -> Map Local Operator Tree: b TableScan alias: b Statistics: Num rows: 1 Data size: 31 Basic stats: COMPLETE Column stats: NONE HashTable Sink Operator condition expressions: 0 {col0} {col1} {col2} {col3} {col4} {col5} {col6} 1 {col0} keys: 0 col0 (type: string) 1 col0 (type: string) Stage: Stage-3 Map Reduce Map Operator Tree: TableScan alias: a Statistics: Num rows: 9963904 Data size: 477218560 Basic stats: COMPLETE Column stats: NONE Map Join Operator condition map: Inner Join 0 to 1 condition expressions: 0 {col0} {col1} {col2} {col3} {col4} {col5} {col6} 1 {col0} keys: 0 col0 (type: string) 1 col0 (type: string) outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col9 Statistics: Num rows: 10960295 Data size: 524940416 Basic stats: COMPLETE Column stats: NONE Filter Operator predicate: (_col0 = _col9) (type: boolean) Statistics: Num rows: 5480147 Data size: 262470184 Basic stats: COMPLETE Column stats: NONE Select Operator expressions: _col0 (type: string), _col1 (type: string), _col2 (type: string), _col3 (type: string), _col4 (type: string), _col5 (type: string), _col6 (type: string) outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6 Statistics: Num rows: 5480147 Data size: 262470184 Basic stats: COMPLETE Column stats: NONE File Output Operator compressed: false Statistics: Num rows: 5480147 Data size: 262470184 Basic stats: COMPLETE Column stats: NONE table: input format: org.apache.hadoop.mapred.TextInputFormat output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe Local Work: Map Reduce Local Work Stage: Stage-0 Fetch Operator limit: -1 ``` ##使用 MAPJOIN 提示指定小表 可以使用 `/*+ MAPJOIN(小表表名)*/` 提示让 Hive 执行引擎知道哪个表作为小表。代码如下: ```sql hive> set hive.auto.convert.join=true; hive> set hive.auto.convert.join.noconditionaltask=true; hive> set hive.auto.convert.join.noconditionaltask.size=20971520 hive> set hive.auto.convert.join.use.nonstaged=true; hive> set hive.mapjoin.smalltable.filesize = 30000000; hive> set hive.ignore.mapjoin.hint=false; hive> Select /*+ MAPJOIN(b) */ a.key, a.value from a join b on a.key = b.key ``` 上面代码指定 b 表作为小表。需要注意的是,使用 mapjoin 提示需要把 `hive.ignore.mapjoin.hint` 参数设置为 false,表示不要忽略 HQL 代码中的提示,该配置默认为 true。
加我微信交流吧