热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

20210128Flink33(Flink实时日志需求案例)

1.实时计算的指标

1.实时计算的指标

2021-01-28-Flink-33(Flink 实时日志需求案例 )
计算的指标

2.FlinkUtils

Properties

# 必须传入的参数
checkpoint.path=hdfs://linux03:9000/chk2021
bootstrap.servers=linux03:9092,linux04:9092,linux05:9092
group.id=g11
# 读取kafka的一个或多个topic,多个用","分隔
kafka.topics=flow18
flow.topic=flow-output
activity.topic=activity-output
mainstream.hdfs.out.path=hdfs://linux03:9000/main2021

kafka.transaction.timeout.ms=500000

redis.host=node-3.51doit.cn
redis.password=123456
redis.db=13


# 可以选择性传入的参数
# 在checkpoint时是否将偏移量写入到kafka特殊的topic中
commit.offsets.on.checkpoints=false
checkpoint.interval=30000
enable.auto.commit=false
auto.offset.reset=earliest
public class FlinkUtils {

    public static final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    public static  DataStream createKafkaStream(ParameterTool parameterTool, Class extends DeserializationSchema> clazz) throws Exception {
        String checkpointPath = parameterTool.getRequired("checkpoint.path");
        long checkpointInterval = parameterTool.getLong("checkpoint.interval", 30000);

        //开启checkpointing
        env.enableCheckpointing(checkpointInterval, CheckpointingMode.EXACTLY_ONCE);
        env.setStateBackend(new FsStateBackend(checkpointPath));

        //设置Kafka相关参数
        Properties properties = new Properties();//设置Kafka的地址和端口
        properties.setProperty("bootstrap.servers", parameterTool.getRequired("bootstrap.servers"));
        //读取偏移量策略:如果没有记录偏移量,就从头读,如果记录过偏移量,就接着读
        properties.setProperty("auto.offset.reset", parameterTool.get("auto.offset.reset",  "earliest"));
        //设置消费者组ID
        properties.setProperty("group.id", parameterTool.getRequired("group.id"));
        //没有开启checkpoint,让flink提交偏移量的消费者定期自动提交偏移量
        properties.setProperty("enable.auto.commit", parameterTool.get("enable.auto.commit", "false"));
        //创建FlinkKafkaConsumer并传入相关参数
        String topics = parameterTool.getRequired("kafka.topics");
        List topicList = Arrays.asList(topics.split(","));

        FlinkKafkaConsumer kafkaCOnsumer= new FlinkKafkaConsumer(
                topicList, //要读取数据的Topic名称
                clazz.newInstance(), //读取文件的反序列化Schema
                properties //传入Kafka的参数
        );
        //使用addSource添加kafkaConsumer
        kafkaConsumer.setCommitOffsetsOnCheckpoints(parameterTool.getBoolean("commit.offsets.on.checkpoints", false)); //在checkpoint时,不将偏移量写入到kafka特殊的topic中



        return env.addSource(kafkaConsumer);
    }
}

//测试
public class PreEtl {

    public static void main(String[] args) throws Exception {

        ParameterTool parameterTool = ParameterTool.fromPropertiesFile(args[0]);

        DataStream lines = FlinkUtils.createKafkaStream(parameterTool, SimpleStringSchema.class);

        lines.print();
    }
}

3.数据预处理

一条json

2019-02-27 15:42:53,433 --> {"u":{"id":"","account":"pf4Ttk","email":"","phoneNbr":"","birthday":"","isRegistered":false,"isLogin":false,"addr":"","gender":"","phone":{"imei":"P1OJ3NBArOBSUWQB","osName":"iphone","osVer":"10.05","resolution":"1356*768","androidId":"","manufacture":"apple","deviceId":"ZHPRf54G"},"app":{"appid":"com.51doit.mall","appVer":"2.1.9","release_ch":"应用超市","promotion_ch":"网易"},"loc":{"areacode":621226108,"longtitude":105.26876187586532,"latitude":34.28729717981919,"carrier":"中国联通","netType":"WIFI","cid_sn":"mHzhuyXyFTiC","ip":"114.244.79.108"},"sessionId":"MBRwHCXLFIAc"},"logType":"pgview","commit_time":1551253373433,"event":{"pgid":"10-03-25847413","title":"包教包会8入门","skuid":"25847413","url":"/a/b/25847413.html"}}  

pojo

/**
 * Flattened representation of one application log record: user, device, location, app and
 * event attributes, plus the geographic dimensions (province/city/district/business areas)
 * that are attached during pre-processing.
 */
public class LogBean {

    private String id;
    private String account;
    private String sessionId;
    private String imei;
    private String osName;
    private String osVer;
    private String resolution; // screen resolution
    private String androidId = "";
    private String manufacture; // handset manufacturer
    private String deviceId = "";
    private String areacode;
    private double longtitude;
    private double latitude;
    private String province;
    private String city;
    private String district;
    private String bizNames; // business-area names
    private String carrier; // SIM carrier
    private String netType;
    private String cid_sn; // base-station id
    private String ip;
    private String appid;
    private String appVer;
    private String release_ch;
    private String promotion_ch;
    private String logType;
    private long commit_time;
    private String dt; // partition field, e.g. 2019-05-08
    private HashMap<String, String> event;

    /** Creates a bean with every attribute supplied explicitly, in field declaration order. */
    public LogBean(String id, String account, String sessionId, String imei, String osName, String osVer, String resolution, String androidId, String manufacture, String deviceId,
            String areacode, double longtitude, double latitude, String province, String city, String district, String bizNames, String carrier, String netType, String cid_sn,
            String ip, String appid, String appVer, String release_ch, String promotion_ch, String logType, long commit_time, String dt, HashMap<String, String> event) {
        this.id = id;
        this.account = account;
        this.sessionId = sessionId;
        this.imei = imei;
        this.osName = osName;
        this.osVer = osVer;
        this.resolution = resolution;
        this.androidId = androidId;
        this.manufacture = manufacture;
        this.deviceId = deviceId;
        this.areacode = areacode;
        this.longtitude = longtitude;
        this.latitude = latitude;
        this.province = province;
        this.city = city;
        this.district = district;
        this.bizNames = bizNames;
        this.carrier = carrier;
        this.netType = netType;
        this.cid_sn = cid_sn;
        this.ip = ip;
        this.appid = appid;
        this.appVer = appVer;
        this.release_ch = release_ch;
        this.promotion_ch = promotion_ch;
        this.logType = logType;
        this.commit_time = commit_time;
        this.dt = dt;
        this.event = event;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getAccount() {
        return account;
    }

    public void setAccount(String account) {
        this.account = account;
    }

    public String getSessionId() {
        return sessionId;
    }

    public void setSessionId(String sessionId) {
        this.sessionId = sessionId;
    }

    public String getImei() {
        return imei;
    }

    public void setImei(String imei) {
        this.imei = imei;
    }

    public String getOsName() {
        return osName;
    }

    public void setOsName(String osName) {
        this.osName = osName;
    }

    public String getOsVer() {
        return osVer;
    }

    public void setOsVer(String osVer) {
        this.osVer = osVer;
    }

    public String getResolution() {
        return resolution;
    }

    public void setResolution(String resolution) {
        this.resolution = resolution;
    }

    public String getAndroidId() {
        return androidId;
    }

    public void setAndroidId(String androidId) {
        this.androidId = androidId;
    }

    public String getManufacture() {
        return manufacture;
    }

    public void setManufacture(String manufacture) {
        this.manufacture = manufacture;
    }

    public String getDeviceId() {
        return deviceId;
    }

    public void setDeviceId(String deviceId) {
        this.deviceId = deviceId;
    }

    public String getAreacode() {
        return areacode;
    }

    public void setAreacode(String areacode) {
        this.areacode = areacode;
    }

    public double getLongtitude() {
        return longtitude;
    }

    public void setLongtitude(double longtitude) {
        this.longtitude = longtitude;
    }

    public double getLatitude() {
        return latitude;
    }

    public void setLatitude(double latitude) {
        this.latitude = latitude;
    }

    public String getProvince() {
        return province;
    }

    public void setProvince(String province) {
        this.province = province;
    }

    public String getCity() {
        return city;
    }

    public void setCity(String city) {
        this.city = city;
    }

    public String getDistrict() {
        return district;
    }

    public void setDistrict(String district) {
        this.district = district;
    }

    public String getBizNames() {
        return bizNames;
    }

    public void setBizNames(String bizNames) {
        this.bizNames = bizNames;
    }

    public String getCarrier() {
        return carrier;
    }

    public void setCarrier(String carrier) {
        this.carrier = carrier;
    }

    public String getNetType() {
        return netType;
    }

    public void setNetType(String netType) {
        this.netType = netType;
    }

    public String getCid_sn() {
        return cid_sn;
    }

    public void setCid_sn(String cid_sn) {
        this.cid_sn = cid_sn;
    }

    public String getIp() {
        return ip;
    }

    public void setIp(String ip) {
        this.ip = ip;
    }

    public String getAppid() {
        return appid;
    }

    public void setAppid(String appid) {
        this.appid = appid;
    }

    public String getAppVer() {
        return appVer;
    }

    public void setAppVer(String appVer) {
        this.appVer = appVer;
    }

    public String getRelease_ch() {
        return release_ch;
    }

    public void setRelease_ch(String release_ch) {
        this.release_ch = release_ch;
    }

    public String getPromotion_ch() {
        return promotion_ch;
    }

    public void setPromotion_ch(String promotion_ch) {
        this.promotion_ch = promotion_ch;
    }

    public String getLogType() {
        return logType;
    }

    public void setLogType(String logType) {
        this.logType = logType;
    }

    public long getCommit_time() {
        return commit_time;
    }

    public void setCommit_time(long commit_time) {
        this.commit_time = commit_time;
    }

    public String getDt() {
        return dt;
    }

    public void setDt(String dt) {
        this.dt = dt;
    }

    public HashMap<String, String> getEvent() {
        return event;
    }

    public void setEvent(HashMap<String, String> event) {
        this.event = event;
    }

    /** Renders every field as {@code name='value'} (numbers and the event map unquoted). */
    @Override
    public String toString() {
        return "LogBean{"
                + "id='" + id + '\''
                + ", account='" + account + '\''
                + ", sessionId='" + sessionId + '\''
                + ", imei='" + imei + '\''
                + ", osName='" + osName + '\''
                + ", osVer='" + osVer + '\''
                + ", resolution='" + resolution + '\''
                + ", androidId='" + androidId + '\''
                + ", manufacture='" + manufacture + '\''
                + ", deviceId='" + deviceId + '\''
                + ", areacode='" + areacode + '\''
                + ", longtitude=" + longtitude
                + ", latitude=" + latitude
                + ", province='" + province + '\''
                + ", city='" + city + '\''
                + ", district='" + district + '\''
                + ", bizNames='" + bizNames + '\''
                + ", carrier='" + carrier + '\''
                + ", netType='" + netType + '\''
                + ", cid_sn='" + cid_sn + '\''
                + ", ip='" + ip + '\''
                + ", appid='" + appid + '\''
                + ", appVer='" + appVer + '\''
                + ", release_ch='" + release_ch + '\''
                + ", promotion_ch='" + promotion_ch + '\''
                + ", logType='" + logType + '\''
                + ", commit_time=" + commit_time
                + ", dt='" + dt + '\''
                + ", event=" + event
                + '}';
    }
}

GeoUtils,维度关联

public class GeoUtils {

    public static final String key = "9833dc1de6e628e1b8e244710783a31d";

    public static String[] getAreaInfoByLongitudeAndLatitude(HttpClient httpClient, Jedis jedis, double longitude, double latitude) {

        String province = "";
        String city = "";
        String district = "";
        String bizNames = "";
        //将经纬度使用GEOHash进行编码

        try {
            GeoHash geoHash = GeoHash.withCharacterPrecision(latitude, longitude, 8);
            String base32Code = geoHash.toBase32();
            //根据geoHash的编码,到Redis中进行查找
            String areaInfo = jedis.get(base32Code);
            //{wx4sqk42 -> 省,市,区|商圈1,商圈2}
            //如果有就关联地理信息和商圈信息
            if (areaInfo != null) {
                String[] fields = areaInfo.split("[|]");
                String area = fields[0];
                //判断是否有商圈信息
                if (fields.length > 1) {
                    bizNames = fields[1];
                }
                String[] pcd = area.split(",");
                province = pcd[0];
                city = pcd[1];
                district = pcd[2];
            } else {
                //如果没有查找到,就通过httpclient请求高德的API
                //通过外网调用高德的API
                //构造一个get对象
                GetMethod getMethod = new GetMethod("https://restapi.amap.com/v3/geocode/regeo?key="+ key +"&location=" + longitude + "," + latitude);
                //发送请求
                int status = httpClient.executeMethod(getMethod);

                if (status == 200) {
                    //获取请求的json字符串
                    String jsOnStr= getMethod.getResponseBodyAsString();
                    //转成json对象
                    JSONObject jsOnObj= JSON.parseObject(jsonStr);
                    //获取位置信息
                    JSONObject regeocode = jsonObj.getJSONObject("regeocode");

                    if (regeocode != null && !regeocode.isEmpty()) {

                        JSONObject address = regeocode.getJSONObject("addressComponent");
                        //获取省市区、商圈信息
                        province = address.getString("province");
                        city = address.getString("city");
                        district = address.getString("district");

                        ArrayList lb = new ArrayList();

                        //商圈数组(多个)
                        JSONArray businessAreas = address.getJSONArray("businessAreas");

                        for (int i = 0; i 

map方法

/**
 * Parses one raw log line of the form {@code <date time> --> <json>} into a {@link LogBean}
 * and enriches it with province/city/district/business-area information looked up through
 * {@code GeoUtils}.
 *
 * <p>Lines that cannot be parsed yield {@code null}; callers are expected to filter those out.
 */
public class ToJSONMapFunction extends RichMapFunction<String, LogBean> {

    private transient Jedis jedis = null;

    private transient HttpClient httpClient = null;

    /**
     * Opens the Redis connection and creates the HTTP client, using the global job parameters
     * {@code redis.host}, {@code redis.password} and {@code redis.db} (default 0).
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        // The job registers its ParameterTool as global job parameters.
        ParameterTool params = (ParameterTool) getRuntimeContext()
                .getExecutionConfig()
                .getGlobalJobParameters();
        String host = params.getRequired("redis.host");
        String password = params.getRequired("redis.password");
        int db = params.getInt("redis.db", 0);
        jedis = new Jedis(host, 6379, 5000);
        jedis.auth(password);
        jedis.select(db);

        // Used for the geocoding web-service fallback.
        httpClient = new HttpClient();
    }

    /**
     * @param line raw log line, {@code <date time> --> <json>}
     * @return the populated bean, or {@code null} when the line is malformed
     */
    @Override
    public LogBean map(String line) throws Exception {
        // Reconnect if the Redis connection was dropped.
        if (!jedis.isConnected()) {
            jedis.connect();
        }

        LogBean logBean = null;
        try {
            // Limit 2: only the first separator splits the line, the rest belongs to the JSON payload.
            String[] fields = line.split(" --> ", 2);
            if (fields.length < 2) {
                // No separator at all: treat as dirty data instead of failing the job.
                System.out.println("XXXXXXXX ====>" + line);
                return null;
            }
            String dateTime = fields[0];
            String dt = dateTime.split(" ")[0];
            String json = fields[1];
            // Parse with FastJSON.
            JSONObject jsonObj = JSON.parseObject(json);

            JSONObject uObj = jsonObj.getJSONObject("u");
            JSONObject phoneObj = uObj.getJSONObject("phone");
            JSONObject locObj = uObj.getJSONObject("loc");
            JSONObject appObj = uObj.getJSONObject("app");

            // Flat fields of the user object.
            String id = uObj.getString("id");
            String account = uObj.getString("account");
            String sessionId = uObj.getString("sessionId");

            // Device information.
            String imei = phoneObj.getString("imei");
            String osName = phoneObj.getString("osName");
            String osVer = phoneObj.getString("osVer");
            String resolution = phoneObj.getString("resolution");
            String androidId = phoneObj.getString("androidId");
            String manufacture = phoneObj.getString("manufacture");
            String deviceId = phoneObj.getString("deviceId");

            // Location information.
            String areacode = locObj.getString("areacode");
            double longtitude = locObj.getDouble("longtitude");
            double latitude = locObj.getDouble("latitude");

            // Resolve province, city, district and business areas from the coordinates.
            String[] areaInfo = GeoUtils.getAreaInfoByLongitudeAndLatitude(httpClient, jedis, longtitude, latitude);

            String province = areaInfo[0];
            String city = areaInfo[1];
            String district = areaInfo[2];
            String bizNames = areaInfo[3];

            String carrier = locObj.getString("carrier");
            String netType = locObj.getString("netType");
            String cid_sn = locObj.getString("cid_sn");
            String ip = locObj.getString("ip");

            // App fields.
            String appid = appObj.getString("appid");
            String appVer = appObj.getString("appVer");
            String release_ch = appObj.getString("release_ch");
            String promotion_ch = appObj.getString("promotion_ch");

            // Event type.
            String logType = jsonObj.getString("logType");
            // Commit time.
            long commit_time = jsonObj.getLong("commit_time");

            // Copy every key/value pair of the event object into a plain map;
            // a record without an event object gets an empty map.
            JSONObject eventObj = jsonObj.getJSONObject("event");
            HashMap<String, String> eventMap = new HashMap<>();
            if (eventObj != null) {
                for (String k : eventObj.keySet()) {
                    eventMap.put(k, eventObj.getString(k));
                }
            }

            // Assemble the bean.
            logBean = new LogBean(id,
                    account,
                    sessionId,
                    imei,
                    osName,
                    osVer,
                    resolution,
                    androidId,
                    manufacture,
                    deviceId,
                    areacode,
                    longtitude,
                    latitude,
                    province,
                    city,
                    district,
                    bizNames,
                    carrier,
                    netType,
                    cid_sn,
                    ip,
                    appid,
                    appVer,
                    release_ch,
                    promotion_ch,
                    logType,
                    commit_time,
                    dt,
                    eventMap
            );

        } catch (JSONException e) {
            e.printStackTrace();
            System.out.println("XXXXXXXX ====>" + line);
            // TODO: log through a logger and route dirty records to HDFS/HBase.
        }
        return logBean;
    }

    /** Releases the Redis connection and drops the HTTP client reference. */
    @Override
    public void close() throws Exception {
        // jedis is still null when open() failed before creating it.
        if (jedis != null) {
            jedis.close();
        }
        httpClient = null;
    }
}

处理逻辑

/**
 * Splits the log stream with side outputs.
 *
 * <p>Purpose: real-time ETL (pull, filter, join dimensions, select fields, mask fields,
 * split data, convert format or type).
 *
 * <ol>
 *   <li>Join dimension data in a RichMapFunction (could be optimised with async I/O).</li>
 *   <li>Split the data with side outputs.</li>
 *   <li>Write the split streams back to Kafka. FlinkKafkaProducer extends
 *       TwoPhaseCommitSinkFunction, which implements CheckpointedFunction and
 *       CheckpointListener: the transaction is committed after a successful checkpoint and
 *       rolled back when the checkpoint fails, which gives exactly-once delivery.</li>
 *   <li>(Optional) Also write the main stream to HDFS, using a bulk sink in Parquet format.</li>
 * </ol>
 */
public class PreETLAndTopicSplit {

    /**
     * @param args {@code args[0]} is the path of the properties file holding the job parameters
     * @throws Exception if the properties file cannot be read or the job fails
     */
    public static void main(String[] args) throws Exception {

        System.setProperty("HADOOP_USER_NAME", "root");

        ParameterTool parameters = ParameterTool.fromPropertiesFile(args[0]);

        DataStream<String> lines = FlinkUtils.createKafkaStream(parameters, SimpleStringSchema.class);

        // Make the parameters available to rich functions at runtime.
        FlinkUtils.env.getConfig().setGlobalJobParameters(parameters);

        SingleOutputStreamOperator<LogBean> beanDataStream = lines.map(new ToJSONMapFunction());

        // The map function returns null for lines it could not parse.
        SingleOutputStreamOperator<LogBean> filteredStream = beanDataStream.filter(new FilterFunction<LogBean>() {
            @Override
            public boolean filter(LogBean bean) throws Exception {
                return bean != null;
            }
        });

        // Split the data with side outputs (formerly done with split/select).
        // The tags are anonymous subclasses with an explicit type argument so that the
        // element type can be extracted.
        // Tag for traffic records.
        final OutputTag<LogBean> flowOutputTag = new OutputTag<LogBean>("flow-output") {};

        // Tag for activity records.
        final OutputTag<LogBean> activityOutputTag = new OutputTag<LogBean>("activity-output") {};

        SingleOutputStreamOperator<LogBean> mainStream = filteredStream.process(new ProcessFunction<LogBean, LogBean>() {

            @Override
            public void processElement(LogBean bean, Context ctx, Collector<LogBean> out) throws Exception {

                // Route on the log type carried by the record.
                String logType = bean.getLogType();

                if (logType != null && logType.startsWith("act")) {
                    ctx.output(activityOutputTag, bean);
                } else {
                    // Everything else, including records without a log type, counts as traffic.
                    ctx.output(flowOutputTag, bean);
                }
                // Every record is also emitted on the main stream.
                out.collect(bean);
            }
        });

        DataStream<LogBean> flowStream = mainStream.getSideOutput(flowOutputTag);

        DataStream<LogBean> activityStream = mainStream.getSideOutput(activityOutputTag);

        String flowTopic = parameters.getRequired("flow.topic");

        Properties properties = parameters.getProperties();

        properties.setProperty("transaction.timeout.ms", parameters.getRequired("kafka.transaction.timeout.ms"));

        FlinkKafkaProducer<String> kafkaProducer1 = new FlinkKafkaProducer<>(
                flowTopic, // target topic
                new KafkaStringSerializationSchema(flowTopic), // serialization schema for Kafka
                properties, // Kafka client properties
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE // write with exactly-once semantics
        );

        // Convert each bean to a JSON string before writing it to Kafka.
        flowStream.map(new MapFunction<LogBean, String>() {
            @Override
            public String map(LogBean bean) throws Exception {
                return JSON.toJSONString(bean);
            }
        }).addSink(kafkaProducer1);

        String activityTopic = parameters.getRequired("activity.topic");

        FlinkKafkaProducer<String> kafkaProducer2 = new FlinkKafkaProducer<>(
                activityTopic, // target topic
                new KafkaStringSerializationSchema(activityTopic), // serialization schema for Kafka
                properties, // Kafka client properties
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE // write with exactly-once semantics
        );

        activityStream.map(new MapFunction<LogBean, String>() {
            @Override
            public String map(LogBean bean) throws Exception {
                return JSON.toJSONString(bean);
            }
        }).addSink(kafkaProducer2);

        String path = parameters.getRequired("mainstream.hdfs.out.path");

        // Alternative: row-format sink that stores the main stream as JSON text.
//        StreamingFileSink<String> streamingFileSink = StreamingFileSink
//                .forRowFormat(new Path(path), new SimpleStringEncoder<String>("UTF-8"))
//                .withRollingPolicy(
//                        DefaultRollingPolicy.builder()
//                                .withRolloverInterval(TimeUnit.SECONDS.toMillis(30)) //
//                                .withInactivityInterval(TimeUnit.SECONDS.toMillis(10))
//                                .withMaxPartSize(1024 * 1024 * 1024)
//                                .build())
//                .build();

        // Store to HDFS.
//        mainStream.map(new MapFunction<LogBean, String>() {
//            @Override
//            public String map(LogBean bean) throws Exception {
//                return JSON.toJSONString(bean);
//            }
//        }).addSink(streamingFileSink);

        // Naming pattern of the bucket directories.
        DateTimeBucketAssigner<LogBean> bucketAssigner = new DateTimeBucketAssigner<>(
                "yyyy-MM-dd--HH-mm",
                ZoneId.of("Asia/Shanghai"));

        // Bulk-format StreamingFileSink storing the records as Parquet (columnar).
        StreamingFileSink<LogBean> streamingFileSink = StreamingFileSink
                .forBulkFormat(
                        new Path(path), // output directory
                        ParquetAvroWriters.forReflectRecord(LogBean.class) // write as Parquet
                )
                .withBucketAssigner(bucketAssigner)
                .build();

        mainStream.addSink(streamingFileSink);

        FlinkUtils.env.execute();

    }
}

4.布隆过滤器与TTL

2021-01-24 15:00:01,user1,act01,view
2021-01-24 15:00:02,user1,act01,view
2021-01-24 15:00:05,user1,act01,join
2021-01-24 15:00:02,user2,act01,view
2021-01-24 15:00:05,user2,act01,join
2021-01-24 15:00:02,user3,act02,view
2021-01-24 15:00:05,user3,act02,join
2021-01-24 16:00:02,user3,act01,view
2021-01-24 16:00:05,user3,act01,join

需求:按天、按小时、统计各个活动、不同事件的次数和人数

不要再使用HashSet作为去重的集合了,而是使用布隆过滤器

2021-01-24,act01,view,4,3
2021-01-24,act01,join,3,3
2021-01-24,act02,view,1,1
2021-01-24,act02,join,1,1

按照小时:
2021-01-24 15,act01,view,3,2
2021-01-24 16,act01,view,1,1

State 设置 TTL
/**
 * One activity event (time, user, activity, event type) together with the running counters
 * that the counting job fills in before emitting it.
 */
public class ActivityBean {

    public String time;

    public String uid;

    public String aid;

    public String eid;

    public Long disCount; // de-duplicated count
    public Long count; // raw (not de-duplicated) count

    /** Creates a bean whose counters are still unset ({@code null}). */
    public ActivityBean(String time, String uid, String aid, String eid) {
        this.time = time;
        this.uid = uid;
        this.aid = aid;
        this.eid = eid;
    }

    @Override
    public String toString() {
        return "ActivityBean{" +
                "time='" + time + '\'' +
                ", uid='" + uid + '\'' +
                ", aid='" + aid + '\'' +
                ", eid='" + eid + '\'' +
                ", disCount=" + disCount +
                ", count=" + count +
                '}';
    }

    /** Static factory equivalent to the constructor. */
    public static ActivityBean of(String time, String uid, String aid, String eid) {
        return new ActivityBean(time, uid, aid, eid);
    }
}
public class AdvActivityCount {

    public static void main(String[] args) throws Exception {

        ParameterTool parameterTool = ParameterTool.fromPropertiesFile(args[0]);

        DataStream lines = FlinkUtils.createKafkaStream(parameterTool, SimpleStringSchema.class);

        SingleOutputStreamOperator beanDataStream = lines.process(new ProcessFunction() {
            @Override
            public void processElement(String value, Context ctx, Collector out) throws Exception {
                String[] fields = value.split(",");
                String time = fields[0];
                String uid = fields[1];
                String aid = fields[2];
                String eid = fields[3];
                ActivityBean bean = ActivityBean.of(time, uid, aid, eid);
                out.collect(bean);
            }
        });

        KeyedStream> keyedStream = beanDataStream.keyBy(new KeySelector>() {
            @Override
            public Tuple2 getKey(ActivityBean value) throws Exception {
                return Tuple2.of(value.uid, value.eid);
            }
        });

        SingleOutputStreamOperator result = keyedStream.process(new MyActivityCountFunction());

        result.print();

        FlinkUtils.env.execute();

    }


    private static class MyActivityCountFunction extends KeyedProcessFunction, ActivityBean, ActivityBean> {

        //按小时未去重的次数
        private transient MapState hourUserState;

        //按小时重的次数
        private transient MapState hourDisUserState;

        //判断按小时去重的Bloom过滤器
        private transient MapState> hourBloomFilterState;


        @Override
        public void open(Configuration parameters) throws Exception {
            MapStateDescriptor hourStateDescriptor = new MapStateDescriptor("hour-user-count-state", String.class, Long.class);
            //(2021-01-24 16 -> 2000)
            //(2021-01-24 17 -> 2000)
            hourUserState = getRuntimeContext().getMapState(hourStateDescriptor);

            MapStateDescriptor hourDisStateDescriptor = new MapStateDescriptor("hour-dis-user-count-state", String.class, Long.class);
            //(2021-01-24 16 -> 1000)
            //(2021-01-24 17 -> 1000)
            hourDisUserState = getRuntimeContext().getMapState(hourDisStateDescriptor);

            StateTtlConfig hourBloomStateTtlCOnfig= StateTtlConfig.newBuilder(Time.minutes(90))
                    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                    .build();

            MapStateDescriptor> hourBloomDescriptor = new MapStateDescriptor(
                    "hour-bloom-state",
                    TypeInformation.of(String.class),
                    TypeInformation.of(new TypeHint>() {
                    })
            );
            hourBloomDescriptor.enableTimeToLive(hourBloomStateTtlConfig);
            hourBloomFilterState = getRuntimeContext().getMapState(hourBloomDescriptor);

        }

        @Override
        public void processElement(ActivityBean value, Context ctx, Collector out) throws Exception {

            String uid = value.uid;
            //未去重的
            String dayAndHour = value.time.substring(0, 13);
            Long hourCount = hourUserState.get(dayAndHour);
            if (hourCount == null) {
                hourCount = 0L;
            }
            hourCount += 1;
            hourUserState.put(dayAndHour, hourCount);

            //去重的
            BloomFilter hourBloomFilter = hourBloomFilterState.get(dayAndHour);
            Long hourDisCount = hourDisUserState.get(dayAndHour);
            if (hourBloomFilter == null) {
                hourBloomFilter = BloomFilter.create(Funnels.unencodedCharsFunnel(), 100000);
                hourDisCount = 0L;
            }
            //判断这样用户在这个小时内是否存在
            if (!hourBloomFilter.mightContain(uid)) {
                hourDisCount += 1L;
                hourBloomFilter.put(uid);
            }
            hourDisUserState.put(dayAndHour, hourDisCount);
            hourBloomFilterState.put(dayAndHour, hourBloomFilter);


            value.time = dayAndHour;
            value.count = hourCount;
            value.disCount = hourDisCount;
            out.collect(value);

        }

    }
}


5.topN案例

将数据进行增量聚合,窗口触发后还要进行排序(reduce,WindowFunction),在WindowFunction使用onTime进行排序

{"userId": "u001", "itemId": "p1001", "categoryId": "c11", type: "pv", "timestamp": "2020-03-08 11:11:11"}
{"userId": "u002", "itemId": "p1001", "categoryId": "c11", type: "pv", "timestamp": "2020-03-08 11:11:11"}
{"userId": "u003", "itemId": "p1001", "categoryId": "c11", type: "pv", "timestamp": "2020-03-08 11:11:11"}
{"userId": "u003", "itemId": "p1001", "categoryId": "c11", type: "cart", "timestamp": "2020-03-08 11:11:11"}
{"userId": "u011", "itemId": "p2222", "categoryId": "c22", type: "pv", "timestamp": "2020-03-08 11:11:11"}
{"userId": "u012", "itemId": "p2222", "categoryId": "c22", type: "pv", "timestamp": "2020-03-08 11:11:11"}
{"userId": "u012", "itemId": "p2222", "categoryId": "c22", type: "pv", "timestamp": "2020-03-08 11:12:01"}
{"userId": "u001", "itemId": "p1001", "categoryId": "c11", type: "pv", "timestamp": "2020-03-08 11:12:11"}
{"userId": "u011", "itemId": "p2222", "categoryId": "c22", type: "pv", "timestamp": "2020-03-08 11:12:11"}
{"userId": "u011", "itemId": "p2222", "categoryId": "c22", type: "pv", "timestamp": "2020-03-08 11:13:01"}


统计最近十分钟的热门商品,1分钟统计一次(按照EventTime划分滑动窗口,窗口长度为十分钟,1分滑动一次)

将数据进行增量聚合,窗口触发后还要进行排序(reduce,WindowFunction),在WindowFunction使用onTime进行排序





{itemId='p1001'type='pv', windowStart=1583636520000 , 2020-03-08 11:02:00.0, windowEnd=1583637120000 , 2020-03-08 11:12:00.0, viewCount=3}
{itemId='p1001'type='cart', windowStart=1583636520000 , 2020-03-08 11:02:00.0, windowEnd=1583637120000 , 2020-03-08 11:12:00.0, viewCount=1}
{itemId='p2222'type='pv', windowStart=1583636520000 , 2020-03-08 11:02:00.0, windowEnd=1583637120000 , 2020-03-08 11:12:00.0, viewCount=2}


{itemId='p1001'type='pv', windowStart=1583636520000 , 2020-03-08 11:02:00.0, windowEnd=1583637120000 , 2020-03-08 11:12:00.0, viewCount=3}
{itemId='p1001'type='cart', windowStart=1583636520000 , 2020-03-08 11:02:00.0, windowEnd=1583637120000 , 2020-03-08 11:12:00.0, viewCount=1}
{itemId='p2222'type='pv', windowStart=1583636520000 , 2020-03-08 11:02:00.0, windowEnd=1583637120000 , 2020-03-08 11:12:00.0, viewCount=2}

[{itemId='p1001'type='cart', windowStart=1583636580000 , 2020-03-08 11:03:00.0, windowEnd=1583637180000 , 2020-03-08 11:13:00.0, viewCount=1}]
[{itemId='p1001'type='pv', windowStart=1583636580000 , 2020-03-08 11:03:00.0, windowEnd=1583637180000 , 2020-03-08 11:13:00.0, viewCount=4}, {itemId='p2222'type='pv', windowStart=1583636580000 , 2020-03-08 11:03:00.0, windowEnd=1583637180000 , 2020-03-08 11:13:00.0, viewCount=4}]
/**
 * Number of events of one type recorded for one item inside a single window.
 *
 * <p>Fields are public and the class keeps its implicit no-argument
 * constructor; instances are normally created through
 * {@link #of(String, String, long, long, long)}.
 */
public class ItemEventCount {
    public String itemId;     // item ID
    public String type;       // event type
    public long windowStart;  // window start timestamp (rendered via java.sql.Timestamp, i.e. epoch milliseconds)
    public long windowEnd;    // window end timestamp (rendered via java.sql.Timestamp, i.e. epoch milliseconds)
    public long eventCount;   // number of events counted for the item in the window

    /**
     * Creates a populated instance.
     *
     * @param itemId      item ID
     * @param type        event type
     * @param windowStart window start timestamp
     * @param windowEnd   window end timestamp
     * @param eventCount  number of events counted in the window
     * @return a new {@code ItemEventCount} carrying the given values
     */
    public static ItemEventCount of(String itemId, String type, long windowStart, long windowEnd, long eventCount) {
        ItemEventCount result = new ItemEventCount();
        result.itemId = itemId;
        result.type = type;
        result.windowStart = windowStart;
        result.windowEnd = windowEnd;
        result.eventCount = eventCount;
        return result;
    }

    /**
     * Renders all fields; each window bound is printed both as the raw number
     * and as a {@link Timestamp} (formatted in the JVM's default time zone).
     */
    @Override
    public String toString() {
        // '\'' is the single-quote char literal; a ", " separator precedes every field after the first.
        return "{" +
                "itemId='" + itemId + '\'' +
                ", type='" + type + '\'' +
                ", windowStart=" + windowStart + " , " + new Timestamp(windowStart) +
                ", windowEnd=" + windowEnd + " , " + new Timestamp(windowEnd) +
                ", eventCount=" + eventCount +
                '}';
    }
}
/**
 * A single user-behavior record (one event of a user on an item).
 *
 * <p>Fields are public and the class keeps its implicit no-argument
 * constructor; instances can also be created through the {@code of} factories.
 */
public class MyBehavior {

    public String userId;       // user ID
    public String itemId;       // item ID
    public String categoryId;   // item category ID
    public String type;         // user behavior, one of ("pv", "buy", "cart", "fav")
    public long timestamp;      // time of the behavior; passed to java.sql.Timestamp(long), i.e. epoch milliseconds
    public long counts = 1;     // number of occurrences this record stands for; defaults to 1

    /**
     * Creates a record that stands for a single occurrence ({@code counts == 1}).
     *
     * @param userId     user ID
     * @param itemId     item ID
     * @param categoryId item category ID
     * @param type       behavior type
     * @param timestamp  time of the behavior
     * @return a new {@code MyBehavior} with {@code counts} set to 1
     */
    public static MyBehavior of(String userId, String itemId, String categoryId, String type, long timestamp) {
        return of(userId, itemId, categoryId, type, timestamp, 1L);
    }

    /**
     * Creates a record with an explicit occurrence count.
     *
     * @param userId     user ID
     * @param itemId     item ID
     * @param categoryId item category ID
     * @param type       behavior type
     * @param timestamp  time of the behavior
     * @param counts     number of occurrences this record stands for
     * @return a new {@code MyBehavior} carrying the given values
     */
    public static MyBehavior of(String userId, String itemId, String categoryId, String type, long timestamp, long counts) {
        MyBehavior behavior = new MyBehavior();
        behavior.userId = userId;
        behavior.itemId = itemId;
        behavior.categoryId = categoryId;
        behavior.type = type;
        behavior.timestamp = timestamp;
        behavior.counts = counts;
        return behavior;
    }

    /**
     * Renders all fields; the timestamp is printed both as the raw number and
     * as a {@link Timestamp} (formatted in the JVM's default time zone).
     */
    @Override
    public String toString() {
        // '\'' is the single-quote char literal closing each quoted string field.
        return "MyBehavior{"
                + "userId='" + userId + '\''
                + ", itemId='" + itemId + '\''
                + ", categoryId='" + categoryId + '\''
                + ", type='" + type + '\''
                + ", timestamp=" + timestamp + "," + new Timestamp(timestamp)
                + ", counts=" + counts
                + '}';
    }

    /** @return the user ID */
    public String getUserId() {
        return userId;
    }

    /** @return the item ID */
    public String getItemId() {
        return itemId;
    }

    /** @return the item category ID */
    public String getCategoryId() {
        return categoryId;
    }

    /** @return the behavior type */
    public String getType() {
        return type;
    }

    /** @return the time of the behavior */
    public long getTimestamp() {
        return timestamp;
    }

    /** @return the number of occurrences this record stands for */
    public long getCounts() {
        return counts;
    }
}
public class HotGoodCount {

    public static void main(String[] args) throws Exception{

        ParameterTool parameterTool = ParameterTool.fromPropertiesFile(args[0]);

        DataStream lines = FlinkUtils.createKafkaStream(parameterTool, SimpleStringSchema.class);

        SingleOutputStreamOperator behaviorDataStream = lines.process(new ProcessFunction() {
            @Override
            public void processElement(String value, Context ctx, Collector out) throws Exception {
                try {
                    MyBehavior myBehavior = JSON.parseObject(value, MyBehavior.class);
                    out.collect(myBehavior);
                } catch (Exception e) {
                    //e.printStackTrace();
                }
            }
        });

        SingleOutputStreamOperator dataWithTimestamp = behaviorDataStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor(Time.seconds(0)) {
            @Override
            public long extractTimestamp(MyBehavior element) {
                return element.timestamp;
            }
        });


        KeyedStream> keyedStream = dataWithTimestamp.keyBy(new KeySelector>() {
            @Override
            public Tuple2 getKey(MyBehavior value) throws Exception {
                return Tuple2.of(value.itemId, value.type);
            }
        });

        WindowedStream, TimeWindow> window = keyedStream.window(SlidingEventTimeWindows.of(Time.minutes(10), Time.minutes(1)));

        SingleOutputStreamOperator aggregated = window.aggregate(new MyGoodAggFunction(), new MyGoodWindowFunction());

        KeyedStream> keyedStreamAndWindow = aggregated.keyBy(new KeySelector>() {
            @Override
            public Tuple3 getKey(ItemEventCount value) throws Exception {
                return Tuple3.of(value.type, value.windowStart, value.windowEnd);
            }
        });

        keyedStreamAndWindow.process(new KeyedProcessFunction, ItemEventCount, ItemEventCount>() {

            private transient ValueState> valueState;

            @Override
            public void open(Configuration parameters) throws Exception {
                ValueStateDescriptor> stateDescriptor = new ValueStateDescriptor("lst-state", TypeInformation.of(new TypeHint>() {
                }));
                valueState = getRuntimeContext().getState(stateDescriptor);
            }

            @Override
            public void processElement(ItemEventCount value, Context ctx, Collector out) throws Exception {

                List lst = valueState.value();
                if(lst == null) {
                    lst = new ArrayList();
                }
                lst.add(value);
                valueState.update(lst);
                //注册一个定时器
                ctx.timerService().registerEventTimeTimer(value.windowEnd + 1);

            }

            @Override
            public void onTimer(long timestamp, OnTimerContext ctx, Collector out) throws Exception {

                List lst = valueState.value();
                lst.sort((a, b) -> Long.compare(b.eventCount, a.eventCount));

                int loop = Math.min(lst.size(), 3);

                for (int i = 0; i  {

        @Override
        public Long createAccumulator() {
            return 0L;
        }

        @Override
        public Long add(MyBehavior value, Long accumulator) {
            return accumulator += 1;
        }

        @Override
        public Long getResult(Long accumulator) {
            return accumulator;
        }

        @Override
        public Long merge(Long a, Long b) {
            return null;
        }
    }

    private static class MyGoodWindowFunction implements WindowFunction, TimeWindow> {

        @Override
        public void apply(Tuple2 key, TimeWindow window, Iterable input, Collector out) throws Exception {

            long startTime = window.getStart();
            long endTime = window.getEnd();
            Long count = input.iterator().next();

            String itemId = key.f0;
            String type = key.f1;

            ItemEventCount itemEventCount = ItemEventCount.of(itemId, type, startTime, endTime, count);

            out.collect(itemEventCount);
        }
    }
}

推荐阅读
  • 本文介绍了Swing组件的用法,重点讲解了图标接口的定义和创建方法。图标接口用来将图标与各种组件相关联,可以是简单的绘画或使用磁盘上的GIF格式图像。文章详细介绍了图标接口的属性和绘制方法,并给出了一个菱形图标的实现示例。该示例可以配置图标的尺寸、颜色和填充状态。 ... [详细]
  • Java太阳系小游戏分析和源码详解
    本文介绍了一个基于Java的太阳系小游戏的分析和源码详解。通过对面向对象的知识的学习和实践,作者实现了太阳系各行星绕太阳转的效果。文章详细介绍了游戏的设计思路和源码结构,包括工具类、常量、图片加载、面板等。通过这个小游戏的制作,读者可以巩固和应用所学的知识,如类的继承、方法的重载与重写、多态和封装等。 ... [详细]
  • Java容器中的compareto方法排序原理解析
    本文从源码解析Java容器中的compareto方法的排序原理,讲解了在使用数组存储数据时的限制以及存储效率的问题。同时提到了Redis的五大数据结构和list、set等知识点,回忆了作者大学时代的Java学习经历。文章以作者做的思维导图作为目录,展示了整个讲解过程。 ... [详细]
  • JavaSE笔试题-接口、抽象类、多态等问题解答
    本文解答了JavaSE笔试题中关于接口、抽象类、多态等问题。包括Math类的取整数方法、接口是否可继承、抽象类是否可实现接口、抽象类是否可继承具体类、抽象类中是否可以有静态main方法等问题。同时介绍了面向对象的特征,以及Java中实现多态的机制。 ... [详细]
  • 个人学习使用:谨慎参考1Client类importcom.thoughtworks.gauge.Step;importcom.thoughtworks.gauge.T ... [详细]
  • 预备知识可参考我整理的博客Windows编程之线程:https:www.cnblogs.comZhuSenlinp16662075.htmlWindows编程之线程同步:https ... [详细]
  • 先看官方文档TheJavaTutorialshavebeenwrittenforJDK8.Examplesandpracticesdescribedinthispagedontta ... [详细]
  • JDK源码学习之HashTable(附带面试题)的学习笔记
    本文介绍了JDK源码学习之HashTable(附带面试题)的学习笔记,包括HashTable的定义、数据类型、与HashMap的关系和区别。文章提供了干货,并附带了其他相关主题的学习笔记。 ... [详细]
  • Android系统源码分析Zygote和SystemServer启动过程详解
    本文详细解析了Android系统源码中Zygote和SystemServer的启动过程。首先介绍了系统framework层启动的内容,帮助理解四大组件的启动和管理过程。接着介绍了AMS、PMS等系统服务的作用和调用方式。然后详细分析了Zygote的启动过程,解释了Zygote在Android启动过程中的决定作用。最后通过时序图展示了整个过程。 ... [详细]
  • 纠正网上的错误:自定义一个类叫java.lang.System/String的方法
    本文纠正了网上关于自定义一个类叫java.lang.System/String的错误答案,并详细解释了为什么这种方法是错误的。作者指出,虽然双亲委托机制确实可以阻止自定义的System类被加载,但通过自定义一个特殊的类加载器,可以绕过双亲委托机制,达到自定义System类的目的。作者呼吁读者对网上的内容持怀疑态度,并带着问题来阅读文章。 ... [详细]
  • 基于Socket的多个客户端之间的聊天功能实现方法
    本文介绍了基于Socket的多个客户端之间实现聊天功能的方法,包括服务器端的实现和客户端的实现。服务器端通过每个用户的输出流向特定用户发送消息,而客户端通过输入流接收消息。同时,还介绍了相关的实体类和Socket的基本概念。 ... [详细]
  • 重入锁(ReentrantLock)学习及实现原理
    本文介绍了重入锁(ReentrantLock)的学习及实现原理。在学习synchronized的基础上,重入锁提供了更多的灵活性和功能。文章详细介绍了重入锁的特性、使用方法和实现原理,并提供了类图和测试代码供读者参考。重入锁支持重入和公平与非公平两种实现方式,通过对比和分析,读者可以更好地理解和应用重入锁。 ... [详细]
  • 本文整理了Java中java.lang.NoSuchMethodError.getMessage()方法的一些代码示例,展示了NoSuchMethodErr ... [详细]
  • 本文介绍了解决java开源项目apache commons email简单使用报错的方法,包括使用正确的JAR包和正确的代码配置,以及相关参数的设置。详细介绍了如何使用apache commons email发送邮件。 ... [详细]
  • 闭包一直是Java社区中争论不断的话题,很多语言都支持闭包这个语言特性,闭包定义了一个依赖于外部环境的自由变量的函数,这个函数能够访问外部环境的变量。本文以JavaScript的一个闭包为例,介绍了闭包的定义和特性。 ... [详细]
author-avatar
EvilMaknaeKYU
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有