package day03;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.KeyedProcessFunction;import org.apache.flink.util.Collector;import java.sql.Timestamp;/** * @program: Flink_learn * @description: KeyedProcessFunction简单例子 * @author: Mr.逗 * @create: 2021-09-17 15:05 **/public class KeyedProcessFunctionDemo<I extends Number, I1 extends Number, S> { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DataStreamSource source = env.socketTextStream("172.17.0.50", 9999); source.keyBy(v->1) .process(new MyKeyed()) .print(); String name = KeyedProcessFunctionDemo.class.getName(); env.execute(name); } public static class MyKeyed extends KeyedProcessFunction<Integer, String, String> { @Override public void processElement(String value, Context ctx, Collector out) throws Exception { //当前机器时间 long ts = ctx.timerService().currentProcessingTime(); out.collect(value+"在"+new Timestamp(ts)+"到达"); //注册一个10s的定时器 long tenSecLater = ts + 10 * 1000; out.collect("注册了一个时间在"+new Timestamp(tenSecLater)+"的定时器"); ctx.timerService().registerEventTimeTimer(tenSecLater); } // 定时器也是状态 // 每个key独有定时器 // 每个key都可以注册自己的定时器 // 对于每个key,在某个时间戳,只能注册一个定时器 @Override public void onTimer(long timestamp, OnTimerContext ctx, Collector out) throws Exception { super.onTimer(timestamp, ctx, out); out.collect("定时器触发时间是:"+new Timestamp(timestamp)); } }}