Explanation: the trigger fires either when the element count reaches 2 or when the time crosses midnight.
A simple example built around the following business requirement:
in the normal case the window fires as usual, but once midnight arrives it fires immediately and the cached state is cleared.
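Before the code: the trigger needs the timestamp of the upcoming midnight in order to know when to fire and purge. The helper in DayTrigger below computes it with hand-rolled arithmetic hardcoded to a +8h offset; as a sanity check, here is a minimal equivalent sketch using java.time (the Asia/Shanghai zone is my assumption, matching that hardcoded offset):

import java.time.LocalDate;
import java.time.ZoneId;

public class NextMidnight {
    public static void main(String[] args) {
        // Assumption: the job runs in China Standard Time, matching the +8h offset
        // used by DayTrigger.getTodayZeroPointTimestamps() below.
        ZoneId zone = ZoneId.of("Asia/Shanghai");
        long nextMidnight = LocalDate.now(zone)
                .plusDays(1)               // start of tomorrow == the upcoming midnight
                .atStartOfDay(zone)
                .toInstant()
                .toEpochMilli();
        System.out.println("next midnight = " + nextMidnight);
    }
}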
package application;

import com.alibaba.fastjson.JSONObject;
import operator.DayProcessOperator;
import operator.DayTrigger;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

import java.util.Properties;

/**
 * todo Data analysis
 * todo http://docs-aliyun.cn-hangzhou.oss.aliyun-inc.com/assets/pic/49939/cn_zh/1487929553566/man-screen-shot.jpg
 *
 * todo Metric dimensions: 1. active users  2. new users  3. logged-in members  4. newly registered users  5. launch count
 *
 * todo Approach 1: store the active-user count to HBase every second, using a window function with a 1s window
 * todo Approach 2: reset the state at midnight every day and start accumulating again, which is what the custom trigger is for
 */
public class Data_analysis_demo {

    public static void main(String[] args) {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);

        // todo Kafka configuration properties
        args = new String[]{"--input-topic", "topn_test",
                "--bootstrap.servers", "node2.hadoop:9092,node3.hadoop:9092",
                "--zookeeper.connect", "node1.hadoop:2181,node2.hadoop:2181,node3.hadoop:2181",
                "--group.id", "cc2"};
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        // Note: the time characteristic is set to event time, but the window below
        // is a processing-time window, so it operates on processing time.
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        Properties pros = parameterTool.getProperties();

        // todo Use the Kafka topic as the input source
        DataStream<String> kafkaDstream = env.addSource(
                new FlinkKafkaConsumer010<>(pros.getProperty("input-topic"),
                        new SimpleStringSchema(), pros).setStartFromLatest());

        // todo Parse each record into JSON first
        DataStream<JSONObject> str2JsonDstream = kafkaDstream.map(new MapFunction<String, JSONObject>() {
            @Override
            public JSONObject map(String input) throws Exception {
                JSONObject inputJson = null;
                try {
                    inputJson = JSONObject.parseObject(input);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                return inputJson;
            }
        });

        // todo Apply the window operator with the custom trigger
        str2JsonDstream.keyBy(value -> value.getString("appKey"))
                .window(TumblingProcessingTimeWindows.of(Time.milliseconds(2000)))
                .trigger(new DayTrigger())
                .process(new DayProcessOperator())
                .print();

        try {
            env.execute("Job starting....................");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
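For context, the job assumes every Kafka record is a JSON string that carries an appKey field, because keyBy groups on it. Below is a minimal sketch of a test producer for the topn_test topic; the payload fields other than appKey are hypothetical:

import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class TestDataProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "node2.hadoop:9092,node3.hadoop:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            JSONObject record = new JSONObject();
            record.put("appKey", "demo_app");                    // the key the job groups by
            record.put("uid", "u_0001");                         // hypothetical payload field
            record.put("eventTime", System.currentTimeMillis()); // hypothetical payload field
            producer.send(new ProducerRecord<>("topn_test", record.toJSONString()));
        }
    }
}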
package operator;

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;

public class DayTrigger extends Trigger<JSONObject, Window> {

    private ReducingStateDescriptor<Long> countStateDescriptor =
            new ReducingStateDescriptor<>("counter", new Sum(), LongSerializer.INSTANCE);

    ValueStateDescriptor<Integer> stateDescriptor = new ValueStateDescriptor<>("total", Integer.class);

    // Called for every element
    @Override
    public TriggerResult onElement(JSONObject element, long timestamp, Window window, TriggerContext ctx) throws Exception {
        System.out.println("timestamp = " + timestamp + ", local time: " + System.currentTimeMillis());
        ValueState<Integer> sumState = ctx.getPartitionedState(stateDescriptor);
        if (null == sumState.value()) {
            sumState.update(0);
        }
        sumState.update(1 + sumState.value());
        if (sumState.value() >= 2) {
            // The state could also be cleaned up manually here.
            // TriggerResult.FIRE emits the window result without clearing the window contents.
            System.out.println("Fired..... element count: " + sumState.value());
            return TriggerResult.FIRE;
        }
        Long todayZeroPointTimestamps = getTodayZeroPointTimestamps();
        System.out.println("todayZeroPointTimestamps = " + todayZeroPointTimestamps);
        if (timestamp >= todayZeroPointTimestamps) {
            // Crossed midnight: fire and purge the window contents
            return TriggerResult.FIRE_AND_PURGE;
        }
        if (timestamp >= window.maxTimestamp()) {
            return TriggerResult.FIRE_AND_PURGE;
        } else {
            return TriggerResult.CONTINUE;
        }
        // Alternative using ReducingState:
        // ReducingState<Long> countState = ctx.getPartitionedState(countStateDescriptor);
        // countState.add(1L);
        // System.out.println("countState.get() = " + countState.get());
        // if (countState.get() >= 3) {
        //     System.out.println("Fired.............");
        //     // Fire and also purge the window contents
        //     return TriggerResult.FIRE_AND_PURGE;
        // }
        // return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, Window window, TriggerContext triggerContext) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, Window window, TriggerContext triggerContext) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(Window window, TriggerContext ctx) throws Exception {
        // System.out.println("Clearing window state; stored value: " + ctx.getPartitionedState(stateDescriptor).value());
        ctx.getPartitionedState(stateDescriptor).clear();
    }

    class Sum implements ReduceFunction<Long> {
        @Override
        public Long reduce(Long value1, Long value2) throws Exception {
            return value1 + value2;
        }
    }

    /**
     * Despite the name, this returns the upcoming midnight (00:00 of the next day, UTC+8):
     * it rounds the current time down to today's midnight and then adds one full day.
     */
    public static Long getTodayZeroPointTimestamps() {
        long now = System.currentTimeMillis();
        long daySecond = 60 * 60 * 24 * 1000;
        long dayTime = now - (now + 8 * 3600 * 1000) % daySecond + 1 * daySecond;
        return dayTime;
    }

    public static void main(String[] args) {
        Long todayZeroPointTimestamps = getTodayZeroPointTimestamps();
        System.out.println("todayZeroPointTimestamps = " + todayZeroPointTimestamps);
    }
}
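One caveat with the midnight check above: onElement only runs when an element arrives, so the FIRE_AND_PURGE at midnight is delayed until the first element after midnight. The more idiomatic alternative is to register a processing-time timer with the TriggerContext and purge in onProcessingTime. A minimal sketch of that variant (the class name is mine, not part of the original code):

package operator;

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;

public class TimerDayTrigger extends Trigger<JSONObject, Window> {

    @Override
    public TriggerResult onElement(JSONObject element, long timestamp, Window window, TriggerContext ctx) throws Exception {
        // Ask Flink to call onProcessingTime at the upcoming midnight; registering
        // the same timestamp repeatedly is deduplicated by the timer service.
        ctx.registerProcessingTimeTimer(DayTrigger.getTodayZeroPointTimestamps());
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, Window window, TriggerContext ctx) throws Exception {
        // The timer fired at midnight: emit the window result and clear its contents.
        return TriggerResult.FIRE_AND_PURGE;
    }

    @Override
    public TriggerResult onEventTime(long time, Window window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(Window window, TriggerContext ctx) throws Exception {
        // Best-effort cleanup; assumes the window is cleared before the day boundary changes.
        ctx.deleteProcessingTimeTimer(DayTrigger.getTodayZeroPointTimestamps());
    }
}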