title: MyCAT 源码分析 —— 【单库单表】查询
date: 2017-05-30
tags:
categories: MyCAT
permalink: MyCAT/single-db-single-table-select
������关注微信公众号:【芋艿的后端小屋】有福利:
1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
3. 您对于源码的疑问每条留言都将得到认真回复。甚至不知道如何读源码也可以请教噢。
4. 新的源码解析文章实时收到通知。每周更新一篇左右。
内容形态以 顺序图 + 核心代码 为主。
如果有地方表述不错误或者不清晰,欢迎留言。
对于内容形态,非常纠结,如果有建议,特别特别特别欢迎您提出。
微信号:wangwenbin-server。
本文讲解 【单库单表】查询 所涉及到的代码。
��内容和 《MyCAT 源码分析 —— 【单库单表】插入》 超级相似,一方面本身流程基本相同,另外一方面文章结构没拆分好。我们使用 �� 标记差异的逻辑。
交互如下图:
整个过程,MyCAT Server 流程如下:
我们逐个步骤分析,一起来看看源码。
接收一条 MySQL 命令。在【1】之前,还有请求数据读取、拆成单条 MySQL SQL。
1: // ⬇️⬇️⬇️【FrontendCommandHandler.java】2: public class FrontendCommandHandler implements NIOHandler {3: 4: @Override5: public void handle(byte[] data) {6: 7: // .... 省略部分代码8: switch (data[4]) // 9: {10: case MySQLPacket.COM_INIT_DB:11: commands.doInitDB();12: source.initDB(data);13: break;14: case MySQLPacket.COM_QUERY: // 查询命令15: // 计数查询命令16: commands.doQuery();17: // 执行查询命令18: source.query(data);19: break;20: case MySQLPacket.COM_PING:21: commands.doPing();22: source.ping();23: break;24: // .... 省略部分case25: }26: }27: 28: }
INSERT
/SELECT
/UPDATE
/DELETE
等 SQL 归属于 MySQLPacket.COM_QUERY
,详细可见:《MySQL协议分析#4.2 客户端命令请求报文(客户端 -> 服务器)》。
将 二进制数组 解析成 SQL。核心代码如下:
1: // ⬇️⬇️⬇️【FrontendConnection.java】2: public void query(byte[] data) {3: // 取得语句4: String sql = null; 5: try {6: MySQLMessage mm = new MySQLMessage(data);7: mm.position(5);8: sql = mm.readString(charset);9: } catch (UnsupportedEncodingException e) {10: writeErrMessage(ErrorCode.ER_UNKNOWN_CHARACTER_SET, "Unknown charset '" + charset + "'");11: return;12: } 13: // 执行语句14: this.query( sql );15: }
解析 SQL 类型。核心代码如下:
1: // ⬇️⬇️⬇️【ServerQueryHandler.java】2: @Override3: public void query(String sql) {4: // 解析 SQL 类型5: int rs = ServerParse.parse(sql);6: int sqlType = rs & 0xff;7: 8: switch (sqlType) {9: //explain sql10: case ServerParse.EXPLAIN:11: ExplainHandler.handle(sql, c, rs >>> 8);12: break;13: // .... 省略部分case14: break;15: case ServerParse.SELECT:16: SelectHandler.handle(sql, c, rs >>> 8);17: break;18: // .... 省略部分case19: default:20: if(readOnly){21: LOGGER.warn(new StringBuilder().append("User readonly:").append(sql).toString());22: c.writeErrMessage(ErrorCode.ER_USER_READ_ONLY, "User readonly");23: break;24: }25: c.execute(sql, rs & 0xff);26: }27: }28: 29:30: // ⬇️⬇️⬇️【ServerParse.java】31: public static int parse(String stmt) {32: int length = stmt.length();33: //FIX BUG FOR SQL SUCH AS /XXXX/SQL34: int rt = -1;35: for (int i = 0; i
解析 Select SQL 类型,分发到对应的逻辑。核心代码如下:
1: // ⬇️⬇️⬇️【SelectHandler.java】2: public static void handle(String stmt, ServerConnection c, int offs) {3: int offset = offs;4: switch (ServerParseSelect.parse(stmt, offs)) { // 解析 Select SQL 类型5: case ServerParseSelect.VERSION_COMMENT: // select @@VERSION_COMMENT;6: SelectVersionComment.response(c);7: break;8: case ServerParseSelect.DATABASE: // select DATABASE();9: SelectDatabase.response(c);10: break;11: case ServerParseSelect.USER: // select CURRENT_USER();12: SelectUser.response(c);13: break;14: case ServerParseSelect.VERSION: // select VERSION();15: SelectVersion.response(c);16: break;17: case ServerParseSelect.SESSION_INCREMENT: // select @@session.auto_increment_increment;18: SessionIncrement.response(c);19: break;20: case ServerParseSelect.SESSION_ISOLATION: // select @@session.tx_isolation;21: SessionIsolation.response(c);22: break;23: case ServerParseSelect.LAST_INSERT_ID: // select LAST_INSERT_ID();24: // ....省略代码25: break;26: case ServerParseSelect.IDENTITY: // select @@identity27: // ....省略代码28: break;29: case ServerParseSelect.SELECT_VAR_ALL: //30: SelectVariables.execute(c,stmt);31: break;32: case ServerParseSelect.SESSION_TX_READ_ONLY: //33: SelectTxReadOnly.response(c);34: break;35: default: // 其他,例如 select * from table36: c.execute(stmt, ServerParse.SELECT);37: }38: }39: // ⬇️⬇️⬇️【ServerParseSelect.java】40: public static int parse(String stmt, int offset) {41: int i = offset;42: for (; i
执行 SQL,详细解析见下文,核心代码如下:
1: // ⬇️⬇️⬇️【ServerConnection.java】2: public class ServerConnection extends FrontendConnection {3: public void execute(String sql, int type) {4: // .... 省略代码5: SchemaConfig schema = MycatServer.getInstance().getConfig().getSchemas().get(db);6: if (schema == null) {7: writeErrMessage(ErrorCode.ERR_BAD_LOGICDB,8: "Unknown MyCAT Database '" + db + "'");9: return;10: }11: 12: // .... 省略代码13: 14: // 路由到后端数据库,执行 SQL15: routeEndExecuteSQL(sql, type, schema);16: }17: 18: public void routeEndExecuteSQL(String sql, final int type, final SchemaConfig schema) {19: // 路由计算20: RouteResultset rrs = null;21: try {22: rrs = MycatServer23: .getInstance()24: .getRouterservice()25: .route(MycatServer.getInstance().getConfig().getSystem(),26: schema, type, sql, this.charset, this);27: 28: } catch (Exception e) {29: StringBuilder s = new StringBuilder();30: LOGGER.warn(s.append(this).append(sql).toString() + " err:" + e.toString(),e);31: String msg = e.getMessage();32: writeErrMessage(ErrorCode.ER_PARSE_ERROR, msg == null ? e.getClass().getSimpleName() : msg);33: return;34: }35: 36: // 执行 SQL37: if (rrs != null) {38: // session执行39: session.execute(rrs, rrs.isSelectForUpdate() ? ServerParse.UPDATE : type);40: }41: 42: }43: 44: }
获得路由主流程。核心代码如下:
1: // ⬇️⬇️⬇️【SelectHandler.java】2: public RouteResultset route(SystemConfig sysconf, SchemaConfig schema,3: int sqlType, String stmt, String charset, ServerConnection sc)4: throws SQLNonTransientException {5: RouteResultset rrs = null;6: 7: // SELECT 类型的SQL, 检测缓存是否存在8: if (sqlType == ServerParse.SELECT) {9: cacheKey = schema.getName() + stmt; 10: rrs = (RouteResultset) sqlRouteCache.get(cacheKey);11: if (rrs != null) {12: checkMigrateRule(schema.getName(),rrs,sqlType);13: return rrs;14: }15: }16: }17: 18: // .... 省略代码19: int hintLength = RouteService.isHintSql(stmt);20: if(hintLength != -1){ // TODO 待读:hint21: // .... 省略代码22: }23: } else {24: stmt = stmt.trim();25: rrs = RouteStrategyFactory.getRouteStrategy().route(sysconf, schema, sqlType, stmt,26: charset, sc, tableId2DataNodeCache);27: }28: 29: // 记录查询命令路由结果缓存30: if (rrs != null && sqlType == ServerParse.SELECT && rrs.isCacheAble()) {31: sqlRouteCache.putIfAbsent(cacheKey, rrs);32: }33: // .... 省略代码 return rrs;34: }35: // ⬇️⬇️⬇️【AbstractRouteStrategy.java】36: @Override37: public RouteResultset route(SystemConfig sysConfig, SchemaConfig schema, int sqlType, String origSQL,38: String charset, ServerConnection sc, LayerCachePool cachePool) throws SQLNonTransientException {39: 40: // .... 省略代码41: 42: // 处理一些路由之前的逻辑;全局序列号,父子表插入43: if (beforeRouteProcess(schema, sqlType, origSQL, sc) ) {44: return null;45: }46: 47: // .... 省略代码48: 49: // 检查是否有分片50: if (schema.isNoSharding() && ServerParse.SHOW != sqlType) {51: rrs = RouterUtil.routeToSingleNode(rrs, schema.getDataNode(), stmt);52: } else {53: RouteResultset returnedSet = routeSystemInfo(schema, sqlType, stmt, rrs);54: if (returnedSet == null) {55: rrs = routeNormalSqlWithAST(schema, stmt, rrs, charset, cachePool,sqlType,sc);56: }57: }58: 59: return rrs;60: }
��【3】第 7 至 16 行 :当 Select SQL 存在路由结果缓存时,直接返回缓存。
��【6】第 29 至 32 行 :记录 Select SQL 路由结果到缓存。
路由 详细解析,我们另开文章,避免内容过多,影响大家对【插入】流程和逻辑的理解。
获得 MySQL 连接。
发送 SQL 到 MySQL Server,执行 SQL。
核心代码如下:
1: // ⬇️⬇️⬇️【MySQLConnectionHandler.java】2: @Override3: protected void handleData(byte[] data) {4: switch (resultStatus) {5: case RESULT_STATUS_INIT:6: switch (data[4]) {7: case OkPacket.FIELD_COUNT:8: handleOkPacket(data);9: break;10: case ErrorPacket.FIELD_COUNT:11: handleErrorPacket(data);12: break;13: case RequestFilePacket.FIELD_COUNT:14: handleRequestPacket(data);15: break;16: default: // 初始化 header fields17: resultStatus = RESULT_STATUS_HEADER;18: header = data;19: fields = new ArrayList
流程基本和 《MyCAT源码分析:【单库单表】插入》 相同。我们就不另外文章解析。