热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

mqtt模式–Topic模式

Topic模式是生产者通过交换机将消息存储到队列后,交换机根据绑定队列的routingkey的值进行通配符匹配,如果匹配通过,消息将被存储到该队列,如果routingkey的值匹配

Topic 模式是生产者通过交换机将消息存储到队列后,交换机根据绑定队列的 routing key 的值进行通配符匹配,如果匹配通过,消息将被存储到该队列,如果 routing key 的值匹配到了多个队列,消息将会被发送到多个队列;如果一个队列也没匹配上,该消息将丢失。
routing_key 必须是单词列表,用点分隔,其中
* 和 # 的含义为:
*:1个单词
#:0个或多个单词

package com.tszr.topic;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Productor {
private static final String EXCHANGE_NAME = "topic_exchange";
public static void main(String[] args) {
// 1、创建连接工程
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(
"127.0.0.1");
factory.setUsername(
"guest");
factory.setPassword(
"guest");
factory.setVirtualHost(
"/");
Connection connection
= null;
Channel channel
= null;
try {
// 2、获取连接、通道
cOnnection= factory.newConnection();
channel
= connection.createChannel();
// 消息内容
String message = "hello topic mode";
// 指定路由key
String routeKey = "com.order.test.xxx";
String type
= "topic";
// 3、声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, type);
// 4、声明队列
channel.queueDeclare("queue5",false,false,false,null);
channel.queueDeclare(
"queue6",false,false,false,null);
// 5、绑定 channel 与 queue
channel.queueBind("queue5", EXCHANGE_NAME, "*.order.#");
channel.queueBind(
"queue6", EXCHANGE_NAME, "#.test.*");
// 6、发布消息
channel.basicPublish(EXCHANGE_NAME, routeKey, null, message.getBytes("UTF-8"));
System.out.println(
"消息发送成功!");
}
catch (IOException | TimeoutException e) {
e.printStackTrace();
System.out.println(
"消息发送异常");
}
finally {
// 关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
}
catch (Exception e) {
e.printStackTrace();
}
}
// 关闭连接
if (connection != null && connection.isOpen()) {
try {
connection.close();
}
catch (Exception e) {
e.printStackTrace();
}
}
}
}
}

package com.tszr.topic;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Customer {
private static Runnable runnable = new Runnable() {
@Override
public void run() {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(
"127.0.0.1");
factory.setUsername(
"guest");
factory.setPassword(
"guest");
factory.setVirtualHost(
"/");
final String queueName = Thread.currentThread().getName();
Connection connection
= null;
Channel channel
= null;
try {
// 获取连接、通道
cOnnection= factory.newConnection();
channel
= connection.createChannel();
Channel finalChannel
= channel;
finalChannel.basicConsume(queueName,
true, new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery delivery) throws IOException {
System.out.println(delivery.getEnvelope().getDeliveryTag());
System.out.println(queueName
+ ":收到消息是:" + new String(delivery.getBody(), "UTF-8"));
}
},
new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
}
});
System.out.println(queueName
+ ":开始接收消息");
}
catch (IOException |
TimeoutException e) {
e.printStackTrace();
}
finally {
try {
Thread.sleep(
3);
}
catch (InterruptedException e) {
e.printStackTrace();
}
// 关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
}
catch (Exception e) {
e.printStackTrace();
}
}
// 关闭连接
if (connection != null && connection.isOpen()) {
try {
connection.close();
}
catch (Exception e) {
e.printStackTrace();
}
}
}
}
};
public static void main(String[] args) throws IOException, TimeoutException {
// 创建线程分别从3个队列中获取消息
new Thread(runnable, "queue5").start();
new Thread(runnable, "queue6").start();
}
}

 

 

 



推荐阅读
  • 消息中间件RabbitMQ 高级特性之消费端ACK与重回队列
    什么是消费端的ACK和重回队列?消费端的手工ACK和NACK消费端进行消费的时候,如果由于业务异常我们可以进行日志的记录,然后进行补偿如果由于服务器宕机等严重问题 ... [详细]
  • 本文分享了一个关于在C#中使用异步代码的问题,作者在控制台中运行时代码正常工作,但在Windows窗体中却无法正常工作。作者尝试搜索局域网上的主机,但在窗体中计数器没有减少。文章提供了相关的代码和解决思路。 ... [详细]
  • 本文介绍了使用Java实现大数乘法的分治算法,包括输入数据的处理、普通大数乘法的结果和Karatsuba大数乘法的结果。通过改变long类型可以适应不同范围的大数乘法计算。 ... [详细]
  • C# 7.0 新特性:基于Tuple的“多”返回值方法
    本文介绍了C# 7.0中基于Tuple的“多”返回值方法的使用。通过对C# 6.0及更早版本的做法进行回顾,提出了问题:如何使一个方法可返回多个返回值。然后详细介绍了C# 7.0中使用Tuple的写法,并给出了示例代码。最后,总结了该新特性的优点。 ... [详细]
  • 关键词:Golang, Cookie, 跟踪位置, net/http/cookiejar, package main, golang.org/x/net/publicsuffix, io/ioutil, log, net/http, net/http/cookiejar ... [详细]
  • 深入理解Kafka服务端请求队列中请求的处理
    本文深入分析了Kafka服务端请求队列中请求的处理过程,详细介绍了请求的封装和放入请求队列的过程,以及处理请求的线程池的创建和容量设置。通过场景分析、图示说明和源码分析,帮助读者更好地理解Kafka服务端的工作原理。 ... [详细]
  • 李逍遥寻找仙药的迷阵之旅
    本文讲述了少年李逍遥为了救治婶婶的病情,前往仙灵岛寻找仙药的故事。他需要穿越一个由M×N个方格组成的迷阵,有些方格内有怪物,有些方格是安全的。李逍遥需要避开有怪物的方格,并经过最少的方格,找到仙药。在寻找的过程中,他还会遇到神秘人物。本文提供了一个迷阵样例及李逍遥找到仙药的路线。 ... [详细]
  • 重入锁(ReentrantLock)学习及实现原理
    本文介绍了重入锁(ReentrantLock)的学习及实现原理。在学习synchronized的基础上,重入锁提供了更多的灵活性和功能。文章详细介绍了重入锁的特性、使用方法和实现原理,并提供了类图和测试代码供读者参考。重入锁支持重入和公平与非公平两种实现方式,通过对比和分析,读者可以更好地理解和应用重入锁。 ... [详细]
  • 本文介绍了操作系统的定义和功能,包括操作系统的本质、用户界面以及系统调用的分类。同时还介绍了进程和线程的区别,包括进程和线程的定义和作用。 ... [详细]
  • 本文讨论了微软的STL容器类是否线程安全。根据MSDN的回答,STL容器类包括vector、deque、list、queue、stack、priority_queue、valarray、map、hash_map、multimap、hash_multimap、set、hash_set、multiset、hash_multiset、basic_string和bitset。对于单个对象来说,多个线程同时读取是安全的。但如果一个线程正在写入一个对象,那么所有的读写操作都需要进行同步。 ... [详细]
  • 代理模式的详细介绍及应用场景
    代理模式是一种在软件开发中常用的设计模式,通过在客户端和目标对象之间增加一层中间层,让代理对象代替目标对象进行访问,从而简化系统的复杂性。代理模式可以根据不同的使用目的分为远程代理、虚拟代理、Copy-on-Write代理、保护代理、防火墙代理、智能引用代理和Cache代理等几种。本文将详细介绍代理模式的原理和应用场景。 ... [详细]
  • 设计模式——模板方法模式的应用和优缺点
    本文介绍了设计模式中的模板方法模式,包括其定义、应用、优点、缺点和使用场景。模板方法模式是一种基于继承的代码复用技术,通过将复杂流程的实现步骤封装在基本方法中,并在抽象父类中定义模板方法的执行次序,子类可以覆盖某些步骤,实现相同的算法框架的不同功能。该模式在软件开发中具有广泛的应用价值。 ... [详细]
  • linux进阶50——无锁CAS
    1.概念比较并交换(compareandswap,CAS),是原⼦操作的⼀种,可⽤于在多线程编程中实现不被打断的数据交换操作࿰ ... [详细]
  • 第七课主要内容:多进程多线程FIFO,LIFO,优先队列线程局部变量进程与线程的选择线程池异步IO概念及twisted案例股票数据抓取 ... [详细]
  • C++ STL复习(13)容器适配器
    STL提供了3种容器适配器,分别为stack栈适配器、queue队列适配器以及priority_queue优先权队列适配器。不同场景下,由于不同的序列式 ... [详细]
author-avatar
lubin
追逐那一点星光
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有