今天是个开心的日子,又是周末,可以轻轻松松的写写文章了。去年,我写了enode 1.0版本,那时我也写了一个分析系列。经过了大半年的时间,我对第一个版本做了很多架构上的改进,最重要的就是让enode实现了分布式,通过新增一个分布式消息队列equeue来实现。之所以要设计一个分布式的消息队列是因为在enode
1.0版本中,某个特定的消息队列只能被某个特定的消费者消费。这样就会导致一个问题,就是如果这个消费者挂了,那这个消费者对应的消息队列就不能自动被其他消费者消费了。这个问题会直接导致系统不可用。而enode
2.0中,就不会有这个问题了,因为消息队列被设计为独立的,被消费者所共享的;一个消息队列可以被多个消费者集群消费或广播消费,如果一个消费者挂了,那其他的消费者会自动顶上。这里具体的细节,我会在后面详细介绍。
熟悉CQRS架构的人看到这图应该就再熟悉不过了,enode实现的是一个CQRS架构。基本的概念就不多介绍了,如果大家对上图中的一些概念还不太清楚,可以看一下我的博客里的其他相关文章,我应该都有写到。下面主要介绍一下enode
2.0在实现CQRS架构时的一些不一样的地方(由于篇幅的限制,先说三点吧):
就是你不能在command
handler中一次修改多个聚合根,我觉得这应该是enode对开发人员的最大约束,可能也是最让开发人员觉得不爽的地方。但我觉得这个不是约束,而是对数据强一致性和最终一致性的一个正确认识。 >在我学过ddd+cqrs+event
sourcing这三个东西之后,我认识到,聚合内必须确保强一致性,聚合间最终一致性。传统三层开发,我们通过unit of
work模式(简称uow,比如nhibernate的session, entity
framework的dbcontext)可以轻易实现多个对象修改的强一致性事务;确实在传统三层模式开发中,这种利用uow的方式来实现跨聚合的强一致性事务的方式很实用,开发起来很方便,开发人员可以不必担心会出现数据不一致的问题了,因为所有修改总是在一个事务内保存。
但enode的设计目标不是为了支持传统三层开发,而是面向ddd+cqrs+eda+event
sourcing架构的框架。曾经我也想让command
handler支持修改多个聚合根,但这样做必须要面临一个很棘手的问题:command在发送到command
queue时,无法根据聚合根ID来路由了。因为一个command会修改多个聚合根,也就是说一个command不会和一个聚合根一一对应了。这意味着同一个聚合根没办法总是被路由到同一个command
queue里,这样就导致相同ID的聚合根可能会在两台服务器被同时修改,这就会导致整个系统可能会频繁的产生并发更新冲突。很多command就会不断的重试,整个系统的性能就会下降。而enode设计之初就是为了高性能,所以这点让我觉得很难接收。
相反,如果一个command总是只会创建或修改一个聚合根,那我们的command就能根据聚合根ID来路由到特定的消息队列,同一个聚合根ID总是会被路由到同一个queue,而一个queue的消费者服务器(command
handler所在的服务器)同一时刻总是只有一个,那我们就能保证一个聚合根的修改不会有并发问题。当然光这样还不够,在这个command消费者服务器里,enode框架会用内存级别的queue对同一个聚合根的所有command再次进行排队(如果需要排队的话),之所以要这样是因为有时对一个聚合根的并发修改command可能1s内发送了很多过来,所以command
handler肯定来不及在1s内全部处理掉这些command,所以需要在内存里再次排队(天猫双十一的时候,应用服务器内部也会有类似的对同一个聚合根设计一个相应的内存queue来避免对同一个聚合根的修改的并发冲突的问题)。通过这样的设计,我们可以做到绝大部分情况下,不会再有并发冲突的问题,也就是command不会再出现重试的情况。 >这样最后的效果就是:不同ID的聚合根的处理可以并行,同一个ID的聚合根的处理是串行,通过两级排队实现。前面说到,这样只能做到绝大部分情况下不会有并发冲突,那么什么时候还是会有并发冲突呢?就是在新增command消费者服务器的时候,比如我们发现最近系统繁忙,我们希望增加command消费者服务器来加快command的处理,那在新增服务器后,原来修改某个聚合根的command可能会被路由到新的服务器,但是这个聚合根的有些command可能还在原来的服务器上还没执行完,此时就会出现同一个聚合根在两台服务器上被同时修改的可能了;那这个怎么解决呢?我现在的想法是框架层面不必解决了,我们只需要在系统最空的时候(比如凌晨4点)的时候,增加服务器即可,因为那个时候消息队列里的消息是最少的,也就是不太可能会产生因为增加command
handler服务器而导致并发冲突的问题,这样我们就可以最大限度的避免可能带来的并发冲突。
相比一般的CQRS架构,enode每次在处理一个command,在获取聚合根时,不是从eventstore获取,而是从缓存获取。从上面的架构图可以看出,enode架构中有一个domain
memory
cache,目前用redis实现。这样做的好处是,将所有的聚合根都缓存在redis缓存中,这样就能提高聚合根的读取时间;有一个问题需要考虑,redis缓存服务器宕机了怎么办?宕机后缓存数据就没了,那如何恢复这些缓存数据呢?这也是我选择redis的一个主要理由,因为redis支持持久化,我们可以利用redis的aof或快照方式的持久化功能,来持久化缓存数据。从而可以在redis挂了后能最快的速度恢复缓存,重启redis服务器即可。那重启之前以及重启的过程中,因为无法从redis获取聚合根了,那只能从eventstore通过event
sourcing的方式去获取,那样的话性能肯定会比较差,那怎么办呢?答案是通过定时为聚合根创建快照,这也是采用event
sourcing架构的一个好处。我们可以定时对某些聚合跟创建快照(注意,我觉得只需要考虑那些对性能要求很高的模块所涉及到的聚合根创建快照即可),那怎么创建呢?可以开一个独立的进程,监听domain
event,对需要创建快照的domain event做出判断,根据某种快照创建策略进行判断,如果认为需要创建快照,则从event
store拿出该聚合根的相关事件,通过event
sourcing还原得到某个版本的聚合根,这样就得到了某个聚合根的某个版本的快照了。然后持久化起来即可。然后,enode支持在从event
store获取聚合根前,先检查是否有快照,如果有快照,则会先加载快照,再把快照之后的domain event从event
store获取,再把这些快照之后的domain
event一个个apply到当前聚合根,从而得到最新状态的聚合根。这个过程比获取该聚合根的所有领域事件在一个个通过event
sourcing还原得到聚合根要快的多;尤其是在一个聚合根的domain
event比较多的情况下就更有意义。因此,通过缓存的引入,我们可以提高command handler的处理速度。
另外一点很重要的是,因为我们的command是会发送到分布式消息队列,然后队列中的command消息会被取出来执行;大家知道,我们很难保证一个消息不会被重复执行,也就是说,一个command可能会重复执行。因此,我们的应用要支持对command的密等处理。而对于使用enode框架的应用,因为整个command
side的数据持久化就是持久化domain event,程序员不必关心domain
event的持久化过程。所以enode很有必要能内置支持对command的重复处理的判断。那么如何做呢?我觉得最靠谱的做法是,在持久化domain
event的时候就能绝对靠谱的检测出来某个command是否被重复执行了。那很自然就想到将被持久化的domain
event和产生他的对应command关联起来。所以我设计了如下的结构,用来表示一个command在操作聚合根后所产生的领域事件的信息。
///
///
public string CommitId { get; private set; }
///
///
public string AggregateRootId { get; private set; }
///
///
public int AggregateRootTypeCode { get; private set; }
///
///
public int Version { get; private set; }
///
///
public DateTime Timestamp { get; private set; }
///
///
public IEnumerable
对于上面的结构体,我们可以实现两个重要的功能:1)为AggregateRootId和Version这两个字段建立唯一索引,这样我们就能实现判断某个聚合根是否被并发修改,因为如果有并发修改导致并发冲突,那保存到eventstore时,它们的Version肯定是相同的;2)为AggregateRootId和CommitId两个字段建立唯一索引,这样我们就能判断某个command是否被重复执行,因为一个command被实例化出来后,它所要修改的聚合根ID就不可能再修改了,所以如果该command被重复执行,那最后产生的领域事件(上面这个结构体)最后被持久化到eventstore时就会违反这个唯一索引,从而框架就能知道是否有command被重复执行了;
另外,上面这个结构体被保存到eventstore时,是以一条记录的方式被保存,Events集合会被序列化为一段二进制;所以,假如我们用关系型数据库来保存,那就是只有一条insert语句即可,这样就实现了一个聚合根的一次修改的事务持久化。然后因为上两个索引的存在,我们就能在保存时判断是否有并发冲突或command是否被重复执行。
在设计event store时,我考虑了很多。最后认为event
store要解决的最大的两个问题是持久化性能和可水平扩展性。首先,因为每次command
handler在处理完一个聚合根后,都会把产生的领域事件持久化到event
store,没持久化完成则不能认为该command已处理完,所以持久化的性能对处理command的吞吐量至关重要。另外一点就是可水平扩展性,因为event
store里保存的都是domain event,而enode又是为了实现高性能为目标的,所以event
store里的数据肯定会非常多,比如1s中要持久化1K个domain
event,那一天就会有8600W条记录要记录,一天就真么多,那1年就更多了,所以用单点存储所有的domain
event显示不靠谱了。所以我们的event
store必须要支持水平扩展。比如我们可以设计100个分区,那每个分区一天只需要保存86W条记录,一年也只需要保存3亿多条记录即可。之前我很追求单个存储节点的高性能,所以曾经想过要用leveldb,stsdb,甚至redis这种高性能的基于key,value的nosql存储。但后来发现这种nosql存储虽然性能很高,但因为只是key,value的存储结构,所以没办法支持二级索引,这样就没办法实现上面第一点中提到的command的幂等处理和聚合根并发冲突的检测。另一个重要的原因是,event
store中的数据我们有时候是要被查询的。比如现在某个command遇到的并发冲突,那框架需要自动重试,但是重试之前需要先更新redis缓存,就是把eventstore里的最新的聚合根更新到redis缓存里,这样command在重试时才能拿到最新版本的聚合根,这样重试才能成功。那如何从eventstore里拿最新的聚合根呢?只能根据聚合根ID从eventstore里查询。而聚合根ID又不是key,value
nosql的key,自然就没办法实现这个需求了;所以,我觉得合理的办法应该是用关系型数据库来实现eventstore。有人说关系型数据库的性能不行。我觉得只要关系型数据库支持水平扩展,也就是将domain
event sharding(分片)到不同的分库分表中,那平均到每个库里的domain
event的数量就不大了;这样整个eventstore的持久化性能就可以随着分库的数量的增加而线性增加;比如我现在单个db insert
domain event的性能是1K tps(mysql配合ssd硬盘完全无压力,呵呵),那10个库的tps就能达到1W
tps了。因为我们分库会根据聚合根ID的hash code来平均散列,这样能确保每个库中的聚合根的domain
event数量是基本一样的;从而就能实现整个event
store的持久化性能随着分库的增加而线性增加。所以,有了分库的优势,大数据量和性能都不是问题了。且因为关系型数据库支持二级索引和唯一索引,那查询domain
event也不是问题了。
上图是enode在实际项目中我目前认为的一个物理部署结构图。
首先客户端浏览器通过网络最后访问到我们的web服务器集群,当然web服务器前面肯定还有网关和负载均衡器,我这里为了突出重点就不画出来了。然后每个web服务器接受到httprequest后会生成command,然后通过enode框架发送到分布式消息队列服务器(message
queue server),目前由我开发的equeue实现。然后消息队列服务器上的消息会被推送到command process
servers,command process server就是执行command handler、完成domain logic,持久化domain
event,以及publish domain event的服务器。command process server处理完之后,domain
event会由enode框架自动发送到message queue server,然后会被event process server处理,event
process server就是订阅domain event,然后根据domain event更新query db。对于查询,web
server可以直接通过sql查询query db即可。
各种服务器的集群:
好了,就写这些吧,后续的再后续文章中补上,呵呵。
enode框架 2.0 step by step之整体架构介绍,布布扣,bubuko.com