作者:天涯老许_137 | 来源:互联网 | 2023-10-10 16:16
通过amqp连接RabbitMQ,再通过协程发送信息
package main
import (
"github.com/streadway/amqp"
"log"
"rabbitmqTest/utils"
"sync"
)
// main connects to RabbitMQ and publishes one message per queue, using a
// separate goroutine for each queue. Errors from the goroutines are gathered
// on a buffered channel and handed to utils.FailOnError once all goroutines
// have finished.
func main() {
	// TODO: change the connection address to that of your own host.
	conn, err := amqp.Dial("amqp://guest:guest@192.168.100.101:5672/")
	utils.FailOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	utils.FailOnError(err, "Failed to open a channel")
	defer ch.Close()

	// Queue name -> message body to publish to that queue.
	bodyMap := map[string]string{
		"test1": "a",
		"test2": "b",
		"test3": "c",
	}

	// publish declares the named queue, sends payload to it, and logs the
	// payload on success. The first error encountered is returned.
	publish := func(queueName, payload string) error {
		queue, declareErr := ch.QueueDeclare(
			queueName, // name
			false,     // durable
			false,     // delete when unused
			false,     // exclusive
			false,     // no-wait
			nil,       // arguments
		)
		if declareErr != nil {
			return declareErr
		}

		message := amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(payload),
		}
		publishErr := ch.Publish(
			"",         // exchange
			queue.Name, // routing key
			false,      // mandatory
			false,      // immediate
			message,
		)
		if publishErr != nil {
			return publishErr
		}

		log.Printf(" [x] Sent %s", payload)
		return nil
	}

	var wg sync.WaitGroup
	errList := make(chan error, 2*len(bodyMap))
	for name, body := range bodyMap {
		wg.Add(1)
		go func(queueName, payload string) {
			defer wg.Done()
			if sendErr := publish(queueName, payload); sendErr != nil {
				errList <- sendErr
			}
		}(name, body)
	}

	// All senders are done after Wait, so the channel can be closed and drained.
	wg.Wait()
	close(errList)
	for sendErr := range errList {
		utils.FailOnError(sendErr, "Failed send message")
	}
}
package main
import (
"log"
"github.com/streadway/amqp"
"rabbitmqTest/utils"
"sync"
)
// main connects to RabbitMQ and consumes messages from a fixed list of
// queues, running one consumer goroutine per queue and logging the body of
// every delivery. It blocks until all consumer goroutines have finished,
// i.e. until each delivery stream returned by Consume has been closed.
func main() {
	// TODO: change the connection address to that of your own host.
	conn, err := amqp.Dial("amqp://guest:guest@192.168.100.101:5672/")
	utils.FailOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	utils.FailOnError(err, "Failed to open a channel")
	defer ch.Close()

	nameList := []string{
		"test1",
		"test2",
		"test3",
	}

	log.Printf(" [*] Waiting for messages. To exit press CTRL C")

	var wg sync.WaitGroup
	for _, name := range nameList {
		// Register the goroutine before starting it so that Wait below
		// actually waits for it.
		wg.Add(1)
		go func(name string) {
			defer wg.Done()

			q, err := ch.QueueDeclare(
				name,  // name
				false, // durable
				false, // delete when unused
				false, // exclusive
				false, // no-wait
				nil,   // arguments
			)
			// Setup failures are fatal for the whole process, so they are
			// reported immediately rather than collected.
			utils.FailOnError(err, "Failed to declare a queue")

			msgs, err := ch.Consume(
				q.Name, // queue
				"",     // consumer
				true,   // auto-ack
				false,  // exclusive
				false,  // no-local
				false,  // no-wait
				nil,    // args
			)
			utils.FailOnError(err, "Failed to register a consumer")

			// The loop ends when the delivery channel is closed.
			for d := range msgs {
				log.Printf("Received a message: %s", d.Body)
			}
		}(name)
	}

	// Block while consumers are running. Unlike waiting on a channel that is
	// never signalled, this lets the process exit once every consumer has
	// stopped instead of hanging with nothing left to do.
	wg.Wait()
}
package utils
import (
"log"
)
// FailOnError terminates the program via log.Fatalf when err is non-nil,
// logging msg followed by the error text. It does nothing when err is nil.
func FailOnError(err error, msg string) {
	if err == nil {
		return
	}
	log.Fatalf("%s: %s", msg, err)
}