热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

DotNetCore中使用RabbitMQ

上一篇随笔记录到RabbitMQ的安装,安装完成,我们就开始使用吧。RabbitMQ简介AMQP,即AdvancedMessageQueuingProtocol,高级消息队列协议,

  上一篇随笔记录到RabbitMQ的安装,安装完成,我们就开始使用吧。

RabbitMQ简介

  AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。

  AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

  RabbitMQ提供了可靠的消息机制、跟踪机制和灵活的消息路由,支持消息集群和分布式部署。适用于排队算法、秒杀活动、消息分发、异步处理、数据同步、处理耗时任务、CQRS等应用场景。

DotNet Core使用RabbitMQ

通过nuget安装:https://www.nuget.org/packages/RabbitMQ.Client/

定义生产者:

//创建连接工厂
ConnectionFactory factory = new ConnectionFactory
{
    UserName = "guest",//用户名
    Password = "guest",//密码
    HostName = "127.0.0.1"//rabbitmq ip
};

//创建连接
var cOnnection= factory.CreateConnection();
//创建通道
var channel = connection.CreateModel();
//声明一个队列
channel.QueueDeclare("hello", false, false, false, null);

Console.WriteLine("\nRabbitMQ连接成功,请输入消息,输入exit退出!");

string input;
do
{
    input = Console.ReadLine();

    var sendBytes = Encoding.UTF8.GetBytes(input);
    //发布消息
    channel.BasicPublish("", "hello", null, sendBytes);

} while (input.Trim().ToLower() != "exit");
channel.Close();
connection.Close();

定义消费者:

//创建连接工厂
ConnectionFactory factory = new ConnectionFactory
{
    UserName = "guest",//用户名
    Password = "guest",//密码
    HostName = "127.0.0.1"//rabbitmq ip
};

//创建连接
var cOnnection= factory.CreateConnection();
//创建通道
var channel = connection.CreateModel();

//事件基本消费者
EventingBasicConsumer cOnsumer= new EventingBasicConsumer(channel);

//接收到消息事件
consumer.Received += (ch, ea) =>
{
    var message = Encoding.UTF8.GetString(ea.Body);
    Console.WriteLine($"收到消息: {message}");
    //确认该消息已被消费
    channel.BasicAck(ea.DeliveryTag, false);
};
//启动消费者 设置为手动应答消息
channel.BasicConsume("hello", false, consumer);
Console.WriteLine("消费者已启动");
Console.ReadKey();
channel.Dispose();
connection.Close();

演示如下:

DotNet Core中使用RabbitMQ

启动了一个生产者,两个消费者,可以看见两个消费者都能接收到消息,消息投递到哪个消费者是由RabbitMQ决定的。

RabbitMQ消费失败的处理

  RabbitMQ采用消息应答机制,即消费者收到一个消息之后,需要发送一个应答,然后RabbitMQ才会将这个消息从队列中删除,如果消费者在消费过程中出现异常,断开连接切没有发送应答,那么RabbitMQ会将这个消息重新投递。

我们来修改一下消费者的代码:

 //接收到消息事件
 consumer.Received += (ch, ea) =>
 {
     var message = Encoding.UTF8.GetString(ea.Body);

     Console.WriteLine($"收到消息: {message}");

     Console.WriteLine($"收到该消息[{ea.DeliveryTag}] 延迟10s发送回执");
     Thread.Sleep(10000);
     //确认该消息已被消费
     channel.BasicAck(ea.DeliveryTag, false);
     Console.WriteLine($"已发送回执[{ea.DeliveryTag}]");
 };

演示如下:

DotNet Core中使用RabbitMQ

从图中可以看出,设置了消息应答延迟10s,如果在这10s中,该消费者断开了连接,那么消息会被RabbitMQ重新投递。

使用RabbitMQ的Exchange

前面的例子,我们可以看到生产者将消息投递到Queue中,实际上这种方式在RabbitMQ中永远都不会发生的。实际的情况是,生产者将消息发送到Exchange(交换器),下图中的X,由Exchange(交换器)将消息路由到一个或多个Queue中(或者丢弃)。

DotNet Core中使用RabbitMQ

 

AMQP协议中的核心思想就是生产者和消费者隔离,生产者从不直接将消息发送给队列。生产者通常不知道是否一个消息会被发送到队列中,只是将消息发送到一个交换机。先由Exchange来接收,然后Exchange按照特定的策略转发到Queue进行存储。同理,消费者也是如此。Exchange 就类似于一个交换机,转发各个消息分发到相应的队列中。

Exchange Types(交换器类型)

RabbitMQ常用的Exchange Type有Fanout、Direct、Topic、Headers这四种

1、Fanout:

  这种类型的Exchange路由规则非常简单,它会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中,这时Routing key不起作用

 DotNet Core中使用RabbitMQ

 

 

Fanout Exchange 不需要处理RouteKey 。只需要简单的将队列绑定到exchange 上。这样发送到exchange的消息都会被转发到与该交换机绑定的所有队列上。类似子网广播,每台子网内的主机都获得了一份复制的消息。

所以,Fanout Exchange 转发消息是最快的。

为了演示效果,定义了两个队列,分别为hello1,hello2,每个队列都拥有一个消费者。

static void Main(string[] args)
{
    string exchangeName = "TestFanoutChange";
    string queueName1 = "hello1";
    string queueName2 = "hello2";
    string routeKey = "";

    //创建连接工厂
    ConnectionFactory factory = new ConnectionFactory
    {
        UserName = "guest",//用户名
        Password = "guest",//密码
        HostName = "127.0.0.1"//rabbitmq ip
    };

    //创建连接
    var cOnnection= factory.CreateConnection();
    //创建通道
    var channel = connection.CreateModel();

    //定义一个Direct类型交换机
    channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout, false, false, null);

    //定义队列1
    channel.QueueDeclare(queueName1, false, false, false, null);
    //定义队列2
    channel.QueueDeclare(queueName2, false, false, false, null);

    //将队列绑定到交换机
    channel.QueueBind(queueName1, exchangeName, routeKey, null);
    channel.QueueBind(queueName2, exchangeName, routeKey, null);

    //生成两个队列的消费者
    ConsumerGenerator(queueName1);
    ConsumerGenerator(queueName2);


    Console.WriteLine($"\nRabbitMQ连接成功,\n\n请输入消息,输入exit退出!");

    string input;
    do
    {
        input = Console.ReadLine();

        var sendBytes = Encoding.UTF8.GetBytes(input);
        //发布消息
        channel.BasicPublish(exchangeName, routeKey, null, sendBytes);

    } while (input.Trim().ToLower() != "exit");
    channel.Close();
    connection.Close();
}
 /// 
 /// 根据队列名称生成消费者
 /// 
 /// 
 static void ConsumerGenerator(string queueName)
 {
     //创建连接工厂
     ConnectionFactory factory = new ConnectionFactory
     {
         UserName = "guest",//用户名
         Password = "guest",//密码
         HostName = "127.0.0.1"//rabbitmq ip
     };

     //创建连接
     var cOnnection= factory.CreateConnection();
     //创建通道
     var channel = connection.CreateModel();

     //事件基本消费者
     EventingBasicConsumer cOnsumer= new EventingBasicConsumer(channel);

     //接收到消息事件
     consumer.Received += (ch, ea) =>
     {
         var message = Encoding.UTF8.GetString(ea.Body);

         Console.WriteLine($"Queue:{queueName}收到消息: {message}");
         //确认该消息已被消费
         channel.BasicAck(ea.DeliveryTag, false);
     };
     //启动消费者 设置为手动应答消息
     channel.BasicConsume(queueName, false, consumer);
     Console.WriteLine($"Queue:{queueName},消费者已启动");
 }

运行效果如下:

DotNet Core中使用RabbitMQ

2、Direct

  这种类型的Exchange路由规则也很简单,它会把消息路由到哪些binding key与routingkey完全匹配的Queue中。

DotNet Core中使用RabbitMQ

 

   Direct模式,可以使用rabbitMQ自带的Exchange:default Exchange 。所以不需要将Exchange进行任何绑定(binding)操作 。消息传递时,RouteKey必须完全匹配,才会被队列接收,否则该消息会被抛弃。

static void Main(string[] args)
{
    string exchangeName = "TestChange";
    string queueName = "hello";
    string routeKey = "helloRouteKey";

    //创建连接工厂
    ConnectionFactory factory = new ConnectionFactory
    {
        UserName = "guest",//用户名
        Password = "guest",//密码
        HostName = "127.0.0.1"//rabbitmq ip
    };

    //创建连接
    var cOnnection= factory.CreateConnection();
    //创建通道
    var channel = connection.CreateModel();

    //定义一个Direct类型交换机
    channel.ExchangeDeclare(exchangeName, ExchangeType.Direct, false, false, null);

    //定义一个队列
    channel.QueueDeclare(queueName, false, false, false, null);

    //将队列绑定到交换机
    channel.QueueBind(queueName, exchangeName, routeKey, null);

    Console.WriteLine($"\nRabbitMQ连接成功,Exchange:{exchangeName},Queue:{queueName},Route:{routeKey},\n\n请输入消息,输入exit退出!");

    string input;
    do
    {
        input = Console.ReadLine();

        var sendBytes = Encoding.UTF8.GetBytes(input);
        //发布消息
        channel.BasicPublish(exchangeName, routeKey, null, sendBytes);

    } while (input.Trim().ToLower() != "exit");
    channel.Close();
    connection.Close();

运行效果如下:

DotNet Core中使用RabbitMQ

3、Topic

  这种类型的Exchange的路由规则支持 binding key 和 routing key 的模糊匹配,会把消息路由到满足条件的Queue。 binding key 中可以存在两种特殊字符 *与 #,用于做模糊匹配,其中 * 用于匹配一个单词,# 用于匹配0个或多个单词,单词以符号“.”为分隔符。

DotNet Core中使用RabbitMQ

  以上图中的配置为例,routingKey=”quick.orange.rabbit”的消息会同时路由到Q1与Q2,routingKey=”lazy.orange.fox”的消息会路由到Q1与Q2,routingKey=”lazy.brown.fox”的消息会路由到Q2,routingKey=”lazy.pink.rabbit”的消息会路由到Q2(只会投递给Q2一次,虽然这个routingKey与Q2的两个bindingKey都匹配);routingKey=”quick.brown.fox”、routingKey=”orange”、routingKey=”quick.orange.male.rabbit”的消息将会被丢弃,因为它们没有匹配任何bindingKey。

  所以,Topic Exchange使用非常灵活。
static void Main(string[] args)
{
    string exchangeName = "TestTopicChange";
    string queueName = "hello";
    string routeKey = "TestRouteKey.*";

    //创建连接工厂
    ConnectionFactory factory = new ConnectionFactory
    {
        UserName = "guest",//用户名
        Password = "guest",//密码
        HostName = "127.0.0.1"//rabbitmq ip
    };

    //创建连接
    var cOnnection= factory.CreateConnection();
    //创建通道
    var channel = connection.CreateModel();

    //定义一个Direct类型交换机
    channel.ExchangeDeclare(exchangeName, ExchangeType.Topic, false, false, null);

    //定义队列1
    channel.QueueDeclare(queueName, false, false, false, null);

    //将队列绑定到交换机
    channel.QueueBind(queueName, exchangeName, routeKey, null);



    Console.WriteLine($"\nRabbitMQ连接成功,\n\n请输入消息,输入exit退出!");

    string input;
    do
    {
        input = Console.ReadLine();

        var sendBytes = Encoding.UTF8.GetBytes(input);
        //发布消息
        channel.BasicPublish(exchangeName, "TestRouteKey.one", null, sendBytes);

    } while (input.Trim().ToLower() != "exit");
    channel.Close();
    connection.Close();
}

运行效果如下:

DotNet Core中使用RabbitMQ

 4、Headers

  这种类型的Exchange不依赖于 routing key 与 binding key 的匹配规则来路由消息,而是根据发送的消息内容中的 headers 属性进行匹配。

参考:

  官网:https://www.rabbitmq.com/tutorials/tutorial-one-dotnet.html

    https://www.cnblogs.com/stulzq/p/7551819.html

    https://www.jianshu.com/p/e55e971aebd8


推荐阅读
  • 1、概述首先和大家一起回顾一下Java消息服务,在我之前的博客《Java消息队列-JMS概述》中,我为大家分析了:然后在另一篇博客《Java消息队列-ActiveMq实战》中 ... [详细]
  • 本文介绍了Python高级网络编程及TCP/IP协议簇的OSI七层模型。首先简单介绍了七层模型的各层及其封装解封装过程。然后讨论了程序开发中涉及到的网络通信内容,主要包括TCP协议、UDP协议和IPV4协议。最后还介绍了socket编程、聊天socket实现、远程执行命令、上传文件、socketserver及其源码分析等相关内容。 ... [详细]
  • 本文介绍了操作系统的定义和功能,包括操作系统的本质、用户界面以及系统调用的分类。同时还介绍了进程和线程的区别,包括进程和线程的定义和作用。 ... [详细]
  • 云原生应用最佳开发实践之十二原则(12factor)
    目录简介一、基准代码二、依赖三、配置四、后端配置五、构建、发布、运行六、进程七、端口绑定八、并发九、易处理十、开发与线上环境等价十一、日志十二、进程管理当 ... [详细]
  • RabbitMQ的消息持久化处理
    1、RabbitMQ的消息持久化处理,消息的可靠性是RabbitMQ的一大特色,那么RabbitMQ是如何保证消息可靠性的呢——消息持久化。2、auto ... [详细]
  • 消息中间件RabbitMQ 高级特性之消费端ACK与重回队列
    什么是消费端的ACK和重回队列?消费端的手工ACK和NACK消费端进行消费的时候,如果由于业务异常我们可以进行日志的记录,然后进行补偿如果由于服务器宕机等严重问题 ... [详细]
  • 讨伐Java多线程与高并发——MQ篇
    本文是学习Java多线程与高并发知识时做的笔记。这部分内容比较多,按照内容分为5个部分:多线程基础篇JUC篇同步容器和并发容器篇线程池篇MQ篇本篇 ... [详细]
  • Hadoop——Hive简介和环境配置
    一、Hive的简介和配置1.简介Hive是构建在Hadoop之上的数据操作平台lHive是一个SQL解析引擎,它将SQL转译成MapReduce作业,并 ... [详细]
  • 计算机存储系统的层次结构及其优势
    本文介绍了计算机存储系统的层次结构,包括高速缓存、主存储器和辅助存储器三个层次。通过分层存储数据可以提高程序的执行效率。计算机存储系统的层次结构将各种不同存储容量、存取速度和价格的存储器有机组合成整体,形成可寻址存储空间比主存储器空间大得多的存储整体。由于辅助存储器容量大、价格低,使得整体存储系统的平均价格降低。同时,高速缓存的存取速度可以和CPU的工作速度相匹配,进一步提高程序执行效率。 ... [详细]
  • Oracle优化新常态的五大禁止及其性能隐患
    本文介绍了Oracle优化新常态中的五大禁止措施,包括禁止外键、禁止视图、禁止触发器、禁止存储过程和禁止JOB,并分析了这些禁止措施可能带来的性能隐患。文章还讨论了这些禁止措施在C/S架构和B/S架构中的不同应用情况,并提出了解决方案。 ... [详细]
  • 篇首语:本文由编程笔记#小编为大家整理,主要介绍了软件测试知识点之数据库压力测试方法小结相关的知识,希望对你有一定的参考价值。 ... [详细]
  • Allegro总结:1.防焊层(SolderMask):又称绿油层,PCB非布线层,用于制成丝网印板,将不需要焊接的地方涂上防焊剂.在防焊层上预留的焊盘大小要比实际的焊盘大一些,其差值一般 ... [详细]
  • 导读:在编程的世界里,语言纷繁多样,而大部分真正广泛流行的语言并不是那些学术界的产物,而是在通过自由发挥设计出来的。和那些 ... [详细]
  • 只使用’if-else’语句的’else’部分是否可以接受?有时,我觉得检查所有条件是否都是真的更容易,但是只处理“其他”情况。我想 ... [详细]
  • ruby 输出彩色内容到控制台
    程序输出控制台时,为了区分输出信息的严重程度,可以使用颜色、符号等来做标识。ruby也支持设置输出内容的颜色,比如运行以下代码:以下内容是百度到的,因发现很多博客都是同样的写法,所 ... [详细]
author-avatar
葬宝御用小奶瓶2004
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有