热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

ASP.NETCore微服务之基于EasyNetQ使用RabbitMQ消息队列

asp,net,core,微服,务,之,基于,easynetq,使用,rabb

_Tip:_此篇已加入.NET Core微服务基础系列文章索引

一、消息队列与RabbitMQ

1.1 消息队列

消息”是在两台计算机间传送的数据单位。消息可以非常简单,例如只包含文本字符串;也可以更复杂,可能包含嵌入对象。消息被发送到队列中,“消息队列”是在消息的传输过程中保存消息的容器

消息队列(Message Queue),是分布式系统中重要的组件,其通用的使用场景可以简单地描述为:

当不需要立即获得结果,但是并发量又需要进行控制的时候,差不多就是需要使用消息队列的时候。

消息队列主要解决了应用耦合、异步处理、流量削锋等问题。当前使用较多的消息队列有RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMq等,而部分数据库如Redis、Mysql以及phxsql也可实现消息队列的功能。更多详细内容请参考:《消息队列及其应用场景介绍》

我也在前几年写过一篇基于Redis做消息队列的文章,对消息队列的一个应用场景做了介绍,没有了解过的童鞋可以看看。

1.2 RabbitMQ

RabbitMQ是一款基于AMQP(高级消息队列协议),由Erlang开发的开源消息队列组件。是一款优秀的消息队列组件,他由两部分组成:服务端和客户端,客户端支持多种语言的驱动,如:.Net、JAVA、Erlang等。

网上有很多性能比较的文章,例如在1百万条1k的消息下,每秒种的收发情况如下图所示:

这里不过多介绍RabbitMQ,有关RabbitMQ的一些需要了解的概念你可以通过下面的文章了解:

颜圣杰,《RabbitMQ知多少》

如果你想了解RabbitMQ与Kafka的对比,可以阅读这篇文章:《开源软件成熟度评测报告-分布式消息中间件》

而EasyNetQ呢,它是一款基于RabbitMQ.Client封装的API库,正如其名,使用起来比较Easy,它把原RabbitMQ.Client中的很多操作都进行了再次封装,让开发人员减少了很多工作量。

二、RabbitMQ的安装

2.1 Linux下的安装

这里不演示如何在Linux下安装,但推荐生产环境使用Linux,下面是一些参考资料:

mcgrady,《Linux下RabbitMQ的安装》

晓晨Master,《.NET Core使用RabbitMQ》

牛头人,《Linux安装RabbitMQ》

一只猪儿虫,《RabbitMQ Linux安装》

2.2 Windows下的安装

开发环境下,我一般使用Windows Server虚拟机,所以这里说明下如何在Windows下安装:

(1)下载Erlang和RabbitMQ (这里我选则的并非最新版本,而是etp20.3和rabbitmq3.7.5)

(2)首先安装Erlang,然后添加环境变量(如果添加了,则skip这一步)并加到PATH中

(3)其次安装RabbitMQ,一路Next,安装完成后也为其添加环境变量并添加到PATH中

(4)检查是否安装成功:rabbitmqctl status

这里我碰到了如下的错误:

解决方法:

更正erlang.COOKIE文件,详情请参考:https://blog.csdn.net/u012637358/article/details/80078610

最终状态:

检查Windows服务,发现已经自动注册了一个服务:

(5)激活Web管理插件,然后检查是否可见(http://127.0.0.1:15672)

2.3 一些必要的配置

(1)使用默认账号:guest/guest登录进去,添加一个新用户(Administrator权限),并设置其Permission

(2)添加新的虚拟机(默认为/,这里我添加一个名为EDCVHOST的虚拟机)

(3)绑定新添加的用户到新的虚拟机上,接下来在我们的程序中就主要使用admin这个用户和EDCVHOST这个虚拟机

*.当然,为了安全考虑,你也可以把guest用户remove掉

三、Quick Start:第一个消息队列

3.1 项目准备

这里为了快速的演示如何使用EasyNetQ,我们来一个QuickStart,准备三个项目:两个Console程序和一个Class Library。

其中,对Publisher和Subscriber项目安装EasyNetQ:

NuGet>Install-Package EasyNetQ

针对Messages类库,新增一个class如下:

 public class TextMessage { public string Text { get; set; } }

3.2 我是Publisher

添加以下代码:

 public class Program { public static void Main(string[] args) { var cOnnStr= "host=192.168.80.71;virtualHost=EDCVHOST;username=admin;password=edison"; using (var bus = RabbitHutch.CreateBus(connStr)) { var input = ""; Console.WriteLine("Please enter a message. 'Quit' to quit."); while ((input = Console.ReadLine()) != "Quit") { bus.Publish(new TextMessage { Text = input }); } } } }

可以看到,我们在其中使用EasyNetQ高度封装的接口创建了一个IBus接口的实例,通过这个IBus实例我们可以通过一个超级Easy的Publish接口进行发布消息。这里主要是读取用户在控制台中输入的消息字符串进行发送。实际中,发送的一般都是一个或多个复杂的实体对象。

3.3 我是Subscriber

添加如下所示代码:

 public class Program { public static void Main(string[] args) { var cOnnStr= "host=192.168.80.71;virtualHost=EDCVHOST;username=admin;password=edison"; using (var bus = RabbitHutch.CreateBus(connStr)) { bus.Subscribe("my_test_subscriptionid", HandleTextMessage); Console.WriteLine("Listening for messages. Hit  to quit."); Console.ReadLine(); } } public static void HandleTextMessage(TextMessage textMessage) { Console.ForegroundColor = ConsoleColor.Red; Console.WriteLine("Got message: {0}", textMessage.Text); Console.ResetColor(); } }

这里主要是通过IBus实例去订阅消息(这里是除非用户关闭程序否则一直处于监听状态),当发布者发布了指定类型的消息之后,这里就把它打印出来(红色字体显示)。

3.4 简单测试 

通过控制台信息查看结果:

通过RabbitMQ管理界面查看:

(1)通过Connections Tab可以发现我们的两个客户端都在Running中

(2)通过Queues Tab查看目前已有的队列=>可以看到目前我们只注册了一个队列

四、在ASP.NET Core中的使用

4.1 案例结构与说明

这里假设有这样一个场景,客户通过浏览器提交了一个保单,这个保单中包含一些客户信息,ClientService将这些信息处理后发送一个消息到RabbitMQ中,NoticeService和ZAPEngineService订阅了这个消息。NoticeService会将客户信息取出来并获取一些更多信息为客户发送Email,而ZAPEngineService则会根据客户的一些关键信息(比如:年龄,是否吸烟,学历,年收入等等)去数据库读取一些规则来生成一份Question List并存入数据库。

4.2 项目准备工作

创建上面提到的这几个项目,这里我选择ASP.NET Core WebAPI类型。

分别为这几个项目通过NuGet安装EasyNetQ组件,并且通过以下代码注入统一的IBus实例对象:

 public IServiceProvider ConfigureServices(IServiceCollection services) { // IoC - EventBus services.AddSingleton(RabbitHutch.CreateBus(Configuration["MQ:Dev"])); ...... }

这里我将连接字符串写到了配置文件中,请参考上面的QuickStart中的内容。

下面是这个demo用到的一个消息对象实体:通过标签声明队列名称。

 [Queue("Qka.Client", ExchangeName = "Qka.Client")] public class ClientMessage { public int ClientId { get; set; } public string ClientName { get; set; } public string Sex { get; set; } public int Age { get; set; } // N: Non-Smoker, S: Smoker public string SmokerCode { get; set; } // Bachelor, Master, Doctor public string Education { get; set; } public decimal YearIncome { get; set; } }

此外,为了充分简化代码量,EasyNetQ提供了一个AutoSubscriber的方式,可以通过接口和标签快速地让一个类成为Consumer。详细内容参考:https://github.com/EasyNetQ/EasyNetQ/wiki/Auto-Subscriber

这里为了快速的在项目中使用Subscriber,添加一个扩展方法,它会从注入的服务中取出IBus实例对象,并自动帮我们进行Subscriber(那些实现了IConsume接口的类)的注册。具体用法见后面的介绍。

 public static class AppBuilderExtension { public static IApplicationBuilder UseSubscribe(this IApplicationBuilder appBuilder, string subscriptionIdPrefix, Assembly assembly) { var services = appBuilder.ApplicationServices.CreateScope().ServiceProvider; var lifeTime = services.GetService(); var bus = services.GetService(); lifeTime.ApplicationStarted.Register(() => { var subscriber = new AutoSubscriber(bus, subscriptionIdPrefix); subscriber.Subscribe(assembly); subscriber.SubscribeAsync(assembly); }); lifeTime.ApplicationStopped.Register(() => bus.Dispose()); return appBuilder; } }

4.3 Publisher:ClientService

ClientService作为消费者,这里假设我们在API中处理完业务代码后,将message发布给RabbitMQ:

 [Produces("application/json")] [Route("api/Client")] public class ClientController : Controller { private readonly IClientService clientService; private readonly IBus bus; public ClientController(IClientService _clientService, IBus _bus) { clientService = _clientService; bus = _bus; } ...... [HttpPost] public async Task Post([FromBody]ClientDTO clientDto) { // Business Logic here... // eg.Add new client to your service databases via EF // Sample Publish ClientMessage message = new ClientMessage { ClientId = clientDto.Id.Value, ClientName = clientDto.Name, Sex = clientDto.Sex, Age = 29, SmokerCode = "N", Education = "Master", YearIncome = 100000 }; await bus.PublishAsync(message); return "Add Client Success! You will receive some letter later."; } }

当然,你可以使用同步方法:bus.Publish(message);

4.4 Subscriber: NoticeService & ZAPEngineService

(1)NoticeService:新增一个实现IConsume接口的Consumer类

 public class ClientMessageConsumer: IConsumeAsync { [AutoSubscriberConsumer(SubscriptiOnId= "ClientMessageService.Notice")] public Task ConsumeAsync(ClientMessage message) { // Your business logic code here // eg.Build one email to client via SMTP service // Sample console code System.Console.ForegroundColor = System.ConsoleColor.Red; System.Console.WriteLine("Consume one message from RabbitMQ : {0}, I will send one email to client.", message.ClientName); System.Console.ResetColor(); return Task.CompletedTask; } }

这里为了演示效果,增加了一些输出信息的代码,下面的ZAPEngineService也是一样,不再赘述。

(2)ZAPEngineService:新增一个实现IConsume接口的Consumer类

 public class ClientMessageConsumer : IConsumeAsync { [AutoSubscriberConsumer(SubscriptiOnId= "ClientMessageService.ZapQuestion")] public Task ConsumeAsync(ClientMessage message) { // Your business logic code here // eg.Generate one ZAP question records into database and send to client // Sample console code System.Console.ForegroundColor = System.ConsoleColor.Red; System.Console.WriteLine("Consume one message from RabbitMQ : {0}, I will generate one ZAP question list to client", message.ClientName); System.Console.ResetColor(); return Task.CompletedTask; } }

注意两个Consumer的SubscriptionId不能一样,否则无法接受到消息。

(3)为两个Consumer使用扩展方法:UseSubscribe

 public void Configure(IApplicationBuilder app, IHostingEnvironment env, IApplicationLifetime lifetime) { ...... // easyNetQ app.UseSubscribe("ClientMessageService", Assembly.GetExecutingAssembly()); }

4.5 简单测试

(1)借助Postman向ClientService发起Post请求

(2)查看NoticeService的日志信息

(3)查看ZAPEngineService的日志信息

(4)查看RabbitMQ的管理控制台:

五、小结

本篇超级简单地介绍了一下消息队列与RabbitMQ,通过使用EasyNetQ这个基于RabbitMQ.Client的客户端做了一个QuickStart演示了在.NET Core环境下如何进行消息的发布与订阅,并通过一个微服务的小案例演示了如何在ASP.NET Core环境下如何基于EasyNetQ完成消息的发布与订阅,看起来就像一个类似于简单的事件总线。当然,本篇的内容都十分基础,如果要应用好RabbitMQ,还得把那些基础概念(如:Channel,Exchange等)弄清楚,然后去理解一下事件总线的概念,实际中还得考虑数据一致性等等,路途漫漫,继续加油吧!

示例代码

Click Here => 点我下载

参考资料

EasyNetQ官方文档:https://github.com/EasyNetQ/EasyNetQ/wiki/Introduction

focus-lei,《.net core使用EasyNetQ做EventBus》

常山造纸农,《RabbitMQ安装配置和基于EasyNetQ驱动的基础使用》


推荐阅读
  • RocketMQ在秒杀时的应用
    目录一、RocketMQ是什么二、broker和nameserver2.1Broker2.2NameServer三、MQ在秒杀场景下的应用3.1利用MQ进行异步操作3. ... [详细]
  • 在当今的软件开发领域,分布式技术已成为程序员不可或缺的核心技能之一,尤其在面试中更是考察的重点。无论是小微企业还是大型企业,掌握分布式技术对于提升工作效率和解决实际问题都至关重要。本周的Java架构师实战训练营中,我们深入探讨了Kafka这一高效的分布式消息系统,它不仅支持发布订阅模式,还能在高并发场景下保持高性能和高可靠性。通过实际案例和代码演练,学员们对Kafka的应用有了更加深刻的理解。 ... [详细]
  • 本文详细介绍了Java代码分层的基本概念和常见分层模式,特别是MVC模式。同时探讨了不同项目需求下的分层策略,帮助读者更好地理解和应用Java分层思想。 ... [详细]
  • 从0到1搭建大数据平台
    从0到1搭建大数据平台 ... [详细]
  • 修复一个 Bug 竟耗时两天?真的有那么复杂吗?
    修复一个 Bug 竟然耗费了两天时间?这背后究竟隐藏着怎样的复杂性?本文将深入探讨这个看似简单的 Bug 为何会如此棘手,从代码层面剖析问题根源,并分享解决过程中遇到的技术挑战和心得。 ... [详细]
  • 一文了解消息中间件RabbitMQ
    消息中间件---RabbitMQ1消息中间件的作用2.常用的消息中间件3消息中间件RabbitMQ3.1RabbitMQ介绍3.3RabbitMQ的队列模式3.3RabbitMQ的 ... [详细]
  • 启动activemq_「Java」SpringBoot amp; ActiveMQ
    一、消息队列消息队列中间件是分布式系统中重要的组件,主要解决应用耦合、异步消息、流量削锋等问题,实现高性能、高可用、可伸缩和最终一致性架构, ... [详细]
  • Web开发框架概览:Java与JavaScript技术及框架综述
    Web开发涉及服务器端和客户端的协同工作。在服务器端,Java是一种优秀的编程语言,适用于构建各种功能模块,如通过Servlet实现特定服务。客户端则主要依赖HTML进行内容展示,同时借助JavaScript增强交互性和动态效果。此外,现代Web开发还广泛使用各种框架和库,如Spring Boot、React和Vue.js,以提高开发效率和应用性能。 ... [详细]
  • 2021年Java开发实战:当前时间戳转换方法详解与实用网址推荐
    在当前的就业市场中,金九银十过后,金三银四也即将到来。本文将分享一些实用的面试技巧和题目,特别是针对正在寻找新工作机会的Java开发者。作者在准备字节跳动的面试过程中积累了丰富的经验,并成功获得了Offer。文中详细介绍了如何将当前时间戳进行转换的方法,并推荐了一些实用的在线资源,帮助读者更好地应对技术面试。 ... [详细]
  • 提升Android开发效率:Clean Code的最佳实践与应用
    在Android开发中,提高代码质量和开发效率是至关重要的。本文介绍了如何通过Clean Code的最佳实践来优化Android应用的开发流程。以SQLite数据库操作为例,详细探讨了如何编写高效、可维护的SQL查询语句,并将其结果封装为Java对象。通过遵循这些最佳实践,开发者可以显著提升代码的可读性和可维护性,从而加快开发速度并减少错误。 ... [详细]
  • 深入解析十大经典排序算法:动画演示、原理分析与代码实现
    本文深入探讨了十种经典的排序算法,不仅通过动画直观展示了每种算法的运行过程,还详细解析了其背后的原理与机制,并提供了相应的代码实现,帮助读者全面理解和掌握这些算法的核心要点。 ... [详细]
  • SpringBoot非官方教程|终章:文章汇总springboot非官方教程,可能最接近于官方的一个教程,大多数案例都来自于官方文档,为了更好的理解,加入了个人的改造。码云下载:htt ... [详细]
  • 字节Java高级岗:java开发cpu吃多线程吗
    前言抱着侥幸心理投了字节跳动后台JAVA开发岗,居然收到通知去面试,一面下整个人来都是懵逼的,不知道我对着面试官都说了些啥(捂脸~~)。侥幸一面居然过了,三天后接到二面通知,结果这 ... [详细]
  • SpringCloud之Bus(消息总线)
    说明:关于SpringCloud系列的文章中的代码都在码云上面地址:https:gitee.comzh_0209_javaspringcloud-ali ... [详细]
  • 深入解析队列机制及其广泛的应用场景
    本文深入探讨了队列机制的核心原理及其在多种应用场景中的广泛应用。首先,文章详细解析了队列的基本概念、操作方法及其时间复杂度。接着,通过具体实例,阐述了队列在操作系统任务调度、网络通信、事件处理等领域的实际应用。此外,文章还对比了队列与其他常见数据结构(如栈、数组和链表)的优缺点,帮助读者更好地理解和选择合适的数据结构。最后,通过具体的编程示例,进一步巩固了对队列机制的理解和应用。 ... [详细]
author-avatar
曾巧红--------
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有