热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

微服务MySQL分库分表数据到MongoDB同步方案[转]

微服,务,mysql,机柜,平台,结合,到,mongo

需求背景

近年来,微服务概念持续火热,网络上针对微服务和单体架构的讨论也是越来越多,面对日益增长的业务需求是,很多公司做技术架构升级时优先选用微服务方式。我所在公司也是选的这个方向来升级技术架构,以支撑更大访问量和更方便的业务扩展。

发现问题

微服务拆分主要分两种方式:拆分业务系统不拆分数据库,拆分业务系统拆分库。如果数据规模小的话大可不必拆分数据库,因为拆分数据看必将面对多维度数据查询,跨进程之间的事务等问题。而我所在公司随着业务发展单数据库实例已经不能满足业务需要,所以选择了拆分业务系统同时拆分数据库的模式,所以也面临着以上的问题。本文主要介绍多维度数据实时查询解决方案。当前系统架构和存储结构如下:

image

解决思路

  • 要对多数据库数据进行查询,首先就需要将数据库同步到一起以方便查询

  • 为了满足大数据量数据需求,所以优先选择NOSQL数据库做同步库

  • NOSQL数据库基本无法进行关联查询,所以需要将关系数据进行拼接操作,转换成非关系型数据

  • 业务多维度查询需要实时性,所以需要选择NOSQL中实时性相对比较好的数据库:MongoDB

根据以上思路,总结数据整合架构如下图所示:

image

解决方案

目前网上一些数据同步案例分两种:MQ消息同步和binlog数据读取同步

先说MQ消息同步,该同步方式我所在公司试用过一段时间,发现以下问题:

  • 数据围绕业务进行,对业务关键性数据操作发送MQ消息,对业务系统依赖性比较高

  • 对于数据库中存量数据需要单独处理

  • 对于工具表还需要单独维护同步

  • 每次新增数据表都需要重新添加MQ逻辑

考虑到以上问题,用MQ方式同步数据并不是最优解决办法

使用binlog 数据读取方式目前有一些成熟方案,比如tungsten replicator,但这些同步工具只能实现数据1:1复制,数据复制过程自定义逻辑添加比较麻烦,不支持分库分表数据归集操作。综上所述,最优方案应该是读取后binlog后自行处理后续数据逻辑。目前binlog读取binlog工具中最成熟的方案应该就是alibaba开源的canal了。

canal

canal是阿里巴巴mysql数据库binlog的增量订阅&消费组件 。阿里云DRDS、阿里巴巴TDDL 二级索引、小表复制. 都是基于canal做的,应用广泛。
canal原理相对比较简单:

  • canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议

  • mysql master收到dump请求,开始推送binary log给slave(也就是canal)

  • canal解析binary log对象(原始为byte流)

canal介绍: https://github.com/alibaba/canal/wiki

我使用的是canal的HA模式,由zookeeper选举可用实例,每个数据库一个instance,服务端配置如下:

目录:

conf     database1         -instance.properties     database2         -instance.properties     canal.properties

instance.properties

canal.instance.mysql.slaveId = 1001 canal.instance.master.address = X.X.X.X:3306 canal.instance.master.journal.name =  canal.instance.master.position =  canal.instance.master.timestamp =  canal.instance.dbUsername = canal canal.instance.dbPassword = canal canal.instance.defaultDatabaseName = canal.instance.connectionCharset = UTF-8 canal.instance.filter.regex = .*\\..* canal.instance.filter.black.regex =

canal.properties

canal.id= 1 canal.ip=X.X.X.X canal.port= 11111 canal.zkServers=X.X.X.X:2181,X.X.X.X:2181,X.X.X.X:2181 canal.zookeeper.flush.period = 1000 canal.file.data.dir = ${canal.conf.dir} canal.file.flush.period = 1000 canal.instance.memory.buffer.size = 16384 canal.instance.memory.buffer.memunit = 1024  canal.instance.memory.batch.mode = MEMSIZE canal.instance.detecting.enable = true canal.instance.detecting.sql = select 1 canal.instance.detecting.interval.time = 3 canal.instance.detecting.retry.threshold = 3 canal.instance.detecting.heartbeatHaEnable = false canal.instance.transaction.size =  1024 canal.instance.fallbackIntervalInSeconds = 60 canal.instance.network.receiveBufferSize = 16384 canal.instance.network.sendBufferSize = 16384 canal.instance.network.soTimeout = 30 canal.instance.filter.query.dcl = true canal.instance.filter.query.dml = false canal.instance.filter.query.ddl = false canal.instance.filter.table.error = false canal.instance.filter.rows = false canal.instance.binlog.format = ROW,STATEMENT,MIXED  canal.instance.binlog.image = FULL,MINIMAL,NOBLOB canal.instance.get.ddl.isolation = false canal.destinatiOns= example,p4-test canal.conf.dir = ../conf canal.auto.scan = true canal.auto.scan.interval = 5 canal.instance.global.mode = spring  canal.instance.global.lazy = false canal.instance.global.spring.xml = classpath:spring/default-instance.xml

部署数据流如下:

image

tip:
虽然canal同时支持mixed和row类型的binlog日志,但是获取行数据时如果是mixed类型的日志则获取不到表名,所以本方案暂只支持row格式的binlog

数据同步

创建canal client应用订阅canal读取的binlog数据

1.开启多instance 订阅,订阅多个instance

public void initCanalStart() {     List destinations = canalProperties.getDestination();     final List canalClientList = new ArrayList<>();     if (destinations != null && destinations.size() > 0) {         for (String destination : destinations) {             // 基于zookeeper动态获取canal server的地址,建立链接,其中一台server发生crash,可以支持failover             CanalConnector connector = CanalConnectors.newClusterConnector(canalProperties.getZkServers(), destination, "", "");             CanalClient client = new CanalClient(destination, connector);             canalClientList.add(client);             client.start();         }     }     Runtime.getRuntime().addShutdownHook(new Thread() {         public void run() {             try {                 logger.info("## stop the canal client");                 for (CanalClient canalClient : canalClientList) {                     canalClient.stop();                 }             } catch (Throwable e) {                 logger.warn("##something goes wrong when stopping canal:", e);             } finally {                 logger.info("## canal client is down.");             }         }     }); }

订阅消息处理

private void process() {     int batchSize = 5 * 1024;     while (running) {         try {             MDC.put("destination", destination);             connector.connect();             connector.subscribe();             while (running) {                 Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据                 long batchId = message.getId();                 int size = message.getEntries().size();                 if (batchId != -1 && size > 0) {                     saveEntry(message.getEntries());                 }                 connector.ack(batchId); // 提交确认                 // connector.rollback(batchId); // 处理失败, 回滚数据             }         } catch (Exception e) {             logger.error("process error!", e);         } finally {             connector.disconnect();             MDC.remove("destination");         }     } }

根据数据库事件处理消息,过滤消息列表,对数据变动进行处理,用到信息为:

  • insert :schemaName,tableName,beforeColumnsList

  • update :schemaName,tableName,afterColumnsList

  • delete :schemaName,tableName,afterColumnsList

RowChange rowChage = null;     try {         rowChage = RowChange.parseFrom(entry.getStoreValue());     } catch (Exception e) {         throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);     }     EventType eventType = rowChage.getEventType();     logger.info(row_format,             entry.getHeader().getLogfileName(),             String.valueOf(entry.getHeader().getLogfileOffset()), entry.getHeader().getSchemaName(),             entry.getHeader().getTableName(), eventType,             String.valueOf(entry.getHeader().getExecuteTime()), String.valueOf(delayTime));     if (eventType == EventType.QUERY || rowChage.getIsDdl()) {         logger.info(" sql ----> " + rowChage.getSql());         continue;     }     DataService dataService = SpringUtil.getBean(DataService.class);     for (RowData rowData : rowChage.getRowDatasList()) {         if (eventType == EventType.DELETE) {             dataService.delete(rowData.getBeforeColumnsList(), entry.getHeader().getSchemaName(), entry.getHeader().getTableName());         } else if (eventType == EventType.INSERT) {             dataService.insert(rowData.getAfterColumnsList(), entry.getHeader().getSchemaName(), entry.getHeader().getTableName());         } else if (eventType == EventType.UPDATE) {             dataService.update(rowData.getAfterColumnsList(), entry.getHeader().getSchemaName(), entry.getHeader().getTableName());         } else {             logger.info("未知数据变动类型:{}", eventType);         }     } }

ColumnsList转换成MongoTemplate 可用的数据类:DBObject,顺便做下数据类型转换

public static DBObject columnToJson(List columns) {     DBObject obj = new BasicDBObject();     try {         for (CanalEntry.Column column : columns) {             String mysqlType = column.getMysqlType();             //int类型,长度11以下为Integer,以上为long             if (mysqlType.startsWith("int")) {                 int lenBegin = mysqlType.indexOf('(');                 int lenEnd = mysqlType.indexOf(')');                 if (lenBegin > 0 && lenEnd > 0) {                     int length = Integer.parseInt(mysqlType.substring(lenBegin + 1, lenEnd));                     if (length > 10) {                         obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : Long.parseLong(column.getValue()));                         continue;                     }                 }                 obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : Integer.parseInt(column.getValue()));             } else if (mysqlType.startsWith("bigint")) {                 obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : Long.parseLong(column.getValue()));             } else if (mysqlType.startsWith("decimal")) {                 int lenBegin = mysqlType.indexOf('(');                 int lenCenter = mysqlType.indexOf(',');                 int lenEnd = mysqlType.indexOf(')');                 if (lenBegin > 0 && lenEnd > 0 && lenCenter > 0) {                     int length = Integer.parseInt(mysqlType.substring(lenCenter + 1, lenEnd));                     if (length == 0) {                         obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : Long.parseLong(column.getValue()));                         continue;                     }                 }                 obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : Double.parseDouble(column.getValue()));             } else if (mysqlType.equals("datetime") || mysqlType.equals("timestamp")) {                 obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : DATE_TIME_FORMAT.parse(column.getValue()));             } else if (mysqlType.equals("date")) {                 obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : DATE_FORMAT.parse(column.getValue()));             } else if (mysqlType.equals("time")) {                 obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : TIME_FORMAT.parse(column.getValue()));             } else {                 obj.put(column.getName(), column.getValue());             }         }     } catch (ParseException e) {         e.printStackTrace();     }     return obj; }

tip:
DBObject对象如果同时用于保存原始数据和组合数据或其他数据,使用时应该深度拷贝对象生成副本,然后使用副本

数据拼接

我们获取了数据库数据后做拼接操作,比如两张用户表:

user_info:{id,user_no,user_name,user_password} user_other_info:{id,user_no,idcard,realname}

拼接后mongo数据为:

user:{_id,user_no,userInfo:{id,user_no,user_name,user_password},userOtherInfo:{id,user_no,idcard,realname})

接收到的数据信息很多,如何才能简单的触发数据拼接操作呢?

先看我们能获取的信息:schemaName,tableName,DBObject,Event(insert,update,delete)

将这些信息标识拼接起来看看:/schemaName/tableName/Event(DBObject),没错,就是一个标准的restful链接。只要我们实现一个简单的springMVC 就能自动获取需要的数据信息进行拼接操作。

先实现@Controller,定义名称为Schema,value对应schemaName

@Target({ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) @Documented @Component public  @interface Schema {  String value() default ""; }

然后实现@RequestMapping,定义名称为Table,直接使用Canal中的EventType 对应RequestMethod

@Target({ElementType.METHOD, ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) @Documented public  @interface Table {     String value() default "";     CanalEntry.EventType[] event() default {}; }

然后创建springUtil,实现接口ApplicationContextAware,应用启动 加载的时候初始化两个Map:intanceMap,handlerMap

private static ApplicationContext applicationContext = null; //库名和数据处理Bean映射Map private static Map instanceMap = new HashMap(); //路劲和数据处理Method映射Map private static Map handlerMap = new HashMap(); @Override public void setApplicationContext(ApplicationContext applicationContext) {     if (SpringUtil.applicationContext == null) {         SpringUtil.applicationContext = applicationContext;         //初始化instanceMap数据         instanceMap();         //初始化handlerMap数据         handlerMap();     } } private void instanceMap() {     Map beans = applicationContext.getBeansWithAnnotation(Schema.class);     for (Object bean : beans.values()) {         Class clazz = bean.getClass();         Object instance = applicationContext.getBean(clazz);         Schema schema = clazz.getAnnotation(Schema.class);         String key = schema.value();         instanceMap.put(key, instance);         logger.info("instanceMap [{}:{}]", key, bean == null ? "null" : clazz.getName());     } } private void handlerMap() {     if (instanceMap.size() <= 0)         return;     for (Map.Entry entry : instanceMap.entrySet()) {         if (entry.getValue().getClass().isAnnotationPresent(Schema.class)) {             Schema schema = entry.getValue().getClass().getAnnotation(Schema.class);             String schemeName = schema.value();             Method[] methods = entry.getValue().getClass().getMethods();             for (Method method : methods) {                 if (method.isAnnotationPresent(Table.class)) {                     Table table = method.getAnnotation(Table.class);                     String tName = table.value();                     CanalEntry.EventType[] events = table.event();                     //未标明数据事件类型的方法不做映射                     if (events.length < 1) {                         continue;                     }                     //同一个方法可以映射多张表                     for (int i = 0; i < events.length; i++) {                         String path = "/" + schemeName + "/" + tName + "/" + events[i].getNumber();                         handlerMap.put(path, method);                         logger.info("handlerMap [{}:{}]", path, method.getName());                     }                 } else {                     continue;                 }             }         } else {             continue;         }     } }

调用方法:

public static void doEvent(String path, DBObject obj) throws Exception {     String[] pathArray = path.split("/");     if (pathArray.length != 4) {         logger.info("path 格式不正确:{}", path);         return;     }     Method method = handlerMap.get(path);     Object schema = instanceMap.get(pathArray[1]);     //查找不到映射Bean和Method不做处理     if (method == null || schema == null) {         return;     }     try {         long begin = System.currentTimeMillis();         logger.info("integrate data:{},{}", path, obj);         method.invoke(schema, new Object[]{obj});         logger.info("integrate data consume: {}ms:", System.currentTimeMillis() - begin);     } catch (Exception e) {         logger.error("调用组合逻辑异常", e);         throw new Exception(e.getCause());     } }

数据拼接消息处理:

@Schema("demo_user") public class UserService {     @Table(value = "user_info", event = {CanalEntry.EventType.INSERT, CanalEntry.EventType.UPDATE})     public void saveUser_UserInfo(DBObject userInfo) {         String userNo = userInfo.get("user_no") == null ? null : userInfo.get("user_no").toString();         DBCollection collection = completeMongoTemplate.getCollection("user");         DBObject queryObject = new BasicDBObject("user_no", userNo);         DBObject user = collection.findOne(queryObject);         if (user == null) {             user = new BasicDBObject();             user.put("user_no", userNo);             user.put("userInfo", userInfo);             collection.insert(user);         } else {             DBObject updateObj = new BasicDBObject("userInfo", userInfo);             DBObject update = new BasicDBObject("$set", updateObj);             collection.update(queryObject, update);         }     } }

示例源码

https://github.com/zhangtr/canal-mongo


推荐阅读
  • 利用GitHub热门资源,成功斩获阿里、京东、腾讯三巨头Offer
    Spring框架作为Java生态系统中的重要组成部分,因其强大的功能和灵活的扩展性,被广泛应用于各种规模的企业级应用开发中。本文将通过一份在GitHub上获得极高评价的Spring全家桶文档,探讨如何掌握Spring框架及其相关技术,助力职业发展。 ... [详细]
  • Spring Cloud学习指南:深入理解微服务架构
    本文介绍了微服务架构的基本概念及其在Spring Cloud中的实现。讨论了微服务架构的主要优势,如简化开发和维护、快速启动、灵活的技术栈选择以及按需扩展的能力。同时,也探讨了微服务架构面临的挑战,包括较高的运维要求、分布式系统的复杂性、接口调整的成本等问题。最后,文章提出了实施微服务时应遵循的设计原则。 ... [详细]
  • Go 通过 Map/Filter/ForEach 等流式 API 高效处理数据
    go,通过,map,filter,foreach,等,流,式,ap ... [详细]
  • 近期,公司在构建新的交易系统时遇到了一个常见的问题——金额存储。由于涉及资金的操作需要高度的准确性,使用float类型进行金额计算可能会导致不可预见的误差。本文将深入探讨这一问题,并提供解决方案。 ... [详细]
  • 本文详细介绍了如何利用go-zero框架从需求分析到最终部署至Kubernetes的全过程,特别聚焦于微服务架构中的网关设计与实现。项目采用了go-zero及其生态组件,涵盖了从API设计到RPC调用,再到生产环境下的监控与维护等多方面内容。 ... [详细]
  • 嵌入式开发环境搭建与文件传输指南
    本文详细介绍了如何为嵌入式应用开发搭建必要的软硬件环境,并提供了通过串口和网线两种方式将文件传输到开发板的具体步骤。适合Linux开发初学者参考。 ... [详细]
  • 深入解析Spring Cloud微服务架构与分布式系统实战
    本文详细介绍了Spring Cloud在微服务架构和分布式系统中的应用,结合实际案例和最新技术,帮助读者全面掌握微服务的实现与优化。 ... [详细]
  • MainActivityimportandroid.app.Activity;importandroid.os.Bundle;importandroid.os.Handler;im ... [详细]
  • 本文探讨了随着并发需求的增长,MySQL数据库架构如何从简单的单一实例发展到复杂的分布式系统,以及每一步演进背后的原理和技术解决方案。 ... [详细]
  • 精选Unity开源项目:UniRx实现响应式编程
    本文介绍了Unity中的响应式编程框架——UniRx,探讨了其在解决异步编程难题中的应用及优势。 ... [详细]
  • 基于 NCNN 框架的 PelleeNet_SSD 实现,适用于嵌入式和移动设备的高效目标检测。 ... [详细]
  • Spring Cloud Config: 高效统一的配置管理解决方案
    Spring Cloud Config 是一个用于集中管理和分发应用程序配置的工具,支持多环境下的配置管理(如开发、测试和生产环境),并且能够根据需求动态调整配置参数(例如,在大型促销活动期间增加数据库的最大连接数)。 ... [详细]
  • 本文详细探讨了 Java 中 Daemon 线程的特点及其应用场景,并深入分析了 Random 类的源代码,帮助开发者更好地理解和使用这些核心组件。 ... [详细]
  • Linux 5.3内核正式发布并标记为稳定
    知名Linux内核开发者Greg Kroah-Hartman宣布,最新的Linux 5.3内核已正式标记为稳定版本,适用于大规模部署。该版本带来了多项新特性和改进,包括对最新硬件的支持和性能优化。 ... [详细]
  • 微服务架构详解及其入门指南
    本文详细介绍了微服务的基本概念、发展历程、与传统架构的区别及优势,并探讨了适合采用微服务架构的场景。此外,文章还深入分析了几个主流的微服务开发框架,特别是Spring Cloud的组成和特点。 ... [详细]
author-avatar
宇剑小窝_911
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有