
RabbitMQ Study Notes: Five Messaging Modes and Message Confirmation Mechanisms

This article walks through five RabbitMQ modes in detail: Simple, Work, Publish/Subscribe, Routing, and Topic.

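Maven dependencies

<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.11</version>
</dependency>
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>4.0.2</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.7.10</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.5</version>
</dependency>
<dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.17</version>
</dependency>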

Shared helper for connecting to the RabbitMQ server

package com.test.testboot.mq;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ConnectionUtil {

    /**
     * Gets a connection to the RabbitMQ server.
     *
     * @return an open connection
     * @throws IOException
     * @throws TimeoutException
     */
    public static Connection getConection() throws IOException, TimeoutException {
        // Create the connection factory
        ConnectionFactory factory = new ConnectionFactory();
        // Server address
        factory.setHost("127.0.0.1");
        // Server port
        factory.setPort(5672);
        // Virtual host
        factory.setVirtualHost("/");
        // Username
        factory.setUsername("");
        // Password
        factory.setPassword("123456");
        return factory.newConnection();
    }
}

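Mode 1: Simple queue (Simple)

The producer P puts messages into a queue.
The consumer listens on the queue. When a message is available the consumer takes it, and once taken the message is automatically removed from the queue.
(Risk: with automatic acknowledgement a message can disappear before the consumer has processed it correctly, and it cannot be recovered.)

Use case: chat rooms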
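
The risk noted above can be illustrated without a broker. The following is a minimal in-memory sketch, not RabbitMQ code: the class AckModeSketch, the method deliverOnce and the always-failing handler are hypothetical names introduced only for illustration, and "still on the broker" loosely stands for what RabbitMQ would requeue once an unacknowledged consumer's channel closes.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.function.Predicate;

class AckModeSketch {

    /**
     * Delivers every queued message once and returns the messages the
     * broker would still hold afterwards. The handler returns false to
     * simulate a consumer that fails while processing a message.
     */
    static List<String> deliverOnce(List<String> messages, boolean autoAck, Predicate<String> handler) {
        Deque<String> queue = new ArrayDeque<>(messages);
        List<String> stillOnBroker = new ArrayList<>();
        while (!queue.isEmpty()) {
            String msg = queue.poll();
            boolean processed = handler.test(msg);
            if (!autoAck && !processed) {
                // Manual ack: no ack arrived, so the message is kept for redelivery
                stillOnBroker.add(msg);
            }
            // Auto ack: the message counted as consumed the moment it was delivered
        }
        return stillOnBroker;
    }

    public static void main(String[] args) {
        List<String> messages = Arrays.asList("Hello World!");
        Predicate<String> failing = msg -> false; // hypothetical handler that always fails
        System.out.println("autoAck=true  -> " + deliverOnce(messages, true, failing));  // []
        System.out.println("autoAck=false -> " + deliverOnce(messages, false, failing)); // [Hello World!]
    }
}
```

With autoAck=true the failed message is simply gone, which is the hazard described for the simple mode.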


  • Producer

package com.test.testboot.mq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Sender {

    private static String QUEUE_NAME = "test_simple_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        // Get a connection
        Connection connection = ConnectionUtil.getConection();
        // Get a channel
        Channel channel = connection.createChannel();
        // Declare the queue
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        String msg = "Hello World!";
        System.out.println("sendMsg:" + msg);
        // Publish to the default exchange, routed by queue name
        channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
        channel.close();
        connection.close();
    }
}

  • Consumer

package com.test.testboot.mq;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Recv {

    private static String QUEUE_NAME = "test_simple_queue";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        // Get a connection
        Connection connection = ConnectionUtil.getConection();
        // Create a channel
        Channel channel = connection.createChannel();
        // Declare the queue
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // Define the queue consumer
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            // Called when a message arrives
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String recvMsg = new String(body, "UTF-8");
                System.out.println("recvMsg:" + recvMsg);
            }
        };
        // Listen on the queue with automatic acknowledgement
        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);

        // Older API
        /*
        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // Listen on the queue
        channel.basicConsume(QUEUE_NAME, true, consumer);
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String recvMsg = new String(delivery.getBody());
            System.out.println("recvMsg:" + recvMsg);
        }
        */
    }
}

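Mode 2: Work queue (Work)

The producer puts messages into a queue.
Several consumers listen on the same queue. How are the messages consumed? (That depends on the dispatch strategy.)
C1 and C2 compete for the contents of the queue; whichever one receives a message is responsible for consuming it, and each message goes to only one of them.
Use cases: grabbing red envelopes; resource scheduling in large projects (the most idle system takes the task).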


  • Round-robin dispatch (Round-Robin)

    • Producer

package com.test.testboot.mq.work;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.test.testboot.mq.ConnectionUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Send {

    /*
     *                 |--C1
     * p-----Queue-----|
     *                 |--C2
     */
    private static final String QUEUE_NAME = "test_work_queue";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        // Get a connection
        Connection connection = ConnectionUtil.getConection();
        // Get a channel
        Channel channel = connection.createChannel();
        // Declare the queue
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        for (int i = 0; i < 10; i++) {
            String msg = "Hello" + i;
            System.out.println("[WorkQueue] Send:" + msg);
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
            Thread.sleep(i * 10);
        }
        channel.close();
        connection.close();
    }
}

    • Consumer 1

package com.test.testboot.mq.work;

import com.rabbitmq.client.*;
import com.test.testboot.mq.ConnectionUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Recv1 {

    private static final String QUEUE_NAME = "test_work_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        // Get a connection
        Connection connection = ConnectionUtil.getConection();
        // Get a channel
        Channel channel = connection.createChannel();
        // Declare the queue
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // Define a consumer
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // Called when a message arrives
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("Recv[1] recv msg:" + msg);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    System.out.println("Recv[1] done");
                }
            }
        };
        /*
         * true:  automatic acknowledgement. A message counts as consumed as soon as
         *        it is delivered, whether or not the consumer processes it successfully.
         * false: manual acknowledgement. After delivery the broker keeps the message
         *        as unacknowledged and waits for the consumer's ack. Until the ack
         *        arrives the message is not available to other consumers; if the
         *        consumer's channel or connection closes first, the broker requeues it.
         *        With a prefetch limit, the broker also stops sending this consumer
         *        further messages until it acks.
         */
        boolean autoAck = true;
        channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    }
}

    • Consumer 2

package com.test.testboot.mq.work;

import com.rabbitmq.client.*;
import com.test.testboot.mq.ConnectionUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Recv2 {

    private static final String QUEUE_NAME = "test_work_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        // Get a connection
        Connection connection = ConnectionUtil.getConection();
        // Get a channel
        Channel channel = connection.createChannel();
        // Declare the queue
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // Define a consumer
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // Called when a message arrives
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("Recv[2] recv msg:" + msg);
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    System.out.println("Recv[2] done");
                }
            }
        };
        boolean autoAck = true;
        channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    }
}
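
As a rough illustration of what round-robin dispatch means for the two consumers above, the sketch below hands message i to consumer i mod n, regardless of how long each consumer takes. It is a plain-Java model, not RabbitMQ client code; RoundRobinSketch and dispatch are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

class RoundRobinSketch {

    /** Assigns message i to consumer (i mod consumerCount), ignoring how busy each consumer is. */
    static List<List<String>> dispatch(List<String> messages, int consumerCount) {
        List<List<String>> perConsumer = new ArrayList<>();
        for (int c = 0; c < consumerCount; c++) {
            perConsumer.add(new ArrayList<>());
        }
        for (int i = 0; i < messages.size(); i++) {
            perConsumer.get(i % consumerCount).add(messages.get(i));
        }
        return perConsumer;
    }

    public static void main(String[] args) {
        List<String> messages = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            messages.add("Hello" + i);
        }
        List<List<String>> result = dispatch(messages, 2);
        System.out.println("C1: " + result.get(0)); // even-numbered messages
        System.out.println("C2: " + result.get(1)); // odd-numbered messages
    }
}
```

Each consumer ends up with five messages even though Recv2 sleeps twice as long as Recv1, which is the imbalance that fair dispatch addresses.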

  • Fair dispatch (Work Fair)

    • Producer

package com.test.testboot.mq.workfair;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.test.testboot.mq.ConnectionUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Send {

    /*
     *                 |--C1
     * p-----Queue-----|
     *                 |--C2
     */
    private static final String QUEUE_NAME = "test_work_queue";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        // Get a connection
        Connection connection = ConnectionUtil.getConection();
        // Get a channel
        Channel channel = connection.createChannel();
        // Prefetch limit of 1: the broker will not send a consumer another message
        // until the previous one is acknowledged. The limit applies to consumers on
        // the channel where it is set, so on this publish-only channel it has no
        // effect; the consumers below set it themselves.
        channel.basicQos(1);
        // Declare the queue
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        for (int i = 0; i < 10; i++) {
            String msg = "Hello" + i;
            System.out.println("[WorkQueue] Send:" + msg);
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
            Thread.sleep(i * 5);
        }
        channel.close();
        connection.close();
    }
}

    • Consumer 1

package com.test.testboot.mq.workfair;

import com.rabbitmq.client.*;
import com.test.testboot.mq.ConnectionUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Recv1 {

    private static final String QUEUE_NAME = "test_work_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        // Get a connection
        Connection connection = ConnectionUtil.getConection();
        // Get a channel
        final Channel channel = connection.createChannel();
        // Declare the queue
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // Deliver only one unacknowledged message at a time
        channel.basicQos(1);
        // Define a consumer
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // Called when a message arrives
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("Recv[1] recv msg:" + msg);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    System.out.println("Recv[1] done");
                    // Manual acknowledgement
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };
        boolean autoAck = false; // manual acknowledgement
        channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    }
}

    • Consumer 2

package com.test.testboot.mq.workfair;

import com.rabbitmq.client.*;
import com.test.testboot.mq.ConnectionUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Recv2 {

    private static final String QUEUE_NAME = "test_work_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        // Get a connection
        Connection connection = ConnectionUtil.getConection();
        // Get a channel
        final Channel channel = connection.createChannel();
        // Deliver only one unacknowledged message at a time
        channel.basicQos(1);
        // Declare the queue
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // Define a consumer
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // Called when a message arrives
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("Recv[2] recv msg:" + msg);
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    System.out.println("Recv[2] done");
                    // Manual acknowledgement
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };
        boolean autoAck = false; // manual acknowledgement
        channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    }
}
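
To see why a prefetch of 1 with manual acks favours the faster consumer, here is a small plain-Java simulation. It assumes all messages are already queued and that ties go to the lowest-numbered consumer; both are simplifications of mine, and FairDispatchSketch and assign are hypothetical names, not part of the RabbitMQ client.

```java
import java.util.Arrays;

class FairDispatchSketch {

    /**
     * Simulates prefetch=1: each message goes to whichever consumer becomes
     * idle first. Returns, for each message index, the consumer that handles it.
     */
    static int[] assign(int messageCount, long[] processingMillis) {
        long[] freeAt = new long[processingMillis.length]; // time at which each consumer is next idle
        int[] owner = new int[messageCount];
        for (int m = 0; m < messageCount; m++) {
            int next = 0;
            for (int c = 1; c < freeAt.length; c++) {
                if (freeAt[c] < freeAt[next]) {
                    next = c; // strictly earlier wins, so ties stay with the lower index
                }
            }
            owner[m] = next;
            freeAt[next] += processingMillis[next];
        }
        return owner;
    }

    public static void main(String[] args) {
        // Consumer 0 needs 1000 ms per message, consumer 1 needs 2000 ms
        int[] owner = assign(10, new long[] {1000, 2000});
        System.out.println(Arrays.toString(owner)); // [0, 1, 0, 0, 1, 0, 0, 1, 0, 0]
    }
}
```

Under these assumptions the 1000 ms consumer handles seven of the ten messages and the 2000 ms consumer three, instead of the even split produced by round-robin.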

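Mode 3: Publish/Subscribe (publish/subscribe)

The producer hands the message to an exchange.
The exchange, declared as fanout for publish/subscribe, copies the message to every queue bound to it.
The consumers behind those queues all receive the message.

Use cases: bulk SMS and e-mail, group chat, advertising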
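
The fanout behaviour can be pictured with a few lines of plain Java. This is a toy model rather than the RabbitMQ API: FanoutSketch, bind, publish and queue are hypothetical names, and the queue names are borrowed from the consumers in this section.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.Map;

class FanoutSketch {

    // Queue name -> messages waiting in that queue
    private final Map<String, Deque<String>> queues = new LinkedHashMap<>();

    /** Binds a queue to the exchange, creating it if needed. */
    void bind(String queueName) {
        queues.putIfAbsent(queueName, new ArrayDeque<>());
    }

    /** Copies the message into every bound queue; with no bindings it is dropped. */
    void publish(String message) {
        for (Deque<String> q : queues.values()) {
            q.add(message);
        }
    }

    Deque<String> queue(String queueName) {
        return queues.get(queueName);
    }

    public static void main(String[] args) {
        FanoutSketch exchange = new FanoutSketch();
        exchange.publish("too early"); // no queue bound yet, so nobody receives this
        exchange.bind("test_queue_fanout_email");
        exchange.bind("test_queue_fanout_sms");
        exchange.publish("Hello");
        System.out.println(exchange.queue("test_queue_fanout_email")); // [Hello]
        System.out.println(exchange.queue("test_queue_fanout_sms"));   // [Hello]
    }
}
```

The first publish is lost because an exchange does not store messages; only queues do. That is why the consumers' queues should be bound before the producer sends.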


  • Producer

package com.test.testboot.mq.ps;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.test.testboot.mq.ConnectionUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Send {

    /*
     *                  |--Queue1--C1
     * p---exchange-----|
     *                  |--Queue2--C2
     */
    private static final String EXCHANGE_NAME = "test_exchange_fanout";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        // Get a connection
        Connection connection = ConnectionUtil.getConection();
        // Get a channel
        Channel channel = connection.createChannel();
        // Declare the exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        String msg = "Hello";
        System.out.println("Send:" + msg);
        // A fanout exchange ignores the routing key, so it is left empty
        channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes());
        channel.close();
        connection.close();
    }
}

  • Consumer 1

package com.test.testboot.mq.ps;

import com.rabbitmq.client.*;
import com.test.testboot.mq.ConnectionUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Recv1 {

    private static final String QUEUE_NAME = "test_queue_fanout_email";
    private static final String EXCHANGE_NAME = "test_exchange_fanout";

    public static void main(String[] args) throws IOException, TimeoutException {
        // Get a connection
        Connection connection = ConnectionUtil.getConection();
        // Get a channel
        final Channel channel = connection.createChannel();
        // Declare the queue
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // Bind the queue to the exchange (the exchange must already exist,
        // for example declared by the producer)
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
        // Deliver only one unacknowledged message at a time
        channel.basicQos(1);
        // Define a consumer
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // Called when a message arrives
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("[1] recv msg:" + msg);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    System.out.println("[1] done");
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };
        boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    }
}

  • Consumer 2

package com.test.testboot.mq.ps;

import com.rabbitmq.client.*;
import com.test.testboot.mq.ConnectionUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Recv2 {

    private static final String QUEUE_NAME = "test_queue_fanout_sms";
    private static final String EXCHANGE_NAME = "test_exchange_fanout";

    public static void main(String[] args) throws IOException, TimeoutException {
        // Get a connection
        Connection connection = ConnectionUtil.getConection();
        // Get a channel
        final Channel channel = connection.createChannel();
        // Declare the queue
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // Bind the queue to the exchange (the exchange must already exist,
        // for example declared by the producer)
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
        // Deliver only one unacknowledged message at a time
        channel.basicQos(1);
        // Define a consumer
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // Called when a message arrives
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("[2] recv msg:" + msg);
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    System.out.println("[2] done");
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };
        // Must be false because the handler acks manually; acking a message that
        // was delivered with autoAck=true causes a channel error
        boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    }
}

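Mode 4: Routing (Routing)

The producer sends a message to the exchange together with a routing key. The exchange compares that routing key with the binding key of each queue bound to it; only queues whose binding key matches receive the message, and their consumers then consume it.

Use case: error reporting in a project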
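
The exact-match rule of a direct exchange can be sketched as a lookup from routing key to bound queues. This is an illustrative model only: DirectRoutingSketch, bind and route are hypothetical names, while the queue names and keys mirror the consumers in this section.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

class DirectRoutingSketch {

    // Binding key -> names of the queues bound with that key
    private final Map<String, Set<String>> bindings = new HashMap<>();

    void bind(String queueName, String bindingKey) {
        bindings.computeIfAbsent(bindingKey, k -> new TreeSet<>()).add(queueName);
    }

    /** Returns the queues that would receive a message published with this routing key. */
    Set<String> route(String routingKey) {
        return bindings.getOrDefault(routingKey, Collections.<String>emptySet());
    }

    public static void main(String[] args) {
        DirectRoutingSketch exchange = new DirectRoutingSketch();
        exchange.bind("test_queue_direct1", "error");
        exchange.bind("test_queue_direct2", "error");
        exchange.bind("test_queue_direct2", "info");
        exchange.bind("test_queue_direct2", "warning");

        System.out.println(exchange.route("warning")); // [test_queue_direct2]
        System.out.println(exchange.route("error"));   // [test_queue_direct1, test_queue_direct2]
        System.out.println(exchange.route("debug"));   // []
    }
}
```

A message published with the key warning therefore reaches only the second queue, while an error message reaches both.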


  • Producer

package com.test.testboot.mq.routing;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.test.testboot.mq.ConnectionUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @description: routing mode producer
 * @author: Mr.ADiao
 * @create: 2019-10-21 11:04
 **/
public class Send {

    private static final String EXCHANGE_NAME = "test_exchange_direct";

    public static void main(String[] args) throws IOException, TimeoutException {
        // Get a connection
        Connection connection = ConnectionUtil.getConection();
        // Create a channel
        Channel channel = connection.createChannel();
        // Prefetch limit; only affects consumers, so it has no effect when publishing
        channel.basicQos(1);
        // Declare the exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        String msg = "Hello Routing";
        String routingKey = "warning";
        channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes());
        System.out.println("send:" + msg);
        channel.close();
        connection.close();
    }
}

  • Consumer 1

package com.test.testboot.mq.routing;

import com.rabbitmq.client.*;
import com.test.testboot.mq.ConnectionUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Recv1 {

    private static final String EXCHANGE_NAME = "test_exchange_direct";
    private static final String QUEUE_NAME = "test_queue_direct1";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // Receive only messages published with the routing key "error"
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
        channel.basicQos(1);
        // Define a consumer
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // Called when a message arrives
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("[1] recv msg:" + msg);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    System.out.println("[1] done");
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };
        boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    }
}

  • Consumer 2

package com.test.testboot.mq.routing;

import com.rabbitmq.client.*;
import com.test.testboot.mq.ConnectionUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @description: routing mode consumer 2
 * @author: Mr.ADiao
 * @create: 2019-10-21 11:05
 **/
public class Recv2 {

    private static final String EXCHANGE_NAME = "test_exchange_direct";
    private static final String QUEUE_NAME = "test_queue_direct2";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // One queue can be bound with several routing keys
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warning");
        channel.basicQos(1);
        // Define a consumer
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // Called when a message arrives
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("[2] recv msg:" + msg);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    System.out.println("[2] done");
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };
        boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    }
}

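Mode 5: Topics (Topics)

Topic mode is also called wildcard mode. Its main advantage over routing mode is that binding keys can contain wildcards. This is loosely comparable to .* in a regular expression, although the matching rules are different. Apart from the matching rules, it works the same way as routing mode.

Matching rules:

A routing key is a list of words separated by dots, and a binding key must take the same form. A message sent with a given routing key is delivered to every queue whose binding key matches it. Two characters are special in a binding key:
① * (asterisk) stands for exactly one word
② # (hash) stands for zero or more words

Examples:

*.apple.* : matches keys made of exactly one word, then .apple., then exactly one word, such as a.apple.b or asd.apple.qewf (note that each * is a single word).

log.# : matches every key that starts with log., such as log.a, log.a.b and log.b.c. Because # can also stand for zero words, it matches log itself as well.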
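
The two wildcard rules can be written out as a short recursive matcher. This is a sketch of the matching rules as described above, not the broker's actual implementation; TopicMatchSketch and matches are hypothetical names.

```java
class TopicMatchSketch {

    /** Returns true if the dot-separated routing key matches the binding pattern. */
    static boolean matches(String pattern, String routingKey) {
        String[] p = pattern.isEmpty() ? new String[0] : pattern.split("\\.");
        String[] k = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.");
        return match(p, 0, k, 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length; // pattern used up: the key must be used up too
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words, so try every possible split
            for (int n = j; n <= k.length; n++) {
                if (match(p, i + 1, k, n)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false; // pattern still expects a word but the key has none left
        }
        // '*' matches exactly one word; anything else must match literally
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return match(p, i + 1, k, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("*.apple.*", "a.apple.b"));       // true
        System.out.println(matches("*.apple.*", "a.b.apple.c"));     // false
        System.out.println(matches("log.#", "log.a.b"));             // true
        System.out.println(matches("log.#", "log"));                 // true
        System.out.println(matches("myTopic.#", "myTopic.key2"));    // true
        System.out.println(matches("myTopic.key1", "myTopic.key2")); // false
    }
}
```

The last two lines correspond to the code in this section: a message sent with myTopic.key2 reaches the queue bound with myTopic.# but not the one bound with myTopic.key1.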


  • Producer

package com.adiao.rabbitmq.topics;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.adiao.rabbitmq.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Send {

    private static final String EXCHANGE_NAME = "test_exchange_topic";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        // Declare a topic exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        channel.basicQos(1);
        String msg = "This is Topic Msg";
        // Routing key "myTopic.key2" matches the binding "myTopic.#" but not "myTopic.key1"
        channel.basicPublish(EXCHANGE_NAME, "myTopic.key2", null, msg.getBytes());
        System.out.println("Send:" + msg);
        channel.close();
        connection.close();
    }
}

  • Consumer 1

package com.adiao.rabbitmq.topics;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.adiao.rabbitmq.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;

public class Recv1 {

    private static final String EXCHANGE_NAME = "test_exchange_topic";
    private static final String QUEUE_NAME = "test_queue_topic1";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // Receive every message whose routing key starts with "myTopic"
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "myTopic.#");
        channel.basicQos(1);
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       BasicProperties properties, byte[] body) throws IOException {
                System.out.println("[1] Recv:" + new String(body, "UTF-8"));
                // Manual acknowledgement
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        channel.basicConsume(QUEUE_NAME, false, consumer);
    }
}

  • Consumer 2

package com.adiao.rabbitmq.topics;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.adiao.rabbitmq.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;

public class Recv2 {

    private static final String EXCHANGE_NAME = "test_exchange_topic";
    private static final String QUEUE_NAME = "test_queue_topic2";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // Receive only messages whose routing key is exactly "myTopic.key1"
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "myTopic.key1");
        channel.basicQos(1);
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       BasicProperties properties, byte[] body) throws IOException {
                System.out.println("[2] Recv:" + new String(body, "UTF-8"));
                // Manual acknowledgement
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        channel.basicConsume(QUEUE_NAME, false, consumer);
    }
}

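  • Message confirmation: the transaction mechanism

Transactions are a mechanism built into the AMQP protocol.

Drawback: they reduce RabbitMQ's throughput.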
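
As a mental model for the transaction calls used in the producer of this section, the toy class below buffers published messages until commit and discards them on rollback. It is a deliberately simplified in-memory sketch, not how the broker is implemented; TxChannelSketch and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

class TxChannelSketch {

    private final List<String> queue = new ArrayList<>();   // messages visible to consumers
    private final List<String> pending = new ArrayList<>(); // published inside the open transaction

    /** Inside a transaction a publish is only staged, not yet enqueued. */
    void publish(String message) {
        pending.add(message);
    }

    /** Commit makes every staged message visible at once. */
    void commit() {
        queue.addAll(pending);
        pending.clear();
    }

    /** Rollback throws the staged messages away. */
    void rollback() {
        pending.clear();
    }

    List<String> delivered() {
        return queue;
    }

    public static void main(String[] args) {
        TxChannelSketch channel = new TxChannelSketch();
        try {
            channel.publish("This is tx");
            if (args.length == 0) {
                // Simulated failure between publish and commit
                throw new IllegalStateException("simulated failure");
            }
            channel.commit();
        } catch (IllegalStateException e) {
            channel.rollback();
            System.out.println("rollback");
        }
        System.out.println(channel.delivered()); // [] when the failure is triggered
    }
}
```

The throughput cost mentioned above comes from the producer waiting for the broker to answer each commit before it continues.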


  • Producer

package com.adiao.rabbitmq.tx;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.adiao.rabbitmq.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Send {

    private static final String QUEUE_NAME = "test_queue_tx";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        String msg = "This is tx";
        try {
            // Start the transaction
            channel.txSelect();
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
            // Deliberate ArithmeticException to demonstrate the rollback
            int i = 1 / 0;
            System.out.println("Send" + msg);
            // Commit the transaction
            channel.txCommit();
        } catch (Exception e) {
            e.printStackTrace();
            // Roll back the transaction; the message is not delivered
            channel.txRollback();
            System.out.println("rollback");
        }
        channel.close();
        connection.close();
    }
}

  • Consumer

package com.adiao.rabbitmq.tx;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.adiao.rabbitmq.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;

public class Recv {

    private static final String QUEUE_NAME = "test_queue_tx";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // Consume with automatic acknowledgement
        channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Recv:" + new String(body, "UTF-8"));
            }
        });
    }
}

  • Message confirmation: synchronous publisher confirms (Confirm)

Producer

package com.adiao.rabbitmq.confirm;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.adiao.rabbitmq.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Send {

    private static final String QUEUE_NAME = "test_queue_confirm";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // Put the channel into confirm mode
        channel.confirmSelect();
        String msg = "This is Confirm";
        channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
        // Block until the broker has acked or nacked the published message
        if (!channel.waitForConfirms()) {
            System.out.println("Send Failed");
        } else {
            System.out.println("Send Success");
        }
        channel.close();
        connection.close();
    }
}

Consumer

package com.adiao.rabbitmq.confirm;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.adiao.rabbitmq.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;

public class Recv {

    private static final String QUEUE_NAME = "test_queue_confirm";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // Consume with manual acknowledgement
        channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Recv:" + new String(body, "UTF-8"));
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }
}

  • Batch confirm producer

package com.adiao.rabbitmq.confirm;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.adiao.rabbitmq.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class SendManyMsg {

    private static final String QUEUE_NAME = "test_queue_confirm";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // Put the channel into confirm mode
        channel.confirmSelect();
        String msg = "This is Confirm";
        for (int i = 0; i < 10; i++) {
            if (i == 5) {
                // Simulated failure: the ArithmeticException aborts the run
                // before the batch is confirmed
                int j = i / 0;
            }
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
        }
        // Wait for all outstanding confirms; throws IOException if any message was nacked
        channel.waitForConfirmsOrDie();
        System.out.println("All messages sent");
        channel.close();
        connection.close();
    }
}

  • Asynchronous confirm producer

package com.adiao.rabbitmq.confirm;

import java.io.IOException;
import java.util.Date;
import java.util.concurrent.TimeoutException;

import com.adiao.rabbitmq.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;

public class SendAsyn {

    private static final String QUEUE_NAME = "test_queue_confirm";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // Put the channel into confirm mode
        channel.confirmSelect();
        // Confirms arrive on a callback instead of blocking the publisher
        channel.addConfirmListener(new ConfirmListener() {
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("Nacked message tag: " + deliveryTag);
            }

            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.out.println(String.format("Acked message tag: %d (%b)", deliveryTag, multiple));
            }
        });
        for (int i = 0; i < 10; i++) {
            String msg = new Date().getTime() + ":This is Asyn Confirm";
            System.out.println("Send:" + msg);
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
            Thread.sleep(1000);
        }
        channel.close();
        connection.close();
    }
}
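
The listener above only prints the tags. In practice a producer usually also tracks which messages are still unconfirmed, and the multiple flag means "this tag and every earlier one". Below is a minimal sketch of that bookkeeping in plain Java. ConfirmTrackerSketch and its methods are hypothetical names; in a real producer the sequence number would come from channel.getNextPublishSeqNo() before each publish.

```java
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

class ConfirmTrackerSketch {

    // Sequence number -> message body, for every message not yet confirmed
    private final ConcurrentNavigableMap<Long, String> outstanding = new ConcurrentSkipListMap<>();

    /** Record a message right before it is published. */
    void published(long seqNo, String body) {
        outstanding.put(seqNo, body);
    }

    /** Mirrors ConfirmListener.handleAck: drop one entry, or all entries up to the tag. */
    void handleAck(long deliveryTag, boolean multiple) {
        if (multiple) {
            outstanding.headMap(deliveryTag, true).clear();
        } else {
            outstanding.remove(deliveryTag);
        }
    }

    ConcurrentNavigableMap<Long, String> outstanding() {
        return outstanding;
    }

    int outstandingCount() {
        return outstanding.size();
    }

    public static void main(String[] args) {
        ConfirmTrackerSketch tracker = new ConfirmTrackerSketch();
        for (long seq = 1; seq <= 5; seq++) {
            tracker.published(seq, "msg" + seq);
        }
        tracker.handleAck(3, true);  // confirms 1, 2 and 3 in one go
        tracker.handleAck(5, false); // confirms only 5
        System.out.println(tracker.outstanding()); // {4=msg4}
    }
}
```

A handleNack callback could use the same map to look up the affected messages and republish them.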

     

