热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

利用Watermill实现GolangCQRS

CQRSCQRS的意思是“命令-查询责任隔离”。我们分离了命令(写请求)和查询(读请求)之间的责任。写请求和读请求由不同的对象处理。就是这样。我们可以进一步分割数据存储,使用单独的

CQRS

CQRS 的意思是“命令-查询责任隔离”。我们分离了命令(写请求)和查询(读请求)之间的责任。写请求和读请求由不同的对象处理。

就是这样。我们可以进一步分割数据存储,使用单独的读写存储。一旦发生这种情况,可能会有许多读取存储,这些存储针对处理不同类型的查询或跨越多个边界上下文进行了优化。虽然经常讨论与 CQRS 相关的单独读写存储,但这并不是 CQRS 本身。CQRS 只是命令和查询的第一部分。

术语

Command

该命令是一个简单的数据结构,表示执行某些操作的请求。

Command Bus

完整源码:

  • github.com/ThreeDotsLabs/watermill/components/cqrs/command_bus.go
// ...
// CommandBus 将命令(commands)传输到命令处理程序(command handlers)。
type CommandBus struct {
// ...

Command Processor

完整源码:

  • github.com/ThreeDotsLabs/watermill/components/cqrs/command_processor.go
// ...
// CommandProcessor 决定哪个 CommandHandler 应该处理这个命令
received from the command bus.
type CommandProcessor struct {
// ...

Command Handler

完整源码:

  • github.com/ThreeDotsLabs/watermill/components/cqrs/command_processor.go
// ...
// CommandHandler 接收由 NewCommand 定义的命令,并使用 Handle 方法处理它。
// 如果使用 DDD, CommandHandler 可以修改并持久化聚合。
//
// 与 EvenHandler 相反,每个命令必须只有一个 CommandHandler。
//
// 在处理消息期间使用 CommandHandler 的一个实例。
// 当同时发送多个命令时,Handle 方法可以同时执行多次。
// 因此,Handle 方法必须是线程安全的!
type CommandHandler interface {
// ...

Event

该事件表示已经发生的事情。 事件是不可变的。

Event Bus

完整源码:

  • github.com/ThreeDotsLabs/watermill/components/cqrs/event_bus.go
// ...
// EventBus 将事件传输到事件处理程序。
type EventBus struct {
// ...

Event Processor

完整源码:

  • github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor.go
// ...
// EventProcessor 确定哪个 EventHandler 应该处理从事件总线接收到的事件。
type EventProcessor struct {
// ...

Event Handler

完整源码:

  • github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor.go
// ...
// EventHandler 接收由 NewEvent 定义的事件,并使用其 Handle 方法对其进行处理。
// 如果使用 DDD,CommandHandler 可以修改并保留聚合。
// 它还可以调用流程管理器、saga 或仅仅构建一个读取模型。
// 与 CommandHandler 相比,每个 Event 可以具有多个 EventHandler。
//
// 在处理消息时使用 EventHandler 的一个实例。
// 当同时传递多个事件时,Handle 方法可以同时执行多次。
// 因此,Handle 方法必须是线程安全的!
type EventHandler interface {
// ...

CQRS Facade

完整源码:

  • github.com/ThreeDotsLabs/watermill/components/cqrs/cqrs.go
// ...
// Facade 是用于创建 Command 和 Event buses 及 processors 的 facade。
// 创建它是为了在以标准方式使用 CQRS 时避免使用 boilerplate。
// 您还可以手动创建 buses 和 processors,并从 NewFacade 中获得灵感。
type Facade struct {
// ...

Command and Event Marshaler

完整源码:

  • github.com/ThreeDotsLabs/watermill/components/cqrs/marshaler.go
// ...
// CommandEventMarshaler 将命令和事件 marshal 给 Watermill 的消息,反之亦然。
// 该命令的有效载荷需要 marshal 至 []bytes。
type CommandEventMarshaler interface {
    // Marshal marshal 命令或事件给 Watermill 的消息。
   Marshal(v interface{}) (*message.Message, error)

    // Unmarshal Unmarshal watermill的信息给 v Command 或 Event。
   Unmarshal(msg *message.Message, v interface{}) (err error)

   // Name 返回命令或事件的名称。
   // Name 用于确定接收到的命令或事件是我们想要处理的事件。
   Name(v interface{}) string

   // NameFromMessage 从 Watermill 的消息(由 Marshal 生成)中返回命令或事件的名称。
   // 
   //
   // 当我们将 Command 或 Event marshal 到 Watermill 的消息中时,
   // 我们应该使用 NameFromMessage 而不是 Name 来避免不必要的 unmarshaling。
   NameFromMessage(msg *message.Message) string
}
// ...

用法

Example domain(领域模型示例)

作为示例,我们将使用一个简单的 domain,它负责处理酒店的房间预订。

我们将使用 Event Storming 表示法来展示这个 domain 的模型。

Legend:

  • blue(蓝色)便利贴是命令
  • orange(橙色)便利贴是事件
  • green(绿色)便利贴是读取模型,从事件异步生成
  • violet(紫色)便利贴是策略,由事件触发并产生命令
  • pink(粉色)便利贴是热点(hot-spots);我们标记经常发生问题的地方

domain(领域模型)很简单:

  • 客人可以预订房间(book a room)。
  • 每当预订房间时,我们都会为客人订购啤酒(Whenever a room is booked, we order a beer)(因为我们爱客人)。
    • 我们知道有时候啤酒不够(not enough beers)。
  • 我们根据预订生成财务报告(financial report)。

Sending a command(发送命令)

首先,我们需要模拟访客的动作。

完整源码:

  • github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go
// ...
       bookRoomCmd := &BookRoom{
            RoomId:    fmt.Sprintf("%d", i),
            GuestName: "John",
            StartDate: startDate,
            EndDate:   endDate,
        }
        if err := commandBus.Send(context.Background(), bookRoomCmd); err != nil {
            panic(err)
        }
// ...

Command handler

BookRoomHandler 将处理我们的命令。

完整源码:

  • github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go
// ...
// BookRoomHandler 是一个命令处理程序,它处理 BookRoom 命令并发出 RoomBooked。
//
// 在 CQRS 中,一个命令只能由一个处理程序处理。
// 将具有此命令的另一个处理程序添加到命令处理器时,将返回错误。
type BookRoomHandler struct {
    eventBus *cqrs.EventBus
}

func (b BookRoomHandler) HandlerName() string {
    return "BookRoomHandler"
}

// NewCommand 返回该 handle 应该处理的命令类型。它必须是一个指针。
func (b BookRoomHandler) NewCommand() interface{} {
    return &BookRoom{}
}

func (b BookRoomHandler) Handle(ctx context.Context, c interface{}) error {
    // c 始终是 `NewCommand` 返回的类型,因此强制转换始终是安全的
   cmd := c.(*BookRoom)

   // 一些随机的价格,在生产中你可能会用更明智的方式计算
   price := (rand.Int63n(40) + 1) * 10

    log.Printf(
        "Booked %s for %s from %s to %s",
        cmd.RoomId,
        cmd.GuestName,
        time.Unix(cmd.StartDate.Seconds, int64(cmd.StartDate.Nanos)),
        time.Unix(cmd.EndDate.Seconds, int64(cmd.EndDate.Nanos)),
    )

   // RoomBooked 将由 OrderBeerOnRoomBooked 事件处理程序处理,
   // 将来,RoomBooked 可能由多个事件处理程序处理
   if err := b.eventBus.Publish(ctx, &RoomBooked{
        ReservationId: watermill.NewUUID(),
        RoomId:        cmd.RoomId,
        GuestName:     cmd.GuestName,
        Price:         price,
        StartDate:     cmd.StartDate,
        EndDate:       cmd.EndDate,
    }); err != nil {
        return err
    }

    return nil
}

// OrderBeerOnRoomBooked 是事件处理程序,它处理 RoomBooked 事件并发出 OrderBeer 命令。
// ...

Event handler

如前所述,我们希望每次预订房间时都点一杯啤酒(“每次预订房间时”便笺)。我们通过使 OrderBeer 命令来实现。

完整源码:

  • github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go
// ...
// OrderBeerOnRoomBooked 是事件处理程序,它处理 RoomBooked 事件并发出 OrderBeer 命令。
type OrderBeerOnRoomBooked struct {
    commandBus *cqrs.CommandBus
}

func (o OrderBeerOnRoomBooked) HandlerName() string {
   // 此名称传递给 EventsSubscriberConstructor 并用于生成队列名称
   return "OrderBeerOnRoomBooked"
}

func (OrderBeerOnRoomBooked) NewEvent() interface{} {
    return &RoomBooked{}
}

func (o OrderBeerOnRoomBooked) Handle(ctx context.Context, e interface{}) error {
    event := e.(*RoomBooked)

    orderBeerCmd := &OrderBeer{
        RoomId: event.RoomId,
        Count:  rand.Int63n(10) + 1,
    }

    return o.commandBus.Send(ctx, orderBeerCmd)
}

// OrderBeerHandler 是命令处理程序,它处理 OrderBeer 命令并发出 BeerOrdered。
// ...

OrderBeerHandler 与 BookRoomHandler 非常相似。唯一的区别是,当啤酒不够时,它有时会返回一个错误,这将导致重新交付命令。您可以在示例源代码中找到整个实现。

使用事件处理程序构建读取模型

完整源码:

  • github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go
// ...
// BookingsFinancialReport 是一个读取模型,用于计算我们可以从预订中赚取多少钱。
// 与 OrderBeerOnRoomBooked 一样,它侦听 RoomBooked 事件。
//
// 此实现只是写入内存。在生产中,您可能会使用一些持久性存储。
type BookingsFinancialReport struct {
    handledBookings map[string]struct{}
    totalCharge     int64
    lock            sync.Mutex
}

func NewBookingsFinancialReport() *BookingsFinancialReport {
    return &BookingsFinancialReport{handledBookings: map[string]struct{}{}}
}

func (b BookingsFinancialReport) HandlerName() string {
   // 此名称传递给 EventsSubscriberConstructor 并用于生成队列名称
   return "BookingsFinancialReport"
}

func (BookingsFinancialReport) NewEvent() interface{} {
    return &RoomBooked{}
}

func (b *BookingsFinancialReport) Handle(ctx context.Context, e interface{}) error {
   // Handle 可以被并发调用,因此它必须是线程安全的。
   b.lock.Lock()
    defer b.lock.Unlock()

    event := e.(*RoomBooked)

   // 当我们使用不提供一次精确交付语义的 Pub/Sub 时,我们需要对消息进行重复数据删除。
   // GoChannel Pub/Sub 提供了精确的一次交付,
   // 但是让我们为其他 Pub/Sub 实现准备好这个示例。
   if _, ok := b.handledBookings[event.ReservationId]; ok {
        return nil
    }
    b.handledBookings[event.ReservationId] = struct{}{}

    b.totalCharge += event.Price

    fmt.Printf(">>> Already booked rooms for $%d\n", b.totalCharge)
    return nil
}

var amqpAddress = "amqp://guest:guest@rabbitmq:5672/"

func main() {
// ...

将其连接起来——CQRS facade

我们拥有构建 CQRS 应用程序的所有块。 现在,我们需要使用某种胶水将其连接起来。

我们将使用最简单的内存消息传递基础设施: GoChannel。

在后台,CQRS 正在使用 Watermill 的消息路由器。 如果您不熟悉它,并且想了解它的工作原理,则应查阅《入门指南》。 它还将向您展示如何使用一些标准的消息传递模式,例如 metrics,poison queue,throttling,correlation 以及每个消息驱动的应用程序使用的其他工具。那些内置于 Watermill 中。

让我们回到 CQRS。如您所知,CQRS 是由多个组件构建的,如命令(Command)或事件总线(Event buses)、处理程序(handlers)、处理器(processors)等。为了简化所有这些构建块的创建,我们创建了 cqrs.Facade,它创建所有这些。

完整源码:

  • github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go
// ...
func main() {
    logger := watermill.NewStdLogger(false, false)
    cqrsMarshaler := cqrs.ProtobufMarshaler{}

    // 您可以从此处使用任何 Pub/Sub 实现:https://watermill.io/docs/pub-sub-implementations/
   // 详细的 RabbitMQ 实现: https://watermill.io/docs/pub-sub-implementations/#rabbitmq-amqp
   // 命令将被发送到队列,因为它们需要被使用一次。
   commandsAMQPConfig := amqp.NewDurableQueueConfig(amqpAddress)
    commandsPublisher, err := amqp.NewPublisher(commandsAMQPConfig, logger)
    if err != nil {
        panic(err)
    }
    commandsSubscriber, err := amqp.NewSubscriber(commandsAMQPConfig, logger)
    if err != nil {
        panic(err)
    }

   // 事件将被发布到配置了 PubSu b的 Rabbit,因为它们可能被多个使用者使用。
   // (在这种情况下,BookingsFinancialReport 和 OrderBeerOnRoomBooked).
   eventsPublisher, err := amqp.NewPublisher(amqp.NewDurablePubSubConfig(amqpAddress, nil), logger)
    if err != nil {
        panic(err)
    }

    // CQRS 建立在消息路由器上。详细文档:https://watermill.io/docs/messages-router/
   router, err := message.NewRouter(message.RouterConfig{}, logger)
    if err != nil {
        panic(err)
    }

   // 简单的中间件,可以从事件或命令处理程序中 recover panics。
   // 您可以在文档中找到有关路由器中间件的更多信息:
   // https://watermill.io/docs/messages-router/#middleware
   //
   // 您可以在 message/router/middleware 中找到可用的中间件列表。
   router.AddMiddleware(middleware.Recoverer)

    // cqrs.Facade是命令和事件总线与处理器的 facade。
   // 您可以使用 facade,或者手动创建总线和处理器(您可以使用 cqrs.NewFacade 激发灵感)
   cqrsFacade, err := cqrs.NewFacade(cqrs.FacadeConfig{
        GenerateCommandsTopic: func(commandName string) string {
            // 我们正在使用RabbitMQ队列配置,因此我们需要按命令类型指定主题 topic
           return commandName
        },
        CommandHandlers: func(cb *cqrs.CommandBus, eb *cqrs.EventBus) []cqrs.CommandHandler {
            return []cqrs.CommandHandler{
                BookRoomHandler{eb},
                OrderBeerHandler{eb},
            }
        },
        CommandsPublisher: commandsPublisher,
        CommandsSubscriberConstructor: func(handlerName string) (message.Subscriber, error) {
            // 我们可以重用订阅者(subscriber),因为所有命令都有各自的主题(topics)
           return commandsSubscriber, nil
        },
        GenerateEventsTopic: func(eventName string) string {
            // 因为我们使用的是PubSub RabbitMQ配置,所以我们可以对所有事件使用一个主题(topic)
           return "events"

            // 我们还可以按事件类型使用主题(topic)
           // return eventName
       },
        EventHandlers: func(cb *cqrs.CommandBus, eb *cqrs.EventBus) []cqrs.EventHandler {
            return []cqrs.EventHandler{
                OrderBeerOnRoomBooked{cb},
                NewBookingsFinancialReport(),
            }
        },
        EventsPublisher: eventsPublisher,
        EventsSubscriberConstructor: func(handlerName string) (message.Subscriber, error) {
            config := amqp.NewDurablePubSubConfig(
                amqpAddress,
                amqp.GenerateQueueNameTopicNameWithSuffix(handlerName),
            )

            return amqp.NewSubscriber(config, logger)
        },
        Router:                router,
        CommandEventMarshaler: cqrsMarshaler,
        Logger:                logger,
    })
    if err != nil {
        panic(err)
    }

    // 每秒发布 BookRoom 命令以模拟传入流量
   go publishCommands(cqrsFacade.CommandBus())

    // 处理器(processors)是基于路由器(router)的,所以当路由器启动时,处理器就会工作
   if err := router.Run(context.Background()); err != nil {
        panic(err)
    }
}
// ...

就这样。 我们有一个正在运行的 CQRS 应用程序。

我是为少。微信:uuhells123。公众号:黑客下午茶。

谢谢点赞支持


推荐阅读
  • Go Cobra命令行工具入门教程
    本文介绍了Go语言实现的命令行工具Cobra的基本概念、安装方法和入门实践。Cobra被广泛应用于各种项目中,如Kubernetes、Hugo和Github CLI等。通过使用Cobra,我们可以快速创建命令行工具,适用于写测试脚本和各种服务的Admin CLI。文章还通过一个简单的demo演示了Cobra的使用方法。 ... [详细]
  • FeatureRequestIsyourfeaturerequestrelatedtoaproblem?Please ... [详细]
  • Centos7.6安装Gitlab教程及注意事项
    本文介绍了在Centos7.6系统下安装Gitlab的详细教程,并提供了一些注意事项。教程包括查看系统版本、安装必要的软件包、配置防火墙等步骤。同时,还强调了使用阿里云服务器时的特殊配置需求,以及建议至少4GB的可用RAM来运行GitLab。 ... [详细]
  • 本文讨论了clone的fork与pthread_create创建线程的不同之处。进程是一个指令执行流及其执行环境,其执行环境是一个系统资源的集合。在调用系统调用fork创建一个进程时,子进程只是完全复制父进程的资源,这样得到的子进程独立于父进程,具有良好的并发性。但是二者之间的通讯需要通过专门的通讯机制,另外通过fork创建子进程系统开销很大。因此,在某些情况下,使用clone或pthread_create创建线程可能更加高效。 ... [详细]
  • 本文介绍了设计师伊振华受邀参与沈阳市智慧城市运行管理中心项目的整体设计,并以数字赋能和创新驱动高质量发展的理念,建设了集成、智慧、高效的一体化城市综合管理平台,促进了城市的数字化转型。该中心被称为当代城市的智能心脏,为沈阳市的智慧城市建设做出了重要贡献。 ... [详细]
  • Commit1ced2a7433ea8937a1b260ea65d708f32ca7c95eintroduceda+Clonetraitboundtom ... [详细]
  • 目录实现效果:实现环境实现方法一:基本思路主要代码JavaScript代码总结方法二主要代码总结方法三基本思路主要代码JavaScriptHTML总结实 ... [详细]
  • Android中高级面试必知必会,积累总结
    本文介绍了Android中高级面试的必知必会内容,并总结了相关经验。文章指出,如今的Android市场对开发人员的要求更高,需要更专业的人才。同时,文章还给出了针对Android岗位的职责和要求,并提供了简历突出的建议。 ... [详细]
  • 【Windows】实现微信双开或多开的方法及步骤详解
    本文介绍了在Windows系统下实现微信双开或多开的方法,通过安装微信电脑版、复制微信程序启动路径、修改文本文件为bat文件等步骤,实现同时登录两个或多个微信的效果。相比于使用虚拟机的方法,本方法更简单易行,适用于任何电脑,并且不会消耗过多系统资源。详细步骤和原理解释请参考本文内容。 ... [详细]
  • 关于我们EMQ是一家全球领先的开源物联网基础设施软件供应商,服务新产业周期的IoT&5G、边缘计算与云计算市场,交付全球领先的开源物联网消息服务器和流处理数据 ... [详细]
  • Webmin远程命令执行漏洞复现及防护方法
    本文介绍了Webmin远程命令执行漏洞CVE-2019-15107的漏洞详情和复现方法,同时提供了防护方法。漏洞存在于Webmin的找回密码页面中,攻击者无需权限即可注入命令并执行任意系统命令。文章还提供了相关参考链接和搭建靶场的步骤。此外,还指出了参考链接中的数据包不准确的问题,并解释了漏洞触发的条件。最后,给出了防护方法以避免受到该漏洞的攻击。 ... [详细]
  • 本文介绍了Java高并发程序设计中线程安全的概念与synchronized关键字的使用。通过一个计数器的例子,演示了多线程同时对变量进行累加操作时可能出现的问题。最终值会小于预期的原因是因为两个线程同时对变量进行写入时,其中一个线程的结果会覆盖另一个线程的结果。为了解决这个问题,可以使用synchronized关键字来保证线程安全。 ... [详细]
  • Linux环境变量函数getenv、putenv、setenv和unsetenv详解
    本文详细解释了Linux中的环境变量函数getenv、putenv、setenv和unsetenv的用法和功能。通过使用这些函数,可以获取、设置和删除环境变量的值。同时给出了相应的函数原型、参数说明和返回值。通过示例代码演示了如何使用getenv函数获取环境变量的值,并打印出来。 ... [详细]
  • Java学习笔记之面向对象编程(OOP)
    本文介绍了Java学习笔记中的面向对象编程(OOP)内容,包括OOP的三大特性(封装、继承、多态)和五大原则(单一职责原则、开放封闭原则、里式替换原则、依赖倒置原则)。通过学习OOP,可以提高代码复用性、拓展性和安全性。 ... [详细]
  • Android开发实现的计时器功能示例
    本文分享了Android开发实现的计时器功能示例,包括效果图、布局和按钮的使用。通过使用Chronometer控件,可以实现计时器功能。该示例适用于Android平台,供开发者参考。 ... [详细]
author-avatar
COCO歧
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有