热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

org.apache.flink.api.common.functions.FlatMapFunction类的使用及代码示例

本文整理了Java中org.apache.flink.api.common.functions.FlatMapFunction类的一些代码示例,展示了Fla

本文整理了Java中org.apache.flink.api.common.functions.FlatMapFunction类的一些代码示例,展示了FlatMapFunction类的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。FlatMapFunction类的具体详情如下:
包路径:org.apache.flink.api.common.functions.FlatMapFunction
类名称:FlatMapFunction

FlatMapFunction介绍

[英]Base interface for flatMap functions. FlatMap functions take elements and transform them, into zero, one, or more elements. Typical applications can be splitting elements, or unnesting lists and arrays. Operations that produce multiple strictly one result element per input element can also use the MapFunction.

The basic syntax for using a FlatMapFunction is as follows:

DataSet input = ...;

[中]flatMap函数的基本接口。FlatMap函数获取元素并将其转换为零、一个或多个元素。典型的应用程序可以是拆分元素,或者取消列表和数组的测试。每个输入元素生成多个结果元素的操作也可以使用MapFunction。
使用FlatMapFunction的基本语法如下:

DataSet input = ...;

代码示例

代码示例来源:origin: apache/flink

@Override
public void flatMap(PyObject value, Collector out) throws Exception {
this.collector.setCollector(out);
try {
this.fun.flatMap(value, this.collector);
} catch (PyException pe) {
throw createAndLogException(pe);
}
}
}

代码示例来源:origin: apache/flink

@Override
public void processElement(StreamRecord element) throws Exception {
collector.setTimestamp(element);
userFunction.flatMap(element.getValue(), collector);
}
}

代码示例来源:origin: apache/flink

@Override
protected List executeOnCollections(List inputData, RuntimeContext ctx, ExecutionConfig executionConfig) throws Exception {
FlatMapFunction function = this.userFunction.getUserCodeObject();

FunctionUtils.setFunctionRuntimeContext(function, ctx);
FunctionUtils.openFunction(function, this.parameters);

ArrayList result = new ArrayList(inputData.size());
ListCollector collector = new ListCollector(result);
for (T element : inputData) {
function.flatMap(element, collector);
}

FunctionUtils.closeFunction(function);

return result;
}
}

代码示例来源:origin: apache/flink

@Override
public void processElement(StreamRecord> element) throws Exception {
userFunction.flatMap(element.getValue(), new TimestampedCollector<>(output));
}

代码示例来源:origin: apache/flink

@Override
protected List executeOnCollections(List input, RuntimeContext ctx, ExecutionConfig executionConfig) throws Exception {
FlatMapFunction function = userFunction.getUserCodeObject();

FunctionUtils.setFunctionRuntimeContext(function, ctx);
FunctionUtils.openFunction(function, parameters);
ArrayList result = new ArrayList(input.size());
TypeSerializer inSerializer = getOperatorInfo().getInputType().createSerializer(executionConfig);
TypeSerializer outSerializer = getOperatorInfo().getOutputType().createSerializer(executionConfig);
CopyingListCollector resultCollector = new CopyingListCollector(result, outSerializer);
for (IN element : input) {
IN inCopy = inSerializer.copy(element);
function.flatMap(inCopy, resultCollector);
}
FunctionUtils.closeFunction(function);
return result;
}
}

代码示例来源:origin: apache/flink

@Override
public void processElement(StreamRecord> element) throws Exception {
userFunction.flatMap(element.getValue(), new TimestampedCollector<>(output));
getRuntimeContext().getAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR).add(1);
}

代码示例来源:origin: com.alibaba.blink/flink-runtime

@Override
public void collect(IT record) {
try {
this.numRecordsIn.inc();
this.mapper.flatMap(record, this.outputCollector);
} catch (Exception ex) {
throw new ExceptionInChainedStubException(this.taskName, ex);
}
}

代码示例来源:origin: org.apache.flink/flink-streaming-java

@Override
public void processElement(StreamRecord element) throws Exception {
collector.setTimestamp(element);
userFunction.flatMap(element.getValue(), collector);
}
}

代码示例来源:origin: org.apache.flink/flink-runtime

@Override
public void collect(IT record) {
try {
this.numRecordsIn.inc();
this.mapper.flatMap(record, this.outputCollector);
} catch (Exception ex) {
throw new ExceptionInChainedStubException(this.taskName, ex);
}
}

代码示例来源:origin: org.apache.flink/flink-streaming-java_2.11

@Override
public void processElement(StreamRecord element) throws Exception {
collector.setTimestamp(element);
userFunction.flatMap(element.getValue(), collector);
}
}

代码示例来源:origin: org.apache.flink/flink-runtime_2.11

@Override
public void collect(IT record) {
try {
this.numRecordsIn.inc();
this.mapper.flatMap(record, this.outputCollector);
} catch (Exception ex) {
throw new ExceptionInChainedStubException(this.taskName, ex);
}
}

代码示例来源:origin: org.apache.flink/flink-streaming-java_2.10

@Override
public void processElement(StreamRecord element) throws Exception {
collector.setTimestamp(element);
userFunction.flatMap(element.getValue(), collector);
}
}

代码示例来源:origin: org.apache.flink/flink-runtime_2.10

@Override
public void collect(IT record) {
try {
this.numRecordsIn.inc();
this.mapper.flatMap(record, this.outputCollector);
} catch (Exception ex) {
throw new ExceptionInChainedStubException(this.taskName, ex);
}
}

代码示例来源:origin: com.alibaba.blink/flink-core

@Override
protected List executeOnCollections(List inputData, RuntimeContext ctx, ExecutionConfig executionConfig) throws Exception {
FlatMapFunction function = this.userFunction.getUserCodeObject();

FunctionUtils.setFunctionRuntimeContext(function, ctx);
FunctionUtils.openFunction(function, this.parameters);

ArrayList result = new ArrayList(inputData.size());
ListCollector collector = new ListCollector(result);
for (T element : inputData) {
function.flatMap(element, collector);
}

FunctionUtils.closeFunction(function);

return result;
}
}

代码示例来源:origin: org.apache.flink/flink-core

@Override
protected List executeOnCollections(List inputData, RuntimeContext ctx, ExecutionConfig executionConfig) throws Exception {
FlatMapFunction function = this.userFunction.getUserCodeObject();

FunctionUtils.setFunctionRuntimeContext(function, ctx);
FunctionUtils.openFunction(function, this.parameters);

ArrayList result = new ArrayList(inputData.size());
ListCollector collector = new ListCollector(result);
for (T element : inputData) {
function.flatMap(element, collector);
}

FunctionUtils.closeFunction(function);

return result;
}
}

代码示例来源:origin: com.alibaba.blink/flink-core

@Override
protected List executeOnCollections(List input, RuntimeContext ctx, ExecutionConfig executionConfig) throws Exception {
FlatMapFunction function = userFunction.getUserCodeObject();

FunctionUtils.setFunctionRuntimeContext(function, ctx);
FunctionUtils.openFunction(function, parameters);
ArrayList result = new ArrayList(input.size());
TypeSerializer inSerializer = getOperatorInfo().getInputType().createSerializer(executionConfig);
TypeSerializer outSerializer = getOperatorInfo().getOutputType().createSerializer(executionConfig);
CopyingListCollector resultCollector = new CopyingListCollector(result, outSerializer);
for (IN element : input) {
IN inCopy = inSerializer.copy(element);
function.flatMap(inCopy, resultCollector);
}
FunctionUtils.closeFunction(function);
return result;
}
}

代码示例来源:origin: org.apache.flink/flink-core

@Override
protected List executeOnCollections(List input, RuntimeContext ctx, ExecutionConfig executionConfig) throws Exception {
FlatMapFunction function = userFunction.getUserCodeObject();

FunctionUtils.setFunctionRuntimeContext(function, ctx);
FunctionUtils.openFunction(function, parameters);
ArrayList result = new ArrayList(input.size());
TypeSerializer inSerializer = getOperatorInfo().getInputType().createSerializer(executionConfig);
TypeSerializer outSerializer = getOperatorInfo().getOutputType().createSerializer(executionConfig);
CopyingListCollector resultCollector = new CopyingListCollector(result, outSerializer);
for (IN element : input) {
IN inCopy = inSerializer.copy(element);
function.flatMap(inCopy, resultCollector);
}
FunctionUtils.closeFunction(function);
return result;
}
}

代码示例来源:origin: vasia/gelly-streaming

f.flatMap(input1, out2);
Assert.assertEquals(0, resultList2.size());
f.flatMap(input2, out2);
Assert.assertEquals(0, resultList2.size());
f.flatMap(input3, out2);
Assert.assertEquals(5, resultList2.size());
Assert.assertEquals(true, resultList2.contains(new Tuple2<>(3, 1)));
f.flatMap(input4, out2);
Assert.assertEquals(5, resultList2.size());
Assert.assertEquals(true, resultList2.contains(new Tuple2<>(2, 1)));

代码示例来源:origin: org.apache.flink/flink-runtime_2.10

@Override
public void run() throws Exception {
final Counter numRecordsIn = this.taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsInCounter();
final Counter numRecordsOut = this.taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsOutCounter();
// cache references on the stack
final MutableObjectIterator input = this.taskContext.getInput(0);
final FlatMapFunction function = this.taskContext.getStub();
final Collector output = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut);
if (objectReuseEnabled) {
IT record = this.taskContext.getInputSerializer(0).getSerializer().createInstance();
while (this.running && ((record = input.next(record)) != null)) {
numRecordsIn.inc();
function.flatMap(record, output);
}
} else {
IT record;
while (this.running && ((record = input.next()) != null)) {
numRecordsIn.inc();
function.flatMap(record, output);
}
}
}

代码示例来源:origin: org.apache.flink/flink-runtime_2.11

@Override
public void run() throws Exception {
final Counter numRecordsIn = this.taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsInCounter();
final Counter numRecordsOut = this.taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsOutCounter();
// cache references on the stack
final MutableObjectIterator input = this.taskContext.getInput(0);
final FlatMapFunction function = this.taskContext.getStub();
final Collector output = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut);
if (objectReuseEnabled) {
IT record = this.taskContext.getInputSerializer(0).getSerializer().createInstance();
while (this.running && ((record = input.next(record)) != null)) {
numRecordsIn.inc();
function.flatMap(record, output);
}
} else {
IT record;
while (this.running && ((record = input.next()) != null)) {
numRecordsIn.inc();
function.flatMap(record, output);
}
}
}

推荐阅读
  • 技术分享:使用 Flask、AngularJS 和 Jinja2 构建高效前后端交互系统
    技术分享:使用 Flask、AngularJS 和 Jinja2 构建高效前后端交互系统 ... [详细]
  • 本文详细解析了客户端与服务器之间的交互过程,重点介绍了Socket通信机制。IP地址由32位的4个8位二进制数组成,分为网络地址和主机地址两部分。通过使用 `ipconfig /all` 命令,用户可以查看详细的IP配置信息。此外,文章还介绍了如何使用 `ping` 命令测试网络连通性,例如 `ping 127.0.0.1` 可以检测本机网络是否正常。这些技术细节对于理解网络通信的基本原理具有重要意义。 ... [详细]
  • 如何使用 `org.apache.poi.openxml4j.opc.PackagePart` 类中的 `loadRelationships()` 方法及其代码示例详解 ... [详细]
  • Java Socket 关键参数详解与优化建议
    Java Socket 的 API 虽然被广泛使用,但其关键参数的用途却鲜为人知。本文详细解析了 Java Socket 中的重要参数,如 backlog 参数,它用于控制服务器等待连接请求的队列长度。此外,还探讨了其他参数如 SO_TIMEOUT、SO_REUSEADDR 等的配置方法及其对性能的影响,并提供了优化建议,帮助开发者提升网络通信的稳定性和效率。 ... [详细]
  • 本文介绍了如何利用Struts1框架构建一个简易的四则运算计算器。通过采用DispatchAction来处理不同类型的计算请求,并使用动态Form来优化开发流程,确保代码的简洁性和可维护性。同时,系统提供了用户友好的错误提示,以增强用户体验。 ... [详细]
  • C++ 异步编程中获取线程执行结果的方法与技巧及其在前端开发中的应用探讨
    本文探讨了C++异步编程中获取线程执行结果的方法与技巧,并深入分析了这些技术在前端开发中的应用。通过对比不同的异步编程模型,本文详细介绍了如何高效地处理多线程任务,确保程序的稳定性和性能。同时,文章还结合实际案例,展示了这些方法在前端异步编程中的具体实现和优化策略。 ... [详细]
  • 本文探讨了使用JavaScript在不同页面间传递参数的技术方法。具体而言,从a.html页面跳转至b.html时,如何携带参数并使b.html替代当前页面显示,而非新开窗口。文中详细介绍了实现这一功能的代码及注释,帮助开发者更好地理解和应用该技术。 ... [详细]
  • 在本地环境中部署了两个不同版本的 Flink 集群,分别为 1.9.1 和 1.9.2。近期在尝试启动 1.9.1 版本的 Flink 任务时,遇到了 TaskExecutor 启动失败的问题。尽管 TaskManager 日志显示正常,但任务仍无法成功启动。经过详细分析,发现该问题是由 Kafka 版本不兼容引起的。通过调整 Kafka 客户端配置并升级相关依赖,最终成功解决了这一故障。 ... [详细]
  • 在对WordPress Duplicator插件0.4.4版本的安全评估中,发现其存在跨站脚本(XSS)攻击漏洞。此漏洞可能被利用进行恶意操作,建议用户及时更新至最新版本以确保系统安全。测试方法仅限于安全研究和教学目的,使用时需自行承担风险。漏洞编号:HTB23162。 ... [详细]
  • 在ElasticStack日志监控系统中,Logstash编码插件自5.0版本起进行了重大改进。插件被独立拆分为gem包,每个插件可以单独进行更新和维护,无需依赖Logstash的整体升级。这不仅提高了系统的灵活性和可维护性,还简化了插件的管理和部署过程。本文将详细介绍这些编码插件的功能、配置方法,并通过实际生产环境中的应用案例,展示其在日志处理和监控中的高效性和可靠性。 ... [详细]
  • 本指南介绍了如何在ASP.NET Web应用程序中利用C#和JavaScript实现基于指纹识别的登录系统。通过集成指纹识别技术,用户无需输入传统的登录ID即可完成身份验证,从而提升用户体验和安全性。我们将详细探讨如何配置和部署这一功能,确保系统的稳定性和可靠性。 ... [详细]
  • 使用 ListView 浏览安卓系统中的回收站文件 ... [详细]
  • Python 伦理黑客技术:深入探讨后门攻击(第三部分)
    在《Python 伦理黑客技术:深入探讨后门攻击(第三部分)》中,作者详细分析了后门攻击中的Socket问题。由于TCP协议基于流,难以确定消息批次的结束点,这给后门攻击的实现带来了挑战。为了解决这一问题,文章提出了一系列有效的技术方案,包括使用特定的分隔符和长度前缀,以确保数据包的准确传输和解析。这些方法不仅提高了攻击的隐蔽性和可靠性,还为安全研究人员提供了宝贵的参考。 ... [详细]
  • V8不仅是一款著名的八缸发动机,广泛应用于道奇Charger、宾利Continental GT和BossHoss摩托车中。自2008年以来,作为Chromium项目的一部分,V8 JavaScript引擎在性能优化和技术创新方面取得了显著进展。该引擎通过先进的编译技术和高效的垃圾回收机制,显著提升了JavaScript的执行效率,为现代Web应用提供了强大的支持。持续的优化和创新使得V8在处理复杂计算和大规模数据时表现更加出色,成为众多开发者和企业的首选。 ... [详细]
  • 本文深入解析了 jQuery 中用于扩展功能的三个关键方法:`$.extend()`、`$.fn` 和 `$.fn.extend()`。其中,`$.extend()` 用于扩展 jQuery 对象本身,而 `$.fn.extend()` 则用于扩展 jQuery 的原型对象,使自定义方法能够作为 jQuery 实例的方法使用。通过这些方法,开发者可以轻松地创建和集成自定义插件,增强 jQuery 的功能。文章详细介绍了每个方法的用法、参数及实际应用场景,帮助读者更好地理解和运用这些强大的工具。 ... [详细]
author-avatar
mobiledu2502889153
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有