热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

微服务之间通过RabbitMQ通信

微服务之间是相互独立的,不像单个工程一样各个模块之间可以直接通过方法调用实现通信,相互独立的服务直接

微服务之间通过RabbitMQ通信

微服务之间是相互独立的,不像单个工程一样各个模块之间可以直接通过方法调用实现通信,相互独立的服务直接一般的通信方式是使用 HTTP协议rpc协议 或者使用消息中间件如 RabbitMQ``Kafka

微服务之间通过RabbitMQ通信

image

在这篇文章 使用Golang和 MongoDB 构建微服务 已经实现了一个微服务的应用,在文章中已经实现了各个服务直接的通信,是使用的 HTTP 的形式 ,那各个服务之间如何通过 RabbitMQ 进行消息通信呢,我们现在要实现一个功能,就是一个用户预订电影票的接口,需要服务 User Service(port 8000) 和 服务 Booking Service(port 8003) 之间通信,用户预订之后,把预订信息写入到 booking 的数据库中

安装 RabbitMQ

安装 RabbitMQ 之前需要先安装 Erlang 的环境 ,然后下载安装 RabbitMQ ,请选择对应的版本,安装完成之后,RabbitMQ在Windows上是作为一个服务在后台运行,关于 RabbitMQ 的接口如何使用,请参考官网的 教程 ,有各个主流语言的实现我们使用的是 Go 版本,请下载对应的实现接口 go get github.com/streadway/amqp

RabbitMQ 的接口做一下简单的封装

  • 定义一个接口

messaging/message.go

type IMessageClient interface {
    ConnectToBroker(connectionStr string) error
    PublishToQueue(data []byte, queueName string) error
    SubscribeToQueue(queueName string, handlerFunc func(amqp.Delivery)) error
    Close()
}

type MessageClient struct {
    conn *amqp.Connection
}
  • 连接接口
func (m *MessageClient) ConnectToBroker(connectionStr string) error {
    if cOnnectionStr== "" {
        panic("the connection str mustnt be null")
    }
    var err error
    m.conn, err = amqp.Dial(connectionStr)
    return err
}
  • 发布消息接口
func (m *MessageClient) PublishToQueue(body []byte, queueName string) error {
    if m.cOnn== nil {
        panic("before publish you must connect the RabbitMQ first")
    }

    ch, err := m.conn.Channel()
    defer ch.Close()
    failOnError(err, "Failed to open a channel")

    q, err := ch.QueueDeclare(
        queueName,
        false,
        false,
        false,
        false,
        nil,
    )
    failOnError(err, "Failed to declare a queue")

    err = ch.Publish(
        "",
        q.Name,
        false,
        false,
        amqp.Publishing{
            ContentType: "application/json",
            Body:        body,
        },
    )
    failOnError(err, "Failed to publish a message")

    return nil
}
  • 订阅消息接口
func (m *MessageClient) SubscribeToQueue(queueName string, handlerFunc func(amqp.Delivery)) error {
    ch, err := m.conn.Channel()
    //defer ch.Close()
    failOnError(err, "Failed to open a channel")

    q, err := ch.QueueDeclare(
        queueName,
        false,
        false,
        false,
        false,
        nil,
    )
    failOnError(err, "Failed to declare a queue")

    msgs, err := ch.Consume(
        q.Name,
        "",
        true,
        false,
        false,
        false,
        nil,
    )

    failOnError(err, "Failed to register a consumer")
    go consumeLoop(msgs, handlerFunc)
    return nil
}

实现通信

User Service 中定义一个新的 POST 接口 /user/{name}/booking ,实现用户的预订功能,预订之后,通过 RabbitMQ 发布一个消息给

Booking Service, Booking Service 接收到消息之后,做相应的处理(写入数据库)

User Service

  • 初始化 MessageClient

users/controllers/user.go

var client messaging.IMessageClient

func init() {
    client = &messaging.MessageClient{}
    err := client.ConnectToBroker("amqp://guest:guest@localhost:5672/")
    if err != nil {
        fmt.Println("connect to rabbitmq error", err)
    }
}
  • 添加新的路由和实现

routes.go

register("POST", "/user/{name}/booking", controllers.NewBooking, nil)

users/controllers/user.go

func NewBooking(w http.ResponseWriter, r *http.Request) {
    params := mux.Vars(r)
    user_name := params["name"]
    defer r.Body.Close()

    var bookings models.Booking
    body, _ := ioutil.ReadAll(r.Body)
    err := json.Unmarshal(body, &bookings)
    if err != nil {
        fmt.Println("the format body error ", err)
    }
    fmt.Println("user name:", user_name, bookings)
    go notifyMsg(body)
}
  • 用一个协程实现消息的发布
func notifyMsg(body []byte) {
    err := client.PublishToQueue(body, "new_booking")
    if err != nil {
        fmt.Println("Failed to publis message", err)
    }
}

Booking Service

  • 初始化MessageClient
var client messaging.IMessageClient

func initMessage() {
    client = &messaging.MessageClient{}
    err := client.ConnectToBroker("amqp://guest:guest@localhost:5672/")
    if err != nil {
        fmt.Println("Failed to connect to RabbitMQ", err)
    }

    err = client.SubscribeToQueue("new_booking", getBooking)
    if err != nil {
        fmt.Println("Failed to comsuer the msg", err)
    }
}

在 web服务之前启动

func main() {

    initMessage()

    r := routes.NewRouter()
    http.ListenAndServe(":8003", r)

}
  • 接收后的消息处理
func getBooking(delivery amqp.Delivery) {

  var booking models.Booking
    json.Unmarshal(delivery.Body, &booking)
  booking.Id = bson.NewObjectId().Hex()
    dao.Insert("Booking", "BookModel", booking)
    fmt.Println("the booking msg", booking)
}

验证,需要启动 User ServiceBooking Service

使用 Postman 发送对应的数据

post 127.0.0.1:8000/user/kevin_woo/booking

{
    "name":"kevin_woo",
    "books":[
        {
            "date":"20180727",
            "movies":["5b4c45d49d5e3e33c4a5b97a"]
        },
        {
            "date":"20180810",
            "movies":["5b4c45ea9d5e3e33c4a5b97b"]
        }
    ]
}

可以看到数据库已经有了一条新的预订信息

说明,我这里POST的数据就是booking数据库中的结构,实际情况需要对数据进行封装处理,在POST数据时,没有对数据进行验证,

在实际开发过程中需要对各个数据做相应的验证,这里主要是看一下 RabbitMQ的消息传递处理的过程

源码 Github


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 我们


推荐阅读
  • Abp+MongoDb改造默认的审计日志存储位置
    一、背景在实际项目的开发当中,使用AbpZero自带的审计日志功能写入效率比较低。其次审计日志数据量中后期十分庞大,不适合与业务数据存放在一起。所以我们可以重新实现A ... [详细]
  • Allegro总结:1.防焊层(SolderMask):又称绿油层,PCB非布线层,用于制成丝网印板,将不需要焊接的地方涂上防焊剂.在防焊层上预留的焊盘大小要比实际的焊盘大一些,其差值一般 ... [详细]
  • Sleuth+zipkin链路追踪SpringCloud微服务的解决方案
    在庞大的微服务群中,随着业务扩展,微服务个数增多,系统调用链路复杂化。Sleuth+zipkin是解决SpringCloud微服务定位和追踪的方案。通过TraceId将不同服务调用的日志串联起来,实现请求链路跟踪。通过Feign调用和Request传递TraceId,将整个调用链路的服务日志归组合并,提供定位和追踪的功能。 ... [详细]
  • 有意向可以发简历到邮箱内推.简历直达组内Leader.能做同事的话,内推奖励全给你. ... [详细]
  • 一、前言在数据库中,慢查询日志通常是用来进行优化数据库,MySQL中存在慢查询,Mongodb中也是如此。在Mongo中的慢查询属于Mon ... [详细]
  • 在Docker中,将主机目录挂载到容器中作为volume使用时,常常会遇到文件权限问题。这是因为容器内外的UID不同所导致的。本文介绍了解决这个问题的方法,包括使用gosu和suexec工具以及在Dockerfile中配置volume的权限。通过这些方法,可以避免在使用Docker时出现无写权限的情况。 ... [详细]
  • 云原生边缘计算之KubeEdge简介及功能特点
    本文介绍了云原生边缘计算中的KubeEdge系统,该系统是一个开源系统,用于将容器化应用程序编排功能扩展到Edge的主机。它基于Kubernetes构建,并为网络应用程序提供基础架构支持。同时,KubeEdge具有离线模式、基于Kubernetes的节点、群集、应用程序和设备管理、资源优化等特点。此外,KubeEdge还支持跨平台工作,在私有、公共和混合云中都可以运行。同时,KubeEdge还提供数据管理和数据分析管道引擎的支持。最后,本文还介绍了KubeEdge系统生成证书的方法。 ... [详细]
  • ZSI.generate.Wsdl2PythonError: unsupported local simpleType restriction ... [详细]
  • sklearn数据集库中的常用数据集类型介绍
    本文介绍了sklearn数据集库中常用的数据集类型,包括玩具数据集和样本生成器。其中详细介绍了波士顿房价数据集,包含了波士顿506处房屋的13种不同特征以及房屋价格,适用于回归任务。 ... [详细]
  • Linux如何安装Mongodb的详细步骤和注意事项
    本文介绍了Linux如何安装Mongodb的详细步骤和注意事项,同时介绍了Mongodb的特点和优势。Mongodb是一个开源的数据库,适用于各种规模的企业和各类应用程序。它具有灵活的数据模式和高性能的数据读写操作,能够提高企业的敏捷性和可扩展性。文章还提供了Mongodb的下载安装包地址。 ... [详细]
  • Imtryingtofigureoutawaytogeneratetorrentfilesfromabucket,usingtheAWSSDKforGo.我正 ... [详细]
  • Android系统源码分析Zygote和SystemServer启动过程详解
    本文详细解析了Android系统源码中Zygote和SystemServer的启动过程。首先介绍了系统framework层启动的内容,帮助理解四大组件的启动和管理过程。接着介绍了AMS、PMS等系统服务的作用和调用方式。然后详细分析了Zygote的启动过程,解释了Zygote在Android启动过程中的决定作用。最后通过时序图展示了整个过程。 ... [详细]
  • mapreduce源码分析总结
    这篇文章总结的非常到位,故而转之一MapReduce概述MapReduce是一个用于大规模数据处理的分布式计算模型,它最初是由Google工程师设计并实现的ÿ ... [详细]
  • Hadoop 源码学习笔记(4)Hdfs 数据读写流程分析
    Hdfs的数据模型在对读写流程进行分析之前,我们需要先对Hdfs的数据模型有一个简单的认知。数据模型如上图所示,在NameNode中有一个唯一的FSDirectory类负责维护文件 ... [详细]
  • 今天我们学习,数据库mongodb的使用,最下面有mongodb的下载链接。pipinstallpymongo首先安装pymongo,然后在需要用到的地方importpymongo ... [详细]
author-avatar
博仔Mmi
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有