作者:Jen覀nyfraaaaa-v- | 来源:互联网 | 2023-08-17 12:47
一、消费端ack二、重回队列三、代码测试3.1producer端代码importjava.util.HashMap;importjava.util.Map;importcom.
一、消费端ack
二、重回队列
三、代码测试
3.1 producer 端代码
import java.util.HashMap;
import java.util.Map;import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class Producer {public static void main(String[] args) throws Exception {ConnectionFactory connectionFactory &#61; new ConnectionFactory();connectionFactory.setHost("192.168.11.76");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");Connection connection &#61; connectionFactory.newConnection();Channel channel &#61; connection.createChannel();String exchange &#61; "test_ack_exchange";String routingKey &#61; "ack.save";for(int i &#61;0; i<5; i &#43;&#43;){Map headers &#61; new HashMap();headers.put("num", i);AMQP.BasicProperties properties &#61; new AMQP.BasicProperties.Builder().deliveryMode(2).contentEncoding("UTF-8").headers(headers).build();String msg &#61; "Hello RabbitMQ ACK Message " &#43; i;channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes());}}
}
3.2 consumer端代码
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;public class Consumer {public static void main(String[] args) throws Exception {ConnectionFactory connectionFactory &#61; new ConnectionFactory();connectionFactory.setHost("192.168.11.76");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");Connection connection &#61; connectionFactory.newConnection();Channel channel &#61; connection.createChannel();String exchangeName &#61; "test_ack_exchange";String queueName &#61; "test_ack_queue";String routingKey &#61; "ack.#";channel.exchangeDeclare(exchangeName, "topic", true, false, null);channel.queueDeclare(queueName, true, false, false, null);channel.queueBind(queueName, exchangeName, routingKey);// 手工签收 必须要关闭 autoAck &#61; falsechannel.basicConsume(queueName, false, new MyConsumer(channel));}
}
3.3 myconsumer 代码
channel.basicNack(envelope.getDeliveryTag(), false, true);
第一个参数是deliverTag
第二参数是否批量
第三个测试是否重回队列
import java.io.IOException;import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;public class MyConsumer extends DefaultConsumer {private Channel channel ;public MyConsumer(Channel channel) {super(channel);this.channel &#61; channel;}&#64;Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.err.println("-----------consume message----------");System.err.println("body: " &#43; new String(body));try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}if((Integer)properties.getHeaders().get("num") &#61;&#61; 0) {channel.basicNack(envelope.getDeliveryTag(), false, true);} else {channel.basicAck(envelope.getDeliveryTag(), false);}}}
消息的确认类型&#xff1a;
1&#xff09;channel.basicAck(deliveryTag, multiple);
consumer处理成功后&#xff0c;通知broker删除队列中的消息&#xff0c;如果设置multiple&#61;true&#xff0c;表示支持批量确认机制以减少网络流量。
例如&#xff1a;有值为5,6,7,8 deliveryTag的投递
如果此时channel.basicAck(8, true);则表示前面未确认的5,6,7投递也一起确认处理完毕。
如果此时channel.basicAck(8, false);则仅表示deliveryTag&#61;8的消息已经成功处理。
2&#xff09;channel.basicNack(deliveryTag, multiple, requeue);
consumer处理失败后&#xff0c;例如&#xff1a;有值为5,6,7,8 deliveryTag的投递。
如果channel.basicNack(8, true, true);表示deliveryTag&#61;8之前未确认的消息都处理失败且将这些消息重新放回队列中。
如果channel.basicNack(8, true, false);表示deliveryTag&#61;8之前未确认的消息都处理失败且将这些消息直接丢弃。
如果channel.basicNack(8, false, true);表示deliveryTag&#61;8的消息处理失败且将该消息重新放回队列。
如果channel.basicNack(8, false, false);表示deliveryTag&#61;8的消息处理失败且将该消息直接丢弃。
3&#xff09;channel.basicReject(deliveryTag, requeue);
相比channel.basicNack&#xff0c;除了没有multiple批量确认机制之外&#xff0c;其他语义完全一样。
如果channel.basicReject(8, true);表示deliveryTag&#61;8的消息处理失败且将该消息重新放回队列。
如果channel.basicReject(8, false);表示deliveryTag&#61;8的消息处理失败且将该消息直接丢弃。