IntermediateDataset
IntermediateDataset
是在 JobGraph
中对中间结果的抽象。我们知道,JobGraph
是对 StreamGraph
进一步进行优化后得到的逻辑图,它尽量把可以 chain 到一起 operator 合并为一个 JobVertex
,而 IntermediateDataset
就表示一个 JobVertex
的输出结果。JobVertex
的输入是 JobEdge
,而 JobEdge
可以看作是 IntermediateDataset
的消费者。一个 JobVertex
也可能产生多个 IntermediateDataset
。需要说明的一点是,目前一个 IntermediateDataset
实际上只会有一个 JobEdge
作为消费者,也就是说,一个 JobVertex
的下游有多少 JobVertex
需要依赖当前节点的数据,那么当前节点就有对应数量的 IntermediateDataset
。
19-幻灯片26-1568x882.png
这时候还没有向集群去提交任务,在 Client 端会将 StreamGraph 生成 JobGraph,JobGraph 就是做为向集群提交的最基本的单元。在生成 JobGrap 的时候会做一些优化,将一些没有 Shuffle 机制的节点进行合并。有了 JobGraph 后就会向集群进行提交,进入运行阶段。
Jobvertex->Intermedia Dateset->JobEage
20-幻灯片27-1-1568x882.png
然后我们概念化这样一张物理执行图,可以看到每个 Task 在接收数据时都会通过这样一个InputGate 可以认为是负责接收数据的,再往前有这样一个 ResultPartition 负责发送数据,在 ResultPartition 又会去做分区跟下游的 Task 保持一致,就形成了 ResultSubPartition 和 InputChannel 的对应关系。这就是从逻辑层上来看的网络传输的通道,基于这么一个概念我们可以将反压的问题进行拆解。
ExecutionGraph -> Intermedia ATA
task ->ResultPartion->InputGate
ResultSubParition-->InputChannel 一一对应
image.png
假设最下游的 Task (Sink)出现了问题,处理速度降了下来这时候是如何将这个压力反向传播回去呢?这时候就分为两种情况:
跨 TaskManager ,反压如何从 InputGate 传播到 ResultPartition
TaskManager 内,反压如何从 ResultPartition 传播到 InputGate
TaskManager 只有一个 Network BufferPool 被所有的 Task 共享,初始化时会从 Off-heap Memory 中申请内存,申请后内存管理通过 Network BufferPool 来进行,不需要依赖 JVM GC 的机制去释放。有了 Network BufferPool 之后可以为每一个 ResultSubPartition 创建 Local BufferPool
反压过程:
跨 TaskManager 数据传输
image.png
TaskManager 内反压过程
image.png
下游的 TaskManager 反压导致本 TaskManager 的 ResultSubPartition 无法继续写入数据,于是 Record Writer 的写也被阻塞住了,因为 Operator 需要有输入才能有计算后的输出,输入跟输出都是在同一线程执行, Record Writer 阻塞了,Record Reader 也停止从 InputChannel 读数据,这时上游的 TaskManager 还在不断地发送数据,最终将这个 TaskManager 的 Buffer 耗尽。具体流程可以参考下图,这就是 TaskManager 内的反压过程。
TCP-based 反压的弊端在一个 TaskManager 中可能要执行多个 Task,如果多个 Task 的数据最终都要传输到下游的同一个 TaskManager 就会复用同一个 Socket 进行传输,这个时候如果单个 Task 产生反压,就会导致复用的 Socket 阻塞,其余的 Task 也无法使用传输,checkpoint barrier 也无法发出导致下游执行 checkpoint 的延迟增大。
依赖最底层的 TCP 去做流控,会导致反压传播路径太长,导致生效的延迟比较大。
这个机制简单的理解起来就是在 Flink 层面实现类似 TCP 流控的反压机制来解决上述的弊端,Credit 可以类比为 TCP 的 Window 机制。
Flink 层面实现反压机制,就是每一次 ResultSubPartition 向 InputChannel 发送消息的时候都会发送一个 backlog size 告诉下游准备发送多少消息,下游就会去计算有多少的 Buffer 去接收消息,算完之后如果有充足的 Buffer 就会返还给上游一个 Credit 告知他可以发送消息(图上两个 ResultSubPartition 和 InputChannel 之间是虚线是因为最终还是要通过 Netty 和 Socket 去通信)
过了一段时间后,由于上游的发送速率要大于下游的接受速率,下游的 TaskManager 的 Buffer 已经到达了申请上限,这时候下游就会向上游返回 Credit = 0,ResultSubPartition 接收到之后就不会向 Netty 去传输数据,上游 TaskManager 的 Buffer 也很快耗尽,达到反压的效果,这样在 ResultSubPartition 层就能感知到反压,不用通过 Socket 和 Netty 一层层地向上反馈,降低了反压生效的延迟。同时也不会将 Socket 去阻塞,解决了由于一个 Task 反压导致 TaskManager 和 TaskManager 之间的 Socket 阻塞的问题
总结
image.png
image.png
网络上传输的数据会写到 Task 的 InputGate(IG) 中,经过 Task 的处理后,再由 Task 写到 ResultPartition(RS) 中。每个 Task 都包括了输入和输入,输入和输出的数据存在 Buffer
中(都是字节数据)。Buffer
是 MemorySegment 的包装
类。
根据配置,Flink 会在 NetworkBufferPool
中生成一定数量(默认2048)的内存块 MemorySegment
,内存块的总数量就代表了网络传输中所有可用的内存。NetworkEnvironment 和 NetworkBufferPool 是 Task 之间共享的,每个 TM 只会实例化一个
Task 线程启动时,会为 Task 的 InputGate(IG)和 ResultPartition(RP) 分别创建一个 LocalBufferPool(缓冲池)并设置可申请的 MemorySegment(内存块)数量。IG 对应的缓冲池初始的内存块数量与 IG 中 InputChannel 数量一致,RP 对应的缓冲池初始的内存块数量与 RP 中的 ResultSubpartition 数量一致。不过,每当创建或销毁缓冲池时,NetworkBufferPool 会计算剩余空闲的内存块数量,并平均分配给已创建的缓冲池。
在 Task 线程执行过程中,当 Netty 接收端收到数据时,为了将 Netty 中的数据拷贝到 Task 中,InputChannel(实际是 RemoteInputChannel)会向其对应的缓冲池申请内存块(上图中的①)。如果缓冲池中也没有可用的内存块且已申请的数量还没到池子上限,则会向 NetworkBufferPool 申请内存块(上图中的②)并交给 InputChannel 填上数据(上图中的③和④)。
Buffer.recycle()
方法,会将内存块还给 LocalBufferPool (上图中的⑤)。如果LocalBufferPool中当前申请的数量超过了池子容量(由于上文提到的动态容量,由于新注册的 Task 导致该池子容量变小),则LocalBufferPool会将该内存块回收给 NetworkBufferPool(上图中的⑥)。如果没超过池子容量,则会继续留在池子中,减少反复申请的开销反压的过程
这里我们需要注意两个场景:
初始化 NettyServer 时配置的水位值参数。
bootstrap.childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, config.getMemorySegmentSize() + 1);
bootstrap.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 2 * config.getMemorySegmentSize());
当输出缓冲中的字节数超过了高水位值, 则 Channel.isWritable() 会返回false。当输出缓存中的字节数又掉到了低水位值以下, 则 Channel.isWritable() 会重新返回true。Flink 中发送数据的核心代码在 PartitionRequestQueue 中
private void writeAndFlushNextMessageIfPossible(final Channel channel) throws IOException {
反压监控
if (fatalError) {
return;
}
Buffer buffer = null;
try {
// channel.isWritable() 配合 WRITE_BUFFER_LOW_WATER_MARK
// 和 WRITE_BUFFER_HIGH_WATER_MARK 实现发送端的流量控制
if (channel.isWritable()) {
// 注意: 一个while循环也就最多只发送一个BufferResponse, 连续发送BufferResponse是通过writeListener回调实现的
while (true) {
if (currentPartitiOnQueue== null && (currentPartitiOnQueue= queue.poll()) == null) {
return;
}
buffer = currentPartitionQueue.getNextBuffer();
if (buffer == null) {
// 跳过这部分代码
...
}
else {
// 构造一个response返回给客户端
BufferResponse resp = new BufferResponse(buffer, currentPartitionQueue.getSequenceNumber(), currentPartitionQueue.getReceiverId());
if (!buffer.isBuffer() &&
EventSerializer.fromBuffer(buffer, getClass().getClassLoader()).getClass() == EndOfPartitionEvent.class) {
// 跳过这部分代码。batch 模式中 subpartition 的数据准备就绪,通知下游消费者。
...
}
// 将该response发到netty channel, 当写成功后,
// 通过注册的writeListener又会回调进来, 从而不断地消费 queue 中的请求
channel.writeAndFlush(resp).addListener(writeListener);
return;
}
}
}
}
catch (Throwable t) {
if (buffer != null) {
buffer.recycle();
}
throw new IOException(t.getMessage(), t);
}
}
// 当水位值降下来后(channel 再次可写),会重新触发发送函数
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
writeAndFlushNextMessageIfPossible(ctx.channel());
}
Flink 在使用了一个 trick 来实现对反压的监控。如果一个 Task 因为反压而降速了,那么它会卡在向 LocalBufferPool 申请内存块上。那么这时候,该 Task 的 stack trace 就会长下面这样:
java.lang.Object.wait(Native Method)
o.a.f.[...].LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
o.a.f.[...].LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133) <--- BLOCKING request
[...]
那么事情就简单了。通过不断地采样每个 task 的 stack trace(线程堆栈) 就可以实现反压监控。
image.png
Flink 的实现中,只有当 Web 页面切换到某个 Job 的 Backpressure 页面,才会对这个 Job 触发反压检测,因为反压检测还是挺昂贵的。JobManager的StackTraceSampleCoordinator( A coordinator for triggering and collecting stack traces of running tasks)
会通过 Akka 给每个 TaskManager 发送TriggerStackTraceSample消息。默认情况下,TaskManager 会触发100次 stack trace 采样,每次间隔 50ms(也就是说一次反压检测至少要等待5秒钟)。并将这 100 次采样的结果返回给 JobManager,由 JobManager 来计算反压比率
(反压出现的次数/采样的次数),最终展现在 UI 上。UI 刷新的默认周期是一分钟,目的是不对 TaskManager 造成太大的负担。
Flink 如何在吞吐量和延迟之间做权衡?
我们分析了上述的网络传输后,知道每个 SubTask 输出的数据并不是直接输出到下游,而是在 ResultSubPartition 中有一个 Buffer 用来缓存一批数据后,再 Flush 到 Netty 发送到下游 SubTask。那到底哪些情况会触发 Buffer Flush 到 Netty 呢?
Buffer 变满时
Buffer timeout 时
特殊事件来临时,例如:CheckPoint 的 barrier
来临时
Flink 在数据传输时,会把数据序列化成二进制然后写到 Buffer 中,当 Buffer 满了,需要 Flush(默认为32KiB
,通过taskmanager.memory.segment-size设置)。但是当流量低峰或者测试环节,可能1分钟都没有 32 KB
的数据,就会导致1分钟内的数据都积攒在 Buffer 中不会发送到下游 Task 去处理,从而导致数据出现延迟,这并不是我们想看到的。所以 Flink 有一个 Buffer timeout
的策略,意思是当数据量比较少,Buffer 一直没有变满时,后台的 Output flusher
线程会强制地将 Buffer 中的数据 Flush 到下游。Flink 中默认 timeout 时间是 100ms
,即:Buffer 中的数据要么变满时 Flush,要么最多等 100ms 也会 Flush 来保证数据不会出现很大的延迟。当然这个可以通过 env.setBufferTimeout
(timeoutMillis) 来控制超时时间。
一些特殊的消息如果通过 RecordWriter 发送,也会触发立即 Flush 缓存的数据。其中最重要的消息包括 Checkpoint barrier 以及 end-of-partition 事件,这些事件应该尽快被发送,而不应该等待 Buffer 被填满或者 Output flusher 的下一次 Flush。当然如果出现反压,CheckPoint barrier 也会等待,不能发送到下游。
引入 Network buffers
以获得更高的资源利用率
和更高的吞吐量
,代价是让一些记录在 Buffer 中等待一段时间。虽然可以通过缓冲区超时给出此等待时间的上限,但你可能知道有关这两个维度(延迟和吞吐量)之间权衡的更多信息:显然,无法同时获得这两者
参考:
https://ververica.cn/developers/advanced-tutorial-2-analysis-of-network-flow-control-and-back-pressure/
http://wuchong.me/blog/2016/04/26/flink-internals-how-to-handle-backpressure/
[https://www.cnblogs.com/pengblog2020/p/12161716.html]