热门标签 | HotTags
当前位置:  开发笔记 > 数据库 > 正文

Impala源代码分析(3)-backend查询执行过程

这篇文章主要介绍impala-backend是怎么执行一个SQLQuery的。在Impala中SQLQuery的入口函数是:voidImpalaServer::query(QueryHandlequery_handle,constQueryquery)生成一个QueryExecState伴随这个SQL执行的生命周期,代表正在执行的这个SQL;调用E

这篇文章主要介绍impala-backend是怎么执行一个SQL Query的。 在Impala中SQL Query的入口函数是: void ImpalaServer::query(QueryHandle query_handle, const Query query) 生成一个QueryExecState伴随这个SQL执行的生命周期,代表正在执行的这个SQL; 调用E

这篇文章主要介绍impala-backend是怎么执行一个SQL Query的。
在Impala中SQL Query的入口函数是:
void ImpalaServer::query(QueryHandle& query_handle, const Query& query)

  • 生成一个QueryExecState伴随这个SQL执行的生命周期,代表正在执行的这个SQL;
  • 调用Execute函数启动执行流程;
  • 启动一个Wait线程等待结果。

这个Execute()函数首先是通过JNI向impala-fe请求SQL解析和执行计划生成(已经在上一篇文章中讲了这个过程),得到该Query对应的TExecRequest对象,交由impala-backend执行。
从下面这个函数开始backend执行,同时开始fragment status report。
Status ImpalaServer::QueryExecState::Exec(TExecRequest* exec_request)
因为我们知道在impala里面,一个Query是分配到多个节点执行的,我们把其中负责分配和协调这个Query执行的组件叫Coordinator;参与这个Query执行的每个节点叫backend instance,每个backend instance上面会执行一个或者多个PlanFragment。那么每个Query就对应一个Coordinator对象和多个backend instance,同时Coordinator中的query_profile_ 变量是用来统计这个query的执行的整个profile的。

Coordinator

这里首先生成Coordinator用于协调这个Query的执行,然后调用
Status?Coordinator::Exec(
const TUniqueId& query_id, TQueryExecRequest* request,
const TQueryOptions& query_options)
启动异步的执行过程:说白了这个Coordinator就是老板,把活(PlanFragment)都给各个下属(backend instance)安排好了,发出去,然后自己下班走人了,才不会等着下属干完了才走呢。因为老板早就安排好自己的秘书(ImpalaServer::Wait())去盯着结果呢。
这个函数里面最重要的两个步骤:

  • ComputeScanRangeAssignment(*request);
  • ComputeFragmentExecParams(*request);

其中ComputeScanRangeAssignment(const TQueryExecRequest& exec_request)?用于填充std::vector scan_range_assignment_ 这个数组是以PlanFragment为索引的。
typedef boost::unordered_map FragmentScanRangeAssignment表示某个PlanFragment的backend instance以及其对应的PerNodeScanRanges的映射。而PerNodeScanRanges表示某个PlanFragment所涉及到的所有PlanNode到ScanRange的映射。

另外一个函数ComputeFragmentExecParams?(const TQueryExecRequest& exec_request)?用于填充std::vector fragment_exec_params_?。这个参数中每个FragmentExecParams对应着一个PlanFragment执行中用到的参数。

  • Status Coordinator::ComputeFragmentHosts(const TQueryExecRequest& exec_request):为每个PlanFragment找到执行所在的backend instance。如果一个PlanFragment是UNPARTITIONED,那么就在这个Coordinator所在的host上运行;如果一个PlanFragment含有ScanNode,那么就调度这个PlanFragment到HDFS/HBase数据块所在的那些DataNodes上,也就是这些DataNodes就成为了执行这个Query的backend instance。
  • 计算TQueryExecRequest.fragments中每个PlanFragment会在哪些hosts上得到执行,填充到fragment_exec_params_ 中。
  • 依次给每个PlanFragment执行的每个host分配一个instance_id。
  • 填充每个?FragmentExecParams?的destinations(即Data Sink的目的地PlanFragment)和per_exch_num_senders(这个ExchangeNode会接收来自多少个PlanFragment的数据)

回到Coordinator::Exec()函数中,下面就该把各个PlanFragment分配干活了。

  • 如果有Coordinator PlanFragment,那么先new PlanFragmentExecutor()生成这个PlanFragment所对应的PlanFragmentExecutor。然后填充其对应的TExecPlanFragmentParams。
  • 下面是个双层循环:外层遍历PlanFragment,内层遍历backend instance,生成与每个instance关联的BackendExecState(主要是生成TExecPlanFragmentParams用于Coordinator与多个backend instance交互时的参数),并加入backend_exec_states_列表,用于Coordinator对所有的backend instance执行状况的管理。然后向每个instance发起RPC请求开始执行,请求协议是ImpalaInternalService:: ExecPlanFragment(TExecPlanFragmentParams)

Status fragments_exec_status = ParallelExecutor::Exec(
bind(mem_fn(&Coordinator::ExecRemoteFragment), this, _1),
reinterpret_cast(&backend_exec_states_[backend_num - num_hosts]),
num_hosts);

每个Coordinator,PlanFragmentExecutor和ExecNode都会有一个RuntimeProfile,所有的RuntimeProfile会构成树状结构来记录每个执行节点的执行过程中的信息。
在Coordinator有个成员变量boost::scoped_ptr query_profile_用于表示这个query过程中的所有的profile信息。
每个Coordinator还有个aggregate_profile_专门负责aggregate相关的profile。

PlanFragmentExecutor和ExecNode

无论是在Coordinator端还是在backend instance端执行的PlanFragment都是由一个PlanFragmentExecutor控制的。下面我们看看PlanFragment在backend instance是怎么执行的?
在RPC的server端调用了ImpalaServer::ExecPlanFragment()->ImpalaServer::StartPlanFragmentExecution()
生成FragmentExecState里面含有一个PlanFragmentExecutor。那么下面就是分析PlanFragmentExecutor怎么控制Query的执行的了。

  • FragmentExecState::Prepare()调用PlanFragmentExecutor::Prepare()
  • FragmentExecState::Exec()调用PlanFragmentExecutor::Open(),这个是PlanFragment执行的主循环,block直到该PlanFragment执行结束。

真正控制PlanFragment执行的是PlanFragmentExecutor,主要由Prepare()/Open()/GetNext()/Close()这几个函数组成。

1,? PlanFragmentExecutor::Prepare(TExecPlanFragmentParams):准备执行,主要流程如下:

  • 设定这个query能够使用的内存mem_limit;
  • DescriptorTbl::Create():初始化descriptor table;
  • ExecNode::CreateTree():生成执行树的结构(父子关系)。执行树由ExecNode组成,每一个ExecNode也提供了Prepare(), Open(), GetNext()函数。后面执行ExecNode::Prepare/Open/GenNext /EvalConjuncts/Close函数都是按照这个树状结构递归下去的。初始化完成后,PlanFragmentExecutor ::plan_指向了执行树的根节点。在这棵树中,root节点被最后执行,叶子节点被最先执行;
  • 设置该PlanFragment的Exchange Node会接收来自多少个sender的数据;
  • 调用plan_->Prepare():从根节点开始递归初始化执行树,主要是初始化runtime_profile等统计信息和conjuncts的LLVM本地代码生成 (adding functions to the LlvmCodeGen object);
  • 如果使用本地代码生成,调用runtime_state_->llvm_codegen()->OptimizedModule()进行优化;
  • 把所有的ScanNode对应的Scan Range映射到file/offset/length;
  • DataSink::CreateDataSink();
  • set up profile counter;
  • 生成RowBatch用于存储结果。

2,PlanFragmentExecutor::Open()

先是start the profile-reporting thread,然后调用OpenInternal()

(1)???? 调用plan_->Open()沿着生成的ExecNode执行树依次调用ExecNode:: Open()
下面以HdfsScanNode::Open()为例说明:

  • 调用DiskIoMgr:: RegisterReader初始化与HDFS的连接hdfs_connection_;
  • 把要读取的File 和Split加入HdfsScanNode的队列queued_ranges_中;
  • 调用HdfsScanNode::DiskThread驱动HdfsScanNode::StartNewScannerThread()->HdfsScanNode::ScannerThread->HdfsScanner:: ProcessSplit()去读取数据(目前一个scanner thread只能读取一个scan range);
  • 调用IssueQueuedRanges()把上面加入queued_ranges_中的预读取Range发送给DiskIoMgr。由于上一步中已经启动了disk thread,所以就可以读取数据了。

(2)???? 如果当前这个PlanFragmen有sink,那么需要把这个PlanFragment要发给其他PF的数据都发出去。在发出去之前肯定得获取要发的东西吧,调用PlanFragmentExecutor ::GetNextInternal()从上到下递归调用执行树的ExecNode::GetNext()获取执行结果。
上面说到对于ExecNode::Open()不同种类的ExecNode的逻辑是不一样的,对于GetNext()也是一样的,可以参考下HdfsScanNode::GetNext()或者HashJoinNode::GetNext()看看具体是怎么获取查询结果的。

3,? PlanFragmentExecutor::GextNext(RowBatch** batch)

显示触发执行树的ExecNode::GetNext()函数获取查询结果。当其标记PlanFragmentExecutor::done_==true时,则表明所有数据已经被处理完,该PlanFragmentExecutor可以退出了。

至此,impala-backend也分析完了。总的来说impala在执行过程中和MapReduce及Hive的不同可以概括为一拉一推。

  • 在MapReduce中,Map的输出结果要等着Reduce去拉;而impala中各个PlanFragment执行结束之后DataSink是推送到其他PlanFragment的。这样能更加有效利用带宽,加快Job执行速度。
  • 在Hive中,逻辑上下游节点是由上游节点推送给下游节点的;而impala中是下游节点通过递归调用GetNext()向上游节点拉取的。

推荐阅读
  • hive和mysql的区别是什么[mysql教程]
    hive和mysql的区别有:1、查询语言不同,hive是hql语言,MySQL是sql语句;2、数据存储位置不同,hive把数据存储在hdfs上,MySQL把数据存储在自己的系统 ... [详细]
  • Presto:高效即席查询引擎的深度解析与应用
    本文深入解析了Presto这一高效的即席查询引擎,详细探讨了其架构设计及其优缺点。Presto通过内存到内存的数据处理方式,显著提升了查询性能,相比传统的MapReduce查询,不仅减少了数据传输的延迟,还提高了查询的准确性和效率。然而,Presto在大规模数据处理和容错机制方面仍存在一定的局限性。本文还介绍了Presto在实际应用中的多种场景,展示了其在大数据分析领域的强大潜力。 ... [详细]
  • 在Hive中合理配置Map和Reduce任务的数量对于优化不同场景下的性能至关重要。本文探讨了如何控制Hive任务中的Map数量,分析了当输入数据超过128MB时是否会自动拆分,以及Map数量是否越多越好的问题。通过实际案例和实验数据,本文提供了具体的配置建议,帮助用户在不同场景下实现最佳性能。 ... [详细]
  • HBase在金融大数据迁移中的应用与挑战
    随着最后一台设备的下线,标志着超过10PB的HBase数据迁移项目顺利完成。目前,新的集群已在新机房稳定运行超过两个月,监控数据显示,新集群的查询响应时间显著降低,系统稳定性大幅提升。此外,数据消费的波动也变得更加平滑,整体性能得到了显著优化。 ... [详细]
  • sh cca175problem03evolveavroschema.sh ... [详细]
  • hadoop3.1.2 first programdefault wordcount (Mac)
    hadoop3.1.2安装完成后的第一个实操示例程 ... [详细]
  • 2012年9月12日优酷土豆校园招聘笔试题目解析与备考指南
    2012年9月12日,优酷土豆校园招聘笔试题目解析与备考指南。在选择题部分,有一道题目涉及中国人的血型分布情况,具体为A型30%、B型20%、O型40%、AB型10%。若需确保在随机选取的样本中,至少有一人为B型血的概率不低于90%,则需要选取的最少人数是多少?该问题不仅考察了概率统计的基本知识,还要求考生具备一定的逻辑推理能力。 ... [详细]
  • 如何高效启动大数据应用之旅?
    在前一篇文章中,我探讨了大数据的定义及其与数据挖掘的区别。本文将重点介绍如何高效启动大数据应用项目,涵盖关键步骤和最佳实践,帮助读者快速踏上大数据之旅。 ... [详细]
  • Hadoop 2.6 主要由 HDFS 和 YARN 两大部分组成,其中 YARN 包含了运行在 ResourceManager 的 JVM 中的组件以及在 NodeManager 中运行的部分。本文深入探讨了 Hadoop 2.6 日志文件的解析方法,并详细介绍了 MapReduce 日志管理的最佳实践,旨在帮助用户更好地理解和优化日志处理流程,提高系统运维效率。 ... [详细]
  • 技术日志:深入探讨Spark Streaming与Spark SQL的融合应用
    技术日志:深入探讨Spark Streaming与Spark SQL的融合应用 ... [详细]
  • Panabit应用层流量管理解决方案
    Panabit是一款国内领先的应用层流量管理解决方案,提供高度开放且免费的专业服务,尤其擅长P2P应用的精准识别与高效控制。截至2009年3月25日,该系统已实现对多种网络应用的全面支持,有效提升了网络资源的利用效率和安全性。 ... [详细]
  • Hudi是一种数据湖的存储格式,在Hadoop文件系统之上提供了更新数据和删除数据的能力以及流式消费变化数据的能力。应用场景近实时数据摄取Hudi支持插入、更新和删除数据的能力。您 ... [详细]
  • 单个物理维度可以被事实表多次引用,每个引用连接逻辑上存在差异的角色维度。例如,事实表可以有多个日期,每个日期通过外键引用不同的日期维度,原则上每个外键表示不同的日期维度视图,这样 ... [详细]
  • Hadoop的分布式架构改进与应用
    nsitionalENhttp:www.w3.orgTRxhtml1DTDxhtml1-transitional.dtd ... [详细]
  • MongoDB——聚合操作详解
    聚合(Aggregation)为集合文档数据提供各种处理数据方法,并返回计算结果。MongoDB提供了3种方式来执行聚合命令࿱ ... [详细]
author-avatar
平凡的冰冰凝凝
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有