热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Flink重分区算子解析-StreamPartitionerflink拆分流

本文主要分享【flink拆分流】,技术文章【Flink重分区算子解析-StreamPartitioner】为【Rango_lhl】投稿,如果你遇到大数据相关问题,本文相关知识或能到你。flink

本文主要分享【flink拆分流】,技术文章【Flink重分区算子解析 - StreamPartitioner】为【Rango_lhl】投稿,如果你遇到大数据相关问题,本文相关知识或能到你。

flink拆分流

一、背景说明

目前Flink(version:1.13)包含8个重分区算子,对应8个分区器(7个官方定义及1个自定义),均继承与父类StreamPartitioner。

RebalancePartitioner
RescalePartitioner
KeyGroupStreamPartitioner
GlobalPartitioner
ShufflePartitioner
ForwardPartitioner
CustomPartitionerWrapper
BroadcastPartitioner

二、各分区器说明 1. 概览图

Flink重分区算子解析 - StreamPartitioner flink拆分流

2. RebalancePartitioner

Partitioner that distributes the data equally by cycling through the output channels.

rebalance()算子是真正意义上的轮询操作,上游数据轮询下发到下游算子,注意与broadcast()算子的区别,上图颜色点代表两者数据分发的区别。

private int nextChannelToSendTo;

// 下游channel选择器,第一个数据是随机选择下游其中一个channel
@Override
public void setup(int numberOfChannels) {
   
    super.setup(numberOfChannels);
    nextChannelToSendTo = ThreadLocalRandom.current().nextInt(numberOfChannels);
}
// 后续+1取模的方式开始轮询下发
@Override
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
   
    nextChannelToSendTo = (nextChannelToSendTo + 1) % numberOfChannels;
    return nextChannelToSendTo;
}
// 分发模式为 ALL_TO_ALL
@Override
public boolean isPointwise() {
    return false; }

FLink 将任务的执行计划分为 StreamGraph–>JobGraph–>ExecutionGraph,其中的StreamingJobGraphGenerator类用以实现将StreamGraph转化为JobGraph,在该类中会调用分区器的isPointwise()方法实现分发模式的选择 :POINTWISE / ALL_TO_ALL。

JobEdge jobEdge;
if (partitioner.isPointwise()) {
   
    jobEdge =
            downStreamVertex.connectNewDataSetAsInput(
                    headVertex, DistributionPattern.POINTWISE, resultPartitionType);
} else {
   
    jobEdge =
            downStreamVertex.connectNewDataSetAsInput(
                    headVertex, DistributionPattern.ALL_TO_ALL, resultPartitionType);
}
3. RescalePartitioner

The subset of downstream operations to which the upstream operation sends elements depends on the degree of parallelism of both the upstream and downstream operation. For example, if the upstream operation has parallelism 2 and the downstream operation has parallelism 4, then one upstream operation would distribute elements to two downstream operations while the other upstream operation would distribute to the other two downstream operations. If, on the other hand, the downstream operation has parallelism 2 while the upstream operation has parallelism 4 then two upstream operations will distribute to one downstream operation while the other two upstream operations will distribute to the other downstream operations.

In cases where the different parallelisms are not multiples of each other one or several downstream operations will have a differing number of inputs from upstream operations.

根据源码里面的注释可知道,rescale的上下游交互取决于他们的并行度,上游为2下游为4,则一个上游对应两个下游,上游为4下游为2,则两个上游对应一个下游。如若是不同倍数的并行度,则下游会有不同数量的输入。

区别于rebalance有两点,轮询从下游第一个分区开始以及是点对点分发模式。rescale可以增加数据本地处理,减少了网络io性能更高,但数据均衡性不如rebalance。
private int nextChannelToSendTo = -1;
// 下游channel选择器,从0开始轮询
@Override
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
   
    if (++nextChannelToSendTo >= numberOfChannels) {
   
        nextChannelToSendTo = 0;
    }
    return nextChannelToSendTo;
}
// 分发模式 POINTWISE 点到点,一个下游只会有一个输入
@Override
public boolean isPointwise() {
    return true; }
4. GlobalPartitioner

Partitioner that sends all elements to the downstream operator with subtask ID=0.

如源码注释所写,所有上游数据下发到下游第一个分区。

// 下游channel选择器,均返回0
@Override
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
    return 0;}

// 分发模式为 ALL_TO_ALL
@Override
public boolean isPointwise() {
    return false;}
5. ForwardPartitioner

Partitioner that forwards elements only to the locally running downstream operation.

仅将元素转发到本地运行的下游操作的分区器。

// 下游channel选择器,均返回0
@Override
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
    return 0;}

// 分发模式 POINTWISE 点到点,一个下游只会有一个输入
@Override
public boolean isPointwise() {
    return true;}

与global一样的channel选择方法,区别在于isPointwise()方法为点到点。因此实现了下游仅有一个输入,通过概览图可以清晰看到两者区别。

6. BroadcastPartitioner

Partitioner that selects all the output channels.

上游数据会分发给下游所有分区,故源码里面也提示了不支持select channel。

/** * Note: Broadcast mode could be handled directly for all the output channels in record writer, * so it is no need to select channels via this method. */
@Override
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
   
    throw new UnsupportedOperationException(
            "Broadcast partitioner does not support select channels.");
}
7. KeyGroupStreamPartitioner

Partitioner selects the target channel based on the key group index.

总结下来就是,按照分区键根据hashCode()一次哈希,再murmurHash(keyHash)二次哈希,按照最大并行度(默认128)取模生成keyGroupId,最后根据keyGroupId * parallelism / maxParallelism 得出下游分区index,作为数据分发的依据。

// 核心逻辑,其中最大并行度由系统定义,DEFAULT_LOWER_BOUND_MAX_PARALLELISM = 1 <<7 为128
public KeyedStream(
        DataStream<T> dataStream,
        KeySelector<T, KEY> keySelector,
        TypeInformation<KEY> keyType) {
   
    this(
            dataStream,
            new PartitionTransformation<>(
                    dataStream.getTransformation(),
                    new KeyGroupStreamPartitioner<>(
                            keySelector,
                            StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM)),
            keySelector,
            keyType);
}
KeyGroupStreamPartitioner
// key为分组键,maxParallelism由系统定义默认128,numberOfChannels为用户定义并行度
@Override
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
   
    K key;
    try {
   
        key = keySelector.getKey(record.getInstance().getValue());
    } catch (Exception e) {
   
        throw new RuntimeException(
                "Could not extract key from " + record.getInstance().getValue(), e);
    }
    return KeyGroupRangeAssignment.assignKeyToParallelOperator(
            key, maxParallelism, numberOfChannels);
}
KeyGroupRangeAssignment
public static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) {
   
    Preconditions.checkNotNull(key, "Assigned key must not be null!");
    return computeOperatorIndexForKeyGroup(
            maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism));
}

// 第一次hash
public static int assignToKeyGroup(Object key, int maxParallelism) {
   
    Preconditions.checkNotNull(key, "Assigned key must not be null!");
    return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);
}

// 第二次hash(murmurhash)
public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) {
   
    return MathUtils.murmurHash(keyHash) % maxParallelism;
}

// 根据公式获取目标下游分区index
public static int computeOperatorIndexForKeyGroup(
        int maxParallelism, int parallelism, int keyGroupId) {
   
    return keyGroupId * parallelism / maxParallelism;
}
8. ShufflePartitioner

Partitioner that distributes the data equally by selecting one output channel randomly.

shuffle()算子按Random()方法随机选择下游分区。

// 随机分发
private Random random = new Random();

@Override
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
    return random.nextInt(numberOfChannels);}

@Override
public boolean isPointwise() {
    return false;}
9. CustomPartitionerWrapper

Partitions a DataStream on the key returned by the selector, using a custom partitioner. This
method takes the key selector to get the key to partition on, and a partitioner that accepts
the key type.

partitionCustom()方法顾名思义就是就是自定义分区器,其中主要是重写里面两个方法Partitioner(定义分区行为)及KeySelector(定义key)

public <K> DataStream<T> partitionCustom(
        Partitioner<K> partitioner, KeySelector<T, K> keySelector) {
   
    return setConnectionType(
            new CustomPartitionerWrapper<>(clean(partitioner), clean(keySelector)));
}

学习交流,有任何问题还请随时评论指出交流。

本文《Flink重分区算子解析 - StreamPartitioner》版权归Rango_lhl所有,引用Flink重分区算子解析 - StreamPartitioner需遵循CC 4.0 BY-SA版权协议。


推荐阅读
  • 在Android开发中,使用Picasso库可以实现对网络图片的等比例缩放。本文介绍了使用Picasso库进行图片缩放的方法,并提供了具体的代码实现。通过获取图片的宽高,计算目标宽度和高度,并创建新图实现等比例缩放。 ... [详细]
  • 本文介绍了九度OnlineJudge中的1002题目“Grading”的解决方法。该题目要求设计一个公平的评分过程,将每个考题分配给3个独立的专家,如果他们的评分不一致,则需要请一位裁判做出最终决定。文章详细描述了评分规则,并给出了解决该问题的程序。 ... [详细]
  • 个人学习使用:谨慎参考1Client类importcom.thoughtworks.gauge.Step;importcom.thoughtworks.gauge.T ... [详细]
  • 基于Socket的多个客户端之间的聊天功能实现方法
    本文介绍了基于Socket的多个客户端之间实现聊天功能的方法,包括服务器端的实现和客户端的实现。服务器端通过每个用户的输出流向特定用户发送消息,而客户端通过输入流接收消息。同时,还介绍了相关的实体类和Socket的基本概念。 ... [详细]
  • 本文介绍了使用Spark实现低配版高斯朴素贝叶斯模型的原因和原理。随着数据量的增大,单机上运行高斯朴素贝叶斯模型会变得很慢,因此考虑使用Spark来加速运行。然而,Spark的MLlib并没有实现高斯朴素贝叶斯模型,因此需要自己动手实现。文章还介绍了朴素贝叶斯的原理和公式,并对具有多个特征和类别的模型进行了讨论。最后,作者总结了实现低配版高斯朴素贝叶斯模型的步骤。 ... [详细]
  • Linux服务器密码过期策略、登录次数限制、私钥登录等配置方法
    本文介绍了在Linux服务器上进行密码过期策略、登录次数限制、私钥登录等配置的方法。通过修改配置文件中的参数,可以设置密码的有效期、最小间隔时间、最小长度,并在密码过期前进行提示。同时还介绍了如何进行公钥登录和修改默认账户用户名的操作。详细步骤和注意事项可参考本文内容。 ... [详细]
  • 本文介绍了一个Java猜拳小游戏的代码,通过使用Scanner类获取用户输入的拳的数字,并随机生成计算机的拳,然后判断胜负。该游戏可以选择剪刀、石头、布三种拳,通过比较两者的拳来决定胜负。 ... [详细]
  • sklearn数据集库中的常用数据集类型介绍
    本文介绍了sklearn数据集库中常用的数据集类型,包括玩具数据集和样本生成器。其中详细介绍了波士顿房价数据集,包含了波士顿506处房屋的13种不同特征以及房屋价格,适用于回归任务。 ... [详细]
  • ALTERTABLE通过更改、添加、除去列和约束,或者通过启用或禁用约束和触发器来更改表的定义。语法ALTERTABLEtable{[ALTERCOLUMNcolu ... [详细]
  • Go Cobra命令行工具入门教程
    本文介绍了Go语言实现的命令行工具Cobra的基本概念、安装方法和入门实践。Cobra被广泛应用于各种项目中,如Kubernetes、Hugo和Github CLI等。通过使用Cobra,我们可以快速创建命令行工具,适用于写测试脚本和各种服务的Admin CLI。文章还通过一个简单的demo演示了Cobra的使用方法。 ... [详细]
  • 本文详细介绍了如何使用MySQL来显示SQL语句的执行时间,并通过MySQL Query Profiler获取CPU和内存使用量以及系统锁和表锁的时间。同时介绍了效能分析的三种方法:瓶颈分析、工作负载分析和基于比率的分析。 ... [详细]
  • Ihavethefollowingonhtml我在html上有以下内容<html><head><scriptsrc..3003_Tes ... [详细]
  • Imtryingtofigureoutawaytogeneratetorrentfilesfromabucket,usingtheAWSSDKforGo.我正 ... [详细]
  • 本文讨论了如何在codeigniter中识别来自angularjs的请求,并提供了两种方法的代码示例。作者尝试了$this->input->is_ajax_request()和自定义函数is_ajax(),但都没有成功。最后,作者展示了一个ajax请求的示例代码。 ... [详细]
  • 本文介绍了在iOS开发中使用UITextField实现字符限制的方法,包括利用代理方法和使用BNTextField-Limit库的实现策略。通过这些方法,开发者可以方便地限制UITextField的字符个数和输入规则。 ... [详细]
author-avatar
Damon
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有