热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Golang并发操作RabbitMQ

<svg


send.go

通过amqp连接RabbitMQ，再通过协程发送信息

package main
import (
"github.com/streadway/amqp"
"log"
"rabbitmqTest/utils"
"sync"
)
// main connects to RabbitMQ and publishes every entry of a fixed
// queue-name -> body map, one goroutine per entry, over a single shared
// channel. Each goroutine declares its queue and publishes to it through
// the default exchange. After all goroutines finish, every collected
// failure is logged and the process exits via log.Fatalf if any occurred.
func main() {
	// TODO: change the connection address to your own host.
	conn, err := amqp.Dial("amqp://guest:guest@192.168.100.101:5672/")
	utils.FailOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	utils.FailOnError(err, "Failed to open a channel")
	defer ch.Close()

	bodyMap := map[string]string{
		"test1": "a",
		"test2": "b",
		"test3": "c",
	}

	var wg sync.WaitGroup
	// Each goroutine sends at most one error before returning, so one
	// buffer slot per message guarantees the sends never block.
	errList := make(chan error, len(bodyMap))
	for name, body := range bodyMap {
		wg.Add(1)
		go func(name, body string) {
			defer wg.Done()
			q, err := ch.QueueDeclare(
				name,  // name
				false, // durable
				false, // delete when unused
				false, // exclusive
				false, // no-wait
				nil,   // arguments
			)
			if err != nil {
				errList <- err
				return
			}
			err = ch.Publish(
				"",     // exchange
				q.Name, // routing key
				false,  // mandatory
				false,  // immediate
				amqp.Publishing{
					ContentType: "text/plain",
					Body:        []byte(body),
				})
			if err != nil {
				errList <- err
				return
			}
			log.Printf(" [x] Sent %s", body)
		}(name, body)
	}
	wg.Wait()
	// All senders are done, so closing lets the range below terminate.
	close(errList)

	// Log every failure before exiting; a fatal helper here would stop at
	// the first error and hide the rest.
	failed := 0
	for err := range errList {
		log.Printf("Failed send message: %s", err)
		failed++
	}
	if failed > 0 {
		log.Fatalf("%d of %d messages failed to send", failed, len(bodyMap))
	}
}

recv.go


package main
import (
"log"
"github.com/streadway/amqp"
"rabbitmqTest/utils"
"sync"
)
// main connects to RabbitMQ and starts one consumer goroutine per queue
// name. Each goroutine declares its queue, registers an auto-ack consumer
// on the shared channel, and logs every delivery it receives. main blocks
// until every consumer goroutine has returned.
func main() {
	// TODO: change the connection address to your own host.
	conn, err := amqp.Dial("amqp://guest:guest@192.168.100.101:5672/")
	utils.FailOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	utils.FailOnError(err, "Failed to open a channel")
	defer ch.Close()

	nameList := []string{
		"test1",
		"test2",
		"test3",
	}
	log.Printf(" [*] Waiting for messages. To exit press CTRL C")

	var wg sync.WaitGroup
	for _, name := range nameList {
		// Register the consumer before starting it so wg.Wait below
		// actually waits for it.
		wg.Add(1)
		go func(name string) {
			defer wg.Done()
			q, err := ch.QueueDeclare(
				name,  // name
				false, // durable
				false, // delete when unused
				false, // exclusive
				false, // no-wait
				nil,   // arguments
			)
			// Setup errors are fatal: the consumers run indefinitely, so
			// there is no later point at which collected errors could be
			// reported.
			utils.FailOnError(err, "Failed to declare a queue")
			msgs, err := ch.Consume(
				q.Name, // queue
				"",     // consumer
				true,   // auto-ack
				false,  // exclusive
				false,  // no-local
				false,  // no-wait
				nil,    // args
			)
			utils.FailOnError(err, "Failed to register a consumer")
			// The loop ends when the delivery channel is closed
			// (presumably on channel/connection shutdown; see the amqp
			// Consume documentation).
			for d := range msgs {
				log.Printf("Received a message: %s", d.Body)
			}
		}(name)
	}
	// Blocks for as long as any consumer is still receiving.
	wg.Wait()
	log.Printf(" [*] All consumers stopped")
}

log.go


package utils
import (
"log"
)
// FailOnError does nothing when err is nil. Otherwise it logs msg followed
// by err through log.Fatalf, which terminates the process.
func FailOnError(err error, msg string) {
	if err == nil {
		return
	}
	log.Fatalf("%s: %s", msg, err)
}


推荐阅读
author-avatar
天涯老许_137
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有