作者:lubin | 来源:互联网 | 2023-08-24 10:16
Topic模式是生产者通过交换机将消息存储到队列后,交换机根据绑定队列的routingkey的值进行通配符匹配,如果匹配通过,消息将被存储到该队列,如果routingkey的值匹配
Topic 模式是生产者通过交换机将消息存储到队列后,交换机根据绑定队列的 routing key 的值进行通配符匹配,如果匹配通过,消息将被存储到该队列,如果 routing key 的值匹配到了多个队列,消息将会被发送到多个队列;如果一个队列也没匹配上,该消息将丢失。
routing_key 必须是单词列表,用点分隔,其中 * 和 # 的含义为:
*:1个单词
#:0个或多个单词
package com.tszr.topic;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Productor {
private static final String EXCHANGE_NAME = "topic_exchange";
public static void main(String[] args) {
// 1、创建连接工程
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
// 2、获取连接、通道
cOnnection= factory.newConnection();
channel = connection.createChannel();
// 消息内容
String message = "hello topic mode";
// 指定路由key
String routeKey = "com.order.test.xxx";
String type = "topic";
// 3、声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, type);
// 4、声明队列
channel.queueDeclare("queue5",false,false,false,null);
channel.queueDeclare("queue6",false,false,false,null);
// 5、绑定 channel 与 queue
channel.queueBind("queue5", EXCHANGE_NAME, "*.order.#");
channel.queueBind("queue6", EXCHANGE_NAME, "#.test.*");
// 6、发布消息
channel.basicPublish(EXCHANGE_NAME, routeKey, null, message.getBytes("UTF-8"));
System.out.println("消息发送成功!");
} catch (IOException | TimeoutException e) {
e.printStackTrace();
System.out.println("消息发送异常");
} finally {
// 关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
// 关闭连接
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
package com.tszr.topic;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Customer {
private static Runnable runnable = new Runnable() {
@Override
public void run() {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/");
final String queueName = Thread.currentThread().getName();
Connection connection = null;
Channel channel = null;
try {
// 获取连接、通道
cOnnection= factory.newConnection();
channel = connection.createChannel();
Channel finalChannel = channel;
finalChannel.basicConsume(queueName, true, new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery delivery) throws IOException {
System.out.println(delivery.getEnvelope().getDeliveryTag());
System.out.println(queueName + ":收到消息是:" + new String(delivery.getBody(), "UTF-8"));
}
}, new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
}
});
System.out.println(queueName + ":开始接收消息");
} catch (IOException |
TimeoutException e) {
e.printStackTrace();
} finally {
try {
Thread.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
// 关闭连接
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
};
public static void main(String[] args) throws IOException, TimeoutException {
// 创建线程分别从3个队列中获取消息
new Thread(runnable, "queue5").start();
new Thread(runnable, "queue6").start();
}
}