热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

使用EventSourcing和CQRS模式构建、TypeScript编写的无服务器应用DomKriskovic

在这篇文章中,我介绍了一个名为“Beenion”的开源项目背后的架构。它使用EventSourcing和CQRS模式构建,并使用TypeScript编写。简而言之,Beenion是



在这篇文章中,我介绍了一个名为“Beenion”的开源项目背后的架构
它使用Event Sourcing和CQRS模式构建,并使用TypeScript编写。

简而言之,Beenion是一种“类似Twitter”的服务,您可以在其中发布数据并关注其他用户。
但是,不是发送推文,而是使用Chrome扩展程序对网站进行评级,然后,在应用主页中,您可以看到由您的关注者评分的链接列表。

为何使用CQRS / Event Sourcing?
当我开始考虑这个项目时,各种各样的想法都经历了我的脑海:


  • 如果我想使用DynamoDb存储用户评级,那么我的“分区密钥”是什么?
  • 如果是linkUrl,查询特定用户的链接则需要扫描整个表。
  • 另一方面,如果我使用userId,在查询特定链接上的所有评级时,我会遇到同样的问题。
  • 那么用户提要呢?如何为拥有100个关注者的用户检索数据?
  • 通过每次用户访问网站时执行100个查询(每个关注者一个),或者在用户的关注者为网站评分时准备和更新订阅源?

我很快就意识到我需要的不是单一的数据库模型,而是能够为不同的工作目标使用多种工具。

例如; 用于检索每个链接的所有评级的附加DynamoDb表,用于用户订阅的getStream.io服务,用于搜索的弹性搜索数据库,用于推荐和新闻稿的单独服务等。

它可能看起来很复杂,但从业务领域的角度来看,应用程序很简单:用户可以对链接进行评级并相互关注。其他一切都是由于这些行为而发生的。
在事件采购溯源中,您只需要这些信息。
为了说明这一点,以下是在Beenion中使用的事件类型列表:

LINK_CREATED
LINK_RATED
LINK_TAGGED
LINK_TITLE_UPDATED
LINK_IMAGE_UPDATED
USER_FOLLOWED
USER_UNFOLLOWED



每个这些事件包含了诸如额外的数据userId,linkUrl或linkTitle但尽管如此,他们很容易推理。

对于用户订阅源,我使用的事件处理程序侦听“LINK_RATED”,“USER_FOLLOWED”和“USER_UNFOLLOWED”事件,然后使用“getStream”API保存适当的数据。

以类似的方式,第二个事件处理程序更新DynamoDB表,该表用于获取有关特定链接上所有评级的数据。
如果以后,我改变主意并意识到不同类型的数据库或SAAS服务是如何更合适的,我可以通过在任何时间点以不同的方式利用事件来轻松地进行切换。
对我而言,这是一种优雅的方式,可以减轻制定“正确的基础架构决策”的负担,并开始专注于业务领域。

架构概述
项目架构分为两部分:命令和查询(CQRS)。
每次在应用程序中需要更改某些内容时都会调用命令端(例如评级网站或更新/删除以前评级的网站)。它负责验证业务规则并以事件的形式保存结果(LINK_RATED,USER_FOLLOWED等)。
查询端对这些事件做出反应并更新数据库,然后用于支持各种查询。

聚合边界
即使事件通常存储在关系数据库中,我发现使用NoSQL模型的概念更容易推理它们。
例如,在MongoDB中,文档是存储_id有用作主键的字段的对象。以类似的方式,事件可以存储在事件存储中,但是,您可以将文档视为事件数组,而不是对象。
正如有无数种方法来决定MongoDB中文档的构成,有无数种方法可以决定如何对这些事件数组进行分组。这一切都取决于你如何决定“ 设计聚合边界 ”。
对于此项目,事件按以下方式分类:



但是,在将事件保存到数据库之前,通常必须进行某种验证,并且在大多数情况下只能通过对先前存储的数据进行条件来完成。
例如,为了保存“USER_FOLLOWED”事件,存在无法跟踪同一用户两次的情况。为了保存持久这一点,我正在检查用户的id是否列在当前被跟踪的用户数组中:

if (userFollowers.includes(followUserId)) {
  throw validationError("User is already followed")
}



但是,由于这种数组不存储在任何地方,因此必须首先创建它。
这是通过检索特定用户的所有事件,然后将它们传递到“reducer”来完成的,其中在“USER_FOLLOWED”事件的情况下,一个userId在数组中添加,并且在“USER_UNFOLLOWED”的情况下“,它被删除:

const currentUserEvents = await eventStore.getById(userId)
const userFollowers = userFollowersReducer(currentUserEvents)
if (userFollowers.includes(followUserId)) {
  throw validationError("User is already followed")
}


function userFollowersReducer(
  events: Event[],
  initialState = []
): string[] {
  return events.reduce((followers: string[], e: Event) => {
    switch (e.type) {
      case 'USER_FOLLOWED':
        // add userId in array
        return [...followers, e.payload.followedUserId]
      case 'USER_UNFOLLOWED':
        // remove userId from array
        return followers.filter(userId => userId !== e.payload.unfollowedUserId)
      default:
        // keep array the same in case of any other event
        return followers
    }
  }, initialState)
}



在满足所有业务规则之后,剩下的就是在事件存储中保存事件:

eventStore.save({
  events: [
    {
      type: "USER_FOLLOWED",
      payload: {
        userId,
        followedUserId: followUserId
      }
    }
  ],
  streamId: userId,
  expectedVersion: currentUserEvents.length
})



乐观并发控制
除了指定事件数据和streamId之外,正如您所看到的,我还包括了该expectedVersion属性。这是一种乐观并发控制形式,在这种情况下,它可以防止同时为同一聚合保存多个事件。

快照
因为每次调用命令时,都会检索特定事件streamId(在这种情况下 - userId)的所有事件,您可能想知道,如果用户存储了数千个事件会发生什么?
在那种情况下,我正在使用快照。
我们的想法是将先前存储的状态传递给reducer,并仅应用创建快照后发生的新事件:

const initState =  await  getSnapshot( userId)
const currentUserEvents =  await  eventStore。getById( userId,{
   offset:1000  //假设快照是从前1000个事件构建的
})
const userFollowers =  userFollowersReducer( currentUserEvents, initState)



但是,这个例子是简化的。快照是一种价格优化的优化技术:版本控制,无法为其他reducer重用事件,更新新事件的快照以及对数据库的其他请求。
以下是我实际使用它的方式(来自另一个项目的片段):

const { state, version } = await eventStore.getByIdUsingSnapshot({
  id: event.data.videoId,
  reducer: videoDetailsReducer,
  reducerId: videoDetailsReducerId,
  reducerVersion: videoDetailsReducerVersion
})
// state is a reduced object used to check business rules
// version is used as "expectedVersion" on eventStore.save()




一致性
在同时调用两个命令的示例中,我已经描述了如何不保持一致性可能导致存储在数据库中的无效数据。
在DynamoDB中,这可以通过WorkingWithItems.ConditionalUpdate" class='body_href' >条件写入来解决。由于expectedVersion是save()函数的必需属性,要将事件存储在数据库中,有条件是指定的版本必须不存在(对于特定的聚合)。

dynamoClient
  .put({
    TableName: esTable,
    Item: {
      commitId: syncTime + ':' + params.streamId,
      committedAt: syncTime,
      streamId: params.streamId,
      version: params.expectedVersion,
      events: JSON.stringify(events)
    },
    ConditionExpression: 'attribute_not_exists(version)',
    ReturnValues: 'NONE'
  })
  .promise()



当数据存储在DynamoDB中时,它会以三个副本进行复制,并且只有在更新两个副本后才会确认写入请求。
因此,有两种方法可以从数据库中检索项目:使用“强一致性”或“最终一致性”。
由于在从数据库检索事件时确定版本号,具有“最终一致读取”(默认且更便宜的选项),因此可能会得到过时的结果。在这种情况下,存储事件可能会失败(409错误代码)。这通常通过重试操作直到成功来解决。
但是,如果使用“强一致性”选项,则需要三个DynamoDB副本中的两个来检索可靠结果,并且只有在同一聚合上的并行请求时才会出现409个错误。

顺序
由于“eventstore”表是使用streamId分区键和version排序键创建的,因此该getById()函数始终返回准确排序的事件。
但由于其“NoSQL性质”,在DynamoDB中检索所有聚合中的有序事件并不像在关系数据库中那么容易。
我解决这个问题的第一种方法是使用全局二级索引(GSI)并选择固定属性(如active:1)作为分区键和timestamp排序键。但是,这是一种反模式! 即使我只投射索引键,它也总是使用单个分区,因此需要大吞吐量(高成本)。而且,我依赖于准确存储timestamp有其自身问题的值。
第二种选择是每次添加新事件时手动存储streamId并version在单独的项目,表格或甚至不同类型的数据库中。这可以通过DynamoDB Streams实现,因为它“捕获DynamoDB表中按时间排序的项目级别修改序列,并持续存储长达24小时的信息”(来源)。
总的来说,即使它是可解决的,我认为这是将DynamoDB用于事件存储的最大问题。在撰写本文时,对于Beenion,我仍然使用单个分区GSI,但这可能在将来发生变化。我打算尝试一个提出的解决方案,但我也在寻找其他替代品,如QLDB数据库。

事务
当我启动这个项目时,DynamoDB事务不可用,所以为了使ACID成为可能,在每个事务上,我都存储了一系列事件。存储这样的数据可确保为每个命令保存“全部或全部”事件,但这也意味着在检索时必须“展平”事件。

投射和流程管理
在将事件存储在数据库中之后,必须将该信息传播到事件处理程序,事件处理程序以两种方式使用:


  • 作为预测 -用于更新其他数据库或服务,如“getStream”
  • 作为流程管理者(或“sagas”) - 用于发送电子邮件或通过第三方服务完成付款等副作用

在这两种情况下,最好以容错和可靠的方式发送事件,其顺序与存储事件的顺序相同。
如果事件处理程序中发生错误,则在解决之前不应继续接收新事件。此外,必须隔离每个事件处理程序,因此一个处理程序中的错误不会影响其他处理程序。
由于这些原因,每个投影或进程管理器都使用FIFO(先进先出)消息队列来消耗事件。
消息队列提供一个缓冲区,该缓冲区临时存储由“生产者”发送的消息,并将它们保存在队列中,直到“消费者”检索它,并最终删除它。在FIFO队列中,只有在删除消息后,才能处理下一个消息。

可以在此处查看项目的完整源代码​​​​​​​。
 


推荐阅读
author-avatar
GYuan83_844
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有