作者:耗子很傻爱钻洞 | 来源:互联网 | 2023-10-14 09:22
RabbitMq的最终一致性分布式事务使用rabbitmq的步骤1.运行安装在服务器上的rabbit服务2.在项目中安装依赖3.编写对应的配置文件4.创建对应配置并加上启动注解5.
RabbitMq的最终一致性分布式事务 使用rabbitmq的步骤 1.运行安装在服务器上的rabbit服务 2.在项目中安装依赖 3.编写对应的配置文件 4.创建对应配置并加上启动注解 5.创建message表记录发送次数及信息 6.发送请求时并创建message信息 7.创建spring Task定时器并定时输出rabbitmq信息 8.创建消息确定方法,确认接受方收到的了消息并进行了处理 9.消息接受者创建消息重复表进行消息去重 10.接受方微服务创建监听器监听rabbitmq信息
使用rabbitmq的步骤
1.运行安装在服务器上的rabbit服务
或者在docker上运行
docker volume rm rabbitmq-5672-datadocker volume create --name rabbitmq-5672-datadocker run -d --rm --name rabbitmq-5672 \ -v /etc/localtime:/etc/localtime:ro \ -v rabbitmq-5672-data:/var/lib/rabbitmq \ -p 5672 :5672 \ -p 15672 :15672 \ rabbitmq:3.10-management
docker run -d --rm --name rabbitmq-5672 \ -v /etc/localtime:/etc/localtime:ro \ -v ~/docker/5672/data:/var/lib/rabbitmq \ -p 5672 :5672 \ -p 15672 :15672 \ rabbitmq:3.10-management
2.在项目中安装依赖 < dependency> < groupId> org.springframework.amqp groupId> < artifactId> spring-rabbit-test artifactId> < scope> test scope> dependency>
3.编写对应的配置文件 ## 连接rabbitmq服务器 spring.rabbitmq.host&#61;192.168.12.12 spring.rabbitmq.port&#61;5672 spring.rabbitmq.username&#61;guest spring.rabbitmq.password&#61;guest spring.rabbitmq.virtual-host&#61;hl## 手动确认消息 spring.rabbitmq.listener.simple.acknowledge-mode&#61;manual spring.rabbitmq.listener.direct.acknowledge-mode&#61;manual## 确认消息已发送到交换机&#xff08; Exchange &#xff09; spring.rabbitmq.publisher-confirm-type&#61;CORRELATED# 确认消息已发送到队列&#xff08;Queue&#xff09; spring.rabbitmq.publisher-returns&#61;true
4.创建对应配置并加上启动注解 &#64;Configuration &#64;EnableRabbit &#64;Slf4j &#64;Transactional public class RabbitConfig { &#64;Resource private MessageDao messageDao; public static final String EMPLOYEE_LIST &#61; "employee-list" ; public static final String DEPARTMENT_DELETE &#61; "department-delete" ; &#64;Bean public Queue DepartmentDelete ( ) { return new Queue ( DEPARTMENT_DELETE) ; } &#64;Bean public Queue employeeList ( ) { return new Queue ( EMPLOYEE_LIST) ; } &#64;Bean ( "rabbitTemplate" ) public RabbitTemplate createRabbitTemplate ( ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate &#61; new RabbitTemplate ( ) ; rabbitTemplate. setConnectionFactory ( connectionFactory) ; rabbitTemplate. setConfirmCallback ( new RabbitTemplate. ConfirmCallback ( ) { &#64;Override public void confirm ( CorrelationData correlationData, boolean ack, String s) { System . out. println ( correlationData. getId ( ) ) ; Message message &#61; messageDao. getOne ( Long . parseLong ( Objects . requireNonNull ( correlationData. getId ( ) ) ) ) ; if ( ack) { message. setStatus ( "B" ) ; } message. setRetryCount ( message. getRetryCount ( ) - 1 ) ; log. info ( "剩余消息数:" &#43; message. getRetryCount ( ) ) ; messageDao. save ( message) ; } } ) ; return rabbitTemplate; } }
5.创建message表记录发送次数及信息
drop table if exists message; create table message( id bigint auto_increment , exchange varchar ( 64 ) , routing_key varchar ( 64 ) not null , content varchar ( 128 ) not null , retry_count int not null , status varchar ( 32 ) not null , primary key ( id) ) ;
创建对应的DAO类和实体类
6.发送请求时并创建message信息 public void deleteById ( Long id) { departmentDao. deleteById ( id) ; Message message &#61; new Message ( null , null , RabbitConfig . DEPARTMENT_DELETE, id&#43; "" , 5 , "A" ) ; messageDao. save ( message) ; }
7.创建spring Task定时器并定时输出rabbitmq信息 &#64;Component &#64;Slf4j &#64;Transactional &#64;EnableScheduling public class RabbitTimer { &#64;Resource private MessageMysqlDao messageDao; &#64;Resource private RabbitTemplate rabbitTemplate; &#64;Scheduled ( fixedDelay &#61; 6000 ) private void process ( ) { QueryWrapper < Message > wrapper &#61; new QueryWrapper < > ( ) ; wrapper. ne ( "status" , "C" ) ; wrapper. gt ( "retry_count" , 0 ) ; List < Message > messageList &#61; messageDao. selectList ( wrapper) ; if ( messageList. size ( ) &#61;&#61; 0 ) { log. info ( "暂无消息发送,请等待..." ) ; } else { for ( Message message : messageList) { String content &#61; message. getId ( ) &#43; ":" &#43; message. getContent ( ) ; CorrelationData correlationData &#61; new CorrelationData ( message. getId ( ) &#43; "" ) ; if ( message. getExchange ( ) &#61;&#61; null ) { rabbitTemplate. convertAndSend ( message. getRoutingKey ( ) , ( Object ) content, correlationData) ; } else { rabbitTemplate. convertAndSend ( message. getExchange ( ) , message. getRoutingKey ( ) , content, correlationData) ; } log. info ( "消息 {} 已发送" , content) ; } } } }
8.创建消息确定方法,确认接受方收到的了消息并进行了处理 &#64;RestController &#64;RequestMapping ( "/message" ) &#64;RequiredArgsConstructor public class MessageController implements IAMessageController { &#64;Resource private MessageMysqlDao messageMysqlDao; &#64;PostMapping ( "/update/{id}" ) public String messageUpdate ( &#64;PathVariable ( "id" ) Long id) { QueryWrapper < Message > wrapper &#61; new QueryWrapper < > ( ) ; wrapper. eq ( "id" , id) ; Message message &#61; new Message ( ) ; message. setStatus ( "C" ) ; messageMysqlDao. update ( message, wrapper) ; return "success" ; } }
9.消息接受者创建消息重复表进行消息去重
drop table if exists recived_message; create table recived_message( id bigint auto_increment , recived_at datetime ) ;
10.接受方微服务创建监听器监听rabbitmq信息 消息接受者处理消息发送者发送的消息,在消息处理无误后进行发送openfeign请求,给消息提供者发送确认信息
&#64;Configuration &#64;RequiredArgsConstructor &#64;Transactional public class HarvestResultLister { private final HarvestPlanMysqlDao harvestPlanMysqlDao; private final ReceivedMessageMysqlDao receivedMessageMysqlDao; private final HarvestCheckClient harvestCheckClient; &#64;RabbitListener ( queues &#61; RabbitConfig . HARVEST_CHECK) public void harvestUpdateByCheck ( String msg, Channel channel, &#64;Header ( AmqpHeaders . DELIVERY_TAG) long tag) { try { System . out. println ( msg) ; String [ ] split &#61; msg. split ( ":" ) ; if ( split. length !&#61; 3 ) { throw new RabbitDataError ( "发送的数据异常" ) ; } String mesId &#61; split[ 0 ] ; String contentId &#61; split[ 1 ] ; String harvestId &#61; split[ 2 ] ; ReceivedMessage receivedMessage &#61; receivedMessageMysqlDao. selectById ( Long . parseLong ( contentId ) ) ; if ( receivedMessage !&#61; null ) { throw new RabbitDataError ( "发送重复数据" ) ; } receivedMessageMysqlDao. insert ( new ReceivedMessage ( Long . parseLong ( contentId ) , new Date ( ) ) ) ; QueryWrapper < HarvestPlan > wrapper &#61; new QueryWrapper < > ( ) ; wrapper. eq ( "id" , Long . parseLong ( harvestId) ) ; HarvestPlan harvestPlan &#61; new HarvestPlan ( ) ; harvestPlan. setPurchaseStatusId ( 3L ) ; String result &#61; harvestCheckClient. messageUpdate ( Long . parseLong ( mesId) ) ; if ( ! "success" . equals ( result) ) { throw new RabbitDataError ( "确认消息未正常传回" ) ; } } catch ( Exception e) { e. printStackTrace ( ) ; throw e; } finally { try { channel. basicAck ( tag, false ) ; } catch ( IOException e) { e. printStackTrace ( ) ; } } } }