微信公众号:趣编程ACE
关注可了解更多的.NET日常实战开发技巧,如需源码 请公众号后台留言 源码;
[如果觉得本公众号对您有帮助,欢迎关注]
关注上文公众号即可
如果我们需要将一条信息发送到多个队列上,假若利用直连模式,那么就会有很多的路由,而TopicExchange只需要配置定义好的路由规则,即可省略多余的路由指定。
PS:路由规则有一定的约束,比如需要采取* .#. * 的格式,用.号 分隔
其中
1.*表示一个单词
2. #表示任意数量(零个或多个)单词。
public static class TopicExchangeReceive
{
public static void Receive(IModel channel)
{
channel.ExchangeDeclare("hello-topic-exchange", ExchangeType.Topic);
channel.QueueDeclare(queue: "hello-topic-queue",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
channel.QueueBind("hello-topic-queue", "hello-topic-exchange", "route.*");
//channel.QueueBind("hello", "hello-direct-exchange", "route2");
// 创建一个消费者基本事件
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received {0}", message);
};
channel.BasicConsume(queue: "hello-topic-queue",
// 自动确认
autoAck: true,
consumer: consumer);
//channel.BasicConsume(queue: "hello",
// // 自动确认
// autoAck: true,
// consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
public static class TopicExchangeSend
{
public static void Send(IModel channel)
{
channel.ExchangeDeclare("hello-topic-exchange", ExchangeType.Topic);
var count = 0;
while (true)
{
Thread.Sleep(1000);
// 发送的消息
string message = $"Hello World {count}";
var body = Encoding.UTF8.GetBytes(message);
//var body2 = Encoding.UTF8.GetBytes(message + "body2");
// 基本发布 不指定交换
channel.BasicPublish(exchange: "hello-topic-exchange",
// 路由键 就是队列名称
routingKey: "route.2",
// 基础属性
basicProperties: null,
// 传递的消息体
body: body);
//channel.BasicPublish(exchange: "hello-direct-exchange",
// // 路由键 就是队列名称
// routingKey: "route2",
// // 基础属性
// basicProperties: null,
// // 传递的消息体
// body: body2);
count++;
Console.WriteLine(" [x] sent {0}", message);
}
}
}
其实这种模式类似与Http请求里面的headers,我们定义一个字典然后通过交换机携带,作为交换机与路由器交换媒介,相比于Topic Exchange交换模式,header定义的格式类型更加丰富。
public static class HeadersExchangeReceive
{
public static void Receive(IModel channel)
{
channel.ExchangeDeclare("hello-headers-exchange", ExchangeType.Headers);
channel.QueueDeclare(queue: "hello-headers-queue",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
var headers = new Dictionary
{
{"test","A" }
};
channel.QueueBind("hello-headers-queue", "hello-headers-exchange", String.Empty, headers);
//channel.QueueBind("hello", "hello-direct-exchange", "route2");
// 创建一个消费者基本事件
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received {0}", message);
};
channel.BasicConsume(queue: "hello-headers-queue",
// 自动确认
autoAck: true,
consumer: consumer);
//channel.BasicConsume(queue: "hello",
// // 自动确认
// autoAck: true,
// consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
public static void Send(IModel channel)
{
channel.ExchangeDeclare("hello-headers-exchange", ExchangeType.Headers);
var count = 0;
while (true)
{
Thread.Sleep(1000);
// 发送的消息
string message = $"Hello World {count}";
var body = Encoding.UTF8.GetBytes(message);
//var body2 = Encoding.UTF8.GetBytes(message + "body2");
var basicProperties = channel.CreateBasicProperties();
basicProperties.Headers = new Dictionary
{
{"test" ,"A"}
};
// 基本发布 不指定交换
channel.BasicPublish(exchange: "hello-headers-exchange",
// 路由键 就是队列名称
routingKey: String.Empty,
// 基础属性
basicProperties,
// 传递的消息体
body: body);
//channel.BasicPublish(exchange: "hello-direct-exchange",
// // 路由键 就是队列名称
// routingKey: "route2",
// // 基础属性
// basicProperties: null,
// // 传递的消息体
// body: body2);
count++;
Console.WriteLine(" [x] sent {0}", message);
}
}
扇形交换机模式是最傻瓜的一种交换模式,总的而言只要将队列绑定到交换机上,就能做到消息互通了。当然了这种机制处理事件的速度也是所有交换机类型里面最快的。
public static class FanoutExchangeReceive
{
public static void Receive(IModel channel)
{
channel.ExchangeDeclare("hello-fanout-exchange", ExchangeType.Fanout);
channel.QueueDeclare(queue: "hello-fanout-queue",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
//var headers = new Dictionary
//{
// {"test","A" }
//};
channel.QueueBind("hello-fanout-queue", "hello-fanout-exchange", String.Empty);
//channel.QueueBind("hello", "hello-direct-exchange", "route2");
// 创建一个消费者基本事件
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received {0}", message);
};
channel.BasicConsume(queue: "hello-fanout-queue",
// 自动确认
autoAck: true,
consumer: consumer);
//channel.BasicConsume(queue: "hello",
// // 自动确认
// autoAck: true,
// consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
public static class FanoutExchangeSend
{
public static void Send(IModel channel)
{
channel.ExchangeDeclare("hello-fanout-exchange", ExchangeType.Fanout);
var count = 0;
while (true)
{
Thread.Sleep(1000);
// 发送的消息
string message = $"Hello World {count}";
var body = Encoding.UTF8.GetBytes(message);
//var body2 = Encoding.UTF8.GetBytes(message + "body2");
//var basicProperties = channel.CreateBasicProperties();
//basicProperties.Headers = new Dictionary
//{
// {"test" ,"A"}
//};
// 基本发布 不指定交换
channel.BasicPublish(exchange: "hello-fanout-exchange",
// 路由键 就是队列名称
routingKey: String.Empty,
// 基础属性
null,
// 传递的消息体
body: body);
//channel.BasicPublish(exchange: "hello-direct-exchange",
// // 路由键 就是队列名称
// routingKey: "route2",
// // 基础属性
// basicProperties: null,
// // 传递的消息体
// body: body2);
count++;
Console.WriteLine(" [x] sent {0}", message);
}
}
}
PS:具体的代码效果演示看视频哦~