热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

RabbitMq的最终一致性分布式事务

RabbitMq的最终一致性分布式事务使用rabbitmq的步骤1.运行安装在服务器上的rabbit服务2.在项目中安装依赖3.编写对应的配置文件4.创建对应配置并加上启动注解5.


RabbitMq的最终一致性分布式事务

    • 使用rabbitmq的步骤
      • 1.运行安装在服务器上的rabbit服务
      • 2.在项目中安装依赖
      • 3.编写对应的配置文件
      • 4.创建对应配置并加上启动注解
      • 5.创建message表记录发送次数及信息
      • 6.发送请求时并创建message信息
      • 7.创建spring Task定时器并定时输出rabbitmq信息
      • 8.创建消息确定方法,确认接受方收到的了消息并进行了处理
      • 9.消息接受者创建消息重复表进行消息去重
      • 10.接受方微服务创建监听器监听rabbitmq信息


使用rabbitmq的步骤

在这里插入图片描述


1.运行安装在服务器上的rabbit服务

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Rs8LPaRN-1664352105453)(C:\Users\han\AppData\Roaming\Typora\typora-user-images\image-20220901201727164.png)]

或者在docker上运行

# 使用数据卷
docker volume rm rabbitmq-5672-data
docker volume create --name rabbitmq-5672-data
docker run -d --rm --name rabbitmq-5672 \-v /etc/localtime:/etc/localtime:ro \-v rabbitmq-5672-data:/var/lib/rabbitmq \-p 5672:5672 \-p 15672:15672 \rabbitmq:3.10-management

# 这个例子挂载「数据存储目录」
docker run -d --rm --name rabbitmq-5672 \-v /etc/localtime:/etc/localtime:ro \-v ~/docker/5672/data:/var/lib/rabbitmq \-p 5672:5672 \-p 15672:15672 \rabbitmq:3.10-management

2.在项目中安装依赖

<dependency><groupId>org.springframework.amqpgroupId><artifactId>spring-rabbit-testartifactId><scope>testscope>
dependency>

3.编写对应的配置文件

## 连接rabbitmq服务器
spring.rabbitmq.host&#61;192.168.12.12
spring.rabbitmq.port&#61;5672
spring.rabbitmq.username&#61;guest
spring.rabbitmq.password&#61;guest
spring.rabbitmq.virtual-host&#61;hl## 手动确认消息
spring.rabbitmq.listener.simple.acknowledge-mode&#61;manual
spring.rabbitmq.listener.direct.acknowledge-mode&#61;manual## 确认消息已发送到交换机&#xff08; Exchange &#xff09;
spring.rabbitmq.publisher-confirm-type&#61;CORRELATED# 确认消息已发送到队列&#xff08;Queue&#xff09;
spring.rabbitmq.publisher-returns&#61;true

4.创建对应配置并加上启动注解

&#64;Configuration
&#64;EnableRabbit
&#64;Slf4j
&#64;Transactional
public class RabbitConfig {&#64;Resourceprivate MessageDao messageDao;public static final String EMPLOYEE_LIST &#61; "employee-list";public static final String DEPARTMENT_DELETE &#61; "department-delete";&#64;Beanpublic Queue DepartmentDelete(){return new Queue(DEPARTMENT_DELETE);}&#64;Beanpublic Queue employeeList(){return new Queue(EMPLOYEE_LIST);}&#64;Bean("rabbitTemplate")public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate &#61; new RabbitTemplate();rabbitTemplate.setConnectionFactory(connectionFactory);rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {&#64;Overridepublic void confirm(CorrelationData correlationData, boolean ack, String s) {//每次发送队列信息将触发此方法,需要添加配置属性System.out.println(correlationData.getId());Message message &#61; messageDao.getOne(Long.parseLong(Objects.requireNonNull(correlationData.getId())));if (ack){message.setStatus("B");}message.setRetryCount(message.getRetryCount()-1);log.info("剩余消息数:"&#43;message.getRetryCount());messageDao.save(message);}});// rabbitTemplate.setMandatory(true);
//
// rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
// &#64;Override
// public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
// log.info("ReturnCallback 消息&#xff1a;{}", message);
// log.info("ReturnCallback 回应码&#xff1a;{}", replyCode);
// log.info("ReturnCallback 回应信息&#xff1a;{}", replyText);
// log.info("ReturnCallback 交换机&#xff1a;{}", exchange);
// log.info("ReturnCallback 路由键&#xff1a;{}", routingKey);
// }
// });return rabbitTemplate;}}

5.创建message表记录发送次数及信息

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-WSNp3Mbr-1664352105454)(C:\Users\han\AppData\Roaming\Typora\typora-user-images\image-20220901202318282.png)]

drop table if exists message;
create table message
(id bigint auto_increment,exchange varchar(64) ,routing_key varchar(64) not null,content varchar(128) not null,retry_count int not null,status varchar(32) not null,primary key (id)
);

创建对应的DAO类和实体类


6.发送请求时并创建message信息

public void deleteById(Long id) {departmentDao.deleteById(id);Message message &#61; new Message(null, null, RabbitConfig.DEPARTMENT_DELETE, id&#43;"", 5, "A");messageDao.save(message);
}

7.创建spring Task定时器并定时输出rabbitmq信息

&#64;Component
&#64;Slf4j
&#64;Transactional
&#64;EnableScheduling
public class RabbitTimer {&#64;Resourceprivate MessageMysqlDao messageDao;&#64;Resourceprivate RabbitTemplate rabbitTemplate;&#64;Scheduled(fixedDelay &#61; 6000)private void process(){//获取状态不等于C和次数大于0的信息QueryWrapper<Message> wrapper &#61; new QueryWrapper<>();wrapper.ne("status", "C");wrapper.gt("retry_count", 0);List<Message> messageList &#61; messageDao.selectList(wrapper);if (messageList.size()&#61;&#61;0){log.info("暂无消息发送,请等待...");}else {//进行信息发送for (Message message : messageList) {String content &#61; message.getId()&#43;":"&#43;message.getContent();CorrelationData correlationData &#61; new CorrelationData(message.getId()&#43;"");if (message.getExchange()&#61;&#61;null) {rabbitTemplate.convertAndSend(message.getRoutingKey(), (Object) content, correlationData);}else{rabbitTemplate.convertAndSend(message.getExchange(), message.getRoutingKey(), content, correlationData);}log.info("消息 {} 已发送",content);}}}
}

8.创建消息确定方法,确认接受方收到的了消息并进行了处理

&#64;RestController
&#64;RequestMapping("/message")
&#64;RequiredArgsConstructor
public class MessageController implements IAMessageController{&#64;Resourceprivate MessageMysqlDao messageMysqlDao;&#64;PostMapping("/update/{id}")public String messageUpdate(&#64;PathVariable("id")Long id){QueryWrapper<Message> wrapper &#61; new QueryWrapper<>();wrapper.eq("id", id);Message message &#61; new Message();message.setStatus("C");messageMysqlDao.update(message,wrapper);return "success";}}

9.消息接受者创建消息重复表进行消息去重

在这里插入图片描述

drop table if exists recived_message;
create table recived_message
(id bigint auto_increment,recived_at datetime
);

10.接受方微服务创建监听器监听rabbitmq信息

消息接受者处理消息发送者发送的消息,在消息处理无误后进行发送openfeign请求,给消息提供者发送确认信息

&#64;Configuration
&#64;RequiredArgsConstructor
&#64;Transactional
public class HarvestResultLister {private final HarvestPlanMysqlDao harvestPlanMysqlDao;private final ReceivedMessageMysqlDao receivedMessageMysqlDao;private final HarvestCheckClient harvestCheckClient;&#64;RabbitListener(queues &#61; RabbitConfig.HARVEST_CHECK)public void harvestUpdateByCheck(String msg, Channel channel,&#64;Header(AmqpHeaders.DELIVERY_TAG) long tag) {try {System.out.println(msg);String[] split &#61; msg.split(":");if (split.length !&#61; 3) {throw new RabbitDataError("发送的数据异常");}String mesId &#61; split[0];//获取发送内容idString contentId &#61; split[1];//获取被修改的采收idString harvestId &#61; split[2];ReceivedMessage receivedMessage &#61; receivedMessageMysqlDao.selectById(Long.parseLong(contentId ));if (receivedMessage !&#61; null){throw new RabbitDataError("发送重复数据");}//存入数据receivedMessageMysqlDao.insert( new ReceivedMessage(Long.parseLong(contentId ), new Date()));QueryWrapper<HarvestPlan> wrapper &#61; new QueryWrapper<>();wrapper.eq("id", Long.parseLong(harvestId));HarvestPlan harvestPlan &#61; new HarvestPlan();harvestPlan.setPurchaseStatusId(3L);String result &#61; harvestCheckClient.messageUpdate(Long.parseLong(mesId));if (!"success".equals(result)){throw new RabbitDataError("确认消息未正常传回");}} catch (Exception e) {e.printStackTrace();throw e;}finally {try {channel.basicAck(tag, false);} catch (IOException e) {e.printStackTrace();}}}
}

推荐阅读
  • 本文详细介绍了如何在PHP中使用Memcached进行数据缓存,包括服务器连接、数据操作、高级功能等。 ... [详细]
  • 本文详细介绍了Oracle 11g中的创建表空间的方法,以及如何设置客户端和服务端的基本配置,包括用户管理、环境变量配置等。 ... [详细]
  • 从理想主义者的内心深处萌发的技术信仰,推动了云原生技术在全球范围内的快速发展。本文将带你深入了解阿里巴巴在开源领域的贡献与成就。 ... [详细]
  • 本文介绍了如何利用X_CORBA实现远程对象调用,并通过多个示例程序展示了其功能与应用,包括基础的Hello World示例、文件传输工具以及一个完整的聊天系统。 ... [详细]
  • 本文探讨了一种统一的语义数据模型,旨在支持物联网、建筑及企业环境下的数据转换。该模型强调简洁性和可扩展性,以促进不同行业间的插件化和互操作性。对于智能硬件开发者而言,这一模型提供了重要的参考价值。 ... [详细]
  • 初探Hadoop:第一章概览
    本文深入探讨了《Hadoop》第一章的内容,重点介绍了Hadoop的基本概念及其如何解决大数据处理中的关键挑战。 ... [详细]
  • 本文由公众号【数智物语】(ID: decision_engine)发布,关注获取更多干货。文章探讨了从数据收集到清洗、建模及可视化的全过程,介绍了41款实用工具,旨在帮助数据科学家和分析师提升工作效率。 ... [详细]
  • 本文分享了作者在使用LaTeX过程中的几点心得,涵盖了从文档编辑、代码高亮、图形绘制到3D模型展示等多个方面的内容。适合希望深入了解LaTeX高级功能的用户。 ... [详细]
  • 本文介绍了Tomcat的基本操作,包括启动、关闭及首次访问的方法,并详细讲解了如何在IDEA中创建Web项目,配置Servlet及其映射,以及如何将项目部署到Tomcat。 ... [详细]
  • 本文探讨了使用Python实现监控信息收集的方法,涵盖从基础的日志记录到复杂的系统运维解决方案,旨在帮助开发者和运维人员提升工作效率。 ... [详细]
  • 【MySQL】frm文件解析
    官网说明:http:dev.mysql.comdocinternalsenfrm-file-format.htmlfrm是MySQL表结构定义文件,通常frm文件是不会损坏的,但是如果 ... [详细]
  • Java虚拟机及其发展历程
    Java虚拟机(JVM)是每个Java开发者日常工作中不可或缺的一部分,但其背后的运作机制却往往显得神秘莫测。本文将探讨Java及其虚拟机的发展历程,帮助读者深入了解这一关键技术。 ... [详细]
  • 服务器虚拟化存储设计,完美规划储存与资源,部署高性能虚拟化桌面
    规划部署虚拟桌面环境前,必须先估算目前所使用实体桌面环境的工作负载与IOPS性能,并慎选储存设备。唯有谨慎估算贴近实际的IOPS性能,才能 ... [详细]
  • 深入解析:存储技术的演变与发展
    本文探讨了从单机文件系统到分布式文件系统的存储技术发展过程,详细解释了各种存储模型及其特点。 ... [详细]
  • 本文总结了近年来在实际项目中使用消息中间件的经验和常见问题,旨在为Java初学者和中级开发者提供实用的参考。文章详细介绍了消息中间件在分布式系统中的作用,以及如何通过消息中间件实现高可用性和可扩展性。 ... [详细]
author-avatar
耗子很傻爱钻洞
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有