热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Impala源代码分析(1)Impala架构和RPC

Impala总共分为3个组件:impalad,statestored,clientimpala-shell。关于这三个组件的基本功能在这篇文章中已经介绍过了。Client?:可以是
文章目录[隐藏]
  • (1)Client impalad(frontend)
  • (2)Impalad(backend) statestored
  • (3)Statestord impalad(backend)
  • (4)Impalad(backend) other impalad(backend) (这两个是互为client/server的)
  • (5)Impalad backend other frontend
  • 1, impala-shell
  • 2, statestored
  • 3, impalad

Impala总共分为3个组件:impalad, statestored, client/impala-shell。关于这三个组件的基本功能在这篇文章中已经介绍过了。 Client?: 可以是Python CLI(官方提供的impala_shell.py),JDBC/ODBC或者Hue。无论哪个其实就是一个Thrift的client,连接到impala

Impala总共分为3个组件:impalad, statestored, client/impala-shell。关于这三个组件的基本功能在这篇文章中已经介绍过了。

Client?: 可以是Python CLI(官方提供的impala_shell.py),JDBC/ODBC或者Hue。无论哪个其实就是一个Thrift的client,连接到impalad的21000端口。

Impalad: 分为frontend和backend两部分,这个进程有三个ThriftServer(beeswax_server, hs2_server, be_server)对系统外和系统内提供服务。

Statestored: 集群内各个backend service的数据交换中心,每个backend会在statestored注册,以后statestored会与所有注册过的backend交换update消息。

RPC
Component Service Port Access Requirement Comment
ImpalaDaemon Impala Daemon Backend Port 22000 Internal ImpalaBackendService export
Impala Daemon Frontend Port 21000 External ImpalaService export
Impala Daemon HTTP Server Port 25000 External[email protected]~码$网 Impala debug web server
StateStoreSubscriber Service Port 23000 Internal StateStoreSubscriberService
?ImpalaStateStore Daemon StateStore HTTP Server Port 25010 External StateStore debug web server
StateStore Service Port 24000 Internal StateStoreService export

下面介绍三个组件之间的Thrift RPC(“”前面的表示RPC client,“”后面的表示RPC server)

(1)Client impalad(frontend)

BeeswaxService(beeswax.thrift): client通过query()提交SQL请求,然后异步调用get_state()监听该SQL的查询进度,一旦完成,调用fetch()取回结果。

TCLIService(cli_service.thrift): client提交SQL请求,功能和上面类似,更丰富的就是对DDL操作的支持,例如GetTables()返回指定table的元数据。

ImpalaService和ImpalaHiveServer2Service(ImpalaService.thrift)分别是上面两个类的子类,各自丰富了点功能而已,核心功能没啥大变化。

(2)Impalad(backend) statestored

StateStoreService(StateStoreService.thrift): statestored保存整个系统所有backend service状态的全局数据库,这里是个单节点中央数据交换中心(该节点保存的状态是soft state,一旦宕机,保存的状态信息就没了)。例如每个impala backend启动的时候会调用StateStoreService.RegisterService()向statestored注册自己(其实是通过跟这个backend service捆绑在一起的StateStoreSubscriber标识的),然后再调用StateStoreService.RegisterSubscription()表明这个StateStoreSubscriber接收来自statestored的update。

(3)Statestord impalad(backend)

StateStoreSubscriberService(StateStoreSubscriberService.thrift): backend向statestored调用RegisterSubscription之后,statestored就会定期向backend这边捆绑的StateStoreSubscriber发送该backend的状态更新信息。然后backend这边调用StateStoreSubscriberService.UpdateState()更新相关状态。同时这个UpdateState()调用在impalad backend/StateStoreSubscriber这端还会返回该backend的一些update信息给statestored。

(4)Impalad(backend) other impalad(backend) (这两个是互为client/server的)

ImpalaInternalService(ImpalaInternalService.thrift):某个backend的coordinator要向其他backend的execute engine发送执行某个plan fragment的请求(提交ExecPlanFragment并要求返回ReportExecStatus)。这部分功能会在backend分析中详细讨论。

(5)Impalad backend other frontend

ImpalaPlanService(ImpalaPlanService.thrift):可以由其他形式的frontend生成TExecRequest然后交给backend执行。

另外,Impala frontend是用Java写的,而backend使用C++写的。Frontend负责把输入的SQL解析,然后生成执行计划,之后通过Thrift的序列化/反序列化的方式传给backend。TExecRequest(frontend.thrift)是中间传输的数据结构,表示了一个Query/DML/DDL的查询请求,也是SQL执行过程中在frontend和backend之间的数据接口。所以我们可以把impala-frontend换掉,用其他的形式拼凑出这个TExecRequest就可以传给backend执行,这也就是前面说的ImpalaPlanService干的事。

impala组件执行流程

1, impala-shell

client就可以通过Beeswax和HiveServer2的Thrift API向Impala提交query。这两种访问接口的作用是一样的(都是用于client提交query,返回query result)。

Impala_shell.py是通过Beeswax方式访问impala的,下面我们看看impala_shell.py是怎么向impalad提交query的。

(1)通过OptionParser()解析命令行参数。如果参数中有—query或者—query_file,则执行execute_queries_non_interactive_mode(options),这是非交互查询(也就是就查询一个SQL或者一个写满SQL的文件);否则进入ImpalaShell.cmdloop (intro)循环。

(2)进入命令行循环后,一般是先connect某一个impalad,输入”connect localhost:21000”,进入do_connect(self, args)函数。这个函数根据用户指定的host和port,生成与相应的impalad的socket连接。最重要的就是这行代码:

self.imp_service = ImpalaService.Client(protocol)

至此imp_service就是client端的代理了,所有请求都通过它提交。

(3)下面以select命令为例说明,如果client输入这样的命令”select col1, col2 from tbl”,则进入do_select(self, args)函数。在这个函数里首先生成BeeswaxService.Query对象,向这个对象填充query statement和configuration。然后进入__query_with_result()函数通过imp_service.query(query)提交query。注意ImpalaService都是异步的,提交之后返回一个QueryHandle,然后就是在一个while循环里不断__get_query_state()查询状态。如果发现这个SQL的状态是FINISHED,那么就通过fetch() RPC获取结果。

2, statestored

Statestored进程对外提供StateStoreService RPC服务,而StateStoreSubscriberService RPC服务是在impalad进程中提供的。StateStoreService这个RPC的逻辑实现是在StateStore这个类里面实现的。

Statestored收到backend发送的RegisterService RPC请求时,调用StateStore::RegisterService()处理,主要做两件事:

(1)根据TRegisterServiceRequest提供的service_id把该service加入StateStore.service_instances_。

通常在整个impala集群只存在名为“impala_backend_service”这一个服务,所以service_id=”impala_backend_service”。而每个backend捆绑的是不一样的,所以就形成了service和backend一对多的关系,这个关系存储在StateStore.service_instances_组。

(2)Impalad backend在向statestored RegisterService的时候,会把subscriber_address发送过去。在statestored端,会根据这个subscriber_address生成对应的Subscriber对象(表示与该Subscriber捆绑的backend)。把与该backend绑定的Subscriber加入StateStore.subscribers_这个map里。每个Subscriber有个唯一的id,这样分布在集群内的impala backend就有了全局唯一id了。

这样如果以后某个backend/StateStoreSubscriber fail或者其中运行的SQL任务出了问题,在statestored这里就会有体现了,那么就会通知给其他相关的backend。

那么每个backend是怎么update的呢?StateStore::UpdateLoop()负责定期向各个backend推送其所订阅的service的所有成员的更新,目前的更新策略是全量更新,未来会考虑增量更新。

3, impalad

Impalad进程的服务被wrapper在ImpalaServer这个类中。ImpalaServer包括fe和be的功能,实现了ImpalaService(Beeswax), ImpalaHiveServer2Service(HiveServer2)和ImpalaInternelService API。

全局函数CreateImpalaServer()创建了一个ImpalaServer其中包含了多个ThriftServer:

(1)创建一个名为beeswax_server的ThriftServer对系统外提供ImpalaService(Beeswax)服务,主要服务于Query查询,是fe/frontend的核心服务,端口21000

(2)创建一个名为hs2_server的ThriftServer对系统外提供ImpalaHiveServer2Service服务,提供Query, DML, DDL相关操作,端口21050

(3)创建一个名为be_server的ThriftServer对系统内其他impalad提供ImpalaInternalService,端口22000

(4)创建ImpalaServer对象,前面三个ThriftServer的TProcessor被赋值这个ImpalaServer对象,所以对前面三个Thrift服务的RPC请求都交由这个ImpalaServer对象处理。最典型的例子就是我们通过Beeswax接口提交了一个BeeswaxService.query()请求,在impalad端的处理逻辑是由void ImpalaServer::query(QueryHandle& query_handle, const Query& query)这个函数(在impala-beeswax-server.cc中实现)完成的。

下面是impalad-main.cc的主函数:

int main(int argc, char** argv) {  //参数解析,开启日志(基于Google gflags和glog)  InitDaemon(argc, argv);  LlvmCodeGen::InitializeLlvm();  // Enable Kerberos security if requested.  if (!FLAGS_principal.empty()) {    EXIT_IF_ERROR(InitKerberos("Impalad"));  }  //因为frontend, HBase等相关组件是由Java开发的,所以下面这几行都是初始化JNI相关的reference和method id  JniUtil::InitLibhdfs();  EXIT_IF_ERROR(JniUtil::Init());  EXIT_IF_ERROR(HBaseTableScanner::Init());  EXIT_IF_ERROR(HBaseTableCache::Init());  InitFeSupport();  //ExecEnv类是impalad backend上Query/PlanFragment的执行环境。  //生成SubscriptionManager, SimpleScheduler和各种Cache  ExecEnv exec_env;  //生成Beeswax, hive-server2和backend三种ThriftServer用于接收client请求,不过这三种服务的后端真正的处理逻辑都是ImpalaServer* server这个对象。  ThriftServer* beeswax_server = NULL;  ThriftServer* hs2_server = NULL;  ThriftServer* be_server = NULL;  ImpalaServer* server =      CreateImpalaServer(&exec_env, FLAGS_fe_port, FLAGS_hs2_port, FLAGS_be_port,          &beeswax_server, &hs2_server, &be_server);  //因为be_server是对系统内提供服务的,先启动它。  be_server->Start();  //这里面关键是启动了SubscriptionManager和Scheduler  Status status = exec_env.StartServices();  if (!status.ok()) {    LOG(ERROR) RegisterSubscription(services, "impala.server",        cb.get());    if (!status.ok()) {      LOG(ERROR) <<"Could not register with state store service: "                 Start();  beeswax_server->Join();  hs2_server->Join();  delete be_server;  delete beeswax_server;  delete hs2_server;}

exec_env.StartServices()调用SubscriptionManager.Start(),进一步调用StateStoreSubscriber.Start()启动一个ThriftServer。

StateStoreSubscriber实现了StateStoreSubscriberService(StateStoreSubscriberService.thrift中定义),用于接收来自statestored的update,并把与这个StateStoreSubscriber捆绑的backend的update反馈给statestored。这样这个backend就可以对其他backend可见,这样就可以接受其他impala backend发来的任务更新了(当然,接收backend更新是通过statestored中转的)。

参考文献:

http://www.sizeofvoid.net/wp-content/uploads/ImpalaIntroduction2.pdf



推荐阅读
  • 本文介绍了lua语言中闭包的特性及其在模式匹配、日期处理、编译和模块化等方面的应用。lua中的闭包是严格遵循词法定界的第一类值,函数可以作为变量自由传递,也可以作为参数传递给其他函数。这些特性使得lua语言具有极大的灵活性,为程序开发带来了便利。 ... [详细]
  • 本文介绍了在Python3中如何使用选择文件对话框的格式打开和保存图片的方法。通过使用tkinter库中的filedialog模块的asksaveasfilename和askopenfilename函数,可以方便地选择要打开或保存的图片文件,并进行相关操作。具体的代码示例和操作步骤也被提供。 ... [详细]
  • 本文详细介绍了SQL日志收缩的方法,包括截断日志和删除不需要的旧日志记录。通过备份日志和使用DBCC SHRINKFILE命令可以实现日志的收缩。同时,还介绍了截断日志的原理和注意事项,包括不能截断事务日志的活动部分和MinLSN的确定方法。通过本文的方法,可以有效减小逻辑日志的大小,提高数据库的性能。 ... [详细]
  • 本文介绍了Python高级网络编程及TCP/IP协议簇的OSI七层模型。首先简单介绍了七层模型的各层及其封装解封装过程。然后讨论了程序开发中涉及到的网络通信内容,主要包括TCP协议、UDP协议和IPV4协议。最后还介绍了socket编程、聊天socket实现、远程执行命令、上传文件、socketserver及其源码分析等相关内容。 ... [详细]
  • Python实现变声器功能(萝莉音御姐音)的方法及步骤
    本文介绍了使用Python实现变声器功能(萝莉音御姐音)的方法及步骤。首先登录百度AL开发平台,选择语音合成,创建应用并填写应用信息,获取Appid、API Key和Secret Key。然后安装pythonsdk,可以通过pip install baidu-aip或python setup.py install进行安装。最后,书写代码实现变声器功能,使用AipSpeech库进行语音合成,可以设置音量等参数。 ... [详细]
  • 本文分享了一个关于在C#中使用异步代码的问题,作者在控制台中运行时代码正常工作,但在Windows窗体中却无法正常工作。作者尝试搜索局域网上的主机,但在窗体中计数器没有减少。文章提供了相关的代码和解决思路。 ... [详细]
  • 本文介绍了使用Java实现大数乘法的分治算法,包括输入数据的处理、普通大数乘法的结果和Karatsuba大数乘法的结果。通过改变long类型可以适应不同范围的大数乘法计算。 ... [详细]
  • Java序列化对象传给PHP的方法及原理解析
    本文介绍了Java序列化对象传给PHP的方法及原理,包括Java对象传递的方式、序列化的方式、PHP中的序列化用法介绍、Java是否能反序列化PHP的数据、Java序列化的原理以及解决Java序列化中的问题。同时还解释了序列化的概念和作用,以及代码执行序列化所需要的权限。最后指出,序列化会将对象实例的所有字段都进行序列化,使得数据能够被表示为实例的序列化数据,但只有能够解释该格式的代码才能够确定数据的内容。 ... [详细]
  • 开发笔记:加密&json&StringIO模块&BytesIO模块
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了加密&json&StringIO模块&BytesIO模块相关的知识,希望对你有一定的参考价值。一、加密加密 ... [详细]
  • 本文介绍了计算机网络的定义和通信流程,包括客户端编译文件、二进制转换、三层路由设备等。同时,还介绍了计算机网络中常用的关键词,如MAC地址和IP地址。 ... [详细]
  • 关键词:Golang, Cookie, 跟踪位置, net/http/cookiejar, package main, golang.org/x/net/publicsuffix, io/ioutil, log, net/http, net/http/cookiejar ... [详细]
  • CEPH LIO iSCSI Gateway及其使用参考文档
    本文介绍了CEPH LIO iSCSI Gateway以及使用该网关的参考文档,包括Ceph Block Device、CEPH ISCSI GATEWAY、USING AN ISCSI GATEWAY等。同时提供了多个参考链接,详细介绍了CEPH LIO iSCSI Gateway的配置和使用方法。 ... [详细]
  • 本文介绍了一个适用于PHP应用快速接入TRX和TRC20数字资产的开发包,该开发包支持使用自有Tron区块链节点的应用场景,也支持基于Tron官方公共API服务的轻量级部署场景。提供的功能包括生成地址、验证地址、查询余额、交易转账、查询最新区块和查询交易信息等。详细信息可参考tron-php的Github地址:https://github.com/Fenguoz/tron-php。 ... [详细]
  • 如何用JNI技术调用Java接口以及提高Java性能的详解
    本文介绍了如何使用JNI技术调用Java接口,并详细解析了如何通过JNI技术提高Java的性能。同时还讨论了JNI调用Java的private方法、Java开发中使用JNI技术的情况以及使用Java的JNI技术调用C++时的运行效率问题。文章还介绍了JNIEnv类型的使用方法,包括创建Java对象、调用Java对象的方法、获取Java对象的属性等操作。 ... [详细]
  • PHP反射API的功能和用途详解
    本文详细介绍了PHP反射API的功能和用途,包括动态获取信息和调用对象方法的功能,以及自动加载插件、生成文档、扩充PHP语言等用途。通过反射API,可以获取类的元数据,创建类的实例,调用方法,传递参数,动态调用类的静态方法等。PHP反射API是一种内建的OOP技术扩展,通过使用Reflection、ReflectionClass和ReflectionMethod等类,可以帮助我们分析其他类、接口、方法、属性和扩展。 ... [详细]
author-avatar
书友10689978
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有