作者:耗子很傻爱钻洞 | 来源:互联网 | 2023-10-14 09:22
RabbitMq的最终一致性分布式事务使用rabbitmq的步骤1.运行安装在服务器上的rabbit服务2.在项目中安装依赖3.编写对应的配置文件4.创建对应配置并加上启动注解5.
RabbitMq的最终一致性分布式事务
- 使用rabbitmq的步骤
- 1.运行安装在服务器上的rabbit服务
- 2.在项目中安装依赖
- 3.编写对应的配置文件
- 4.创建对应配置并加上启动注解
- 5.创建message表记录发送次数及信息
- 6.发送请求时并创建message信息
- 7.创建spring Task定时器并定时输出rabbitmq信息
- 8.创建消息确定方法,确认接受方收到的了消息并进行了处理
- 9.消息接受者创建消息重复表进行消息去重
- 10.接受方微服务创建监听器监听rabbitmq信息
使用rabbitmq的步骤
1.运行安装在服务器上的rabbit服务
或者在docker上运行
docker volume rm rabbitmq-5672-data
docker volume create --name rabbitmq-5672-data
docker run -d --rm --name rabbitmq-5672 \-v /etc/localtime:/etc/localtime:ro \-v rabbitmq-5672-data:/var/lib/rabbitmq \-p 5672:5672 \-p 15672:15672 \rabbitmq:3.10-management
docker run -d --rm --name rabbitmq-5672 \-v /etc/localtime:/etc/localtime:ro \-v ~/docker/5672/data:/var/lib/rabbitmq \-p 5672:5672 \-p 15672:15672 \rabbitmq:3.10-management
2.在项目中安装依赖
<dependency><groupId>org.springframework.amqpgroupId><artifactId>spring-rabbit-testartifactId><scope>testscope>
dependency>
3.编写对应的配置文件
## 连接rabbitmq服务器
spring.rabbitmq.host&#61;192.168.12.12
spring.rabbitmq.port&#61;5672
spring.rabbitmq.username&#61;guest
spring.rabbitmq.password&#61;guest
spring.rabbitmq.virtual-host&#61;hl## 手动确认消息
spring.rabbitmq.listener.simple.acknowledge-mode&#61;manual
spring.rabbitmq.listener.direct.acknowledge-mode&#61;manual## 确认消息已发送到交换机&#xff08; Exchange &#xff09;
spring.rabbitmq.publisher-confirm-type&#61;CORRELATED# 确认消息已发送到队列&#xff08;Queue&#xff09;
spring.rabbitmq.publisher-returns&#61;true
4.创建对应配置并加上启动注解
&#64;Configuration
&#64;EnableRabbit
&#64;Slf4j
&#64;Transactional
public class RabbitConfig {&#64;Resourceprivate MessageDao messageDao;public static final String EMPLOYEE_LIST &#61; "employee-list";public static final String DEPARTMENT_DELETE &#61; "department-delete";&#64;Beanpublic Queue DepartmentDelete(){return new Queue(DEPARTMENT_DELETE);}&#64;Beanpublic Queue employeeList(){return new Queue(EMPLOYEE_LIST);}&#64;Bean("rabbitTemplate")public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate &#61; new RabbitTemplate();rabbitTemplate.setConnectionFactory(connectionFactory);rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {&#64;Overridepublic void confirm(CorrelationData correlationData, boolean ack, String s) {System.out.println(correlationData.getId());Message message &#61; messageDao.getOne(Long.parseLong(Objects.requireNonNull(correlationData.getId())));if (ack){message.setStatus("B");}message.setRetryCount(message.getRetryCount()-1);log.info("剩余消息数:"&#43;message.getRetryCount());messageDao.save(message);}});
return rabbitTemplate;}}
5.创建message表记录发送次数及信息
drop table if exists message;
create table message
(id bigint auto_increment,exchange varchar(64) ,routing_key varchar(64) not null,content varchar(128) not null,retry_count int not null,status varchar(32) not null,primary key (id)
);
创建对应的DAO类和实体类
6.发送请求时并创建message信息
public void deleteById(Long id) {departmentDao.deleteById(id);Message message &#61; new Message(null, null, RabbitConfig.DEPARTMENT_DELETE, id&#43;"", 5, "A");messageDao.save(message);
}
7.创建spring Task定时器并定时输出rabbitmq信息
&#64;Component
&#64;Slf4j
&#64;Transactional
&#64;EnableScheduling
public class RabbitTimer {&#64;Resourceprivate MessageMysqlDao messageDao;&#64;Resourceprivate RabbitTemplate rabbitTemplate;&#64;Scheduled(fixedDelay &#61; 6000)private void process(){QueryWrapper<Message> wrapper &#61; new QueryWrapper<>();wrapper.ne("status", "C");wrapper.gt("retry_count", 0);List<Message> messageList &#61; messageDao.selectList(wrapper);if (messageList.size()&#61;&#61;0){log.info("暂无消息发送,请等待...");}else {for (Message message : messageList) {String content &#61; message.getId()&#43;":"&#43;message.getContent();CorrelationData correlationData &#61; new CorrelationData(message.getId()&#43;"");if (message.getExchange()&#61;&#61;null) {rabbitTemplate.convertAndSend(message.getRoutingKey(), (Object) content, correlationData);}else{rabbitTemplate.convertAndSend(message.getExchange(), message.getRoutingKey(), content, correlationData);}log.info("消息 {} 已发送",content);}}}
}
8.创建消息确定方法,确认接受方收到的了消息并进行了处理
&#64;RestController
&#64;RequestMapping("/message")
&#64;RequiredArgsConstructor
public class MessageController implements IAMessageController{&#64;Resourceprivate MessageMysqlDao messageMysqlDao;&#64;PostMapping("/update/{id}")public String messageUpdate(&#64;PathVariable("id")Long id){QueryWrapper<Message> wrapper &#61; new QueryWrapper<>();wrapper.eq("id", id);Message message &#61; new Message();message.setStatus("C");messageMysqlDao.update(message,wrapper);return "success";}}
9.消息接受者创建消息重复表进行消息去重
drop table if exists recived_message;
create table recived_message
(id bigint auto_increment,recived_at datetime
);
10.接受方微服务创建监听器监听rabbitmq信息
消息接受者处理消息发送者发送的消息,在消息处理无误后进行发送openfeign请求,给消息提供者发送确认信息
&#64;Configuration
&#64;RequiredArgsConstructor
&#64;Transactional
public class HarvestResultLister {private final HarvestPlanMysqlDao harvestPlanMysqlDao;private final ReceivedMessageMysqlDao receivedMessageMysqlDao;private final HarvestCheckClient harvestCheckClient;&#64;RabbitListener(queues &#61; RabbitConfig.HARVEST_CHECK)public void harvestUpdateByCheck(String msg, Channel channel,&#64;Header(AmqpHeaders.DELIVERY_TAG) long tag) {try {System.out.println(msg);String[] split &#61; msg.split(":");if (split.length !&#61; 3) {throw new RabbitDataError("发送的数据异常");}String mesId &#61; split[0];String contentId &#61; split[1];String harvestId &#61; split[2];ReceivedMessage receivedMessage &#61; receivedMessageMysqlDao.selectById(Long.parseLong(contentId ));if (receivedMessage !&#61; null){throw new RabbitDataError("发送重复数据");}receivedMessageMysqlDao.insert( new ReceivedMessage(Long.parseLong(contentId ), new Date()));QueryWrapper<HarvestPlan> wrapper &#61; new QueryWrapper<>();wrapper.eq("id", Long.parseLong(harvestId));HarvestPlan harvestPlan &#61; new HarvestPlan();harvestPlan.setPurchaseStatusId(3L);String result &#61; harvestCheckClient.messageUpdate(Long.parseLong(mesId));if (!"success".equals(result)){throw new RabbitDataError("确认消息未正常传回");}} catch (Exception e) {e.printStackTrace();throw e;}finally {try {channel.basicAck(tag, false);} catch (IOException e) {e.printStackTrace();}}}
}