下面代码是两个消费者和一个生产者实现发布订阅模式
生产者代码:
public class PSEmitLog {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws Exception {
//获取连接
Connection cOnnection= ConnectionUtil.getConnection("localhost", 5672, "/", "guest", "guest");
Channel channel = connection.createChannel();
//声明交换机,发布订阅模式
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
//发送消息
for (int i = 0; i <10; i++) {
String message = " message" + i;
System.out.println("[send]:" + message);
//发送消息
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("utf-8"));
}
channel.close();
connection.close();
}
}
消费者代码:
public class PSEmitLog {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws Exception {
//获取连接
Connection cOnnection= ConnectionUtil.getConnection("localhost", 5672, "/", "guest", "guest");
Channel channel = connection.createChannel();
//声明交换机,发布订阅模式
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
//发送消息
for (int i = 0; i <10; i++) {
String message = " message" + i;
System.out.println("[send]:" + message);
//发送消息
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("utf-8"));
}
channel.close();
connection.close();
}
}
public class PSReceiveLogs2 {
private static final String Exchange_name = "logs";
public static void main(String[] argv) throws Exception {
//获取连接
Connection cOnnection= ConnectionUtil.getConnection("localhost", 5672, "/", "guest", "guest");
Channel channel = connection.createChannel();
channel.exchangeDeclare(Exchange_name, "fanout");
//随机定义一个队列名称,也可以自己定义一个队列名称
String queueName = channel.queueDeclare().getQueue();
//绑定队列
channel.queueBind(queueName, Exchange_name, "");
DeliverCallback deliverCallback = ((consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received &#39;" + message + "&#39;");
});
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
});
}
}
运行的时候需要首先运行消费者代码,不然没有队列,交换机不知道把消息投递到那些队列中。