
Passing Parameters to Functions in Flink

Original source: https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/batch/index.html#passing-parameters-to-functions


Passing Parameters to Functions


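Parameters can be passed to a function either through its constructor or through the withParameters(Configuration) method. The parameters are serialized as part of the function object and shipped to the task instances.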

  

  1 Using the constructor:

   

package com.daxin

import org.apache.flink.api.common.functions.FilterFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._

/**
 * Created by Daxin on 2017/4/18.
 */
object PassingParameters2Functions1 {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val toFilter = env.fromElements(1, 2, 3)

    // The limit is a constructor argument, so it is stored in the function object.
    class MyFilter(limit: Int) extends FilterFunction[Int] {
      override def filter(value: Int): Boolean = {
        value > limit
      }
    }

    val result = toFilter.filter(new MyFilter(2))
    result.print()
  }
}
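The constructor approach works because the function object itself is serialized, and the stored argument travels with it. The following is only a sketch of that idea using plain Java serialization, not Flink's actual shipping code; LimitFilter and roundTrip are hypothetical names introduced for illustration.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for a Flink function: the limit is a constructor argument.
class LimitFilter(limit: Int) extends Serializable {
  def filter(value: Int): Boolean = value > limit
}

object SerializationSketch {

  // Write an object to a byte array and read it back, roughly what happens
  // when a function object is shipped to a task instance.
  def roundTrip[T <: Serializable](obj: T): T = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(obj)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val copy = roundTrip(new LimitFilter(2))
    // The deserialized copy still carries limit = 2.
    println(Seq(1, 2, 3).filter(copy.filter)) // prints List(3)
  }
}
```

One consequence of this design is that anything passed through the constructor must itself be serializable.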




  2 Using withParameters(Configuration):

This method takes a Configuration object as its argument, which is then passed to the rich function's open() method (see the rich functions documentation). The Configuration object works like a map that stores key/value pairs.


package com.daxin

import org.apache.flink.api.common.functions.RichFilterFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration

/**
 * Created by Daxin on 2017/4/18.
 * Email: leodaxin@163.com
 */
object PassingParameters2Functions2 {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val toFilter = env.fromElements(1, 2, 3)

    val c = new Configuration()
    c.setInteger("limit", 2)

    val result = toFilter.filter(new RichFilterFunction[Int]() {
      var limit = 0

      // open() receives the Configuration that was passed to withParameters.
      override def open(config: Configuration): Unit = {
        limit = config.getInteger("limit", 0) // returns the default 0 if the key is absent
      }

      override def filter(in: Int): Boolean = {
        in > limit
      }
    }).withParameters(c)

    result.print()
  }
}
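To illustrate the key/value behaviour of Configuration on its own, here is a minimal sketch that assumes the Flink 1.2 API used in this article is on the classpath. The "mode" key and the ConfigurationSketch object are hypothetical and exist only for this example.

```scala
import org.apache.flink.configuration.Configuration

object ConfigurationSketch {

  // Build a small configuration with one integer and one string entry.
  def build(): Configuration = {
    val conf = new Configuration()
    conf.setInteger("limit", 2)
    conf.setString("mode", "strict") // hypothetical key, for illustration only
    conf
  }

  def main(args: Array[String]): Unit = {
    val conf = build()
    println(conf.getInteger("limit", 0))       // 2
    println(conf.getString("mode", "lenient")) // strict
    println(conf.getInteger("missing", -1))    // -1, the supplied default
    println(conf.containsKey("limit"))         // true
  }
}
```

Each getter takes a default value, so a function reading a missing key does not fail but silently falls back. Choosing a default that is obviously invalid can make such mistakes easier to spot.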






  3 Globally via the ExecutionConfig:



   

package com.daxin

import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration

/**
 * Created by Daxin on 2017/4/18.
 * Passing parameters, approach 3: Globally via the ExecutionConfig
 */
object PassingParameters2Functions3 {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val data = env.fromElements("1", "2")

    // Register the configuration as global job parameters.
    val conf = new Configuration()
    conf.setString("mykey", "2")
    env.getConfig.setGlobalJobParameters(conf)

    class RichFunc extends RichMapFunction[String, String] {
      var mykey: String = _

      override def open(parameters: Configuration): Unit = {
        super.open(parameters)
        // Read the global parameters back from the runtime context.
        val globalParams: ExecutionConfig.GlobalJobParameters =
          getRuntimeContext().getExecutionConfig().getGlobalJobParameters()
        val globConf = globalParams.asInstanceOf[Configuration]
        mykey = globConf.getString("mykey", "default")
      }

      override def map(value: String): String = {
        if (mykey == value) "is equals" else "not equals"
      }
    }

    data.map(new RichFunc).print()
  }
}
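The cast in open() assumes that global job parameters were set and that they are a Configuration. As a hedged sketch of a more defensive lookup, the helper below falls back to a default when the parameters are null or of another type. GlobalParams.lookup is a hypothetical helper, not part of Flink, and it assumes the Flink 1.2 classes used in this article.

```scala
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.configuration.Configuration

object GlobalParams {

  // Hypothetical helper: read a string from the global job parameters,
  // returning the default if they are missing or not a Configuration.
  def lookup(params: ExecutionConfig.GlobalJobParameters,
             key: String,
             default: String): String = params match {
    case conf: Configuration => conf.getString(key, default)
    case _                   => default // null or another GlobalJobParameters subclass
  }
}
```

Inside a rich function's open() method this could be called as GlobalParams.lookup(getRuntimeContext.getExecutionConfig.getGlobalJobParameters, "mykey", "default").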





  

 



