热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Go微服务第九部分使用RabbitMQ和AMQP进行消息传递

第九部分:Go语言微服务系列–使用RabbitMQ和AMQP进行消息传递本文我们将通过RabbitMQ和AMQP协议在Go微服务之间进行消息传递。简介微服务是将应用程序的业务领域分
第九部分: Go语言微服务系列 – 使用RabbitMQ和AMQP进行消息传递

本文我们将通过RabbitMQ和AMQP协议在Go微服务之间进行消息传递。

简介

微服务是将应用程序的业务领域分离成具有清晰分离域的边界上下文,运行进程分离,其中任何跨域边界的持久关系必须依赖最终一致性,而不是类似于ACID事务或外键约束。其中很多概念都来自域驱动设计,或受其启发。领域驱动设计是另外一个很大的话题,足以用一个文章系列来介绍。

在我们Go语言微服务博客系列的上下文和微服务大体架构中,实现服务间的松耦合的一种模式是使用消息传递来进行服务间通信,不需要严格的请求/响应消息交换或类似的消息交换。也就是说,使用消息传递只是便于服务间松耦合的众多策略中的一种。

在Spring Cloud中,RabbitMQ似乎是选择的消息中间人(代理), 特别是因为在第八部分中我们看到的,Spring Cloud Config服务器具有RabbitMQ运行时依赖。

本文中,将会让accountservice服务每当读取特殊账号对象时,就在RabbitMQ exchange上放一条消息。这个消息会被一个我们本文所实现的全新微服务消费。我们也将处理Go代码在多微服务间的复用问题,将多服务复用代码放在common类库中,这样每个微服务都可以import它。

还记得我们在第一部分中的系统景观的图片吗? 下面是在本部分完成之后看起来的样子:

《Go微服务 - 第九部分 - 使用RabbitMQ和AMQP进行消息传递》

  • 实现集中配置服务
  • 实现服务间通信的消息传递
  • 实现两个微服务accountservice和vipservice

依然还有很多元素尚未实现。 不要担心,我们慢慢都会做到的。

源代码

这一部分有很多源代码,本文不会包含所有代码。 要查看完整代码,可克隆并切换到P9分支,或者直接查看https://github.com/callistaen…。

发送消息

我们将实现一个简单的虚构(make-believe)用例: 当特定VIP账号在读取accountservice服务时,我们希望通知一个vip offer服务,在某些情况下,它将为账户持有人产生”offer”。在适当设计的领域模型中,账户对象和VIP offer对象时两个独立领域,它们应该尽可能少的互相了解。

《Go微服务 - 第九部分 - 使用RabbitMQ和AMQP进行消息传递》

换言之,accountservice不能直接访问VIP服务的存储。这个例子中,我们通过RabbitMQ传递一个消息给vipservice, 完全将业务逻辑和持久化都委托给vipservice。

我们将使用AMQP协议做所有通信,这个协议是面向互操作性消息传递的ISO标准应用程序层协议。我们的选择使用的Go类库是streadway/amqp, 类似在第八部分中我们消费配置更新时候使用的。

让我们重复在AMQP中exchange和publisher, consumer和queue之间的关系:
《Go微服务 - 第九部分 - 使用RabbitMQ和AMQP进行消息传递》

也就是说消息被发布到exchange, 然后将消息副本基于路由规则和可能已经注册消费者的绑定分布到queue。在quora.com网站上的这个帖子对这个话题进行了很好的解释。

Thread vs Post: 在论坛中,常用Thread和Post代指某些东西。但是这两者有什么区别呢?

通俗的讲Thread就是论坛中最初发起的某个主题的话题, 包含很多Post(A thread is a group of posts on a single topic.)。中文社区通常所谓的楼主发的第一个东西。 而Post则是对楼主最初发的内容做的回复或跟帖。

参考链接:
https://www.drupal.org/projec…。

为什么RabbitMQ有Queue,还要有Exchange?

现实中的(Quora中的答案)例子:

假设你在Apple商店里边,先要买耳机。 店里就会有人过来问你:”需要什么?” 你告诉他你需要买耳机,然后他就把你带到他的同事的柜台前的排队队列之后等待。因为很多其他人也在买东西,销售员正在处理队列前面的那个消费者。 如果这个时候,另外一个人进店了,刚才招呼你的人会同样询问对方需要什么帮助。刚进来的人需要修下手机,被找呼的人带到了另外一个修理手机的柜台等待了。

这个例子中问你需要什么的人就是exchange, 他会根据需要把你路由到恰当的队列中排队等待。在队列的后面有很多员工,也就是对应队列的worker, 或者消费者。一次处理一个请求,基于先进先出的原则。也可能会根据最先到的人做一个简单轮询。

如果店里没有导流的服务员,那么你就需要来回在每个柜台前来回问是否能帮到你,直到找到你需要办理业务的柜台后开始排队。

当然,导航苹果商店的工作不复杂,但在应用程序中,你可能有很多队列,服务不同类型的请求,基于路由和绑定具有交换路由消息的键来说非常有帮助。 发布者只需要关心添加正确的路由密匙,而消费者只需要关心用正确的绑定密匙创建正确的队列,就可以做到”我对这些消息感兴趣。”

消息传递代码

既然我们需要在accountservice和vipservice中使用消息传递代码和从Spring Cloud Config服务器上加载配置的代码,我们可以创建可共享的库。

我们在goblog目录下面创建一个common目录来保存我们可复用的东西:

mkdir -p common/messaging
mkdir -p common/config

我们将所有AMQP相关的代码放在messaging目录,配置相关的放在config目录。这样你可以把之前的goblog/accountservice/config中的代码移到common/config目录中,并相应的修改import语句中的代码位置。可以看看已完成代码看它是如何支持的。

消息传递代码在单独文件中封装起来,里边定义了我们应用将用于连接、发布和订阅的接口以及具体实现。老实说,对于使用streadway/amqp的AMQP消息传递来说有很多样板代码,因此无需在意代码的实现细节。

在common/messaging/下面创建一个messagingclient.go文件:

package messaging
import (
"github.com/streadway/amqp"
"fmt"
"log"
)
// Defines our interface for connecting and consuming messages.
type IMessagingClient interface {
ConnectToBroker(connectionString string)
Publish(msg []byte, exchangeName string, exchangeType string) error
PublishOnQueue(msg []byte, queueName string) error
Subscribe(exchangeName string, exchangeType string, consumerName string, handlerFunc func(amqp.Delivery)) error
SubscribeToQueue(queueName string, consumerName string, handlerFunc func(amqp.Delivery)) error
Close()
}
// Real implementation, encapsulates a pointer to an amqp.Connection
type MessagingClient struct {
conn *amqp.Connection
}

上面代码片段,定义了messaging的接口。 这就是accountservice和vipservice需要消息传递的时候需要使用它们进行处理的,希望能从很多复杂的东西里边抽象出来。注意我已经选择两种变体”Product”和”Consume”来使用topics和direct/queue消息模式。

接下来,我们定义了一个保存amqp.Connection指针的结构体,我们会将必要的方法绑定到它上面(隐式的,因为Go语言中都是这样干的), 这样就实现了我们声明的接口。

func (m *MessagingClient) ConnectToBroker(connectionString string) {
if cOnnectionString== "" {
panic("Cannot initialize connection to broker, connectionString not set. Have you initialized?")
}
var err error
m.conn, err = amqp.Dial(fmt.Sprintf("%s/", connectionString))
if err != nil {
panic("Failed to connect to AMQP compatible broker at: " + connectionString)
}
}
func (m *MessagingClient) PublishOnQueue(body []byte, queueName string) error {
if m.cOnn== nil {
panic("Tried to send message before connection was initialized. Don't do that.")
}
ch, err := m.conn.Channel() // Get a channel from the connection
defer ch.Close()
queue, err := ch.QueueDeclare(// Declare a queue that will be created if not exists with some args
queueName, // our queue name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
// Publishes a message onto the queue.
err = ch.Publish(
"", // exchange
queue.Name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "application/json",
Body: body, // Our JSON body as []byte
})
fmt.Printf("A message was sent to queue %v: %v", queueName, body)
return err
}

ConnectToBroker中展示了我们如何获取连接指针的,例如amqp.Dial方法。如果我们没有配置或者无法连接我们的broker, 会panic我们的微服务,容器编排会尝试使用新实例重新尝试。 传入的连接字符串就像这样:

amqp://guest:guest@rabbitmq:5672/

注意我们现在使用的是Docker Swarm模式下的RabbitMQ broker的服务名。

PublishOnQueue()函数相当长,它或多或少是从官方例子派生过来的,这里我对其进行了简化,带比较少的参数。要发布消息到命名队列,我们需要传入的参数有:

  • body: 以字节数组形式传入。 可以是JSON,XML或一些二进制。
  • queueName: 要发送消息到的目标队列名字。

要了解更多exchange的详情,可以参考RabbitMQ的官方文档。

PublishOnQueue()方法样本代码使用的很重,但是很容易理解。声明队列(如果不存在就创建它), 然后发布我们的[]byte消息到它里边。发布消息到命名exchange更加复杂,它需要样板代码首先声明一个exchange,一个队列,然后实现将它们绑定一起的代码。 详细请查看完整代码。

继续,实际使用我们MessagingClient的是在goblog/accountservice/service/handlers.go中,因此我们添加一个字段,并硬编码检查是否为VIP, 然后如果请求账号id是10000的话,我们就发送一个消息传递。

var DBClient dbclient.IBoltClient
var MessagingClient messaging.IMessagingClient // 添加新行
var isHealthy = true
func GetAccount(w http.ResponseWriter, r *http.Request) {
// Read the 'accountId' path parameter from the mux map
var accountId = mux.Vars(r)["accountId"]
// Read the account struct BoltDB
account, err := DBClient.QueryAccount(accountId)
account.ServedBy = util.GetIP()
// If err, return a 404
if err != nil {
fmt.Println("Some error occured serving " + accountId + ": " + err.Error())
w.WriteHeader(http.StatusNotFound)
return
}
notifyVIP(account) // 添加新行 同时发送VIP通知。
// NEW call the quotes-service
quote, err := getQuote()
if err == nil {
account.Quote = quote
}
// If found, marshal into JSON, write headers and content
data, _ := json.Marshal(account)
writeJsonResponse(w, http.StatusOK, data)
}
// If our hard-coded "VIP" account, spawn a goroutine to send a message.
func notifyVIP(account model.Account) {
if account.Id == "10000" {
go func(account model.Account) {
vipNotification := model.VipNotification{AccountId: account.Id, ReadAt: time.Now().UTC().String()}
data, _ := json.Marshal(vipNotification)
fmt.Printf("Notifying VIP account %v\n", account.Id)
err := MessagingClient.PublishOnQueue(data, "vip_queue")
if err != nil {
fmt.Println(err.Error())
}
}(account)
}
}

借此机会,我们展示调用新goroutine的内联匿名函数, 也就是说使用了go关键词的。既然我们没有什么理由在发送消息传递的时候需要阻塞执行HTTP处理的主goroutine, 那么这种情况就是使用goroutine实现并行的最佳时机。

main.go文件也需要更新一点代码以便可以在启动的时候使用加载的并注入到Viper中的配置来初始化AMQ连接。

...
func main() {
fmt.Printf("Starting %v\n", appName)
config.LoadConfigurationFromBranch(
viper.GetString("configServerUrl"),
appName,
viper.GetString("profile"),
viper.GetString("configBranch"))
initializeBoltClient()
initializeMessaging() // 新增行,初始化消息传递
handleSigterm(func() {
service.MessagingClient.Close()
})
service.StartWebServer(viper.GetString("server_port"))
}
func initializeMessaging() {
if !viper.IsSet("amqp_server_url") {
panic("No 'amqp_server_url' set in configuration, cannot start")
}
service.MessagingClient = &messaging.MessagingClient{}
service.MessagingClient.ConnectToBroker(viper.GetString("amqp_server_url"))
service.MessagingClient.Subscribe(viper.GetString("config_event_bus"), "topic", appName, config.HandleRefreshEvent)
}
...

没有什么大不了的东西 – 我们创建一个空的MessagingClient实例并将其地址赋值给service.MessagingClient, 然后使用配置amqp_server_url来调用ConnectToBroker方法。如果配置中没有broker_url,我们就panic()退出,因为我们不希望在甚至都没有可能连接到broker的情况下运行服务。

如果成功的连接到broker, 那么我们就调用Subscribe方法来订阅由配置指定的topic。

更新配置

我们在我们的.yml配置文件中添加amqp_broker_url属性到第八部分中的配置文件中,这些东西已经没有人管了。

broker_url: amqp://guest:guest@192.168.99.100:5672 _(dev)_
broker_url: amqp://guest:guest@rabbitmq:5672 _(test)_

注意test profile, 我们使用的是Swarm服务名”rabbitmq”, 而不是我笔记本上看到的Swarm的网络IP地址。(你实际的IP地址可能会变化,192.168.99.100似乎是运行Docker Toolbox的标准IP)。

配置文件中使用明文的用户名和密码是不推荐的,在现实生活中,我们一般会使用第八部分中看到的Spring Cloud Config服务器内置的加密特性。

单元测试

当然,我们应该至少编写一个单元测试,确保我们handlers.go中的GetAccount函数当某人请求神奇的并非常特殊的账号标识为10000的账号时尝试发送一个消息。为此,我们需要模拟IMessagingClient和handlers_test.go中添加新的测试用例实现。让我们开始模拟吧。 这次我们将使用第三方工具mockery来产生IMessagingClient接口的实现:(记住在命令行运行这些命令的时候使用恰当的GOPATH设置)。

> go get github.com/vektra/mockery/.../
> cd $GOPATH/src/github.com/callistaenterprise/goblog/common/messaging
> ./$GOPATH/bin/mockery -all -output .
Generating mock for: IMessagingClient

我们现在在当前目录有一个IMessagingClient.go模拟文件。 我不太喜欢这样的文件名字,不喜欢驼峰,所以我将它重命名为一个明显的东西,它模拟并遵循本博客系列中文件名的约定。

mv IMessagingClient.go mockmessagingclient.go

可能需要调整一般文件中的import语句,删除import别名。 除了那些,我们使用一个黑盒方式来达到这个特殊模拟 – 仅假设它在我们开始写测试的时候会工作。

请随意检查生成的模拟实现的源代码,它非常类似我们之前第四部分中手工写的东西。

切到handlers_test.go,我们添加一个新的测试用例:

// declare mock types to make test code a bit more readable
var anyString = mock.AnythingOfType("string")
var anyByteArray = mock.AnythingOfType("[]uint8") // == []byte
func TestNotificationIsSentForVIPAccount(t *testing.T) {
// Set up the DB client mock
mockRepo.On("QueryAccount", "10000").Return(model.Account{Id:"10000", Name:"Person_10000"}, nil)
DBClient = mockRepo
mockMessagingClient.On("PublishOnQueue", anyByteArray, anyString).Return(nil)
MessagingClient = mockMessagingClient
Convey("Given a HTTP req for a VIP account", t, func() {
req := httptest.NewRequest("GET", "/accounts/10000", nil)
resp := httptest.NewRecorder()
Convey("When the request is handled by the Router", func() {
NewRouter().ServeHTTP(resp, req)
Convey("Then the response should be a 200 and the MessageClient should have been invoked", func() {
So(resp.Code, ShouldEqual, 200)
time.Sleep(time.Millisecond * 10) // Sleep since the Assert below occurs in goroutine
So(mockMessagingClient.AssertNumberOfCalls(t, "PublishOnQueue", 1), ShouldBeTrue)
})
})})
}

可以查看注释了解详情。我不喜欢在断言调用数之前人为添加10毫秒睡眠,但由于模拟是在goroutine中调用,和主线程是独立的,我们需要允许它有一些时间来完成。 希望在涉及到有goroutine或者channel的时候,有更好的单元测试方式。

我承认,模拟这种方式比使用类似Mockito的东西更冗余, 当写Java应用的单元测试的时候。不过,我认为可读性和易读性还是不错的。

确保测试通过:

go test ./...

运行

如果你还没有做的话,先运行springcloud.sh脚本更新配置服务器。 然后,运行copyall.sh并等几秒钟更新accountservice。我们将使用curl来获取我们特殊的账号:

> curl http://$ManagerIP:6767/accounts/10000
{"id":"10000","name":"Person_0","servedBy":"10.255.0.11"}

如果所有进行顺利的话,我们可以打开RabbitMQ管理控制台,并看我们是否在名为vipQueue的队列上获得了一个消息。

《Go微服务 - 第九部分 - 使用RabbitMQ和AMQP进行消息传递》

在上面截图最底下,我们看到vipQueue有一个消息。如果我们使用RabbitMQ管理控制台的Get Message功能, 我们会看到下面的消息:

《Go微服务 - 第九部分 - 使用RabbitMQ和AMQP进行消息传递》

在Go语言中实现消费者 – vipservice

最后,是时候从头开始写一个全新的微服务了, 我们需要用它来展示如何从RabbitMQ消费消息。我们将确保应用在前面内容中学到的模式包括:

  • HTTP服务器
  • 健康检查
  • 集中化配置管理
  • 消息传递码复用

如果你已经切出P9分支的代码了,那么在你goblog目录下面就已经有了vipservice了。
我不会一行行过每个代码文件的内容,因为有些和accountservice里边的重复了。相反我将聚焦在刚才发送消息的消费方面。需要注意一些事情:

  • 在config-repo仓库添加了两个新的.yml文件,vipservice-dev.yml和vipservice-test.yml。
  • copyall.sh脚本更新了,让它同时构建和部署accountservice和vipservice。

消费消息

我们会使用common/messaging的SubscribeToQueue函数,例如:

SubscribeToQueue(queueName string, consumerName string, handlerFunc func(amqp.Delivery)) error

这里我们应该提供的最重要的是:

  • 队列的名字(例如: vip_queue)。
  • 消费者名字(我们是谁)。
  • 处理器函数,它将使用一个amqp.Delivery参数来调用 – 和第八部分中我们消费配置更新非常类似。

实际上将我们的回调函数绑定到队列的SubscribeToQueue实现的实现并不奇怪,如果我们需要了解细节,可以查阅源代码。

继续快速看看vipservice的入口文件main.go, 看看我们如何设置的:

package main
import (
"flag"
"fmt"
"github.com/callistaenterprise/goblog/common/config"
"github.com/callistaenterprise/goblog/common/messaging"
"github.com/callistaenterprise/goblog/vipservice/service"
"github.com/spf13/viper"
"github.com/streadway/amqp"
"os"
"os/signal"
"syscall"
)
var appName = "vipservice"
var messagingClient messaging.IMessagingClient
func init() {
configServerUrl := flag.String("configServerUrl", "http://configserver:8888", "Address to config server")
profile := flag.String("profile", "test", "Environment profile, something similar to spring profiles")
configBranch := flag.String("configBranch", "master", "git branch to fetch configuration from")
flag.Parse()
viper.Set("profile", *profile)
viper.Set("configServerUrl", *configServerUrl)
viper.Set("configBranch", *configBranch)
}
func main() {
fmt.Println("Starting " + appName + "...")
config.LoadConfigurationFromBranch(viper.GetString("configServerUrl"), appName, viper.GetString("profile"), viper.GetString("configBranch"))
initializeMessaging()
// Makes sure connection is closed when service exits.
handleSigterm(func() {
if messagingClient != nil {
messagingClient.Close()
}
})
service.StartWebServer(viper.GetString("server_port"))
}
func onMessage(delivery amqp.Delivery) {
fmt.Printf("Got a message: %v\n", string(delivery.Body))
}
func initializeMessaging() {
if !viper.IsSet("amqp_server_url") {
panic("No 'broker_url' set in configuration, cannot start")
}
messagingClient = &messaging.MessagingClient{}
messagingClient.ConnectToBroker(viper.GetString("amqp_server_url"))
// Call the subscribe method with queue name and callback function
err := messagingClient.SubscribeToQueue("vip_queue", appName, onMessage)
failOnError(err, "Could not start subscribe to vip_queue")
err = messagingClient.Subscribe(viper.GetString("config_event_bus"), "topic", appName, config.HandleRefreshEvent)
failOnError(err, "Could not start subscribe to "+viper.GetString("config_event_bus")+" topic")
}
// Handles Ctrl+C or most other means of "controlled" shutdown gracefully. Invokes the supplied func before exiting.
func handleSigterm(handleExit func()) {
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
signal.Notify(c, syscall.SIGTERM)
go func() {
<-c
handleExit()
os.Exit(1)
}()
}
func failOnError(err error, msg string) {
if err != nil {
fmt.Printf("%s: %s", msg, err)
panic(fmt.Sprintf("%s: %s", msg, err))
}
}

看起来和accountservice非常相似,对不对? 我们可能会重复如何安装和启动我们添加的每个微服务的基本知识。

onMessage函数在这里仅仅打印我们接到的vip消息的body。如果我们需要实现更多虚构的用例,它会调用一些花哨的逻辑来确定账号持有人是否有资格获得&#8221;超级可怕的购买我们所有东西(TM)&#8221;的offer, 并且可能写一个offer给&#8221;VIP offer数据库&#8221;。你可以随意实现并提交一个PR。

没有什么可补充的。除了这个片段,当我们按下Ctrl + C或者当Swarm认为是时候杀死服务实例:

func handleSigterm(handleExit func()) {
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
signal.Notify(c, syscall.SIGTERM)
go func() {
<-c
handleExit()
os.Exit(1)
}()
}

不是最容易读的代码片段,它所做的就是注册通道c作为os.Interrupt和syscall的监听器。SIGTERM和goroutine会阻塞在c上的消息监听,知道接收到这两种信号。 这样就使得我们非常肯定我们提供的handleExit()函数在微服务被杀掉的时候都会被调用。怎么确定? Ctrl + C或docker swarm扩展也工作良好。kill也一样。 kill -9不会。 因此请求不要使用kill -9停止,除非你必须要这样做。

它将调用我们在IMessageConsumer接口中声明的Close()函数, 它实现的时候确保AMQP连接被正确关闭。

部署运行

我们对copyall.sh内容进行了修改:

#!/bin/bash
export GOOS=linux
export CGO_ENABLED=0
cd accountservice;go get;go build -o accountservice-linux-amd64;echo built `pwd`;cd ..
cd healthchecker;go get;go build -o healthchecker-linux-amd64;echo built `pwd`;cd ..
cd vipservice;go get;go build -o vipservice-linux-amd64;echo built `pwd`;cd ..
export GOOS=darwin
cp healthchecker/healthchecker-linux-amd64 accountservice/
cp healthchecker/healthchecker-linux-amd64 vipservice/
docker build -t someprefix/accountservice accountservice/
docker service rm accountservice
docker service create --name=accountservice --replicas=1 --network=my_network -p=6767:6767 someprefix/accountservice
docker build -t someprefix/vipservice vipservice/
docker service rm vipservice
docker service create --name=vipservice --replicas=1 --network=my_network someprefix/vipservice

运行这个脚本,等待几秒钟,让服务重新构建部署完成。然后查看:

> docker service ls
ID NAME REPLICAS IMAGE
kpb1j3mus3tn accountservice 1/1 someprefix/accountservice
n9xr7wm86do1 configserver 1/1 someprefix/configserver
r6bhneq2u89c rabbitmq 1/1 someprefix/rabbitmq
sy4t9cbf4upl vipservice 1/1 someprefix/vipservice
u1qcvxm2iqlr viz 1/1 manomarks/visualizer:latest

或者可以使用dvizz Docker Swarm服务呈现来查看:

《Go微服务 - 第九部分 - 使用RabbitMQ和AMQP进行消息传递》

检查日志

既然docker service logs特性已经在1.13.0中被标记为试验阶段,我们依然可以使用前面的方式来查看vipservice的日志。首先,运行docker ps找出容器id:

> docker ps
CONTAINER ID IMAGE
a39e6eca83b3 someprefix/vipservice:latest
b66584ae73ba someprefix/accountservice:latest
d0074e1553c7 someprefix/configserver:latest

然后使用vipservice的容器id来查看日志:

> docker logs -f a39e6eca83b3
Starting vipservice...
2017/06/06 19:27:22 Declaring Queue ()
2017/06/06 19:27:22 declared Exchange, declaring Queue ()
2017/06/06 19:27:22 declared Queue (0 messages, 0 consumers), binding to Exchange (key 'springCloudBus')
Starting HTTP service at 6868

然后另外打开一个窗口,执行下面的请求:

> curl http://$ManagerIP:6767/accounts/10000

然后你就会在刚才日志里边看到多了下面一行信息:

Got a message: {"accountId":"10000","readAt":"2017-02-15 20:06:27.033757223 +0000 UTC"}

也就是说我们的vipservice成功的消费了从accountservice发布的消息。

Work队列

跨越服务的多个实例的分布式work模式是利用了work队列的概念。每个vip消息应该只能被单个vipservice实例处理。

《Go微服务 - 第九部分 - 使用RabbitMQ和AMQP进行消息传递》

因此让我们看看当我们将vipservice规模扩大到2个的时候会发生什么:

> docker service scale vipservice=2

数秒之后新的实例就可以使用了。既然我们使用的是AMQP中的direct/queue方式,我们希望有轮询的行为。使用curl触发四个VIP账户查询。

> curl http://$ManagerIP:6767/accounts/10000
> curl http://$ManagerIP:6767/accounts/10000
> curl http://$ManagerIP:6767/accounts/10000
> curl http://$ManagerIP:6767/accounts/10000

然后在看看日志:

> docker logs -f a39e6eca83b3
Got a message: {"accountId":"10000","readAt":"2017-02-15 20:06:27.033757223 +0000 UTC"}
Got a message: {"accountId":"10000","readAt":"2017-02-15 20:06:29.073682324 +0000 UTC"}

正如我们预料的,我们看到第一个实例处理了四条消息中的两条。如果我们对其他的vipservice进行docker logs查询,我们会看到其他的消息在它们里边消费了。非常满意。

占用空间和性能

这次不会做性能测试,在发送和接受一些消息后,快速查看内存使用就足够了:

CONTAINER CPU % MEM USAGE / LIMIT
vipservice.1.tt47bgnmhef82ajyd9s5hvzs1 0.00% 1.859MiB / 1.955GiB
accountservice.1.w3l6okdqbqnqz62tg618szsoj 0.00% 3.434MiB / 1.955GiB
rabbitmq.1.i2ixydimyleow0yivaw39xbom 0.51% 129.9MiB / 1.955GiB

上买呢在服务了一些请求后得到的信息。新的vipservice和accountservice一样不是很复杂,因此和预料的一样启动的时候占用的内存非常小。

总结

本文可能是这个系列目前最长的一篇文章了!我们完成了:

  • 更深入的测试了RabbitMQ和AMQP的机制。
  • 添加了全新的微服务vipservice。
  • 将消息传递和配置代码放到可复用的子项目中。
  • 使用AMQP协议发布/订阅消息。
  • 使用mockery产生模拟代码。

在第十部分,我们将做一些轻量的但在现实世界非常重要的模型 &#8211; 使用Logrus, Docker GELF日志驱动记录结构化日志以及将日志发不到Laas提供者商。

中英文对照表

  • 领域驱动设计: Domain-driven Design(DDD).

参考链接

  • DDD
  • streadway/amqp
  • RabbitMQ为什么既有exchange又有queue?
  • AMQP协议
  • RabbitMQ官方文档
  • RabbitMQ Work Queues
  • Laas
  • 英文第9部分
  • 系列首页
  • 下一节

推荐阅读
  • 本文讨论了编写可保护的代码的重要性,包括提高代码的可读性、可调试性和直观性。同时介绍了优化代码的方法,如代码格式化、解释函数和提炼函数等。还提到了一些常见的坏代码味道,如不规范的命名、重复代码、过长的函数和参数列表等。最后,介绍了如何处理数据泥团和进行函数重构,以提高代码质量和可维护性。 ... [详细]
  • 本文由编程笔记#小编为大家整理,主要介绍了logistic回归(线性和非线性)相关的知识,包括线性logistic回归的代码和数据集的分布情况。希望对你有一定的参考价值。 ... [详细]
  • 基于PgpoolII的PostgreSQL集群安装与配置教程
    本文介绍了基于PgpoolII的PostgreSQL集群的安装与配置教程。Pgpool-II是一个位于PostgreSQL服务器和PostgreSQL数据库客户端之间的中间件,提供了连接池、复制、负载均衡、缓存、看门狗、限制链接等功能,可以用于搭建高可用的PostgreSQL集群。文章详细介绍了通过yum安装Pgpool-II的步骤,并提供了相关的官方参考地址。 ... [详细]
  • 生成式对抗网络模型综述摘要生成式对抗网络模型(GAN)是基于深度学习的一种强大的生成模型,可以应用于计算机视觉、自然语言处理、半监督学习等重要领域。生成式对抗网络 ... [详细]
  • 云原生边缘计算之KubeEdge简介及功能特点
    本文介绍了云原生边缘计算中的KubeEdge系统,该系统是一个开源系统,用于将容器化应用程序编排功能扩展到Edge的主机。它基于Kubernetes构建,并为网络应用程序提供基础架构支持。同时,KubeEdge具有离线模式、基于Kubernetes的节点、群集、应用程序和设备管理、资源优化等特点。此外,KubeEdge还支持跨平台工作,在私有、公共和混合云中都可以运行。同时,KubeEdge还提供数据管理和数据分析管道引擎的支持。最后,本文还介绍了KubeEdge系统生成证书的方法。 ... [详细]
  • 本文介绍了Java工具类库Hutool,该工具包封装了对文件、流、加密解密、转码、正则、线程、XML等JDK方法的封装,并提供了各种Util工具类。同时,还介绍了Hutool的组件,包括动态代理、布隆过滤、缓存、定时任务等功能。该工具包可以简化Java代码,提高开发效率。 ... [详细]
  • 本文介绍了C#中生成随机数的三种方法,并分析了其中存在的问题。首先介绍了使用Random类生成随机数的默认方法,但在高并发情况下可能会出现重复的情况。接着通过循环生成了一系列随机数,进一步突显了这个问题。文章指出,随机数生成在任何编程语言中都是必备的功能,但Random类生成的随机数并不可靠。最后,提出了需要寻找其他可靠的随机数生成方法的建议。 ... [详细]
  • Voicewo在线语音识别转换jQuery插件的特点和示例
    本文介绍了一款名为Voicewo的在线语音识别转换jQuery插件,该插件具有快速、架构、风格、扩展和兼容等特点,适合在互联网应用中使用。同时还提供了一个快速示例供开发人员参考。 ... [详细]
  • 不同优化算法的比较分析及实验验证
    本文介绍了神经网络优化中常用的优化方法,包括学习率调整和梯度估计修正,并通过实验验证了不同优化算法的效果。实验结果表明,Adam算法在综合考虑学习率调整和梯度估计修正方面表现较好。该研究对于优化神经网络的训练过程具有指导意义。 ... [详细]
  • 开发笔记:计网局域网:NAT 是如何工作的?
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了计网-局域网:NAT是如何工作的?相关的知识,希望对你有一定的参考价值。 ... [详细]
  • [大整数乘法] java代码实现
    本文介绍了使用java代码实现大整数乘法的过程,同时也涉及到大整数加法和大整数减法的计算方法。通过分治算法来提高计算效率,并对算法的时间复杂度进行了研究。详细代码实现请参考文章链接。 ... [详细]
  • Linux如何安装Mongodb的详细步骤和注意事项
    本文介绍了Linux如何安装Mongodb的详细步骤和注意事项,同时介绍了Mongodb的特点和优势。Mongodb是一个开源的数据库,适用于各种规模的企业和各类应用程序。它具有灵活的数据模式和高性能的数据读写操作,能够提高企业的敏捷性和可扩展性。文章还提供了Mongodb的下载安装包地址。 ... [详细]
  • 第四章高阶函数(参数传递、高阶函数、lambda表达式)(python进阶)的讲解和应用
    本文主要讲解了第四章高阶函数(参数传递、高阶函数、lambda表达式)的相关知识,包括函数参数传递机制和赋值机制、引用传递的概念和应用、默认参数的定义和使用等内容。同时介绍了高阶函数和lambda表达式的概念,并给出了一些实例代码进行演示。对于想要进一步提升python编程能力的读者来说,本文将是一个不错的学习资料。 ... [详细]
  • 本文记录了在vue cli 3.x中移除console的一些采坑经验,通过使用uglifyjs-webpack-plugin插件,在vue.config.js中进行相关配置,包括设置minimizer、UglifyJsPlugin和compress等参数,最终成功移除了console。同时,还包括了一些可能出现的报错情况和解决方法。 ... [详细]
  • 2018深入java目标计划及学习内容
    本文介绍了作者在2018年的深入java目标计划,包括学习计划和工作中要用到的内容。作者计划学习的内容包括kafka、zookeeper、hbase、hdoop、spark、elasticsearch、solr、spring cloud、mysql、mybatis等。其中,作者对jvm的学习有一定了解,并计划通读《jvm》一书。此外,作者还提到了《HotSpot实战》和《高性能MySQL》等书籍。 ... [详细]
author-avatar
贤闲咸大_552
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有