热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

RabbitMQ入门4:生产者、消费者演示;多个消费者平均压力、公平派遣;

说明:(1)内容说明:●这儿我们会创建一个项目,演示RabbitMQ最基础的内容;通过

说明:

(1)内容说明:

          ● 这儿我们会创建一个项目,演示RabbitMQ最基础的内容;

通过,这个最简单的例子,先了解:如何使用RabbitMQ,如何连接RabbitMQ,如何发送消息,如何接收消息等最最基础的内容;

          ● 然后,会演示多个消费者平均压力的内容;

目录

一:第一个生产者和消费者;

0.创建一个maven项目rabbitmq,演示用;

1.引入RabbitMQ的Java客户端的,依赖;

2.第一个生产者;

3.第一个消费者;

4.瞅一眼RabbitMQ管理后台;

二: 根据消息内容的不同,采取不同的处理策略;(这儿演示的是一种思路)

三:当消息相对较多时,多个消费者平均压力; 

1.多个消费者,平均压力:引入; 

2.公平派遣;



一:第一个生产者和消费者;

0.创建一个maven项目rabbitmq,演示用;

 RabbitMQ支持多语言,其中就包括Java;同时,RabbitMQ的API丰富,我们可以利用RabbitMQ针对Java提供的客户端的一系列API,来完成操作;


1.引入RabbitMQ的Java客户端的,依赖;

com.rabbitmqamqp-client5.8.0org.slf4jslf4j-nop1.7.29

说明:

(1)依赖说明;


2.第一个生产者;

Send类:

package helloworld;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** 描述:发送消息的类:连接到RabbitMQ的服务端,然后发送一条消息,然后退出;*/
public class Send {//我们发送消息时,需要指定要发到哪里去;所以,我们要指定队列的名字;所以,这儿我们定义队列的名字;//这个名字可随便取,待会在接收的消息时候,要使用这个队列;private final static String QUEUE_NAME = "hello";public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();//2.设置RabbitMQ的地址(即RabbitMQ的服务端的地址)//这里面填写是RabbitMQ服务端所在服务器的ip地址connectionFactory.setHost("1**.***.***.**8");//然后,要想连接RabbitMQ的服务端,我么还需要通过一个用户才行;// 所以,这儿我们使用【前面我们设置的,能够在其他服务器上访问RabbitMQ所在服务器的,admin用户】connectionFactory.setUsername("admin");connectionFactory.setPassword("password");//PS:记得要放开RabbitMQ部署的服务器的,5672端口;//3.建立连接Connection connection = connectionFactory.newConnection();//4.获得Channel信道(我们大部分的操作,都是在信道上完成的;有了信道后,我们就可以进行操作了)Channel channel = connection.createChannel();//5.声明队列(有了队列之后,我们就可以发布消息了)//参数说明:第一个参数(queue):队列名;// 第二个参数(durable):这个队列是否需要持久(即,服务重启后,这个队列是否需要还存在;这儿我们根据自己的需求,设为了false;)//第三个参数(exclusive):这个队列是否独有(即,这个队列是不是仅能给这个连接使用;这儿我们设为了false)//第四个参数(autoDelete):这个队列是否需要自动删除(即,在队列没有使用的时候,是否需要自动删除;这儿我们设为了false)//第五个参数(arguments);channel.queueDeclare(QUEUE_NAME, false, false, false, null);//6.发布消息String message = "测试的消息";//参数说明:第一个参数(exchange)是交换机,这儿我们暂时不深入了解;// 第二个参数(routingKey)是路由键,这儿我们就写成队列的名字;//第三个参数(props),消息除了消息体外,还要有props作为它的配置;// 第四个参数(body)消息的内容,要求是byte[]类型的,同时,需要指定编码类型channel.basicPublish("",QUEUE_NAME,null,message.getBytes("UTF-8"));System.out.println("消息发送成功了:" + message);//7.关闭连接:先关闭channel信道,然后关闭connection连接;channel.close();connection.close();}}

说明:

(0)RabbitMQ有三个所谓的“端”:这儿梳理一下;

          ● 服务端:就是我们安装了RabbitMQ的Linux系统;我们把RabbitMQ启动后,这个服务器中的RabbitMQ就是服务端;

          ● 管理后台:RabbitMQ的管理后台,就是我们在【RabbitMQ入门3:RabbitMQ管理后台,简介;】中,演示的在web端查看、管理RabbitMQ的一些配置的地方;(PS:要想在网页上访问管理后台,那么部署RabbitMQ的服务器,就要开发RabbitMQ的15672端口)

          ● 客户端:比如,在这儿,我们在我们的Java项目中,引入RabbitMQ提供的Java客户端后,我们就可以通过客户端去操作RabbitMQ了;(PS:要想在远端服务器,通过客户端访问RabbitMQ服务,那么部署RabbitMQ的服务器,就要开发RabbitMQ的5672端口)

(1)看注释;这儿的连接RabbitMQ的套路,都是相对固定的,后面如有需要,我们似乎也可以创建一个工具类;

(2)我们要想在其他服务器,通过客户端访问RabbitMQ,那么部署RabbitMQ的服务器,就要开发RabbitMQ的5672端口;关于Linux防火墙设置,可以参考【Linux进阶六:【firewall-cmd】防火墙设置;(以【对外开放Tomcat】为例来演示)】;

(3)如果我们的RabbitMQ是安装在本机的话,就可以设置本机地址,然后其默认会使用guest用户去登录,所以设置用户就可以省略;

(4)运行结果:


3.第一个消费者;

Recv类:

package helloworld;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** 描述:接收消息的类:连接到RabbitMQ的服务端,然后接收消息;这个接收消息的类,会持续运行;*/
public class Recv {//我们这儿想要接收的消息,就是Send类发送到“hello”这个队列中的消息;// 所以,在接收消息的时候,我们也要使用到“hello”这个队列;private final static String QUEUE_NAME = "hello";public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();//2.设置RabbitMQ的地址(即RabbitMQ的服务端的地址)//这里面填写是RabbitMQ服务端所在服务器的ip地址connectionFactory.setHost("1**.***.***.**8");//然后,要想连接RabbitMQ的服务端,我么还需要通过一个用户才行;// 所以,这儿我们使用【前面我们设置的,能够在其他服务器上访问RabbitMQ所在服务器的,admin用户】connectionFactory.setUsername("admin");connectionFactory.setPassword("password");//PS:记得要放开RabbitMQ部署的服务器的,5672端口;//3.建立连接Connection connection = connectionFactory.newConnection();//4.获得Channel信道(我们大部分的操作,都是在信道上完成的;有了信道后,我们就可以进行操作了)Channel channel = connection.createChannel();//5.声明队列:因为这儿想要接收的消息,就是Send类发送到“hello”这个队列中的消息;所以,这儿声明的队列和Send中声明的队列是一样的;//参数说明:第一个参数(queue):队列名;// 第二个参数(durable):这个队列是否需要持久(即,服务重启后,这个队列是否需要还存在;这儿我们根据自己的需求,设为了false;)//第三个参数(exclusive):这个队列是否独有(即,这个队列是不是仅能给这个连接使用;这儿我们设为了false)//第四个参数(autoDelete):这个队列是否需要自动删除(即,在队列没有使用的时候,是否需要自动删除;这儿我们设为了false)//第五个参数(arguments);channel.queueDeclare(QUEUE_NAME, false, false, false, null);//6.接收消息,并消费//参数说明:第一个参数(queue)队列名;// 第二个参数(autoAck),是否去自动的确认收到;即,这儿接收到消息之后,是否需要通知消息发送者;//第三个参数(callback),消息收到后的处理channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println(message);}});}
}

说明:

(1)接收消息的时候,我们不需要关闭channel和connection,这个类会一直运行,即其会一直看队列中有没有数据,有的话就拿过来消费;

(2)看注释;

(3)我们接收到消息后,具体消息怎么处理,写在了handleDelivery()方法中;

(4)运行结果;


4.瞅一眼RabbitMQ管理后台;



二: 根据消息内容的不同,采取不同的处理策略;(这儿演示的是一种思路)

在实际开发中,有时可能需要根据消息内容的不同,采取不同的处理策略;本篇博客,就来演示一下;

NewTask类:

package workqueues;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** 描述:发送消息的类:其会发送*/
public class NewTask {//我们发送消息时&#xff0c;需要指定要发到哪里去&#xff1b;所以&#xff0c;我们要指定队列的名字&#xff1b;所以&#xff0c;这儿我们定义队列的名字&#xff1b;//这个名字可随便取&#xff0c;待会在接收的消息时候&#xff0c;要使用这个队列&#xff1b;private final static String TASK_QUEUE_NAME &#61; "task_queue";public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工厂ConnectionFactory connectionFactory &#61; new ConnectionFactory();//2.设置RabbitMQ的地址&#xff08;即RabbitMQ的服务端的地址&#xff09;//这里面填写是RabbitMQ服务端所在服务器的ip地址connectionFactory.setHost("1**.***.***.**8");//然后&#xff0c;要想连接RabbitMQ的服务端&#xff0c;我么还需要通过一个用户才行&#xff1b;// 所以&#xff0c;这儿我们使用【前面我们设置的&#xff0c;能够在其他服务器上访问RabbitMQ所在服务器的&#xff0c;admin用户】connectionFactory.setUsername("admin");connectionFactory.setPassword("password");//PS:记得要放开RabbitMQ部署的服务器的&#xff0c;5672端口&#xff1b;//3.建立连接Connection connection &#61; connectionFactory.newConnection();//4.获得Channel信道&#xff08;我们大部分的操作&#xff0c;都是在信道上完成的&#xff1b;有了信道后&#xff0c;我们就可以进行操作了&#xff09;Channel channel &#61; connection.createChannel();//5.声明队列&#xff08;有了队列之后&#xff0c;我们就可以发布消息了&#xff09;//参数说明&#xff1a;第一个参数(queue)&#xff1a;队列名&#xff1b;// 第二个参数(durable)&#xff1a;这个队列是否需要持久&#xff08;即&#xff0c;服务重启后&#xff0c;这个队列是否需要还存在&#xff1b;这儿我们根据自己的需求&#xff0c;设为了true&#xff1b;&#xff09;//第三个参数(exclusive)&#xff1a;这个队列是否独有&#xff08;即&#xff0c;这个队列是不是仅能给这个连接使用&#xff1b;这儿我们设为了false&#xff09;//第四个参数(autoDelete)&#xff1a;这个队列是否需要自动删除&#xff08;即&#xff0c;在队列没有使用的时候&#xff0c;是否需要自动删除&#xff1b;这儿我们设为了false&#xff09;//第五个参数(arguments)&#xff1b;channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);//6.发布消息;这儿我们模拟发送10条消息&#xff08;消息内容是&#xff0c;如"1...","2...","3..."……这样的&#xff09;for (int i &#61; 0; i <10; i&#43;&#43;) {String message &#61; i &#43; "...";//参数说明&#xff1a;第一个参数&#xff08;exchange&#xff09;是交换机&#xff0c;这儿我们暂时不深入了解&#xff1b;// 第二个参数&#xff08;routingKey&#xff09;是路由键&#xff0c;这儿我们就写成队列的名字&#xff1b;//第三个参数&#xff08;props&#xff09;&#xff0c;消息除了消息体外&#xff0c;还要有props作为它的配置&#xff1b;// 第四个参数&#xff08;body&#xff09;消息的内容&#xff0c;要求是byte[]类型的&#xff0c;同时&#xff0c;需要指定编码类型channel.basicPublish("", TASK_QUEUE_NAME, null, message.getBytes("UTF-8"));System.out.println("消息发送成功了:" &#43; message);}//7.关闭连接&#xff1a;先关闭channel信道&#xff0c;然后关闭connection连接&#xff1b;channel.close();connection.close();}
}

说明&#xff1a;

&#xff08;1&#xff09;看注释&#xff1b;

&#xff08;2&#xff09;类内容说明&#xff1b;

Worker类&#xff1a;

package workqueues;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** 描述&#xff1a;接收消息的类&#xff1a;连接到RabbitMQ的服务端&#xff0c;然后接收消息&#xff1b;这个接收消息的类&#xff0c;会持续运行&#xff1b;*/
public class Worker {//我们这儿想要接收的消息&#xff0c;就是NewTask类发送到“task_queue”这个队列中的消息&#xff1b;// 所以&#xff0c;在接收消息的时候&#xff0c;我们也要使用到“task_queue”这个队列&#xff1b;private final static String TASK_QUEUE_NAME &#61; "task_queue";public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工厂ConnectionFactory connectionFactory &#61; new ConnectionFactory();//2.设置RabbitMQ的地址&#xff08;即RabbitMQ的服务端的地址&#xff09;//这里面填写是RabbitMQ服务端所在服务器的ip地址connectionFactory.setHost("1**.***.***.**8");//然后&#xff0c;要想连接RabbitMQ的服务端&#xff0c;我么还需要通过一个用户才行&#xff1b;// 所以&#xff0c;这儿我们使用【前面我们设置的&#xff0c;能够在其他服务器上访问RabbitMQ所在服务器的&#xff0c;admin用户】connectionFactory.setUsername("admin");connectionFactory.setPassword("password");//PS:记得要放开RabbitMQ部署的服务器的&#xff0c;5672端口&#xff1b;//3.建立连接Connection connection &#61; connectionFactory.newConnection();//4.获得Channel信道&#xff08;我们大部分的操作&#xff0c;都是在信道上完成的&#xff1b;有了信道后&#xff0c;我们就可以进行操作了&#xff09;Channel channel &#61; connection.createChannel();//5.声明队列&#xff1a;因为这儿想要接收的消息&#xff0c;就是Send类发送到“hello”这个队列中的消息&#xff1b;所以&#xff0c;这儿声明的队列和Send中声明的队列是一样的&#xff1b;//参数说明&#xff1a;第一个参数(queue)&#xff1a;队列名&#xff1b;// 第二个参数(durable)&#xff1a;这个队列是否需要持久&#xff08;即&#xff0c;服务重启后&#xff0c;这个队列是否需要还存在&#xff1b;这儿我们根据自己的需求&#xff0c;设为了true&#xff1b;&#xff09;//第三个参数(exclusive)&#xff1a;这个队列是否独有&#xff08;即&#xff0c;这个队列是不是仅能给这个连接使用&#xff1b;这儿我们设为了false&#xff09;//第四个参数(autoDelete)&#xff1a;这个队列是否需要自动删除&#xff08;即&#xff0c;在队列没有使用的时候&#xff0c;是否需要自动删除&#xff1b;这儿我们设为了false&#xff09;//第五个参数(arguments)&#xff1b;channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);System.out.println("开始接收消息");//6.接收消息&#xff0c;并消费//参数说明&#xff1a;第一个参数&#xff08;queue&#xff09;队列名&#xff1b;// 第二个参数&#xff08;autoAck&#xff09;&#xff0c;是否去自动的确认收到&#xff1b;即&#xff0c;这儿接收到消息之后&#xff0c;是否需要通知消息发送者&#xff1b;//第三个参数&#xff08;callback&#xff09;&#xff0c;消息收到后的处理channel.basicConsume(TASK_QUEUE_NAME, true, new DefaultConsumer(channel){&#64;Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message &#61; new String(body, "UTF-8");System.out.println("收到了消息" &#43; message);try{doWork(message);}finally {System.out.println("完成消息处理");}}});}/*** 工具方法&#xff1a;处理消息&#xff1b;* &#64;param task*/private static void doWork(String task) {//根据具体消息内容的不同&#xff0c;去处理消息&#xff1b;// 即&#xff0c;如果消息中有&#39;.&#39;的话&#xff0c;那么我们就让其处理速度慢1秒&#xff1b;&#xff08;PS&#xff1a;这儿仅仅是为了演示用的&#xff0c;玩具式程序&#xff09;// 那么&#xff0c;这样一来&#xff0c;就会出现这个效果&#xff1a;如果消息中没有&#39;.&#39;&#xff0c;处理的就会很快&#xff1b;如果有&#39;.&#39;&#xff0c;处理速度就会慢的多&#xff1b;char[] chars &#61; task.toCharArray();for (char ch : chars) {if (ch &#61;&#61; &#39;.&#39;) {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}}}
}

说明&#xff1a;

&#xff08;1&#xff09;看注释&#xff1b;

&#xff08;2&#xff09;类内容说明&#xff1b;

&#xff08;3&#xff09;运行结果&#xff1b;



三&#xff1a;当消息相对较多时&#xff0c;多个消费者平均压力&#xff1b; 

可以看到&#xff0c;在&#xff08;二&#xff1a; 根据消息内容的不同&#xff0c;采取不同的处理策略&#xff1b;&#xff08;这儿演示的是一种思路&#xff09;&#xff09;中&#xff0c;单靠Worker处理10条信息&#xff0c;需要10秒多&#xff1b;试想&#xff0c;我们再创建一个Worker&#xff0c;10条消息的处理速度就会提升&#xff1b;


1.多个消费者&#xff0c;平均压力&#xff1a;引入&#xff1b; 

一个直接的想法就是&#xff0c;既然我们想让多个Worker一起来接收并处理消息&#xff0c;那么我们可不可以再运行一次Worker类&#xff1f;

进行一下配置&#xff0c;让Worker类可以有多个实例并行运行&#xff1b;

那么&#xff0c;此时的效果&#xff1b;


默认情况下&#xff0c;如果有多个Worker的话&#xff0c;那么这多个Worker会并行工作&#xff1b;RabbitMQ会根据已启动worker和消息的情况&#xff0c;按顺序把每个消息发送给下一个Worker&#xff0c;在消息数量上是平均分配的&#xff1b;即&#xff0c;比如上面有10条消息&#xff0c;两个消费者&#xff0c;那么无论两个消费者处理能力如何&#xff0c;每个消费者都会收到5条消息&#xff1b;

但是&#xff0c;这种平均&#xff0c;是任务量的平均分配&#xff0c;而不一定是真实工作量&#xff08;压力&#xff09;的分配&#xff1b;比如&#xff0c;下面的案例&#xff1b;

此时&#xff0c;再发送消息&#xff0c;观察效果&#xff1a;

而&#xff0c;为了解决这种&#xff0c;纯按数量和顺序分配&#xff0c;却没有按工作量&#xff08;压力&#xff09;平均分配的问题&#xff1b;就是下面公平派遣的内容了&#xff1b;


2.公平派遣&#xff1b;


公平派遣是在有多个消费者&#xff0c;而且是循环调度的情况下&#xff0c;来说的&#xff1b;公平派遣机制下&#xff0c;RabbitMQ会根据消费者的压力&#xff0c;来决定是否派遣&#xff1b;

要想实现公平派遣&#xff0c;还需要加入消息确认机制&#xff1b;主要目的是&#xff0c;消费者处理完消息后&#xff0c;发送一个确认消息&#xff0c;这样一来&#xff0c;RabbitMQ就知道你处理完了&#xff0c;然后就会给你发送下一个消息&#xff1b;

修改Worker这个Consumer类的内容如下&#xff1a;

package workqueues;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** 描述&#xff1a;接收消息的类&#xff1a;连接到RabbitMQ的服务端&#xff0c;然后接收消息&#xff1b;这个接收消息的类&#xff0c;会持续运行&#xff1b;*/
public class Worker {//我们这儿想要接收的消息&#xff0c;就是NewTask类发送到“task_queue”这个队列中的消息&#xff1b;// 所以&#xff0c;在接收消息的时候&#xff0c;我们也要使用到“task_queue”这个队列&#xff1b;private final static String TASK_QUEUE_NAME &#61; "task_queue";public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工厂ConnectionFactory connectionFactory &#61; new ConnectionFactory();//2.设置RabbitMQ的地址&#xff08;即RabbitMQ的服务端的地址&#xff09;//这里面填写是RabbitMQ服务端所在服务器的ip地址connectionFactory.setHost("1**.***.***.**8");//然后&#xff0c;要想连接RabbitMQ的服务端&#xff0c;我么还需要通过一个用户才行&#xff1b;// 所以&#xff0c;这儿我们使用【前面我们设置的&#xff0c;能够在其他服务器上访问RabbitMQ所在服务器的&#xff0c;admin用户】connectionFactory.setUsername("admin");connectionFactory.setPassword("password");//PS:记得要放开RabbitMQ部署的服务器的&#xff0c;5672端口&#xff1b;//3.建立连接Connection connection &#61; connectionFactory.newConnection();//4.获得Channel信道&#xff08;我们大部分的操作&#xff0c;都是在信道上完成的&#xff1b;有了信道后&#xff0c;我们就可以进行操作了&#xff09;final Channel channel &#61; connection.createChannel();//5.声明队列&#xff1a;因为这儿想要接收的消息&#xff0c;就是Send类发送到“hello”这个队列中的消息&#xff1b;所以&#xff0c;这儿声明的队列和Send中声明的队列是一样的&#xff1b;//参数说明&#xff1a;第一个参数(queue)&#xff1a;队列名&#xff1b;// 第二个参数(durable)&#xff1a;这个队列是否需要持久&#xff08;即&#xff0c;服务重启后&#xff0c;这个队列是否需要还存在&#xff1b;这儿我们根据自己的需求&#xff0c;设为了true&#xff1b;&#xff09;//第三个参数(exclusive)&#xff1a;这个队列是否独有&#xff08;即&#xff0c;这个队列是不是仅能给这个连接使用&#xff1b;这儿我们设为了false&#xff09;//第四个参数(autoDelete)&#xff1a;这个队列是否需要自动删除&#xff08;即&#xff0c;在队列没有使用的时候&#xff0c;是否需要自动删除&#xff1b;这儿我们设为了false&#xff09;//第五个参数(arguments)&#xff1b;channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);System.out.println("开始接收消息");//这句话的意思&#xff0c;这个消费者最希望处理的消息的数量&#xff1b;那么效果就是&#xff0c;这个消费者在处理完一个消息之前&#xff0c;是不会接收下一个消息的&#xff1b;channel.basicQos(1);//6.接收消息&#xff0c;并消费//参数说明&#xff1a;第一个参数&#xff08;queue&#xff09;队列名&#xff1b;// 第二个参数&#xff08;autoAck&#xff09;&#xff0c;是否去自动的确认收到&#xff1b;即&#xff0c;这儿接收到消息之后&#xff0c;是否需要通知消息发送者&#xff1b;//为了演示公平派遣&#xff0c;我们这儿改成了false&#xff0c;即我么手动却发送处理了完成的消息//第三个参数&#xff08;callback&#xff09;&#xff0c;消息收到后的处理channel.basicConsume(TASK_QUEUE_NAME, false, new DefaultConsumer(channel){&#64;Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message &#61; new String(body, "UTF-8");System.out.println("收到了消息" &#43; message);try{doWork(message);}finally {System.out.println("完成消息处理");//消息处理完成后&#xff0c;去手动确认&#xff1b;//第一个参数(deliveryTag)这个参数是固定的&#xff1b;//第二参数(multiple)意思是&#xff0c;我们是否同时多个消息一起确认&#xff0c;这儿我们不需要&#xff0c;所以设为了false&#xff1b;channel.basicAck(envelope.getDeliveryTag(), false);}}});}/*** 工具方法&#xff1a;处理消息&#xff1b;* &#64;param task*/private static void doWork(String task) {//根据具体消息内容的不同&#xff0c;去处理消息&#xff1b;// 即&#xff0c;如果消息中有&#39;.&#39;的话&#xff0c;那么我们就让其处理速度慢1秒&#xff1b;&#xff08;PS&#xff1a;这儿仅仅是为了演示用的&#xff0c;玩具式程序&#xff09;// 那么&#xff0c;这样一来&#xff0c;就会出现这个效果&#xff1a;如果消息中没有&#39;.&#39;&#xff0c;处理的就会很快&#xff1b;如果有&#39;.&#39;&#xff0c;处理速度就会慢的多&#xff1b;char[] chars &#61; task.toCharArray();for (char ch : chars) {if (ch &#61;&#61; &#39;.&#39;) {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}}}
}

说明&#xff1a;

&#xff08;1&#xff09;修改内容说明&#xff1b;

&#xff08;2&#xff09;注意&#xff1a;此时&#xff0c;一定要记得主动发送【消息处理完成的通知】&#xff1b;否则&#xff0c;RabbitMQ就不知道&#xff0c;这个消息是否被处理完了&#xff0c;其就会认为没有被处理完&#xff0c;于是后续的消息就会得不到处理&#xff0c;越积越多&#xff0c;消耗内存&#xff1b;

&#xff08;3&#xff09;运行效果&#xff1b;此时&#xff0c;我们重开两个Worker&#xff0c;然后运行NewTask&#xff0c;发送10条消息&#xff1b;

 


推荐阅读
  • 本文提供了 RabbitMQ 3.7 的快速上手指南,详细介绍了环境搭建、生产者和消费者的配置与使用。通过官方教程的指引,读者可以轻松完成初步测试和实践,快速掌握 RabbitMQ 的核心功能和基本操作。 ... [详细]
  • Java 中优先级队列的轮询方法详解与应用 ... [详细]
  • 如何在Java中高效构建WebService
    本文介绍了如何利用XFire框架在Java中高效构建WebService。XFire是一个轻量级、高性能的Java SOAP框架,能够简化WebService的开发流程。通过结合MyEclipse集成开发环境,开发者可以更便捷地进行项目配置和代码编写,从而提高开发效率。此外,文章还详细探讨了XFire的关键特性和最佳实践,为读者提供了实用的参考。 ... [详细]
  • 本文详细探讨了Java集合框架的使用方法及其性能特点。首先,通过关系图展示了集合接口之间的层次结构,如`Collection`接口作为对象集合的基础,其下分为`List`、`Set`和`Queue`等子接口。其中,`List`接口支持按插入顺序保存元素且允许重复,而`Set`接口则确保元素唯一性。此外,文章还深入分析了不同集合类在实际应用中的性能表现,为开发者选择合适的集合类型提供了参考依据。 ... [详细]
  • 工厂方法模式详解:莫勇鹏老师的深入解析与应用实例
    2019年,独角兽企业高薪招聘Python工程师时特别关注工厂方法模式(Factory Method Pattern)。该模式通过定义一个创建对象的接口,让子类决定实例化哪一个类。莫勇鹏老师深入解析了这一设计模式,并提供了丰富的应用实例,帮助开发者更好地理解和运用工厂方法模式,提升软件设计的灵活性和可扩展性。 ... [详细]
  • Java新手求助:如何优雅地向心仪女生索要QQ联系方式(附代码示例与技巧)
    在端午节后的闲暇时光中,我无意间在技术社区里发现了一篇关于如何巧妙地向心仪女生索取QQ联系方式的文章,顿时感到精神焕发。这篇文章详细介绍了源自《啊哈!算法》的方法,不仅图文并茂,还提供了实用的代码示例和技巧,非常适合 Java 新手学习和参考。 ... [详细]
  • Java 8 引入了 Stream API,这一新特性极大地增强了集合数据的处理能力。通过 Stream API,开发者可以更加高效、简洁地进行集合数据的遍历、过滤和转换操作。本文将详细解析 Stream API 的核心概念和常见用法,帮助读者更好地理解和应用这一强大的工具。 ... [详细]
  • 本文深入探讨了IO复用技术的原理与实现,重点分析了其在解决C10K问题中的关键作用。IO复用技术允许单个进程同时管理多个IO对象,如文件、套接字和管道等,通过系统调用如`select`、`poll`和`epoll`,高效地处理大量并发连接。文章详细介绍了这些技术的工作机制,并结合实际案例,展示了它们在高并发场景下的应用效果。 ... [详细]
  • 本课程详细解析了Spring AOP的核心概念及其增强机制,涵盖前置增强、后置增强和环绕增强等类型。通过具体示例,深入探讨了如何在实际开发中有效运用这些增强技术,以提升代码的模块化和可维护性。此外,还介绍了Spring AOP在异常处理和性能监控等场景中的应用,帮助开发者更好地理解和掌握这一强大工具。 ... [详细]
  • 在启用分层编译的情况下,即时编译器(JIT)的触发条件涉及多个因素,包括方法调用频率、代码复杂度和运行时性能数据。本文将详细解析这些条件,并探讨分层编译如何优化JVM的执行效率。 ... [详细]
  • 本文深入探讨了 Spring Cloud 微服务架构中 Gateway 组件的应用,详细介绍了其在实现高效请求路由与过滤方面的关键作用。文章首先从基本配置入手,逐步讲解了如何通过静态路由和动态路由实现灵活的服务访问控制。此外,还特别介绍了如何配置 Gateway 以自动从 Nacos 服务注册中心拉取服务列表,进一步提升系统的可维护性和扩展性。 ... [详细]
  • IIS 7及7.5版本中应用程序池的最佳配置策略与实践
    在IIS 7及7.5版本中,优化应用程序池的配置是提升Web站点性能的关键步骤。具体操作包括:首先定位到目标Web站点的应用程序池,然后通过“应用程序池”菜单找到对应的池,右键选择“高级设置”。在一般优化方案中,建议调整以下几个关键参数:1. **基本设置**: - **队列长度**:默认值为1000,可根据实际需求调整队列长度,以提高处理请求的能力。此外,还可以进一步优化其他参数,如处理器使用限制、回收策略等,以确保应用程序池的高效运行。这些优化措施有助于提升系统的稳定性和响应速度。 ... [详细]
  • 本研究基于状态空间方法,通过动态可视化技术实现了汉诺塔问题的求解过程,即将n个盘子从A柱移动到C柱。本文提供了一个使用C语言在控制台进行动画绘制的示例,并详细注释了程序逻辑,以帮助读者更好地理解和学习该算法。 ... [详细]
  • 在Java编程中,初学者常会遇到“无法从静态上下文中引用非静态变量this”的编译错误。此问题根源在于对静态(static)与非静态成员特性的理解不足。静态成员属于类本身,而非特定对象实例,因此在静态方法或静态初始化块中直接访问非静态成员会导致编译失败。解决这一问题的关键是将相关代码移至非静态方法中,或通过创建类的实例来间接访问非静态成员。 ... [详细]
  • 深入解析 C 语言与 C++ 之间的差异及关联
    深入解析 C 语言与 C++ 之间的差异及关联 ... [详细]
author-avatar
木瓜香皂a
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有