原文地址:https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/batch/index.html#passing-parameters-to-functions
Passing Parameters to Functions
参数可以使用构造函数或者withParameters(Configuration)方法传递,参数将会作为函数对象的一部分被序列化并传递到task实例中!
1 使用构造函数方式:
package com.daxin

import org.apache.flink.api.common.functions.FilterFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._
/*** Created by Daxin on 2017/4/18.*/
/**
 * Example 1: passing a parameter to a user function through its constructor.
 *
 * The constructor argument becomes part of the function object that is handed
 * to the `filter` operator.
 */
object PassingParameters2Functions1 {

  /**
   * Builds a three-element data set, filters it with [[MyFilter]] and prints the result.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val toFilter = env.fromElements(1, 2, 3)

    /**
     * Keeps only the values strictly greater than `limit`.
     *
     * @param limit exclusive lower bound, supplied through the constructor
     */
    class MyFilter(limit: Int) extends FilterFunction[Int] {
      override def filter(value: Int): Boolean = value > limit
    }

    val result = toFilter.filter(new MyFilter(2))
    result.print()
  }
}
2 withParameters(Configuration)方式
这个方法将会携带一个Configuration对象作为参数,这个参数将会传递给Rich Function的open方法(关于Rich Function参见:rich function)。Configuration对象是一个Map,存储Key/Value键值对.
package com.daxin

import org.apache.flink.api.common.functions.RichFilterFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration/*** Created by Daxin on 2017/4/18.* 我的邮箱: leodaxin@163.com*/
/**
 * Example 2: passing a parameter to a rich function with `withParameters(Configuration)`.
 *
 * The `Configuration` attached to the operator is the one handed to the rich
 * function's `open` method, where the value is read back.
 */
object PassingParameters2Functions2 {

  /**
   * Builds a three-element data set, filters it against the "limit" value stored
   * in a `Configuration`, and prints the result.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val toFilter = env.fromElements(1, 2, 3)

    val c = new Configuration()
    c.setInteger("limit", 2)

    val result = toFilter
      .filter(new RichFilterFunction[Int]() {
        // Mutable because the value is only assigned in open().
        var limit = 0

        override def open(config: Configuration): Unit = {
          // Returns the default value 0 when "limit" is not present.
          limit = config.getInteger("limit", 0)
        }

        override def filter(in: Int): Boolean = in > limit
      })
      .withParameters(c)

    result.print()
  }
}
3 通过ExecutionConfig全局传递参数的方式:
package com.daxin

import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction, RichFilterFunction}
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration/*** Created by Daxin on 2017/4/18.* 传递参数3:Globally via the ExecutionConfig**/
/**
 * Example 3: passing parameters globally through the `ExecutionConfig`.
 *
 * A `Configuration` is registered as the global job parameters on the
 * environment and read back inside a rich function via its runtime context.
 */
object PassingParameters2Functions3 {

  /**
   * Registers "mykey" as a global job parameter, maps every element of a
   * two-element data set to a comparison message, and prints the result.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val data = env.fromElements("1", "2")

    val conf = new Configuration()
    conf.setString("mykey", "2")
    env.getConfig.setGlobalJobParameters(conf)

    /** Compares each input string with the globally configured "mykey" value. */
    class RichFunc extends RichMapFunction[String, String] {
      // Mutable because the value is only assigned in open().
      var mykey: String = _

      override def open(parameters: Configuration): Unit = {
        super.open(parameters)
        val globalParams: ExecutionConfig.GlobalJobParameters =
          getRuntimeContext().getExecutionConfig().getGlobalJobParameters()
        // Pattern match instead of an unchecked cast: if the global parameters
        // are missing or are not a Configuration, use the same "default" value
        // that getString would return for an absent key.
        mykey = globalParams match {
          case globConf: Configuration => globConf.getString("mykey", "default")
          case _ => "default"
        }
      }

      // `==` is null-safe, unlike calling `equals` on a possibly unset field.
      override def map(value: String): String =
        if (mykey == value) "is equals" else "not equals"
    }

    data.map(new RichFunc).print()
  }
}