热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

【05】Flink之DataStreamAPI(三):Partition操作

1、Partition操作常用APIRandompartitioningRebalancing

1、Partition 操作常用API

  • Random partitioning
  • Rebalancing
  • Rescaling
  • Custom partitioning
  • Broadcasting
  1. Random partitioning:随机分区
    使用dataStream.shuffle()方法
    底层实现:

public class ShufflePartitioner extends StreamPartitioner {
private static final long serialVersiOnUID= 1L;
private Random random = new Random();
private final int[] returnArray = new int[1];
@Override
public int[] selectChannels(SerializationDelegate> record,
int numberOfOutputChannels) { // 获取所有 channel 数
returnArray[0] = random.nextInt(numberOfOutputChannels); // 得到一个 0 - channel_num 之间的数值
return returnArray; // 该返回的数值决定了要分到哪个区
}
@Override
public StreamPartitioner copy() {
return new ShufflePartitioner();
}
@Override
public String toString() {
return "SHUFFLE";
}
}

  1. Rebalancing:对数据集进行再平衡,重分区,消 除 数 据 倾 斜 \color{red}{消除数据倾斜}消除数据倾斜
    使用dataStream.rebalance()方法

底层实现:

public class RebalancePartitioner extends StreamPartitioner {
private static final long serialVersiOnUID= 1L;
private final int[] returnArray = new int[] {-1};
@Override
public int[] selectChannels(SerializationDelegate> record,
int numberOfOutputChannels) {
int newChannel = ++this.returnArray[0]; // 获取 0 号元素的数据,通过加1,指向下一个channel
if (newChannel >= numberOfOutputChannels) { // 如果大于等于 channel数,则加到头了,重新再从 0号channel开始分发
this.returnArray[0] = 0;
}
return this.returnArray;
}
public StreamPartitioner copy() {
return this;
}
@Override
public String toString() {
return "REBALANCE";
}
}

  1. Rescaling:
    使用dataStream.rescale()方法
    举例:
    如果上游操作有2个并发,而下游操作有4个并发,那么上游的一个并发结果分配给下游的两个并发操作,另外的一个并发结果分配给了下游的另外两个并发操作.另一方面,下游有两个并发操作而上游又4个并发操作,那么上游的其中两个操作的结果分配给下游的一个并发操作而另外两个并发操作的结果则分配给另外一个并发操作。

注意:

  • Rescaling与Rebalancing的区别:
  • Rebalancing会产生全量重分区,而Rescaling不会。
  1. Custom partitioning:自定义分区
    自定义分区需要实现Partitioner接口
    使用dataStream.partitionCustom(partitioner, “someKey”)方法
    或者
    使用dataStream.partitionCustom(partitioner, 0); 方法
  2. Broadcasting

2、自定义分区 Custom partitioning

自定义分区
自定义分区需要实现Partitioner接口

2.1 Java代码实现

实现根据奇、偶数分区

public class MyPartition implements Partitioner {
@Override
public int partition(Long key, int numPartitions) {
System.out.println("分区总数:"+numPartitions);
if(key % 2 == 0){
return 0;
}else{
return 1;
}
}
}

dataStream.partitionCustom(partitioner, "someKey")

或者:

dataStream.partitionCustom(partitioner, 0);

完整代码:

package com.Streaming.custormPartition;
import com.Streaming.custormSource.MyNoParalleSource;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* @Author: Henry
* @Description: 使用自定义分析
* 根据数字的奇偶性来分区
* @Date: Create in 2019/5/12 19:21
**/
public class SteamingDemoWithMyParitition {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
DataStreamSource text = env.addSource(new MyNoParalleSource());
//对数据进行转换,把long类型转成tuple1类型
DataStream> tupleData = text.map(new MapFunction>() {
@Override
public Tuple1 map(Long value) throws Exception {
return new Tuple1<>(value);
}
});
//分区之后的数据
DataStream> partitiOnData= tupleData.partitionCustom(new MyPartition(), 0);
DataStream result = partitionData.map(new MapFunction, Long>() {
@Override
public Long map(Tuple1 value) throws Exception {
System.out.println("当前线程id:" + Thread.currentThread().getId() + ",value: " + value);
return value.getField(0);
}
});
result.print().setParallelism(1);
env.execute("SteamingDemoWithMyParitition");
}
}

2.2、运行结果

分数总数:8 (因为没有设置并行度)
虽然并行度是8,但是实际只有两个线程工作:即线程id=68 和 线程id=69
线程id=68,处理奇数分区
线程id=69,处理偶数分区

由上图代码,可以根据业务设置并行度,即 env.setParallelism(2) ;

2.3 Scala代码实现

自定义分区代码如下:

package cn.Streaming.custormPartition
import org.apache.flink.api.common.functions.Partitioner
/**
* @Author: HongZhen
* @Description:
* @Date: Create in 2019/5/14 22:16
**/
class MyPartitionerScala extends Partitioner[Long]{
override def partition(key: Long, numPartitions: Int) = {
println("分区总数:"+numPartitions)
if(key % 2 ==0){
0
}else{
1
}
}
}

主程序代码:

package cn.Streaming.custormPartition
import cn.Streaming.custormSource.MyNoParallelSourceScala
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
/**
* @Author: Henry
* @Description:
* @Date: Create in 2019/5/14 22:17
**/
object StreamingDemoMyPartitionerScala {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(2)
//隐式转换
import org.apache.flink.api.scala._
val text = env.addSource(new MyNoParallelSourceScala)
//把long类型的数据转成tuple类型
val tupleData = text.map(line=>{
Tuple1(line)// 注意tuple1的实现方式
// Tuple2 可以直接写成如 (line,1)
// 但是 Tuple1 必须加上关键词 Tuple1
})
// 上面将 Long 转换为 Tuple1[Long] 的原因是由于
// partitionCustom 的 field 参数类型: Tuple1[K]
val partitiOnData= tupleData.partitionCustom(
new MyPartitionerScala, 0 )
val result = partitionData.map(line=>{
println("当前线程id:"+
Thread.currentThread().getId+",value: "+line)
line._1
})
result.print().setParallelism(1)
env.execute("StreamingDemoWithMyNoParallelSourceScala")
}
}

2.4 运行结果


推荐阅读
  • JDK源码学习之HashTable(附带面试题)的学习笔记
    本文介绍了JDK源码学习之HashTable(附带面试题)的学习笔记,包括HashTable的定义、数据类型、与HashMap的关系和区别。文章提供了干货,并附带了其他相关主题的学习笔记。 ... [详细]
  • 如何自行分析定位SAP BSP错误
    The“BSPtag”Imentionedintheblogtitlemeansforexamplethetagchtmlb:configCelleratorbelowwhichi ... [详细]
  • Linux服务器密码过期策略、登录次数限制、私钥登录等配置方法
    本文介绍了在Linux服务器上进行密码过期策略、登录次数限制、私钥登录等配置的方法。通过修改配置文件中的参数,可以设置密码的有效期、最小间隔时间、最小长度,并在密码过期前进行提示。同时还介绍了如何进行公钥登录和修改默认账户用户名的操作。详细步骤和注意事项可参考本文内容。 ... [详细]
  • SpringBoot uri统一权限管理的实现方法及步骤详解
    本文详细介绍了SpringBoot中实现uri统一权限管理的方法,包括表结构定义、自动统计URI并自动删除脏数据、程序启动加载等步骤。通过该方法可以提高系统的安全性,实现对系统任意接口的权限拦截验证。 ... [详细]
  • Java容器中的compareto方法排序原理解析
    本文从源码解析Java容器中的compareto方法的排序原理,讲解了在使用数组存储数据时的限制以及存储效率的问题。同时提到了Redis的五大数据结构和list、set等知识点,回忆了作者大学时代的Java学习经历。文章以作者做的思维导图作为目录,展示了整个讲解过程。 ... [详细]
  • 自动轮播,反转播放的ViewPagerAdapter的使用方法和效果展示
    本文介绍了如何使用自动轮播、反转播放的ViewPagerAdapter,并展示了其效果。该ViewPagerAdapter支持无限循环、触摸暂停、切换缩放等功能。同时提供了使用GIF.gif的示例和github地址。通过LoopFragmentPagerAdapter类的getActualCount、getActualItem和getActualPagerTitle方法可以实现自定义的循环效果和标题展示。 ... [详细]
  • 本文详细介绍了Java中vector的使用方法和相关知识,包括vector类的功能、构造方法和使用注意事项。通过使用vector类,可以方便地实现动态数组的功能,并且可以随意插入不同类型的对象,进行查找、插入和删除操作。这篇文章对于需要频繁进行查找、插入和删除操作的情况下,使用vector类是一个很好的选择。 ... [详细]
  • 高质量SQL书写的30条建议
    本文提供了30条关于优化SQL的建议,包括避免使用select *,使用具体字段,以及使用limit 1等。这些建议是基于实际开发经验总结出来的,旨在帮助读者优化SQL查询。 ... [详细]
  • 本文介绍了三种方法来实现在Win7系统中显示桌面的快捷方式,包括使用任务栏快速启动栏、运行命令和自己创建快捷方式的方法。具体操作步骤详细说明,并提供了保存图标的路径,方便以后使用。 ... [详细]
  • 本文介绍了Android 7的学习笔记总结,包括最新的移动架构视频、大厂安卓面试真题和项目实战源码讲义。同时还分享了开源的完整内容,并提醒读者在使用FileProvider适配时要注意不同模块的AndroidManfiest.xml中配置的xml文件名必须不同,否则会出现问题。 ... [详细]
  • MyBatis多表查询与动态SQL使用
    本文介绍了MyBatis多表查询与动态SQL的使用方法,包括一对一查询和一对多查询。同时还介绍了动态SQL的使用,包括if标签、trim标签、where标签、set标签和foreach标签的用法。文章还提供了相关的配置信息和示例代码。 ... [详细]
  • 本文介绍了如何清除Eclipse中SVN用户的设置。首先需要查看使用的SVN接口,然后根据接口类型找到相应的目录并删除相关文件。最后使用SVN更新或提交来应用更改。 ... [详细]
  • 解决.net项目中未注册“microsoft.ACE.oledb.12.0”提供程序的方法
    在开发.net项目中,通过microsoft.ACE.oledb读取excel文件信息时,报错“未在本地计算机上注册“microsoft.ACE.oledb.12.0”提供程序”。本文提供了解决这个问题的方法,包括错误描述和代码示例。通过注册提供程序和修改连接字符串,可以成功读取excel文件信息。 ... [详细]
  • OpenMap教程4 – 图层概述
    本文介绍了OpenMap教程4中关于地图图层的内容,包括将ShapeLayer添加到MapBean中的方法,OpenMap支持的图层类型以及使用BufferedLayer创建图像的MapBean。此外,还介绍了Layer背景标志的作用和OMGraphicHandlerLayer的基础层类。 ... [详细]
  • #define_CRT_SECURE_NO_WARNINGS#includelist.h#includevoidSListInit(PNode*pHead ... [详细]
author-avatar
MS07224_670
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有