热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

开发笔记:发布订阅模式

篇首语:本文由编程笔记#小编为大家整理,主要介绍了发布订阅模式相关的知识,希望对你有一定的参考价值。

篇首语:本文由编程笔记#小编为大家整理,主要介绍了发布订阅模式相关的知识,希望对你有一定的参考价值。




  • Publish/Subscribe(发布订阅模式)

    发布订阅模式其实就是生产者将数据发送到交换机,交换机将所有的消息发送到每个绑定的队列中,因此 在发布消息时可以只先指定交换机的名称,交换机的声明的代码可以放到消费者端进行声明,队列的声明也放在消费者端来声明



    • Exchange类型-fanout fanout类型的Exchange路由规则非常简单,它会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中

      技术图片

      在使用发布订阅模式的时候,我们只需要声明该队列为fanout即可。如下:

      channel.exchangeDeclare("logs", "fanout");


    • Temporary Queues(临时队列)


    • 下面代码是两个消费者和一个生产者实现发布订阅模式

      生产者代码:

      public class PSEmitLog {
      private static final String EXCHANGE_NAME = "logs";
      public static void main(String[] args) throws Exception {
      //获取连接
      Connection cOnnection= ConnectionUtil.getConnection("localhost", 5672, "/", "guest", "guest");
      Channel channel = connection.createChannel();
      //声明交换机,发布订阅模式
      channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
      //发送消息
      for (int i = 0; i <10; i++) {
      String message = " message" + i;
      System.out.println("[send]:" + message);
      //发送消息
      channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("utf-8"));
      }
      channel.close();
      connection.close();
      }
      }

      消费者代码:

      public class PSEmitLog {
      private static final String EXCHANGE_NAME = "logs";
      public static void main(String[] args) throws Exception {
      //获取连接
      Connection cOnnection= ConnectionUtil.getConnection("localhost", 5672, "/", "guest", "guest");
      Channel channel = connection.createChannel();
      //声明交换机,发布订阅模式
      channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
      //发送消息
      for (int i = 0; i <10; i++) {
      String message = " message" + i;
      System.out.println("[send]:" + message);
      //发送消息
      channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("utf-8"));
      }
      channel.close();
      connection.close();
      }
      }

      public class PSReceiveLogs2 {
      private static final String Exchange_name = "logs";
      public static void main(String[] argv) throws Exception {
      //获取连接
      Connection cOnnection= ConnectionUtil.getConnection("localhost", 5672, "/", "guest", "guest");
      Channel channel = connection.createChannel();
      channel.exchangeDeclare(Exchange_name, "fanout");
      //随机定义一个队列名称,也可以自己定义一个队列名称
      String queueName = channel.queueDeclare().getQueue();
      //绑定队列
      channel.queueBind(queueName, Exchange_name, "");
      DeliverCallback deliverCallback = ((consumerTag, delivery) -> {
      String message = new String(delivery.getBody(), "UTF-8");
      System.out.println(" [x] Received &#39;" + message + "&#39;");
      });
      channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
      });
      }
      }

      运行的时候需要首先运行消费者代码,不然没有队列,交换机不知道把消息投递到那些队列中。


    ? 也可以声明一个非临时队列,只需在绑定队列前面加上如下代码:

    //随机定义一个队列名称,也可以自己定义一个队列名称
    // String queueName = channel.queueDeclare().getQueue();
    String queueName="A";
    //声明队列
    channel.queueDeclare(queueName,false,false,false,null);

    其实发布订阅模式就是将消息发送到不同的队列中,由消费者选择不同的队列进行消费即可。


  • Springboot实现发布订阅

    @SpringBootApplication
    @EnableScheduling
    public class RabbitAmqpTutorialsApplication {
    public static void main(String[] args) throws Exception {
    SpringApplication.run(RabbitAmqpTutorialsApplication.class, args);
    }
    }

    @Configuration
    public class Tut3Config {
    /**
    * 交换器
    * @return
    */
    @Bean
    public FanoutExchange fanout() {
    return new FanoutExchange("tut.fanout");
    }
    //@Profile("receiver")
    private static class ReceiverConfig {
    /**
    * 队列1
    * @return
    */
    @Bean
    public Queue autoDeleteQueue1() {
    return new AnonymousQueue();
    }
    /**
    * 队列2
    * @return
    */
    @Bean
    public Queue autoDeleteQueue2() {
    return new AnonymousQueue();
    }
    /**
    * 绑定队列
    * @param fanout
    * @param autoDeleteQueue1
    * @return
    */
    @Bean
    public Binding binding1(FanoutExchange fanout,
    Queue autoDeleteQueue1) {
    return BindingBuilder.bind(autoDeleteQueue1).to(fanout);
    }
    @Bean
    public Binding binding2(FanoutExchange fanout,
    Queue autoDeleteQueue2) {
    return BindingBuilder.bind(autoDeleteQueue2).to(fanout);
    }
    @Bean
    public Tut3Receiver receiver() {
    return new Tut3Receiver();
    }
    }
    //@Profile("sender")
    @Bean
    public Tut3Sender sender() {
    return new Tut3Sender();
    }
    }

    public class Tut3Receiver {
    /**
    * 监听队列
    * @param in
    * @throws InterruptedException
    */
    @RabbitListener(queues = "#{autoDeleteQueue1.name}")
    public void receive1(String in) throws InterruptedException {
    receive(in, 1);
    }
    @RabbitListener(queues = "#{autoDeleteQueue2.name}")
    public void receive2(String in) throws InterruptedException {
    receive(in, 2);
    }
    public void receive(String in, int receiver) throws InterruptedException {
    StopWatch watch = new StopWatch();
    watch.start();
    System.out.println("instance " + receiver + " [x] Received &#39;" + in + "&#39;");
    doWork(in);
    watch.stop();
    System.out.println("instance " + receiver + " [x] Done in "
    + watch.getTotalTimeSeconds() + "s");
    }
    private void doWork(String in) throws InterruptedException {
    for (char ch : in.toCharArray()) {
    if (ch == &#39;.&#39;) {
    Thread.sleep(1000);
    }
    }
    }
    }

    public class Tut3Sender {
    @Autowired
    private RabbitTemplate template;
    @Autowired
    private FanoutExchange fanout;
    AtomicInteger dots = new AtomicInteger(0);
    AtomicInteger count = new AtomicInteger(0);
    @Scheduled(fixedDelay = 1000, initialDelay = 500)
    public void send() {
    StringBuilder builder = new StringBuilder("Hello");
    if (dots.getAndIncrement() == 3) {
    dots.set(1);
    }
    for (int i = 0; i builder.append(&#39;.&#39;);
    }
    builder.append(count.incrementAndGet());
    String message = builder.toString();
    template.convertAndSend(fanout.getName(), "", message);
    System.out.println(" [x] Sent &#39;" + message + "&#39;");
    }
    }

    相关代码链接: https://github.com/albert-liu435/springmq



推荐阅读
  • 零拷贝技术是提高I/O性能的重要手段,常用于Java NIO、Netty、Kafka等框架中。本文将详细解析零拷贝技术的原理及其应用。 ... [详细]
  • Java高并发与多线程(二):线程的实现方式详解
    本文将深入探讨Java中线程的三种主要实现方式,包括继承Thread类、实现Runnable接口和实现Callable接口,并分析它们之间的异同及其应用场景。 ... [详细]
  • 本文是Java并发编程系列的开篇之作,将详细解析Java 1.5及以上版本中提供的并发工具。文章假设读者已经具备同步和易失性关键字的基本知识,重点介绍信号量机制的内部工作原理及其在实际开发中的应用。 ... [详细]
  • Python多线程编程技巧与实战应用详解 ... [详细]
  • 使用Maven JAR插件将单个或多个文件及其依赖项合并为一个可引用的JAR包
    本文介绍了如何利用Maven中的maven-assembly-plugin插件将单个或多个Java文件及其依赖项打包成一个可引用的JAR文件。首先,需要创建一个新的Maven项目,并将待打包的Java文件复制到该项目中。通过配置maven-assembly-plugin,可以实现将所有文件及其依赖项合并为一个独立的JAR包,方便在其他项目中引用和使用。此外,该方法还支持自定义装配描述符,以满足不同场景下的需求。 ... [详细]
  • 如何在Java中使用DButils类
    这期内容当中小编将会给大家带来有关如何在Java中使用DButils类,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。D ... [详细]
  • 本文详细介绍了 PHP 中对象的生命周期、内存管理和魔术方法的使用,包括对象的自动销毁、析构函数的作用以及各种魔术方法的具体应用场景。 ... [详细]
  • 本文介绍了 .NET 中用于线程间通信的工具 WaitHandle 及其子类 ManualResetEvent 和 AutoResetEvent,并详细解释了线程池的概念及其在优化资源利用方面的优势。 ... [详细]
  • 本文总结了一些开发中常见的问题及其解决方案,包括特性过滤器的使用、NuGet程序集版本冲突、线程存储、溢出检查、ThreadPool的最大线程数设置、Redis使用中的问题以及Task.Result和Task.GetAwaiter().GetResult()的区别。 ... [详细]
  • 大类|电阻器_使用Requests、Etree、BeautifulSoup、Pandas和Path库进行数据抓取与处理 | 将指定区域内容保存为HTML和Excel格式
    大类|电阻器_使用Requests、Etree、BeautifulSoup、Pandas和Path库进行数据抓取与处理 | 将指定区域内容保存为HTML和Excel格式 ... [详细]
  • 属性类 `Properties` 是 `Hashtable` 类的子类,用于存储键值对形式的数据。该类在 Java 中广泛应用于配置文件的读取与写入,支持字符串类型的键和值。通过 `Properties` 类,开发者可以方便地进行配置信息的管理,确保应用程序的灵活性和可维护性。此外,`Properties` 类还提供了加载和保存属性文件的方法,使其在实际开发中具有较高的实用价值。 ... [详细]
  • 深入解析 Synchronized 锁的升级机制及其在并发编程中的应用
    深入解析 Synchronized 锁的升级机制及其在并发编程中的应用 ... [详细]
  • Python 序列图分割与可视化编程入门教程
    本文介绍了如何使用 Python 进行序列图的快速分割与可视化。通过一个实际案例,详细展示了从需求分析到代码实现的全过程。具体包括如何读取序列图数据、应用分割算法以及利用可视化库生成直观的图表,帮助非编程背景的用户也能轻松上手。 ... [详细]
  • Python 程序转换为 EXE 文件:详细解析 .py 脚本打包成独立可执行文件的方法与技巧
    在开发了几个简单的爬虫 Python 程序后,我决定将其封装成独立的可执行文件以便于分发和使用。为了实现这一目标,首先需要解决的是如何将 Python 脚本转换为 EXE 文件。在这个过程中,我选择了 Qt 作为 GUI 框架,因为之前对此并不熟悉,希望通过这个项目进一步学习和掌握 Qt 的基本用法。本文将详细介绍从 .py 脚本到 EXE 文件的整个过程,包括所需工具、具体步骤以及常见问题的解决方案。 ... [详细]
  • 在Java项目中,当两个文件进行互相调用时出现了函数错误。具体问题出现在 `MainFrame.java` 文件中,该文件位于 `cn.javass.bookmgr` 包下,并且导入了 `java.awt.BorderLayout` 和 `java.awt.Event` 等相关类。为了确保项目的正常运行,请求提供专业的解决方案,以解决函数调用中的错误。建议从类路径、依赖关系和方法签名等方面入手,进行全面排查和调试。 ... [详细]
author-avatar
国王的驴耳朵要吐槽
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有