作者:手机用户2402851335 | 来源:互联网 | 2023-08-13 13:10
WorkQueues工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行
Work Queues
工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务。
轮训发送消息
启动两个线程,一个消息发送线程,来看看这两个工作线程是如何工作的。
抽取工具类
package com.uin;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author wanglufei
* @description: 工具类
* @date 2022/1/24/12:29 AM
*/
public class RabbitMQUtils {
public static Channel getChannel() throws IOException, TimeoutException {
//引入连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setUsername("guest");
factory.setPassword("guest");
Connection cOnnection= factory.newConnection();
Channel channel = connection.createChannel();
return channel;
}
}
启动两个工作线程
package com.uin.work_queues;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.uin.utils.RabbitMQUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author wanglufei
* @description: TODO
* @date 2022/1/24/12:40 AM
*/
public class Consumer_work01 {
public static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMQUtils.getChannel();
//接受消息的回调
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println("接受到的消息:" + new String(message.getBody()));
};
//取消消息的回调
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println(consumerTag + "消息被取消消费者接口的回调逻辑!");
};
/**
* 消费消息
* 1.消费哪个队列
* 2.消费成功之后是否要自动应答 true代表自动应答 false代表手动应答
* 3.未成功消费的一个回调
* 4.消费者取消消费的回调
*/
System.out.println("第一个工作线程!等待接受消息。。。。");
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
}
}
package com.uin.work_queues;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.uin.utils.RabbitMQUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author wanglufei
* @description: TODO
* @date 2022/1/24/1:03 AM
*/
public class Consumer_work02 {
public static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMQUtils.getChannel();
//接受消息的回调
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println("接受到的消息:" + new String(message.getBody()));
};
//取消消息的回调
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println(consumerTag + "消息被取消消费者接口的回调逻辑!");
};
/**
* 消费消息
* 1.消费哪个队列
* 2.消费成功之后是否要自动应答 true代表自动应答 false代表手动应答
* 3.未成功消费的一个回调
* 4.消费者取消消费的回调
*/
System.out.println("第二个工作线程!等待接受消息。。。。");
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
}
}
生产者
package com.uin.work_queues;
import com.rabbitmq.client.Channel;
import com.uin.utils.RabbitMQUtils;
import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;
/**
* @author wanglufei
* @description: TODO
* @date 2022/1/24/1:09 AM
*/
public class Producer_task01 {
public static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMQUtils.getChannel();
//声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//发送消息
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()) {
String message = scanner.next();
/**
* 发送一个消息
* 1.发送到那个交换机
* 2.路由的 key 是哪个
* 3.其他的参数信息
* 4.发送消息的消息体
*/
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());//以二进制传输
System.out.println("发送消息完成:" + message);
}
}
}
本文来自博客园,作者:{BearBrick0},转载请注明原文链接:{https://www.cnblogs.com/bearbrick}