消息落库,对消息状态进行打标
消息落库,对消息进行打标(对消息设置状态,发送中,broker收到,)
定时器轮训,检测未发送的消息,进行二次投递,最大努力尝试(设置最大次数)
补偿
分布式定时任务,抓取数据(超过第一时长),尝试重发,重试次数限制
消息的延迟投递,做二次确认,回调检查 (最大限度的减少消息落库)
方案一在高并发场景下,每次消息落库,影响性能(IO操作)
step1: 业务消息落库 ,一定是数据库入库成功以后在进行发送消息
step2:第一次消息的发送
step3:延迟消息的检测
step4:监听,处理完,生成一条新消息
step5:通过队列发送,确认 不是之前的ack
借鉴数据库的乐观锁机制
执行一条更新SQL
update t_reps set count=count-1,version=version+1 where verison=1
在业务高峰期,如何避免消息的重复消费问题
消费端实现幂等性,就意味着,我们的消息永远不会被消费多次,即时收到多条一样的消息。
幂等性,通俗点说,就一个数据,或者一个请求,给你重复来多次,你得确保对应的数据是不会改变的,不能出错。
https://github.com/doocs/advanced-java/blob/master/docs/high-concurrency/how-to-ensure-that-messages-are-not-repeatedly-consumed.md
方案一:
唯一ID+指纹码机制
方案二:利用Redis的原子性实现
理解Confirm消息确认机制:
confirm确认消息流程解析
confirm确认消息实现
示例
生产者
/**
* @author niugang
*/
public class Producer {
public static void main(String[] args) throws Exception {
//1 创建ConnectionFactory
ConnectionFactory cOnnectionFactory= new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2 获取C onnection
Connection cOnnection= connectionFactory.newConnection();
//3 通过Connection创建一个新的Channel
Channel channel = connection.createChannel();
//4 指定我们的消息投递模式: 消息的确认模式
channel.confirmSelect();
String exchangeName = "test_confirm_exchange";
String routingKey = "confirm.save";
//5 发送一条消息
String msg = "Hello RabbitMQ Send confirm message!";
channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
//6 添加一个确认监听
channel.addConfirmListener(new ConfirmListener() {
//失败
// deliveryTag 消息唯一标签
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.err.println("-------no ack!-----------");
}
//成功
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.err.println("-------ack!-----------");
}
});
}
}
消费者
/**
* @author niugang
*/
public class Consumer {
public static void main(String[] args) throws Exception {
//1 创建ConnectionFactory
ConnectionFactory cOnnectionFactory= new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2 获取C onnection
Connection cOnnection= connectionFactory.newConnection();
//3 通过Connection创建一个新的Channel
Channel channel = connection.createChannel();
String exchangeName = "test_confirm_exchange";
String routingKey = "confirm.#";
String queueName = "test_confirm_queue";
//4 声明交换机和队列 然后进行绑定设置, 最后制定路由Key
channel.exchangeDeclare(exchangeName, "topic", true);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
//5 创建消费者
QueueingConsumer queueingCOnsumer= new QueueingConsumer(channel);
channel.basicConsume(queueName, true, queueingConsumer);
while(true){
Delivery delivery = queueingConsumer.nextDelivery();
String msg = new String(delivery.getBody());
System.err.println("消费端: " + msg);
}
}
}
配置
在基础API上有一个关键的配置项
Mandatory:如果为true,则监听器会接收到路由不可达的消息,然后进行后续处理,如果为false,那么broker端自动删除该消息。
流程
生产者
/**
* @author niugang
*/
public class Producer {
public static void main(String[] args) throws Exception {
ConnectionFactory cOnnectionFactory= new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
Connection cOnnection= connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchange = "test_return_exchange";
String routingKey = "return.save";
String routingKeyError = "abc.save";
String msg = "Hello RabbitMQ Return Message";
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange,
String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.err.println("---------handle return----------");
//响应码 312
System.err.println("replyCode: " + replyCode);
//NO_ROUTE
System.err.println("replyText: " + replyText);
System.err.println("exchange: " + exchange);
System.err.println("routingKey: " + routingKey);
System.err.println("properties: " + properties);
System.err.println("body: " + new String(body));
}
});
channel.basicPublish(exchange, routingKeyError, true, null, msg.getBytes());
//channel.basicPublish(exchange, routingKeyError, true, null, msg.getBytes());
}
}
消费者
/**
* @author niugang
*/
public class Consumer {
public static void main(String[] args) throws Exception {
ConnectionFactory cOnnectionFactory= new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
Connection cOnnection= connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_return_exchange";
String routingKey = "return.#";
String queueName = "test_return_queue";
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
QueueingConsumer queueingCOnsumer= new QueueingConsumer(channel);
channel.basicConsume(queueName, true, queueingConsumer);
while(true){
Delivery delivery = queueingConsumer.nextDelivery();
String msg = new String(delivery.getBody());
System.err.println("消费者: " + msg);
}
}
}
实现方式
自定义类,继承 DefaultConsumer
生产者
/**
* @author niugang
*/
public class Producer {
public static void main(String[] args) throws Exception {
ConnectionFactory cOnnectionFactory= new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
Connection cOnnection= connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchange = "test_consumer_exchange";
String routingKey = "consumer.save";
String msg = "Hello RabbitMQ Consumer Message";
for(int i =0; i<5; i ++){
channel.basicPublish(exchange, routingKey, true, null, msg.getBytes());
}
}
}
自定义消费者
/**
* 自定义消费者
* @author niugang
*/
public class MyConsumer extends DefaultConsumer {
public MyConsumer(Channel channel) {
super(channel);
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.err.println("-----------consume message----------");
//消费标签
System.err.println("consumerTag: " + consumerTag);
System.err.println("envelope: " + envelope);
System.err.println("properties: " + properties);
System.err.println("body: " + new String(body));
}
}
消费者
/**
* @author niugang
*/
public class Consumer {
public static void main(String[] args) throws Exception {
ConnectionFactory cOnnectionFactory= new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
Connection cOnnection= connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_consumer_exchange";
String routingKey = "consumer.#";
String queueName = "test_consumer_queue";
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
channel.basicConsume(queueName, true, new MyConsumer(channel));
}
}
什么是消费端的限流
生产者
/**
* @author niugang
*/
public class Producer {
public static void main(String[] args) throws Exception {
ConnectionFactory cOnnectionFactory= new ConnectionFactory();
connectionFactory.setHost("localhosy");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
Connection cOnnection= connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchange = "test_qos_exchange";
String routingKey = "qos.save";
String msg = "Hello RabbitMQ QOS Message";
for(int i =0; i<5; i ++){
channel.basicPublish(exchange, routingKey, true, null, msg.getBytes());
}
}
}
自定义消费者
public class MyConsumer extends DefaultConsumer {
private Channel channel ;
public MyConsumer(Channel channel) {
super(channel);
this.channel = channel;
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.err.println("-----------consume message----------");
System.err.println("consumerTag: " + consumerTag);
System.err.println("envelope: " + envelope);
System.err.println("properties: " + properties);
System.err.println("body: " + new String(body));
//ack 注释掉后 控制台只会接收到一条消息
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
消费者
/**
* @author niugang
*/
public class Consumer {
public static void main(String[] args) throws Exception {
ConnectionFactory cOnnectionFactory= new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
Connection cOnnection= connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_qos_exchange";
String queueName = "test_qos_queue";
String routingKey = "qos.#";
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
//1 限流方式 第一件事就是 autoAck设置为 false
//prefetchCount broker 给 消费者 最大推送消息数量
channel.basicQos(0, 1, false);
//手工签收
channel.basicConsume(queueName, false, new MyConsumer(channel));
}
}
消费端的手工ACK和NACK
消费端的重回队列
消费端重回队列是为了对没有处理成功的消息,把消息重新会递给Broker (requeue属性设置)
一般我们在实际应用中,都会关闭重回队列,也就是设置为false;
生产者
/**
* ack 测试生产者
* @author niugang
*/
public class Producer {
public static void main(String[] args) throws Exception {
ConnectionFactory cOnnectionFactory= new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
Connection cOnnection= connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchange = "test_ack_exchange";
String routingKey = "ack.save";
for(int i =0; i<5; i ++){
Map
headers.put("num", i);
//设置消息属性
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.deliveryMode(2).expiration("1000")
.contentEncoding("UTF-8")
.headers(headers)
.build();
String msg = "Hello RabbitMQ ACK Message " + i;
channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes());
}
}
}
自定义消费者
/**
* 自定义消费者
* @author niugang
*/
public class MyConsumer extends DefaultConsumer {
private Channel channel ;
public MyConsumer(Channel channel) {
super(channel);
this.channel = channel;
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.err.println("-----------consume message----------");
System.err.println("body: " + new String(body));
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
if((Integer)properties.getHeaders().get("num") == 0) {
//multiple 是否是批量
//requeue 重新添加到队列尾部
channel.basicNack(envelope.getDeliveryTag(), false, true);
} else {
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
}
消费者
/**
* 消费者
* @author niugang
*/
public class Consumer {
public static void main(String[] args) throws Exception {
ConnectionFactory cOnnectionFactory= new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
Connection cOnnection= connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_ack_exchange";
String queueName = "test_ack_queue";
String routingKey = "ack.#";
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
// 手工签收 必须要关闭 autoAck = false
channel.basicConsume(queueName, false, new MyConsumer(channel));
}
}
TTL
在控制台创建队列
在控制台创建exchange 并添加binding 然后发送消息,然后在队列页面可以看到消息自动被队列剔除
原生API设置TTL
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.expiration("1000").build();
Spring AMQP设置TTL
MessageProperties messageProperties = new MessageProperties();
//消息过期时间
messageProperties.setExpiration("1000");
Message stringMessage = new Message("Hello Springboot RabbitMQ".getBytes(), messageProperties);
死信队列:DLX ,Dead-Letter-Exchange
消息变成死信有以下几种情况:
死信队列详细解释
DLX也是一个正常的Exchange,和一般的Exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。
当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange上去,进而被路由到另一个队列。
可以监听这个队列中消息做相应的处理,这个特性可以弥补RabbitMQ3.0以前支持的immediate参数的功能;
死信队列具体设置
step1:首先需要设置死信队列的exchange和queue,然后进行绑定
例如定义如下exchange和queue
step2:
然后我们进行正常声明交换机、队列、绑定,只不过我们需要在队列上加上一个参数即可:arguments.put(“x-dead-letter-exchange”,“dlx.exchange”);
这样消息在过期、requeue、队列在达到最大长度时,消息就可以直接路由到死信队列。
生产者
/**
* 私信队列 生产端
*
* @author niugang
*/
public class Producer {
public static void main(String[] args) throws Exception {
ConnectionFactory cOnnectionFactory= new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
Connection cOnnection= connectionFactory.newConnection();
Channel channel = connection.createChannel();
//自定义普通的exchange
String exchange = "test_dlx_exchange";
String routingKey = "dlx.save";
String msg = "Hello RabbitMQ DLX Message";
for (int i = 0; i <1; i++) {
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.deliveryMode(2)
.contentEncoding("UTF-8")
//过期时间为10s
.expiration("10000")
.build();
channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes());
}
}
}
自定义消息消费
/**
* 自定义消息消费
*
* @author niugang
*/
public class MyConsumer extends DefaultConsumer {
public MyConsumer(Channel channel) {
super(channel);
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.err.println("-----------consume message----------");
System.err.println("consumerTag: " + consumerTag);
System.err.println("envelope: " + envelope);
System.err.println("properties: " + properties);
System.err.println("body: " + new String(body));
}
}
生产者
/**
* 死信队列消费端
*
* @author niugang
*/
public class Consumer {
public static void main(String[] args) throws Exception {
ConnectionFactory cOnnectionFactory= new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
Connection cOnnection= connectionFactory.newConnection();
Channel channel = connection.createChannel();
// 这就是一个普通的交换机 和 队列 以及路由
String exchangeName = "test_dlx_exchange";
String routingKey = "dlx.#";
String queueName = "test_dlx_queue";
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
Map
//设置死信队列exchange 这些具体的参数可以通过rabbitmq控制台查看
agruments.put("x-dead-letter-exchange", "dlx.exchange");
//这个agruments属性,要设置到声明队列上
channel.queueDeclare(queueName, true, false, false, agruments);
channel.queueBind(queueName, exchangeName, routingKey);
//要进行死信队列的声明:
channel.exchangeDeclare("dlx.exchange", "topic", true, false, null);
channel.queueDeclare("dlx.queue", true, false, false, null);
channel.queueBind("dlx.queue", "dlx.exchange", "#");
channel.basicConsume(queueName, true, new MyConsumer(channel));
}
}