Hive优化器原理与源码解析—统计信息Parallelism并行度计算

   日期:2024-12-26    作者:cwi4k 移动:http://oml01z.riyuangf.com/mobile/quote/47018.html

目录

背景

Parallelism并行度

总结

背景

Parallelism是有关RelNode关系表达式的并行度以及如何将其Opeartor运算符分配给具有独立资源池的进程的元数据。同一个Operator操作符,并行执行和串性执行相比,在成本优化器CBO看来,并行执行的成本更低。

从并行性的概念来来讲,就是将大任务划分为较小的任务,其中每个小任务被分配分配给特定处理器,以完成部分主要任务。最后,从每个小任务中获得的部分结果将合并为一个最终结果。与串行执行的一个大任务相比,并行执行多个任务可以获得性能大幅度提升!

在Hive中,Parallelism并行度计算,除了参数指定,CPU cores硬件限制,Operator算法是否可以并行执行等因素的影响,主要与如TableScan、Sort、Join等等Operator的数据大小的拆分个数splitCount计算有关。

Parallelism并行度

讲述并行度之前先熟悉执行计划中Stage划分、Phase阶段定义和PhaseTransition过渡阶段判断的定义。

一个Phase从一个叶子节点如TableScan或phase变换节点如Exchange开始,如Hadoop的shuffle操作符跨网络发送数据是一种Sort-Exchange的形式。

在优化HiveQL时,都会查看执行计划,这些信息含有开头Stage依赖信息说明,操作符树,统计信息记录数、数据大小等,如图

那么这些Stage大致分为几类:

MAP/REDUCE STAGE里还有TabelScan、Sort、Filter、Project、Aggreate、Join等等各种Oprerator操作符树构成。

METASTORE STAGE 元存储,统计信息收集操作,如上图

Stage: Stage-2 Stats-Aggr Operator

统计信息的收集设置相关参数,在参数为true的前提下,并在执行DML语句才会收集。强调的是,

hive.stats.autogather

This flag enables gathering and updating statistics automatically during Hive DML operations.

Statistics are not gathered for statements.

判读Operator操作符的输入RelNode和自己是否跨进程,即父Operator与子Operator是否在一个相同的进程里。

1)HiveJoin是否为PhaseTransition的判断

是依据Join Operator的具体实现来判断的,不能的Join 算法会返回不同结果。

HiveDefaultCostModel的Join的isPhaseTransition默认是false。

HiveTezCostModel分为四种Join算法,每种算法都有isPhaseTransition判断方法,isPhaseTransition返回值如下

Common Join:true

Map Join:false

Bucket Map Join:false

Sort Merge Bucket Join:false

2)Sort Limit是否为PhaseTransition的判断

3)TableScan、Values、Exchange等RelNode的PhaseTransition的判断,默认值True。

返回数据非重复拆分数,注意splits必须是非重复的,如广播broadcast方式,其每个拷贝都是相同的,所有splitCount为1。因此,split count拆分数与由每个Operator实例发送的数据成倍数关系。

Parallelism并行处理就是对Split数据进行并行处理,在不考虑硬件CPU core和参数限制等因素影响的情况下,Split拆分数就是并行任务的个数。

1)Join的SplitCount拆分个数计算

是依据Join Operator的具体实现来判断的,不能的Join 算法会返回split count。

HiveDefaultCostModel的Join的split count为1。

HiveTezCostModel分为四种Join算法Common Join、Map Join、Bucket Map Join和Sort Merge Bucket的split count计算逻辑相同:

都用HiveAlgorithmsUtil.getSplitCountWithoutRepartition(join)方法实现的,

splitCount = (总行数*平均记录大小) / maxSplitSize,其中maxSplitSize是HiveAlgorithmsConf算法配置项初始化的每个split大小的最大值。

2)TableScan的SplitCount拆分个数计算

Hive中实现的StorageDescriptor存储类中方法,判断分桶个数,如果bucketCols分桶集合为null,则为0,否则分桶个数和分桶列集合

如果分桶列列表bucketCols不为null,使用getNumBuckets()获取分桶数作为splitCount拆分数。否则使用splitCountRepartition方法通过元数据统计信息计算出splitCount拆分数(splitCount为null,则抛出异常)。splitCountRepartition的计算逻辑在下文有讲解。

3)RelNode的SplitCount拆分个数计算

首先判断此RelNode的是否为过渡阶段Phase,如果是过渡阶段Phase,则使用splitCountRepartition方法访问元数据统计信息计算拆分数(此方法在下面有介绍)。

其次,如果不是过渡阶段Phase,则遍历此RelNode的所有输入RelNode,通过RelMetadataQuery对象获取元数据统计信息splitCount并进行累加。

总SplitCount = splitCount1 + splitCount2 + splitCount3...

根据RelMetadataQuery对象获取指定RelNode的统计信息。记录数RowCount、平均记录大小等统计信息。

计算逻辑如下:

其中maxSplitSize是HiveRelMDParallelism的属性生成对象时需初始化的每个split大小的最大值。

总结


特别提示:本信息由相关用户自行提供,真实性未证实,仅供参考。请谨慎采用,风险自负。


举报收藏 0评论 0
0相关评论
相关最新动态
推荐最新动态
点击排行
{
网站首页  |  关于我们  |  联系方式  |  使用协议  |  隐私政策  |  版权隐私  |  网站地图  |  排名推广  |  广告服务  |  积分换礼  |  网站留言  |  RSS订阅  |  违规举报  |  鄂ICP备2020018471号