热门标签 | HotTags
当前位置:  开发笔记 > 后端 > 正文

mysqlkafkabinlog_利用Binlog和Kafka实时同步mysql数据到Elasticsearch(三)Binlog日志生产消息到Kafka...

目录前言-项目模块BinlogMiddleware1、binlog中间件,负责解析binlog,把变动的数据以json形式发送到kafka队列。Kafk

目录

前言

- 项目模块

BinlogMiddleware

1、binlog中间件,负责解析binlog,把变动的数据以json形式发送到kafka队列。

KafkaMiddleware

2、kafka中间件,负责消费kafka队列中的Message,把数据写入Elasticsearch中。

- 基础服务

(1)Mysql

(2)Kafka(用于存放mysql变动消息,存放于Kafka队列)

(3)Elasticsearch

- 项目源码

简介:

BinlogMiddleware服务主要负责监听Binlog日志,并将其发送到Kafka队列(及Kafka生产者)。

本示例模拟监听teemoliu数据库的user、role表。为了方便表结构设计的很简单,均只含有id、name两个属性。

中间件写进Kafka队列的消息格式如下:

{"event":"teemoliu.user.update","value":[1,"TeemoLiu"]}

{"event":"teemoliu.role.insert","value":[1,"管理员"]}

项目结构如下:

2、导入maven引用。

com.github.shyiko

mysql-binlog-connector-java

0.16.1

com.alibaba

fastjson

1.2.49

org.springframework.kafka

spring-kafka

org.apache.kafka

kafka-clients

1.1.1

3、配置文件如下:

# 停用服务端口

spring.main.web-environment=false

# binlog配置

server.id=1

binlog.host=localhost

binlog.port=3306

binlog.user=root

binlog.password=root

# 指定监听的表格

binlog.database.table=teemoliu.user,teemoliu.role

# kafka

spring.kafka.bootstrap-servers=localhost:9092

kafka.topic=binlog

kafka.partNum=3

kafka.repeatNum=1

4、创建Binlog数据传输对象

public class BinlogDto {

private String event;

private Object value;

public BinlogDto(String event, Object value) {

this.event = event;

this.value = value;

}

public BinlogDto() {

}

public String getEvent() {

return event;

}

public void setEvent(String event) {

this.event = event;

}

public Object getValue() {

return value;

}

public void setValue(Object value) {

this.value = value;

}

}

5、创建Kafka数据传输对象

public class Message {

private Long id;

private String msg;

private Date sendTime;

public Message(Long id, String msg, Date sendTime) {

this.id = id;

this.msg = msg;

this.sendTime = sendTime;

}

public Message() {

}

public Long getId() {

return id;

}

public void setId(Long id) {

this.id = id;

}

public String getMsg() {

return msg;

}

public void setMsg(String msg) {

this.msg = msg;

}

public Date getSendTime() {

return sendTime;

}

public void setSendTime(Date sendTime) {

this.sendTime = sendTime;

}

}

6、binlog监听BinlogClientRunner

@Component

public class BinlogClientRunner implements CommandLineRunner {

@Value("${binlog.host}")

private String host;

@Value("${binlog.port}")

private int port;

@Value("${binlog.user}")

private String user;

@Value("${binlog.password}")

private String password;

// binlog server_id

@Value("${server.id}")

private long serverId;

// kafka话题

@Value("${kafka.topic}")

private String topic;

// kafka分区

@Value("${kafka.partNum}")

private int partNum;

// Kafka备份数

@Value("${kafka.repeatNum}")

private short repeatNum;

// kafka地址

@Value("${spring.kafka.bootstrap-servers}")

private String kafkaHost;

// 指定监听的数据表

@Value("${binlog.database.table}")

private String database_table;

@Autowired

KafkaSender kafkaSender;

@Async

@Override

public void run(String... args) throws Exception {

// 创建topic

kafkaSender.createTopic(kafkaHost, topic, partNum, repeatNum);

// 获取监听数据表数组

List databaseList = Arrays.asList(database_table.split(","));

HashMap tableMap = new HashMap();

// 创建binlog监听客户端

BinaryLogClient client = new BinaryLogClient(host, port, user, password);

client.setServerId(serverId);

client.registerEventListener((event -> {

// binlog事件

EventData data = event.getData();

if (data != null) {

if (data instanceof TableMapEventData) {

TableMapEventData tableMapEventData = (TableMapEventData) data;

tableMap.put(tableMapEventData.getTableId(), tableMapEventData.getDatabase() + "." + tableMapEventData.getTable());

}

// update数据

if (data instanceof UpdateRowsEventData) {

UpdateRowsEventData updateRowsEventData = (UpdateRowsEventData) data;

String tableName = tableMap.get(updateRowsEventData.getTableId());

if (tableName != null && databaseList.contains(tableName)) {

String eventKey = tableName + ".update";

for (Map.Entry row : updateRowsEventData.getRows()) {

String msg = JSON.toJSONString(new BinlogDto(eventKey, row.getValue()));

kafkaSender.send(topic, msg);

}

}

}

// insert数据

else if (data instanceof WriteRowsEventData) {

WriteRowsEventData writeRowsEventData = (WriteRowsEventData) data;

String tableName = tableMap.get(writeRowsEventData.getTableId());

if (tableName != null && databaseList.contains(tableName)) {

String eventKey = tableName + ".insert";

for (Serializable[] row : writeRowsEventData.getRows()) {

String msg = JSON.toJSONString(new BinlogDto(eventKey, row));

kafkaSender.send(topic, msg);

}

}

}

// delete数据

else if (data instanceof DeleteRowsEventData) {

DeleteRowsEventData deleteRowsEventData = (DeleteRowsEventData) data;

String tableName = tableMap.get(deleteRowsEventData.getTableId());

if (tableName != null && databaseList.contains(tableName)) {

String eventKey = tableName + ".delete";

for (Serializable[] row : deleteRowsEventData.getRows()) {

String msg = JSON.toJSONString(new BinlogDto(eventKey, row));

kafkaSender.send(topic, msg);

}

}

}

}

}));

client.connect();

}

}



推荐阅读
  • 本文介绍了高校天文共享平台的开发过程中的思考和规划。该平台旨在为高校学生提供天象预报、科普知识、观测活动、图片分享等功能。文章分析了项目的技术栈选择、网站前端布局、业务流程、数据库结构等方面,并总结了项目存在的问题,如前后端未分离、代码混乱等。作者表示希望通过记录和规划,能够理清思路,进一步完善该平台。 ... [详细]
  • Oracle优化新常态的五大禁止及其性能隐患
    本文介绍了Oracle优化新常态中的五大禁止措施,包括禁止外键、禁止视图、禁止触发器、禁止存储过程和禁止JOB,并分析了这些禁止措施可能带来的性能隐患。文章还讨论了这些禁止措施在C/S架构和B/S架构中的不同应用情况,并提出了解决方案。 ... [详细]
  • 2018深入java目标计划及学习内容
    本文介绍了作者在2018年的深入java目标计划,包括学习计划和工作中要用到的内容。作者计划学习的内容包括kafka、zookeeper、hbase、hdoop、spark、elasticsearch、solr、spring cloud、mysql、mybatis等。其中,作者对jvm的学习有一定了解,并计划通读《jvm》一书。此外,作者还提到了《HotSpot实战》和《高性能MySQL》等书籍。 ... [详细]
  • MySQL数据库锁机制及其应用(数据库锁的概念)
    本文介绍了MySQL数据库锁机制及其应用。数据库锁是计算机协调多个进程或线程并发访问某一资源的机制,在数据库中,数据是一种供许多用户共享的资源,如何保证数据并发访问的一致性和有效性是数据库必须解决的问题。MySQL的锁机制相对简单,不同的存储引擎支持不同的锁机制,主要包括表级锁、行级锁和页面锁。本文详细介绍了MySQL表级锁的锁模式和特点,以及行级锁和页面锁的特点和应用场景。同时还讨论了锁冲突对数据库并发访问性能的影响。 ... [详细]
  • 2021最新总结网易/腾讯/CVTE/字节面经分享(附答案解析)
    本文分享作者在2021年面试网易、腾讯、CVTE和字节等大型互联网企业的经历和问题,包括稳定性设计、数据库优化、分布式锁的设计等内容。同时提供了大厂最新面试真题笔记,并附带答案解析。 ... [详细]
  • ElasticSerach初探第一篇认识ES+环境搭建+简单MySQL数据同步+SpringBoot整合ES
    一、认识ElasticSearch是一个基于Lucene的开源搜索引擎,通过简单的RESTfulAPI来隐藏Lucene的复杂性。全文搜索,分析系统&# ... [详细]
  • 讨伐Java多线程与高并发——MQ篇
    本文是学习Java多线程与高并发知识时做的笔记。这部分内容比较多,按照内容分为5个部分:多线程基础篇JUC篇同步容器和并发容器篇线程池篇MQ篇本篇 ... [详细]
  • 2019我的金三银四
    先讲一下自己的情况吧,二本学生,17年毕业,目前在一家跨境电商从事Java技术开发工作(不是阿里,没那么厉害),技术栈目前偏向于容器云、持续集成持续交付这一块,也就是SpringBoot、Kuber ... [详细]
  • 你知道Kafka和Redis的各自优缺点吗?一文带你优化选择,不走弯路 ... [详细]
  • kafka教程基本概念
    kafka教程基本概念 ... [详细]
  • MySQL插入数据的四种方式及安全性分析
    本文介绍了MySQL插入数据的四种方式:插入完整的行、插入行的一部分、插入多行和插入查询结果,并对其安全性进行了分析。在插入行时,应注意字段的定义和赋值,以提高安全性。同时指出了使用insert语句的不安全性,应尽量避免使用。建议在表中定义相关字段,并根据定义的字段赋予相应的值,以增加插入操作的安全性。 ... [详细]
  • 一次上线事故,30岁+的程序员踩坑经验之谈
    本文主要介绍了一位30岁+的程序员在一次上线事故中踩坑的经验之谈。文章提到了在双十一活动期间,作为一个在线医疗项目,他们进行了优惠折扣活动的升级改造。然而,在上线前的最后一天,由于大量数据请求,导致部分接口出现问题。作者通过部署两台opentsdb来解决问题,但读数据的opentsdb仍然经常假死。作者只能查询最近24小时的数据。这次事故给他带来了很多教训和经验。 ... [详细]
  • 什么是大数据lambda架构
    一、什么是Lambda架构Lambda架构由Storm的作者[NathanMarz]提出,根据维基百科的定义,Lambda架构的设计是为了在处理大规模数 ... [详细]
  • 前言最近一段时间在整公司项目里一个功能的优化,用到了多线程处理。期间也是踩了不少的坑,在这里想说下我遇到的问题和注意事项。以及怎样知道启动的那些多线程都 ... [详细]
  • 大家好,这是一个为了梦想而保持学习的博客。这个专题会记录我对于KAFKA的学习和实战经验,希望对大家有所帮助,目录形式依旧为问答的方式,相当于是模拟面试。一、概述在对kafka有了 ... [详细]
author-avatar
kanney姜_958
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有