热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

FlinkTrigger代码实战

说明,就是数据大于2或者时间到凌晨的时候就触发执行。针对下面的业务做的简单的案例:实际的意思是正常的情况下正常触发,在到达凌晨的时候直

 说明,就是数据大于2 或者 时间到凌晨的时候就触发执行。

针对下面的业务做的简单的案例:

实际的意思是正常的情况下正常触发,在到达凌晨的时候直接触发,然后缓存清零

package application;import com.alibaba.fastjson.JSONObject;
import operator.DayProcessOperator;
import operator.DayTrigger;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;import java.util.Properties;/*** todo 数据分析* todo http://docs-aliyun.cn-hangzhou.oss.aliyun-inc.com/assets/pic/49939/cn_zh/1487929553566/man-screen-shot.jpg** todo 指标维度: 1,活跃用户 2,新增用户 3,登陆会员 4,新注册用户 5,启动次数** todo 思路1: 每秒活跃用户数量都要存储到hbase,使用窗口函数,窗口大小为1s* todo 思路2:每天凌晨状态清零,开始重新累加计算,所以需要**/
public class Data_analysis_demo {public static void main(String[] args) {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(4);//todo 获取kafka的配置属性args = new String[]{"--input-topic", "topn_test", "--bootstrap.servers", "node2.hadoop:9092,node3.hadoop:9092","--zookeeper.connect", "node1.hadoop:2181,node2.hadoop:2181,node3.hadoop:2181", "--group.id", "cc2"};ParameterTool parameterTool = ParameterTool.fromArgs(args);env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);Properties sendPros = parameterTool.getProperties();Properties pros = parameterTool.getProperties();//todo 指定输入数据为kafka topicDataStream kafkaDstream = env.addSource(new FlinkKafkaConsumer010(pros.getProperty("input-topic"),new SimpleStringSchema(),pros).setStartFromLatest());//todo 先转成JSONDataStream str2JsonDstream = kafkaDstream.map(new MapFunction() {@Overridepublic JSONObject map(String input) throws Exception {JSONObject inputJson = null;try {inputJson = JSONObject.parseObject(input);} catch (Exception e) {e.printStackTrace();}return inputJson;}});//todo 使用window算子str2JsonDstream.keyBy(value -> value.getString("appKey")).window(TumblingProcessingTimeWindows.of(Time.milliseconds(2000))).trigger(new DayTrigger()).process(new DayProcessOperator()).print();try {env.execute("开始执行....................");} catch (Exception e) {e.printStackTrace();}}
}

 

package operator;import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;public class DayTrigger extends Trigger {private ReducingStateDescriptor countStateDescriptor &#61;new ReducingStateDescriptor("counter", new Sum(), LongSerializer.INSTANCE);ValueStateDescriptor stateDescriptor &#61; new ValueStateDescriptor<>("total", Integer.class);//每个元素都会触发&#64;Overridepublic TriggerResult onElement(com.alibaba.fastjson.JSONObject element, long timestamp, Window window, TriggerContext ctx) throws Exception {System.out.println("先打印timestamp &#61; " &#43; timestamp&#43;",本地时间&#xff1a;"&#43;System.currentTimeMillis());ValueState sumState &#61; ctx.getPartitionedState(stateDescriptor);if (null &#61;&#61; sumState.value()) {sumState.update(0);}sumState.update(1 &#43; sumState.value());if (sumState.value() >&#61; 2) {//这里可以选择手动处理状态// 默认的trigger发送是TriggerResult.FIRE 不会清除窗口数据System.out.println("触发.....数据条数为&#xff1a;" &#43; (1 &#43; sumState.value()));return TriggerResult.FIRE;}Long todayZeroPointTimestamps &#61; getTodayZeroPointTimestamps();System.out.println("todayZeroPointTimestamps &#61; " &#43; todayZeroPointTimestamps);if (timestamp >&#61; todayZeroPointTimestamps) {return TriggerResult.FIRE_AND_PURGE;}if (timestamp >&#61; window.maxTimestamp()) {return TriggerResult.FIRE_AND_PURGE;} else {return TriggerResult.CONTINUE;}// ReducingState countState &#61; ctx.getPartitionedState(countStateDescriptor);
// countState.add(1L);
//System.out.println("打印countState.get() &#61; " &#43; countState.get());
// if (countState.get() >&#61; 3) {
// System.out.println("触发了............. " );
// //这里是触发并且清除
// return TriggerResult.FIRE_AND_PURGE;
// }
// return TriggerResult.CONTINUE;}&#64;Overridepublic TriggerResult onProcessingTime(long time, Window window, TriggerContext triggerContext) throws Exception {return TriggerResult.CONTINUE;}&#64;Overridepublic TriggerResult onEventTime(long time, Window window, TriggerContext triggerContext) throws Exception {return TriggerResult.CONTINUE;}&#64;Overridepublic void clear(Window window, TriggerContext ctx) throws Exception {
// System.out.println("清理窗口状态 窗口内保存值为" &#43; ctx.getPartitionedState(stateDescriptor).value());ctx.getPartitionedState(stateDescriptor).clear();}class Sum implements ReduceFunction {&#64;Overridepublic Long reduce(Long value1, Long value2) throws Exception {return value1 &#43; value2;}}public static Long getTodayZeroPointTimestamps() {long now &#61; System.currentTimeMillis();long daySecond &#61; 60 * 60 * 24 * 1000;long dayTime &#61; now - (now &#43; 8 * 3600 * 1000) % daySecond &#43; 1 * daySecond;return dayTime;}public static void main(String[] args) {Long todayZeroPointTimestamps &#61; getTodayZeroPointTimestamps();System.out.println("todayZeroPointTimestamps &#61; " &#43; todayZeroPointTimestamps);}}

 


推荐阅读
  • Java太阳系小游戏分析和源码详解
    本文介绍了一个基于Java的太阳系小游戏的分析和源码详解。通过对面向对象的知识的学习和实践,作者实现了太阳系各行星绕太阳转的效果。文章详细介绍了游戏的设计思路和源码结构,包括工具类、常量、图片加载、面板等。通过这个小游戏的制作,读者可以巩固和应用所学的知识,如类的继承、方法的重载与重写、多态和封装等。 ... [详细]
  • 本文介绍了解决Netty拆包粘包问题的一种方法——使用特殊结束符。在通讯过程中,客户端和服务器协商定义一个特殊的分隔符号,只要没有发送分隔符号,就代表一条数据没有结束。文章还提供了服务端的示例代码。 ... [详细]
  • 开发笔记:加密&json&StringIO模块&BytesIO模块
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了加密&json&StringIO模块&BytesIO模块相关的知识,希望对你有一定的参考价值。一、加密加密 ... [详细]
  • Java容器中的compareto方法排序原理解析
    本文从源码解析Java容器中的compareto方法的排序原理,讲解了在使用数组存储数据时的限制以及存储效率的问题。同时提到了Redis的五大数据结构和list、set等知识点,回忆了作者大学时代的Java学习经历。文章以作者做的思维导图作为目录,展示了整个讲解过程。 ... [详细]
  • 使用在线工具jsonschema2pojo根据json生成java对象
    本文介绍了使用在线工具jsonschema2pojo根据json生成java对象的方法。通过该工具,用户只需将json字符串复制到输入框中,即可自动将其转换成java对象。该工具还能解析列表式的json数据,并将嵌套在内层的对象也解析出来。本文以请求github的api为例,展示了使用该工具的步骤和效果。 ... [详细]
  • [大整数乘法] java代码实现
    本文介绍了使用java代码实现大整数乘法的过程,同时也涉及到大整数加法和大整数减法的计算方法。通过分治算法来提高计算效率,并对算法的时间复杂度进行了研究。详细代码实现请参考文章链接。 ... [详细]
  • Go Cobra命令行工具入门教程
    本文介绍了Go语言实现的命令行工具Cobra的基本概念、安装方法和入门实践。Cobra被广泛应用于各种项目中,如Kubernetes、Hugo和Github CLI等。通过使用Cobra,我们可以快速创建命令行工具,适用于写测试脚本和各种服务的Admin CLI。文章还通过一个简单的demo演示了Cobra的使用方法。 ... [详细]
  • Java自带的观察者模式及实现方法详解
    本文介绍了Java自带的观察者模式,包括Observer和Observable对象的定义和使用方法。通过添加观察者和设置内部标志位,当被观察者中的事件发生变化时,通知观察者对象并执行相应的操作。实现观察者模式非常简单,只需继承Observable类和实现Observer接口即可。详情请参考Java官方api文档。 ... [详细]
  • Iamtryingtomakeaclassthatwillreadatextfileofnamesintoanarray,thenreturnthatarra ... [详细]
  • Spring源码解密之默认标签的解析方式分析
    本文分析了Spring源码解密中默认标签的解析方式。通过对命名空间的判断,区分默认命名空间和自定义命名空间,并采用不同的解析方式。其中,bean标签的解析最为复杂和重要。 ... [详细]
  • 本文介绍了一个Java猜拳小游戏的代码,通过使用Scanner类获取用户输入的拳的数字,并随机生成计算机的拳,然后判断胜负。该游戏可以选择剪刀、石头、布三种拳,通过比较两者的拳来决定胜负。 ... [详细]
  • 本文介绍了如何在给定的有序字符序列中插入新字符,并保持序列的有序性。通过示例代码演示了插入过程,以及插入后的字符序列。 ... [详细]
  • 本文讨论了一个关于cuowu类的问题,作者在使用cuowu类时遇到了错误提示和使用AdjustmentListener的问题。文章提供了16个解决方案,并给出了两个可能导致错误的原因。 ... [详细]
  • 本文介绍了Hyperledger Fabric外部链码构建与运行的相关知识,包括在Hyperledger Fabric 2.0版本之前链码构建和运行的困难性,外部构建模式的实现原理以及外部构建和运行API的使用方法。通过本文的介绍,读者可以了解到如何利用外部构建和运行的方式来实现链码的构建和运行,并且不再受限于特定的语言和部署环境。 ... [详细]
  • XML介绍与使用的概述及标签规则
    本文介绍了XML的基本概念和用途,包括XML的可扩展性和标签的自定义特性。同时还详细解释了XML标签的规则,包括标签的尖括号和合法标识符的组成,标签必须成对出现的原则以及特殊标签的使用方法。通过本文的阅读,读者可以对XML的基本知识有一个全面的了解。 ... [详细]
author-avatar
我是80初
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有