热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

golang使用rabbitmq正确姿势

gomodinit  github.comichunt2019go-rabbitmq D:\gocode\go-rabbitmq\utils\rabbitmq\receiver.g

go mod init  github.com/ichunt2019/go-rabbitmq

 

D:\gocode\go-rabbitmq\utils\rabbitmq\receiver.go

主要是实现了rabbimq 生产者 消费者

消费者:实现失败尝试机制


package rabbitmq
import (
//"errors"
"fmt"
"github.com/streadway/amqp"
"log"
)
// 定义全局变量,指针类型
var mqConn *amqp.Connection
var mqChan *amqp.Channel
// 定义生产者接口
type Producer interface {
MsgContent() string
}
// 定义生产者接口
type RetryProducer interface {
MsgContent() string
}
// 定义接收者接口
type Receiver interface {
Consumer([]byte) error
FailAction([]byte) error
}
// 定义RabbitMQ对象
type RabbitMQ struct {
connection *amqp.Connection
Channel *amqp.Channel
dns string
QueueName string // 队列名称
RoutingKey string // key名称
ExchangeName string // 交换机名称
ExchangeType string // 交换机类型
producerList []Producer
retryProducerList []RetryProducer
receiverList []Receiver
}
// 定义队列交换机对象
type QueueExchange struct {
QuName string // 队列名称
RtKey string // key值
ExName string // 交换机名称
ExType string // 交换机类型
Dns string //链接地址
}
// 链接rabbitMQ
func (r *RabbitMQ)MqConnect() (err error){
mqConn, err = amqp.Dial(r.dns)
r.cOnnection= mqConn // 赋值给RabbitMQ对象
if err != nil {
fmt.Printf("关闭mq链接失败 :%s \n", err)
}
return
}
// 关闭mq链接
func (r *RabbitMQ)CloseMqConnect() (err error){
err = r.connection.Close()
if err != nil{
fmt.Printf("关闭mq链接失败 :%s \n", err)
}
return
}
// 链接rabbitMQ
func (r *RabbitMQ)MqOpenChannel() (err error){
mqConn := r.connection
r.Channel, err = mqConn.Channel()
//defer mqChan.Close()
if err != nil {
fmt.Printf("MQ打开管道失败:%s \n", err)
}
return err
}
// 链接rabbitMQ
func (r *RabbitMQ)CloseMqChannel() (err error){
r.Channel.Close()
if err != nil {
fmt.Printf("关闭mq链接失败 :%s \n", err)
}
return err
}

// 创建一个新的操作对象
func NewMq(q QueueExchange) RabbitMQ {
return RabbitMQ{
QueueName:q.QuName,
RoutingKey:q.RtKey,
ExchangeName: q.ExName,
ExchangeType: q.ExType,
dns:q.Dns,
}
}
func (mq *RabbitMQ) sendMsg (body string) {
err :=mq.MqOpenChannel()
ch := mq.Channel
if err != nil{
log.Printf("Channel err :%s \n", err)
}
defer mq.Channel.Close()
if mq.ExchangeName != "" {
if mq.ExchangeType == ""{
mq.ExchangeType = "direct"
}
err = ch.ExchangeDeclare(mq.ExchangeName, mq.ExchangeType, true, false, false, false, nil)
if err != nil {
log.Printf("ExchangeDeclare err :%s \n", err)
}
}
// 用于检查队列是否存在,已经存在不需要重复声明
_, err = ch.QueueDeclare(mq.QueueName, true, false, false, false, nil)
if err != nil {
log.Printf("QueueDeclare err :%s \n", err)
}
// 绑定任务
if mq.RoutingKey != "" && mq.ExchangeName != "" {
err = ch.QueueBind(mq.QueueName, mq.RoutingKey, mq.ExchangeName, false, nil)
if err != nil {
log.Printf("QueueBind err :%s \n", err)
}
}
if mq.ExchangeName != "" && mq.RoutingKey != ""{
err = mq.Channel.Publish(
mq.ExchangeName, // exchange
mq.RoutingKey, // routing key
false, // mandatory
false, // immediate
amqp.Publishing {
ContentType: "text/plain",
Body: []byte(body),
})
}else{
err = mq.Channel.Publish(
"", // exchange
mq.QueueName, // routing key
false, // mandatory
false, // immediate
amqp.Publishing {
ContentType: "text/plain",
Body: []byte(body),
})
}
}
func (mq *RabbitMQ) sendRetryMsg (body string,retry_nums int32,args ...string) {
err :=mq.MqOpenChannel()
ch := mq.Channel
if err != nil{
log.Printf("Channel err :%s \n", err)
}
defer mq.Channel.Close()
if mq.ExchangeName != "" {
if mq.ExchangeType == ""{
mq.ExchangeType = "direct"
}
err = ch.ExchangeDeclare(mq.ExchangeName, mq.ExchangeType, true, false, false, false, nil)
if err != nil {
log.Printf("ExchangeDeclare err :%s \n", err)
}
}
//原始路由key
oldRoutingKey := args[0]
//原始交换机名
oldExchangeName := args[1]
table := make(map[string]interface{},3)
table["x-dead-letter-routing-key"] = oldRoutingKey
if oldExchangeName != "" {
table["x-dead-letter-exchange"] = oldExchangeName
}else{
mq.ExchangeName = ""
table["x-dead-letter-exchange"] = ""
}
table["x-message-ttl"] = int64(20000)
//fmt.Printf("%+v",table)
//fmt.Printf("%+v",mq)
// 用于检查队列是否存在,已经存在不需要重复声明
_, err = ch.QueueDeclare(mq.QueueName, true, false, false, false, table)
if err != nil {
log.Printf("QueueDeclare err :%s \n", err)
}
// 绑定任务
if mq.RoutingKey != "" && mq.ExchangeName != "" {
err = ch.QueueBind(mq.QueueName, mq.RoutingKey, mq.ExchangeName, false, nil)
if err != nil {
log.Printf("QueueBind err :%s \n", err)
}
}
header := make(map[string]interface{},1)
header["retry_nums"] = retry_nums + int32(1)
var ttl_exchange string
var ttl_routkey string
if(mq.ExchangeName != "" ){
ttl_exchange = mq.ExchangeName
}else{
ttl_exchange = ""
}
if mq.RoutingKey != "" && mq.ExchangeName != ""{
ttl_routkey = mq.RoutingKey
}else{
ttl_routkey = mq.QueueName
}
//fmt.Printf("ttl_exchange:%s,ttl_routkey:%s \n",ttl_exchange,ttl_routkey)
err = mq.Channel.Publish(
ttl_exchange, // exchange
ttl_routkey, // routing key
false, // mandatory
false, // immediate
amqp.Publishing {
ContentType: "text/plain",
Body: []byte(body),
Headers:header,
})
if err != nil {
fmt.Printf("MQ任务发送失败:%s \n", err)
}
}
// 监听接收者接收任务 消费者
func (mq *RabbitMQ) ListenReceiver(receiver Receiver,routineNum int) {
err :=mq.MqOpenChannel()
ch := mq.Channel
if err != nil{
log.Printf("Channel err :%s \n", err)
}
defer mq.Channel.Close()
if mq.ExchangeName != "" {
if mq.ExchangeType == ""{
mq.ExchangeType = "direct"
}
err = ch.ExchangeDeclare(mq.ExchangeName, mq.ExchangeType, true, false, false, false, nil)
if err != nil {
log.Printf("ExchangeDeclare err :%s \n", err)
}
}
// 用于检查队列是否存在,已经存在不需要重复声明
_, err = ch.QueueDeclare(mq.QueueName, true, false, false, false, nil)
if err != nil {
log.Printf("QueueDeclare err :%s \n", err)
}
// 绑定任务
if mq.RoutingKey != "" && mq.ExchangeName != "" {
err = ch.QueueBind(mq.QueueName, mq.RoutingKey, mq.ExchangeName, false, nil)
if err != nil {
log.Printf("QueueBind err :%s \n", err)
}
}
// 获取消费通道,确保rabbitMQ一个一个发送消息
err = ch.Qos(1, 0, false)
msgList, err := ch.Consume(mq.QueueName, "", false, false, false, false, nil)
if err != nil {
log.Printf("Consume err :%s \n", err)
}
for msg := range msgList {
retry_nums,ok := msg.Headers["retry_nums"].(int32)
if(!ok){
retry_nums = int32(0)
}
// 处理数据
err := receiver.Consumer(msg.Body)
if err!=nil {
//消息处理失败 进入延时尝试机制
if retry_nums <3{
fmt.Println(string(msg.Body))
fmt.Printf("消息处理失败 消息开始进入尝试 ttl延时队列 \n")
retry_msg(msg.Body,retry_nums,QueueExchange{
mq.QueueName,
mq.RoutingKey,
mq.ExchangeName,
mq.ExchangeType,
mq.dns,
})
}else{
//消息失败 入库db
fmt.Printf("消息处理3次后还是失败了 入库db 钉钉告警 \n")
receiver.FailAction(msg.Body)
}
err = msg.Ack(true)
if err != nil {
fmt.Printf("确认消息未完成异常:%s \n", err)
}
}else {
// 确认消息,必须为false
err = msg.Ack(true)
if err != nil {
fmt.Printf("消息消费ack失败 err :%s \n", err)
}
}
}
}
//消息处理失败之后 延时尝试
func retry_msg(msg []byte,retry_nums int32,queueExchange QueueExchange){
//原始队列名称 交换机名称
oldQName := queueExchange.QuName
oldExchangeName := queueExchange.ExName
oldRoutingKey := queueExchange.RtKey
if oldRoutingKey == "" || oldExchangeName == ""{
oldRoutingKey = oldQName
}
if queueExchange.QuName != "" {
queueExchange.QuName = queueExchange.QuName + "_retry_3";
}
if queueExchange.RtKey != "" {
queueExchange.RtKey = queueExchange.RtKey + "_retry_3";
}else{
queueExchange.RtKey = queueExchange.QuName + "_retry_3";
}
//fmt.Printf("%+v",queueExchange)
mq := NewMq(queueExchange)
mq.MqConnect()
defer func(){
mq.CloseMqConnect()
}()
//fmt.Printf("%+v",queueExchange)
mq.sendRetryMsg(string(msg),retry_nums,oldRoutingKey,oldExchangeName)
}
func Send(queueExchange QueueExchange,msg string){
mq := NewMq(queueExchange)
mq.MqConnect()
defer func(){
mq.CloseMqConnect()
}()
mq.sendMsg(msg)
}
/*
runNums 开启并发执行任务数量
*/
func Recv(queueExchange QueueExchange,receiver Receiver,runNums int){
mq := NewMq(queueExchange)
mq.MqConnect()
defer func(){
mq.CloseMqConnect()
}()
forever := make(chan bool)
for i:=1;i<=runNums;i++{
go func(routineNum int) {
defer mq.Channel.Close()
// 验证链接是否正常
mq.MqOpenChannel()
mq.ListenReceiver(receiver,routineNum)
}(i)
}
<-forever
}
type retryPro struct {
msgContent string
}
View Code

 

D:\gocode\go-rabbitmq\demo\recv.go

package main
import (
"fmt"
"github.com/ichunt2019/go-rabbitmq/utils/rabbitmq"
"time"
)
type RecvPro struct {
}
//// 实现消费者 消费消息失败 自动进入延时尝试 尝试3次之后入库db
/*
返回值 error 为nil 则表示该消息消费成功
否则消息会进入ttl延时队列 重复尝试消费3次
3次后消息如果还是失败 消息就执行失败 进入告警 FailAction
*/
func (t *RecvPro) Consumer(dataByte []byte) error {
//time.Sleep(500*time.Microsecond)
//return errors.New("顶顶顶顶")
fmt.Println(string(dataByte))
time.Sleep(1*time.Second)
//return errors.New("顶顶顶顶")
return nil
}
//消息已经消费3次 失败了 请进行处理
/*
如果消息 消费3次后 仍然失败 此处可以根据情况 对消息进行告警提醒 或者 补偿 入库db 钉钉告警等等
*/
func (t *RecvPro) FailAction(dataByte []byte) error {
fmt.Println(string(dataByte))
fmt.Println("任务处理失败了,我要进入db日志库了")
fmt.Println("任务处理失败了,发送钉钉消息通知主人")
return nil
}
func main() {
t := &RecvPro{}
//rabbitmq.Recv(rabbitmq.QueueExchange{
// "a_test_0001",
// "a_test_0001",
// "",
// "",
// "amqp://guest:guest@192.168.2.232:5672/",
//},t,5)
/*
runNums: 表示任务并发处理数量 一般建议 普通任务1-3 就可以了
*/
rabbitmq.Recv(rabbitmq.QueueExchange{
"a_test_0001",
"a_test_0001",
"hello_go",
"direct",
"amqp://guest:guest@192.168.2.232:5672/",
},t,3)
}

 

D:\gocode\go-rabbitmq\demo\send.go

package main
import (
"fmt"
_ "fmt"
"github.com/ichunt2019/go-rabbitmq/utils/rabbitmq"
)
func main() {
for i := 1;i<10;i++{
body := fmt.Sprintf("{\"order_id\":%d}",i)
fmt.Println(body)
/**
使用默认的交换机
如果是默认交换机
type QueueExchange struct {
QuName string // 队列名称
RtKey string // key值
ExName string // 交换机名称
ExType string // 交换机类型
Dns string //链接地址
}
如果你喜欢使用默认交换机
RtKey 此处建议填写成 RtKey 和 QuName 一样的值
*/
//queueExchange := rabbitmq.QueueExchange{
// "a_test_0001",
// "a_test_0001",
// "",
// "",
// "amqp://guest:guest@192.168.2.232:5672/",
//}
/*
使用自定义的交换机
*/
queueExchange := rabbitmq.QueueExchange{
"a_test_0001",
"a_test_0001",
"hello_go",
"direct",
"amqp://guest:guest@192.168.2.232:5672/",
}
rabbitmq.Send(queueExchange,body)
}
}

 

使用说明:


go get github.com/ichunt2019/go-rabbitmq

发送消息

package main
import (
"fmt"
_ "fmt"
"github.com/ichunt2019/go-rabbitmq/utils/rabbitmq"
)
func main() {
for i := 1;i<10;i++{
body := fmt.Sprintf("{\"order_id\":%d}",i)
fmt.Println(body)
/**
使用默认的交换机
如果是默认交换机
type QueueExchange struct {
QuName string // 队列名称
RtKey string // key值
ExName string // 交换机名称
ExType string // 交换机类型
Dns string //链接地址
}
如果你喜欢使用默认交换机
RtKey 此处建议填写成 RtKey 和 QuName 一样的值
*/
//queueExchange := rabbitmq.QueueExchange{
// "a_test_0001",
// "a_test_0001",
// "",
// "",
// "amqp://guest:guest@192.168.2.232:5672/",
//}
/*
使用自定义的交换机
*/
queueExchange := rabbitmq.QueueExchange{
"a_test_0001",
"a_test_0001",
"hello_go",
"direct",
"amqp://guest:guest@192.168.2.232:5672/",
}
rabbitmq.Send(queueExchange,body)
}
}

 

消费消息

package main
import (
"fmt"
"github.com/ichunt2019/go-rabbitmq/utils/rabbitmq"
"time"
)
type RecvPro struct {
}
//// 实现消费者 消费消息失败 自动进入延时尝试 尝试3次之后入库db
/*
返回值 error 为nil 则表示该消息消费成功
否则消息会进入ttl延时队列 重复尝试消费3次
3次后消息如果还是失败 消息就执行失败 进入告警 FailAction
*/
func (t *RecvPro) Consumer(dataByte []byte) error {
//time.Sleep(500*time.Microsecond)
//return errors.New("顶顶顶顶")
fmt.Println(string(dataByte))
time.Sleep(1*time.Second)
//return errors.New("顶顶顶顶")
return nil
}

 

//消息已经消费3次 失败了 请进行处理
/*
如果消息 消费3次后 仍然失败 此处可以根据情况 对消息进行告警提醒 或者 补偿 入库db 钉钉告警等等
*/
func (t *RecvPro) FailAction(dataByte []byte) error {
fmt.Println(string(dataByte))
fmt.Println("任务处理失败了,我要进入db日志库了")
fmt.Println("任务处理失败了,发送钉钉消息通知主人")
return nil
}
func main() {
t := &RecvPro{}
//rabbitmq.Recv(rabbitmq.QueueExchange{
// "a_test_0001",
// "a_test_0001",
// "",
// "",
// "amqp://guest:guest@192.168.2.232:5672/",
//},t,5)
/*
runNums: 表示任务并发处理数量 一般建议 普通任务1-3 就可以了
*/
rabbitmq.Recv(rabbitmq.QueueExchange{
"a_test_0001",
"a_test_0001",
"hello_go",
"direct",
"amqp://guest:guest@192.168.2.232:5672/",
},t,3)
}
说明:
rabbitmq.Recv(rabbitmq.QueueExchange{
"a_test_0001",
"a_test_0001",
"hello_go",
"direct",
"amqp://guest:guest@192.168.2.232:5672/",
},t,3)

 

第一个参数 QueueExchange说明

// 定义队列交换机对象
type QueueExchange struct {
QuName string // 队列名称
RtKey string // key值
ExName string // 交换机名称
ExType string // 交换机类型
Dns string //链接地址
}

 

第二个参数 type Receiver interface说明












ConsumerFailAction
拿到消息后,用户可以处理任务,如果消费成功 返回nil即可,如果处理失败,返回一个自定义error即可由于消息内部自带消息失败尝试3次机制,3次如果失败后就没必要一直存储在mq,所以此处扩展,可以用作消息补偿和告警

// 定义接收者接口
type Receiver interface {
Consumer([]byte) error
FailAction([]byte) error
}

 

第三个参数:runNusm











runNusm
消息并发数,同时可以处理多少任务 普通任务 设置为1即可 需要并发的设置成3-5即可


推荐阅读
  • 在探讨 AS3 中的数据深度复制技术时,本文详细介绍了实现数据深度克隆的有效方法。通过对比多种方案,最终确定了一种高效且可靠的实现方式,所有代码均来源于公开资源,确保了方法的实用性和可操作性。 ... [详细]
  • 本文深入探讨了RecyclerView的缓存与视图复用机制,详细解析了不同类型的缓存及其功能。首先,介绍了屏幕内ViewHolder的Scrap缓存,这是一种最轻量级的缓存方式,旨在提高滚动性能并减少不必要的视图创建。通过分析其设计原理,揭示了Scrap缓存为何能有效提升用户体验。此外,还讨论了其他类型的缓存机制,如RecycledViewPool和ViewCacheExtension,进一步优化了视图复用效率。 ... [详细]
  • 如何在Spark数据排序过程中有效避免内存溢出(OOM)问题
    本文深入探讨了在使用Spark进行数据排序时如何有效预防内存溢出(OOM)问题。通过具体的代码示例,详细阐述了优化策略和技术手段,为读者在实际工作中遇到类似问题提供了宝贵的参考和指导。 ... [详细]
  • BZOJ4240 Gym 102082G:贪心算法与树状数组的综合应用
    BZOJ4240 Gym 102082G 题目 "有趣的家庭菜园" 结合了贪心算法和树状数组的应用,旨在解决在有限时间和内存限制下高效处理复杂数据结构的问题。通过巧妙地运用贪心策略和树状数组,该题目能够在 10 秒的时间限制和 256MB 的内存限制内,有效处理大量输入数据,实现高性能的解决方案。提交次数为 756 次,成功解决次数为 349 次,体现了该题目的挑战性和实际应用价值。 ... [详细]
  • Spring Boot 实战(一):基础的CRUD操作详解
    在《Spring Boot 实战(一)》中,详细介绍了基础的CRUD操作,涵盖创建、读取、更新和删除等核心功能,适合初学者快速掌握Spring Boot框架的应用开发技巧。 ... [详细]
  • 本文将详细介绍在Android应用中添加自定义返回按钮的方法,帮助开发者更好地理解和实现这一功能。通过具体的代码示例和步骤说明,本文旨在为初学者提供清晰的指导,确保他们在开发过程中能够顺利集成返回按钮,提升用户体验。 ... [详细]
  • Android 图像色彩处理技术详解
    本文详细探讨了 Android 平台上的图像色彩处理技术,重点介绍了如何通过模仿美图秀秀的交互方式,利用 SeekBar 实现对图片颜色的精细调整。文章展示了具体的布局设计和代码实现,帮助开发者更好地理解和应用图像处理技术。 ... [详细]
  • 本题库精选了Java核心知识点的练习题,旨在帮助学习者巩固和检验对Java理论基础的掌握。其中,选择题部分涵盖了访问控制权限等关键概念,例如,Java语言中仅允许子类或同一包内的类访问的访问权限为protected。此外,题库还包括其他重要知识点,如异常处理、多线程、集合框架等,全面覆盖Java编程的核心内容。 ... [详细]
  • 如何在Java中高效构建WebService
    本文介绍了如何利用XFire框架在Java中高效构建WebService。XFire是一个轻量级、高性能的Java SOAP框架,能够简化WebService的开发流程。通过结合MyEclipse集成开发环境,开发者可以更便捷地进行项目配置和代码编写,从而提高开发效率。此外,文章还详细探讨了XFire的关键特性和最佳实践,为读者提供了实用的参考。 ... [详细]
  • 题目描述:小K不幸被LL邪教洗脑,洗脑程度之深使他决定彻底脱离这个邪教。在最终离开前,他计划再进行一次亚瑟王游戏。作为最后一战,他希望这次游戏能够尽善尽美。众所周知,亚瑟王游戏的结果很大程度上取决于运气,但通过合理的策略和算法优化,可以提高获胜的概率。本文将详细解析洛谷P3239 [HNOI2015] 亚瑟王问题,并提供具体的算法实现方法,帮助读者更好地理解和应用相关技术。 ... [详细]
  • Java 8 引入了 Stream API,这一新特性极大地增强了集合数据的处理能力。通过 Stream API,开发者可以更加高效、简洁地进行集合数据的遍历、过滤和转换操作。本文将详细解析 Stream API 的核心概念和常见用法,帮助读者更好地理解和应用这一强大的工具。 ... [详细]
  • Java队列机制深度解析与应用指南
    Java队列机制在并发编程中扮演着重要角色。本文深入解析了Java队列的各种实现类及其应用场景,包括`LinkedList`、`ArrayBlockingQueue`和`PriorityQueue`等,并探讨了它们在高并发环境下的性能表现和适用场景。通过详细分析这些队列的内部机制和使用技巧,帮助开发者更好地理解和应用Java队列,提升系统的设计和架构能力。 ... [详细]
  • 在幼儿园中,有 \( n \) 个小朋友需要通过投票来决定是否午睡。尽管这个问题对每个孩子来说并不是特别重要,但他们仍然希望通过谦让的方式达成一致。每个人都有自己的偏好,但为了集体和谐,他们决定采用一种最小割的方法来解决这一问题。这种方法不仅能够确保每个人的意愿得到尽可能多的尊重,还能找到一个最优的解决方案,使整体满意度最大化。 ... [详细]
  • 本文提供了 RabbitMQ 3.7 的快速上手指南,详细介绍了环境搭建、生产者和消费者的配置与使用。通过官方教程的指引,读者可以轻松完成初步测试和实践,快速掌握 RabbitMQ 的核心功能和基本操作。 ... [详细]
  • 在开发Xamarin.Forms应用程序时,遇到了使用Entity Framework Core 3.0访问SQLite数据库时 `Database.MigrateAsync` 方法调用的问题。本文详细探讨了该问题的根源,并提供了一种有效的解决方案,确保数据库迁移能够顺利执行。此外,还介绍了如何配置和优化EF Core以提高应用性能和稳定性。 ... [详细]
author-avatar
少唇_200
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有