热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

RabbitMq的最终一致性分布式事务

RabbitMq的最终一致性分布式事务使用rabbitmq的步骤1.运行安装在服务器上的rabbit服务2.在项目中安装依赖3.编写对应的配置文件4.创建对应配置并加上启动注解5.


RabbitMq的最终一致性分布式事务

    • 使用rabbitmq的步骤
      • 1.运行安装在服务器上的rabbit服务
      • 2.在项目中安装依赖
      • 3.编写对应的配置文件
      • 4.创建对应配置并加上启动注解
      • 5.创建message表记录发送次数及信息
      • 6.发送请求时并创建message信息
      • 7.创建spring Task定时器并定时输出rabbitmq信息
      • 8.创建消息确定方法,确认接受方收到的了消息并进行了处理
      • 9.消息接受者创建消息重复表进行消息去重
      • 10.接受方微服务创建监听器监听rabbitmq信息


使用rabbitmq的步骤

在这里插入图片描述


1.运行安装在服务器上的rabbit服务

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Rs8LPaRN-1664352105453)(C:\Users\han\AppData\Roaming\Typora\typora-user-images\image-20220901201727164.png)]

或者在docker上运行

# 使用数据卷
docker volume rm rabbitmq-5672-data
docker volume create --name rabbitmq-5672-data
docker run -d --rm --name rabbitmq-5672 \-v /etc/localtime:/etc/localtime:ro \-v rabbitmq-5672-data:/var/lib/rabbitmq \-p 5672:5672 \-p 15672:15672 \rabbitmq:3.10-management

# 这个例子挂载「数据存储目录」
docker run -d --rm --name rabbitmq-5672 \-v /etc/localtime:/etc/localtime:ro \-v ~/docker/5672/data:/var/lib/rabbitmq \-p 5672:5672 \-p 15672:15672 \rabbitmq:3.10-management

2.在项目中安装依赖

<dependency><groupId>org.springframework.amqpgroupId><artifactId>spring-rabbit-testartifactId><scope>testscope>
dependency>

3.编写对应的配置文件

## 连接rabbitmq服务器
spring.rabbitmq.host&#61;192.168.12.12
spring.rabbitmq.port&#61;5672
spring.rabbitmq.username&#61;guest
spring.rabbitmq.password&#61;guest
spring.rabbitmq.virtual-host&#61;hl## 手动确认消息
spring.rabbitmq.listener.simple.acknowledge-mode&#61;manual
spring.rabbitmq.listener.direct.acknowledge-mode&#61;manual## 确认消息已发送到交换机&#xff08; Exchange &#xff09;
spring.rabbitmq.publisher-confirm-type&#61;CORRELATED# 确认消息已发送到队列&#xff08;Queue&#xff09;
spring.rabbitmq.publisher-returns&#61;true

4.创建对应配置并加上启动注解

&#64;Configuration
&#64;EnableRabbit
&#64;Slf4j
&#64;Transactional
public class RabbitConfig {&#64;Resourceprivate MessageDao messageDao;public static final String EMPLOYEE_LIST &#61; "employee-list";public static final String DEPARTMENT_DELETE &#61; "department-delete";&#64;Beanpublic Queue DepartmentDelete(){return new Queue(DEPARTMENT_DELETE);}&#64;Beanpublic Queue employeeList(){return new Queue(EMPLOYEE_LIST);}&#64;Bean("rabbitTemplate")public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate &#61; new RabbitTemplate();rabbitTemplate.setConnectionFactory(connectionFactory);rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {&#64;Overridepublic void confirm(CorrelationData correlationData, boolean ack, String s) {//每次发送队列信息将触发此方法,需要添加配置属性System.out.println(correlationData.getId());Message message &#61; messageDao.getOne(Long.parseLong(Objects.requireNonNull(correlationData.getId())));if (ack){message.setStatus("B");}message.setRetryCount(message.getRetryCount()-1);log.info("剩余消息数:"&#43;message.getRetryCount());messageDao.save(message);}});// rabbitTemplate.setMandatory(true);
//
// rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
// &#64;Override
// public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
// log.info("ReturnCallback 消息&#xff1a;{}", message);
// log.info("ReturnCallback 回应码&#xff1a;{}", replyCode);
// log.info("ReturnCallback 回应信息&#xff1a;{}", replyText);
// log.info("ReturnCallback 交换机&#xff1a;{}", exchange);
// log.info("ReturnCallback 路由键&#xff1a;{}", routingKey);
// }
// });return rabbitTemplate;}}

5.创建message表记录发送次数及信息

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-WSNp3Mbr-1664352105454)(C:\Users\han\AppData\Roaming\Typora\typora-user-images\image-20220901202318282.png)]

drop table if exists message;
create table message
(id bigint auto_increment,exchange varchar(64) ,routing_key varchar(64) not null,content varchar(128) not null,retry_count int not null,status varchar(32) not null,primary key (id)
);

创建对应的DAO类和实体类


6.发送请求时并创建message信息

public void deleteById(Long id) {departmentDao.deleteById(id);Message message &#61; new Message(null, null, RabbitConfig.DEPARTMENT_DELETE, id&#43;"", 5, "A");messageDao.save(message);
}

7.创建spring Task定时器并定时输出rabbitmq信息

&#64;Component
&#64;Slf4j
&#64;Transactional
&#64;EnableScheduling
public class RabbitTimer {&#64;Resourceprivate MessageMysqlDao messageDao;&#64;Resourceprivate RabbitTemplate rabbitTemplate;&#64;Scheduled(fixedDelay &#61; 6000)private void process(){//获取状态不等于C和次数大于0的信息QueryWrapper<Message> wrapper &#61; new QueryWrapper<>();wrapper.ne("status", "C");wrapper.gt("retry_count", 0);List<Message> messageList &#61; messageDao.selectList(wrapper);if (messageList.size()&#61;&#61;0){log.info("暂无消息发送,请等待...");}else {//进行信息发送for (Message message : messageList) {String content &#61; message.getId()&#43;":"&#43;message.getContent();CorrelationData correlationData &#61; new CorrelationData(message.getId()&#43;"");if (message.getExchange()&#61;&#61;null) {rabbitTemplate.convertAndSend(message.getRoutingKey(), (Object) content, correlationData);}else{rabbitTemplate.convertAndSend(message.getExchange(), message.getRoutingKey(), content, correlationData);}log.info("消息 {} 已发送",content);}}}
}

8.创建消息确定方法,确认接受方收到的了消息并进行了处理

&#64;RestController
&#64;RequestMapping("/message")
&#64;RequiredArgsConstructor
public class MessageController implements IAMessageController{&#64;Resourceprivate MessageMysqlDao messageMysqlDao;&#64;PostMapping("/update/{id}")public String messageUpdate(&#64;PathVariable("id")Long id){QueryWrapper<Message> wrapper &#61; new QueryWrapper<>();wrapper.eq("id", id);Message message &#61; new Message();message.setStatus("C");messageMysqlDao.update(message,wrapper);return "success";}}

9.消息接受者创建消息重复表进行消息去重

在这里插入图片描述

drop table if exists recived_message;
create table recived_message
(id bigint auto_increment,recived_at datetime
);

10.接受方微服务创建监听器监听rabbitmq信息

消息接受者处理消息发送者发送的消息,在消息处理无误后进行发送openfeign请求,给消息提供者发送确认信息

&#64;Configuration
&#64;RequiredArgsConstructor
&#64;Transactional
public class HarvestResultLister {private final HarvestPlanMysqlDao harvestPlanMysqlDao;private final ReceivedMessageMysqlDao receivedMessageMysqlDao;private final HarvestCheckClient harvestCheckClient;&#64;RabbitListener(queues &#61; RabbitConfig.HARVEST_CHECK)public void harvestUpdateByCheck(String msg, Channel channel,&#64;Header(AmqpHeaders.DELIVERY_TAG) long tag) {try {System.out.println(msg);String[] split &#61; msg.split(":");if (split.length !&#61; 3) {throw new RabbitDataError("发送的数据异常");}String mesId &#61; split[0];//获取发送内容idString contentId &#61; split[1];//获取被修改的采收idString harvestId &#61; split[2];ReceivedMessage receivedMessage &#61; receivedMessageMysqlDao.selectById(Long.parseLong(contentId ));if (receivedMessage !&#61; null){throw new RabbitDataError("发送重复数据");}//存入数据receivedMessageMysqlDao.insert( new ReceivedMessage(Long.parseLong(contentId ), new Date()));QueryWrapper<HarvestPlan> wrapper &#61; new QueryWrapper<>();wrapper.eq("id", Long.parseLong(harvestId));HarvestPlan harvestPlan &#61; new HarvestPlan();harvestPlan.setPurchaseStatusId(3L);String result &#61; harvestCheckClient.messageUpdate(Long.parseLong(mesId));if (!"success".equals(result)){throw new RabbitDataError("确认消息未正常传回");}} catch (Exception e) {e.printStackTrace();throw e;}finally {try {channel.basicAck(tag, false);} catch (IOException e) {e.printStackTrace();}}}
}

推荐阅读
  • Explore how Matterverse is redefining the metaverse experience, creating immersive and meaningful virtual environments that foster genuine connections and economic opportunities. ... [详细]
  • 本文详细介绍了Java编程语言中的核心概念和常见面试问题,包括集合类、数据结构、线程处理、Java虚拟机(JVM)、HTTP协议以及Git操作等方面的内容。通过深入分析每个主题,帮助读者更好地理解Java的关键特性和最佳实践。 ... [详细]
  • 基于KVM的SRIOV直通配置及性能测试
    SRIOV介绍、VF直通配置,以及包转发率性能测试小慢哥的原创文章,欢迎转载目录?1.SRIOV介绍?2.环境说明?3.开启SRIOV?4.生成VF?5.VF ... [详细]
  • Docker的安全基准
    nsitionalENhttp:www.w3.orgTRxhtml1DTDxhtml1-transitional.dtd ... [详细]
  • 优化ListView性能
    本文深入探讨了如何通过多种技术手段优化ListView的性能,包括视图复用、ViewHolder模式、分批加载数据、图片优化及内存管理等。这些方法能够显著提升应用的响应速度和用户体验。 ... [详细]
  • 本文深入探讨了 Java 中的 Serializable 接口,解释了其实现机制、用途及注意事项,帮助开发者更好地理解和使用序列化功能。 ... [详细]
  • MQTT技术周报:硬件连接与协议解析
    本周开发笔记重点介绍了在新项目中使用MQTT协议进行硬件连接的技术细节,涵盖其特性、原理及实现步骤。 ... [详细]
  • UNP 第9章:主机名与地址转换
    本章探讨了用于在主机名和数值地址之间进行转换的函数,如gethostbyname和gethostbyaddr。此外,还介绍了getservbyname和getservbyport函数,用于在服务器名和端口号之间进行转换。 ... [详细]
  • 360SRC安全应急响应:从漏洞提交到修复的全过程
    本文详细介绍了360SRC平台处理一起关键安全事件的过程,涵盖从漏洞提交、验证、排查到最终修复的各个环节。通过这一案例,展示了360在安全应急响应方面的专业能力和严谨态度。 ... [详细]
  • 本文深入探讨了Linux系统中网卡绑定(bonding)的七种工作模式。网卡绑定技术通过将多个物理网卡组合成一个逻辑网卡,实现网络冗余、带宽聚合和负载均衡,在生产环境中广泛应用。文章详细介绍了每种模式的特点、适用场景及配置方法。 ... [详细]
  • 掌握远程执行Linux脚本和命令的技巧
    本文将详细介绍如何利用Python的Paramiko库实现远程执行Linux脚本和命令,帮助读者快速掌握这一实用技能。通过具体的示例和详尽的解释,让初学者也能轻松上手。 ... [详细]
  • 本文详细介绍了 Apache Jena 库中的 Txn.executeWrite 方法,通过多个实际代码示例展示了其在不同场景下的应用,帮助开发者更好地理解和使用该方法。 ... [详细]
  • 本文详细解释了华为ENSP模拟器中常用的命令,涵盖用户模式、系统模式、接口模式和地址池视图模式下的操作。这些命令对于进行计算机网络实验至关重要,帮助用户更好地理解和配置路由器及PC机的通信。 ... [详细]
  • 华为USG基于源地址的多出口策略路由配置
    网络拓扑如下:组网情况:企业用户主要有技术部(VLAN10)和行政部(VLAN20),通过汇聚交换机连接到USG。企业分别通过两个不同运营商(ISP1和ISP2)连接到 ... [详细]
  • 深入解析TCP/IP五层协议
    本文详细介绍了TCP/IP五层协议模型,包括物理层、数据链路层、网络层、传输层和应用层。每层的功能及其相互关系将被逐一解释,帮助读者理解互联网通信的原理。此外,还特别讨论了UDP和TCP协议的特点以及三次握手、四次挥手的过程。 ... [详细]
author-avatar
耗子很傻爱钻洞
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有