热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Golang与RabbitMQ

文章目录RabbitMQ概述RabbitMQ特点RabbitMQ基础概念RabbitMQ工作流程Golang操作RabbitMQ基础队列使用工作队列参考资料RabbitMQ概述

文章目录


      • RabbitMQ 概述
        • RabbitMQ 特点
        • RabbitMQ基础概念
        • RabbitMQ 工作流程
      • Golang 操作RabbitMQ
        • 基础队列使用
        • 工作队列
      • 参考资料

RabbitMQ 概述

RabbitMQ是采用Erlang编程语言实现了高级消息队列协议AMQP (Advanced Message Queuing Protocol)的开源消息代理软件(消息队列中间件)


市面上流行的消息队列中间件有很多种,而RabbitMQ只是其中比较流行的一种

我们简单说说消息队列中间件的作用

  • 解耦
  • 削峰
  • 异步处理
  • 缓存存储
  • 消息通信
  • 提高系统拓展性


RabbitMQ 特点


  1. 可靠性

    通过一些机制例如,持久化,传输确认等来确保消息传递的可靠性

  2. 拓展性

    多个RabbitMQ节点可以组成集群

  3. 高可用性

    队列可以在RabbitMQ集群中设置镜像,如此一来即使部分节点挂掉了,但是队列仍然可以使用

  4. 多种协议

    原生的支持AMQP,也能支持STOMP,MQTT等协议

  5. 丰富的客户端

    我们常用的编程语言都支持RabbitMQ

  6. 管理界面

    自带提供一个WEB管理界面

  7. 插件机制

    RabbitMQ 自己提供了很多插件,可以按需要进行拓展 Plugins


RabbitMQ基础概念


总体上看RabbitMQ是一个生产者和消费者的模型, 接收,存储 ,转发


在这里插入图片描述

我们看看在RabbitMQ中的几个主要概念


  1. Producer (生产者) : 消息的生产者,投递方

  2. Consumer (消费者) : 消息的消费者

  3. RabbitMQ Broker (RabbitMQ 代理) : RabbitMQ 服务节点(单机情况中,就是代表RabbitMQ服务器)

  4. Queue (队列) : 在RabbitMQ中Queue是存储消息数据的唯一形式

  5. Binding (绑定) : RabbitMQ中绑定(Binding)是交换机(exchange)将消息(message)路由给队列(queue)所需遵循的规则。如果要指示交换机“E”将消息路由给队列“Q”,那么“Q”就需要与“E”进行绑定。绑定操作需要定义一个可选的路由键(routing key)属性给某些类型的交换机。路由键的意义在于从发送给交换机的众多消息中选择出某些消息,将其路由给绑定的队列。

  6. RoutingKey (路由键) : 消息投递给交换器,通常会指定一个 RoutingKey ,通过这个路由键来明确消息的路由规则

    RoutingKey 通常是生产者和消费者有协商一致的key策略,消费者就可以合法从生产者手中获取数据。这个RoutingKey主要当Exchange交换机模式为设定为direct和topic模式的时候使用,fanout模式不使用RoutingKey

  7. Exchange (交换机) : 生产者将消息发送给交换器(交换机),再由交换器将消息路由导对应的队列中

    交换机四种类型 : fanout,direct,topic,headers

    1. fanout (扇形交换机) :

      将发送到该类型交换机的消息(message)路由到所有的与该交换机绑定的队列中,如同一个"扇"状扩散给各个队列

    在这里插入图片描述

    fanout类型的交换机会忽略RoutingKey的存在,将message直接"广播"到绑定的所有队列中

    1. direct (直连交换机) :

      根据消息携带的路由键(RoutingKey) 将消息投递到对应的队列中

在这里插入图片描述


direct类型的交换机(exchange)是RabbitMQ Broker的默认类型,它有一个特别的属性对一些简单的应用来说是非常有用的,在使用这个类型的Exchange时,可以不必指定routing key的名字,在此类型下创建的Queue有一个默认的routing key,这个routing key一般同Queue同名。



  1. Topic (主题交换机) :

    topic类型交换机在RoutingKeyBindKey 匹配规则上更加的灵活. 同样是将消息路由到RoutingKeyBindingKey 相匹配的队列中,但是匹配规则有如下的特点 :

    规则1: RoutingKey 是一个使用. 的字符串 例如: “go.log.info” , “java.log.error”

    规则2: BingingKey 也会一个使用 . 分割的字符串, 但是在 BindingKey 中可以使用两种特殊字符 *# ,其中 “*” 用于匹配一个单词,"#"用于匹配多规格单词(零个或者多个单词)

在这里插入图片描述


RoutingKey和BindingKey 是一种"模糊匹配" ,那么一个消息Message可能 会被发送到一个或者多个队列中

无法匹配的消息将会被丢弃或者返回者生产者



  1. Headers (头交换机):

    Headers类型的交换机使用不是很多

    关于Headers Exchange 摘取一段比较容易理解的解释 :

    有时消息的路由操作会涉及到多个属性,此时使用消息头就比用路由键更容易表达,头交换机(headers exchange)就是为此而生的。头交换机使用多个消息属性来代替路由键建立路由规则。通过判断消息头的值能否与指定的绑定相匹配来确立路由规则。

    我们可以绑定一个队列到头交换机上,并给他们之间的绑定使用多个用于匹配的头(header)。这个案例中,消息代理得从应用开发者那儿取到更多一段信息,换句话说,它需要考虑某条消息(message)是需要部分匹配还是全部匹配。上边说的“更多一段消息”就是"x-match"参数。当"x-match"设置为“any”时,消息头的任意一个值被匹配就可以满足条件,而当"x-match"设置为“all”的时候,就需要消息头的所有值都匹配成功。

    头交换机可以视为直连交换机的另一种表现形式。头交换机能够像直连交换机一样工作,不同之处在于头交换机的路由规则是建立在头属性值之上,而不是路由键。路由键必须是一个字符串,而头属性值则没有这个约束,它们甚至可以是整数或者哈希值(字典)等。


RabbitMQ 工作流程

消息生产流程


  1. 消息生产者连与RabbitMQ Broker 建立一个连接,建立好了连接之后,开启一个信道Channel
  2. 声明一个交换机,并设置其相关的属性(交换机类型,持久化等)
  3. 声明一个队列并设置其相关属性(排他性,持久化自动删除等)
  4. 通过路由键将交换机和队列绑定起来
  5. 消息生产者发送消息给 RabbitMQ Broker , 消息中包含了路由键,交换机等信息,交换机根据接收的路由键查找匹配对应的队列
  6. 查找匹配成功,则将消息存储到队列中
  7. 查找匹配失败,根据生产者配置的属性选择丢弃或者回退给生产者
  8. 关闭信道Channel , 关闭连接

消息消费流程


  1. 消息消费者连与RabbitMQ Broker 建立一个连接,建立好了连接之后,开启一个信道Channel
  2. 消费者向RabbitMQ Broker 请求消费者相应队列中的消息
  3. 等待RabbitMQ Broker 回应并投递相应队列中的消息,消费者接收消息
  4. 消费者确认(ack) 接收消息, RabbitMQ Broker 消除已经确认的消息
  5. 关闭信道Channel ,关闭连接

Golang 操作RabbitMQ


RabbitMQ 支持我们常见的编程语言,此处我们使用 Golang 来操作

Golang操作RabbitMQ的前提我们需要有个RabbitMQ的服务端,至于RabbitMQ的服务怎么搭建我们此处就不详细描述了.

Golang操作RabbitMQ的客户端包,网上已经有一个很流行的了,而且也是RabbitMQ官网比较推荐的,不需要我们再从头开始构建一个RabbitMQ的Go语言客户端包. 详情

go get github.com/streadway/amqp


项目目录

___lib
______commonFunc.go
___producer.go
___comsumer.go

commonFunc.go

package libimport ("github.com/streadway/amqp""log"
)
// RabbitMQ连接函数
func RabbitMQConn() (conn *amqp.Connection,err error){// RabbitMQ分配的用户名称var user string = "admin"// RabbitMQ用户的密码var pwd string = "123456"// RabbitMQ Broker 的ip地址var host string = "192.168.230.132"// RabbitMQ Broker 监听的端口var port string = "5672"url := "amqp://"+user+":"+pwd+"@"+host+":"+port+"/"// 新建一个连接conn,err =amqp.Dial(url)// 返回连接和错误return
}
// 错误处理函数
func ErrorHanding(err error, msg string){if err != nil{log.Fatalf("%s: %s", msg, err)}
}

基础队列使用


简单队列模式是RabbitMQ的常规用法,简单理解就是消息生产者发送消息给一个队列,然后消息的消息的消费者从队列中读取消息

当多个消费者订阅同一个队列的时候,队列中的消息是平均分摊给多个消费者处理
定义一个消息的生产者

producer.go


package mainimport ("encoding/json""log""myDemo/rabbitmq_demo/lib""github.com/streadway/amqp"
)
type simpleDemo struct {Name string `json:"name"`Addr string `json:"addr"`
}
func main() {// 连接RabbitMQ服务器conn, err := lib.RabbitMQConn()lib.ErrorHanding(err, "Failed to connect to RabbitMQ")// 关闭连接defer conn.Close()// 新建一个通道ch, err := conn.Channel()lib.ErrorHanding(err, "Failed to open a channel")// 关闭通道defer ch.Close()// 声明或者创建一个队列用来保存消息q, err := ch.QueueDeclare(// 队列名称"simple:queue", // namefalse, // durablefalse, // delete when unusedfalse, // exclusivefalse, // no-waitnil, // arguments)lib.ErrorHanding(err, "Failed to declare a queue")data := simpleDemo{Name: "Tom",Addr: "Beijing",}dataBytes,err := json.Marshal(data)if err != nil{lib.ErrorHanding(err,"struct to json failed")}err = ch.Publish("", // exchangeq.Name, // routing keyfalse, // mandatoryfalse, // immediateamqp.Publishing{ContentType: "text/plain",Body: dataBytes,})log.Printf(" [x] Sent %s", dataBytes)lib.ErrorHanding(err, "Failed to publish a message")
}

定义一个消息的消费者

comsumer.go


package mainimport ("log""myDemo/rabbitmq_demo/lib"
)func main() {conn, err := lib.RabbitMQConn()lib.ErrorHanding(err,"failed to connect to RabbitMQ")defer conn.Close()ch, err := conn.Channel()lib.ErrorHanding(err,"failed to open a channel")defer ch.Close()q, err := ch.QueueDeclare("simple:queue", // namefalse, // durablefalse, // delete when unusedfalse, // exclusivefalse, // no-waitnil, // arguments)lib.ErrorHanding(err,"Failed to declare a queue")// 定义一个消费者msgs, err := ch.Consume(q.Name, // queue"", // consumertrue, // auto-ackfalse, // exclusivefalse, // no-localfalse, // no-waitnil, // args)lib.ErrorHanding(err,"Failed to register a consume")go func() {for d := range msgs {log.Printf("Received a message: %s", d.Body)}}()log.Printf(" [*] Waiting for messages. To exit press CTRL+C")select {}
}

工作队列


工作队列也称为 任务队列 任务队列是为了避免等待执行一些耗时的任务,而是将需要执行的任务封装为消息发送给工作队列,后台运行的工作进程将任务消息取出来并执行相关任务 , 多个后台工作进程同时间进行,那么任务在他们之间共享


在这里插入图片描述


我们定义一个任务的生产者,用于生产任务消息

task.go


package mainimport ("github.com/streadway/amqp""log""myDemo/rabbitmq_demo/lib""os""strings"
)func bodyFrom(args []string) string {var s stringif (len(args) <2) || os.Args[1] &#61;&#61; "" {s &#61; "no task"} else {s &#61; strings.Join(args[1:], " ")}return s
}
func main() {// 连接RabbitMQ服务器conn, err :&#61; lib.RabbitMQConn()lib.ErrorHanding(err, "Failed to connect to RabbitMQ")// 关闭连接defer conn.Close()// 新建一个通道ch, err :&#61; conn.Channel()lib.ErrorHanding(err, "Failed to open a channel")// 关闭通道defer ch.Close()// 声明或者创建一个队列用来保存消息q, err :&#61; ch.QueueDeclare(// 队列名称"task:queue", // namefalse, // durablefalse, // delete when unusedfalse, // exclusivefalse, // no-waitnil, // arguments)lib.ErrorHanding(err, "Failed to declare a queue")body :&#61; bodyFrom(os.Args)err &#61; ch.Publish("",q.Name,false,false,amqp.Publishing{ContentType: "text/plain",// 将消息标记为持久消息DeliveryMode: amqp.Persistent,Body: []byte(body),})lib.ErrorHanding(err, "Failed to publish a message")log.Printf("sent %s", body)
}

定义一个工作者,用于消费掉任务消息

worker.go


package mainimport ("log""myDemo/rabbitmq_demo/lib"
)func main() {conn, err :&#61; lib.RabbitMQConn()lib.ErrorHanding(err, "Failed to connect to RabbitMQ")defer conn.Close()ch, err :&#61; conn.Channel()lib.ErrorHanding(err, "Failed to open a channel")defer ch.Close()q, err :&#61; ch.QueueDeclare("task:queue", // namefalse, // durablefalse, // delete when unusedfalse, // exclusivefalse, // no-waitnil, // arguments)lib.ErrorHanding(err, "Failed to declare a queue")// 将预取计数器设置为1// 在并行处理中将消息分配给不同的工作进程err &#61; ch.Qos(1, // prefetch count0, // prefetch sizefalse, // global)lib.ErrorHanding(err, "Failed to set QoS")msgs, err :&#61; ch.Consume(q.Name, // queue"", // consumerfalse, // auto-ackfalse, // exclusivefalse, // no-localfalse, // no-waitnil, // args)lib.ErrorHanding(err, "Failed to register a consumer")forever :&#61; make(chan bool)go func() {for d :&#61; range msgs {log.Printf("Received a message: %s", d.Body)log.Printf("Done")d.Ack(false)}}()log.Printf(" [*] Waiting for messages. To exit press CTRL&#43;C")<-forever
}

测试

#shell1
go run task.go#shell2
go run worker.go#shell3
go run worker.go

RabbitMQ 的用法很多,详情参看官网文档


参考资料

https://www.rabbitmq.com/getstarted.html
http://rabbitmq.mr-ping.com/
https://github.com/streadway/amqp
https://blog.csdn.net/u013256816/category_6532725.html


推荐阅读
  • rabbitmq杂谈
    rabbitmq中的consumerTag和deliveryTag分别是干啥的,有什么用?同一个会话,consumerTag是固定的可以做此会话的名字,deliveryTag每次接 ... [详细]
  • Oracle优化新常态的五大禁止及其性能隐患
    本文介绍了Oracle优化新常态中的五大禁止措施,包括禁止外键、禁止视图、禁止触发器、禁止存储过程和禁止JOB,并分析了这些禁止措施可能带来的性能隐患。文章还讨论了这些禁止措施在C/S架构和B/S架构中的不同应用情况,并提出了解决方案。 ... [详细]
  • RabbitMQ的消息持久化处理
    1、RabbitMQ的消息持久化处理,消息的可靠性是RabbitMQ的一大特色,那么RabbitMQ是如何保证消息可靠性的呢——消息持久化。2、auto ... [详细]
  • 消息中间件RabbitMQ 高级特性之消费端ACK与重回队列
    什么是消费端的ACK和重回队列?消费端的手工ACK和NACK消费端进行消费的时候,如果由于业务异常我们可以进行日志的记录,然后进行补偿如果由于服务器宕机等严重问题 ... [详细]
  • 6(自)、交换机之关键字模式
    上一节中的我们的日志系统将所有消息广播给所有消费者,对此我们想做一些改变,例如我们希望将日志消息写入磁盘的程序仅接收严重错误(error),而不存储那些警告(warnning)或者 ... [详细]
  • 本文回顾了3.21开学以来的学习情况,包括javaWeb课程的迷糊感和未预习导致的不知所措,以及对VOJ题目的归类和解答。午饭前完成了阶乘相关的两道题目。下午的数据结构课听懂了队列的讲解,但有几个疑问未能及时复习。设计模式课程因预习效率低而感到困惑,同时也没搞清楚下节课的内容。晚上去图书馆学习。通过反思和总结,对自己的学习收获有了更深刻的认识。 ... [详细]
  • 本文介绍了Web学习历程记录中关于Tomcat的基本概念和配置。首先解释了Web静态Web资源和动态Web资源的概念,以及C/S架构和B/S架构的区别。然后介绍了常见的Web服务器,包括Weblogic、WebSphere和Tomcat。接着详细讲解了Tomcat的虚拟主机、web应用和虚拟路径映射的概念和配置过程。最后简要介绍了http协议的作用。本文内容详实,适合初学者了解Tomcat的基础知识。 ... [详细]
  • Tomcat/Jetty为何选择扩展线程池而不是使用JDK原生线程池?
    本文探讨了Tomcat和Jetty选择扩展线程池而不是使用JDK原生线程池的原因。通过比较IO密集型任务和CPU密集型任务的特点,解释了为何Tomcat和Jetty需要扩展线程池来提高并发度和任务处理速度。同时,介绍了JDK原生线程池的工作流程。 ... [详细]
  • Python爬虫中使用正则表达式的方法和注意事项
    本文介绍了在Python爬虫中使用正则表达式的方法和注意事项。首先解释了爬虫的四个主要步骤,并强调了正则表达式在数据处理中的重要性。然后详细介绍了正则表达式的概念和用法,包括检索、替换和过滤文本的功能。同时提到了re模块是Python内置的用于处理正则表达式的模块,并给出了使用正则表达式时需要注意的特殊字符转义和原始字符串的用法。通过本文的学习,读者可以掌握在Python爬虫中使用正则表达式的技巧和方法。 ... [详细]
  • 本文整理了315道Python基础题目及答案,帮助读者检验学习成果。文章介绍了学习Python的途径、Python与其他编程语言的对比、解释型和编译型编程语言的简述、Python解释器的种类和特点、位和字节的关系、以及至少5个PEP8规范。对于想要检验自己学习成果的读者,这些题目将是一个不错的选择。请注意,答案在视频中,本文不提供答案。 ... [详细]
  • node.jsurlsearchparamsAPI哎哎哎 ... [详细]
  • 正则表达式及其范例
    为什么80%的码农都做不了架构师?一、前言部分控制台输入的字符串,编译成java字符串之后才送进内存,比如控制台打\, ... [详细]
  • RabbitMq之发布确认高级部分1.为什么会需要发布确认高级部分?在生产环境中由于一些不明原因,导致rabbitmq重启,在RabbitMQ重启期间生产者消息投递失败,导致消息丢 ... [详细]
  • CISCO ASA防火墙Failover+multiple context详细部署By 年糕泰迪[操作系统入门]
    一.文章概述本文主要就CISCOASA防火墙的高可用和扩张性进行阐述和部署。再cisco防火墙系列中主要有3种技术来实现高可用和扩张性。分别是Failover,multiplese ... [详细]
  • [大整数乘法] java代码实现
    本文介绍了使用java代码实现大整数乘法的过程,同时也涉及到大整数加法和大整数减法的计算方法。通过分治算法来提高计算效率,并对算法的时间复杂度进行了研究。详细代码实现请参考文章链接。 ... [详细]
author-avatar
讨厌上学的-彭志超-_354
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有