热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

RabbitMQ从零到集群高可用RabbitMQ简介和六种工作模式详解

目录一、RabbitMQ简介二、安装RabbitMQ三、RabbitMQ六种队列模式在.NetCore中使用(1)简单队列(2)工作队列模式(3)发布订阅模式(4)路由模式&#x

目录


  • 一、RabbitMQ简介
  • 二、安装RabbitMQ
  • 三、RabbitMQ六种队列模式在.NetCore中使用
    • (1)简单队列
    • (2)工作队列模式
    • (3)发布订阅模式
    • (4)路由模式(推荐使用)
    • (5)主题模式
    • (6)RPC模式


回到顶部


一、RabbitMQ简介

是一个开源的消息代理和队列服务器,用来通过普通协议在完全不同的应用之间共享数据,RabbitMQ是使用Erlang(高并发语言)语言来编写的,并且RabbitMQ是基于AMQP协议的。

1.1 AMQP协议

Advanced Message Queuing Protocol(高级消息队列协议)


1.2 AMQP专业术语:(多路复用->在同一个线程中开启多个通道进行操作)


  • Server:又称broker,接受客户端的链接,实现AMQP实体服务
  • Connection:连接,应用程序与broker的网络连接
  • Channel:网络信道,几乎所有的操作都在channel中进行,Channel是进行消息读写的通道。客户端可以建立多个channel,每个channel代表一个会话任务。
  • Message:消息,服务器与应用程序之间传送的数据,由Properties和Body组成.Properties可以对消息进行修饰,必须消息的优先级、延迟等高级特性;Body则是消息体内容。
  • virtualhost: 虚拟地址,用于进行逻辑隔离,最上层的消息路由。一个virtual host里面可以有若干个Exchange和Queue,同一个Virtual Host 里面不能有相同名称的Exchange 或 Queue。
  • Exchange:交换机,接收消息,根据路由键转单消息到绑定队列
  • Binding: Exchange和Queue之间的虚拟链接,binding中可以包换routing key
  • Routing key: 一个路由规则,虚拟机可用它来确定如何路由一个特定消息。(如负载均衡)

1.3 RabbitMQ整体架构

ClientA(生产者)发送消息到Exchange1(交换机),同时带上RouteKey(路由Key),Exchange1找到绑定交换机为它和绑定传入的RouteKey的队列,把消息转发到对应的队列,消费者Client1,Client2,Client3只需要指定对应的队列名既可以消费队列数据。

交换机和队列多对多关系,实际开发中一般是一个交换机对多个队列,防止设计复杂化。

回到顶部


二、安装RabbitMQ

安装方式不影响下面的使用,这里用Docker安装

#15672端口为web管理端的端口,5672为RabbitMQ服务的端口
docker run -d --name rabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=123456 -p 15672:15672 -p 5672:5672 rabbitmq:3-management

输入:ip:5672访问验证。

建一个名为develop的Virtual host(虚拟主机)使用,项目中一般是一个项目建一个Virtual host用,能够隔离队列。

切换Virtual host

回到顶部


三、RabbitMQ六种队列模式在.NetCore中使用


(1)简单队列

最简单的工作队列,其中一个消息生产者,一个消息消费者,一个队列。也称为点对点模式

描述:一个生产者 P 发送消息到队列 Q,一个消费者 C 接收

建一个RabbitMQHelper.cs类

///

/// RabbitMQ帮助类/// public class RabbitMQHelper{private static ConnectionFactory factory;private static object lockObj = new object();/// /// 获取单个RabbitMQ连接/// /// public static IConnection GetConnection(){if (factory == null){lock (lockObj){if (factory == null){factory = new ConnectionFactory{HostName = "172.16.2.84",//ipPort = 5672,//端口UserName = "admin",//账号Password = "123456",//密码VirtualHost = "develop" //虚拟主机};}}}return factory.CreateConnection();}}

生产者代码:

新建发送类Send.cs

public static void SimpleSendMsg(){string queueName &#61; "simple_order";//队列名//创建连接using (var connection &#61; RabbitMQHelper.GetConnection()){//创建信道using (var channel &#61; connection.CreateModel()){//创建队列channel.QueueDeclare(queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);for (var i &#61; 0; i <10; i&#43;&#43;){string message &#61; $"Hello RabbitMQ MessageHello,{i &#43; 1}";var body &#61; Encoding.UTF8.GetBytes(message);//发送消息channel.BasicPublish(exchange: "", routingKey: queueName, mandatory: false, basicProperties: null, body);Console.WriteLine($"发送消息到队列:{queueName},内容:{message}");}}}}

创建队列参数解析&#xff1a;

durable&#xff1a;是否持久化。

exclusive&#xff1a;排他队列&#xff0c;只有创建它的连接(connection)能连&#xff0c;创建它的连接关闭&#xff0c;会自动删除队列。

autoDelete&#xff1a;被消费后&#xff0c;消费者数量都断开时自动删除队列。

arguments&#xff1a;创建队列的参数。

发送消息参数解析&#xff1a;

exchange&#xff1a;交换机&#xff0c;为什么能传空呢&#xff0c;因为RabbitMQ内置有一个默认的交换机&#xff0c;如果传空时&#xff0c;就会用默认交换机。

routingKey&#xff1a;路由名称&#xff0c;这里用队列名称做路由key。

mandatory&#xff1a;true告诉服务器至少将消息route到一个队列种&#xff0c;否则就将消息return给发送者&#xff1b;false&#xff1a;没有找到路由则消息丢弃。

执行效果&#xff1a;

队列产生10条消息。

消费者代码&#xff1a;

新建Recevie.cs类

public static void SimpleConsumer(){string queueName &#61; "simple_order";var connection &#61; RabbitMQHelper.GetConnection();{//创建信道var channel &#61; connection.CreateModel();{//创建队列channel.QueueDeclare(queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);var consumer &#61; new EventingBasicConsumer(channel);int i &#61; 0;consumer.Received &#43;&#61; (model, ea) &#61;>{//消费者业务处理var message &#61; Encoding.UTF8.GetString(ea.Body.ToArray());Console.WriteLine($"{i},线程id:{Thread.GetCurrentProcessorId()},队列{queueName}消费消息长度:{message.Length}");i&#43;&#43;;};channel.BasicConsume(queueName, true, consumer);}}}

消费者只需要知道队列名就可以消费了&#xff0c;不需要Exchange和routingKey。

注&#xff1a;消费者这里有一个创建队列&#xff0c;它本身不需要&#xff0c;是预防消费端程序先执行&#xff0c;没有队列会报错。

执行效果&#xff1a;

消息已经被消费完。


(2)工作队列模式

一个消息生产者&#xff0c;一个交换器&#xff0c;一个消息队列&#xff0c;多个消费者。同样也称为点对点模式

生产者P发送消息到队列&#xff0c;多个消费者C消费队列的数据。

工作队列也称为公平性队列模式&#xff0c;循环分发&#xff0c;RabbitMQ 将按顺序将每条消息发送给下一个消费者&#xff0c;每个消费者将获得相同数量的消息。

生产者&#xff1a;

Send.cs代码&#xff1a;

///

/// 工作队列模式/// public static void WorkerSendMsg(){string queueName &#61; "worker_order";//队列名//创建连接using (var connection &#61; RabbitMQHelper.GetConnection()){//创建信道using (var channel &#61; connection.CreateModel()){//创建队列channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);for ( var i&#61;0;i<10;i&#43;&#43;){string message &#61; $"Hello RabbitMQ MessageHello,{i&#43;1}";var body &#61; Encoding.UTF8.GetBytes(message);//发送消息到rabbitmqchannel.BasicPublish(exchange: "", routingKey: queueName, mandatory: false, basicProperties: null, body);Console.WriteLine($"发送消息到队列:{queueName},内容:{message}");}}}}

参数durable&#xff1a;true&#xff0c;需要持久化&#xff0c;实际项目中肯定需要持久化的&#xff0c;不然重启RabbitMQ数据就会丢失了。

执行效果&#xff1a;

写入10条数据&#xff0c;有持久化标识D。

消费端&#xff1a;

Recevie代码&#xff1a;

public static void WorkerConsumer(){string queueName &#61; "worker_order";var connection &#61; RabbitMQHelper.GetConnection();{//创建信道var channel &#61; connection.CreateModel();{//创建队列channel.QueueDeclare(queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);var consumer &#61; new EventingBasicConsumer(channel);//prefetchCount:1来告知RabbitMQ,不要同时给一个消费者推送多于 N 个消息&#xff0c;也确保了消费速度和性能channel.BasicQos(prefetchSize: 0, prefetchCount:1, global: false);int i &#61; 1;int index &#61; new Random().Next(10);consumer.Received &#43;&#61; (model, ea) &#61;>{//处理业务var message &#61; Encoding.UTF8.GetString(ea.Body.ToArray());Console.WriteLine($"{i},消费者:{index},队列{queueName}消费消息长度:{message.Length}");channel.BasicAck(ea.DeliveryTag, false); //消息ack确认&#xff0c;告诉mq这条队列处理完&#xff0c;可以从mq删除了Thread.Sleep(1000);i&#43;&#43;;};channel.BasicConsume(queueName,autoAck:false, consumer);}}}

BasicQos参数解析&#xff1a;

prefetchSize&#xff1a;每条消息大小&#xff0c;一般设为0&#xff0c;表示不限制。

prefetchCount&#xff1a;1&#xff0c;作用限流&#xff0c;告诉RabbitMQ不要同时给一个消费者推送多于N个消息&#xff0c;消费者会把N条消息缓存到本地一条条消费&#xff0c;如果不设&#xff0c;RabbitMQ会进可能快的把消息推到客户端&#xff0c;导致客户端内存升高。设置合理可以不用频繁从RabbitMQ 获取能提升消费速度和性能&#xff0c;设的太多的话则会增大本地内存&#xff0c;需要根据机器性能合理设置&#xff0c;官方建议设为30。

global:是否为全局设置。

这些限流设置针对消费者autoAck&#xff1a;false时才有效&#xff0c;如果是自动Ack的&#xff0c;限流不生效。

执行两个消费者&#xff0c;效果&#xff1a;

可以看到消费者号的标识&#xff0c;8,2,8,2是平均的&#xff0c;一个消费者5个&#xff0c;RabbitMQ上也能看到有2个消费者&#xff0c;Unacked数是2&#xff0c;因为每个客户端的限流数是1。

工作队列模式也是很常用的队列模式。


(3)发布订阅模式

Pulish/Subscribe&#xff0c;无选择接收消息&#xff0c;一个消息生产者&#xff0c;一个交换机&#xff08;交换机类型为fanout&#xff09;&#xff0c;多个消息队列&#xff0c;多个消费者。称为发布/订阅模式

在应用中&#xff0c;只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播&#xff0c;每台子网内的主机都获得了一份复制的消息。

生产者P只需把消息发送到交换机X&#xff0c;绑定这个交换机的队列都会获得一份一样的数据。

应用场景&#xff1a;适合于用同一份数据源做不同的业务。

生产者代码&#xff1a;

     ///

/// 发布订阅&#xff0c; 扇形队列/// public static void SendMessageFanout(){//创建连接using (var connection &#61; RabbitMQHelper.GetConnection()){//创建信道using (var channel &#61; connection.CreateModel()){string exchangeName &#61; "fanout_exchange";//创建交换机,fanout类型channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout);string queueName1 &#61; "fanout_queue1";string queueName2 &#61; "fanout_queue2";string queueName3 &#61; "fanout_queue3";//创建队列channel.QueueDeclare(queueName1, false, false, false);channel.QueueDeclare(queueName2, false, false, false);channel.QueueDeclare(queueName3, false, false, false);//把创建的队列绑定交换机,routingKey不用给值&#xff0c;给了也没意义的channel.QueueBind(queue: queueName1, exchange: exchangeName, routingKey: "");channel.QueueBind(queue: queueName2, exchange: exchangeName, routingKey: "");channel.QueueBind(queue: queueName3, exchange: exchangeName, routingKey: "");//向交换机写10条消息for (int i &#61; 0; i <10; i&#43;&#43;){string message &#61; $"RabbitMQ Fanout {i &#43; 1} Message";var body &#61; Encoding.UTF8.GetBytes(message);channel.BasicPublish(exchangeName, routingKey: "", null, body);Console.WriteLine($"发送Fanout消息:{message}");}}}}

执行代码&#xff1a;

向交换机发送10条消息&#xff0c;则绑定这个交换机的3个队列都会有10条消息。

消费端的代码和工作队列的一样&#xff0c;只需知道队列名即可消费&#xff0c;声明时要和生产者的声明一样。


(4)路由模式&#xff08;推荐使用&#xff09;

在发布/订阅模式的基础上&#xff0c;有选择的接收消息&#xff0c;也就是通过 routing 路由进行匹配条件是否满足接收消息。

上图是一个结合日志消费级别的配图&#xff0c;在路由模式它会把消息路由到那些 binding key 与 routing key 完全匹配的 Queue 中&#xff0c;此模式也就是 Exchange 模式中的direct模式。

生产者P发送数据是要指定交换机(X&#xff09;和routing发送消息 &#xff0c;指定的routingKey&#61;error&#xff0c;则队列Q1和队列Q2都会有一份数据&#xff0c;如果指定routingKey&#61;into,或&#61;warning&#xff0c;交换机(X)只会把消息发到Q2队列。

生产者代码&#xff1a;

    ///

/// 路由模式&#xff0c;点到点直连队列/// public static void SendMessageDirect(){//创建连接using (var connection &#61; RabbitMQHelper.GetConnection()){//创建信道using (var channel &#61; connection.CreateModel()){//声明交换机对象,fanout类型string exchangeName &#61; "direct_exchange";channel.ExchangeDeclare(exchangeName, ExchangeType.Direct);//创建队列string queueName1 &#61; "direct_errorlog";string queueName2 &#61; "direct_alllog";channel.QueueDeclare(queueName1, true, false, false);channel.QueueDeclare(queueName2, true, false, false);//把创建的队列绑定交换机,direct_errorlog队列只绑定routingKey:errorchannel.QueueBind(queue: queueName1, exchange: exchangeName, routingKey: "error");//direct_alllog队列绑定routingKey:error,infochannel.QueueBind(queue: queueName2, exchange: exchangeName, routingKey: "info");channel.QueueBind(queue: queueName2, exchange: exchangeName, routingKey: "error");//向交换机写10条错误日志和10条Info日志for (int i &#61; 0; i <10; i&#43;&#43;){string message &#61; $"RabbitMQ Direct {i &#43; 1} error Message";var body &#61; Encoding.UTF8.GetBytes(message);channel.BasicPublish(exchangeName, routingKey: "error", null, body);Console.WriteLine($"发送Direct消息error:{message}");string message2 &#61; $"RabbitMQ Direct {i &#43; 1} info Message";var body2 &#61; Encoding.UTF8.GetBytes(message);channel.BasicPublish(exchangeName, routingKey: "info", null, body2);Console.WriteLine($"info:{message2}");}}}}

这里创建一个direct类型的交换机&#xff0c;两个路由key,一个error&#xff0c;一个info,两个队列&#xff0c;一个队列只绑定error&#xff0c;一个队列绑定error和info&#xff0c;向error和info各发10条消息。

执行代码&#xff1a;

查看RabbitMQ管理界面&#xff0c;direct_errorlog队列10条&#xff0c;而direct_alllog有20条&#xff0c;因为direct_alllog队列两个routingKey的消息都进去了。

点进去看下两个队列绑定的交换机和routingKey

消费者代码&#xff1a;

消费者和工作队列一样&#xff0c;只需根据队列名消费即可&#xff0c;这里只消费direct_errorlog队列作示例

public static void DirectConsumer(){string queueName &#61; "direct_errorlog";var connection &#61; RabbitMQHelper.GetConnection();{//创建信道var channel &#61; connection.CreateModel();{//创建队列channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);var consumer &#61; new EventingBasicConsumer(channel);///prefetchCount:1来告知RabbitMQ,不要同时给一个消费者推送多于 N 个消息&#xff0c;也确保了消费速度和性能///global&#xff1a;是否设为全局的///prefetchSize:单条消息大小&#xff0c;通常设0&#xff0c;表示不做限制//是autoAck&#61;false才会有效channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: true);int i &#61; 1;consumer.Received &#43;&#61; (model, ea) &#61;>{//处理业务var message &#61; Encoding.UTF8.GetString(ea.Body.ToArray());Console.WriteLine($"{i},队列{queueName}消费消息长度:{message.Length}");channel.BasicAck(ea.DeliveryTag, false); //消息ack确认&#xff0c;告诉mq这条队列处理完&#xff0c;可以从mq删除了i&#43;&#43;;};channel.BasicConsume(queueName, autoAck: false, consumer);}}}

普通场景中推荐使用路由模式&#xff0c;因为路由模式有交换机&#xff0c;有路由key&#xff0c;能够更好的拓展各种应用场景。


(5)主题模式

topics(主题)模式跟routing路由模式类似&#xff0c;只不过路由模式是指定固定的路由键 routingKey&#xff0c;而主题模式是可以模糊匹配路由键 routingKey&#xff0c;类似于SQL中 &#61; 和 like 的关系。

P 表示为生产者、 X 表示交换机、C1C2 表示为消费者&#xff0c;红色表示队列。

topics 模式与 routing 模式比较相近&#xff0c;topics 模式不能具有任意的 routingKey&#xff0c;必须由一个英文句点号“.”分隔的字符串&#xff08;我们将被句点号“.”分隔开的每一段独立的字符串称为一个单词&#xff09;&#xff0c;比如 "lazy.orange.a"。topics routingKey 中可以存在两种特殊字符"*"与“#”&#xff0c;用于做模糊匹配&#xff0c;其中“*”用于匹配一个单词&#xff0c;“#”用于匹配多个单词&#xff08;可以是零个&#xff09;。

以上图为例&#xff1a;

如果发送消息的routingKey设置为&#xff1a;

aaa.orange.rabbit&#xff0c;那么消息会路由到Q1与Q2&#xff0c;

routingKey&#61;aaa.orange.bb的消息会路由到Q1&#xff0c;

routingKey&#61;lazy.aa.bb.cc的消息会路由到Q2&#xff1b;

routingKey&#61;lazy.aa.rabbit的消息会路由到 Q2&#xff08;只会投递给Q2一次&#xff0c;虽然这个routingKey 与 Q2 的两个 bindingKey 都匹配&#xff09;&#xff1b;

没匹配routingKey的消息将会被丢弃。

生产者代码&#xff1a;

public static void SendMessageTopic(){//创建连接using (var connection &#61; RabbitMQHelper.GetConnection()){//创建信道using (var channel &#61; connection.CreateModel()){//声明交换机对象,fanout类型string exchangeName &#61; "topic_exchange";channel.ExchangeDeclare(exchangeName, ExchangeType.Topic);//队列名string queueName1 &#61; "topic_queue1";string queueName2 &#61; "topic_queue2";//路由名string routingKey1 &#61; "*.orange.*";string routingKey2 &#61; "*.*.rabbit";string routingKey3 &#61; "lazy.#";channel.QueueDeclare(queueName1, true, false, false);channel.QueueDeclare(queueName2, true, false, false);//把创建的队列绑定交换机,routingKey指定routingKeychannel.QueueBind(queue: queueName1, exchange: exchangeName, routingKey: routingKey1);channel.QueueBind(queue: queueName2, exchange: exchangeName, routingKey: routingKey2);channel.QueueBind(queue: queueName2, exchange: exchangeName, routingKey: routingKey3);//向交换机写10条消息for (int i &#61; 0; i <10; i&#43;&#43;){string message &#61; $"RabbitMQ Direct {i &#43; 1} Message";var body &#61; Encoding.UTF8.GetBytes(message);channel.BasicPublish(exchangeName, routingKey: "aaa.orange.rabbit", null, body);channel.BasicPublish(exchangeName, routingKey: "lazy.aa.rabbit", null, body);Console.WriteLine($"发送Topic消息:{message}");}}}}

这里演示了 routingKey为aaa.orange.rabbit&#xff0c;和lazy.aa.rabbit的情况&#xff0c;第一个匹配到Q1和Q2&#xff0c;第二个匹配到Q2&#xff0c;所以应该Q1是10条&#xff0c;Q2有20条&#xff0c;

执行后看rabbitMQ界面&#xff1a;


(6)RPC模式

与上面其他5种所不同之处&#xff0c;该模式是拥有请求/回复的。也就是有响应的&#xff0c;上面5种都没有。

RPC是指远程过程调用&#xff0c;也就是说两台服务器A&#xff0c;B&#xff0c;一个应用部署在A服务器上&#xff0c;想要调用B服务器上应用提供的处理业务&#xff0c;处理完后然后在A服务器继续执行下去&#xff0c;把异步的消息以同步的方式执行。

客户端(C)声明一个排他队列自己订阅&#xff0c;然后发送消息到RPC队列同时也把这个排他队列名也在消息里传进去&#xff0c;服务端监听RPC队列&#xff0c;处理完业务后把处理结果发送到这个排他队列&#xff0c;然后客户端收到结果&#xff0c;继续处理自己的逻辑。

RPC的处理流程&#xff1a;


  • 当客户端启动时&#xff0c;创建一个匿名的回调队列。
  • 客户端为RPC请求设置2个属性&#xff1a;replyTo&#xff1a;设置回调队列名字&#xff1b;correlationId&#xff1a;标记request。
  • 请求被发送到rpc_queue队列中。
  • RPC服务器端监听rpc_queue队列中的请求&#xff0c;当请求到来时&#xff0c;服务器端会处理并且把带有结果的消息发送给客户端。接收的队列就是replyTo设定的回调队列。
  • 客户端监听回调队列&#xff0c;当有消息时&#xff0c;检查correlationId属性&#xff0c;如果与request中匹配&#xff0c;那就是结果了。

服务端代码&#xff1a;

public class RPCServer{public static void RpcHandle(){var connection &#61; RabbitMQHelper.GetConnection();{var channel &#61; connection.CreateModel();{string queueName &#61; "rpc_queue";channel.QueueDeclare(queue: queueName, durable: false,exclusive: false, autoDelete: false, arguments: null);channel.BasicQos(0, 1, false);var consumer &#61; new EventingBasicConsumer(channel);channel.BasicConsume(queue: queueName,autoAck: false, consumer: consumer);Console.WriteLine("【服务端】等待RPC请求...");consumer.Received &#43;&#61; (model, ea) &#61;>{string response &#61; null;var body &#61; ea.Body.ToArray();var props &#61; ea.BasicProperties;var replyProps &#61; channel.CreateBasicProperties();replyProps.CorrelationId &#61; props.CorrelationId;try{var message &#61; Encoding.UTF8.GetString(body);Console.WriteLine($"【服务端】接收到数据:{ message},开始处理");response &#61; $"消息:{message},处理完成";}catch (Exception e){Console.WriteLine("错误:" &#43; e.Message);response &#61; "";}finally{var responseBytes &#61; Encoding.UTF8.GetBytes(response);channel.BasicPublish(exchange: "", routingKey: props.ReplyTo,basicProperties: replyProps, body: responseBytes);channel.BasicAck(deliveryTag: ea.DeliveryTag,multiple: false);}};}}}}

客户端&#xff1a;

public class RPCClient{private readonly IConnection connection;private readonly IModel channel;private readonly string replyQueueName;private readonly EventingBasicConsumer consumer;private readonly BlockingCollection respQueue &#61; new BlockingCollection();private readonly IBasicProperties props;public RPCClient(){connection &#61; RabbitMQHelper.GetConnection();channel &#61; connection.CreateModel();replyQueueName &#61; channel.QueueDeclare().QueueName;consumer &#61; new EventingBasicConsumer(channel);props &#61; channel.CreateBasicProperties();var correlationId &#61; Guid.NewGuid().ToString();props.CorrelationId &#61; correlationId; //给消息idprops.ReplyTo &#61; replyQueueName;//回调的队列名&#xff0c;Client关闭后会自动删除consumer.Received &#43;&#61; (model, ea) &#61;>{var body &#61; ea.Body.ToArray();var response &#61; Encoding.UTF8.GetString(body);//监听的消息Id和定义的消息Id相同代表这条消息服务端处理完成if (ea.BasicProperties.CorrelationId &#61;&#61; correlationId){respQueue.Add(response);}};channel.BasicConsume(consumer: consumer,queue: replyQueueName,autoAck: true);}public string Call(string message){var messageBytes &#61; Encoding.UTF8.GetBytes(message);//发送消息channel.BasicPublish(exchange: "",routingKey: "rpc_queue",basicProperties: props,body: messageBytes);//等待回复return respQueue.Take();}public void Close(){connection.Close();}}

执行代码&#xff1a;

static void Main(string[] args){Console.WriteLine("Hello World!");//启动服务端&#xff0c;正常逻辑是在另一个程序RPCServer.RpcHandle();//实例化客户端var rpcClient &#61; new RPCClient();string message &#61; $"消息id:{new Random().Next(1, 1000)}";Console.WriteLine($"【客服端】RPC请求中&#xff0c;{message}");//向服务端发送消息&#xff0c;等待回复var response &#61; rpcClient.Call(message);Console.WriteLine("【客服端】收到回复响应:{0}", response);rpcClient.Close();Console.ReadKey();}

测试效果&#xff1a;

z执行完&#xff0c;客服端close后&#xff0c;可以接着自己的下一步业务处理。

总结&#xff1a;

以上便是RabbitMQ的6中模式在.net core中实际使用&#xff0c;其中(1)简单队列&#xff0c;(2)工作队列,(4)路由模式,(6)RPC模式的交换机类型都是direct&#xff0c;(3)发布订阅的交换机是fanout&#xff0c;(5)topics的交换机是topic。正常场景用的是direct&#xff0c;默认交换机也是direct类型的&#xff0c;推荐用(4)路由模式&#xff0c;因为指定交换机名比起默认的交换机会容易扩展场景&#xff0c;其他的交换机看业务场景所需使用

下面位置可以看到交换机类型&#xff0c;amq.开头那几个是内置的&#xff0c;避免交换机过多可以直接使用。


推荐阅读
  • RabbitMQ的消息持久化处理
    1、RabbitMQ的消息持久化处理,消息的可靠性是RabbitMQ的一大特色,那么RabbitMQ是如何保证消息可靠性的呢——消息持久化。2、auto ... [详细]
  • 讨伐Java多线程与高并发——MQ篇
    本文是学习Java多线程与高并发知识时做的笔记。这部分内容比较多,按照内容分为5个部分:多线程基础篇JUC篇同步容器和并发容器篇线程池篇MQ篇本篇 ... [详细]
  • Oracle优化新常态的五大禁止及其性能隐患
    本文介绍了Oracle优化新常态中的五大禁止措施,包括禁止外键、禁止视图、禁止触发器、禁止存储过程和禁止JOB,并分析了这些禁止措施可能带来的性能隐患。文章还讨论了这些禁止措施在C/S架构和B/S架构中的不同应用情况,并提出了解决方案。 ... [详细]
  • AFNetwork框架(零)使用NSURLSession进行网络请求
    本文介绍了AFNetwork框架中使用NSURLSession进行网络请求的方法,包括NSURLSession的配置、请求的创建和执行等步骤。同时还介绍了NSURLSessionDelegate和NSURLSessionConfiguration的相关内容。通过本文可以了解到AFNetwork框架中使用NSURLSession进行网络请求的基本流程和注意事项。 ... [详细]
  • 本文探讨了容器技术在安全方面面临的挑战,并提出了相应的解决方案。多租户保护、用户访问控制、中毒的镜像、验证和加密、容器守护以及容器监控都是容器技术中需要关注的安全问题。通过在虚拟机中运行容器、限制特权升级、使用受信任的镜像库、进行验证和加密、限制容器守护进程的访问以及监控容器栈,可以提高容器技术的安全性。未来,随着容器技术的发展,还需解决诸如硬件支持、软件定义基础设施集成等挑战。 ... [详细]
  • 如何使用PLEX播放组播、抓取信号源以及设置路由器
    本文介绍了如何使用PLEX播放组播、抓取信号源以及设置路由器。通过使用xTeve软件和M3U源,用户可以在PLEX上实现直播功能,并且可以自动匹配EPG信息和定时录制节目。同时,本文还提供了从华为itv盒子提取组播地址的方法以及如何在ASUS固件路由器上设置IPTV。在使用PLEX之前,建议先使用VLC测试是否可以正常播放UDPXY转发的iptv流。最后,本文还介绍了docker版xTeve的设置方法。 ... [详细]
  • [翻译]微服务设计模式5. 服务发现服务端服务发现
    服务之间需要互相调用,在单体架构中,服务之间的互相调用直接通过编程语言层面的方法调用就搞定了。在传统的分布式应用的部署中,服务地 ... [详细]
  • 项目需要将音视频文件上传服务器,考虑并发要求高,通过七牛来实现。直接上代码usingQiniu.IO;usingQiniu.IO.Resumable;usingQiniu.RPC; ... [详细]
  • Docker安装Rabbitmq(配合宝塔)
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了Docker安装Rabbitmq(配合宝塔)相关的知识,希望对你有一定的参考价值。一、事前准备 ... [详细]
  • Docker下Prometheus和Grafana三部曲之一:极速体验
    开源监控工具Prometheus目前广为使用,配合Grafana即可直观展现监控数据,但对于初学者来说搭建这样一个系统要花费些时间,或者有 ... [详细]
  • 本文讨论了一个数列求和问题,该数列按照一定规律生成。通过观察数列的规律,我们可以得出求解该问题的算法。具体算法为计算前n项i*f[i]的和,其中f[i]表示数列中有i个数字。根据参考的思路,我们可以将算法的时间复杂度控制在O(n),即计算到5e5即可满足1e9的要求。 ... [详细]
  • Centos下安装memcached+memcached教程
    本文介绍了在Centos下安装memcached和使用memcached的教程,详细解释了memcached的工作原理,包括缓存数据和对象、减少数据库读取次数、提高网站速度等。同时,还对memcached的快速和高效率进行了解释,与传统的文件型数据库相比,memcached作为一个内存型数据库,具有更高的读取速度。 ... [详细]
  • 深入解析Linux下的I/O多路转接epoll技术
    本文深入解析了Linux下的I/O多路转接epoll技术,介绍了select和poll函数的问题,以及epoll函数的设计和优点。同时讲解了epoll函数的使用方法,包括epoll_create和epoll_ctl两个系统调用。 ... [详细]
  • 本文介绍了SPOJ2829题目的解法及优化方法。题目要求找出满足一定条件的数列,并对结果取模。文章详细解释了解题思路和算法实现,并提出了使用FMT优化的方法。最后,对于第三个限制条件,作者给出了处理方法。文章最后给出了代码实现。 ... [详细]
  • 分布式大型互联网企业架构!
    2019独角兽企业重金招聘Python工程师标准摘要:开发工具1.EclipseIDE:采用Maven项目管理,模块化。2.代码生成: ... [详细]
author-avatar
ql3zqi
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有