热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

org.apache.flink.api.common.functions.FlatMapFunction类的使用及代码示例

本文整理了Java中org.apache.flink.api.common.functions.FlatMapFunction类的一些代码示例,展示了Fla

本文整理了Java中org.apache.flink.api.common.functions.FlatMapFunction类的一些代码示例,展示了FlatMapFunction类的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。FlatMapFunction类的具体详情如下:
包路径:org.apache.flink.api.common.functions.FlatMapFunction
类名称:FlatMapFunction

FlatMapFunction介绍

[英]Base interface for flatMap functions. FlatMap functions take elements and transform them, into zero, one, or more elements. Typical applications can be splitting elements, or unnesting lists and arrays. Operations that produce multiple strictly one result element per input element can also use the MapFunction.

The basic syntax for using a FlatMapFunction is as follows:

DataSet input = ...;

[中]flatMap函数的基本接口。FlatMap函数获取元素并将其转换为零、一个或多个元素。典型的应用程序可以是拆分元素,或者取消列表和数组的测试。每个输入元素生成多个结果元素的操作也可以使用MapFunction。
使用FlatMapFunction的基本语法如下:

DataSet input = ...;

代码示例

代码示例来源:origin: apache/flink

@Override
public void flatMap(PyObject value, Collector out) throws Exception {
this.collector.setCollector(out);
try {
this.fun.flatMap(value, this.collector);
} catch (PyException pe) {
throw createAndLogException(pe);
}
}
}

代码示例来源:origin: apache/flink

@Override
public void processElement(StreamRecord element) throws Exception {
collector.setTimestamp(element);
userFunction.flatMap(element.getValue(), collector);
}
}

代码示例来源:origin: apache/flink

@Override
protected List executeOnCollections(List inputData, RuntimeContext ctx, ExecutionConfig executionConfig) throws Exception {
FlatMapFunction function = this.userFunction.getUserCodeObject();

FunctionUtils.setFunctionRuntimeContext(function, ctx);
FunctionUtils.openFunction(function, this.parameters);

ArrayList result = new ArrayList(inputData.size());
ListCollector collector = new ListCollector(result);
for (T element : inputData) {
function.flatMap(element, collector);
}

FunctionUtils.closeFunction(function);

return result;
}
}

代码示例来源:origin: apache/flink

@Override
public void processElement(StreamRecord> element) throws Exception {
userFunction.flatMap(element.getValue(), new TimestampedCollector<>(output));
}

代码示例来源:origin: apache/flink

@Override
protected List executeOnCollections(List input, RuntimeContext ctx, ExecutionConfig executionConfig) throws Exception {
FlatMapFunction function = userFunction.getUserCodeObject();

FunctionUtils.setFunctionRuntimeContext(function, ctx);
FunctionUtils.openFunction(function, parameters);
ArrayList result = new ArrayList(input.size());
TypeSerializer inSerializer = getOperatorInfo().getInputType().createSerializer(executionConfig);
TypeSerializer outSerializer = getOperatorInfo().getOutputType().createSerializer(executionConfig);
CopyingListCollector resultCollector = new CopyingListCollector(result, outSerializer);
for (IN element : input) {
IN inCopy = inSerializer.copy(element);
function.flatMap(inCopy, resultCollector);
}
FunctionUtils.closeFunction(function);
return result;
}
}

代码示例来源:origin: apache/flink

@Override
public void processElement(StreamRecord> element) throws Exception {
userFunction.flatMap(element.getValue(), new TimestampedCollector<>(output));
getRuntimeContext().getAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR).add(1);
}

代码示例来源:origin: com.alibaba.blink/flink-runtime

@Override
public void collect(IT record) {
try {
this.numRecordsIn.inc();
this.mapper.flatMap(record, this.outputCollector);
} catch (Exception ex) {
throw new ExceptionInChainedStubException(this.taskName, ex);
}
}

代码示例来源:origin: org.apache.flink/flink-streaming-java

@Override
public void processElement(StreamRecord element) throws Exception {
collector.setTimestamp(element);
userFunction.flatMap(element.getValue(), collector);
}
}

代码示例来源:origin: org.apache.flink/flink-runtime

@Override
public void collect(IT record) {
try {
this.numRecordsIn.inc();
this.mapper.flatMap(record, this.outputCollector);
} catch (Exception ex) {
throw new ExceptionInChainedStubException(this.taskName, ex);
}
}

代码示例来源:origin: org.apache.flink/flink-streaming-java_2.11

@Override
public void processElement(StreamRecord element) throws Exception {
collector.setTimestamp(element);
userFunction.flatMap(element.getValue(), collector);
}
}

代码示例来源:origin: org.apache.flink/flink-runtime_2.11

@Override
public void collect(IT record) {
try {
this.numRecordsIn.inc();
this.mapper.flatMap(record, this.outputCollector);
} catch (Exception ex) {
throw new ExceptionInChainedStubException(this.taskName, ex);
}
}

代码示例来源:origin: org.apache.flink/flink-streaming-java_2.10

@Override
public void processElement(StreamRecord element) throws Exception {
collector.setTimestamp(element);
userFunction.flatMap(element.getValue(), collector);
}
}

代码示例来源:origin: org.apache.flink/flink-runtime_2.10

@Override
public void collect(IT record) {
try {
this.numRecordsIn.inc();
this.mapper.flatMap(record, this.outputCollector);
} catch (Exception ex) {
throw new ExceptionInChainedStubException(this.taskName, ex);
}
}

代码示例来源:origin: com.alibaba.blink/flink-core

@Override
protected List executeOnCollections(List inputData, RuntimeContext ctx, ExecutionConfig executionConfig) throws Exception {
FlatMapFunction function = this.userFunction.getUserCodeObject();

FunctionUtils.setFunctionRuntimeContext(function, ctx);
FunctionUtils.openFunction(function, this.parameters);

ArrayList result = new ArrayList(inputData.size());
ListCollector collector = new ListCollector(result);
for (T element : inputData) {
function.flatMap(element, collector);
}

FunctionUtils.closeFunction(function);

return result;
}
}

代码示例来源:origin: org.apache.flink/flink-core

@Override
protected List executeOnCollections(List inputData, RuntimeContext ctx, ExecutionConfig executionConfig) throws Exception {
FlatMapFunction function = this.userFunction.getUserCodeObject();

FunctionUtils.setFunctionRuntimeContext(function, ctx);
FunctionUtils.openFunction(function, this.parameters);

ArrayList result = new ArrayList(inputData.size());
ListCollector collector = new ListCollector(result);
for (T element : inputData) {
function.flatMap(element, collector);
}

FunctionUtils.closeFunction(function);

return result;
}
}

代码示例来源:origin: com.alibaba.blink/flink-core

@Override
protected List executeOnCollections(List input, RuntimeContext ctx, ExecutionConfig executionConfig) throws Exception {
FlatMapFunction function = userFunction.getUserCodeObject();

FunctionUtils.setFunctionRuntimeContext(function, ctx);
FunctionUtils.openFunction(function, parameters);
ArrayList result = new ArrayList(input.size());
TypeSerializer inSerializer = getOperatorInfo().getInputType().createSerializer(executionConfig);
TypeSerializer outSerializer = getOperatorInfo().getOutputType().createSerializer(executionConfig);
CopyingListCollector resultCollector = new CopyingListCollector(result, outSerializer);
for (IN element : input) {
IN inCopy = inSerializer.copy(element);
function.flatMap(inCopy, resultCollector);
}
FunctionUtils.closeFunction(function);
return result;
}
}

代码示例来源:origin: org.apache.flink/flink-core

@Override
protected List executeOnCollections(List input, RuntimeContext ctx, ExecutionConfig executionConfig) throws Exception {
FlatMapFunction function = userFunction.getUserCodeObject();

FunctionUtils.setFunctionRuntimeContext(function, ctx);
FunctionUtils.openFunction(function, parameters);
ArrayList result = new ArrayList(input.size());
TypeSerializer inSerializer = getOperatorInfo().getInputType().createSerializer(executionConfig);
TypeSerializer outSerializer = getOperatorInfo().getOutputType().createSerializer(executionConfig);
CopyingListCollector resultCollector = new CopyingListCollector(result, outSerializer);
for (IN element : input) {
IN inCopy = inSerializer.copy(element);
function.flatMap(inCopy, resultCollector);
}
FunctionUtils.closeFunction(function);
return result;
}
}

代码示例来源:origin: vasia/gelly-streaming

f.flatMap(input1, out2);
Assert.assertEquals(0, resultList2.size());
f.flatMap(input2, out2);
Assert.assertEquals(0, resultList2.size());
f.flatMap(input3, out2);
Assert.assertEquals(5, resultList2.size());
Assert.assertEquals(true, resultList2.contains(new Tuple2<>(3, 1)));
f.flatMap(input4, out2);
Assert.assertEquals(5, resultList2.size());
Assert.assertEquals(true, resultList2.contains(new Tuple2<>(2, 1)));

代码示例来源:origin: org.apache.flink/flink-runtime_2.10

@Override
public void run() throws Exception {
final Counter numRecordsIn = this.taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsInCounter();
final Counter numRecordsOut = this.taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsOutCounter();
// cache references on the stack
final MutableObjectIterator input = this.taskContext.getInput(0);
final FlatMapFunction function = this.taskContext.getStub();
final Collector output = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut);
if (objectReuseEnabled) {
IT record = this.taskContext.getInputSerializer(0).getSerializer().createInstance();
while (this.running && ((record = input.next(record)) != null)) {
numRecordsIn.inc();
function.flatMap(record, output);
}
} else {
IT record;
while (this.running && ((record = input.next()) != null)) {
numRecordsIn.inc();
function.flatMap(record, output);
}
}
}

代码示例来源:origin: org.apache.flink/flink-runtime_2.11

@Override
public void run() throws Exception {
final Counter numRecordsIn = this.taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsInCounter();
final Counter numRecordsOut = this.taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsOutCounter();
// cache references on the stack
final MutableObjectIterator input = this.taskContext.getInput(0);
final FlatMapFunction function = this.taskContext.getStub();
final Collector output = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut);
if (objectReuseEnabled) {
IT record = this.taskContext.getInputSerializer(0).getSerializer().createInstance();
while (this.running && ((record = input.next(record)) != null)) {
numRecordsIn.inc();
function.flatMap(record, output);
}
} else {
IT record;
while (this.running && ((record = input.next()) != null)) {
numRecordsIn.inc();
function.flatMap(record, output);
}
}
}

推荐阅读
  • 本指南从零开始介绍Scala编程语言的基础知识,重点讲解了Scala解释器REPL(读取-求值-打印-循环)的使用方法。REPL是Scala开发中的重要工具,能够帮助初学者快速理解和实践Scala的基本语法和特性。通过详细的示例和练习,读者将能够熟练掌握Scala的基础概念和编程技巧。 ... [详细]
  • 本题主要考察二维数组的遍历和重塑。通过将二维数组降为一维,再根据新的行数和列数重新构建矩阵。 ... [详细]
  • 本文详细介绍了Java反射机制的基本概念、获取Class对象的方法、反射的主要功能及其在实际开发中的应用。通过具体示例,帮助读者更好地理解和使用Java反射。 ... [详细]
  • 本文将带你快速了解 SpringMVC 框架的基本使用方法,通过实现一个简单的 Controller 并在浏览器中访问,展示 SpringMVC 的强大与简便。 ... [详细]
  • DAO(Data Access Object)模式是一种用于抽象和封装所有对数据库或其他持久化机制访问的方法,它通过提供一个统一的接口来隐藏底层数据访问的复杂性。 ... [详细]
  • 如何在Java中使用DButils类
    这期内容当中小编将会给大家带来有关如何在Java中使用DButils类,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。D ... [详细]
  • 2.2 组件间父子通信机制详解
    2.2 组件间父子通信机制详解 ... [详细]
  • 深入解析 Synchronized 锁的升级机制及其在并发编程中的应用
    深入解析 Synchronized 锁的升级机制及其在并发编程中的应用 ... [详细]
  • 如何将TS文件转换为M3U8直播流:HLS与M3U8格式详解
    在视频传输领域,MP4虽然常见,但在直播场景中直接使用MP4格式存在诸多问题。例如,MP4文件的头部信息(如ftyp、moov)较大,导致初始加载时间较长,影响用户体验。相比之下,HLS(HTTP Live Streaming)协议及其M3U8格式更具优势。HLS通过将视频切分成多个小片段,并生成一个M3U8播放列表文件,实现低延迟和高稳定性。本文详细介绍了如何将TS文件转换为M3U8直播流,包括技术原理和具体操作步骤,帮助读者更好地理解和应用这一技术。 ... [详细]
  • 每日前端实战:148# 视频教程展示纯 CSS 实现按钮两侧滑入装饰元素的悬停效果
    通过点击页面右侧的“预览”按钮,您可以直接在当前页面查看效果,或点击链接进入全屏预览模式。该视频教程展示了如何使用纯 CSS 实现按钮两侧滑入装饰元素的悬停效果。视频内容具有互动性,观众可以实时调整代码并观察变化。访问以下链接体验完整效果:https://codepen.io/comehope/pen/yRyOZr。 ... [详细]
  • WinMain 函数详解及示例
    本文详细介绍了 WinMain 函数的参数及其用途,并提供了一个具体的示例代码来解析 WinMain 函数的实现。 ... [详细]
  • IOS Run loop详解
    为什么80%的码农都做不了架构师?转自http:blog.csdn.netztp800201articledetails9240913感谢作者分享Objecti ... [详细]
  • 在多线程并发环境中,普通变量的操作往往是线程不安全的。本文通过一个简单的例子,展示了如何使用 AtomicInteger 类及其核心的 CAS 无锁算法来保证线程安全。 ... [详细]
  • com.hazelcast.config.MapConfig.isStatisticsEnabled()方法的使用及代码示例 ... [详细]
  • 本文总结了一些开发中常见的问题及其解决方案,包括特性过滤器的使用、NuGet程序集版本冲突、线程存储、溢出检查、ThreadPool的最大线程数设置、Redis使用中的问题以及Task.Result和Task.GetAwaiter().GetResult()的区别。 ... [详细]
author-avatar
mobiledu2502889153
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有