作者:黄宗翰琼琦莉雯 | 来源:互联网 | 2023-09-17 19:20
目录处理函数基本处理函数ProcessFunction处理函数的功能ProcessFunction解析处理函数的分类按键分区处理函数KeyedProcessFunction定时器T
目录
- 处理函数
- 基本处理函数 ProcessFunction
- 处理函数的功能
- ProcessFunction解析
- 处理函数的分类
- 按键分区处理函数 KeyedProcessFunction
- 定时器Timer 和定时服务 TimerService
- 窗口处理函数
- 应用案例 TopN
- 侧输出流
整体框架:
处理函数
基本处理函数 ProcessFunction
处理函数的功能
ProcessFunction:能拿到别的API拿不到的东西。
处理函数提供了一个定时服务TimerService,可以用它访问流中的事件event、时间戳timestamp,水位线watermark,注册定时事件。
处理函数继承自AbstractRichFunction,拥有富函数类的特性,可以访问状态state和其他运行时信息。
处理函数可以直接将数据输出到侧输出流side output中。
处理函数是DataStream API的底层逻辑。
ProcessFunction解析
ProcessFunction类中:
1. processElement(输入类型,上下文,输入类型Collector)上下文中能获取到:时间戳、侧输出流、"timerService"timerService中能获取到:currentProcessingTime处理时间、currentWatermark事件时间、registerProcessingTimeTimer注册定时器、删除定时器。2. onTimer()注册了定时器后,到点时会触发这个回调,这是定时到了后的处理方法。但是,只有基于KeyedStream才能定义定时器。
处理函数的分类
1. ProcessFunction
2. KeyedProcessFunction----"重要"
3. ProcessWindowFunction
4. ProcessAllWindowFunction
5. CoProcessFunction
6. ProcessJoinFunction
7. BroadcastProcessFunction
8. KeyedBroadcastProcessFunction
按键分区处理函数 KeyedProcessFunction
stream.keyBy().process(new MyKeyedProcessFunction())
定时器Timer 和定时服务 TimerService
处理时间:
stream.keyBy(data->data.user).process(new KeyedProcessFunction<String, Event, String>(){&#64;Overridepublic void processElement(Event value, Context ctx, Collector<String> out) throws Exception(){Long currTs &#61; ctx.timerService().currentProcessingTime();out.collect(ctx.getCurrentKey() &#43; " 数据到达&#xff0c;到达时间&#xff1a;" &#43; new Timestamp(currTs));ctx.timerService().registerProcessingTimeTimer(currTs &#43; 10 * 1000L);}&#64;Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception() {out.collect(ctx.getCurrentKey() &#43; " 定时器触发&#xff0c;触发时间&#xff1a;"&#43;new Timestamp(timestamp));}})
事件时间&#xff1a;
stream.keyBy(data->data.user).process(new KeyedProcessFunction<String, Event, String>(){&#64;Overridepublic void processElement(Event value, Context ctx, Collector<String> out) throws Exception(){Long currTs &#61; ctx.timestamp();out.collect(ctx.getCurrentKey() &#43; " 数据到达&#xff0c;时间戳&#xff1a;" &#43; new Timestamp(currTs) &#43; " watermark&#xff1a;"&#43; ctx.timerService().currentWatermark());ctx.timerService().registerProcessingTimeTimer(currTs &#43; 10 * 1000L);}&#64;Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception() {out.collect(ctx.getCurrentKey() &#43; " 定时器触发&#xff0c;触发时间&#xff1a;"&#43;new Timestamp(timestamp) &#43; " watermark&#xff1a;"&#43; ctx.timerService().currentWatermark());}})
窗口处理函数
窗口处理函数的使用
ProcessWindowFunction解析
应用案例 TopN
要求&#xff1a;统计每隔10s的最受欢迎的URL的前两名&#xff0c;每隔5s更新一次结果。
使用ProcessAllWindowFunction
该方法数据量大的时候&#xff0c;把所有数据放在一个窗口里&#xff0c;不靠谱。
使用KeyedProcessFunction
SingleOutputStreamOperator<UrlViewCount> urlCountStream &#61; stream.keyBy(data -> data.url).window(SlidingEventTimeWindows.of(Time.seconds(10),Time.seconds(5))).aggregate(new UrlCountViewExample.UrlViewCountAgg(), new UrlCountViewExample.xxxx());
urlCountStream.keyBy(data -> data.windowEnd) .process(new TopNProcessResult(2)).print();
public static class TopNProcessResult extends KeyedProcessFunction<Long, UrlViewCount, String>{private int n;private ListState<UrlViewCount> urlViewCountListState;public TopNProcessResult(int n){this.n &#61; n;}&#64;Overridepublic void open(Configuration parameters) throws Exception{urlViewCountListState &#61; getRuntimeContext().getListState(new ListStateDescriptor<UrlViewCount>("url-count-list", Types.POJO(UrlViewCount.class)) ;);}&#64;Overridepublic void processElement(UrlViewCount value, Context ctx, Collector<String> out) throws Exception{urlViewCountListState.add(value);ctx.timerService().registerEventTimeTimer(ctx.getCurrentKey() &#43; 1); }&#64;Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception{ArrayList<UrlViewCount> urlViewCountArrayList &#61; new ArrayList();for(UrlViewCount u : urlViewCountListState.get()){urlViewCountArrayList.add(u);}urlViewCountArrayList.sort(new Comparator<UrlViewCount>(){&#64;Overridepublic int compare(UrlViewCount o1, UrlViewCount o2){return o2.count.intValue() - o1.count.intValue();}});StringBuilder result &#61; new StringBuilder();result.append("窗口结束时间&#xff1a;" &#43; new Timestamp(ctx.getCurrentKey()));for(int i&#61;0; i < 2; i&#43;&#43;){UrlViewCount currTuple &#61; urlViewCountArrayList.get(i);String info &#61; "No. "&#43;(i&#43;1)&#43;" "&#43; "url: "&#43; currTuple.url &#43; " "&#43; "访问量&#xff1a; "&#43; currTuple.count &#43; "\n";result.append(info);}out.collect(result.toString());}
}
侧输出流
可以用来做分流操作
OutputTag<String> outputTag &#61; new OutputTag<String>("side-output"){};public void processElement(){out.collect(Long.valueof(value));ctx.output(outputTag, String.valueof(value));
}
DataStream<String> stringStream &#61; longStream.getSideOutput(outputTag);