作者:阿思翠 | 来源:互联网 | 2023-09-16 13:32
1、准备工具
- VS2013
- Apache.NMS.ActiveMQ-1.7.2-bin.zip
- apache-activemq-5.14.0-bin.zip
2、开始项目
VS2013新建一个C#控制台应用程序,项目中添加两个dll引用,一个是D:\Apache.NMS.ActiveMQ-1.7.2-bin\lib\Apache.NMS\net-4.0目录下的Apache.NMS.dll,另一个是D:\Apache.NMS.ActiveMQ-1.7.2-bin\build\net-4.0\debug目录下的Apache.NMS.ActiveMQ.dll。
新建一个类,MyActiveMq.cs,用于对activemq消息队列接口的封装,实现如下:
- using System;
- using System.Collections.Generic;
- using System.Linq;
- using System.Text;
- using System.Threading.Tasks;
-
- using Apache.NMS;
- using Apache.NMS.ActiveMQ;
-
- namespace NmsProducerClasses
- {
- public class MyActiveMq
- {
- private IConnectionFactory factory;
- private IConnection connection;
- private ISession session;
- private IMessageProducer prod;
- private IMessageConsumer consumer;
- private ITextMessage msg;
-
- private bool isTopic = false;
- private bool hasSelector = false;
- private const string ClientID = "clientid";
- private const string Selector = "filter='demo'";
- private bool sendSuccess = true;
- private bool receiveSuccess = true;
-
- public MyActiveMq(bool isLocalMachine, string remoteAddress)
- {
- try
- {
-
- if (isLocalMachine)
- {
- factory = new ConnectionFactory("tcp://localhost:61616/");
- }
- else
- {
- factory = new ConnectionFactory("tcp://" + remoteAddress + ":61616/"); //写tcp://192.168.1.111:61616的形式连接其他服务器上的ActiveMQ服务器
- }
-
- connection = factory.CreateConnection();
- connection.ClientId = ClientID;
- connection.Start();
-
- session = connection.CreateSession();
- }
- catch (System.Exception e)
- {
- sendSuccess = false;
- receiveSuccess = false;
- Console.WriteLine("Exception:{0}", e.Message);
- Console.ReadLine();
- throw e;
- }
- Console.WriteLine("Begin connection...");
- }
-
-
- ~MyActiveMq()
- {
-
- }
-
-
-
-
-
-
-
- public bool InitQueueOrTopic(bool topic, string name, bool selector = false)
- {
- try
- {
-
- if (topic)
- {
- prod = session.CreateProducer(new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic(name));
- if (selector)
- {
- consumer = session.CreateDurableConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic(name), ClientID, Selector, false);
- hasSelector = true;
- }
- else
- {
- consumer = session.CreateDurableConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic(name), ClientID, null, false);
- hasSelector = false;
- }
- isTopic = true;
- }
- else
- {
- prod = session.CreateProducer(new Apache.NMS.ActiveMQ.Commands.ActiveMQQueue(name));
- if (selector)
- {
- consumer = session.CreateConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQQueue(name), Selector);
- hasSelector = true;
- }
- else
- {
- consumer = session.CreateConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQQueue(name));
- hasSelector = false;
- }
- isTopic = false;
- }
-
- msg = prod.CreateTextMessage();
- }
- catch (System.Exception e)
- {
- sendSuccess = false;
- receiveSuccess = false;
- Console.WriteLine("Exception:{0}", e.Message);
- Console.ReadLine();
- throw e;
- }
-
- return sendSuccess;
- }
-
-
- public bool SendMessage(string message, string msgId = "defult", MsgPriority priority = MsgPriority.Normal)
- {
- if (prod == null)
- {
- sendSuccess = false;
- Console.WriteLine("call InitQueueOrTopic() first!!");
- return false;
- }
-
- Console.WriteLine("Begin send messages...");
-
-
- msg.NMSCorrelationID = msgId;
- msg.Properties["MyID"] = msgId;
- msg.NMSMessageId = msgId;
- msg.Text = message;
- Console.WriteLine(message);
-
- if (isTopic)
- {
- sendSuccess = ProducerSubcriber(message, priority);
- }
- else
- {
- sendSuccess = P2P(message, priority);
- }
-
- return sendSuccess;
- }
-
-
- public string GetMessage()
- {
- if (prod == null)
- {
- Console.WriteLine("call InitQueueOrTopic() first!!");
- return null;
- }
-
- Console.WriteLine("Begin receive messages...");
- ITextMessage revMessage = null;
- try
- {
-
- revMessage = consumer.Receive(new TimeSpan(TimeSpan.TicksPerMillisecond *10)) as ITextMessage;
- }
- catch (System.Exception e)
- {
- receiveSuccess = false;
- Console.WriteLine("Exception:{0}", e.Message);
- Console.ReadLine();
- throw e;
- }
-
- if (revMessage == null)
- {
- Console.WriteLine("No message received!");
- return null;
- }
- else
- {
- Console.WriteLine("Received message with Correlation ID: " + revMessage.NMSCorrelationID);
-
- Console.WriteLine("Received message with text: " + revMessage.Text);
- }
-
- return revMessage.Text;
- }
-
-
- private bool P2P(string message, MsgPriority priority)
- {
- try
- {
- if (hasSelector)
- {
-
- msg.Properties.SetString("filter", "demo");
- }
- prod.Priority = priority;
-
- prod.DeliveryMode = MsgDeliveryMode.Persistent;
-
- prod.Send(msg, MsgDeliveryMode.Persistent, priority, TimeSpan.MinValue);
- }
- catch (System.Exception e)
- {
- sendSuccess = false;
- Console.WriteLine("Exception:{0}", e.Message);
- Console.ReadLine();
- throw e;
- }
-
- return sendSuccess;
- }
-
-
-
- private bool ProducerSubcriber(string message, MsgPriority priority)
- {
- try
- {
- prod.Priority = priority;
-
- prod.DeliveryMode = MsgDeliveryMode.Persistent;
- prod.Send(msg, Apache.NMS.MsgDeliveryMode.Persistent, priority, TimeSpan.MinValue);
-
- }
- catch (System.Exception e)
- {
- sendSuccess = false;
- Console.WriteLine("Exception:{0}", e.Message);
- Console.ReadLine();
- throw e;
- }
-
- return sendSuccess;
- }
-
-
- public void ShutDown()
- {
- Console.WriteLine("Close connection and session...");
- session.Close();
- connection.Close();
- }
- }
- }
Program.cs代码如下:
- using System;
- using System.Collections.Generic;
- using System.Linq;
- using System.Text;
- using System.Threading.Tasks;
- using System.IO;
- using System.Threading;
-
- namespace NmsProducerClasses
- {
- class Program
- {
- static void Main(string[] args)
- {
- MyActiveMq mymq = new MyActiveMq(isLocalMachine: true, remoteAddress: "");
-
- mymq.InitQueueOrTopic(topic: false, name: "myqueue", selector: false);
-
-
-
-
-
- User myuser0 = new User("0000", "Lowest", "img/p.jpg");
- mymq.SendMessage(JsonUtil.ObjectToJson(myuser0), "newid", priority: Apache.NMS.MsgPriority.Lowest);
- User myuser1 = new User("1111", "AboveLow", "img/p.jpg");
- mymq.SendMessage(JsonUtil.ObjectToJson(myuser1), "newid", priority: Apache.NMS.MsgPriority.AboveLow);
- User myuser2 = new User("2222", "AboveNormal", "img/p.jpg");
- mymq.SendMessage(JsonUtil.ObjectToJson(myuser2), "newid", priority: Apache.NMS.MsgPriority.AboveNormal);
- User myuser3 = new User("0000", "BelowNormal", "img/p.jpg");
- mymq.SendMessage(JsonUtil.ObjectToJson(myuser3), "newid", priority: Apache.NMS.MsgPriority.BelowNormal);
- User myuser4 = new User("1111", "High", "img/p.jpg");
- mymq.SendMessage(JsonUtil.ObjectToJson(myuser4), "newid", priority: Apache.NMS.MsgPriority.High);
- User myuser5 = new User("2222", "Highest", "img/p.jpg");
- mymq.SendMessage(JsonUtil.ObjectToJson(myuser5), "newid", priority: Apache.NMS.MsgPriority.Highest);
- User myuser6 = new User("0000", "Low", "img/p.jpg");
- mymq.SendMessage(JsonUtil.ObjectToJson(myuser6), "newid", priority: Apache.NMS.MsgPriority.Low);
- User myuser7 = new User("1111", "Normal", "img/p.jpg");
- mymq.SendMessage(JsonUtil.ObjectToJson(myuser7), "newid", priority: Apache.NMS.MsgPriority.Normal);
- User myuser8 = new User("2222", "VeryHigh", "img/p.jpg");
- mymq.SendMessage(JsonUtil.ObjectToJson(myuser8), "newid", priority: Apache.NMS.MsgPriority.VeryHigh);
- User myuser9 = new User("2222", "VeryLow", "img/p.jpg");
- mymq.SendMessage(JsonUtil.ObjectToJson(myuser8), "newid", priority: Apache.NMS.MsgPriority.VeryLow);
-
- int num = 20;
- while (num-- > 0)
- {
- mymq.GetMessage();
-
- }
- mymq.ShutDown();
-
-
-
-
-
-
-
-
-
-
-
- }
-
- }
3、测试
首先,需要启动消息队列,具体启动及测试消息队列步骤可见这边:
点击打开链接
然后,运行项目,运行结果如下:
4、优先级
priority并不能决定消息传送的严格消息,具体原因可见
http://activemq.apache.org/how-can-i-support-priority-queues.html
http://shift-alt-ctrl.iteye.com/blog/2034440
优先级设置:
在D:\apache-activemq-5.14.0\conf目录的activemq.xml配置文件中,找到
标签,在其中的标签下添加
- <policyEntry queue=">" producerFlowCOntrol="false" prioritizedMessages="true" useCache="false" expireMessagesPeriod="0" queuePrefetch="1" />
- <policyEntry queue=">" strictOrderDispatch="false" />
- <policyEntry queue=">" >
- <pendingMessageLimitStrategy>
- <constantPendingMessageLimitStrategy limit="0"/>
- pendingMessageLimitStrategy>
- <messageEvictionStrategy>
- <oldestMessageWithLowestPriorityEvictionStrategy/>
- messageEvictionStrategy>
- policyEntry>
配置完成后,需要重启activemq
5、远程登录监控
要实现远程监控服务器消息队列,需要先进行配置。
配置方法:在D:\apache-activemq-5.14.0\conf目录的jetty.xml配置文件中,把133开始的那段注释去掉即可。