热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

SparkSQL/Catalyst内部原理与RBO

本文结合案例详述了SparkSQL的工作原理,包括但不限于Parser,Analyzer,Optimizer,Rule-basedoptimizat

Spark SQL / Catalyst 内部原理 与 RBO

原创文章,转载请务必将下面这段话置于文章开头处。
本文转发自技术世界,原文链接 http://www.jasongj.com/spark/rbo/

本文所述内容均基于 2018年9月10日 Spark 最新 Release 2.3.1 版本。后续将持续更新

Spark SQL 架构

Spark SQL 的整体架构如下图所示
Spark SQL Catalyst

从上图可见,无论是直接使用 SQL 语句还是使用 DataFrame,都会经过如下步骤转换成 DAG 对 RDD 的操作

  • Parser 解析 SQL,生成 Unresolved Logical Plan
  • 由 Analyzer 结合 Catalog 信息生成 Resolved Logical Plan
  • Optimizer根据预先定义好的规则对 Resolved Logical Plan 进行优化并生成 Optimized Logical Plan
  • Query Planner 将 Optimized Logical Plan 转换成多个 Physical Plan
  • CBO 根据 Cost Model 算出每个 Physical Plan 的代价并选取代价最小的 Physical Plan 作为最终的 Physical Plan
  • Spark 以 DAG 的方法执行上述 Physical Plan
  • 在执行 DAG 的过程中,Adaptive Execution 根据运行时信息动态调整执行计划从而提高执行效率

Parser

Spark SQL 使用 Antlr 进行记法和语法解析,并生成 UnresolvedPlan。

当用户使用 SparkSession.sql(sqlText : String) 提交 SQL 时,SparkSession 最终会调用 SparkSqlParser 的 parsePlan 方法。该方法分两步

  • 使用 Antlr 生成的 SqlBaseLexer 对 SQL 进行词法分析,生成 CommonTokenStream
  • 使用 Antlr 生成的 SqlBaseParser 进行语法分析,得到 LogicalPlan

现在两张表,分别定义如下

CREATE TABLE score (
  id INT,
  math_score INT,
  english_score INT
)
CREATE TABLE people (
  id INT,
  age INT,
  name INT
)

对其进行关联查询如下

SELECT sum(v)
FROM (
  SELECT score.id,
    100 + 80 + score.math_score + score.english_score AS v
  FROM people
  JOIN score
  ON people.id = score.id
  AND people.age > 10
) tmp

生成的 UnresolvedPlan 如下图所示。

Spark SQL Parser

从上图可见

  • 查询涉及的两张表,被解析成了两个 UnresolvedRelation,也即只知道这们是两张表,却并不知道它们是 EXTERNAL TABLE 还是 MANAGED TABLE,也不知道它们的数据存在哪儿,更不知道它们的表结构如何
  • sum(v) 的结果未命名
  • Project 部分只知道是选择出了属性,却并不知道这些属性属于哪张表,更不知道其数据类型
  • Filter 部分也不知道数据类型

Spark SQL 解析出的 UnresolvedPlan 如下所示

== Parsed Logical Plan ==
'Project [unresolvedalias('sum('v), None)]
+- 'SubqueryAlias tmp
   +- 'Project ['score.id, (((100 + 80) + 'score.math_score) + 'score.english_score) AS v#493]
      +- 'Filter (('people.id = 'score.id) && ('people.age > 10))
         +- 'Join Inner
            :- 'UnresolvedRelation `people`
            +- 'UnresolvedRelation `score`

Analyzer

从 Analyzer 的构造方法可见

  • Analyzer 持有一个 SessionCatalog 对象的引用
  • Analyzer 继承自 RuleExecutor[LogicalPlan],因此可对 LogicalPlan 进行转换
class Analyzer(
    catalog: SessionCatalog,
    conf: SQLConf,
    maxIterations: Int)
  extends RuleExecutor[LogicalPlan] with CheckAnalysis {

Analyzer 包含了如下的转换规则

lazy val batches: Seq[Batch] = Seq(
    Batch("Hints", fixedPoint,
      new ResolveHints.ResolveBroadcastHints(conf),
      ResolveHints.RemoveAllHints),
    Batch("Simple Sanity Check", Once,
      LookupFunctions),
    Batch("Substitution", fixedPoint,
      CTESubstitution,
      WindowsSubstitution,
      EliminateUnions,
      new SubstituteUnresolvedOrdinals(conf)),
    Batch("Resolution", fixedPoint,
      ResolveTableValuedFunctions ::
      ResolveRelations ::
      ResolveReferences ::
      ResolveCreateNamedStruct ::
      ResolveDeserializer ::
      ResolveNewInstance ::
      ResolveUpCast ::
      ResolveGroupingAnalytics ::
      ResolvePivot ::
      ResolveOrdinalInOrderByAndGroupBy ::
      ResolveAggAliasInGroupBy ::
      ResolveMissingReferences ::
      ExtractGenerator ::
      ResolveGenerate ::
      ResolveFunctions ::
      ResolveAliases ::
      ResolveSubquery ::
      ResolveSubqueryColumnAliases ::
      ResolveWindowOrder ::
      ResolveWindowFrame ::
      ResolveNaturalAndUsingJoin ::
      ExtractWindowExpressions ::
      GlobalAggregates ::
      ResolveAggregateFunctions ::
      TimeWindowing ::
      ResolveInlineTables(conf) ::
      ResolveTimeZone(conf) ::
      ResolvedUuidExpressions ::
      TypeCoercion.typeCoercionRules(conf) ++
      extendedResolutionRules : _*),
    Batch("Post-Hoc Resolution", Once, postHocResolutionRules: _*),
    Batch("View", Once,
      AliasViewChild(conf)),
    Batch("Nondeterministic", Once,
      PullOutNondeterministic),
    Batch("UDF", Once,
      HandleNullInputsForUDF),
    Batch("FixNullability", Once,
      FixNullability),
    Batch("Subquery", Once,
      UpdateOuterReferences),
    Batch("Cleanup", fixedPoint,
      CleanupAliases)
  )

例如, ResolveRelations 用于分析查询用到的 Table 或 View。本例中 UnresolvedRelation (people) 与 UnresolvedRelation (score) 被解析为 HiveTableRelation (json.people) 与 HiveTableRelation (json.score),并列出其各自包含的字段名。

经 Analyzer 分析后得到的 Resolved Logical Plan 如下所示

== Analyzed Logical Plan ==
sum(v): bigint
Aggregate [sum(cast(v#493 as bigint)) AS sum(v)#504L]
+- SubqueryAlias tmp
   +- Project [id#500, (((100 + 80) + math_score#501) + english_score#502) AS v#493]
      +- Filter ((id#496 = id#500) && (age#497 > 10))
         +- Join Inner
            :- SubqueryAlias people
            :  +- HiveTableRelation `jason`.`people`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [id#496, age#497, name#498]
            +- SubqueryAlias score
               +- HiveTableRelation `jason`.`score`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [id#500, math_score#501, english_score#502]

Analyzer 分析前后的 LogicalPlan 对比如下

Spark SQL Analyzer

由上图可见,分析后,每张表对应的字段集,字段类型,数据存储位置都已确定。Project 与 Filter 操作的字段类型以及在表中的位置也已确定。

有了这些信息,已经可以直接将该 LogicalPlan 转换为 Physical Plan 进行执行。

但是由于不同用户提交的 SQL 质量不同,直接执行会造成不同用户提交的语义相同的不同 SQL 执行效率差距甚远。换句话说,如果要保证较高的执行效率,用户需要做大量的 SQL 优化,使用体验大大降低。

为了尽可能保证无论用户是否熟悉 SQL 优化,提交的 SQL 质量如何, Spark SQL 都能以较高效率执行,还需在执行前进行 LogicalPlan 优化。

Optimizer

Spark SQL 目前的优化主要是基于规则的优化,即 RBO (Rule-based optimization)

  • 每个优化以 Rule 的形式存在,每条 Rule 都是对 Analyzed Plan 的等价转换
  • RBO 设计良好,易于扩展,新的规则可以非常方便地嵌入进 Optimizer
  • RBO 目前已经足够好,但仍然需要更多规则来 cover 更多的场景
  • 优化思路主要是减少参与计算的数据量以及计算本身的代价

PushdownPredicate
PushdownPredicate 是最常见的用于减少参与计算的数据量的方法。

前文中直接对两表进行 Join 操作,然后再 进行 Filter 操作。引入 PushdownPredicate 后,可先对两表进行 Filter 再进行 Join,如下图所示。

Spark SQL RBO Predicate Pushdown

当 Filter 可过滤掉大部分数据时,参与 Join 的数据量大大减少,从而使得 Join 操作速度大大提高。

这里需要说明的是,此处的优化是 LogicalPlan 的优化,从逻辑上保证了将 Filter 下推后由于参与 Join 的数据量变少而提高了性能。另一方面,在物理层面,Filter 下推后,对于支持 Filter 下推的 Storage,并不需要将表的全量数据扫描出来再过滤,而是直接只扫描符合 Filter 条件的数据,从而在物理层面极大减少了扫描表的开销,提高了执行速度。

ConstantFolding
本文的 SQL 查询中,Project 部分包含了 100 + 800 + match_score + english_score 。如果不进行优化,那如果有一亿条记录,就会计算一亿次 100 + 80,非常浪费资源。因此可通过 ConstantFolding 将这些常量合并,从而减少不必要的计算,提高执行速度。

Spark SQL RBO Constant Folding

ColumnPruning
在上图中,Filter 与 Join 操作会保留两边所有字段,然后在 Project 操作中筛选出需要的特定列。如果能将 Project 下推,在扫描表时就只筛选出满足后续操作的最小字段集,则能大大减少 Filter 与 Project 操作的中间结果集数据量,从而极大提高执行速度。

Spark SQL RBO Column Pruning

这里需要说明的是,此处的优化是逻辑上的优化。在物理上,Project 下推后,对于列式存储,如 Parquet 和 ORC,可在扫描表时就只扫描需要的列而跳过不需要的列,进一步减少了扫描开销,提高了执行速度。

经过如上优化后的 LogicalPlan 如下

== Optimized Logical Plan ==
Aggregate [sum(cast(v#493 as bigint)) AS sum(v)#504L]
+- Project [((180 + math_score#501) + english_score#502) AS v#493]
   +- Join Inner, (id#496 = id#500)
      :- Project [id#496]
      :  +- Filter ((isnotnull(age#497) && (age#497 > 10)) && isnotnull(id#496))
      :     +- HiveTableRelation `jason`.`people`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [id#496, age#497, name#498]
      +- Filter isnotnull(id#500)
         +- HiveTableRelation `jason`.`score`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [id#500, math_score#501, english_score#502]

SparkPlanner

得到优化后的 LogicalPlan 后,SparkPlanner 将其转化为 SparkPlan 即物理计划。

本例中由于 score 表数据量较小,Spark 使用了 BroadcastJoin。因此 score 表经过 Filter 后直接使用 BroadcastExchangeExec 将数据广播出去,然后结合广播数据对 people 表使用 BroadcastHashJoinExec 进行 Join。再经过 Project 后使用 HashAggregateExec 进行分组聚合。

Spark SQL RBO Column Pruning

至此,一条 SQL 从提交到解析、分析、优化以及执行的完整过程就介绍完毕。

本文介绍的 Optimizer 属于 RBO,实现简单有效。它属于 LogicalPlan 的优化,所有优化均基于 LogicalPlan 本身的特点,未考虑数据本身的特点,也未考虑算子本身的代价。下文将介绍 CBO,它充分考虑了数据本身的特点(如大小、分布)以及操作算子的特点(中间结果集的分布及大小)及代价,从而更好的选择执行代价最小的物理执行计划,即 SparkPlan。

posted on 2018-09-11 07:52 郭俊Jason 阅读(...) 评论(...) 编辑 收藏


推荐阅读
  • PHP 5.5.31 和 PHP 5.6.17 安全更新发布
    PHP 5.5.31 和 PHP 5.6.17 已正式发布,主要包含多个安全修复。强烈建议所有用户尽快升级至最新版本以确保系统安全。 ... [详细]
  • oracle c3p0 dword 60,web_day10 dbcp c3p0 dbutils
    createdatabasemydbcharactersetutf8;alertdatabasemydbcharactersetutf8;1.自定义连接池为了不去经常创建连接和释放 ... [详细]
  • 从0到1搭建大数据平台
    从0到1搭建大数据平台 ... [详细]
  • 开机自启动的几种方式
    0x01快速自启动目录快速启动目录自启动方式源于Windows中的一个目录,这个目录一般叫启动或者Startup。位于该目录下的PE文件会在开机后进行自启动 ... [详细]
  • MySQL Decimal 类型的最大值解析及其在数据处理中的应用艺术
    在关系型数据库中,表的设计与SQL语句的编写对性能的影响至关重要,甚至可占到90%以上。本文将重点探讨MySQL中Decimal类型的最大值及其在数据处理中的应用技巧,通过实例分析和优化建议,帮助读者深入理解并掌握这一重要知识点。 ... [详细]
  • DVWA学习笔记系列:深入理解CSRF攻击机制
    DVWA学习笔记系列:深入理解CSRF攻击机制 ... [详细]
  • 深入探索HTTP协议的学习与实践
    在初次访问某个网站时,由于本地没有缓存,服务器会返回一个200状态码的响应,并在响应头中设置Etag和Last-Modified等缓存控制字段。这些字段用于后续请求时验证资源是否已更新,从而提高页面加载速度和减少带宽消耗。本文将深入探讨HTTP缓存机制及其在实际应用中的优化策略,帮助读者更好地理解和运用HTTP协议。 ... [详细]
  • 零拷贝技术是提高I/O性能的重要手段,常用于Java NIO、Netty、Kafka等框架中。本文将详细解析零拷贝技术的原理及其应用。 ... [详细]
  • 本文探讨了如何在 Java 中将多参数方法通过 Lambda 表达式传递给一个接受 List 的 Function。具体分析了 `OrderUtil` 类中的 `runInBatches` 方法及其使用场景。 ... [详细]
  • 本文介绍如何使用 Python 的 DOM 和 SAX 方法解析 XML 文件,并通过示例展示了如何动态创建数据库表和处理大量数据的实时插入。 ... [详细]
  • 本文总结了在SQL Server数据库中编写和优化存储过程的经验和技巧,旨在帮助数据库开发人员提升存储过程的性能和可维护性。 ... [详细]
  • php更新数据库字段的函数是,php更新数据库字段的函数是 ... [详细]
  • 本文讨论了在进行 MySQL 数据迁移过程中遇到的所有 .frm 文件报错的问题,并提供了详细的解决方案和建议。 ... [详细]
  • 如何将TS文件转换为M3U8直播流:HLS与M3U8格式详解
    在视频传输领域,MP4虽然常见,但在直播场景中直接使用MP4格式存在诸多问题。例如,MP4文件的头部信息(如ftyp、moov)较大,导致初始加载时间较长,影响用户体验。相比之下,HLS(HTTP Live Streaming)协议及其M3U8格式更具优势。HLS通过将视频切分成多个小片段,并生成一个M3U8播放列表文件,实现低延迟和高稳定性。本文详细介绍了如何将TS文件转换为M3U8直播流,包括技术原理和具体操作步骤,帮助读者更好地理解和应用这一技术。 ... [详细]
  • 在iOS开发中,基于HTTPS协议的安全网络请求实现至关重要。HTTPS(全称:HyperText Transfer Protocol over Secure Socket Layer)是一种旨在提供安全通信的HTTP扩展,通过SSL/TLS加密技术确保数据传输的安全性和隐私性。本文将详细介绍如何在iOS应用中实现安全的HTTPS网络请求,包括证书验证、SSL握手过程以及常见安全问题的解决方法。 ... [详细]
author-avatar
F_hai丽
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有