热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

RabbitMq的最终一致性分布式事务

RabbitMq的最终一致性分布式事务使用rabbitmq的步骤1.运行安装在服务器上的rabbit服务2.在项目中安装依赖3.编写对应的配置文件4.创建对应配置并加上启动注解5.


RabbitMq的最终一致性分布式事务

    • 使用rabbitmq的步骤
      • 1.运行安装在服务器上的rabbit服务
      • 2.在项目中安装依赖
      • 3.编写对应的配置文件
      • 4.创建对应配置并加上启动注解
      • 5.创建message表记录发送次数及信息
      • 6.发送请求时并创建message信息
      • 7.创建spring Task定时器并定时输出rabbitmq信息
      • 8.创建消息确定方法,确认接受方收到的了消息并进行了处理
      • 9.消息接受者创建消息重复表进行消息去重
      • 10.接受方微服务创建监听器监听rabbitmq信息


使用rabbitmq的步骤

在这里插入图片描述


1.运行安装在服务器上的rabbit服务

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Rs8LPaRN-1664352105453)(C:\Users\han\AppData\Roaming\Typora\typora-user-images\image-20220901201727164.png)]

或者在docker上运行

# 使用数据卷
docker volume rm rabbitmq-5672-data
docker volume create --name rabbitmq-5672-data
docker run -d --rm --name rabbitmq-5672 \-v /etc/localtime:/etc/localtime:ro \-v rabbitmq-5672-data:/var/lib/rabbitmq \-p 5672:5672 \-p 15672:15672 \rabbitmq:3.10-management

# 这个例子挂载「数据存储目录」
docker run -d --rm --name rabbitmq-5672 \-v /etc/localtime:/etc/localtime:ro \-v ~/docker/5672/data:/var/lib/rabbitmq \-p 5672:5672 \-p 15672:15672 \rabbitmq:3.10-management

2.在项目中安装依赖

<dependency><groupId>org.springframework.amqpgroupId><artifactId>spring-rabbit-testartifactId><scope>testscope>
dependency>

3.编写对应的配置文件

## 连接rabbitmq服务器
spring.rabbitmq.host&#61;192.168.12.12
spring.rabbitmq.port&#61;5672
spring.rabbitmq.username&#61;guest
spring.rabbitmq.password&#61;guest
spring.rabbitmq.virtual-host&#61;hl## 手动确认消息
spring.rabbitmq.listener.simple.acknowledge-mode&#61;manual
spring.rabbitmq.listener.direct.acknowledge-mode&#61;manual## 确认消息已发送到交换机&#xff08; Exchange &#xff09;
spring.rabbitmq.publisher-confirm-type&#61;CORRELATED# 确认消息已发送到队列&#xff08;Queue&#xff09;
spring.rabbitmq.publisher-returns&#61;true

4.创建对应配置并加上启动注解

&#64;Configuration
&#64;EnableRabbit
&#64;Slf4j
&#64;Transactional
public class RabbitConfig {&#64;Resourceprivate MessageDao messageDao;public static final String EMPLOYEE_LIST &#61; "employee-list";public static final String DEPARTMENT_DELETE &#61; "department-delete";&#64;Beanpublic Queue DepartmentDelete(){return new Queue(DEPARTMENT_DELETE);}&#64;Beanpublic Queue employeeList(){return new Queue(EMPLOYEE_LIST);}&#64;Bean("rabbitTemplate")public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate &#61; new RabbitTemplate();rabbitTemplate.setConnectionFactory(connectionFactory);rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {&#64;Overridepublic void confirm(CorrelationData correlationData, boolean ack, String s) {//每次发送队列信息将触发此方法,需要添加配置属性System.out.println(correlationData.getId());Message message &#61; messageDao.getOne(Long.parseLong(Objects.requireNonNull(correlationData.getId())));if (ack){message.setStatus("B");}message.setRetryCount(message.getRetryCount()-1);log.info("剩余消息数:"&#43;message.getRetryCount());messageDao.save(message);}});// rabbitTemplate.setMandatory(true);
//
// rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
// &#64;Override
// public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
// log.info("ReturnCallback 消息&#xff1a;{}", message);
// log.info("ReturnCallback 回应码&#xff1a;{}", replyCode);
// log.info("ReturnCallback 回应信息&#xff1a;{}", replyText);
// log.info("ReturnCallback 交换机&#xff1a;{}", exchange);
// log.info("ReturnCallback 路由键&#xff1a;{}", routingKey);
// }
// });return rabbitTemplate;}}

5.创建message表记录发送次数及信息

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-WSNp3Mbr-1664352105454)(C:\Users\han\AppData\Roaming\Typora\typora-user-images\image-20220901202318282.png)]

drop table if exists message;
create table message
(id bigint auto_increment,exchange varchar(64) ,routing_key varchar(64) not null,content varchar(128) not null,retry_count int not null,status varchar(32) not null,primary key (id)
);

创建对应的DAO类和实体类


6.发送请求时并创建message信息

public void deleteById(Long id) {departmentDao.deleteById(id);Message message &#61; new Message(null, null, RabbitConfig.DEPARTMENT_DELETE, id&#43;"", 5, "A");messageDao.save(message);
}

7.创建spring Task定时器并定时输出rabbitmq信息

&#64;Component
&#64;Slf4j
&#64;Transactional
&#64;EnableScheduling
public class RabbitTimer {&#64;Resourceprivate MessageMysqlDao messageDao;&#64;Resourceprivate RabbitTemplate rabbitTemplate;&#64;Scheduled(fixedDelay &#61; 6000)private void process(){//获取状态不等于C和次数大于0的信息QueryWrapper<Message> wrapper &#61; new QueryWrapper<>();wrapper.ne("status", "C");wrapper.gt("retry_count", 0);List<Message> messageList &#61; messageDao.selectList(wrapper);if (messageList.size()&#61;&#61;0){log.info("暂无消息发送,请等待...");}else {//进行信息发送for (Message message : messageList) {String content &#61; message.getId()&#43;":"&#43;message.getContent();CorrelationData correlationData &#61; new CorrelationData(message.getId()&#43;"");if (message.getExchange()&#61;&#61;null) {rabbitTemplate.convertAndSend(message.getRoutingKey(), (Object) content, correlationData);}else{rabbitTemplate.convertAndSend(message.getExchange(), message.getRoutingKey(), content, correlationData);}log.info("消息 {} 已发送",content);}}}
}

8.创建消息确定方法,确认接受方收到的了消息并进行了处理

&#64;RestController
&#64;RequestMapping("/message")
&#64;RequiredArgsConstructor
public class MessageController implements IAMessageController{&#64;Resourceprivate MessageMysqlDao messageMysqlDao;&#64;PostMapping("/update/{id}")public String messageUpdate(&#64;PathVariable("id")Long id){QueryWrapper<Message> wrapper &#61; new QueryWrapper<>();wrapper.eq("id", id);Message message &#61; new Message();message.setStatus("C");messageMysqlDao.update(message,wrapper);return "success";}}

9.消息接受者创建消息重复表进行消息去重

在这里插入图片描述

drop table if exists recived_message;
create table recived_message
(id bigint auto_increment,recived_at datetime
);

10.接受方微服务创建监听器监听rabbitmq信息

消息接受者处理消息发送者发送的消息,在消息处理无误后进行发送openfeign请求,给消息提供者发送确认信息

&#64;Configuration
&#64;RequiredArgsConstructor
&#64;Transactional
public class HarvestResultLister {private final HarvestPlanMysqlDao harvestPlanMysqlDao;private final ReceivedMessageMysqlDao receivedMessageMysqlDao;private final HarvestCheckClient harvestCheckClient;&#64;RabbitListener(queues &#61; RabbitConfig.HARVEST_CHECK)public void harvestUpdateByCheck(String msg, Channel channel,&#64;Header(AmqpHeaders.DELIVERY_TAG) long tag) {try {System.out.println(msg);String[] split &#61; msg.split(":");if (split.length !&#61; 3) {throw new RabbitDataError("发送的数据异常");}String mesId &#61; split[0];//获取发送内容idString contentId &#61; split[1];//获取被修改的采收idString harvestId &#61; split[2];ReceivedMessage receivedMessage &#61; receivedMessageMysqlDao.selectById(Long.parseLong(contentId ));if (receivedMessage !&#61; null){throw new RabbitDataError("发送重复数据");}//存入数据receivedMessageMysqlDao.insert( new ReceivedMessage(Long.parseLong(contentId ), new Date()));QueryWrapper<HarvestPlan> wrapper &#61; new QueryWrapper<>();wrapper.eq("id", Long.parseLong(harvestId));HarvestPlan harvestPlan &#61; new HarvestPlan();harvestPlan.setPurchaseStatusId(3L);String result &#61; harvestCheckClient.messageUpdate(Long.parseLong(mesId));if (!"success".equals(result)){throw new RabbitDataError("确认消息未正常传回");}} catch (Exception e) {e.printStackTrace();throw e;}finally {try {channel.basicAck(tag, false);} catch (IOException e) {e.printStackTrace();}}}
}

推荐阅读
  • 本文探讨了 Spring Boot 应用程序在不同配置下支持的最大并发连接数,重点分析了内置服务器(如 Tomcat、Jetty 和 Undertow)的默认设置及其对性能的影响。 ... [详细]
  • 配置多VLAN环境下的透明SQUID代理
    本文介绍如何在包含多个VLAN的网络环境中配置SQUID作为透明网关。网络拓扑包括Cisco 3750交换机、PANABIT防火墙和SQUID服务器,所有设备均部署在ESXi虚拟化平台上。 ... [详细]
  • 采用IKE方式建立IPsec安全隧道
    一、【组网和实验环境】按如上的接口ip先作配置,再作ipsec的相关配置,配置文本见文章最后本文实验采用的交换机是H3C模拟器,下载地址如 ... [详细]
  • 本文介绍了如何使用 Spring Boot DevTools 实现应用程序在开发过程中自动重启。这一特性显著提高了开发效率,特别是在集成开发环境(IDE)中工作时,能够提供快速的反馈循环。默认情况下,DevTools 会监控类路径上的文件变化,并根据需要触发应用重启。 ... [详细]
  • 深入解析Spring Cloud Ribbon负载均衡机制
    本文详细介绍了Spring Cloud中的Ribbon组件如何实现服务调用的负载均衡。通过分析其工作原理、源码结构及配置方式,帮助读者理解Ribbon在分布式系统中的重要作用。 ... [详细]
  • MQTT技术周报:硬件连接与协议解析
    本周开发笔记重点介绍了在新项目中使用MQTT协议进行硬件连接的技术细节,涵盖其特性、原理及实现步骤。 ... [详细]
  • 本文深入探讨了Linux系统中网卡绑定(bonding)的七种工作模式。网卡绑定技术通过将多个物理网卡组合成一个逻辑网卡,实现网络冗余、带宽聚合和负载均衡,在生产环境中广泛应用。文章详细介绍了每种模式的特点、适用场景及配置方法。 ... [详细]
  • 本文详细探讨了JDBC(Java数据库连接)的内部机制,重点分析其作为服务提供者接口(SPI)框架的应用。通过类图和代码示例,展示了JDBC如何注册驱动程序、建立数据库连接以及执行SQL查询的过程。 ... [详细]
  • 实体映射最强工具类:MapStruct真香 ... [详细]
  • 深入解析 Apache Shiro 安全框架架构
    本文详细介绍了 Apache Shiro,一个强大且灵活的开源安全框架。Shiro 专注于简化身份验证、授权、会话管理和加密等复杂的安全操作,使开发者能够更轻松地保护应用程序。其核心目标是提供易于使用和理解的API,同时确保高度的安全性和灵活性。 ... [详细]
  • 探讨如何真正掌握Java EE,包括所需技能、工具和实践经验。资深软件教学总监李刚分享了对毕业生简历中常见问题的看法,并提供了详尽的标准。 ... [详细]
  • 华为USG基于源地址的多出口策略路由配置
    网络拓扑如下:组网情况:企业用户主要有技术部(VLAN10)和行政部(VLAN20),通过汇聚交换机连接到USG。企业分别通过两个不同运营商(ISP1和ISP2)连接到 ... [详细]
  • 本文详细介绍了网络存储技术的基本概念、分类及应用场景。通过分析直连式存储(DAS)、网络附加存储(NAS)和存储区域网络(SAN)的特点,帮助读者理解不同存储方式的优势与局限性。 ... [详细]
  • 本文介绍如何在华为CE交换机上配置M-LAG(多链路聚合组),以实现CE1和CE2设备作为VLAN 10网关的高可用性。通过详细的配置步骤,确保网络冗余和稳定性。 ... [详细]
  • 深入解析 Spring Security 用户认证机制
    本文将详细介绍 Spring Security 中用户登录认证的核心流程,重点分析 AbstractAuthenticationProcessingFilter 和 AuthenticationManager 的工作原理。通过理解这些组件的实现,读者可以更好地掌握 Spring Security 的认证机制。 ... [详细]
author-avatar
耗子很傻爱钻洞
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有