本文是基于http://www.cnblogs.com/cheng-lei/articles/7274513.html的项目结构进行搭建的,了解之前请先阅读http://www.cnblogs.com/cheng-lei/category/1047427.html中的前四篇文章。
工具 — Nuget包管理器 —程序包管理器控制台
PM> Install-Package RabbitMQ.Client -Version 5.1.0
PM> Install-Package EasyNetQ -Version 3.2.0
一、项目搭建
1. Weiz.MQ 项目,消息队列的通用处理类库,用于正在的订阅和发布消息。
1、在BusBuilder.cs中添加了对CreateAdvancedBus函数的实现。
1 public static IAdvancedBus CreateAdvancedBus()
2 {
3 // 消息服务器连接字符串
4 string connString = "host=dev.corp.wingoht.com:5672;virtualHost=cd;username=ishowfun;password=123456";
5 if (connString == null || connString == string.Empty)
6 {
7 throw new Exception("messageserver connection string is missing or empty");
8 }
9
10 return RabbitHutch.CreateBus(connString).Advanced;
11 }
2、在MQHelper.cs中添加了对Send、Receive函数的实现。
1 public static void Send(MyMessage msg)
2 {
3 // 创建消息bus
4 IBus bus = BusBuilder.CreateMessageBus();
5
6 try
7 {
8 bus.Send(msg.MessageRouter, msg);
9 }
10 catch (EasyNetQException ex)
11 {
12 //处理连接消息服务器异常
13 Console.WriteLine("Send Error!!!");
14 }
15
16 bus.Dispose();//与数据库connection类似,使用后记得销毁bus对象
17 }
18
19 public static void Receive(MyMessage msg, IProcessMessage ipro)
20 {
21 // 创建消息bus
22 IBus bus = BusBuilder.CreateMessageBus();
23
24 try
25 {
26 bus.Receive
27 }
28 catch (EasyNetQException ex)
29 {
30 //处理连接消息服务器异常
31 Console.WriteLine("Receive Error!!!");
32 }
33 }
3、在MQHelper.cs中添加了对采用Fanout、Direct、Topic交换机类型进行消息收发功能的实现。
1 public static void ProducerFanoutMessage(MyMessage msg, string exchangeName = "chending.fanout")
2 {
3 var advancedBus = BusBuilder.CreateAdvancedBus();
4
5 if (advancedBus.IsConnected)
6 {
7 var exchange = advancedBus.ExchangeDeclare(exchangeName, ExchangeType.Fanout);
8
9 advancedBus.Publish(exchange, "", false, new Message
10 }
11 else
12 {
13 Console.WriteLine("Can't connect");
14 }
15
16 }
17
18 public static void ConsumeFanoutMessage(string exchageName = "chending.fanout", string queueName = "chending.fanout.queue")
19 {
20 var advancedBus = BusBuilder.CreateAdvancedBus();
21 var exchange = advancedBus.ExchangeDeclare(exchageName, ExchangeType.Fanout);
22
23 var queue = advancedBus.QueueDeclare(queueName);
24 advancedBus.Bind(exchange, queue, queueName);
25 advancedBus.Consume(queue, registration =>
26 {
27 registration.Add
28 });
29 }
30
31 public static void ProducerDirectMessage(MyMessage msg, string queueName = "chending.direct.queue")
32 {
33 var advancedBus = BusBuilder.CreateAdvancedBus();
34
35 if (advancedBus.IsConnected)
36 {
37 var queue = advancedBus.QueueDeclare(queueName);
38
39 advancedBus.Publish(Exchange.GetDefault(), queue.Name, false, new Message
40 }
41 else
42 {
43 Console.WriteLine("Can't connect");
44 }
45
46 }
47
48 public static void ConsumeDirectMessage(string exchageName = "chending.direct", string queueName = "chending.direct.queue")
49 {
50 var advancedBus = BusBuilder.CreateAdvancedBus();
51 var exchange = advancedBus.ExchangeDeclare(exchageName, ExchangeType.Direct);
52
53 var queue = advancedBus.QueueDeclare(queueName);
54 advancedBus.Bind(exchange, queue, queueName);
55 advancedBus.Consume(queue, registration =>
56 {
57 registration.Add
58 });
59 }
60
61 public static void ProducerTopicMessage(MyMessage msg)
62 {
63 //// 创建消息bus
64 IBus bus = BusBuilder.CreateMessageBus();
65
66 try
67 {
68 bus.Publish(msg, x => x.WithTopic(msg.MessageRouter));
69 }
70 catch (EasyNetQException ex)
71 {
72 //处理连接消息服务器异常
73 }
74
75 bus.Dispose();//与数据库connection类似,使用后记得销毁bus对象
76 }
77
78 public static void ConsumeTopicMessage(MyMessage msg)
79 {
80 //// 创建消息bus
81 IBus bus = BusBuilder.CreateMessageBus();
82
83 try
84 {
85 bus.Subscribe
86 }
87 catch (EasyNetQException ex)
88 {
89 //处理连接消息服务器异常
90 }
91 }
4、在ProduceThread.cs中添加了消息发布线程对前面实现的功能进行测试(也可以不作为线程直接调用)。
1 public class ProduceThread
2 {
3 public static void ProduceMessage() {
4 MyMessage msg1 = new MyMessage();
5 msg1.MessageID = "0-1";
6 msg1.MessageBody = DateTime.Now.ToString();
7 msg1.MessageRouter = "chending.fanout";
8 msg1.MessageTitle = "0-1";
9 MyMessage msg2 = new MyMessage();
10 msg2.MessageID = "0-2";
11 msg2.MessageBody = DateTime.Now.ToString();
12 msg2.MessageRouter = "chending.direct";
13 msg2.MessageTitle = "0-2";
14 MyMessage msg3 = new MyMessage();
15 msg3.MessageID = "0-3";
16 msg3.MessageBody = DateTime.Now.ToString();
17 msg3.MessageRouter = "chending.topic.a.b";
18 msg3.MessageTitle = "0-3";
19
20 //MQHelper.Send(msg1);
21 MQHelper.ProducerFanoutMessage(msg1);
22 MQHelper.ProducerDirectMessage(msg2);
23 MQHelper.ProducerTopicMessage(msg3);
24
25 for (int i &#61; 0; i <10; i&#43;&#43;) {
26 MyMessage msg &#61; new MyMessage();
27 msg.MessageID &#61; (i&#43;1).ToString();
28 msg.MessageBody &#61; DateTime.Now.ToString();
29 if (i % 2 &#61;&#61; 0)
30 msg.MessageRouter &#61; "cd.test.demo.a.b";
31 else
32 msg.MessageRouter &#61; "cd.test.demo.a";
33 msg.MessageTitle &#61; (i&#43;1).ToString();
34
35 MQHelper.Publish(msg);
36 //Console.WriteLine("Message{0} is published!!!", i &#43; 1);
37 Thread.Sleep(200);
38 }
39 }
2. Weiz.Producer&#xff08;生成者&#xff09;已弃用&#xff08;改用ProduceThread.cs&#xff09;
3. Weiz.Consumer 就是Consumer&#xff08;消费者&#xff09;
1、修改OrderProcessMessage.cs&#xff0c;实现不同的消息处理方式。
1 public class OrderProcessMessage : MQ.IProcessMessage
2 {
3 public void ProcessMsg(MQ.MyMessage msg)
4 {
5 Console.WriteLine("ID: {0}, Title: {1}, Router: {2}, Content: {3}", msg.MessageID, msg.MessageTitle, msg.MessageRouter, msg.MessageBody);
6 }
7 }
8 public class OrderProcessMessage1:MQ.IProcessMessage
9 {
10 public void ProcessMsg(MQ.MyMessage msg)
11 {
12 Console.WriteLine("Process1 ID: {0}, Title: {1}, Router: {2}, Content: {3}", msg.MessageID, msg.MessageTitle, msg.MessageRouter, msg.MessageBody);
13 }
14 }
15
16 public class OrderProcessMessage2 : MQ.IProcessMessage
17 {
18 public void ProcessMsg(MQ.MyMessage msg)
19 {
20 Console.WriteLine("Process2 ID: {0}, Title: {1}, Router: {2}, Content: {3}", msg.MessageID, msg.MessageTitle, msg.MessageRouter, msg.MessageBody);
21 }
22 }
2、对Program.cs中的Main调用进行了修改。
1 class Program
2 {
3 static void Main(string[] args)
4 {
5 //OrderProcessMessage order &#61; new OrderProcessMessage();
6 OrderProcessMessage1 order1 &#61; new OrderProcessMessage1();
7 OrderProcessMessage2 order2 &#61; new OrderProcessMessage2();
8
9 //MyMessage msg &#61; new MyMessage();
10 MyMessage msg1 &#61; new MyMessage();
11 MyMessage msg2 &#61; new MyMessage();
12 MyMessage msg3 &#61; new MyMessage();
13
14 //msg.MessageRouter &#61; "cd.test.demo";
15 msg1.MessageRouter &#61; "cd.test.demo.*";
16 msg2.MessageRouter &#61; "cd.test.demo.#";
17 msg3.MessageRouter &#61; "chending.topic.#";
18
19 //MQHelper.Receive(msg, order);
20 MQHelper.ConsumeFanoutMessage();
21 MQHelper.ConsumeDirectMessage();
22 MQHelper.ConsumeTopicMessage(msg3);
23 MQHelper.Subscribe(msg1, order1);
24 //MQHelper.Subscribe(msg1, order2);
25 MQHelper.Subscribe(msg2, order2);
26
27 Console.WriteLine("Listening for messages.");
28
29 ProduceThread.ProduceMessage();
30
31 //ThreadStart threadStart &#61; ProduceThread.ProduceMessage;
32 //Thread thread &#61; new Thread(threadStart);
33 //thread.Start();
34 }
35 }
二、项目运行
启动 Weiz.Consumer &#xff08;消费者&#xff09;&#xff0c;启动消费者&#xff0c;会自动在RabbitMQ 服务器上创建相关的exchange 和 queue &#xff0c;同时调用的ProduceThread.ProduceMessage函数会发送消息&#xff0c;接收到的信息会在Console命令行中进行显示。
项目源码&#xff1a;百度云链接&#xff1a;https://pan.baidu.com/s/1sCJqY2fKphXV0ntMIytcVw 密码&#xff1a;hfz5