今天主要讲在大规模数据情况下,Kafka如何解决实时的数据集成问题。主要有以下四个方面:
1. Traditional ETL
2. Data Integration
3. Kafka Connect
4. Group Management
1. Traditional ETL
回顾历史,三四十年前,大部分的应用是比较简单的架构(如下图),主要数据存储在关系型数据库里。关系型数据库里的数据每过一段时间就放在数据仓库里做离线分析,以此找到数据中的一些模式。这时候数据源只有一个。
现在有很多新的系统出现,如下图:
这些存储系统有两类,一种是primary storage(主存储,比如NoSQL),它是presistent的。现在数据源会有很多个,关系型和非关系型数据库的数据都需要导到数据仓库中。另一种是secondary storage(二级存储),数据一般是从primary storage做变化后得到,即使数据丢失,也可以从primary storage中恢复,对presistent要求没有那么高。比如elastic search,它适合对全文检索和实时性要求高的场所。
随着系统种类的增多,系统之间传递数据的实时性和连续性变得非常强。因为所有的系统服务于我们的应用。我们希望数据在主存储里一旦更改,就能在二级存储里改变。
一旦存储系统变多,在不同系统间传递数据变成非常复杂的事情。下图是简化版:
一份数据因为用处不同,需要拷贝到很多地方去。如果图中每条线都用一个专门的工具处理,耗时且难以实现。
2. Data Integration
Data integration就是将数据放到应该去的地方。
首先,先介绍不应该有的设计,如下图:
1)one-off tools
One-off tools如之前图中所示,每条线用一个专门的工具来做。
2)kitchen sink tools
Kitchen sink tools指特别通用的工具,能在所有系统间导数据。它的问题是因为太通用,对source和destination的假设太少,无法确保reliable。
3)stream processing frameworks
它的侧重点在transformation,不关注如何支持在不同系统间拷贝数据。因此需要自己做很多工作。
其中1)和2)是两个极端,如下图左侧为one-off tools的情况,右侧ET&L(Extract-Transform-Load)为kitchen sink tools情况,指从数据库中找到数据,做一些变换,然后load到数据仓库中。
如何在这两者之间做个平衡,是kafka最初设计的动力。
Kafka是个place holder,假设所有的东西都往kafka发数据或取数据,那么所有系统都和kafka连接,不会出现强耦合。这样对kafka会有很多要求。
Kafka是一个分布式系统,它有很多brokers。
Kafka以log的方式管理数据,顺序读写并且写只能在log尾部。
Kafka和pub-sub系统的联系在于source不断地写消息,sub不断的读消息。
为了使kafka能scale,唤鱼迟引入topic的概念。Topic相当于partitioned log。一个topic可以有很多个partition。Kafka只能保证在一个partition里有顺序。
Kafka还支持scalable consumption。它是指有很多个consumer,成为consumer group,可以订阅很多不同的topic。比如consumer group A,订阅了partition 0,1,2。因为group里有两个consumer,不同的partition会分配到不同consumer。假设增加一个新的consumer,属于consumer goup A,那么partition 0或者1就会放到新的consumer里,产生新的平均分配。可以把group想象成系统。
Kafka不决定如何consume,consumer自己决定何时,如何consume。Kafka里的数据是持久化的,每个数据都存在服务器硬盘里。
Kafka里的consumer支持fault tolerant。如果一个consumer挂到,kafka会把任务放到其他consumer上。
3. Kafka Connect
前面所说的consumer和producer都是develop的工具。如果想用它们做data pipeline或者ETL的工作,要写大量的数据。同时还要对data pipeline做管理和监控。Kafka connect解决了这个广泛的需求。
它的目标是:
1)它着重于数据的copying,而不是transiformation。
2)它解决fault tolerance,scale up,scale down,monitor和mangement的工作。
3)所有connect job都会有统一的方法来monitor和management。
4)所有的job都是并行运行。
5)可以用单机模式运行你的任务。
Kafka connect有两个概念,一个source,另一个是sink。source是把数据从一个系统拷贝到kafka里,sink是从kafka拷贝到另一个系统里。如下图:
Kafka的数据模型是它是一个partitioned的stream。如果把一个partition看做一个表格,并且每个partition有一个primary key或者timestamp,可以根据timestamp来order表中数据,就可以把它看做一个stream了。
下图是一个具体的例子,id相当于timestamp:
以这个角度看待数据库,数据库就变成了流数据。
另一个模型是一个connector有很多个task。我们可以把每个partition放在一个thread里运行,但这不是一个很好的模型,因为parittion的数量也许会很大,因此引入task。多个partition可以放在一个task里,由一个thread执行。这样资源的逻辑区分和物理划分可以更加匹配。
Kafka有两种运行模式,第一种是standalone mode,如下图:
这种是单机模式,另一种是分布式模式,如下图:
一台或很多台机器里有多个worker进程,worker进程有很多connector和task。connector负责获得所要连接的系统的信息,并不进行数据拷贝。task进行数据拷贝。这个模式也是fault torerant,如果worker 4挂了,它的connector和task将交由worker 3进行。
4. Group Management
以前的group management是由zookeeper来实现,但是这样rebalance的负担太重。为了解决scalable的问题,不再使用zookeeper,让每个broker负责一些group的管理,client端完全不需要依赖zookeeper,开发管理变得更加简单。这个group management有两个阶段:第一阶段发现group里都有谁,第二阶段让每个memeber状态同步。下图是个例子:
Coordinator可以理解成kafka的broker,member可以是每个consumer group里的group或者connect里的worker,member通过id来确认group。member通过JoinGroupRequest找到coordinator。coordinator等一段时间(session time out,default为30秒)来接收各种request。之后coordinator会发response给member。
Response会包括member里leader信息。coordinator收到的第一个请求的发送方就是leader。同时coordinator会把所有member的信息发给每个member。
第二阶段,leader决定分配的任务。用leader分配任务是因为,coordinator不知道member之间使用什么策略来分配任务。
总的来说,group management的工作分类如下:
Kafka connect保证at least one delivery:
最后,简单介绍以下如何做到exactly once:
其中WAL涉及到zombie writer:
图中C为coordinator,T为task。每个task都有一个consumer instance。对一个connector来说,它所有task的consumer instance都在一个consumer group里。假设有个task特别慢,造成consumer和coordinator无法通信。Coordinator长时间无法检测到心跳会把这个consumer踢出consumer group,触发rebalance。这个task的任务会被放在其他task上。在commit(把临时文件改为永久文件)之前,数据都在临时文件里。如果T1被踢出group前的offset为100-200,T2为100-300,并且都还没有commit,那么可能会出现T1之后再次激活,T1和T2都要commit,导致数据的重复。Read head log能避免这种情况的发生。在任何commit之前,都要先写到read head log里,把这个操作放到HDFS里的一个文件里,然后flush到所有copy里。这样如果T1即使再次激活也不能再写文件,T2执行所有的文件(执行时检查文件是否被写),同时更新HDFS文件。