热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Flink操练(三十)之KeyedProcessFunction简单例子

1、代码逻辑实现packageday03;importorg.apache.flin

1、代码逻辑实现

package day03;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import java.sql.Timestamp;
/**
* @program: Flink_learn
* @description: KeyedProcessFunction简单例子
* @author: Mr.逗
* @create: 2021-09-17 15:05
**/

public class KeyedProcessFunctionDemo<I extends Number, I1 extends Number, S> {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource source = env.socketTextStream("172.17.0.50", 9999);
source.keyBy(v->1)
.process(new MyKeyed())
.print();
String name = KeyedProcessFunctionDemo.class.getName();
env.execute(name);
}
public static class MyKeyed extends KeyedProcessFunction<Integer, String, String> {
@Override
public void processElement(String value, Context ctx, Collector out) throws Exception {
//当前机器时间
long ts = ctx.timerService().currentProcessingTime();
out.collect(value+"在"+new Timestamp(ts)+"到达");
//注册一个10s的定时器
long tenSecLater = ts + 10 * 1000;
out.collect("注册了一个时间在"+new Timestamp(tenSecLater)+"的定时器");
ctx.timerService().registerEventTimeTimer(tenSecLater);
}
// 定时器也是状态
// 每个key独有定时器
// 每个key都可以注册自己的定时器
// 对于每个key,在某个时间戳,只能注册一个定时器
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector out) throws Exception {
super.onTimer(timestamp, ctx, out);
out.collect("定时器触发时间是:"+new Timestamp(timestamp));
}
}
}




推荐阅读
author-avatar
sdr700724
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有