作者:手机用户2502861123 | 来源:互联网 | 2023-08-15 10:01
篇首语:本文由编程笔记#小编为大家整理,主要介绍了消息队列rabbitmq的五种工作模式(go语言版本)相关的知识,希望对你有一定的参考价值。前言:如果你对rabbitm
篇首语:本文由编程笔记#小编为大家整理,主要介绍了消息队列rabbitmq的五种工作模式(go语言版本)相关的知识,希望对你有一定的参考价值。
前言:如果你对rabbitmq基本概念都不懂,可以移步此篇博文查阅消息队列RabbitMQ
一、单发单收
二、工作队列Work Queue
三、发布/订阅 Publish/Subscribe
四、路由Routing
五、Topic类型的exchange
六、rabbitmq部分封装代码及装备工作
一、单发单收
在下图中,“ P”是我们的生产者,“ C”是我们的消费者。中间的框是一个队列-RabbitMQ代表使用者保留的消息缓冲区。
单发单收模式下:一发一收
发送端只需要创建队列,然后向队列发送消息。
接收端也需要创建队列,因为如果接收端先启动,没有此队列就会报错,虽然发送端和接收端都创建此队列,但rabbitmq还是很智能的,它只会创建一次。
需要注意的地方:
1.发送端和接收端都需要创建同名队列
2.接收端指定从这个同名队列中接收消息
在这个例子中,我们将会发送一些描述动物的消息。Routing key的第一个单词是描述速度的,第二个单词是描述颜色的,第三个是描述物种的:“..”。
这里我们创建三个Binding:Binding key为”*.orange.*”的Q1,和binding key为”*.*.rabbit”和”lazy.#”的Q2。
这些binding可以总结为:
- Q1对所有橘色的(orange)的动物感兴趣;
- Q2希望能拿到所有兔子的(rabbit)信息,还有比较懒惰的(lazy.#)动物信息。
一条以” quick.orange.rabbit”为routing key的消息将会推送到Q1和Q2两个queue上,routing key为“lazy.orange.elephant”的消息同样会被推送到Q1和Q2上。但如果routing key为”quick.orange.fox”的话,消息只会被推送到Q1上;routing key为”lazy.brown.fox”的消息会被推送到Q2上,routing key为"lazy.pink.rabbit”的消息也会被推送到Q2上,但同一条消息只会被推送到Q2上一次。
如果在发送消息时所指定的exchange和routing key在消费者端没有对应的exchange和binding key与之绑定的话,那么这条消息将会被丢弃掉。例如:"orange"和"quick.orange.male.rabbit"。但是routing为”lazy.orange.male.rabbit”的消息,将会被推到Q2上。
Topic类型的exchange:
Topic类型的exchange是很强大的,也可以实现其它类型的exchange。
- 当一个队列被绑定为binding key为”#”时,它将会接收所有的消息,此时和fanout类型的exchange很像。
- 当binding key不包含”*”和”#”时,这时候就很像direct类型的exchange。
发送端
package main
import (
"RabbitMQ"
"time"
)
func main(){
ch := rabbitMQ.Connect("amqp://user:password@ip/")
rabbitMQ.NewExchange("amqp://user:password@ip/","exchange","topic")
for{
time.Sleep(1)
ch.Publish("exchange","hello world","lazy.brown.fox")
}
}
接收端
package main
import (
rabbitMQ "RabbitMQ"
"log"
)
func main(){
// 1.接收者,首先自己队列
// 2.创建交换机
// 3.将自己绑定到交换机上
// 4.接收交换机上发过来的消息
//第一个参数指定rabbitmq服务器的链接,第二个参数指定创建队列的名字
//1
receive_mq := rabbitMQ.New("amqp://user:password@ip:port/","hello1")
//2
//第一个参数:rabbitmq服务器的链接,第二个参数:交换机名字,第三个参数:交换机类型
rabbitMQ.NewExchange("amqp://user:password@ip:port/","exchange","topic")
//3
receive_mq.Bind("exchange","*.orange.*")
//4
for{
//接收消息时,指定
msgs := receive_mq .Consume()
go func() {
for d := range msgs {
log.Printf("recevie1 Received a message: %s", d.Body)
}
}()
}
}
接收端2
package main
import (
rabbitMQ "RabbitMQ"
"log"
)
func main(){
// 1.接收者,首先自己队列
// 2.创建交换机
// 3.将自己绑定到交换机上
// 4.接收交换机上发过来的消息
//第一个参数指定rabbitmq服务器的链接,第二个参数指定创建队列的名字
//1
receive_mq := rabbitMQ.New("amqp://user:password@ip:port/","hello2")
//2
//第一个参数:rabbitmq服务器的链接,第二个参数:交换机名字,第三个参数:交换机类型
rabbitMQ.NewExchange("amqp://user:password@ip:port/","exchange","topic")
//3
receive_mq.Bind("exchange","*.*.rabbit")
receive_mq.Bind("exchange","lazy.#")
//4
for{
//接收消息时,指定
msgs := receive_mq .Consume()
go func() {
for d := range msgs {
log.Printf("recevie2 Received a message: %s", d.Body)
}
}()
}
}
六、rabbitmq部分封装代码及准备工作
目录参考:
准备工作:
1.我们再创建go项目时,首先指定gopath目录,然后在目录下创建bin、src、pkg目录。
2.下载github.com/streadway/amqp包,会自动添加到项目的pkg目录下。
go get github.com/streadway/amqp
3.在rabbitmq服务器上创建用户,指定管理员,并赋予访问权限。
4.rabbitmq封装
package rabbitMQ
import (
"encoding/json"
"github.com/streadway/amqp"
"log"
)
//声明队列类型
type RabbitMQ struct {
channel *amqp.Channel
Name string
exchange string
}
//连接服务器
func Connect(s string) * RabbitMQ{
//连接rabbitmq
conn,e := amqp.Dial(s)
failOnError(e,"连接Rabbitmq服务器失败!")
ch ,e :=conn.Channel()
failOnError(e,"无法打开频道!")
mq := new(RabbitMQ)
mq.channel =ch
return mq
}
//初始化单个消息队列
//第一个参数:rabbitmq服务器的链接,第二个参数:队列名字
func New(s string,name string) * RabbitMQ{
//连接rabbitmq
conn,e := amqp.Dial(s)
failOnError(e,"连接Rabbitmq服务器失败!")
ch ,e :=conn.Channel()
failOnError(e,"无法打开频道!")
q,e := ch.QueueDeclare(
name,//队列名
false,//是否开启持久化
true,//不使用时删除
false, //排他
false, //不等待
nil, //参数
)
failOnError(e,"初始化队列失败!")
mq := new(RabbitMQ)
mq.channel =ch
mq.Name =q.Name
return mq
}
//批量初始化消息队列
//第一个参数:rabbitmq服务器的链接,第二个参数:队列名字列表
//配置队列参数
func (q *RabbitMQ)Qos(){
e := q.channel.Qos(1,0,false)
failOnError(e,"无法设置QoS")
}
//配置交换机参数
//初始化交换机
//第一个参数:rabbitmq服务器的链接,第二个参数:交换机名字,第三个参数:交换机类型
func NewExchange(s string,name string,typename string){
//连接rabbitmq
conn,e := amqp.Dial(s)
failOnError(e,"连接Rabbitmq服务器失败!")
ch ,e :=conn.Channel()
failOnError(e,"无法打开频道!")
e = ch.ExchangeDeclare(
name, // name
typename, // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
failOnError(e,"初始化交换机失败!")
}
//删除交换机
func (q *RabbitMQ)ExchangeDelete(exchange string){
e := q.channel.ExchangeDelete(exchange,false,true)
failOnError(e,"绑定队列失败!")
}
//绑定消息队列到哪个exchange
func (q *RabbitMQ)Bind(exchange string,key string){
e := q.channel.QueueBind(
q.Name,
key,
exchange,
false,
nil,
)
failOnError(e,"绑定队列失败!")
q.exchange = exchange
}
//向消息队列发送消息
//Send方法可以往某个消息队列发送消息
func (q *RabbitMQ) Send(body interface{}){
str,e := json.Marshal(body)
failOnError(e,"消息序列化失败!")
e = q.channel.Publish(
"",//交换
q.Name,//路由键:当前队列的名字
false, //必填
false, //立即
amqp.Publishing{
ReplyTo:q.Name,
Body:[]byte(str),
})
msg := "向队列:"+q.Name+"发送消息失败!"
failOnError(e,msg)
}
//向exchange发送消息
//Publish方法可以往某个exchange发送消息
func (q *RabbitMQ) Publish(exchange string,body interface{},key string) {
str,e := json.Marshal(body)
failOnError(e,"消息序列化失败!")
e = q.channel.Publish(
exchange,
key,
false,
false,
amqp.Publishing{ReplyTo:q.Name,
Body:[]byte(str)},
)
failOnError(e,"向路由发送消息失败!")
}
//接收某个消息队列的消息
func (q * RabbitMQ) Consume() <-chan amqp.Delivery{
c,e :=q.channel.Consume(
q.Name,//指定从哪个队列中接收消息
"",
true,
false,
false,
false,
nil,
)
failOnError(e,"接收消息失败!")
return c
}
//关闭队列连接
func (q *RabbitMQ) Close() {
q.channel.Close()
}
//错误处理函数
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}