热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

mqtt模式–Topic模式

Topic模式是生产者通过交换机将消息存储到队列后,交换机根据绑定队列的routingkey的值进行通配符匹配,如果匹配通过,消息将被存储到该队列,如果routingkey的值匹配

Topic 模式是生产者通过交换机将消息存储到队列后,交换机根据绑定队列的 routing key 的值进行通配符匹配,如果匹配通过,消息将被存储到该队列,如果 routing key 的值匹配到了多个队列,消息将会被发送到多个队列;如果一个队列也没匹配上,该消息将丢失。
routing_key 必须是单词列表,用点分隔,其中
* 和 # 的含义为:
*:1个单词
#:0个或多个单词

package com.tszr.topic;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Productor {
private static final String EXCHANGE_NAME = "topic_exchange";
public static void main(String[] args) {
// 1、创建连接工程
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(
"127.0.0.1");
factory.setUsername(
"guest");
factory.setPassword(
"guest");
factory.setVirtualHost(
"/");
Connection connection
= null;
Channel channel
= null;
try {
// 2、获取连接、通道
cOnnection= factory.newConnection();
channel
= connection.createChannel();
// 消息内容
String message = "hello topic mode";
// 指定路由key
String routeKey = "com.order.test.xxx";
String type
= "topic";
// 3、声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, type);
// 4、声明队列
channel.queueDeclare("queue5",false,false,false,null);
channel.queueDeclare(
"queue6",false,false,false,null);
// 5、绑定 channel 与 queue
channel.queueBind("queue5", EXCHANGE_NAME, "*.order.#");
channel.queueBind(
"queue6", EXCHANGE_NAME, "#.test.*");
// 6、发布消息
channel.basicPublish(EXCHANGE_NAME, routeKey, null, message.getBytes("UTF-8"));
System.out.println(
"消息发送成功!");
}
catch (IOException | TimeoutException e) {
e.printStackTrace();
System.out.println(
"消息发送异常");
}
finally {
// 关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
}
catch (Exception e) {
e.printStackTrace();
}
}
// 关闭连接
if (connection != null && connection.isOpen()) {
try {
connection.close();
}
catch (Exception e) {
e.printStackTrace();
}
}
}
}
}

package com.tszr.topic;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Customer {
private static Runnable runnable = new Runnable() {
@Override
public void run() {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(
"127.0.0.1");
factory.setUsername(
"guest");
factory.setPassword(
"guest");
factory.setVirtualHost(
"/");
final String queueName = Thread.currentThread().getName();
Connection connection
= null;
Channel channel
= null;
try {
// 获取连接、通道
cOnnection= factory.newConnection();
channel
= connection.createChannel();
Channel finalChannel
= channel;
finalChannel.basicConsume(queueName,
true, new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery delivery) throws IOException {
System.out.println(delivery.getEnvelope().getDeliveryTag());
System.out.println(queueName
+ ":收到消息是:" + new String(delivery.getBody(), "UTF-8"));
}
},
new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
}
});
System.out.println(queueName
+ ":开始接收消息");
}
catch (IOException |
TimeoutException e) {
e.printStackTrace();
}
finally {
try {
Thread.sleep(
3);
}
catch (InterruptedException e) {
e.printStackTrace();
}
// 关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
}
catch (Exception e) {
e.printStackTrace();
}
}
// 关闭连接
if (connection != null && connection.isOpen()) {
try {
connection.close();
}
catch (Exception e) {
e.printStackTrace();
}
}
}
}
};
public static void main(String[] args) throws IOException, TimeoutException {
// 创建线程分别从3个队列中获取消息
new Thread(runnable, "queue5").start();
new Thread(runnable, "queue6").start();
}
}

 

 

 



推荐阅读
  • 本文详细解析 Skynet 的启动流程,包括配置文件的读取、环境变量的设置、主要线程的启动(如 timer、socket、monitor 和 worker 线程),以及消息队列的实现机制。 ... [详细]
  • Activity跳转动画 无缝衔接
    Activity跳转动画 无缝衔接 ... [详细]
  • 深入理解线程池及其基本实现
    本文探讨了线程池的概念、优势及其在Java中的应用。通过实例分析不同类型的线程池,并指导如何构建一个简易的线程池。 ... [详细]
  • 成为一名高效的Java架构师不仅需要掌握高级Java编程技巧,还需深入理解JVM的工作原理及其优化方法。此外,对池技术(包括对象池、连接池和线程池)的应用、多线程处理、集合对象的内部机制、以及常用的数据结构和算法的精通也是必不可少的。同时,熟悉Linux操作系统、TCP/IP协议栈、HTTP协议等基础知识,对于构建高效稳定的系统同样重要。 ... [详细]
  • 本文探讨了在使用Apache Flink向Kafka发送数据过程中遇到的事务频繁失败问题,并提供了详细的解决方案,包括必要的配置调整和最佳实践。 ... [详细]
  • 本文探讨了在Windows 8系统中使用C#语言开发的小工具遇到的进程无法强制终止的问题,包括可能的原因及解决方案。 ... [详细]
  • 深入理解Java中的NIO、BIO与AIO
    本文详细解析了Java中三种重要的I/O模型:阻塞I/O(BIO)、非阻塞I/O(NIO)以及异步I/O(AIO),并探讨了它们在实际应用中的优缺点及适用场景。 ... [详细]
  • UVA 401 - 镜像回文字符串
    本题探讨了如何判断一个字符串是否为普通回文、镜像回文或两者都不是。通过特定的字符映射表来实现字符串的镜像转换,并根据转换后的结果进行分类。 ... [详细]
  • 本文提供了解决在尝试重置MySQL root用户密码时遇到连接失败问题的方法,包括停止MySQL服务、以安全模式启动MySQL、手动更新用户表中的密码等步骤。 ... [详细]
  • 了解如何有效清除远程桌面连接中的缓存记录,对于提升服务器安全性至关重要。本文将指导您完成这一过程。 ... [详细]
  • 探讨如何在 Propel 1.5 版本中,通过 Query 功能实现包含多个条件的连接查询,特别是针对 MySQL 数据库的操作。 ... [详细]
  • 本文探讨了如何利用数组来构建二叉树,并介绍了通过队列实现的二叉树层次遍历方法。通过具体的C++代码示例,详细说明了构建及打印二叉树的过程。 ... [详细]
  • 本文详细介绍了如何使用 Python 编程语言中的 Scapy 库执行 DNS 欺骗攻击,包括必要的软件安装、攻击流程及代码示例。 ... [详细]
  • RabbitMQ 核心组件解析
    本文详细介绍了RabbitMQ的核心概念,包括其基本原理、应用场景及关键组件,如消息、生产者、消费者、信道、交换机、路由键和虚拟主机等。 ... [详细]
  • 本文详细介绍了如何在PHP中使用Memcached进行数据缓存,包括服务器连接、数据操作、高级功能等。 ... [详细]
author-avatar
lubin
追逐那一点星光
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有