目录
rabbitMQ是Erlang语言开发的所以先下载Erlang;
RabbitMQ官网地址: https://www.rabbitmq.com/
Erlang下载: https://www.erlang.org/downloads
间接运行: otp_win64_23.0.exe 程序始终next即可,如需扭转装置地位自行抉择,装置实现后对系统环境变量新增ERLANG_HOME地址为:
C:\Program Files\erl-23.0
双击零碎变量path,点击“新建”,将%ERLANG_HOME%\bin退出到path中。
win+R键,输出cmd,再输出erl,看到erlang版本号就阐明erlang装置胜利了。
间接运行: rabbitmq-server-3.8.8 程序始终next即可,如需扭转装置地位自行抉择.
进入装置后的RabbitMQ的sbin目录中(C:\Program Files\RabbitMQ Server\rabbitmq_server-3.8.8\sbin)
Cmd命令执行: rabbitmq-plugins enable rabbitmq_managementr
C:\Program Files\RabbitMQ Server\rabbitmq_server-3.8.8\sbin>rabbitmq-plugins enable rabbitmq_management
Enabling plugins on node rabbit@LX-P1DMPLUV:
rabbitmq_management
The following plugins have been configured:
rabbitmq_management
rabbitmq_management_agent
rabbitmq_web_dispatch
Applying plugin configuration to rabbit@LX-P1DMPLUV...
Plugin configuration unchanged
常用命令:
# 启动RabbitMQ
rabbitmq-service start
# 进行RabbitMQ
rabbitmq-service stop
# 启用RabbitMQ Web可视化界面插件
rabbitmq-plugins enable rabbitmq_management
# 停用RabbitMQ Web可视化界面插件
rabbitmq-plugins disable rabbitmq_management
# 查看RabbitMQ状态
rabbitmqctl status
拜访治理端页面,默认账号密码为: guest
可视化界面: http://127.0.0.1:15672/#/
进入装置后的RabbitMQ的sbin目录中(C:\Program Files\RabbitMQ Server\rabbitmq_server-3.8.8\sbin)
<# 创立用户root用户 明码为123456
rabbitmqctl add_user root 123456
# 为该用户调配所有权限
rabbitmqctl set_permissions -p / root ".*" ".*" ".*"
# 设置该用户为管理员角色
rabbitmqctl set_user_tags root administrator
RabbitMQ是一款应用Erlang语言开发的,实现AMQP(高级音讯队列协定)的开源消息中间件。首先要晓得一些RabbitMQ的特点:
如当一个站点新增用户时须要走以下流程:验证账号信息->用户入库->发送注册胜利欢送邮箱给用户;
从该流程中剖析用户注册胜利后首先冀望的是可能胜利登录上站点,而对于是否收到注册胜利的邮件对于用户而言并不重要,
而邮件发送对于如遇到网络问题可能导致发送邮件迟缓素来导致整个用户注册流程响应很慢;
对于告诉邮件发送对于性能而言并不重要的时候,这个时候就能够将该业务放在MQ中异步执行从而能够从肯定水平上晋升整个流程的性能。
如当一个站点新增用户时须要走以下流程:验证账号信息->用户入库->发送注册胜利欢送邮箱给用户;
通常通过零碎划分会划分为:用户模块,音讯模块;
以Spring Cloud的为例依照原始做法会在用户入库胜利后会通过Feign调用音讯模块的发送邮件接口,然而如果音讯模块全集群宕机就会导致Feign申请失败从而导致业务不可用;
应用MQ就不会造成上述的问题,因为咱们在用户注册实现后想音讯模块对应的邮件发送业务队列去发送音讯即可,队列会监督音讯模块实现,如果完不成队列会始终监督,直到实现为止
秒杀和抢购等场景常常应用 MQ 进行流量削峰。流动开始时流量暴增,用户的申请写入 MQ,超过 MQ 最大长度抛弃申请,业务零碎接管 MQ 中的音讯进行解决,达到流量削峰、保证系统可用性的目标。
影响:MQ是排队执行的所以对性能有肯定的影响,并且申请过多后会导致申请被抛弃问题
点对点或者订阅公布模式,通过音讯进行通信。如微信的音讯发送与接管、聊天室等。
POM导入依赖:
org.springframework.boot
spring-boot-starter-amqp
org.springframework.boot
spring-boot-starter-web
org.projectlombok
lombok
创立SpringBoot启动类:
@SpringBootApplication
public class SimpleRabbitMQCaseApplication {
public static void main(String[] args) {
SpringApplication.run(SimpleRabbitMQCaseApplication.class,args);
}
}
创立applicatin.yaml:
server:
port: 8021
spring:
application:
name: rabbitmq-simple-case
#配置rabbitMq 服务器
rabbitmq:
host: 127.0.0.1
port: 5672
username: root
password: 123456
virtual-host: / # 虚构host 能够不设置,应用server默认host
listener:
simple:
concurrency: 10 # 生产端的监听个数(即@RabbitListener开启几个线程去解决数据。)
max-concurrency: 10 # 生产端的监听最大个数
prefetch: 5
acknowledge-mode: auto # MANUAL:手动解决 AUTO:主动解决
default-requeue-rejected: true # 生产不胜利的音讯回绝入队
retry:
enabled: true # 开启音讯重试
max-attempts: 5 # 重试次数
max-interval: 10000 # 重试最大间隔时间
initial-interval: 2000 # 重试初始间隔时间
生产者:
/**
* 简略队列音讯生产
* @author wuwentao
*/
@RestController
@RequestMapping("/simple/queue")
@AllArgsConstructor
public class SimpleQueueProducer {
private RabbitTemplate rabbitTemplate;
// 发送到的队列名称
public static final String AMQP_SIMPLE_QUEUE = "amqp.simple.queue";
/**
* 发送简略音讯
* @param message 音讯内容
*/
@GetMapping("/sendMessage")
public String sendMessage(@RequestParam(value = "message") String message){
rabbitTemplate.convertAndSend(AMQP_SIMPLE_QUEUE, message);
return "OK";
}
}
消费者:
/**
* 简略队列音讯消费者
* @author wuwentao
*/
@Component
@Slf4j
public class SimpleQueueConsumer {
/**
* 监听一个简略的队列,队列不存在时候会创立
* @param content 音讯
*/
@RabbitListener(queuesToDeclare = @Queue(name = SimpleQueueProducer.AMQP_SIMPLE_QUEUE))
public void consumerSimpleMessage(String content, Message message, Channel channel) throws IOException {
// 通过Message对象解析音讯
String messageStr = new String(message.getBody());
log.info("通过参数模式接管的音讯:{}" ,content);
//log.info("通过Message:{}" ,messageStr); // 可通过Meessage对象解析音讯
// channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); // 手动确认音讯生产胜利
// channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); // 手动确认音讯生产失败
}
}
测试生成音讯拜访接口地址:
http://localhost:8021/simple/queue/sendMessage?message=这是一条简略的音讯序号1
http://localhost:8021/simple/queue/sendMessage?message=这是一条简略的音讯序号2
http://localhost:8021/simple/queue/sendMessage?message=这是一条简略的音讯序号3
控制台打印生产信息:
2022-08-22 09:45:26.846 INFO 14400 --- [ntContainer#0-1] c.g.b.s.consumer.SimpleQueueConsumer : 通过参数模式接管的音讯:这是一条简略的音讯序号1
2022-08-22 09:45:29.064 INFO 14400 --- [tContainer#0-10] c.g.b.s.consumer.SimpleQueueConsumer : 通过参数模式接管的音讯:这是一条简略的音讯序号2
2022-08-22 09:45:31.441 INFO 14400 --- [ntContainer#0-4] c.g.b.s.consumer.SimpleQueueConsumer : 通过参数模式接管的音讯:这是一条简略的音讯序号3
fanout模式也叫播送模式,每一条音讯能够被绑定在同一个交换机上的所有队列的消费者生产
生产者:
@RestController
@RequestMapping("/exchange/fanout")
@AllArgsConstructor
public class ExchangeFanoutProducer {
private RabbitTemplate rabbitTemplate;
// 扇形交换机定义
public static final String EXCHANGE_FANOUT = "exchange.fanout";
// 绑定扇形交换机的队列1
public static final String EXCHANGE_FANOUT_QUEUE_1 = "exchange.fanout.queue1";
// 绑定扇形交换机的队列2
public static final String EXCHANGE_FANOUT_QUEUE_2 = "exchange.fanout.queue2";
/**
* 发送扇形音讯音讯可能被所有绑定该交换机的队列给生产
* @param message 音讯内容
*/
@GetMapping("/sendMessage")
public String sendMessage(@RequestParam(value = "message") String message){
// routingkey 在fanout模式不应用,会在direct和topic模式应用,所以这里给空
rabbitTemplate.convertAndSend(EXCHANGE_FANOUT,"", message);
return "OK";
}
}
消费者:
这里定义两个消费者同时绑定同一个扇形交换机,这里次要申明交换机Type为ExchangeTypes.FANOUT
/**
* 扇形交换机队列消费者
* @author wuwentao
*/
@Component
@Slf4j
public class ExchangeFanoutConsumer {
/**
* 创立交换机并且绑定队列(队列1)
*
* @param content 内容
* @param channel 通道
* @param message 音讯
* @throws IOException ioexception
* @throws TimeoutException 超时异样
*/
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(value = ExchangeFanoutProducer.EXCHANGE_FANOUT, durable = "true", type = ExchangeTypes.FANOUT),
value = @Queue(value = ExchangeFanoutProducer.EXCHANGE_FANOUT_QUEUE_1, durable = "true")
))
@RabbitHandler
public void exchangeFanoutQueue1(String content, Channel channel, Message message) {
log.info("EXCHANGE_FANOUT_QUEUE_1队列接管到音讯:{}",content);
}
/**
* 创立交换机并且绑定队列(队列2)
*/
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(value = ExchangeFanoutProducer.EXCHANGE_FANOUT, durable = "true", type = ExchangeTypes.FANOUT),
value = @Queue(value = ExchangeFanoutProducer.EXCHANGE_FANOUT_QUEUE_2, durable = "true")
))
@RabbitHandler
public void exchangeFanoutQueue2(String content, Channel channel, Message message) {
log.info("EXCHANGE_FANOUT_QUEUE_2队列接管到音讯:{}",content);
}
}
测试生成音讯拜访接口地址:
http://localhost:8021/exchange/fanout/sendMessage?message=这是一条扇形交换机中的音讯序号1
http://localhost:8021/exchange/fanout/sendMessage?message=这是一条扇形交换机中的音讯序号2
http://localhost:8021/exchange/fanout/sendMessage?message=这是一条扇形交换机中的音讯序号3
控制台打印生产信息:
2022-08-22 10:10:43.285 INFO 12016 --- [ntContainer#1-2] c.g.b.s.consumer.ExchangeFanoutConsumer : EXCHANGE_FANOUT_QUEUE_2队列接管到音讯:这是一条扇形交换机中的音讯序号1
2022-08-22 10:10:43.285 INFO 12016 --- [ntContainer#0-7] c.g.b.s.consumer.ExchangeFanoutConsumer : EXCHANGE_FANOUT_QUEUE_1队列接管到音讯:这是一条扇形交换机中的音讯序号1
2022-08-22 10:10:49.151 INFO 12016 --- [tContainer#0-10] c.g.b.s.consumer.ExchangeFanoutConsumer : EXCHANGE_FANOUT_QUEUE_1队列接管到音讯:这是一条扇形交换机中的音讯序号2
2022-08-22 10:10:49.151 INFO 12016 --- [ntContainer#1-4] c.g.b.s.consumer.ExchangeFanoutConsumer : EXCHANGE_FANOUT_QUEUE_2队列接管到音讯:这是一条扇形交换机中的音讯序号2
2022-08-22 10:10:54.254 INFO 12016 --- [ntContainer#0-6] c.g.b.s.consumer.ExchangeFanoutConsumer : EXCHANGE_FANOUT_QUEUE_1队列接管到音讯:这是一条扇形交换机中的音讯序号3
2022-08-22 10:10:54.255 INFO 12016 --- [ntContainer#1-3] c.g.b.s.consumer.ExchangeFanoutConsumer : EXCHANGE_FANOUT_QUEUE_2队列接管到音讯:这是一条扇形交换机中的音讯序号3
直连交换机与扇形交换机的区别在于,队列都是绑定同一个交换机,然而在队列上会增加routingkey标识,消费者会依据对应的不同routingkey生产对应的音讯。
生产者:
@RestController
@RequestMapping("/exchange/direct")
@AllArgsConstructor
public class ExchangeDirectProducer {
private RabbitTemplate rabbitTemplate;
// 直连交换机定义
public static final String EXCHANGE_DIRECT = "exchange.direct";
// 直连交换机队列定义1
public static final String EXCHANGE_DIRECT_QUEUE_1 = "exchange.direct.queue1";
// 直连交换机队列定义2
public static final String EXCHANGE_DIRECT_QUEUE_2 = "exchange.direct.queue2";
// 直连交换机路由KEY定义1
public static final String EXCHANGE_DIRECT_ROUTING_KEY_1 = "exchange.direct.routing.key1";
// 直连交换机路由KEY定义2
public static final String EXCHANGE_DIRECT_ROUTING_KEY_2 = "exchange.direct.routing.key2";
/**
* 发送音讯到直连交换机并且指定对应routingkey
* @param message 音讯内容
*/
@GetMapping("/sendMessage")
public String sendMessage(@RequestParam(value = "message") String message,
@RequestParam(value = "routingkey") int routingkey){
if(routingkey == 1){
rabbitTemplate.convertAndSend(EXCHANGE_DIRECT,EXCHANGE_DIRECT_ROUTING_KEY_1, message);
} else if (routingkey == 2){
rabbitTemplate.convertAndSend(EXCHANGE_DIRECT,EXCHANGE_DIRECT_ROUTING_KEY_2, message);
}else{
return "非法参数!";
}
return "OK";
}
}
消费者:
这里定义多个消费者同时绑定同一个直连交换机,这里次要申明交换机Type为ExchangeTypes.DIRECT,不申明则默认为DIRECT。
/**
* 直连交换机队列消费者
* @author wuwentao
*/
@Component
@Slf4j
public class ExchangeDirectConsumer {
/**
* 创立交换机并且绑定队列1(绑定routingkey1)
*
* @param content 内容
* @param channel 通道
* @param message 音讯
* @throws IOException ioexception
* @throws TimeoutException 超时异样
*/
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(value = ExchangeDirectProducer.EXCHANGE_DIRECT, durable = "true", type = ExchangeTypes.DIRECT),
value = @Queue(value = ExchangeDirectProducer.EXCHANGE_DIRECT_QUEUE_1, durable = "true"),
key = ExchangeDirectProducer.EXCHANGE_DIRECT_ROUTING_KEY_1
))
@RabbitHandler
public void exchangeDirectRoutingKey1(String content, Channel channel, Message message) {
log.info("队列1 KEY1接管到音讯:{}",content);
}
/**
* 创立交换机并且绑定队列2(绑定routingkey2)
*/
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(value = ExchangeDirectProducer.EXCHANGE_DIRECT, durable = "true", type = ExchangeTypes.DIRECT),
value = @Queue(value = ExchangeDirectProducer.EXCHANGE_DIRECT_QUEUE_2, durable = "true"),
key = ExchangeDirectProducer.EXCHANGE_DIRECT_ROUTING_KEY_2
))
@RabbitHandler
public void exchangeDirectRoutingKey2(String content, Channel channel, Message message) {
log.info("队列2 KEY2接管到音讯:{}",content);
}
}
测试生成音讯拜访接口地址:
http://localhost:8021/exchange/direct/sendMessage?routingkey=1&message=这是一条发给路由key为1的音讯
http://localhost:8021/exchange/direct/sendMessage?routingkey=2&message=这是一条发给路由key为2的音讯
控制台打印生产信息:
2022-08-22 10:37:22.173 INFO 4380 --- [ntContainer#0-1] c.g.b.s.consumer.ExchangeDirectConsumer : 队列1 KEY1接管到音讯:这是一条发给路由key为1的音讯
2022-08-22 10:37:26.882 INFO 4380 --- [ntContainer#1-3] c.g.b.s.consumer.ExchangeDirectConsumer : 队列2 KEY2接管到音讯:这是一条发给路由key为2的音讯
这个交换机其实跟直连交换机流程差不多,然而它的特点就是在它的路由键和绑定键之间是有规定的;规定如下:
Topic交换机接管的音讯RoutingKey必须是多个单词,以 . 宰割
Topic交换机与队列绑定时的routingKey能够指定通配符
#:代表0个或多个词
*:代表1个词
生产者:
@RestController
@RequestMapping("/exchange/topic")
@AllArgsConstructor
public class ExchangeTopicProducer {
private RabbitTemplate rabbitTemplate;
// 主題交换机定义
public static final String EXCHANGE_TOPIC = "exchange.topic";
// 主題交换机队列定义1
public static final String EXCHANGE_TOPIC_QUEUE_1 = "exchange.topic.queue1";
// 主題交换机队列定义1
public static final String EXCHANGE_TOPIC_QUEUE_2 = "exchange.topic.queue2";
// 主題交换机队列路由Key定义1
public static final String EXCHANGE_TOPIC_ROUTING1_KEY_1 = "#.routingkey.#";
// 主題交换机队列路由Key定义2
public static final String EXCHANGE_TOPIC_ROUTING2_KEY_2 = "routingkey.*";
// 案例KEY1 能够被EXCHANGE_TOPIC_ROUTING1_KEY_1匹配不能被EXCHANGE_TOPIC_ROUTING2_KEY_2匹配
public static final String EXCHANGE_TOPIC_CASE_KEY_1 = "topic.routingkey.case1";
// 案例KEY2 同时能够被EXCHANGE_TOPIC_ROUTING1_KEY_1和EXCHANGE_TOPIC_ROUTING2_KEY_2匹配
public static final String EXCHANGE_TOPIC_CASE_KEY_2 = "routingkey.case2";
/**
* 发送音讯到主题交换机并且指定对应可通配routingkey
* @param message 音讯内容
*/
@GetMapping("/sendMessage")
public String sendMessage(@RequestParam(value = "message") String message,
@RequestParam(value = "routingkey") int routingkey){
if(routingkey == 1){
// 同时匹配 topic.routingkey.case1 和 routingkey.case2
rabbitTemplate.convertAndSend(EXCHANGE_TOPIC,EXCHANGE_TOPIC_CASE_KEY_1, message);
} else if (routingkey == 2){
// 只能匹配 routingkey.case2
rabbitTemplate.convertAndSend(EXCHANGE_TOPIC,EXCHANGE_TOPIC_CASE_KEY_2, message);
}else{
return "非法参数!";
}
return "OK";
}
}
消费者:
这里定义多个消费者同时绑定同一个直主题交换机,这里次要申明交换机Type为ExchangeTypes.TOPIC,当routingkey发送的音讯可能被消费者给匹配仅可能接管到音讯。
@Component
@Slf4j
public class ExchangeTopicConsumer {
/**
* 创立交换机并且绑定队列1(绑定routingkey1)
*
* @param content 内容
* @param channel 通道
* @param message 音讯
* @throws IOException ioexception
* @throws TimeoutException 超时异样
*/
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(value = ExchangeTopicProducer.EXCHANGE_TOPIC, durable = "true", type = ExchangeTypes.TOPIC),
value = @Queue(value = ExchangeTopicProducer.EXCHANGE_TOPIC_QUEUE_1, durable = "true"),
key = ExchangeTopicProducer.EXCHANGE_TOPIC_ROUTING1_KEY_1
))
@RabbitHandler
public void exchangeTopicRoutingKey1(String content, Channel channel, Message message) {
log.info("#号统配符号队列1接管到音讯:{}",content);
}
/**
* 创立交换机并且绑定队列2(绑定routingkey2)
*/
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(value = ExchangeTopicProducer.EXCHANGE_TOPIC, durable = "true", type = ExchangeTypes.TOPIC),
value = @Queue(value = ExchangeTopicProducer.EXCHANGE_TOPIC_QUEUE_2, durable = "true"),
key = ExchangeTopicProducer.EXCHANGE_TOPIC_ROUTING2_KEY_2
))
@RabbitHandler
public void exchangeTopicRoutingKey2(String content, Channel channel, Message message) {
log.info("*号统配符号队列2接管到音讯:{}",content);
}
}
测试生成音讯拜访接口地址:
http://localhost:8021/exchange/topic/sendMessage?routingkey=1&message=前后多重匹配
http://localhost:8021/exchange/topic/sendMessage?routingkey=2&message=后一个词匹配
控制台打印生产信息:
2022-08-22 15:10:50.444 INFO 1376 --- [ntContainer#4-8] c.g.b.s.consumer.ExchangeTopicConsumer : #号统配符号队列1接管到音讯:前后多重匹配
2022-08-22 15:10:55.118 INFO 1376 --- [ntContainer#5-8] c.g.b.s.consumer.ExchangeTopicConsumer : *号统配符号队列2接管到音讯:后一个词匹配
2022-08-22 15:10:55.119 INFO 1376 --- [ntContainer#4-9] c.g.b.s.consumer.ExchangeTopicConsumer : #号统配符号队列1接管到音讯:后一个词匹配
新增SpringBoot配置文件YAML,这里次要将主动ACK批改为手工ACK并且开启音讯确认模式与音讯回退:
spring:
rabbitmq:
listener:
acknowledge-mode: manual # MANUAL:手动解决 AUTO:主动解决
# NONE值是禁用公布确认模式,是默认值
# CORRELATED值是公布音讯胜利到交换器后会触发回调办法,如1示例
# SIMPLE值经测试有两种成果,其一成果和CORRELATED值一样会触发回调办法,其二在公布音讯胜利后应用rabbitTemplate调用waitForConfirms或waitForConfirmsOrDie办法期待broker节点返回发送后果,依据返回后果来断定下一步的逻辑,要留神的点是waitForConfirmsOrDie办法如果返回false则会敞开channel,则接下来无奈发送音讯到broker;
publisher-confirm-type: simple #音讯确认机制
publisher-returns: true # 音讯回退确认机制
定义音讯回调确认实现类:
/**
* 消费者确认收到音讯后,手动ack回执回调解决
* @author wuwentao
*/
@Slf4j
@Component
public class MessageConfirmCallback implements RabbitTemplate.ConfirmCallback {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info("===================================================");
log.info("音讯确认机制回调函数参数信息如下:");
log.info("ACK状态:{}",ack);
log.info("投递失败起因:{}",cause);
log.info("===================================================");
}
}
消费者:
/**
* RabbitMQ Message 回调地址消费者测试
* @author wuwentao
*/
@Component
@Slf4j
public class MessagesCallbackConsumer {
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(value = MessagesCallbackProducer.MESSAGE_CALLBACK_EXCHANGE, durable = "true", type = ExchangeTypes.DIRECT),
value = @Queue(value = MessagesCallbackProducer.MESSAGE_CALLBACK_QUEUE, durable = "true"),
key = MessagesCallbackProducer.MESSAGE_CALLBACK_ROUTINGKEY
))
@RabbitHandler
public void consumer(String content, Channel channel, Message message) throws IOException {
if("胜利".equals(content)){
log.info("音讯解决胜利:{}",content);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); // 手动确认音讯生产胜利
}else{
if(message.getMessageProperties().getRedelivered()){
log.info("音讯已被解决过了请勿反复解决音讯:{}",content);
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 回绝该音讯,音讯会被抛弃,不会重回队列
}else{
log.info("音讯解决失败期待重新处理:{}",content);
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
}
}
生产者:
/**
* 音讯回调机制测试
* @author wuwentao
*/
@RestController
@RequestMapping("/message/callback")
@AllArgsConstructor
public class MessagesCallbackProducer {
private RabbitTemplate rabbitTemplate;
private MessageConfirmCallback messageConfirmCallback;
// 发送到的队列名称
public static final String MESSAGE_CALLBACK_QUEUE = "amqp.message.callback.queue";
public static final String MESSAGE_CALLBACK_EXCHANGE = "amqp.message.callback.exchange";
public static final String MESSAGE_CALLBACK_ROUTINGKEY = "amqp.message.callback.routingkey";
/**
* 测试音讯确认机制
* @param message 音讯内容
*/
@GetMapping("/sendMessage")
public String sendMessage(@RequestParam(value = "message") String message){
// 设置失败和确认回调函数
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback(messageConfirmCallback);
//构建回调id为uuid
String callBackId = UUID.randomUUID().toString();
CorrelationData correlatiOnData= new CorrelationData(callBackId);
if("失败的音讯".equals(message)){
// 写一个不存的替换机器 和不存在的路由KEY
rabbitTemplate.convertAndSend("sdfdsafadsf","123dsfdasf",message,
msg -> {
msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
return msg;
},correlationData);
}else{
rabbitTemplate.convertAndSend(MESSAGE_CALLBACK_EXCHANGE,MESSAGE_CALLBACK_ROUTINGKEY,message,
msg -> {
msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
return msg;
},correlationData);
}
return "OK";
}
}
测试生成音讯拜访接口地址:
# 发送找不到交换机的音讯
http://localhost:8021/message/callback/sendMessage?message=失败的音讯
# 发送手动ACK胜利的音讯
http://localhost:8021/message/callback/sendMessage?message=胜利
# 发送手动ACK失败的音讯
http://localhost:8021/message/callback/sendMessage?message=失败
控制台打印生产信息:
2022-08-24 09:11:50.122 INFO 11440 --- [nectionFactory2] c.g.b.s.callback.MessageConfirmCallback : ===================================================
2022-08-24 09:11:50.122 INFO 11440 --- [nectionFactory2] c.g.b.s.callback.MessageConfirmCallback : 音讯确认机制回调函数参数信息如下:
2022-08-24 09:11:50.123 INFO 11440 --- [nectionFactory2] c.g.b.s.callback.MessageConfirmCallback : ACK状态:false
2022-08-24 09:11:50.127 INFO 11440 --- [nectionFactory2] c.g.b.s.callback.MessageConfirmCallback : 投递失败起因:channel error; protocol method: #method(reply-code=404, reply-text=NOT_FOUND - no exchange 'sdfdsafadsf' in vhost '/', class-id=60, method-id=40)
2022-08-24 09:11:50.127 INFO 11440 --- [nectionFactory2] c.g.b.s.callback.MessageConfirmCallback : ===================================================
2022-08-24 09:12:02.704 INFO 11440 --- [nectionFactory2] c.g.b.s.callback.MessageConfirmCallback : ===================================================
2022-08-24 09:12:02.705 INFO 11440 --- [nectionFactory2] c.g.b.s.callback.MessageConfirmCallback : 音讯确认机制回调函数参数信息如下:
2022-08-24 09:12:02.705 INFO 11440 --- [nectionFactory2] c.g.b.s.callback.MessageConfirmCallback : ACK状态:true
2022-08-24 09:12:02.705 INFO 11440 --- [nectionFactory2] c.g.b.s.callback.MessageConfirmCallback : 投递失败起因:null
2022-08-24 09:12:02.705 INFO 11440 --- [nectionFactory2] c.g.b.s.callback.MessageConfirmCallback : ===================================================
2022-08-24 09:12:02.735 INFO 11440 --- [ntContainer#6-1] c.g.b.s.c.MessagesCallbackConsumer : 音讯解决胜利:胜利
2022-08-24 09:12:16.680 INFO 11440 --- [ntContainer#6-4] c.g.b.s.c.MessagesCallbackConsumer : 音讯解决失败期待重新处理:失败
2022-08-24 09:12:16.688 INFO 11440 --- [nectionFactory2] c.g.b.s.callback.MessageConfirmCallback : ===================================================
2022-08-24 09:12:16.689 INFO 11440 --- [nectionFactory2] c.g.b.s.callback.MessageConfirmCallback : 音讯确认机制回调函数参数信息如下:
2022-08-24 09:12:16.689 INFO 11440 --- [nectionFactory2] c.g.b.s.callback.MessageConfirmCallback : ACK状态:true
2022-08-24 09:12:16.689 INFO 11440 --- [nectionFactory2] c.g.b.s.callback.MessageConfirmCallback : 投递失败起因:null
2022-08-24 09:12:16.689 INFO 11440 --- [nectionFactory2] c.g.b.s.callback.MessageConfirmCallback : ===================================================
2022-08-24 09:12:16.693 INFO 11440 --- [ntContainer#6-7] c.g.b.s.c.MessagesCallbackConsumer : 音讯已被解决过了请勿反复解决音讯:失败