热门标签 | HotTags
当前位置:  开发笔记 > 后端 > 正文

【微服务专题之】.Net6下集成消息队列RabbitMQ交换机模式代码演示(全)

微信公众号:趣编程ACE关注可了解更多的.NET日常实战开发技巧,如需源码请公众号后台留言 源码;[如果觉得本公众号对您有帮助,欢迎关注].Net中RabbitMQ中交换机模式的使

微信公众号:趣编程ACE
关注可了解更多的.NET日常实战开发技巧,如需源码 请公众号后台留言 源码;
[如果觉得本公众号对您有帮助,欢迎关注]



.Net中RabbitMQ中交换机模式的使用


前文回顾

关注上文公众号即可

 

 

 


TopicExchange 交换机模式

如果我们需要将一条信息发送到多个队列上,假若利用直连模式,那么就会有很多的路由,而TopicExchange只需要配置定义好的路由规则,即可省略多余的路由指定。
PS:路由规则有一定的约束,比如需要采取* .#. * 的格式,用.号 分隔
其中



  • 1.*表示一个单词



  • 2. #表示任意数量(零个或多个)单词。




消费者程序演示

public static class TopicExchangeReceive
    {
        public static void Receive(IModel channel)
        {
            channel.ExchangeDeclare("hello-topic-exchange", ExchangeType.Topic);
            channel.QueueDeclare(queue: "hello-topic-queue",
                        durable: true,
                        exclusive: false,
                        autoDelete: false,
                        arguments: null);
            channel.QueueBind("hello-topic-queue", "hello-topic-exchange", "route.*");
            //channel.QueueBind("hello", "hello-direct-exchange", "route2");
            // 创建一个消费者基本事件
            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body.ToArray();
                var message = Encoding.UTF8.GetString(body);
                Console.WriteLine(" [x] Received {0}", message);
            };
            channel.BasicConsume(queue: "hello-topic-queue",
                                 // 自动确认
                                 autoAck: true,
                                 consumer: consumer);
            //channel.BasicConsume(queue: "hello",
            //                     // 自动确认
            //                     autoAck: true,
            //                     consumer: consumer);
            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }

生产者程序演示

public static class TopicExchangeSend
    {
        public static void Send(IModel channel)
        {
            channel.ExchangeDeclare("hello-topic-exchange", ExchangeType.Topic);
            var count = 0;
            while (true)
            {
                Thread.Sleep(1000);
                // 发送的消息
                string message = $"Hello World {count}";
                var body = Encoding.UTF8.GetBytes(message);
                //var body2 = Encoding.UTF8.GetBytes(message + "body2");
                // 基本发布 不指定交换
                channel.BasicPublish(exchange: "hello-topic-exchange",
                                     // 路由键   就是队列名称
                                     routingKey: "route.2",
                                     // 基础属性
                                     basicProperties: null,
                                     // 传递的消息体
                                     body: body);
                //channel.BasicPublish(exchange: "hello-direct-exchange",
                //                     // 路由键   就是队列名称
                //                     routingKey: "route2",
                //                     // 基础属性
                //                     basicProperties: null,
                //                     // 传递的消息体
                //                     body: body2);
                count++;
                Console.WriteLine(" [x] sent {0}", message);
            }
        }
    }

效果展示

图片




Headers Exchange 模式

其实这种模式类似与Http请求里面的headers,我们定义一个字典然后通过交换机携带,作为交换机与路由器交换媒介,相比于Topic Exchange交换模式,header定义的格式类型更加丰富。


消费者程序演示

public static class HeadersExchangeReceive
    {
        public static void Receive(IModel channel)
        {
            channel.ExchangeDeclare("hello-headers-exchange", ExchangeType.Headers);
            channel.QueueDeclare(queue: "hello-headers-queue",
                        durable: true,
                        exclusive: false,
                        autoDelete: false,
                        arguments: null);
            var headers = new Dictionary()
            {
                {"test","A" }
            };
            channel.QueueBind("hello-headers-queue", "hello-headers-exchange", String.Empty, headers);
            //channel.QueueBind("hello", "hello-direct-exchange", "route2");
            // 创建一个消费者基本事件
            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body.ToArray();
                var message = Encoding.UTF8.GetString(body);
                Console.WriteLine(" [x] Received {0}", message);
            };
            channel.BasicConsume(queue: "hello-headers-queue",
                                 // 自动确认
                                 autoAck: true,
                                 consumer: consumer);
            //channel.BasicConsume(queue: "hello",
            //                     // 自动确认
            //                     autoAck: true,
            //                     consumer: consumer);
            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }

生产者程序演示

public static void Send(IModel channel)
        {
            channel.ExchangeDeclare("hello-headers-exchange", ExchangeType.Headers);
            var count = 0;
            while (true)
            {
                Thread.Sleep(1000);
                // 发送的消息
                string message = $"Hello World {count}";
                var body = Encoding.UTF8.GetBytes(message);
                //var body2 = Encoding.UTF8.GetBytes(message + "body2");
                var basicProperties = channel.CreateBasicProperties();
                basicProperties.Headers = new Dictionary()
                {
                    {"test" ,"A"}
                };
                // 基本发布 不指定交换
                channel.BasicPublish(exchange: "hello-headers-exchange",
                                     // 路由键   就是队列名称
                                     routingKey: String.Empty,
                                     // 基础属性
                                     basicProperties,
                                     // 传递的消息体
                                     body: body);
                //channel.BasicPublish(exchange: "hello-direct-exchange",
                //                     // 路由键   就是队列名称
                //                     routingKey: "route2",
                //                     // 基础属性
                //                     basicProperties: null,
                //                     // 传递的消息体
                //                     body: body2);
                count++;
                Console.WriteLine(" [x] sent {0}", message);
            }
        }

效果演示

 

图片




FanoutExchange 模式

扇形交换机模式是最傻瓜的一种交换模式,总的而言只要将队列绑定到交换机上,就能做到消息互通了。当然了这种机制处理事件的速度也是所有交换机类型里面最快的。


消费者程序演示

public static class FanoutExchangeReceive
    {
        public static void Receive(IModel channel)
        {
            channel.ExchangeDeclare("hello-fanout-exchange", ExchangeType.Fanout);
            channel.QueueDeclare(queue: "hello-fanout-queue",
                        durable: true,
                        exclusive: false,
                        autoDelete: false,
                        arguments: null);
            //var headers = new Dictionary()
            //{
            //    {"test","A" }
            //};
            channel.QueueBind("hello-fanout-queue", "hello-fanout-exchange", String.Empty);
            //channel.QueueBind("hello", "hello-direct-exchange", "route2");
            // 创建一个消费者基本事件
            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body.ToArray();
                var message = Encoding.UTF8.GetString(body);
                Console.WriteLine(" [x] Received {0}", message);
            };
            channel.BasicConsume(queue: "hello-fanout-queue",
                                 // 自动确认
                                 autoAck: true,
                                 consumer: consumer);
            //channel.BasicConsume(queue: "hello",
            //                     // 自动确认
            //                     autoAck: true,
            //                     consumer: consumer);
            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }

生产者程序演示

public static class FanoutExchangeSend
    {
        public static void Send(IModel channel)
        {
            channel.ExchangeDeclare("hello-fanout-exchange", ExchangeType.Fanout);
            var count = 0;
            while (true)
            {
                Thread.Sleep(1000);
                // 发送的消息
                string message = $"Hello World {count}";
                var body = Encoding.UTF8.GetBytes(message);
                //var body2 = Encoding.UTF8.GetBytes(message + "body2");
                //var basicProperties = channel.CreateBasicProperties();
                //basicProperties.Headers = new Dictionary()
                //{
                //    {"test" ,"A"}
                //};
                // 基本发布 不指定交换
                channel.BasicPublish(exchange: "hello-fanout-exchange",
                                     // 路由键   就是队列名称
                                     routingKey: String.Empty,
                                     // 基础属性
                                     null,
                                     // 传递的消息体
                                     body: body);
                //channel.BasicPublish(exchange: "hello-direct-exchange",
                //                     // 路由键   就是队列名称
                //                     routingKey: "route2",
                //                     // 基础属性
                //                     basicProperties: null,
                //                     // 传递的消息体
                //                     body: body2);
                count++;
                Console.WriteLine(" [x] sent {0}", message);
            }
        }
    }

效果演示

图片



PS:具体的代码效果演示看视频哦~



推荐阅读
  • 2019年后蚂蚁集团与拼多多面试经验详述与深度剖析
    2019年后蚂蚁集团与拼多多面试经验详述与深度剖析 ... [详细]
  • 修复一个 Bug 竟耗时两天?真的有那么复杂吗?
    修复一个 Bug 竟然耗费了两天时间?这背后究竟隐藏着怎样的复杂性?本文将深入探讨这个看似简单的 Bug 为何会如此棘手,从代码层面剖析问题根源,并分享解决过程中遇到的技术挑战和心得。 ... [详细]
  • 在RabbitMQ中,消息发布者默认情况下不会接收到关于消息在Broker中状态的反馈,这可能导致消息丢失的问题。为了确保消息的可靠传输与投递,可以采用确认机制(如发布确认和事务模式)来验证消息是否成功抵达Broker,并采取相应的重试策略以提高系统的可靠性。此外,还可以配置消息持久化和镜像队列等高级功能,进一步增强消息的可靠性和高可用性。 ... [详细]
  • Spring cloud微服务架构前后端分离博客系统,Vue+boot源码分享 ... [详细]
  • 西北工业大学作为陕西省三所985和211高校之一,虽然在农业和林业领域不如某些顶尖院校,但在航空航天领域的实力尤为突出。该校的计算机科学专业在科研和教学方面也具有显著优势,是考研的理想选择。 ... [详细]
  • 本文提供了 RabbitMQ 3.7 的快速上手指南,详细介绍了环境搭建、生产者和消费者的配置与使用。通过官方教程的指引,读者可以轻松完成初步测试和实践,快速掌握 RabbitMQ 的核心功能和基本操作。 ... [详细]
  • ZeroMQ在云计算环境下的高效消息传递库第四章学习心得
    本章节深入探讨了ZeroMQ在云计算环境中的高效消息传递机制,涵盖客户端请求-响应模式、最近最少使用(LRU)队列、心跳检测、面向服务的队列、基于磁盘的离线队列以及主从备份服务等关键技术。此外,还介绍了无中间件的请求-响应架构,强调了这些技术在提升系统性能和可靠性方面的应用价值。个人理解方面,ZeroMQ通过这些机制有效解决了分布式系统中常见的通信延迟和数据一致性问题。 ... [详细]
  • 在 CentOS 7 上部署和配置 RabbitMQ 消息队列系统时,首先需要安装 Erlang,因为 RabbitMQ 是基于 Erlang 语言开发的。具体步骤包括:安装必要的依赖项,下载 Erlang 源码包(可能需要一些时间,请耐心等待),解压源码包,解决可能出现的错误,验证安装是否成功,并将 Erlang 添加到环境变量中。接下来,下载 RabbitMQ 的 tar.xz 压缩包,并进行解压和安装。确保每一步都按顺序执行,以保证系统的稳定性和可靠性。 ... [详细]
  • 顶尖编程语言,无可匹敌的选择
    我常常在想,一个人具备怎样的素质和能力,才称得上高级工程师?估计有不少人会说,“基础过硬、熟练掌握一门编程语言、至少看过一个 ... [详细]
  • 一文了解消息中间件RabbitMQ
    消息中间件---RabbitMQ1消息中间件的作用2.常用的消息中间件3消息中间件RabbitMQ3.1RabbitMQ介绍3.3RabbitMQ的队列模式3.3RabbitMQ的 ... [详细]
  • .Net下RabbitMQ发布订阅模式实践
    一、概念AMQP,即AdvancedMessageQueuingProtocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的 ... [详细]
  • 在Python网络编程中,多线程技术的应用与优化是提升系统性能的关键。线程作为操作系统调度的基本单位,其主要功能是在进程内共享内存空间和资源,实现并行处理任务。当一个进程启动时,操作系统会为其分配内存空间,加载必要的资源和数据,并调度CPU进行执行。每个进程都拥有独立的地址空间,而线程则在此基础上进一步细化了任务的并行处理能力。通过合理设计和优化多线程程序,可以显著提高网络应用的响应速度和处理效率。 ... [详细]
  • CAS 机制下的无锁队列设计与实现 ... [详细]
  • 互斥锁(Mutual Exclusion,简称Mutex)是多线程编程中的一种同步机制,用于确保同一时间只有一个线程能够访问共享资源,从而避免数据竞争和不一致的问题。信号量(Semaphore)则是一种更为通用的同步工具,不仅能够控制对资源的访问,还能管理资源的数量。本文将深入探讨这两种机制的工作原理、应用场景以及它们在实际开发中的优缺点,帮助读者更好地理解和使用这些重要的并发控制手段。 ... [详细]
  • 深入解析Spring Boot启动过程中Netty异步架构的工作原理与应用
    深入解析Spring Boot启动过程中Netty异步架构的工作原理与应用 ... [详细]
author-avatar
暮迟_MCz_P
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有