热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

开发笔记:消息队列rabbitmq的五种工作模式(go语言版本)

篇首语:本文由编程笔记#小编为大家整理,主要介绍了消息队列rabbitmq的五种工作模式(go语言版本)相关的知识,希望对你有一定的参考价值。前言:如果你对rabbitm

篇首语:本文由编程笔记#小编为大家整理,主要介绍了消息队列rabbitmq的五种工作模式(go语言版本)相关的知识,希望对你有一定的参考价值。


前言:如果你对rabbitmq基本概念都不懂,可以移步此篇博文查阅消息队列RabbitMQ

一、单发单收

二、工作队列Work Queue

三、发布/订阅 Publish/Subscribe

四、路由Routing

五、Topic类型的exchange

六、rabbitmq部分封装代码及装备工作

 


一、单发单收

在下图中,“ P”是我们的生产者,“ C”是我们的消费者。中间的框是一个队列-RabbitMQ代表使用者保留的消息缓冲区。

单发单收模式下:一发一收

发送端只需要创建队列,然后向队列发送消息。

接收端也需要创建队列,因为如果接收端先启动,没有此队列就会报错,虽然发送端和接收端都创建此队列,但rabbitmq还是很智能的,它只会创建一次。

需要注意的地方:

1.发送端和接收端都需要创建同名队列

2.接收端指定从这个同名队列中接收消息

技术图片

在这个例子中,我们将会发送一些描述动物的消息。Routing key的第一个单词是描述速度的,第二个单词是描述颜色的,第三个是描述物种的:“..”。

这里我们创建三个Binding:Binding key为”*.orange.*”的Q1,和binding key为”*.*.rabbit”和”lazy.#”的Q2。

这些binding可以总结为:



  • Q1对所有橘色的(orange)的动物感兴趣;

  • Q2希望能拿到所有兔子的(rabbit)信息,还有比较懒惰的(lazy.#)动物信息。

一条以” quick.orange.rabbit”为routing key的消息将会推送到Q1和Q2两个queue上,routing key为“lazy.orange.elephant”的消息同样会被推送到Q1和Q2上。但如果routing key为”quick.orange.fox”的话,消息只会被推送到Q1上;routing key为”lazy.brown.fox”的消息会被推送到Q2上,routing key为"lazy.pink.rabbit”的消息也会被推送到Q2上,但同一条消息只会被推送到Q2上一次。

如果在发送消息时所指定的exchange和routing key在消费者端没有对应的exchange和binding key与之绑定的话,那么这条消息将会被丢弃掉。例如:"orange"和"quick.orange.male.rabbit"。但是routing为”lazy.orange.male.rabbit”的消息,将会被推到Q2上。

Topic类型的exchange

Topic类型的exchange是很强大的,也可以实现其它类型的exchange。



  • 当一个队列被绑定为binding key为”#”时,它将会接收所有的消息,此时和fanout类型的exchange很像。

  • 当binding key不包含”*”和”#”时,这时候就很像direct类型的exchange。

发送端


package main
import (
"RabbitMQ"
"time"
)
func main(){
ch := rabbitMQ.Connect("amqp://user:password@ip/")
rabbitMQ.NewExchange("amqp://user:password@ip/","exchange","topic")
for{
time.Sleep(1)
ch.Publish("exchange","hello world","lazy.brown.fox")
}
}

 

接收端


package main
import (
rabbitMQ "RabbitMQ"
"log"
)
func main(){
// 1.接收者,首先自己队列
// 2.创建交换机
// 3.将自己绑定到交换机上
// 4.接收交换机上发过来的消息
//第一个参数指定rabbitmq服务器的链接,第二个参数指定创建队列的名字
//1
receive_mq := rabbitMQ.New("amqp://user:password@ip:port/","hello1")
//2
//第一个参数:rabbitmq服务器的链接,第二个参数:交换机名字,第三个参数:交换机类型
rabbitMQ.NewExchange("amqp://user:password@ip:port/","exchange","topic")
//3
receive_mq.Bind("exchange","*.orange.*")
//4
for{
//接收消息时,指定
msgs := receive_mq .Consume()
go func() {
for d := range msgs {
log.Printf("recevie1 Received a message: %s", d.Body)
}
}()
}
}

接收端2


package main
import (
rabbitMQ "RabbitMQ"
"log"
)
func main(){
// 1.接收者,首先自己队列
// 2.创建交换机
// 3.将自己绑定到交换机上
// 4.接收交换机上发过来的消息
//第一个参数指定rabbitmq服务器的链接,第二个参数指定创建队列的名字
//1
receive_mq := rabbitMQ.New("amqp://user:password@ip:port/","hello2")
//2
//第一个参数:rabbitmq服务器的链接,第二个参数:交换机名字,第三个参数:交换机类型
rabbitMQ.NewExchange("amqp://user:password@ip:port/","exchange","topic")
//3
receive_mq.Bind("exchange","*.*.rabbit")
receive_mq.Bind("exchange","lazy.#")
//4
for{
//接收消息时,指定
msgs := receive_mq .Consume()
go func() {
for d := range msgs {
log.Printf("recevie2 Received a message: %s", d.Body)
}
}()
}
}

  


 
六、rabbitmq部分封装代码及准备工作

目录参考:

技术图片

准备工作:

1.我们再创建go项目时,首先指定gopath目录,然后在目录下创建bin、src、pkg目录。

2.下载github.com/streadway/amqp包,会自动添加到项目的pkg目录下。


go get github.com/streadway/amqp

 3.在rabbitmq服务器上创建用户,指定管理员,并赋予访问权限。

4.rabbitmq封装


package rabbitMQ
import (
"encoding/json"
"github.com/streadway/amqp"
"log"
)
//声明队列类型
type RabbitMQ struct {
channel *amqp.Channel
Name string
exchange string
}
//连接服务器
func Connect(s string) * RabbitMQ{
//连接rabbitmq
conn,e := amqp.Dial(s)
failOnError(e,"连接Rabbitmq服务器失败!")
ch ,e :=conn.Channel()
failOnError(e,"无法打开频道!")
mq := new(RabbitMQ)
mq.channel =ch
return mq
}
//初始化单个消息队列
//第一个参数:rabbitmq服务器的链接,第二个参数:队列名字
func New(s string,name string) * RabbitMQ{
//连接rabbitmq
conn,e := amqp.Dial(s)
failOnError(e,"连接Rabbitmq服务器失败!")
ch ,e :=conn.Channel()
failOnError(e,"无法打开频道!")
q,e := ch.QueueDeclare(
name,//队列名
false,//是否开启持久化
true,//不使用时删除
false, //排他
false, //不等待
nil, //参数
)
failOnError(e,"初始化队列失败!")
mq := new(RabbitMQ)
mq.channel =ch
mq.Name =q.Name
return mq
}
//批量初始化消息队列
//第一个参数:rabbitmq服务器的链接,第二个参数:队列名字列表
//配置队列参数
func (q *RabbitMQ)Qos(){
e := q.channel.Qos(1,0,false)
failOnError(e,"无法设置QoS")
}
//配置交换机参数
//初始化交换机
//第一个参数:rabbitmq服务器的链接,第二个参数:交换机名字,第三个参数:交换机类型
func NewExchange(s string,name string,typename string){
//连接rabbitmq
conn,e := amqp.Dial(s)
failOnError(e,"连接Rabbitmq服务器失败!")
ch ,e :=conn.Channel()
failOnError(e,"无法打开频道!")
e = ch.ExchangeDeclare(
name, // name
typename, // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
failOnError(e,"初始化交换机失败!")
}
//删除交换机
func (q *RabbitMQ)ExchangeDelete(exchange string){
e := q.channel.ExchangeDelete(exchange,false,true)
failOnError(e,"绑定队列失败!")
}
//绑定消息队列到哪个exchange
func (q *RabbitMQ)Bind(exchange string,key string){
e := q.channel.QueueBind(
q.Name,
key,
exchange,
false,
nil,
)
failOnError(e,"绑定队列失败!")
q.exchange = exchange
}
//向消息队列发送消息
//Send方法可以往某个消息队列发送消息
func (q *RabbitMQ) Send(body interface{}){
str,e := json.Marshal(body)
failOnError(e,"消息序列化失败!")
e = q.channel.Publish(
"",//交换
q.Name,//路由键:当前队列的名字
false, //必填
false, //立即
amqp.Publishing{
ReplyTo:q.Name,
Body:[]byte(str),
})
msg := "向队列:"+q.Name+"发送消息失败!"
failOnError(e,msg)
}
//向exchange发送消息
//Publish方法可以往某个exchange发送消息
func (q *RabbitMQ) Publish(exchange string,body interface{},key string) {
str,e := json.Marshal(body)
failOnError(e,"消息序列化失败!")
e = q.channel.Publish(
exchange,
key,
false,
false,
amqp.Publishing{ReplyTo:q.Name,
Body:[]byte(str)},
)
failOnError(e,"向路由发送消息失败!")
}
//接收某个消息队列的消息
func (q * RabbitMQ) Consume() <-chan amqp.Delivery{
c,e :=q.channel.Consume(
q.Name,//指定从哪个队列中接收消息
"",
true,
false,
false,
false,
nil,
)
failOnError(e,"接收消息失败!")
return c
}
//关闭队列连接
func (q *RabbitMQ) Close() {
q.channel.Close()
}
//错误处理函数
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}


推荐阅读
  • DHCP三层交换机设置方式全局模式和接口模式设置方式和命令resetsave回车输入yreboot输入n输入y重启后就恢复默认设置了默认用户名密码adminAdmin@huawei ... [详细]
  • 本文提供了 RabbitMQ 3.7 的快速上手指南,详细介绍了环境搭建、生产者和消费者的配置与使用。通过官方教程的指引,读者可以轻松完成初步测试和实践,快速掌握 RabbitMQ 的核心功能和基本操作。 ... [详细]
  • ZeroMQ在云计算环境下的高效消息传递库第四章学习心得
    本章节深入探讨了ZeroMQ在云计算环境中的高效消息传递机制,涵盖客户端请求-响应模式、最近最少使用(LRU)队列、心跳检测、面向服务的队列、基于磁盘的离线队列以及主从备份服务等关键技术。此外,还介绍了无中间件的请求-响应架构,强调了这些技术在提升系统性能和可靠性方面的应用价值。个人理解方面,ZeroMQ通过这些机制有效解决了分布式系统中常见的通信延迟和数据一致性问题。 ... [详细]
  • PHP中元素的计量单位是什么? ... [详细]
  • Ceph API微服务实现RBD块设备的高效创建与安全删除
    本文旨在实现Ceph块存储中RBD块设备的高效创建与安全删除功能。开发环境为CentOS 7,使用 IntelliJ IDEA 进行开发。首先介绍了 librbd 的基本概念及其在 Ceph 中的作用,随后详细描述了项目 Gradle 配置的优化过程,确保了开发环境的稳定性和兼容性。通过这一系列步骤,我们成功实现了 RBD 块设备的快速创建与安全删除,提升了系统的整体性能和可靠性。 ... [详细]
  • 在 Android 开发中,通过合理利用系统通知服务,可以显著提升应用的用户交互体验。针对 Android 8.0 及以上版本,开发者需首先创建并注册通知渠道。本文将详细介绍如何在应用中实现这一功能,包括初始化通知管理器、创建通知渠道以及发送通知的具体步骤,帮助开发者更好地理解和应用这些技术细节。 ... [详细]
  • MySQL性能优化与调参指南【数据库管理】
    本文详细探讨了MySQL数据库的性能优化与参数调整技巧,旨在帮助数据库管理员和开发人员提升系统的运行效率。内容涵盖索引优化、查询优化、配置参数调整等方面,结合实际案例进行深入分析,提供实用的操作建议。此外,还介绍了常见的性能监控工具和方法,助力读者全面掌握MySQL性能优化的核心技能。 ... [详细]
  • Go语言实现Redis客户端与服务器的交互机制深入解析
    在前文对Godis v1.0版本的基础功能进行了详细介绍后,本文将重点探讨如何实现客户端与服务器之间的交互机制。通过具体代码实现,使客户端与服务器能够顺利通信,赋予项目实际运行的能力。本文将详细解析Go语言在实现这一过程中的关键技术和实现细节,帮助读者深入了解Redis客户端与服务器的交互原理。 ... [详细]
  • 2019年后蚂蚁集团与拼多多面试经验详述与深度剖析
    2019年后蚂蚁集团与拼多多面试经验详述与深度剖析 ... [详细]
  • Go GORM 使用过程中常见问题及解决方案
    本文详细探讨了在使用 Go 语言的 GORM 框架时遇到的常见问题及其解决方案,提供了丰富的实战经验和技巧,旨在帮助开发者高效解决相关技术难题,具有很高的参考价值。 ... [详细]
  • Go语言中的高效排序与搜索算法解析
    在探讨Go语言中高效的排序与搜索算法时,本文深入分析了Go语言提供的内置排序功能及其优化策略。通过实例代码,详细讲解了如何利用Go语言的标准库实现快速、高效的排序和搜索操作,为开发者提供了实用的编程指导。 ... [详细]
  • 探讨 jBPM 数据库表结构设计的精要与实践
    探讨 jBPM 数据库表结构设计的精要与实践 ... [详细]
  • 利用ViewComponents在Asp.Net Core中构建高效分页组件
    通过运用 ViewComponents 技术,在 Asp.Net Core 中实现了高效的分页组件开发。本文详细介绍了如何通过创建 `PaginationViewComponent` 类并利用 `HelloWorld.DataContext` 上下文,实现对分页参数的定义与管理,从而提升 Web 应用程序的性能和用户体验。 ... [详细]
  • 掌握DSP必备的56个核心问题,我已经将其收藏以备不时之需! ... [详细]
  • 如何判断一个度序列能否构成简单图——哈维尔-哈基米算法的应用与解析 ... [详细]
author-avatar
手机用户2502861123
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有