热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

redis监听写入mysql_基于阿里巴巴Canal框架,kafka监听协议实现Mysql与Redis数据同步...

准备找到我们所需的安装包文件关于Apache的相关包,都可以在这个网站找到啦http:mirrors.hust.edu.cnapache安装JDK此处略过安装zookeeper下载

准备

找到我们所需的安装包文件

关于Apache的相关包,都可以在这个网站找到啦

http://mirrors.hust.edu.cn/apache/

940819196a7b

安装JDK

此处略过

安装zookeeper

下载源码包,并解压

wget http://mirrors.hust.edu.cn/apache/zookeeper/zookeeper-3.4.14/zookeeper-3.4.14.tar.gz

tar -zxvf zookeeper-3.4.14.tar.gz

mv zookeeper-3.4.14/ zookeeper

修改配置文件

编辑 /etc/profile 文件, 在文件末尾添加以下环境变量配置

export ZOOKEEPER_HOME=/usr/local/zookeeper

export PATH=$PATH:$ZOOKEEPER_HOME/bin

运行以下命令使环境变量生效

source /etc/profile

重命名配置文件

初次使用需要将config下zoo_sample.cfg 重命名为 zoo.cfg

cd zookeeper/conf/

mv zoo_sample.cfg zoo.cfg

940819196a7b

创建目录data目录

用于存放持久化数据的地方

mkdir data

940819196a7b

修改配置文件

修改zoo.cfg中的datadir路径为将刚刚创建的data目录的地址

940819196a7b

启动zookeeper服务

/usr/local/software/zookeeper/bin

./zkServer.sh start

940819196a7b

当然也可以通过status命令,来查看zk是否成功运运行,以及什么模式进行运行等

./zkServer.sh status

940819196a7b

好,如下图就代表我们成功连接启动了zk服务器啦~

940819196a7b

安装kafka

下载源码包,并解压

wget http://mirrors.hust.edu.cn/apache/kafka/2.2.2/kafka_2.11-2.2.2.tgz

tar tar -zxvf kafka_2.11-2.2.2.tgz

mv kafka_2.11-2.2.2/ kafka

创建logs目录

cd kafka

mkdir logs

940819196a7b

修改配置文件

vim /usr/local/software/kafka/config/server.properties 修改参数

940819196a7b

启动kafka Server

启动kafka

bin/kafka-server-start.sh -daemon config/server.properties &

关闭kafka

bin/kafka-server-stop.sh -daemon config/server.properties &

查看kafka是否关闭

jps

查看所有topic

bin/kafka-topics.sh --list --zookeeper 192.168.137.5:2181

查看指定topic下的数据

bin/kafka-console-consumer.sh --bootstrap-server 192.168.137.5:9092 --from-beginning --topic sunny-topic

如何判断是否启动成功,请看下图

940819196a7b

安装mysql

安装mysql

此忽略

配置mysql

vim /etc/my.cnf

log-bin=mysql-bin # 开启 binlog

binlog-format=ROW # 选择 ROW 模式

server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

940819196a7b

授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant

drop user 'canal'@'%'; ##注意,这里如果没有创建canal账号,则会报错,所以没有创建则直接忽略这一步

CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';

grant all privileges on *.* to 'canal'@'%' identified by 'canal';

flush privileges;

重启mysql

service mysqld restart

查看是否成功开启binlog日志

show variables like '%log_bin%';

log_bin为ON则开启

940819196a7b

检查权限

一定要检查mysql user 权限为y

940819196a7b

安装Canal

下载源码包,并解压

940819196a7b

wget https://github.com/alibaba/canal/releases/download/canal-1.1.5-alpha-1/canal.deployer-1.1.5-SNAPSHOT.tar.gz

mkdir canal

tar -zxvf canal.deployer-1.1.5-SNAPSHOT.tar.gz -C canal

解压完成后,可以看到如下结构

940819196a7b

配置修改

vi conf/example/instance.properties

940819196a7b

修改canal 配置文件

vim /usr/local/software/canal/conf/canal.properties

注释写着暂时支持三种监听模式,默认是tcp模式,我们选择KafKa监听

940819196a7b

更改为kafka的连接地址,或集群地址

940819196a7b

改完后,进入bin目录重启canalServer端

重启完了后,如何查看是否集成kafka成功了呢,很简单

直接进入zk里面查看kafka的主题,是否有我们刚才在server端定义的名称即可确定是否集成成功

注意的是:如果重启后没有发现zk里面有自己的topic主题,可能是懒加载的原因,可以通过修改数据库来实现同步数据,这个时候zk就会有自己的Topic了

940819196a7b

启动canal

bin/startup.sh

940819196a7b

查看是否启动成功

通过查看日志

在canal目录下的/logs/example/example.log日志

940819196a7b

那么,zk、kafka、canal都启动了

我们新建一个数据库,并新增一个表,添加一条数据,看数据是否会被监听到

940819196a7b

查看某个topic的所有消息

bin/kafka-console-consumer.sh --bootstrap-server 192.168.137.5:9092 --from-beginning --topic sunny-topic

消息已成功达到kafka消息队列

940819196a7b

建立SpringBoot项目

项目结构

940819196a7b

添加maven依赖

org.springframework.boot

spring-boot-starter-parent

2.1.11.RELEASE

org.springframework.kafka

spring-kafka

org.springframework.boot

spring-boot-starter-data-redis

com.alibaba

fastjson

1.2.70

配置文件

# kafka

spring:

kafka:

# kafka服务器地址(可以多个)

bootstrap-servers: 192.168.137.5:9092

consumer:

# 指定一个默认的组名

group-id: kafka2

# earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费

# latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据

# none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常

auto-offset-reset: earliest

# key/value的反序列化

key-deserializer: org.apache.kafka.common.serialization.StringDeserializer

value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

producer:

# key/value的序列化

key-serializer: org.apache.kafka.common.serialization.StringSerializer

value-serializer: org.apache.kafka.common.serialization.StringSerializer

# 批量抓取

batch-size: 65536

# 缓存容量

buffer-memory: 524288

redis:

host: 127.0.0.1

# password:

port: 6379

database: 0

kafka消费者代码

@Component

public class MembetKafkaConsumer {

@Autowired

private RedisUtil redisUtil;

@KafkaListener(topics = "sunny-topic")

public void receive(ConsumerRecord, ?> consumer) {

System.out.println("topic名称:" + consumer.topic() + ",key:" +

consumer.key() + "," +

"分区位置:" + consumer.partition()

+ ", 下标" + consumer.offset() + "," + consumer.value());

String json = (String) consumer.value();

JSONObject jsonObject = JSONObject.parseObject(json);

String type = jsonObject.getString("type");

String pkNames = jsonObject.getJSONArray("pkNames").getString(0);

JSONArray data = jsonObject.getJSONArray("data");

for (int i = 0; i

JSONObject dataObject = data.getJSONObject(i);

String key = dataObject.getString(pkNames);

switch (type) {

case "UPDATE":

case "INSERT":

redisUtil.setString(key, dataObject.toJSONString());

break;

case "DELETE":

redisUtil.delKey(key);

break;

}

}

}

// @KafkaListener(topics = "sunny-topic")

// public void onMessage(String message){

// //insertIntoDb(buffer);//这里为插入数据库代码

// System.out.println(message);

// }

}

redis工具类

@Component

public class RedisUtil {

@Autowired

private StringRedisTemplate stringRedisTemplate;

/**

* 存放string类型

*

* @param key key

* @param data 数据

* @param timeout 超时间

*/

public void setString(String key, String data, Long timeout) {

stringRedisTemplate.opsForValue().set(key, data);

if (timeout != null) {

stringRedisTemplate.expire(key, timeout, TimeUnit.SECONDS);

}

}

/**

* 存放string类型

*

* @param key key

* @param data 数据

*/

public void setString(String key, String data) {

setString(key, data, null);

}

/**

* 根据key查询string类型

*

* @param key

* @return

*/

public String getString(String key) {

String value = stringRedisTemplate.opsForValue().get(key);

return value;

}

/**

* 根据对应的key删除key

*

* @param key

*/

public boolean delKey(String key) {

return stringRedisTemplate.delete(key);

}

}

数据库数据发生改变,kafka立马消费消息

940819196a7b

Redis同步数据

940819196a7b

本文就到这,如有疑问,评论一起讨论



推荐阅读
  • 搭建Windows Server 2012 R2 IIS8.5+PHP(FastCGI)+MySQL环境的详细步骤
    本文详细介绍了搭建Windows Server 2012 R2 IIS8.5+PHP(FastCGI)+MySQL环境的步骤,包括环境说明、相关软件下载的地址以及所需的插件下载地址。 ... [详细]
  • http:my.oschina.netleejun2005blog136820刚看到群里又有同学在说HTTP协议下的Get请求参数长度是有大小限制的,最大不能超过XX ... [详细]
  • 本文介绍了Web学习历程记录中关于Tomcat的基本概念和配置。首先解释了Web静态Web资源和动态Web资源的概念,以及C/S架构和B/S架构的区别。然后介绍了常见的Web服务器,包括Weblogic、WebSphere和Tomcat。接着详细讲解了Tomcat的虚拟主机、web应用和虚拟路径映射的概念和配置过程。最后简要介绍了http协议的作用。本文内容详实,适合初学者了解Tomcat的基础知识。 ... [详细]
  • 本文介绍了在Linux下安装和配置Kafka的方法,包括安装JDK、下载和解压Kafka、配置Kafka的参数,以及配置Kafka的日志目录、服务器IP和日志存放路径等。同时还提供了单机配置部署的方法和zookeeper地址和端口的配置。通过实操成功的案例,帮助读者快速完成Kafka的安装和配置。 ... [详细]
  • uniapp开发H5解决跨域问题的两种代理方法
    本文介绍了uniapp开发H5解决跨域问题的两种代理方法,分别是在manifest.json文件和vue.config.js文件中设置代理。通过设置代理根域名和配置路径别名,可以实现H5页面的跨域访问。同时还介绍了如何开启内网穿透,让外网的人可以访问到本地调试的H5页面。 ... [详细]
  • Spring源码解密之默认标签的解析方式分析
    本文分析了Spring源码解密中默认标签的解析方式。通过对命名空间的判断,区分默认命名空间和自定义命名空间,并采用不同的解析方式。其中,bean标签的解析最为复杂和重要。 ... [详细]
  • Nginx使用(server参数配置)
    本文介绍了Nginx的使用,重点讲解了server参数配置,包括端口号、主机名、根目录等内容。同时,还介绍了Nginx的反向代理功能。 ... [详细]
  • 本文介绍了C++中省略号类型和参数个数不确定函数参数的使用方法,并提供了一个范例。通过宏定义的方式,可以方便地处理不定参数的情况。文章中给出了具体的代码实现,并对代码进行了解释和说明。这对于需要处理不定参数的情况的程序员来说,是一个很有用的参考资料。 ... [详细]
  • 本文介绍了在Vue项目中如何结合Element UI解决连续上传多张图片及图片编辑的问题。作者强调了在编码前要明确需求和所需要的结果,并详细描述了自己的代码实现过程。 ... [详细]
  • imx6ull开发板驱动MT7601U无线网卡的方法和步骤详解
    本文详细介绍了在imx6ull开发板上驱动MT7601U无线网卡的方法和步骤。首先介绍了开发环境和硬件平台,然后说明了MT7601U驱动已经集成在linux内核的linux-4.x.x/drivers/net/wireless/mediatek/mt7601u文件中。接着介绍了移植mt7601u驱动的过程,包括编译内核和配置设备驱动。最后,列举了关键词和相关信息供读者参考。 ... [详细]
  • 本文介绍了在mac环境下使用nginx配置nodejs代理服务器的步骤,包括安装nginx、创建目录和文件、配置代理的域名和日志记录等。 ... [详细]
  • 本文介绍了如何使用C#制作Java+Mysql+Tomcat环境安装程序,实现一键式安装。通过将JDK、Mysql、Tomcat三者制作成一个安装包,解决了客户在安装软件时的复杂配置和繁琐问题,便于管理软件版本和系统集成。具体步骤包括配置JDK环境变量和安装Mysql服务,其中使用了MySQL Server 5.5社区版和my.ini文件。安装方法为通过命令行将目录转到mysql的bin目录下,执行mysqld --install MySQL5命令。 ... [详细]
  • Java在运行已编译完成的类时,是通过java虚拟机来装载和执行的,java虚拟机通过操作系统命令JAVA_HOMEbinjava–option来启 ... [详细]
  • 本文介绍了深入浅出Linux设备驱动编程的重要性,以及两种加载和删除Linux内核模块的方法。通过一个内核模块的例子,展示了模块的编译和加载过程,并讨论了模块对内核大小的控制。深入理解Linux设备驱动编程对于开发者来说非常重要。 ... [详细]
  • 服务器上的操作系统有哪些,如何选择适合的操作系统?
    本文介绍了服务器上常见的操作系统,包括系统盘镜像、数据盘镜像和整机镜像的数量。同时,还介绍了共享镜像的限制和使用方法。此外,还提供了关于华为云服务的帮助中心,其中包括产品简介、价格说明、购买指南、用户指南、API参考、最佳实践、常见问题和视频帮助等技术文档。对于裸金属服务器的远程登录,本文介绍了使用密钥对登录的方法,并提供了部分操作系统配置示例。最后,还提到了SUSE云耀云服务器的特点和快速搭建方法。 ... [详细]
author-avatar
颂春堂中药
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有