热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

开发笔记:发布订阅模式

篇首语:本文由编程笔记#小编为大家整理,主要介绍了发布订阅模式相关的知识,希望对你有一定的参考价值。

篇首语:本文由编程笔记#小编为大家整理,主要介绍了发布订阅模式相关的知识,希望对你有一定的参考价值。




  • Publish/Subscribe(发布订阅模式)

    发布订阅模式其实就是生产者将数据发送到交换机,交换机将所有的消息发送到每个绑定的队列中,因此 在发布消息时可以只先指定交换机的名称,交换机的声明的代码可以放到消费者端进行声明,队列的声明也放在消费者端来声明



    • Exchange类型-fanout fanout类型的Exchange路由规则非常简单,它会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中

      技术图片

      在使用发布订阅模式的时候,我们只需要声明该队列为fanout即可。如下:

      channel.exchangeDeclare("logs", "fanout");


    • Temporary Queues(临时队列)


    • 下面代码是两个消费者和一个生产者实现发布订阅模式

      生产者代码:

      public class PSEmitLog {
      private static final String EXCHANGE_NAME = "logs";
      public static void main(String[] args) throws Exception {
      //获取连接
      Connection cOnnection= ConnectionUtil.getConnection("localhost", 5672, "/", "guest", "guest");
      Channel channel = connection.createChannel();
      //声明交换机,发布订阅模式
      channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
      //发送消息
      for (int i = 0; i <10; i++) {
      String message = " message" + i;
      System.out.println("[send]:" + message);
      //发送消息
      channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("utf-8"));
      }
      channel.close();
      connection.close();
      }
      }

      消费者代码:

      public class PSEmitLog {
      private static final String EXCHANGE_NAME = "logs";
      public static void main(String[] args) throws Exception {
      //获取连接
      Connection cOnnection= ConnectionUtil.getConnection("localhost", 5672, "/", "guest", "guest");
      Channel channel = connection.createChannel();
      //声明交换机,发布订阅模式
      channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
      //发送消息
      for (int i = 0; i <10; i++) {
      String message = " message" + i;
      System.out.println("[send]:" + message);
      //发送消息
      channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("utf-8"));
      }
      channel.close();
      connection.close();
      }
      }

      public class PSReceiveLogs2 {
      private static final String Exchange_name = "logs";
      public static void main(String[] argv) throws Exception {
      //获取连接
      Connection cOnnection= ConnectionUtil.getConnection("localhost", 5672, "/", "guest", "guest");
      Channel channel = connection.createChannel();
      channel.exchangeDeclare(Exchange_name, "fanout");
      //随机定义一个队列名称,也可以自己定义一个队列名称
      String queueName = channel.queueDeclare().getQueue();
      //绑定队列
      channel.queueBind(queueName, Exchange_name, "");
      DeliverCallback deliverCallback = ((consumerTag, delivery) -> {
      String message = new String(delivery.getBody(), "UTF-8");
      System.out.println(" [x] Received &#39;" + message + "&#39;");
      });
      channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
      });
      }
      }

      运行的时候需要首先运行消费者代码,不然没有队列,交换机不知道把消息投递到那些队列中。


    ? 也可以声明一个非临时队列,只需在绑定队列前面加上如下代码:

    //随机定义一个队列名称,也可以自己定义一个队列名称
    // String queueName = channel.queueDeclare().getQueue();
    String queueName="A";
    //声明队列
    channel.queueDeclare(queueName,false,false,false,null);

    其实发布订阅模式就是将消息发送到不同的队列中,由消费者选择不同的队列进行消费即可。


  • Springboot实现发布订阅

    @SpringBootApplication
    @EnableScheduling
    public class RabbitAmqpTutorialsApplication {
    public static void main(String[] args) throws Exception {
    SpringApplication.run(RabbitAmqpTutorialsApplication.class, args);
    }
    }

    @Configuration
    public class Tut3Config {
    /**
    * 交换器
    * @return
    */
    @Bean
    public FanoutExchange fanout() {
    return new FanoutExchange("tut.fanout");
    }
    //@Profile("receiver")
    private static class ReceiverConfig {
    /**
    * 队列1
    * @return
    */
    @Bean
    public Queue autoDeleteQueue1() {
    return new AnonymousQueue();
    }
    /**
    * 队列2
    * @return
    */
    @Bean
    public Queue autoDeleteQueue2() {
    return new AnonymousQueue();
    }
    /**
    * 绑定队列
    * @param fanout
    * @param autoDeleteQueue1
    * @return
    */
    @Bean
    public Binding binding1(FanoutExchange fanout,
    Queue autoDeleteQueue1) {
    return BindingBuilder.bind(autoDeleteQueue1).to(fanout);
    }
    @Bean
    public Binding binding2(FanoutExchange fanout,
    Queue autoDeleteQueue2) {
    return BindingBuilder.bind(autoDeleteQueue2).to(fanout);
    }
    @Bean
    public Tut3Receiver receiver() {
    return new Tut3Receiver();
    }
    }
    //@Profile("sender")
    @Bean
    public Tut3Sender sender() {
    return new Tut3Sender();
    }
    }

    public class Tut3Receiver {
    /**
    * 监听队列
    * @param in
    * @throws InterruptedException
    */
    @RabbitListener(queues = "#{autoDeleteQueue1.name}")
    public void receive1(String in) throws InterruptedException {
    receive(in, 1);
    }
    @RabbitListener(queues = "#{autoDeleteQueue2.name}")
    public void receive2(String in) throws InterruptedException {
    receive(in, 2);
    }
    public void receive(String in, int receiver) throws InterruptedException {
    StopWatch watch = new StopWatch();
    watch.start();
    System.out.println("instance " + receiver + " [x] Received &#39;" + in + "&#39;");
    doWork(in);
    watch.stop();
    System.out.println("instance " + receiver + " [x] Done in "
    + watch.getTotalTimeSeconds() + "s");
    }
    private void doWork(String in) throws InterruptedException {
    for (char ch : in.toCharArray()) {
    if (ch == &#39;.&#39;) {
    Thread.sleep(1000);
    }
    }
    }
    }

    public class Tut3Sender {
    @Autowired
    private RabbitTemplate template;
    @Autowired
    private FanoutExchange fanout;
    AtomicInteger dots = new AtomicInteger(0);
    AtomicInteger count = new AtomicInteger(0);
    @Scheduled(fixedDelay = 1000, initialDelay = 500)
    public void send() {
    StringBuilder builder = new StringBuilder("Hello");
    if (dots.getAndIncrement() == 3) {
    dots.set(1);
    }
    for (int i = 0; i builder.append(&#39;.&#39;);
    }
    builder.append(count.incrementAndGet());
    String message = builder.toString();
    template.convertAndSend(fanout.getName(), "", message);
    System.out.println(" [x] Sent &#39;" + message + "&#39;");
    }
    }

    相关代码链接: https://github.com/albert-liu435/springmq



推荐阅读
  • JVM 学习总结(三)——对象存活判定算法的两种实现
    本文介绍了垃圾收集器在回收堆内存前确定对象存活的两种算法:引用计数算法和可达性分析算法。引用计数算法通过计数器判定对象是否存活,虽然简单高效,但无法解决循环引用的问题;可达性分析算法通过判断对象是否可达来确定存活对象,是主流的Java虚拟机内存管理算法。 ... [详细]
  • 重入锁(ReentrantLock)学习及实现原理
    本文介绍了重入锁(ReentrantLock)的学习及实现原理。在学习synchronized的基础上,重入锁提供了更多的灵活性和功能。文章详细介绍了重入锁的特性、使用方法和实现原理,并提供了类图和测试代码供读者参考。重入锁支持重入和公平与非公平两种实现方式,通过对比和分析,读者可以更好地理解和应用重入锁。 ... [详细]
  • SpringBoot uri统一权限管理的实现方法及步骤详解
    本文详细介绍了SpringBoot中实现uri统一权限管理的方法,包括表结构定义、自动统计URI并自动删除脏数据、程序启动加载等步骤。通过该方法可以提高系统的安全性,实现对系统任意接口的权限拦截验证。 ... [详细]
  • 本文分享了一个关于在C#中使用异步代码的问题,作者在控制台中运行时代码正常工作,但在Windows窗体中却无法正常工作。作者尝试搜索局域网上的主机,但在窗体中计数器没有减少。文章提供了相关的代码和解决思路。 ... [详细]
  • 本文详细介绍了Java中vector的使用方法和相关知识,包括vector类的功能、构造方法和使用注意事项。通过使用vector类,可以方便地实现动态数组的功能,并且可以随意插入不同类型的对象,进行查找、插入和删除操作。这篇文章对于需要频繁进行查找、插入和删除操作的情况下,使用vector类是一个很好的选择。 ... [详细]
  • 深入理解Kafka服务端请求队列中请求的处理
    本文深入分析了Kafka服务端请求队列中请求的处理过程,详细介绍了请求的封装和放入请求队列的过程,以及处理请求的线程池的创建和容量设置。通过场景分析、图示说明和源码分析,帮助读者更好地理解Kafka服务端的工作原理。 ... [详细]
  • Java中包装类的设计原因以及操作方法
    本文主要介绍了Java中设计包装类的原因以及操作方法。在Java中,除了对象类型,还有八大基本类型,为了将基本类型转换成对象,Java引入了包装类。文章通过介绍包装类的定义和实现,解答了为什么需要包装类的问题,并提供了简单易用的操作方法。通过本文的学习,读者可以更好地理解和应用Java中的包装类。 ... [详细]
  • 在springmvc框架中,前台ajax调用方法,对图片批量下载,如何弹出提示保存位置选框?Controller方法 ... [详细]
  • 李逍遥寻找仙药的迷阵之旅
    本文讲述了少年李逍遥为了救治婶婶的病情,前往仙灵岛寻找仙药的故事。他需要穿越一个由M×N个方格组成的迷阵,有些方格内有怪物,有些方格是安全的。李逍遥需要避开有怪物的方格,并经过最少的方格,找到仙药。在寻找的过程中,他还会遇到神秘人物。本文提供了一个迷阵样例及李逍遥找到仙药的路线。 ... [详细]
  • 基于Socket的多个客户端之间的聊天功能实现方法
    本文介绍了基于Socket的多个客户端之间实现聊天功能的方法,包括服务器端的实现和客户端的实现。服务器端通过每个用户的输出流向特定用户发送消息,而客户端通过输入流接收消息。同时,还介绍了相关的实体类和Socket的基本概念。 ... [详细]
  • 本文整理了Java面试中常见的问题及相关概念的解析,包括HashMap中为什么重写equals还要重写hashcode、map的分类和常见情况、final关键字的用法、Synchronized和lock的区别、volatile的介绍、Syncronized锁的作用、构造函数和构造函数重载的概念、方法覆盖和方法重载的区别、反射获取和设置对象私有字段的值的方法、通过反射创建对象的方式以及内部类的详解。 ... [详细]
  • 本文介绍了在Android开发中使用软引用和弱引用的应用。如果一个对象只具有软引用,那么只有在内存不够的情况下才会被回收,可以用来实现内存敏感的高速缓存;而如果一个对象只具有弱引用,不管内存是否足够,都会被垃圾回收器回收。软引用和弱引用还可以与引用队列联合使用,当被引用的对象被回收时,会将引用加入到关联的引用队列中。软引用和弱引用的根本区别在于生命周期的长短,弱引用的对象可能随时被回收,而软引用的对象只有在内存不够时才会被回收。 ... [详细]
  • MySQL数据库锁机制及其应用(数据库锁的概念)
    本文介绍了MySQL数据库锁机制及其应用。数据库锁是计算机协调多个进程或线程并发访问某一资源的机制,在数据库中,数据是一种供许多用户共享的资源,如何保证数据并发访问的一致性和有效性是数据库必须解决的问题。MySQL的锁机制相对简单,不同的存储引擎支持不同的锁机制,主要包括表级锁、行级锁和页面锁。本文详细介绍了MySQL表级锁的锁模式和特点,以及行级锁和页面锁的特点和应用场景。同时还讨论了锁冲突对数据库并发访问性能的影响。 ... [详细]
  • 本文讨论了微软的STL容器类是否线程安全。根据MSDN的回答,STL容器类包括vector、deque、list、queue、stack、priority_queue、valarray、map、hash_map、multimap、hash_multimap、set、hash_set、multiset、hash_multiset、basic_string和bitset。对于单个对象来说,多个线程同时读取是安全的。但如果一个线程正在写入一个对象,那么所有的读写操作都需要进行同步。 ... [详细]
  • 本文介绍了SPOJ2829题目的解法及优化方法。题目要求找出满足一定条件的数列,并对结果取模。文章详细解释了解题思路和算法实现,并提出了使用FMT优化的方法。最后,对于第三个限制条件,作者给出了处理方法。文章最后给出了代码实现。 ... [详细]
author-avatar
国王的驴耳朵要吐槽
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有