目录
背景
Parallelism并行度
总结
背景
Parallelism是有关RelNode关系表达式的并行度以及如何将其Opeartor运算符分配给具有独立资源池的进程的元数据。同一个Operator操作符,并行执行和串性执行相比,在成本优化器CBO看来,并行执行的成本更低。
从并行性的概念来来讲,就是将大任务划分为较小的任务,其中每个小任务被分配分配给特定处理器,以完成部分主要任务。最后,从每个小任务中获得的部分结果将合并为一个最终结果。与串行执行的一个大任务相比,并行执行多个任务可以获得性能大幅度提升!
在Hive中,Parallelism并行度计算,除了参数指定,CPU cores硬件限制,Operator算法是否可以并行执行等因素的影响,主要与如TableScan、Sort、Join等等Operator的数据大小的拆分个数splitCount计算有关。
Parallelism并行度
讲述并行度之前先熟悉执行计划中Stage划分、Phase阶段定义和PhaseTransition过渡阶段判断的定义。
一个Phase从一个叶子节点如TableScan或phase变换节点如Exchange开始,如Hadoop的shuffle操作符跨网络发送数据是一种Sort-Exchange的形式。
在优化HiveQL时,都会查看执行计划,这些信息含有开头Stage依赖信息说明,操作符树,统计信息记录数、数据大小等,如图
那么这些Stage大致分为几类:
MAP/REDUCE STAGE里还有TabelScan、Sort、Filter、Project、Aggreate、Join等等各种Oprerator操作符树构成。
METASTORE STAGE 元存储,统计信息收集操作,如上图
Stage: Stage-2 Stats-Aggr Operator
统计信息的收集设置相关参数,在参数为true的前提下,并在执行DML语句才会收集。强调的是,
hive.stats.autogather
This flag enables gathering and updating statistics automatically during Hive DML operations.
Statistics are not gathered for statements.
判读Operator操作符的输入RelNode和自己是否跨进程,即父Operator与子Operator是否在一个相同的进程里。
1)HiveJoin是否为PhaseTransition的判断
是依据Join Operator的具体实现来判断的,不能的Join 算法会返回不同结果。
HiveDefaultCostModel的Join的isPhaseTransition默认是false。
HiveTezCostModel分为四种Join算法,每种算法都有isPhaseTransition判断方法,isPhaseTransition返回值如下
Common Join:true
Map Join:false
Bucket Map Join:false
Sort Merge Bucket Join:false
2)Sort Limit是否为PhaseTransition的判断
3)TableScan、Values、Exchange等RelNode的PhaseTransition的判断,默认值True。
返回数据非重复拆分数,注意splits必须是非重复的,如广播broadcast方式,其每个拷贝都是相同的,所有splitCount为1。因此,split count拆分数与由每个Operator实例发送的数据成倍数关系。
Parallelism并行处理就是对Split数据进行并行处理,在不考虑硬件CPU core和参数限制等因素影响的情况下,Split拆分数就是并行任务的个数。
1)Join的SplitCount拆分个数计算
是依据Join Operator的具体实现来判断的,不能的Join 算法会返回split count。
HiveDefaultCostModel的Join的split count为1。
HiveTezCostModel分为四种Join算法Common Join、Map Join、Bucket Map Join和Sort Merge Bucket的split count计算逻辑相同:
都用HiveAlgorithmsUtil.getSplitCountWithoutRepartition(join)方法实现的,
splitCount = (总行数*平均记录大小) / maxSplitSize,其中maxSplitSize是HiveAlgorithmsConf算法配置项初始化的每个split大小的最大值。
2)TableScan的SplitCount拆分个数计算
Hive中实现的StorageDescriptor存储类中方法,判断分桶个数,如果bucketCols分桶集合为null,则为0,否则分桶个数和分桶列集合
如果分桶列列表bucketCols不为null,使用getNumBuckets()获取分桶数作为splitCount拆分数。否则使用splitCountRepartition方法通过元数据统计信息计算出splitCount拆分数(splitCount为null,则抛出异常)。splitCountRepartition的计算逻辑在下文有讲解。
3)RelNode的SplitCount拆分个数计算
首先判断此RelNode的是否为过渡阶段Phase,如果是过渡阶段Phase,则使用splitCountRepartition方法访问元数据统计信息计算拆分数(此方法在下面有介绍)。
其次,如果不是过渡阶段Phase,则遍历此RelNode的所有输入RelNode,通过RelMetadataQuery对象获取元数据统计信息splitCount并进行累加。
总SplitCount = splitCount1 + splitCount2 + splitCount3...
根据RelMetadataQuery对象获取指定RelNode的统计信息。记录数RowCount、平均记录大小等统计信息。
计算逻辑如下:
其中maxSplitSize是HiveRelMDParallelism的属性生成对象时需初始化的每个split大小的最大值。
总结