热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

数据库中间件MyCAT源码分析:【单库单表】查询

title:MyCAT源码分析——【单库单表】查询date:2017-05-30tags:categories:MyCATpermalink:MyCATsingle-db-sing

title: MyCAT 源码分析 —— 【单库单表】查询
date: 2017-05-30
tags:
categories: MyCAT
permalink: MyCAT/single-db-single-table-select


������关注微信公众号:【芋艿的后端小屋】有福利:
1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
4. 新的源码解析文章实时收到通知。每周更新一篇左右




  • 1. 概述
  • 2. 接收请求,解析 SQL
  • 3. 获得路由结果
  • 4. 获得 MySQL 连接,执行 SQL
  • 5. 响应执行 SQL 结果
  • 6. 其他 :更新 / 删除

1. 概述

内容形态以 顺序图 + 核心代码 为主。
如果有地方表述不错误或者不清晰,欢迎留言。
对于内容形态,非常纠结,如果有建议,特别特别特别欢迎您提出。
微信号:wangwenbin-server。

本文讲解 【单库单表】查询 所涉及到的代码。

��内容和 《MyCAT 源码分析 —— 【单库单表】插入》 超级相似,一方面本身流程基本相同,另外一方面文章结构没拆分好。我们使用 �� 标记差异的逻辑。

交互如下图:

单库单表查询简图

整个过程,MyCAT Server 流程如下:

  1. 接收 MySQL Client 请求,解析 SQL。
  2. 获得路由结果,进行路由。
  3. 获得 MySQL 连接,执行 SQL。
  4. 响应执行结果,发送结果给 MySQL Client。

我们逐个步骤分析,一起来看看源码。

2. 接收请求,解析 SQL

【单库单表】查询(01主流程)

【1 - 2】

接收一条 MySQL 命令。在【1】之前,还有请求数据读取、拆成单条 MySQL SQL。

【3】

1: // ⬇️⬇️⬇️【FrontendCommandHandler.java】2: public class FrontendCommandHandler implements NIOHandler {3: 4: @Override5: public void handle(byte[] data) {6: 7: // .... 省略部分代码8: switch (data[4]) // 9: {10: case MySQLPacket.COM_INIT_DB:11: commands.doInitDB();12: source.initDB(data);13: break;14: case MySQLPacket.COM_QUERY: // 查询命令15: // 计数查询命令16: commands.doQuery();17: // 执行查询命令18: source.query(data);19: break;20: case MySQLPacket.COM_PING:21: commands.doPing();22: source.ping();23: break;24: // .... 省略部分case25: }26: }27: 28: }

INSERT/SELECT/UPDATE/DELETE 等 SQL 归属于 MySQLPacket.COM_QUERY,详细可见:《MySQL协议分析#4.2 客户端命令请求报文(客户端 -> 服务器)》。

【4】

将 二进制数组 解析成 SQL。核心代码如下:

1: // ⬇️⬇️⬇️【FrontendConnection.java】2: public void query(byte[] data) {3: // 取得语句4: String sql = null; 5: try {6: MySQLMessage mm = new MySQLMessage(data);7: mm.position(5);8: sql = mm.readString(charset);9: } catch (UnsupportedEncodingException e) {10: writeErrMessage(ErrorCode.ER_UNKNOWN_CHARACTER_SET, "Unknown charset '" + charset + "'");11: return;12: } 13: // 执行语句14: this.query( sql );15: }

【5】

解析 SQL 类型。核心代码如下:

1: // ⬇️⬇️⬇️【ServerQueryHandler.java】2: @Override3: public void query(String sql) {4: // 解析 SQL 类型5: int rs = ServerParse.parse(sql);6: int sqlType = rs & 0xff;7: 8: switch (sqlType) {9: //explain sql10: case ServerParse.EXPLAIN:11: ExplainHandler.handle(sql, c, rs >>> 8);12: break;13: // .... 省略部分case14: break;15: case ServerParse.SELECT:16: SelectHandler.handle(sql, c, rs >>> 8);17: break;18: // .... 省略部分case19: default:20: if(readOnly){21: LOGGER.warn(new StringBuilder().append("User readonly:").append(sql).toString());22: c.writeErrMessage(ErrorCode.ER_USER_READ_ONLY, "User readonly");23: break;24: }25: c.execute(sql, rs & 0xff);26: }27: }28: 29:30: // ⬇️⬇️⬇️【ServerParse.java】31: public static int parse(String stmt) {32: int length = stmt.length();33: //FIX BUG FOR SQL SUCH AS /XXXX/SQL34: int rt = -1;35: for (int i = 0; i 36: switch (stmt.charAt(i)) {37: // .... 省略部分case case 'I':38: case 'i':39: rt = insertCheck(stmt, i);40: if (rt != OTHER) {41: return rt;42: }43: continue;44: // .... 省略部分case45: case 'S':46: case 's':47: rt = sCheck(stmt, i);48: if (rt != OTHER) {49: return rt;50: }51: continue;52: // .... 省略部分case53: default:54: continue;55: }56: }57: return OTHER;58: }

��【6】【7】

解析 Select SQL 类型,分发到对应的逻辑。核心代码如下:

1: // ⬇️⬇️⬇️【SelectHandler.java】2: public static void handle(String stmt, ServerConnection c, int offs) {3: int offset = offs;4: switch (ServerParseSelect.parse(stmt, offs)) { // 解析 Select SQL 类型5: case ServerParseSelect.VERSION_COMMENT: // select @@VERSION_COMMENT;6: SelectVersionComment.response(c);7: break;8: case ServerParseSelect.DATABASE: // select DATABASE();9: SelectDatabase.response(c);10: break;11: case ServerParseSelect.USER: // select CURRENT_USER();12: SelectUser.response(c);13: break;14: case ServerParseSelect.VERSION: // select VERSION();15: SelectVersion.response(c);16: break;17: case ServerParseSelect.SESSION_INCREMENT: // select @@session.auto_increment_increment;18: SessionIncrement.response(c);19: break;20: case ServerParseSelect.SESSION_ISOLATION: // select @@session.tx_isolation;21: SessionIsolation.response(c);22: break;23: case ServerParseSelect.LAST_INSERT_ID: // select LAST_INSERT_ID();24: // ....省略代码25: break;26: case ServerParseSelect.IDENTITY: // select @@identity27: // ....省略代码28: break;29: case ServerParseSelect.SELECT_VAR_ALL: //30: SelectVariables.execute(c,stmt);31: break;32: case ServerParseSelect.SESSION_TX_READ_ONLY: //33: SelectTxReadOnly.response(c);34: break;35: default: // 其他,例如 select * from table36: c.execute(stmt, ServerParse.SELECT);37: }38: }39: // ⬇️⬇️⬇️【ServerParseSelect.java】40: public static int parse(String stmt, int offset) {41: int i = offset;42: for (; i 43: switch (stmt.charAt(i)) {44: case ' ':45: continue;46: case '/':47: case '#':48: i = ParseUtil.comment(stmt, i);49: continue;50: case '@':51: return select2Check(stmt, i);52: case 'D':53: case 'd':54: return databaseCheck(stmt, i);55: case 'L':56: case 'l':57: return lastInsertCheck(stmt, i);58: case 'U':59: case 'u':60: return userCheck(stmt, i);61: case 'C':62: case 'c':63: return currentUserCheck(stmt, i);64: case 'V':65: case 'v':66: return versionCheck(stmt, i);67: default:68: return OTHER;69: }70: }71: return OTHER;72: }

【8】

执行 SQL,详细解析见下文,核心代码如下:

1: // ⬇️⬇️⬇️【ServerConnection.java】2: public class ServerConnection extends FrontendConnection {3: public void execute(String sql, int type) {4: // .... 省略代码5: SchemaConfig schema = MycatServer.getInstance().getConfig().getSchemas().get(db);6: if (schema == null) {7: writeErrMessage(ErrorCode.ERR_BAD_LOGICDB,8: "Unknown MyCAT Database '" + db + "'");9: return;10: }11: 12: // .... 省略代码13: 14: // 路由到后端数据库,执行 SQL15: routeEndExecuteSQL(sql, type, schema);16: }17: 18: public void routeEndExecuteSQL(String sql, final int type, final SchemaConfig schema) {19: // 路由计算20: RouteResultset rrs = null;21: try {22: rrs = MycatServer23: .getInstance()24: .getRouterservice()25: .route(MycatServer.getInstance().getConfig().getSystem(),26: schema, type, sql, this.charset, this);27: 28: } catch (Exception e) {29: StringBuilder s = new StringBuilder();30: LOGGER.warn(s.append(this).append(sql).toString() + " err:" + e.toString(),e);31: String msg = e.getMessage();32: writeErrMessage(ErrorCode.ER_PARSE_ERROR, msg == null ? e.getClass().getSimpleName() : msg);33: return;34: }35: 36: // 执行 SQL37: if (rrs != null) {38: // session执行39: session.execute(rrs, rrs.isSelectForUpdate() ? ServerParse.UPDATE : type);40: }41: 42: }43: 44: }

3. 获得路由结果

【单库单表】插入(02获取路由)

【 1 - 5 】

获得路由主流程。核心代码如下:

1: // ⬇️⬇️⬇️【SelectHandler.java】2: public RouteResultset route(SystemConfig sysconf, SchemaConfig schema,3: int sqlType, String stmt, String charset, ServerConnection sc)4: throws SQLNonTransientException {5: RouteResultset rrs = null;6: 7: // SELECT 类型的SQL, 检测缓存是否存在8: if (sqlType == ServerParse.SELECT) {9: cacheKey = schema.getName() + stmt; 10: rrs = (RouteResultset) sqlRouteCache.get(cacheKey);11: if (rrs != null) {12: checkMigrateRule(schema.getName(),rrs,sqlType);13: return rrs;14: }15: }16: }17: 18: // .... 省略代码19: int hintLength = RouteService.isHintSql(stmt);20: if(hintLength != -1){ // TODO 待读:hint21: // .... 省略代码22: }23: } else {24: stmt = stmt.trim();25: rrs = RouteStrategyFactory.getRouteStrategy().route(sysconf, schema, sqlType, stmt,26: charset, sc, tableId2DataNodeCache);27: }28: 29: // 记录查询命令路由结果缓存30: if (rrs != null && sqlType == ServerParse.SELECT && rrs.isCacheAble()) {31: sqlRouteCache.putIfAbsent(cacheKey, rrs);32: }33: // .... 省略代码 return rrs;34: }35: // ⬇️⬇️⬇️【AbstractRouteStrategy.java】36: @Override37: public RouteResultset route(SystemConfig sysConfig, SchemaConfig schema, int sqlType, String origSQL,38: String charset, ServerConnection sc, LayerCachePool cachePool) throws SQLNonTransientException {39: 40: // .... 省略代码41: 42: // 处理一些路由之前的逻辑;全局序列号,父子表插入43: if (beforeRouteProcess(schema, sqlType, origSQL, sc) ) {44: return null;45: }46: 47: // .... 省略代码48: 49: // 检查是否有分片50: if (schema.isNoSharding() && ServerParse.SHOW != sqlType) {51: rrs = RouterUtil.routeToSingleNode(rrs, schema.getDataNode(), stmt);52: } else {53: RouteResultset returnedSet = routeSystemInfo(schema, sqlType, stmt, rrs);54: if (returnedSet == null) {55: rrs = routeNormalSqlWithAST(schema, stmt, rrs, charset, cachePool,sqlType,sc);56: }57: }58: 59: return rrs;60: }

��【3】第 7 至 16 行 :当 Select SQL 存在路由结果缓存时,直接返回缓存。
��【6】第 29 至 32 行 :记录 Select SQL 路由结果到缓存。

路由 详细解析,我们另开文章,避免内容过多,影响大家对【插入】流程和逻辑的理解。

4. 获得 MySQL 连接,执行 SQL

【单库单表】查询(03执行 SQL)

【 1 - 8 】

获得 MySQL 连接。

  • PhysicalDBNode :物理数据库节点。
  • PhysicalDatasource :物理数据库数据源。

【 9 - 13 】

发送 SQL 到 MySQL Server,执行 SQL。

�� 5. 响应执行 SQL 结果

【单库单表】查询(04执行响应)

核心代码如下:

1: // ⬇️⬇️⬇️【MySQLConnectionHandler.java】2: @Override3: protected void handleData(byte[] data) {4: switch (resultStatus) {5: case RESULT_STATUS_INIT:6: switch (data[4]) {7: case OkPacket.FIELD_COUNT:8: handleOkPacket(data);9: break;10: case ErrorPacket.FIELD_COUNT:11: handleErrorPacket(data);12: break;13: case RequestFilePacket.FIELD_COUNT:14: handleRequestPacket(data);15: break;16: default: // 初始化 header fields17: resultStatus = RESULT_STATUS_HEADER;18: header = data;19: fields = new ArrayList((int) ByteUtil.readLength(data,20: 4));21: }22: break;23: case RESULT_STATUS_HEADER:24: switch (data[4]) {25: case ErrorPacket.FIELD_COUNT:26: resultStatus = RESULT_STATUS_INIT;27: handleErrorPacket(data);28: break;29: case EOFPacket.FIELD_COUNT: // 解析 fields 结束30: resultStatus = RESULT_STATUS_FIELD_EOF;31: handleFieldEofPacket(data);32: break;33: default: // 解析 fields34: fields.add(data);35: }36: break;37: case RESULT_STATUS_FIELD_EOF:38: switch (data[4]) {39: case ErrorPacket.FIELD_COUNT:40: resultStatus = RESULT_STATUS_INIT;41: handleErrorPacket(data);42: break;43: case EOFPacket.FIELD_COUNT: // 解析 每行记录 结束44: resultStatus = RESULT_STATUS_INIT;45: handleRowEofPacket(data);46: break;47: default: // 每行记录48: handleRowPacket(data);49: }50: break;51: default:52: throw new RuntimeException("unknown status!");53: }54: }

6. 其他 :更新 / 删除

流程基本和 《MyCAT源码分析:【单库单表】插入》 相同。我们就不另外文章解析。


推荐阅读
  • Spring特性实现接口多类的动态调用详解
    本文详细介绍了如何使用Spring特性实现接口多类的动态调用。通过对Spring IoC容器的基础类BeanFactory和ApplicationContext的介绍,以及getBeansOfType方法的应用,解决了在实际工作中遇到的接口及多个实现类的问题。同时,文章还提到了SPI使用的不便之处,并介绍了借助ApplicationContext实现需求的方法。阅读本文,你将了解到Spring特性的实现原理和实际应用方式。 ... [详细]
  • Oracle Database 10g许可授予信息及高级功能详解
    本文介绍了Oracle Database 10g许可授予信息及其中的高级功能,包括数据库优化数据包、SQL访问指导、SQL优化指导、SQL优化集和重组对象。同时提供了详细说明,指导用户在Oracle Database 10g中如何使用这些功能。 ... [详细]
  • [译]技术公司十年经验的职场生涯回顾
    本文是一位在技术公司工作十年的职场人士对自己职业生涯的总结回顾。她的职业规划与众不同,令人深思又有趣。其中涉及到的内容有机器学习、创新创业以及引用了女性主义者在TED演讲中的部分讲义。文章表达了对职业生涯的愿望和希望,认为人类有能力不断改善自己。 ... [详细]
  • 本文详细介绍了MysqlDump和mysqldump进行全库备份的相关知识,包括备份命令的使用方法、my.cnf配置文件的设置、binlog日志的位置指定、增量恢复的方式以及适用于innodb引擎和myisam引擎的备份方法。对于需要进行数据库备份的用户来说,本文提供了一些有价值的参考内容。 ... [详细]
  • 本文由编程笔记小编整理,介绍了PHP中的MySQL函数库及其常用函数,包括mysql_connect、mysql_error、mysql_select_db、mysql_query、mysql_affected_row、mysql_close等。希望对读者有一定的参考价值。 ... [详细]
  • ZSI.generate.Wsdl2PythonError: unsupported local simpleType restriction ... [详细]
  • 推荐系统遇上深度学习(十七)详解推荐系统中的常用评测指标
    原创:石晓文小小挖掘机2018-06-18笔者是一个痴迷于挖掘数据中的价值的学习人,希望在平日的工作学习中,挖掘数据的价值, ... [详细]
  • 解决Cydia数据库错误:could not open file /var/lib/dpkg/status 的方法
    本文介绍了解决iOS系统中Cydia数据库错误的方法。通过使用苹果电脑上的Impactor工具和NewTerm软件,以及ifunbox工具和终端命令,可以解决该问题。具体步骤包括下载所需工具、连接手机到电脑、安装NewTerm、下载ifunbox并注册Dropbox账号、下载并解压lib.zip文件、将lib文件夹拖入Books文件夹中,并将lib文件夹拷贝到/var/目录下。以上方法适用于已经越狱且出现Cydia数据库错误的iPhone手机。 ... [详细]
  • Google Play推出全新的应用内评价API,帮助开发者获取更多优质用户反馈。用户每天在Google Play上发表数百万条评论,这有助于开发者了解用户喜好和改进需求。开发者可以选择在适当的时间请求用户撰写评论,以获得全面而有用的反馈。全新应用内评价功能让用户无需返回应用详情页面即可发表评论,提升用户体验。 ... [详细]
  • Python正则表达式学习记录及常用方法
    本文记录了学习Python正则表达式的过程,介绍了re模块的常用方法re.search,并解释了rawstring的作用。正则表达式是一种方便检查字符串匹配模式的工具,通过本文的学习可以掌握Python中使用正则表达式的基本方法。 ... [详细]
  • 本文介绍了一个在线急等问题解决方法,即如何统计数据库中某个字段下的所有数据,并将结果显示在文本框里。作者提到了自己是一个菜鸟,希望能够得到帮助。作者使用的是ACCESS数据库,并且给出了一个例子,希望得到的结果是560。作者还提到自己已经尝试了使用"select sum(字段2) from 表名"的语句,得到的结果是650,但不知道如何得到560。希望能够得到解决方案。 ... [详细]
  • CF:3D City Model(小思维)问题解析和代码实现
    本文通过解析CF:3D City Model问题,介绍了问题的背景和要求,并给出了相应的代码实现。该问题涉及到在一个矩形的网格上建造城市的情景,每个网格单元可以作为建筑的基础,建筑由多个立方体叠加而成。文章详细讲解了问题的解决思路,并给出了相应的代码实现供读者参考。 ... [详细]
  • ALTERTABLE通过更改、添加、除去列和约束,或者通过启用或禁用约束和触发器来更改表的定义。语法ALTERTABLEtable{[ALTERCOLUMNcolu ... [详细]
  • 前景:当UI一个查询条件为多项选择,或录入多个条件的时候,比如查询所有名称里面包含以下动态条件,需要模糊查询里面每一项时比如是这样一个数组条件:newstring[]{兴业银行, ... [详细]
  • Oracle10g备份导入的方法及注意事项
    本文介绍了使用Oracle10g进行备份导入的方法及相关注意事项,同时还介绍了2019年独角兽企业重金招聘Python工程师的标准。内容包括导出exp命令、删用户、创建数据库、授权等操作,以及导入imp命令的使用。详细介绍了导入时的参数设置,如full、ignore、buffer、commit、feedback等。转载来源于https://my.oschina.net/u/1767754/blog/377593。 ... [详细]
author-avatar
水月琴轩_452
这个家伙很懒,什么也没留下!
RankList | 热门文章