热门标签 | HotTags
当前位置:  开发笔记 > 后端 > 正文

【微服务专题之】.Net6下集成消息队列RabbitMQ交换机模式代码演示(全)

微信公众号:趣编程ACE关注可了解更多的.NET日常实战开发技巧,如需源码请公众号后台留言 源码;[如果觉得本公众号对您有帮助,欢迎关注].Net中RabbitMQ中交换机模式的使

微信公众号:趣编程ACE
关注可了解更多的.NET日常实战开发技巧,如需源码 请公众号后台留言 源码;
[如果觉得本公众号对您有帮助,欢迎关注]



.Net中RabbitMQ中交换机模式的使用


前文回顾

关注上文公众号即可

 

 

 


TopicExchange 交换机模式

如果我们需要将一条信息发送到多个队列上,假若利用直连模式,那么就会有很多的路由,而TopicExchange只需要配置定义好的路由规则,即可省略多余的路由指定。
PS:路由规则有一定的约束,比如需要采取* .#. * 的格式,用.号 分隔
其中



  • 1.*表示一个单词



  • 2. #表示任意数量(零个或多个)单词。




消费者程序演示

public static class TopicExchangeReceive
    {
        public static void Receive(IModel channel)
        {
            channel.ExchangeDeclare("hello-topic-exchange", ExchangeType.Topic);
            channel.QueueDeclare(queue: "hello-topic-queue",
                        durable: true,
                        exclusive: false,
                        autoDelete: false,
                        arguments: null);
            channel.QueueBind("hello-topic-queue", "hello-topic-exchange", "route.*");
            //channel.QueueBind("hello", "hello-direct-exchange", "route2");
            // 创建一个消费者基本事件
            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body.ToArray();
                var message = Encoding.UTF8.GetString(body);
                Console.WriteLine(" [x] Received {0}", message);
            };
            channel.BasicConsume(queue: "hello-topic-queue",
                                 // 自动确认
                                 autoAck: true,
                                 consumer: consumer);
            //channel.BasicConsume(queue: "hello",
            //                     // 自动确认
            //                     autoAck: true,
            //                     consumer: consumer);
            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }

生产者程序演示

public static class TopicExchangeSend
    {
        public static void Send(IModel channel)
        {
            channel.ExchangeDeclare("hello-topic-exchange", ExchangeType.Topic);
            var count = 0;
            while (true)
            {
                Thread.Sleep(1000);
                // 发送的消息
                string message = $"Hello World {count}";
                var body = Encoding.UTF8.GetBytes(message);
                //var body2 = Encoding.UTF8.GetBytes(message + "body2");
                // 基本发布 不指定交换
                channel.BasicPublish(exchange: "hello-topic-exchange",
                                     // 路由键   就是队列名称
                                     routingKey: "route.2",
                                     // 基础属性
                                     basicProperties: null,
                                     // 传递的消息体
                                     body: body);
                //channel.BasicPublish(exchange: "hello-direct-exchange",
                //                     // 路由键   就是队列名称
                //                     routingKey: "route2",
                //                     // 基础属性
                //                     basicProperties: null,
                //                     // 传递的消息体
                //                     body: body2);
                count++;
                Console.WriteLine(" [x] sent {0}", message);
            }
        }
    }

效果展示

图片




Headers Exchange 模式

其实这种模式类似与Http请求里面的headers,我们定义一个字典然后通过交换机携带,作为交换机与路由器交换媒介,相比于Topic Exchange交换模式,header定义的格式类型更加丰富。


消费者程序演示

public static class HeadersExchangeReceive
    {
        public static void Receive(IModel channel)
        {
            channel.ExchangeDeclare("hello-headers-exchange", ExchangeType.Headers);
            channel.QueueDeclare(queue: "hello-headers-queue",
                        durable: true,
                        exclusive: false,
                        autoDelete: false,
                        arguments: null);
            var headers = new Dictionary()
            {
                {"test","A" }
            };
            channel.QueueBind("hello-headers-queue", "hello-headers-exchange", String.Empty, headers);
            //channel.QueueBind("hello", "hello-direct-exchange", "route2");
            // 创建一个消费者基本事件
            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body.ToArray();
                var message = Encoding.UTF8.GetString(body);
                Console.WriteLine(" [x] Received {0}", message);
            };
            channel.BasicConsume(queue: "hello-headers-queue",
                                 // 自动确认
                                 autoAck: true,
                                 consumer: consumer);
            //channel.BasicConsume(queue: "hello",
            //                     // 自动确认
            //                     autoAck: true,
            //                     consumer: consumer);
            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }

生产者程序演示

public static void Send(IModel channel)
        {
            channel.ExchangeDeclare("hello-headers-exchange", ExchangeType.Headers);
            var count = 0;
            while (true)
            {
                Thread.Sleep(1000);
                // 发送的消息
                string message = $"Hello World {count}";
                var body = Encoding.UTF8.GetBytes(message);
                //var body2 = Encoding.UTF8.GetBytes(message + "body2");
                var basicProperties = channel.CreateBasicProperties();
                basicProperties.Headers = new Dictionary()
                {
                    {"test" ,"A"}
                };
                // 基本发布 不指定交换
                channel.BasicPublish(exchange: "hello-headers-exchange",
                                     // 路由键   就是队列名称
                                     routingKey: String.Empty,
                                     // 基础属性
                                     basicProperties,
                                     // 传递的消息体
                                     body: body);
                //channel.BasicPublish(exchange: "hello-direct-exchange",
                //                     // 路由键   就是队列名称
                //                     routingKey: "route2",
                //                     // 基础属性
                //                     basicProperties: null,
                //                     // 传递的消息体
                //                     body: body2);
                count++;
                Console.WriteLine(" [x] sent {0}", message);
            }
        }

效果演示

 

图片




FanoutExchange 模式

扇形交换机模式是最傻瓜的一种交换模式,总的而言只要将队列绑定到交换机上,就能做到消息互通了。当然了这种机制处理事件的速度也是所有交换机类型里面最快的。


消费者程序演示

public static class FanoutExchangeReceive
    {
        public static void Receive(IModel channel)
        {
            channel.ExchangeDeclare("hello-fanout-exchange", ExchangeType.Fanout);
            channel.QueueDeclare(queue: "hello-fanout-queue",
                        durable: true,
                        exclusive: false,
                        autoDelete: false,
                        arguments: null);
            //var headers = new Dictionary()
            //{
            //    {"test","A" }
            //};
            channel.QueueBind("hello-fanout-queue", "hello-fanout-exchange", String.Empty);
            //channel.QueueBind("hello", "hello-direct-exchange", "route2");
            // 创建一个消费者基本事件
            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body.ToArray();
                var message = Encoding.UTF8.GetString(body);
                Console.WriteLine(" [x] Received {0}", message);
            };
            channel.BasicConsume(queue: "hello-fanout-queue",
                                 // 自动确认
                                 autoAck: true,
                                 consumer: consumer);
            //channel.BasicConsume(queue: "hello",
            //                     // 自动确认
            //                     autoAck: true,
            //                     consumer: consumer);
            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }

生产者程序演示

public static class FanoutExchangeSend
    {
        public static void Send(IModel channel)
        {
            channel.ExchangeDeclare("hello-fanout-exchange", ExchangeType.Fanout);
            var count = 0;
            while (true)
            {
                Thread.Sleep(1000);
                // 发送的消息
                string message = $"Hello World {count}";
                var body = Encoding.UTF8.GetBytes(message);
                //var body2 = Encoding.UTF8.GetBytes(message + "body2");
                //var basicProperties = channel.CreateBasicProperties();
                //basicProperties.Headers = new Dictionary()
                //{
                //    {"test" ,"A"}
                //};
                // 基本发布 不指定交换
                channel.BasicPublish(exchange: "hello-fanout-exchange",
                                     // 路由键   就是队列名称
                                     routingKey: String.Empty,
                                     // 基础属性
                                     null,
                                     // 传递的消息体
                                     body: body);
                //channel.BasicPublish(exchange: "hello-direct-exchange",
                //                     // 路由键   就是队列名称
                //                     routingKey: "route2",
                //                     // 基础属性
                //                     basicProperties: null,
                //                     // 传递的消息体
                //                     body: body2);
                count++;
                Console.WriteLine(" [x] sent {0}", message);
            }
        }
    }

效果演示

图片



PS:具体的代码效果演示看视频哦~



推荐阅读
  • 云原生应用最佳开发实践之十二原则(12factor)
    目录简介一、基准代码二、依赖三、配置四、后端配置五、构建、发布、运行六、进程七、端口绑定八、并发九、易处理十、开发与线上环境等价十一、日志十二、进程管理当 ... [详细]
  • 有意向可以发简历到邮箱内推.简历直达组内Leader.能做同事的话,内推奖励全给你. ... [详细]
  • RabbitMq之发布确认高级部分1.为什么会需要发布确认高级部分?在生产环境中由于一些不明原因,导致rabbitmq重启,在RabbitMQ重启期间生产者消息投递失败,导致消息丢 ... [详细]
  • rabbitmq杂谈
    rabbitmq中的consumerTag和deliveryTag分别是干啥的,有什么用?同一个会话,consumerTag是固定的可以做此会话的名字,deliveryTag每次接 ... [详细]
  • 说出来你可能不信,我用三天做了一个完整的项目
    Java在人工智能中能起到什么作用?作为编程语言中的扛把子,Java20多年稳定不倒,就在于它的稳定性,维护成本极低。这使得 ... [详细]
  • 都说Python处理速度慢,为何月活7亿的 Instagram依然在使用Python?
    点击“Python编程与实战”,选择“置顶公众号”第一时间获取Python技术干货!来自|简书作者|我爱学python链接|https:www.jian ... [详细]
  • 6(自)、交换机之关键字模式
    上一节中的我们的日志系统将所有消息广播给所有消费者,对此我们想做一些改变,例如我们希望将日志消息写入磁盘的程序仅接收严重错误(error),而不存储那些警告(warnning)或者 ... [详细]
  • 讨伐Java多线程与高并发——MQ篇
    本文是学习Java多线程与高并发知识时做的笔记。这部分内容比较多,按照内容分为5个部分:多线程基础篇JUC篇同步容器和并发容器篇线程池篇MQ篇本篇 ... [详细]
  • 博客_2018年博客总结
    本文由编程笔记#小编为大家整理,主要介绍了2018年博客总结相关的知识,希望对你有一定的参考价值。前言     ... [详细]
  • 这也太简单了!轻松操作Feign 服务调用使用 Zipkin 链路追踪!
    0、介绍分布式微服务时代,方便了业务的快速增长和服务的稳定,但是系统出现问题后,面对同业务多服务排查起来令人头大。这时候领导就想着集成分布式追踪系统。Zipkin是T ... [详细]
  • 数据结构与算法的重要性及基本概念、存储结构和算法分析
    数据结构与算法在编程领域中的重要性不可忽视,无论从事何种岗位,都需要掌握数据结构和算法。本文介绍了数据结构与算法的基本概念、存储结构和算法分析。其中包括线性结构、树结构、图结构、栈、队列、串、查找、排序等内容。此外,还介绍了图论算法、贪婪算法、分治算法、动态规划、随机化算法和回溯算法等高级数据结构和算法。掌握这些知识对于提高编程能力、解决问题具有重要意义。 ... [详细]
  • linux进阶50——无锁CAS
    1.概念比较并交换(compareandswap,CAS),是原⼦操作的⼀种,可⽤于在多线程编程中实现不被打断的数据交换操作࿰ ... [详细]
  • Docker安装Rabbitmq(配合宝塔)
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了Docker安装Rabbitmq(配合宝塔)相关的知识,希望对你有一定的参考价值。一、事前准备 ... [详细]
  • 本文主要分享【im即时通讯技术】,技术文章【浅析IM即时通讯开发出现上网卡顿?网络掉线?】为【wecloud1314】投稿,如果你遇到相关问题,本文相关知识或能到你。im即时通讯技术特别推荐即时通 ... [详细]
  • 源码包安装RabbitMQ3.6
    先安装erlang依赖,也是门编程语言,下载源码包地址:https:www.erlang.orgdownloads20.1首先先安装个依赖 ... [详细]
author-avatar
暮迟_MCz_P
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有