在Spark 3.0中,AQE框架具有以下三个功能:
Dynamically coalescing shuffle partitions (动态整合shuffle partitions 数量)
Dynamically switching join strategies ( 动态切换连接策略 )
Dynamically optimizing skew joins ( 动态优化倾斜连接 )
以下各节将详细讨论这三个功能。
Dynamically coalescing shuffle partitions
在Spark中运行查询来处理非常大的数据时,shuffle通常会对查询性能产生非常重要的影响。Shuffle是一种昂贵的操作,因为它需要在网络中移动数据,以便以下游操作所需的方式重新分发数据。
spark.sql.shuffle.partitions数量很难设置 ,因为每个查询的阶段之间的数据大小可能差异很大:
如果分区太少,那么每个分区的数据量可能会很大,处理这些大分区的任务可能需要将数据溢出到磁盘上(例如,涉及排序或聚合时),结果,放慢查询速度。
如果分区太多,那么每个分区的数据量可能会非常小,会有很多小的网络数据读取来读取shuffle blocks,这也会因为I/O模式效率低下而减慢查询速度。拥有大量任务也会给Spark任务调度器增加更多负担。
为了解决这个问题,我们可以在开始时设置相对较大的shuffle分区,然后在运行时通过查看shuffle文件统计信息将相邻的小分区合并为更大的分区。
例如,假设我们正在运行查询SELECT max(i)FROM tbl GROUP BY j。 输入数据tbl很小,因此在分组之前只有两个分区。 初始重排分区号设置为5,因此在进行本地分组后,将部分分组的数据重排为5个分区。 如果没有AQE,Spark将启动五个任务以进行最终聚合。 但是,这里有三个非常小的分区,为每个分区启动单独的任务很浪费。
相反,AQE将这三个小分区合并为一个,因此,最终的聚合现在只需要执行三个任务,而不是五个。
Dynamically switching join strategies
Spark支持多种连接策略,如果连接的一端能够很好地适应内存,那么广播散列连接通常是性能最好的连接。由于这个原因,如果连接关系的估计大小低于广播大小阈值,Spark计划一个广播散列连接。但是有很多事情会使这种大小估计出错——比如存在一个非常有选择性的过滤器——或者连接关系是一系列复杂的操作,而不仅仅是一次扫描。
为了解决这个问题,AQE现在在运行时根据最精确的连接关系大小重新规划连接策略。在下面的示例中可以看到,连接的右侧比估计的要小得多,而且小到可以进行广播,因此在AQE重新优化后,静态规划的排序合并连接现在被转换为广播散列连接。
对于在运行时转换的广播散列连接,我们可以进一步将常规洗牌优化为本地化的洗牌(即以每个映射器为基础而不是每个减速器为基础进行读取的洗牌(shuffle),以减少网络流量。
Dynamically optimizing skew joins
当数据在集群的各个分区之间分布不均匀时,就会出现数据倾斜。严重的倾斜会显著降低查询性能,特别是使用连接时。AQE倾斜连接优化可以自动从转移文件统计中检测这种倾斜。然后它将倾斜的分区分割成更小的子分区,这些子分区将分别从另一侧连接到相应的分区。
让我们以表A连接表B为例,其中表A的分区A0比它的其他分区大得多。
因此,倾斜连接优化将把分区A0分割成两个子分区,并将每个子分区连接到表B对应的分区B0。
如果没有这种优化,将有四个任务运行sort merge join,而其中一个任务花费的时间要长得多。在此优化之后,将有5个任务运行join,但每个任务将花费大致相同的时间,从而获得更好的总体性能。
使用
我们可以设置参数 spark.sql.adaptive.enabled 为true来开启AQE,在Spark 3.0中默认是false,并满足以下条件:
1, 非流式查询
2, 包含至少一个exchange(如join、聚合、窗口算子)或者一个子查询
AQE通过减少了对静态统计数据的依赖,成功解决了Spark CBO的一个难以处理的trade off(生成统计数据的开销和查询耗时)以及数据精度问题。相比之前具有局限性的CBO,现在就显得非常灵活 – 我们再也不需要提前去分析数据了!