热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

C#如何使用RabbitMQ实现消息收发

本文是基于http:www.cnblogs.comcheng-leiarticles7274513.html的项目结构进行搭建的,了解之前请先阅读http:www.c

本文是基于http://www.cnblogs.com/cheng-lei/articles/7274513.html的项目结构进行搭建的,了解之前请先阅读http://www.cnblogs.com/cheng-lei/category/1047427.html中的前四篇文章。

 

工具 — Nuget包管理器 —程序包管理器控制台

PM> Install-Package RabbitMQ.Client -Version 5.1.0

PM> Install-Package EasyNetQ -Version 3.2.0

 

一、项目搭建

1. Weiz.MQ 项目,消息队列的通用处理类库,用于正在的订阅和发布消息。

  1、在BusBuilder.cs中添加了对CreateAdvancedBus函数的实现。

1 public static IAdvancedBus CreateAdvancedBus()
2 {
3 // 消息服务器连接字符串
4 string connString = "host=dev.corp.wingoht.com:5672;virtualHost=cd;username=ishowfun;password=123456";
5 if (connString == null || connString == string.Empty)
6 {
7 throw new Exception("messageserver connection string is missing or empty");
8 }
9
10 return RabbitHutch.CreateBus(connString).Advanced;
11 }

View Code

 

  2、在MQHelper.cs中添加了对Send、Receive函数的实现。

1 public static void Send(MyMessage msg)
2 {
3 // 创建消息bus
4 IBus bus = BusBuilder.CreateMessageBus();
5
6 try
7 {
8 bus.Send(msg.MessageRouter, msg);
9 }
10 catch (EasyNetQException ex)
11 {
12 //处理连接消息服务器异常
13 Console.WriteLine("Send Error!!!");
14 }
15
16 bus.Dispose();//与数据库connection类似,使用后记得销毁bus对象
17 }
18
19 public static void Receive(MyMessage msg, IProcessMessage ipro)
20 {
21 // 创建消息bus
22 IBus bus = BusBuilder.CreateMessageBus();
23
24 try
25 {
26 bus.Receive(msg.MessageRouter, message => ipro.ProcessMsg(message));
27 }
28 catch (EasyNetQException ex)
29 {
30 //处理连接消息服务器异常
31 Console.WriteLine("Receive Error!!!");
32 }
33 }

View Code

 

  3、在MQHelper.cs中添加了对采用Fanout、Direct、Topic交换机类型进行消息收发功能的实现。

1 public static void ProducerFanoutMessage(MyMessage msg, string exchangeName = "chending.fanout")
2 {
3 var advancedBus = BusBuilder.CreateAdvancedBus();
4
5 if (advancedBus.IsConnected)
6 {
7 var exchange = advancedBus.ExchangeDeclare(exchangeName, ExchangeType.Fanout);
8
9 advancedBus.Publish(exchange, "", false, new Message(msg));
10 }
11 else
12 {
13 Console.WriteLine("Can't connect");
14 }
15
16 }
17
18 public static void ConsumeFanoutMessage(string exchageName = "chending.fanout", string queueName = "chending.fanout.queue")
19 {
20 var advancedBus = BusBuilder.CreateAdvancedBus();
21 var exchange = advancedBus.ExchangeDeclare(exchageName, ExchangeType.Fanout);
22
23 var queue = advancedBus.QueueDeclare(queueName);
24 advancedBus.Bind(exchange, queue, queueName);
25 advancedBus.Consume(queue, registration =>
26 {
27 registration.Add((message, info) => { Console.WriteLine("Fanout Content: {0}", message.Body.MessageBody); });
28 });
29 }
30
31 public static void ProducerDirectMessage(MyMessage msg, string queueName = "chending.direct.queue")
32 {
33 var advancedBus = BusBuilder.CreateAdvancedBus();
34
35 if (advancedBus.IsConnected)
36 {
37 var queue = advancedBus.QueueDeclare(queueName);
38
39 advancedBus.Publish(Exchange.GetDefault(), queue.Name, false, new Message(msg));
40 }
41 else
42 {
43 Console.WriteLine("Can't connect");
44 }
45
46 }
47
48 public static void ConsumeDirectMessage(string exchageName = "chending.direct", string queueName = "chending.direct.queue")
49 {
50 var advancedBus = BusBuilder.CreateAdvancedBus();
51 var exchange = advancedBus.ExchangeDeclare(exchageName, ExchangeType.Direct);
52
53 var queue = advancedBus.QueueDeclare(queueName);
54 advancedBus.Bind(exchange, queue, queueName);
55 advancedBus.Consume(queue, registration =>
56 {
57 registration.Add((message, info) => { Console.WriteLine("Direct Content: {0}", message.Body.MessageBody); });
58 });
59 }
60
61 public static void ProducerTopicMessage(MyMessage msg)
62 {
63 //// 创建消息bus
64 IBus bus = BusBuilder.CreateMessageBus();
65
66 try
67 {
68 bus.Publish(msg, x => x.WithTopic(msg.MessageRouter));
69 }
70 catch (EasyNetQException ex)
71 {
72 //处理连接消息服务器异常
73 }
74
75 bus.Dispose();//与数据库connection类似,使用后记得销毁bus对象
76 }
77
78 public static void ConsumeTopicMessage(MyMessage msg)
79 {
80 //// 创建消息bus
81 IBus bus = BusBuilder.CreateMessageBus();
82
83 try
84 {
85 bus.Subscribe(msg.MessageRouter, message => Console.WriteLine("Topic Content: {0}", message.MessageBody), x => x.WithTopic(msg.MessageRouter));
86 }
87 catch (EasyNetQException ex)
88 {
89 //处理连接消息服务器异常
90 }
91 }

View Code

  4、在ProduceThread.cs中添加了消息发布线程对前面实现的功能进行测试(也可以不作为线程直接调用)。

1 public class ProduceThread
2 {
3 public static void ProduceMessage() {
4 MyMessage msg1 = new MyMessage();
5 msg1.MessageID = "0-1";
6 msg1.MessageBody = DateTime.Now.ToString();
7 msg1.MessageRouter = "chending.fanout";
8 msg1.MessageTitle = "0-1";
9 MyMessage msg2 = new MyMessage();
10 msg2.MessageID = "0-2";
11 msg2.MessageBody = DateTime.Now.ToString();
12 msg2.MessageRouter = "chending.direct";
13 msg2.MessageTitle = "0-2";
14 MyMessage msg3 = new MyMessage();
15 msg3.MessageID = "0-3";
16 msg3.MessageBody = DateTime.Now.ToString();
17 msg3.MessageRouter = "chending.topic.a.b";
18 msg3.MessageTitle = "0-3";
19
20 //MQHelper.Send(msg1);
21 MQHelper.ProducerFanoutMessage(msg1);
22 MQHelper.ProducerDirectMessage(msg2);
23 MQHelper.ProducerTopicMessage(msg3);
24
25 for (int i &#61; 0; i <10; i&#43;&#43;) {
26 MyMessage msg &#61; new MyMessage();
27 msg.MessageID &#61; (i&#43;1).ToString();
28 msg.MessageBody &#61; DateTime.Now.ToString();
29 if (i % 2 &#61;&#61; 0)
30 msg.MessageRouter &#61; "cd.test.demo.a.b";
31 else
32 msg.MessageRouter &#61; "cd.test.demo.a";
33 msg.MessageTitle &#61; (i&#43;1).ToString();
34
35 MQHelper.Publish(msg);
36 //Console.WriteLine("Message{0} is published!!!", i &#43; 1);
37 Thread.Sleep(200);
38 }
39 }

View Code

 

2. Weiz.Producer&#xff08;生成者&#xff09;已弃用&#xff08;改用ProduceThread.cs&#xff09;

3. Weiz.Consumer 就是Consumer&#xff08;消费者&#xff09;

  1、修改OrderProcessMessage.cs&#xff0c;实现不同的消息处理方式。

1 public class OrderProcessMessage : MQ.IProcessMessage
2 {
3 public void ProcessMsg(MQ.MyMessage msg)
4 {
5 Console.WriteLine("ID: {0}, Title: {1}, Router: {2}, Content: {3}", msg.MessageID, msg.MessageTitle, msg.MessageRouter, msg.MessageBody);
6 }
7 }
8 public class OrderProcessMessage1:MQ.IProcessMessage
9 {
10 public void ProcessMsg(MQ.MyMessage msg)
11 {
12 Console.WriteLine("Process1 ID: {0}, Title: {1}, Router: {2}, Content: {3}", msg.MessageID, msg.MessageTitle, msg.MessageRouter, msg.MessageBody);
13 }
14 }
15
16 public class OrderProcessMessage2 : MQ.IProcessMessage
17 {
18 public void ProcessMsg(MQ.MyMessage msg)
19 {
20 Console.WriteLine("Process2 ID: {0}, Title: {1}, Router: {2}, Content: {3}", msg.MessageID, msg.MessageTitle, msg.MessageRouter, msg.MessageBody);
21 }
22 }

View Code

 

  2、对Program.cs中的Main调用进行了修改。

1 class Program
2 {
3 static void Main(string[] args)
4 {
5 //OrderProcessMessage order &#61; new OrderProcessMessage();
6 OrderProcessMessage1 order1 &#61; new OrderProcessMessage1();
7 OrderProcessMessage2 order2 &#61; new OrderProcessMessage2();
8
9 //MyMessage msg &#61; new MyMessage();
10 MyMessage msg1 &#61; new MyMessage();
11 MyMessage msg2 &#61; new MyMessage();
12 MyMessage msg3 &#61; new MyMessage();
13
14 //msg.MessageRouter &#61; "cd.test.demo";
15 msg1.MessageRouter &#61; "cd.test.demo.*";
16 msg2.MessageRouter &#61; "cd.test.demo.#";
17 msg3.MessageRouter &#61; "chending.topic.#";
18
19 //MQHelper.Receive(msg, order);
20 MQHelper.ConsumeFanoutMessage();
21 MQHelper.ConsumeDirectMessage();
22 MQHelper.ConsumeTopicMessage(msg3);
23 MQHelper.Subscribe(msg1, order1);
24 //MQHelper.Subscribe(msg1, order2);
25 MQHelper.Subscribe(msg2, order2);
26
27 Console.WriteLine("Listening for messages.");
28
29 ProduceThread.ProduceMessage();
30
31 //ThreadStart threadStart &#61; ProduceThread.ProduceMessage;
32 //Thread thread &#61; new Thread(threadStart);
33 //thread.Start();
34 }
35 }

View Code

 

二、项目运行

启动 Weiz.Consumer &#xff08;消费者&#xff09;&#xff0c;启动消费者&#xff0c;会自动在RabbitMQ 服务器上创建相关的exchange 和 queue &#xff0c;同时调用的ProduceThread.ProduceMessage函数会发送消息&#xff0c;接收到的信息会在Console命令行中进行显示。

 

项目源码&#xff1a;百度云链接&#xff1a;https://pan.baidu.com/s/1sCJqY2fKphXV0ntMIytcVw 密码&#xff1a;hfz5

转:https://www.cnblogs.com/lucifer1997/p/lucifer1997.html



推荐阅读
  • 从单机存储进化为接口和存储的分离概述接口服务层对外提供REST服务,数据服务层提供数据存储功能。两者之间通过消息队列进行通信,数据服务层的所有数据服 ... [详细]
  • 本文提供了 RabbitMQ 3.7 的快速上手指南,详细介绍了环境搭建、生产者和消费者的配置与使用。通过官方教程的指引,读者可以轻松完成初步测试和实践,快速掌握 RabbitMQ 的核心功能和基本操作。 ... [详细]
  • ZeroMQ在云计算环境下的高效消息传递库第四章学习心得
    本章节深入探讨了ZeroMQ在云计算环境中的高效消息传递机制,涵盖客户端请求-响应模式、最近最少使用(LRU)队列、心跳检测、面向服务的队列、基于磁盘的离线队列以及主从备份服务等关键技术。此外,还介绍了无中间件的请求-响应架构,强调了这些技术在提升系统性能和可靠性方面的应用价值。个人理解方面,ZeroMQ通过这些机制有效解决了分布式系统中常见的通信延迟和数据一致性问题。 ... [详细]
  • 在RabbitMQ中,消息发布者默认情况下不会接收到关于消息在Broker中状态的反馈,这可能导致消息丢失的问题。为了确保消息的可靠传输与投递,可以采用确认机制(如发布确认和事务模式)来验证消息是否成功抵达Broker,并采取相应的重试策略以提高系统的可靠性。此外,还可以配置消息持久化和镜像队列等高级功能,进一步增强消息的可靠性和高可用性。 ... [详细]
  • .Net下RabbitMQ发布订阅模式实践
    一、概念AMQP,即AdvancedMessageQueuingProtocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的 ... [详细]
  • 掌握PHP框架开发与应用的核心知识点:构建高效PHP框架所需的技术与能力综述
    掌握PHP框架开发与应用的核心知识点对于构建高效PHP框架至关重要。本文综述了开发PHP框架所需的关键技术和能力,包括但不限于对PHP语言的深入理解、设计模式的应用、数据库操作、安全性措施以及性能优化等方面。对于初学者而言,熟悉主流框架如Laravel、Symfony等的实际应用场景,有助于更好地理解和掌握自定义框架开发的精髓。 ... [详细]
  • 如何在Java中高效构建WebService
    本文介绍了如何利用XFire框架在Java中高效构建WebService。XFire是一个轻量级、高性能的Java SOAP框架,能够简化WebService的开发流程。通过结合MyEclipse集成开发环境,开发者可以更便捷地进行项目配置和代码编写,从而提高开发效率。此外,文章还详细探讨了XFire的关键特性和最佳实践,为读者提供了实用的参考。 ... [详细]
  • HTTP协议作为互联网通信的基础,其重要性不言而喻。相比JDK自带的URLConnection,HttpClient不仅提升了易用性和灵活性,还在性能、稳定性和安全性方面进行了显著优化。本文将深入解析HttpClient的使用方法与技巧,帮助开发者更好地掌握这一强大的工具。 ... [详细]
  • MongoDB Aggregates.group() 方法详解与编程实例 ... [详细]
  • 优化后的标题:PHP分布式高并发秒杀系统设计与实现
    PHPSeckill是一个基于PHP、Lua和Redis构建的高效分布式秒杀系统。该项目利用php_apcu扩展优化性能,实现了高并发环境下的秒杀功能。系统设计充分考虑了分布式架构的可扩展性和稳定性,适用于大规模用户同时访问的场景。项目代码已开源,可在Gitee平台上获取。 ... [详细]
  • 如何迅速识别并解决Gradle项目中的Jar包名称冲突问题?
    在处理Gradle项目时,经常会遇到Jar包名称冲突的问题。本文介绍了如何快速识别并解决此类冲突,特别是在使用fastjson的Feature.OrderedField功能时。通过添加特定参数,可以有效避免JSON字段乱序的情况,确保数据的一致性和可靠性。此外,文章还提供了详细的步骤和示例代码,帮助开发者高效地解决Jar包冲突问题。 ... [详细]
  • Go语言中Goroutine与通道机制及其异常处理深入解析
    在Go语言中,Goroutine可视为一种轻量级的并发执行单元,其资源消耗远低于传统线程,初始栈大小仅为2KB,而普通线程则通常需要几MB。此外,Goroutine的调度由Go运行时自动管理,能够高效地支持成千上万个并发任务。本文深入探讨了Goroutine的工作原理及其与通道(channel)的配合使用,特别是在异常处理方面的最佳实践,为开发者提供了一套完整的解决方案,以确保程序的稳定性和可靠性。 ... [详细]
  • 前言: 网上搭建k8s的文章很多,但很多都无法按其说明在阿里云ecs服务器成功搭建,所以我就花了些时间基于自己成功搭建k8s的步骤写了个操作手册,希望对想搭建k8s环境的盆友有所帮 ... [详细]
  • 软件开发史上最具影响力的十位编程大师(附图解)
    在软件开发领域,有十位编程大师对行业发展产生了深远影响。本文基于国外知名社区的一项评选,通过图文并茂的形式,详细介绍了这十位杰出人物,包括游戏开发先驱John Carmack等,为读者呈现了他们卓越的技术贡献与创新精神。 ... [详细]
  • RabbitMQ是使用Erlang编写的一个开源的消息队列,本身支持很多的协议:AMQP,XMPP,SMTP,STOMP,也 ... [详细]
author-avatar
捕鱼达人2502933245
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有