热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

分布式数据库中间件–(3)Cobar对简单select命令的处理过程

友情提示:非原文链接可能会影响您的阅读体验,欢迎查看原文。(http:blog.geekcome.com)原文地址:http:blog.geekcome.comarchives28

友情提示:非原文链接可能会影响您的阅读体验,欢迎查看原文。(http://blog.geekcome.com)

原文地址:http://blog.geekcome.com/archives/284

在上一篇中介绍了Cobar和client初次建立连接的过程,Cobar监听端口,client发起连接请求,Cobar发送握手数据包,client发送认证数据包最后依据认证的结果Cobar向client发送认证结果。

在认证成功后Cobar会将该连接的回调处理函数由FrontendAuthenticator(前端认证处理器)设置成FrontendCommandHanler(前端命令处理器)。

所以在client再次向Cobar发送请求报文的时候,前端命令处理器会处理该连接。以下详细分析一下简单select语句的运行过程。


1、事件的产生

NIOReactor的R线程一直在监听selector上的每一个连接的感兴趣事件是否发生,当client发送了一条select * from tb1,select函数会返回,然后获取到该连接SelectionKey,而且该SelectKey的兴趣事件是OP_READ。此时会调用read(NIOConnection)函数。









01public void run() {








02            final Selector selector = this.selector;








03            for (;;) {








04                ++reactCount;








05                try {








06                    int res = selector.select();








07                    LOGGER.debug(reactCount + ">>NIOReactor接受连接数:" + res);








08                    register(selector);








09                    Set keys = selector.selectedKeys();








10                    try {








11                        for (SelectionKey key : keys) {








12                            Object att = key.attachment();








13                            if (att != null && key.isValid()) {








14                                int readyOps = key.readyOps();








15                                if ((readyOps & SelectionKey.OP_READ) != 0) {








16                                    LOGGER.debug("select读事件");








17                                    read((NIOConnection) att);








18                               ..............................








19                            }








20                             ...........................








21                        }








22                    } ..................








23                } ............








24            }








25  }

2、调用该连接的read函数进行处理

该函数在上一篇中提到过,该函数的实如今AbstractConnection中,实现从channel中读取数据到缓冲区,然后从缓冲区完整的取出整包数据交给FrontendConnection类的handle()函数处理。

该函数交给processor进行异步处理。从processor中的线程池获取一个线程来运行该任务。这里调用详细的handler来进行处理。

刚開始提到的,当认证成功后,Cobar将连接的回调处理函数设置为FrontendCommandHandler。所以这里会调用前端命令处理器的handler函数进行数据的处理。

在这里须要先了解MySQL数据包的格式:


MySQLclient命令请求报文

该处理函数例如以下:









01public void handle(byte[] data) {








02    LOGGER.info("data[4]:"+data[4]);








03    switch (data[4]) {








04    case MySQLPacket.COM_INIT_DB:








05        commands.doInitDB();








06        source.initDB(data);








07        break;








08    case MySQLPacket.COM_QUERY:








09        commands.doQuery();








10        source.query(data);








11        break;








12    case MySQLPacket.COM_PING:








13        commands.doPing();








14        source.ping();








15        break;








16    case MySQLPacket.COM_QUIT:








17        commands.doQuit();








18        source.close();








19        break;








20    case MySQLPacket.COM_PROCESS_KILL:








21        commands.doKill();








22        source.kill(data);








23        break;








24    case MySQLPacket.COM_STMT_PREPARE:








25        commands.doStmtPrepare();








26        source.stmtPrepare(data);








27        break;








28    case MySQLPacket.COM_STMT_EXECUTE:








29        commands.doStmtExecute();








30        source.stmtExecute(data);








31        break;








32    case MySQLPacket.COM_STMT_CLOSE:








33        commands.doStmtClose();








34        source.stmtClose(data);








35        break;








36    case MySQLPacket.COM_HEARTBEAT:








37        commands.doHeartbeat();








38        source.heartbeat(data);








39        break;








40    default:








41        commands.doOther();








42        source.writeErrMessage(ErrorCode.ER_UNKNOWN_COM_ERROR, "Unknown command");








43    }








44}

由于每一个报文都有消息头,消息头固定的是4个字节,前3个字节是消息长度,后面的一个字节是报文序号,例如以下所看到的


所以data[4]是第五个字节。也就是消息体的第一个字节。client向Cobar端发送的是命令报文,第一个字节是详细的命令。

假设是select语句,那么data[4]就是COM_QUERY,然后会调用详细连接的query成员函数,其定义在FrontendConnection类中。









01public void query(byte[] data) {








02    if (queryHandler != null) {








03        // 取得语句








04        MySQLMessage mm = new MySQLMessage(data);








05        mm.position(5);








06        String sql = null;








07        try {








08            sql = mm.readString(charset);








09        catch (UnsupportedEncodingException e) {








10            writeErrMessage(ErrorCode.ER_UNKNOWN_CHARACTER_SET, "Unknown charset '" + charset + "'");








11            return;








12        }








13        if (sql == null || sql.length() == 0) {








14            writeErrMessage(ErrorCode.ER_NOT_ALLOWED_COMMAND, "Empty SQL");








15            return;








16        }








17        LOGGER.debug("解析的SQL语句:"+sql);








18        // 运行查询








19        queryHandler.query(sql);








20    else {








21        writeErrMessage(ErrorCode.ER_UNKNOWN_COM_ERROR, "Query unsupported!");








22    }








23}

首先新建一个MySQLMessage对象,将数据包的索引位置定位到第6个字节位置处。然后将后面的全部的字节读取成指定编码格式的SQL语句,这里就形成了完整的SQL语句。

查询的时候Cobar控制台输出例如以下内容:


11:35:33,392 INFO data[4]:3
11:35:33,392 DEBUG 解析的SQL语句:select * from tb2


解析出SQL语句后交给queryHandler处理。该对象是在新建连接的时候设置的ServerQueryHandler类,事实上现的query函数例如以下:









01public void query(String sql) {








02    //这里就得到了完整的SQL语句,接收自client








03    ServerConnection c = this.source;








04    if (LOGGER.isDebugEnabled()) {








05        LOGGER.debug(new StringBuilder().append(c).append(sql).toString());








06    }








07    //该函数对SQL语句的语法和语义进行分析,并返回SQL语句的对于类型,运行对应的操作








08    int rs = ServerParse.parse(sql);








09    switch (rs & 0xff) {








10    .......................








11    case ServerParse.SELECT:








12        //select操作运行








13        SelectHandler.handle(sql, c, rs >>> 8);








14        break;








15    .......................








16    }








17}

首先对SQL语句进程解析,通过parse函数对语句解析后返回语句类型的编号。

假设语句没有语法错误,则直接交给SelectHandler进行处理。假设是一般的select语句,则直接调用ServerConnection的execute运行sql


c.execute(stmt, ServerParse.SELECT);


在ServerConnection中的execute函数中须要进行路由检查,由于select的数据不一定在一个数据库中,须要按拆分的规则进行路由的检查。









1// 路由计算








2RouteResultset rrs = null;








3try {








4    rrs = ServerRouter.route(schema, sql, this.charset, this);








5    LOGGER.debug("路由计算结果:"+rrs.toString());








6}

详细的路由算法也是比較复杂,以后会专门分析。

Cobar的DEBUG控制台输出路由的计算结果例如以下:


11:35:33,392 DEBUG 路由计算结果:select * from tb2, route={
1 -> dnTest2.default{select * from tb2}
2 -> dnTest3.default{select * from tb2}
}


该条SQL语句的select内容分布在dnTset2和dnTest3中,所以要分别向这两个数据库进行查询。

经过比較复杂的资源处理最后在每一个后端数据库上运行函数execute0。









01private void execute0(RouteResultsetNode rrn, Channel c, boolean autocommit, BlockingSession ss, int flag) {








02    ServerConnection sc = ss.getSource();








03    .........................








04    try {








05        // 运行并等待返回








06        BinaryPacket bin = ((MySQLChannel) c).execute(rrn, sc, autocommit);








07        // 接收和处理数据,运行到这里就说明上面的运行已经得到运行结果的返回








08        final ReentrantLock lock = MultiNodeExecutor.this.lock;








09        lock.lock();








10        try {








11            switch (bin.data[0]) {








12            case ErrorPacket.FIELD_COUNT:








13                c.setRunning(false);








14                handleFailure(ss, rrn, new BinaryErrInfo((MySQLChannel) c, bin, sc, rrn));








15                break;








16            case OkPacket.FIELD_COUNT:








17                OkPacket ok = new OkPacket();








18                ok.read(bin);








19                affectedRows += ok.affectedRows;








20                // set lastInsertId








21                if (ok.insertId > 0) {








22                    insertId = (insertId == 0) ? ok.insertId : Math.min(insertId, ok.insertId);








23                }








24                c.setRunning(false);








25                handleSuccessOK(ss, rrn, autocommit, ok);








26                break;








27            default// HEADER|FIELDS|FIELD_EOF|ROWS|LAST_EOF








28                final MySQLChannel mc = (MySQLChannel) c;








29                if (fieldEOF) {








30                    for (;;) {








31                        bin = mc.receive();








32                        switch (bin.data[0]) {








33                        case ErrorPacket.FIELD_COUNT:








34                            c.setRunning(false);








35                            handleFailure(ss, rrn, new BinaryErrInfo(mc, bin, sc, rrn));








36                            return;








37                        case EOFPacket.FIELD_COUNT:








38                            handleRowData(rrn, c, ss);








39                            return;








40                        default:








41                            continue;








42                        }








43                    }








44                else {








45                    bin.packetId = ++packetId;// HEADER








46                    List headerList = new LinkedList();








47                    headerList.add(bin);








48                    for (;;) {








49                        bin = mc.receive();








50                        switch (bin.data[0]) {








51                        case ErrorPacket.FIELD_COUNT:








52                            c.setRunning(false);








53                            handleFailure(ss, rrn, new BinaryErrInfo(mc, bin, sc, rrn));








54                            return;








55                        case EOFPacket.FIELD_COUNT:








56                            bin.packetId = ++packetId;// FIELD_EOF








57                            for (MySQLPacket packet : headerList) {








58                                buffer = packet.write(buffer, sc);








59                            }








60                            headerList = null;








61                            buffer = bin.write(buffer, sc);








62                            fieldEOF = true;








63                            handleRowData(rrn, c, ss);








64                            return;








65                        default:








66                            bin.packetId = ++packetId;// FIELDS








67                            switch (flag) {








68                            case RouteResultset.REWRITE_FIELD:








69                                StringBuilder fieldName = new StringBuilder();








70                                fieldName.append("Tables_in_").append(ss.getSource().getSchema());








71                                FieldPacket field = PacketUtil.getField(bin, fieldName.toString());








72                                headerList.add(field);








73                                break;








74                            default:








75                                headerList.add(bin);








76                            }








77                        }








78                    }








79                }








80            }








81        finally {








82            lock.unlock();








83        }








84    }//异常处理....................








85}

这里真正的运行SQL语句,然后等待后端运行语句的返回数据,在成功获取后端Mysql返回的结果后,该函数返回的数据包是结果集数据包。

当client发起认证请求或命令请求后,server会返回对应的运行结果给client。client在收到响应报文后,须要首先检查第1个字节的值,来区分响应报文的类型。



































响应报文类型第1个字节取值范围
OK 响应报文0×00
Error 响应报文0xFF
Result Set 报文0×01 – 0xFA
Field 报文0×01 – 0xFA
Row Data 报文0×01 – 0xFA
EOF 报文0xFE

注:响应报文的第1个字节在不同类型中含义不同,比方在OK报文中,该字节并没有实际意义,值恒为0×00;而在Result Set报文中,该字节又是长度编码的二进制数据结构(Length Coded Binary)中的第1字节。

Result Set 消息分为五部分,结构例如以下:































结构说明
[Result Set Header]列数量
[Field]列信息(多个)
[EOF]列结束
[Row Data]行数据(多个)
[EOF]数据结束

函数运行完毕后,返回的结果都放入LinkedList中,当读取结果完毕后放入多节点运行器的缓冲区。假设buffer满了,就通过前端连接写出给client。

作者:Yong Man

出处:极客来 GeekCome

原文:分布式数据库中间件–(3) Cobar对简单select命令的处理过程

提示:本文版权归作者,欢迎转载,但未经作者允许必须保留此段声明,且在文章页面明显位置给出原文连接。

假设对文章有不论什么问题,都能够在评论中留言,我会尽可能的答复您,谢谢你的阅读



分布式数据库中间件–(3) Cobar对简单select命令的处理过程的相关教程结束。



推荐阅读
  • 本文介绍了一种轻巧方便的工具——集算器,通过使用集算器可以将文本日志变成结构化数据,然后可以使用SQL式查询。集算器利用集算语言的优点,将日志内容结构化为数据表结构,SPL支持直接对结构化的文件进行SQL查询,不再需要安装配置第三方数据库软件。本文还详细介绍了具体的实施过程。 ... [详细]
  • 本文介绍了数据库的存储结构及其重要性,强调了关系数据库范例中将逻辑存储与物理存储分开的必要性。通过逻辑结构和物理结构的分离,可以实现对物理存储的重新组织和数据库的迁移,而应用程序不会察觉到任何更改。文章还展示了Oracle数据库的逻辑结构和物理结构,并介绍了表空间的概念和作用。 ... [详细]
  • 本文介绍了一个在线急等问题解决方法,即如何统计数据库中某个字段下的所有数据,并将结果显示在文本框里。作者提到了自己是一个菜鸟,希望能够得到帮助。作者使用的是ACCESS数据库,并且给出了一个例子,希望得到的结果是560。作者还提到自己已经尝试了使用"select sum(字段2) from 表名"的语句,得到的结果是650,但不知道如何得到560。希望能够得到解决方案。 ... [详细]
  • 本文分析了Wince程序内存和存储内存的分布及作用。Wince内存包括系统内存、对象存储和程序内存,其中系统内存占用了一部分SDRAM,而剩下的30M为程序内存和存储内存。对象存储是嵌入式wince操作系统中的一个新概念,常用于消费电子设备中。此外,文章还介绍了主电源和后备电池在操作系统中的作用。 ... [详细]
  • 云原生边缘计算之KubeEdge简介及功能特点
    本文介绍了云原生边缘计算中的KubeEdge系统,该系统是一个开源系统,用于将容器化应用程序编排功能扩展到Edge的主机。它基于Kubernetes构建,并为网络应用程序提供基础架构支持。同时,KubeEdge具有离线模式、基于Kubernetes的节点、群集、应用程序和设备管理、资源优化等特点。此外,KubeEdge还支持跨平台工作,在私有、公共和混合云中都可以运行。同时,KubeEdge还提供数据管理和数据分析管道引擎的支持。最后,本文还介绍了KubeEdge系统生成证书的方法。 ... [详细]
  • 本文详细介绍了Linux中进程控制块PCBtask_struct结构体的结构和作用,包括进程状态、进程号、待处理信号、进程地址空间、调度标志、锁深度、基本时间片、调度策略以及内存管理信息等方面的内容。阅读本文可以更加深入地了解Linux进程管理的原理和机制。 ... [详细]
  • 关于我们EMQ是一家全球领先的开源物联网基础设施软件供应商,服务新产业周期的IoT&5G、边缘计算与云计算市场,交付全球领先的开源物联网消息服务器和流处理数据 ... [详细]
  • 怎么在PHP项目中实现一个HTTP断点续传功能发布时间:2021-01-1916:26:06来源:亿速云阅读:96作者:Le ... [详细]
  • Redis底层数据结构之压缩列表的介绍及实现原理
    本文介绍了Redis底层数据结构之压缩列表的概念、实现原理以及使用场景。压缩列表是Redis为了节约内存而开发的一种顺序数据结构,由特殊编码的连续内存块组成。文章详细解释了压缩列表的构成和各个属性的含义,以及如何通过指针来计算表尾节点的地址。压缩列表适用于列表键和哈希键中只包含少量小整数值和短字符串的情况。通过使用压缩列表,可以有效减少内存占用,提升Redis的性能。 ... [详细]
  • 本文讨论了在使用sp_msforeachdb执行动态SQL命令时,当发生错误时如何捕获数据库名称。提供了两种解决方案,并介绍了如何正确使用'?'来显示数据库名称。 ... [详细]
  • 本文介绍了在Windows系统上使用C语言命令行参数启动程序并传递参数的方法,包括接收参数程序的代码和bat文件的编写方法,同时给出了程序运行的结果。 ... [详细]
  • 本文介绍了OpenStack的逻辑概念以及其构成简介,包括了软件开源项目、基础设施资源管理平台、三大核心组件等内容。同时还介绍了Horizon(UI模块)等相关信息。 ... [详细]
  • 在Oracle11g以前版本中的的DataGuard物理备用数据库,可以以只读的方式打开数据库,但此时MediaRecovery利用日志进行数据同步的过 ... [详细]
  • 合并列值-合并为一列问题需求:createtabletab(Aint,Bint,Cint)inserttabselect1,2,3unionallsel ... [详细]
  • 使用C++编写程序实现增加或删除桌面的右键列表项
    本文介绍了使用C++编写程序实现增加或删除桌面的右键列表项的方法。首先通过操作注册表来实现增加或删除右键列表项的目的,然后使用管理注册表的函数来编写程序。文章详细介绍了使用的五种函数:RegCreateKey、RegSetValueEx、RegOpenKeyEx、RegDeleteKey和RegCloseKey,并给出了增加一项的函数写法。通过本文的方法,可以方便地自定义桌面的右键列表项。 ... [详细]
author-avatar
阿笨猫
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有