热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

ActiveMqC#客户端消息队列的使用(存和取)

1、准备工具VS2013Apache.NMS.ActiveMQ-1.7.2-bin.zipapache-activemq-5.14.0-bin.zip
1、准备工具
 
  1. VS2013
  2. Apache.NMS.ActiveMQ-1.7.2-bin.zip
  3. apache-activemq-5.14.0-bin.zip
 
2、开始项目
 
VS2013新建一个C#控制台应用程序,项目中添加两个dll引用,一个是D:\Apache.NMS.ActiveMQ-1.7.2-bin\lib\Apache.NMS\net-4.0目录下的Apache.NMS.dll,另一个是D:\Apache.NMS.ActiveMQ-1.7.2-bin\build\net-4.0\debug目录下的Apache.NMS.ActiveMQ.dll。
新建一个类,MyActiveMq.cs,用于对activemq消息队列接口的封装,实现如下:
[csharp]  view plain  copy
 
  1. using System;  
  2. using System.Collections.Generic;  
  3. using System.Linq;  
  4. using System.Text;  
  5. using System.Threading.Tasks;  
  6.   
  7. using Apache.NMS;  
  8. using Apache.NMS.ActiveMQ;  
  9.   
  10. namespace NmsProducerClasses  
  11. {  
  12.     public class MyActiveMq  
  13.     {  
  14.         private IConnectionFactory factory;  
  15.         private IConnection connection;  
  16.         private ISession session;  
  17.         private IMessageProducer prod;  
  18.         private IMessageConsumer consumer;  
  19.         private ITextMessage msg;  
  20.   
  21.         private bool isTopic = false;  
  22.         private bool hasSelector = false;  
  23.         private const string ClientID = "clientid";  
  24.         private const string Selector = "filter='demo'";  
  25.         private bool sendSuccess = true;  
  26.         private bool receiveSuccess = true;  
  27.   
  28.         public MyActiveMq(bool isLocalMachine, string remoteAddress)  
  29.         {  
  30.             try  
  31.             {  
  32.                 //初始化工厂     
  33.                 if (isLocalMachine)  
  34.                 {  
  35.                     factory = new ConnectionFactory("tcp://localhost:61616/");  
  36.                 }  
  37.                 else  
  38.                 {  
  39.                     factory = new ConnectionFactory("tcp://" + remoteAddress + ":61616/"); //写tcp://192.168.1.111:61616的形式连接其他服务器上的ActiveMQ服务器             
  40.                 }  
  41.                 //通过工厂建立连接  
  42.                 connection = factory.CreateConnection();  
  43.                 connection.ClientId = ClientID;  
  44.                 connection.Start();  
  45.                 //通过连接创建Session会话  
  46.                 session = connection.CreateSession();  
  47.             }  
  48.             catch (System.Exception e)  
  49.             {  
  50.                 sendSuccess = false;  
  51.                 receiveSuccess = false;  
  52.                 Console.WriteLine("Exception:{0}", e.Message);  
  53.                 Console.ReadLine();  
  54.                 throw e;  
  55.             }  
  56.             Console.WriteLine("Begin connection...");  
  57.         }  
  58.   
  59.   
  60.         ~MyActiveMq()  
  61.         {  
  62.             //this.ShutDown();  
  63.         }  
  64.   
  65.         ///   
  66.         /// 初始化  
  67.         ///   
  68.         /// 选择是否是Topic  
  69.         /// 队列名  
  70.         /// 是否设置过滤  
  71.         public bool InitQueueOrTopic(bool topic, string name, bool selector = false)  
  72.         {  
  73.             try  
  74.             {  
  75.                 //通过会话创建生产者、消费者  
  76.                 if (topic)  
  77.                 {  
  78.                     prod = session.CreateProducer(new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic(name));  
  79.                     if (selector)  
  80.                     {  
  81.                         consumer = session.CreateDurableConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic(name), ClientID, Selector, false);  
  82.                         hasSelector = true;  
  83.                     }  
  84.                     else  
  85.                     {  
  86.                         consumer = session.CreateDurableConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic(name), ClientID, null, false);  
  87.                         hasSelector = false;  
  88.                     }  
  89.                     isTopic = true;  
  90.                 }  
  91.                 else  
  92.                 {  
  93.                     prod = session.CreateProducer(new Apache.NMS.ActiveMQ.Commands.ActiveMQQueue(name));  
  94.                     if (selector)  
  95.                     {  
  96.                         consumer = session.CreateConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQQueue(name), Selector);  
  97.                         hasSelector = true;  
  98.                     }  
  99.                     else  
  100.                     {  
  101.                         consumer = session.CreateConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQQueue(name));  
  102.                         hasSelector = false;  
  103.                     }  
  104.                     isTopic = false;  
  105.                 }  
  106.                 //创建一个发送的消息对象  
  107.                 msg = prod.CreateTextMessage();  
  108.             }  
  109.             catch (System.Exception e)  
  110.             {  
  111.                 sendSuccess = false;  
  112.                 receiveSuccess = false;  
  113.                 Console.WriteLine("Exception:{0}", e.Message);  
  114.                 Console.ReadLine();  
  115.                 throw e;  
  116.             }  
  117.   
  118.             return sendSuccess;  
  119.         }  
  120.   
  121.   
  122.         public bool SendMessage(string message, string msgId = "defult", MsgPriority priority = MsgPriority.Normal)  
  123.         {  
  124.             if (prod == null)  
  125.             {  
  126.                 sendSuccess = false;  
  127.                 Console.WriteLine("call InitQueueOrTopic() first!!");  
  128.                 return false;  
  129.             }  
  130.   
  131.             Console.WriteLine("Begin send messages...");  
  132.   
  133.             //给这个对象赋实际的消息  
  134.             msg.NMSCorrelationID = msgId;  
  135.             msg.Properties["MyID"] = msgId;  
  136.             msg.NMSMessageId = msgId;  
  137.             msg.Text = message;  
  138.             Console.WriteLine(message);  
  139.   
  140.             if (isTopic)  
  141.             {  
  142.                 sendSuccess = ProducerSubcriber(message, priority);  
  143.             }  
  144.             else  
  145.             {  
  146.                 sendSuccess = P2P(message, priority);  
  147.             }  
  148.   
  149.             return sendSuccess;  
  150.         }  
  151.   
  152.   
  153.         public string GetMessage()  
  154.         {  
  155.             if (prod == null)  
  156.             {  
  157.                 Console.WriteLine("call InitQueueOrTopic() first!!");  
  158.                 return null;  
  159.             }  
  160.   
  161.             Console.WriteLine("Begin receive messages...");  
  162.             ITextMessage revMessage = null;  
  163.             try  
  164.             {  
  165.                 //同步阻塞10ms,没消息就直接返回null,注意此处时间不能设太短,否则还没取到消息就直接返回null了!!!  
  166.                 revMessage = consumer.Receive(new TimeSpan(TimeSpan.TicksPerMillisecond *10)) as ITextMessage;   
  167.             }  
  168.             catch (System.Exception e)  
  169.             {  
  170.                 receiveSuccess = false;  
  171.                 Console.WriteLine("Exception:{0}", e.Message);  
  172.                 Console.ReadLine();  
  173.                 throw e;  
  174.             }  
  175.   
  176.             if (revMessage == null)  
  177.             {  
  178.                 Console.WriteLine("No message received!");  
  179.                 return null;  
  180.             }  
  181.             else  
  182.             {  
  183.                 Console.WriteLine("Received message with Correlation ID: " + revMessage.NMSCorrelationID);  
  184.                 //Console.WriteLine("Received message with Properties'ID: " + revMessage.Properties["MyID"]);  
  185.                 Console.WriteLine("Received message with text: " + revMessage.Text);  
  186.             }  
  187.   
  188.             return revMessage.Text;  
  189.         }  
  190.   
  191.         //P2P模式,一个生产者对应一个消费者  
  192.         private bool P2P(string message, MsgPriority priority)  
  193.         {  
  194.             try  
  195.             {  
  196.                 if (hasSelector)  
  197.                 {  
  198.                     //设置消息对象的属性,这个很重要,是Queue的过滤条件,也是P2P消息的唯一指定属性  
  199.                     msg.Properties.SetString("filter", "demo");  //P2P模式  
  200.                 }  
  201.                 prod.Priority = priority;  
  202.                 //设置持久化  
  203.                 prod.DeliveryMode = MsgDeliveryMode.Persistent;  
  204.                 //生产者把消息发送出去,几个枚举参数MsgDeliveryMode是否持久化,MsgPriority消息优先级别,存活时间,当然还有其他重载  
  205.                 prod.Send(msg, MsgDeliveryMode.Persistent, priority, TimeSpan.MinValue);  
  206.             }  
  207.             catch (System.Exception e)  
  208.             {  
  209.                 sendSuccess = false;  
  210.                 Console.WriteLine("Exception:{0}", e.Message);  
  211.                 Console.ReadLine();  
  212.                 throw e;  
  213.             }  
  214.   
  215.             return sendSuccess;  
  216.         }  
  217.   
  218.   
  219.         //发布订阅模式,一个生产者多个消费者   
  220.         private bool ProducerSubcriber(string message, MsgPriority priority)  
  221.         {  
  222.             try  
  223.             {  
  224.                 prod.Priority = priority;  
  225.                 //设置持久化,如果DeliveryMode没有设置或者设置为NON_PERSISTENT,那么重启MQ之后消息就会丢失  
  226.                 prod.DeliveryMode = MsgDeliveryMode.Persistent;  
  227.                 prod.Send(msg, Apache.NMS.MsgDeliveryMode.Persistent, priority, TimeSpan.MinValue);  
  228.                 //System.Threading.Thread.Sleep(1000);    
  229.             }  
  230.             catch (System.Exception e)  
  231.             {  
  232.                 sendSuccess = false;  
  233.                 Console.WriteLine("Exception:{0}", e.Message);  
  234.                 Console.ReadLine();  
  235.                 throw e;  
  236.             }  
  237.   
  238.             return sendSuccess;  
  239.         }  
  240.   
  241.   
  242.         public void ShutDown()  
  243.         {  
  244.             Console.WriteLine("Close connection and session...");  
  245.             session.Close();  
  246.             connection.Close();  
  247.         }  
  248.     }  
  249. }  


Program.cs代码如下:
 
[csharp]  view plain  copy
 
  1. using System;  
  2. using System.Collections.Generic;  
  3. using System.Linq;  
  4. using System.Text;  
  5. using System.Threading.Tasks;  
  6. using System.IO;  
  7. using System.Threading;  
  8.   
  9. namespace NmsProducerClasses  
  10. {  
  11.     class Program  
  12.     {  
  13.         static void Main(string[] args)  
  14.         {  
  15.             MyActiveMq mymq = new MyActiveMq(isLocalMachine: true, remoteAddress: "");  
  16.   
  17.             mymq.InitQueueOrTopic(topic: false, name: "myqueue", selector: false);  
  18.             //mymq.InitQueueOrTopic(topic: false, name: "seletorqueue", selector: true);   
  19.             //mymq.InitQueueOrTopic(topic: true, name: "noselectortopic", selector: false);  
  20.             //mymq.InitQueueOrTopic(topic: true, name: "selectortopic", selector: true);  
  21.   
  22.             //The full range of priority values (0-9) are supported by the JDBC message store. For KahaDB three priority categories are supported, Low (< 4), Default (= 4) and High (> 4).  
  23.             User myuser0 = new User("0000", "Lowest", "img/p.jpg");  
  24.             mymq.SendMessage(JsonUtil.ObjectToJson(myuser0), "newid", priority: Apache.NMS.MsgPriority.Lowest);  
  25.             User myuser1 = new User("1111", "AboveLow", "img/p.jpg");  
  26.             mymq.SendMessage(JsonUtil.ObjectToJson(myuser1), "newid", priority: Apache.NMS.MsgPriority.AboveLow);  
  27.             User myuser2 = new User("2222", "AboveNormal", "img/p.jpg");  
  28.             mymq.SendMessage(JsonUtil.ObjectToJson(myuser2), "newid", priority: Apache.NMS.MsgPriority.AboveNormal);  
  29.             User myuser3 = new User("0000", "BelowNormal", "img/p.jpg");  
  30.             mymq.SendMessage(JsonUtil.ObjectToJson(myuser3), "newid", priority: Apache.NMS.MsgPriority.BelowNormal);  
  31.             User myuser4 = new User("1111", "High", "img/p.jpg");  
  32.             mymq.SendMessage(JsonUtil.ObjectToJson(myuser4), "newid", priority: Apache.NMS.MsgPriority.High);  
  33.             User myuser5 = new User("2222", "Highest", "img/p.jpg");  
  34.             mymq.SendMessage(JsonUtil.ObjectToJson(myuser5), "newid", priority: Apache.NMS.MsgPriority.Highest);  
  35.             User myuser6 = new User("0000", "Low", "img/p.jpg");  
  36.             mymq.SendMessage(JsonUtil.ObjectToJson(myuser6), "newid", priority: Apache.NMS.MsgPriority.Low);  
  37.             User myuser7 = new User("1111", "Normal", "img/p.jpg");  
  38.             mymq.SendMessage(JsonUtil.ObjectToJson(myuser7), "newid", priority: Apache.NMS.MsgPriority.Normal);  
  39.             User myuser8 = new User("2222", "VeryHigh", "img/p.jpg");  
  40.             mymq.SendMessage(JsonUtil.ObjectToJson(myuser8), "newid", priority: Apache.NMS.MsgPriority.VeryHigh);  
  41.             User myuser9 = new User("2222", "VeryLow", "img/p.jpg");  
  42.             mymq.SendMessage(JsonUtil.ObjectToJson(myuser8), "newid", priority: Apache.NMS.MsgPriority.VeryLow);  
  43.   
  44.             int num = 20;  
  45.             while (num-- > 0)  
  46.             {  
  47.                 mymq.GetMessage();  
  48.                 //Thread.Sleep(1000);  
  49.             }  
  50.             mymq.ShutDown();  
  51.               
  52.   
  53.             //XML测试  
  54.             //string xml = XmlTest.ObjToXml();  
  55.             //Console.WriteLine("ObjToXml: {0}", xml);  
  56.   
  57.             //Json测试  
  58.             //User u = new User() { Id="88", Imgurl="img/88.jpg", Name="haha88"};  
  59.             //string jsonstr = JsonUtil.ObjectToJson(u);  
  60.             //Console.WriteLine(jsonstr);  
  61.               
  62.         }  
  63.   
  64.     }  
3、测试
首先,需要启动消息队列,具体启动及测试消息队列步骤可见这边: 点击打开链接
然后,运行项目,运行结果如下:
 
 
 
 
4、优先级
priority并不能决定消息传送的严格消息,具体原因可见
http://activemq.apache.org/how-can-i-support-priority-queues.html
http://shift-alt-ctrl.iteye.com/blog/2034440 

优先级设置:

在D:\apache-activemq-5.14.0\conf目录的activemq.xml配置文件中,找到标签,在其中的标签下添加
[html]  view plain  copy
 
  1. <policyEntry queue=">"  producerFlowCOntrol="false" prioritizedMessages="true" useCache="false" expireMessagesPeriod="0" queuePrefetch="1" />    
  2. <policyEntry queue=">" strictOrderDispatch="false" />    
  3. <policyEntry queue=">" >    
  4.               <pendingMessageLimitStrategy>    
  5.                   <constantPendingMessageLimitStrategy limit="0"/>    
  6.               pendingMessageLimitStrategy>    
  7.               <messageEvictionStrategy>    
  8.                   <oldestMessageWithLowestPriorityEvictionStrategy/>    
  9.               messageEvictionStrategy>    
  10. policyEntry>    
 
配置完成后,需要重启activemq
 
 
5、远程登录监控
要实现远程监控服务器消息队列,需要先进行配置。
配置方法:在D:\apache-activemq-5.14.0\conf目录的jetty.xml配置文件中,把133开始的那段注释去掉即可。
 

推荐阅读
  • 您的数据库配置是否安全?DBSAT工具助您一臂之力!
    本文探讨了Oracle提供的免费工具DBSAT,该工具能够有效协助用户检测和优化数据库配置的安全性。通过全面的分析和报告,DBSAT帮助用户识别潜在的安全漏洞,并提供针对性的改进建议,确保数据库系统的稳定性和安全性。 ... [详细]
  • 深入解析Struts、Spring与Hibernate三大框架的面试要点与技巧 ... [详细]
  • 在处理 XML 数据时,如果需要解析 `` 标签的内容,可以采用 Pull 解析方法。Pull 解析是一种高效的 XML 解析方式,适用于流式数据处理。具体实现中,可以通过 Java 的 `XmlPullParser` 或其他类似的库来逐步读取和解析 XML 文档中的 `` 元素。这样不仅能够提高解析效率,还能减少内存占用。本文将详细介绍如何使用 Pull 解析方法来提取 `` 标签的内容,并提供一个示例代码,帮助开发者快速解决问题。 ... [详细]
  • QT框架中事件循环机制及事件分发类详解
    在QT框架中,QCoreApplication类作为事件循环的核心组件,为应用程序提供了基础的事件处理机制。该类继承自QObject,负责管理和调度各种事件,确保程序能够响应用户操作和其他系统事件。通过事件循环,QCoreApplication实现了高效的事件分发和处理,使得应用程序能够保持流畅的运行状态。此外,QCoreApplication还提供了多种方法和信号槽机制,方便开发者进行事件的定制和扩展。 ... [详细]
  • 在 Vue 应用开发中,页面状态管理和跨页面数据传递是常见需求。本文将详细介绍 Vue Router 提供的两种有效方式,帮助开发者高效地实现页面间的数据交互与状态同步,同时分享一些最佳实践和注意事项。 ... [详细]
  • 在Android开发中,BroadcastReceiver(广播接收器)是一个重要的组件,广泛应用于多种场景。本文将深入解析BroadcastReceiver的工作原理、应用场景及其具体实现方法,帮助开发者更好地理解和使用这一组件。通过实例分析,文章详细探讨了静态广播的注册方式、生命周期管理以及常见问题的解决策略,为开发者提供全面的技术指导。 ... [详细]
  • 在探讨Hibernate框架的高级特性时,缓存机制和懒加载策略是提升数据操作效率的关键要素。缓存策略能够显著减少数据库访问次数,从而提高应用性能,特别是在处理频繁访问的数据时。Hibernate提供了多层次的缓存支持,包括一级缓存和二级缓存,以满足不同场景下的需求。懒加载策略则通过按需加载关联对象,进一步优化了资源利用和响应时间。本文将深入分析这些机制的实现原理及其最佳实践。 ... [详细]
  • 本文探讨了利用Python实现高效语音识别技术的方法。通过使用先进的语音处理库和算法,本文详细介绍了如何构建一个准确且高效的语音识别系统。提供的代码示例和实验结果展示了该方法在实际应用中的优越性能。相关文件可从以下链接下载:链接:https://pan.baidu.com/s/1RWNVHuXMQleOrEi5vig_bQ,提取码:p57s。 ... [详细]
  • 本文介绍了如何在iOS平台上使用GLSL着色器将YV12格式的视频帧数据转换为RGB格式,并展示了转换后的图像效果。通过详细的技术实现步骤和代码示例,读者可以轻松掌握这一过程,适用于需要进行视频处理的应用开发。 ... [详细]
  • Unity3D 中 AsyncOperation 实现异步场景加载及进度显示优化技巧
    在Unity3D中,通过使用`AsyncOperation`可以实现高效的异步场景加载,并结合进度条显示来提升用户体验。本文详细介绍了如何利用`AsyncOperation`进行异步加载,并提供了优化技巧,包括进度条的动态更新和加载过程中的性能优化方法。此外,还探讨了如何处理加载过程中可能出现的异常情况,确保加载过程的稳定性和可靠性。 ... [详细]
  • Kotlin协程中async和await的常见异常陷阱及正确的异常处理方法
    在Kotlin协程中,`async`和`await`是常用的异步编程工具,尤其是在与Jetpack组件结合时,能够显著简化Android开发中的异步任务处理。然而,不当使用这些工具可能会导致常见的异常陷阱,如未捕获的异常或异常传播问题。本文将深入探讨这些陷阱,并提供有效的异常处理方法,帮助开发者避免潜在的问题,确保应用的稳定性和可靠性。 ... [详细]
  • 在Android开发中,当TextView的高度固定且内容超出时,可以通过设置其内置的滚动条属性来实现垂直滚动功能。具体来说,可以通过配置`android:scrollbars="vertical"`来启用垂直滚动,确保用户能够查看完整的内容。此外,为了优化用户体验,建议结合`setMovementMethod(ScrollerMovementMethod.getInstance())`方法,使滚动操作更加流畅和自然。 ... [详细]
  • 使用 ListView 浏览安卓系统中的回收站文件 ... [详细]
  • Web开发框架概览:Java与JavaScript技术及框架综述
    Web开发涉及服务器端和客户端的协同工作。在服务器端,Java是一种优秀的编程语言,适用于构建各种功能模块,如通过Servlet实现特定服务。客户端则主要依赖HTML进行内容展示,同时借助JavaScript增强交互性和动态效果。此外,现代Web开发还广泛使用各种框架和库,如Spring Boot、React和Vue.js,以提高开发效率和应用性能。 ... [详细]
  • 在使用SSH框架进行项目开发时,经常会遇到一些常见的问题。例如,在Spring配置文件中配置AOP事务声明后,进行单元测试时可能会出现“No Hibernate Session bound to thread”的错误。本文将详细探讨这一问题的原因,并提供有效的解决方案,帮助开发者顺利解决此类问题。 ... [详细]
author-avatar
阿思翠
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有