Properties
# 必须传入的参数
checkpoint.path=hdfs://linux03:9000/chk2021
bootstrap.servers=linux03:9092,linux04:9092,linux05:9092
group.id=g11
# 读取kafka的一个或多个topic,多个用","分隔
kafka.topics=flow18
flow.topic=flow-output
activity.topic=activity-output
mainstream.hdfs.out.path=hdfs://linux03:9000/main2021
kafka.transaction.timeout.ms=500000
redis.host=node-3.51doit.cn
redis.password=123456
redis.db=13
# 可以选择性传入的参数
# 在checkpoint时是否将偏移量写入到kafka特殊的topic中
commit.offsets.on.checkpoints=false
checkpoint.interval=30000
enable.auto.commit=false
auto.offset.reset=earliest
public class FlinkUtils {
public static final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
public static DataStream createKafkaStream(ParameterTool parameterTool, Class extends DeserializationSchema> clazz) throws Exception {
String checkpointPath = parameterTool.getRequired("checkpoint.path");
long checkpointInterval = parameterTool.getLong("checkpoint.interval", 30000);
//开启checkpointing
env.enableCheckpointing(checkpointInterval, CheckpointingMode.EXACTLY_ONCE);
env.setStateBackend(new FsStateBackend(checkpointPath));
//设置Kafka相关参数
Properties properties = new Properties();//设置Kafka的地址和端口
properties.setProperty("bootstrap.servers", parameterTool.getRequired("bootstrap.servers"));
//读取偏移量策略:如果没有记录偏移量,就从头读,如果记录过偏移量,就接着读
properties.setProperty("auto.offset.reset", parameterTool.get("auto.offset.reset", "earliest"));
//设置消费者组ID
properties.setProperty("group.id", parameterTool.getRequired("group.id"));
//没有开启checkpoint,让flink提交偏移量的消费者定期自动提交偏移量
properties.setProperty("enable.auto.commit", parameterTool.get("enable.auto.commit", "false"));
//创建FlinkKafkaConsumer并传入相关参数
String topics = parameterTool.getRequired("kafka.topics");
List topicList = Arrays.asList(topics.split(","));
FlinkKafkaConsumer kafkaCOnsumer= new FlinkKafkaConsumer(
topicList, //要读取数据的Topic名称
clazz.newInstance(), //读取文件的反序列化Schema
properties //传入Kafka的参数
);
//使用addSource添加kafkaConsumer
kafkaConsumer.setCommitOffsetsOnCheckpoints(parameterTool.getBoolean("commit.offsets.on.checkpoints", false)); //在checkpoint时,不将偏移量写入到kafka特殊的topic中
return env.addSource(kafkaConsumer);
}
}
//测试
public class PreEtl {
public static void main(String[] args) throws Exception {
ParameterTool parameterTool = ParameterTool.fromPropertiesFile(args[0]);
DataStream lines = FlinkUtils.createKafkaStream(parameterTool, SimpleStringSchema.class);
lines.print();
}
}
一条json
2019-02-27 15:42:53,433 --> {"u":{"id":"","account":"pf4Ttk","email":"","phoneNbr":"","birthday":"","isRegistered":false,"isLogin":false,"addr":"","gender":"","phone":{"imei":"P1OJ3NBArOBSUWQB","osName":"iphone","osVer":"10.05","resolution":"1356*768","androidId":"","manufacture":"apple","deviceId":"ZHPRf54G"},"app":{"appid":"com.51doit.mall","appVer":"2.1.9","release_ch":"应用超市","promotion_ch":"网易"},"loc":{"areacode":621226108,"longtitude":105.26876187586532,"latitude":34.28729717981919,"carrier":"中国联通","netType":"WIFI","cid_sn":"mHzhuyXyFTiC","ip":"114.244.79.108"},"sessionId":"MBRwHCXLFIAc"},"logType":"pgview","commit_time":1551253373433,"event":{"pgid":"10-03-25847413","title":"包教包会8入门","skuid":"25847413","url":"/a/b/25847413.html"}}
pojo
/**
 * Flattened log record parsed from the raw " --> "-separated Kafka line
 * (user, phone, loc and app JSON objects flattened into one bean).
 * Field name "longtitude" is intentionally kept misspelled: it mirrors the
 * key used in the source JSON and is part of this bean's public interface.
 */
public class LogBean {
    private String id;
    private String account;
    private String sessionId;
    private String imei;
    private String osName;
    private String osVer;
    private String resolution;      // screen resolution, e.g. "1356*768"
    private String androidId = "";
    private String manufacture;     // phone manufacturer
    private String deviceId = "";
    private String areacode;
    private double longtitude;      // longitude (name matches the JSON key)
    private double latitude;
    private String province;
    private String city;
    private String district;
    private String bizNames;        // business-area names (pipe-joined)
    private String carrier;         // SIM carrier
    private String netType;
    private String cid_sn;          // cell-tower/base-station id
    private String ip;
    private String appid;
    private String appVer;
    private String release_ch;      // release channel
    private String promotion_ch;    // promotion channel
    private String logType;
    private long commit_time;
    private String dt;              // partition field, e.g. "2019-05-08"
    private HashMap<String, String> event;

    /** No-arg constructor required by reflection-based serializers (Avro reflect, JSON). */
    public LogBean() {
    }

    public LogBean(String id, String account, String sessionId, String imei, String osName, String osVer, String resolution, String androidId, String manufacture, String deviceId,
                   String areacode, double longtitude, double latitude, String province, String city, String district, String bizNames, String carrier, String netType, String cid_sn,
                   String ip, String appid, String appVer, String release_ch, String promotion_ch, String logType, long commit_time, String dt, HashMap<String, String> event) {
        this.id = id;
        this.account = account;
        // BUG FIX: was "this.sessiOnId=" (garbled identifier, not a declared field)
        this.sessionId = sessionId;
        this.imei = imei;
        this.osName = osName;
        this.osVer = osVer;
        this.resolution = resolution;
        this.androidId = androidId;
        this.manufacture = manufacture;
        this.deviceId = deviceId;
        this.areacode = areacode;
        // BUG FIX: was "this.lOngtitude=" (garbled identifier, not a declared field)
        this.longtitude = longtitude;
        this.latitude = latitude;
        this.province = province;
        this.city = city;
        this.district = district;
        this.bizNames = bizNames;
        this.carrier = carrier;
        this.netType = netType;
        this.cid_sn = cid_sn;
        this.ip = ip;
        this.appid = appid;
        this.appVer = appVer;
        this.release_ch = release_ch;
        this.promotion_ch = promotion_ch;
        this.logType = logType;
        this.commit_time = commit_time;
        this.dt = dt;
        this.event = event;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getAccount() {
        return account;
    }

    public void setAccount(String account) {
        this.account = account;
    }

    public String getSessionId() {
        return sessionId;
    }

    public void setSessionId(String sessionId) {
        this.sessionId = sessionId;
    }

    public String getImei() {
        return imei;
    }

    public void setImei(String imei) {
        this.imei = imei;
    }

    public String getOsName() {
        return osName;
    }

    public void setOsName(String osName) {
        this.osName = osName;
    }

    public String getOsVer() {
        return osVer;
    }

    public void setOsVer(String osVer) {
        this.osVer = osVer;
    }

    public String getResolution() {
        return resolution;
    }

    public void setResolution(String resolution) {
        this.resolution = resolution;
    }

    public String getAndroidId() {
        return androidId;
    }

    public void setAndroidId(String androidId) {
        this.androidId = androidId;
    }

    public String getManufacture() {
        return manufacture;
    }

    public void setManufacture(String manufacture) {
        this.manufacture = manufacture;
    }

    public String getDeviceId() {
        return deviceId;
    }

    public void setDeviceId(String deviceId) {
        this.deviceId = deviceId;
    }

    public String getAreacode() {
        return areacode;
    }

    public void setAreacode(String areacode) {
        this.areacode = areacode;
    }

    public double getLongtitude() {
        return longtitude;
    }

    public void setLongtitude(double longtitude) {
        this.longtitude = longtitude;
    }

    public double getLatitude() {
        return latitude;
    }

    public void setLatitude(double latitude) {
        this.latitude = latitude;
    }

    public String getProvince() {
        return province;
    }

    public void setProvince(String province) {
        this.province = province;
    }

    public String getCity() {
        return city;
    }

    public void setCity(String city) {
        this.city = city;
    }

    public String getDistrict() {
        return district;
    }

    public void setDistrict(String district) {
        this.district = district;
    }

    public String getBizNames() {
        return bizNames;
    }

    public void setBizNames(String bizNames) {
        this.bizNames = bizNames;
    }

    public String getCarrier() {
        return carrier;
    }

    public void setCarrier(String carrier) {
        this.carrier = carrier;
    }

    public String getNetType() {
        return netType;
    }

    public void setNetType(String netType) {
        this.netType = netType;
    }

    public String getCid_sn() {
        return cid_sn;
    }

    public void setCid_sn(String cid_sn) {
        this.cid_sn = cid_sn;
    }

    public String getIp() {
        return ip;
    }

    public void setIp(String ip) {
        this.ip = ip;
    }

    public String getAppid() {
        return appid;
    }

    public void setAppid(String appid) {
        this.appid = appid;
    }

    public String getAppVer() {
        return appVer;
    }

    public void setAppVer(String appVer) {
        this.appVer = appVer;
    }

    public String getRelease_ch() {
        return release_ch;
    }

    public void setRelease_ch(String release_ch) {
        this.release_ch = release_ch;
    }

    public String getPromotion_ch() {
        return promotion_ch;
    }

    public void setPromotion_ch(String promotion_ch) {
        this.promotion_ch = promotion_ch;
    }

    public String getLogType() {
        return logType;
    }

    public void setLogType(String logType) {
        this.logType = logType;
    }

    public long getCommit_time() {
        return commit_time;
    }

    public void setCommit_time(long commit_time) {
        this.commit_time = commit_time;
    }

    public String getDt() {
        return dt;
    }

    public void setDt(String dt) {
        this.dt = dt;
    }

    public HashMap<String, String> getEvent() {
        return event;
    }

    public void setEvent(HashMap<String, String> event) {
        this.event = event;
    }

    @Override
    public String toString() {
        // BUG FIX: restored '\'' char literals (garbled to ''') and garbled labels ("sessiOnId", "lOngtitude").
        return "LogBean{" + "id='" + id + '\'' + ", account='" + account + '\'' + ", sessionId='" + sessionId + '\'' + ", imei='" + imei + '\'' + ", osName='" + osName + '\'' + ", osVer='" + osVer + '\'' + ", resolution='" + resolution + '\'' + ", androidId='" + androidId + '\'' + ", manufacture='" + manufacture + '\'' + ", deviceId='" + deviceId + '\'' + ", areacode='" + areacode + '\'' + ", longtitude=" + longtitude + ", latitude=" + latitude + ", province='" + province + '\'' + ", city='" + city + '\'' + ", district='" + district + '\'' + ", bizNames='" + bizNames + '\'' + ", carrier='" + carrier + '\'' + ", netType='" + netType + '\'' + ", cid_sn='" + cid_sn + '\'' + ", ip='" + ip + '\'' + ", appid='" + appid + '\'' + ", appVer='" + appVer + '\'' + ", release_ch='" + release_ch + '\'' + ", promotion_ch='" + promotion_ch + '\'' + ", logType='" + logType + '\'' + ", commit_time=" + commit_time + ", dt='" + dt + '\'' + ", event=" + event + '}';
    }
}
GeoUtils,维度关联
public class GeoUtils {
public static final String key = "9833dc1de6e628e1b8e244710783a31d";
public static String[] getAreaInfoByLongitudeAndLatitude(HttpClient httpClient, Jedis jedis, double longitude, double latitude) {
String province = "";
String city = "";
String district = "";
String bizNames = "";
//将经纬度使用GEOHash进行编码
try {
GeoHash geoHash = GeoHash.withCharacterPrecision(latitude, longitude, 8);
String base32Code = geoHash.toBase32();
//根据geoHash的编码,到Redis中进行查找
String areaInfo = jedis.get(base32Code);
//{wx4sqk42 -> 省,市,区|商圈1,商圈2}
//如果有就关联地理信息和商圈信息
if (areaInfo != null) {
String[] fields = areaInfo.split("[|]");
String area = fields[0];
//判断是否有商圈信息
if (fields.length > 1) {
bizNames = fields[1];
}
String[] pcd = area.split(",");
province = pcd[0];
city = pcd[1];
district = pcd[2];
} else {
//如果没有查找到,就通过httpclient请求高德的API
//通过外网调用高德的API
//构造一个get对象
GetMethod getMethod = new GetMethod("https://restapi.amap.com/v3/geocode/regeo?key="+ key +"&location=" + longitude + "," + latitude);
//发送请求
int status = httpClient.executeMethod(getMethod);
if (status == 200) {
//获取请求的json字符串
String jsOnStr= getMethod.getResponseBodyAsString();
//转成json对象
JSONObject jsOnObj= JSON.parseObject(jsonStr);
//获取位置信息
JSONObject regeocode = jsonObj.getJSONObject("regeocode");
if (regeocode != null && !regeocode.isEmpty()) {
JSONObject address = regeocode.getJSONObject("addressComponent");
//获取省市区、商圈信息
province = address.getString("province");
city = address.getString("city");
district = address.getString("district");
ArrayList lb = new ArrayList();
//商圈数组(多个)
JSONArray businessAreas = address.getJSONArray("businessAreas");
for (int i = 0; i
map方法
/**
 * Parses one raw log line ("timestamp --> json") into a LogBean and enriches it
 * with province/city/district/business-area information looked up by longitude/latitude
 * (Redis cache first, then the AMap reverse-geocoding API via GeoUtils).
 * Returns null for lines that fail JSON parsing; downstream must filter nulls.
 */
public class ToJSONMapFunction extends RichMapFunction<String, LogBean> {

    private transient Jedis jedis = null;
    private transient HttpClient httpClient = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Redis and HTTP connections are created per task instance, not serialized with the function.
        ParameterTool params = (ParameterTool) getRuntimeContext()
                .getExecutionConfig()
                .getGlobalJobParameters(); // global job parameters set in the driver
        String host = params.getRequired("redis.host");
        String password = params.getRequired("redis.password");
        int db = params.getInt("redis.db", 0);
        jedis = new Jedis(host, 6379, 5000);
        jedis.auth(password);
        jedis.select(db);
        // HTTP client used for the AMap geocoding API.
        httpClient = new HttpClient();
    }

    @Override
    public LogBean map(String line) throws Exception {
        // Reconnect if the Redis connection dropped between records.
        if (!jedis.isConnected()) {
            jedis.connect();
        }
        LogBean logBean = null;
        try {
            // Line layout: "<yyyy-MM-dd HH:mm:ss,SSS> --> <json>"
            String[] fields = line.split(" --> ");
            String dateTime = fields[0];
            String dt = dateTime.split(" ")[0]; // date part used as the partition field
            String json = fields[1];
            // BUG FIX: garbled identifiers restored (jsOnObj -> jsonObj, phOneObj -> phoneObj, ...).
            JSONObject jsonObj = JSON.parseObject(json);
            JSONObject uObj = jsonObj.getJSONObject("u");
            JSONObject phoneObj = uObj.getJSONObject("phone");
            JSONObject locObj = uObj.getJSONObject("loc");
            JSONObject appObj = uObj.getJSONObject("app");
            // Flat fields of the user object.
            String id = uObj.getString("id");
            String account = uObj.getString("account");
            String sessionId = uObj.getString("sessionId");
            // Phone/device fields.
            String imei = phoneObj.getString("imei");
            String osName = phoneObj.getString("osName");
            String osVer = phoneObj.getString("osVer");
            String resolution = phoneObj.getString("resolution");
            String androidId = phoneObj.getString("androidId");
            String manufacture = phoneObj.getString("manufacture");
            String deviceId = phoneObj.getString("deviceId");
            // Location fields ("longtitude" matches the JSON key spelling).
            String areacode = locObj.getString("areacode");
            double longtitude = locObj.getDouble("longtitude");
            double latitude = locObj.getDouble("latitude");
            // Enrich with province/city/district/business area from coordinates.
            String[] areaInfo = GeoUtils.getAreaInfoByLongitudeAndLatitude(httpClient, jedis, longtitude, latitude);
            String province = areaInfo[0];
            String city = areaInfo[1];
            String district = areaInfo[2];
            String bizNames = areaInfo[3];
            String carrier = locObj.getString("carrier");
            String netType = locObj.getString("netType");
            String cid_sn = locObj.getString("cid_sn");
            String ip = locObj.getString("ip");
            // App fields.
            String appid = appObj.getString("appid");
            String appVer = appObj.getString("appVer");
            String release_ch = appObj.getString("release_ch");
            String promotion_ch = appObj.getString("promotion_ch");
            // Event type and commit time.
            String logType = jsonObj.getString("logType");
            long commit_time = jsonObj.getLong("commit_time");
            JSONObject eventObj = jsonObj.getJSONObject("event");
            // Copy every k/v of the event object into a plain HashMap.
            HashMap<String, String> eventMap = new HashMap<>();
            for (String k : eventObj.keySet()) {
                eventMap.put(k, eventObj.getString(k));
            }
            // Assemble the bean.
            logBean = new LogBean(id,
                    account,
                    sessionId,
                    imei,
                    osName,
                    osVer,
                    resolution,
                    androidId,
                    manufacture,
                    deviceId,
                    areacode,
                    longtitude,
                    latitude,
                    province,
                    city,
                    district,
                    bizNames,
                    carrier,
                    netType,
                    cid_sn,
                    ip,
                    appid,
                    appVer,
                    release_ch,
                    promotion_ch,
                    logType,
                    commit_time,
                    dt,
                    eventMap
            );
        } catch (JSONException e) {
            // Best-effort: log the bad line and return null; downstream filters nulls.
            e.printStackTrace();
            System.out.println("XXXXXXXX ====>" + line);
            //logger.error("parse json error -->" + line)
            // TODO: persist unparseable lines to HDFS/HBase for later inspection.
        }
        return logBean;
    }

    @Override
    public void close() throws Exception {
        // Release the per-task Redis and HTTP connections.
        jedis.close();
        httpClient = null;
    }
}
处理逻辑
/**
* 使用侧流输出实现数据拆分
*
* 程序的功能:实时的ETL(拉取、过滤、关联维度、筛选字段、字段脱敏、数据拆分、转换格式或类型)
*
* 1.使用RichMapFunction关联维度数据(可以使用异步IO进行优化)
* 2.将数据使用测流(旁路)输出进行查询
* 3.将数据在写回到Kafka
* Kafka吞吐量高,并且可以保证ExactlyOnce【FlinkKafkaProducer实现了分两个阶段提交,继承了一个类TwoPhaseCommitSinkFunction】
 * 实现了两个接口:CheckpointedFunction和CheckpointListener,可以保证Checkpoint成功后再提交事务,如果checkpoint失败将事务回滚
*
 * 4.(可选的)主流的数据还可以写入到HDFS、可以使用BulkSink,以Parquet格式写入
*/
public class PreETLAndTopicSplit {

    /**
     * Real-time ETL job: read raw lines from Kafka, parse/enrich into LogBean,
     * split by log type via side outputs, write the splits back to Kafka
     * (EXACTLY_ONCE) and the full main stream to HDFS as Parquet.
     */
    public static void main(String[] args) throws Exception {
        System.setProperty("HADOOP_USER_NAME", "root");
        ParameterTool parameters = ParameterTool.fromPropertiesFile(args[0]);
        DataStream<String> lines = FlinkUtils.createKafkaStream(parameters, SimpleStringSchema.class);
        // Make the config visible to every RichFunction (ToJSONMapFunction reads Redis settings from it).
        FlinkUtils.env.getConfig().setGlobalJobParameters(parameters);
        SingleOutputStreamOperator<LogBean> beanDataStream = lines.map(new ToJSONMapFunction());
        // ToJSONMapFunction returns null for unparseable lines; drop them.
        SingleOutputStreamOperator<LogBean> filteredStream = beanDataStream.filter(new FilterFunction<LogBean>() {
            @Override
            public boolean filter(LogBean bean) throws Exception {
                return bean != null;
            }
        });
        // Split the stream with side outputs (replacement for the deprecated split/select API).
        // Tag for flow (traffic) records.
        OutputTag<LogBean> flowOutputTag = new OutputTag<LogBean>("flow-output") {};
        // Tag for activity records.
        OutputTag<LogBean> activityOutputTag = new OutputTag<LogBean>("activity-output") {};
        SingleOutputStreamOperator<LogBean> mainStream = filteredStream.process(new ProcessFunction<LogBean, LogBean>() {
            @Override
            public void processElement(LogBean bean, Context ctx, Collector<LogBean> out) throws Exception {
                // Route on the record's log type.
                String logType = bean.getLogType();
                if (logType.startsWith("act")) {
                    ctx.output(activityOutputTag, bean);
                } else {
                    // Everything else counts as traffic.
                    ctx.output(flowOutputTag, bean);
                }
                // Every record also stays in the main stream.
                out.collect(bean);
            }
        });
        DataStream<LogBean> flowStream = mainStream.getSideOutput(flowOutputTag);
        DataStream<LogBean> activityStream = mainStream.getSideOutput(activityOutputTag);
        String flowTopic = parameters.getRequired("flow.topic");
        Properties properties = parameters.getProperties();
        // For EXACTLY_ONCE this must not exceed the broker's transaction.max.timeout.ms.
        properties.setProperty("transaction.timeout.ms", parameters.getRequired("kafka.transaction.timeout.ms"));
        FlinkKafkaProducer<String> kafkaProducer1 = new FlinkKafkaProducer<>(
                flowTopic,                                          // target topic
                new KafkaStringSerializationSchema(flowTopic),      // serialization schema
                properties,                                         // Kafka parameters
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE            // transactional writes
        );
        // Serialize beans to JSON strings before writing back to Kafka.
        flowStream.map(new MapFunction<LogBean, String>() {
            @Override
            public String map(LogBean bean) throws Exception {
                return JSON.toJSONString(bean);
            }
        }).addSink(kafkaProducer1);
        String activityTopic = parameters.getRequired("activity.topic");
        FlinkKafkaProducer<String> kafkaProducer2 = new FlinkKafkaProducer<>(
                activityTopic,                                      // target topic
                new KafkaStringSerializationSchema(activityTopic),  // serialization schema
                properties,                                         // Kafka parameters
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE            // transactional writes
        );
        activityStream.map(new MapFunction<LogBean, String>() {
            @Override
            public String map(LogBean bean) throws Exception {
                return JSON.toJSONString(bean);
            }
        }).addSink(kafkaProducer2);
        // Main stream -> HDFS as Parquet, bucketed by minute in Asia/Shanghai time.
        String path = parameters.getRequired("mainstream.hdfs.out.path");
        DateTimeBucketAssigner<LogBean> bucketAssigner = new DateTimeBucketAssigner<>(
                "yyyy-MM-dd--HH-mm",
                ZoneId.of("Asia/Shanghai"));
        // Bulk (batch) writing in Parquet columnar format; requires checkpointing to finalize files.
        StreamingFileSink<LogBean> streamingFileSink = StreamingFileSink
                .forBulkFormat(
                        new Path(path),                                  // output directory
                        ParquetAvroWriters.forReflectRecord(LogBean.class) // Parquet via Avro reflection
                )
                .withBucketAssigner(bucketAssigner)
                .build();
        mainStream.addSink(streamingFileSink);
        FlinkUtils.env.execute();
    }
}
2021-01-24 15:00:01,user1,act01,view
2021-01-24 15:00:02,user1,act01,view
2021-01-24 15:00:05,user1,act01,join
2021-01-24 15:00:02,user2,act01,view
2021-01-24 15:00:05,user2,act01,join
2021-01-24 15:00:02,user3,act02,view
2021-01-24 15:00:05,user3,act02,join
2021-01-24 16:00:02,user3,act01,view
2021-01-24 16:00:05,user3,act01,join
需求:按天、按小时、统计各个活动、不同事件的次数和人数
不要再使用HashSet作为去重的集合了,而是使用布隆过滤器
2021-01-24,act01,view,4,3
2021-01-24,act01,join,3,3
2021-01-24,act02,view,1,1
2021-01-24,act02,join,1,1
按照小时:
2021-01-24 15,act01,view,3,2
2021-01-24 16,act01,view,1,1
State 设置 TTL
/** One activity event: (time, user, activity, event) plus running counts filled in downstream. */
public class ActivityBean {
    public String time;
    public String uid;
    public String aid;
    public String eid;
    public Long disCount; // deduplicated (distinct-user) count
    public Long count;    // raw event count

    public ActivityBean(String time, String uid, String aid, String eid) {
        this.time = time;
        this.uid = uid;
        this.aid = aid;
        this.eid = eid;
    }

    @Override
    public String toString() {
        // BUG FIX: restored '\'' char literals (garbled to ''' in the original).
        return "ActivityBean{" +
                "time='" + time + '\'' +
                ", uid='" + uid + '\'' +
                ", aid='" + aid + '\'' +
                ", eid='" + eid + '\'' +
                ", disCount=" + disCount +
                ", count=" + count +
                '}';
    }

    /** Static factory mirroring Flink's Tuple.of convention. */
    public static ActivityBean of(String time, String uid, String aid, String eid) {
        return new ActivityBean(time, uid, aid, eid);
    }
}
public class AdvActivityCount {
public static void main(String[] args) throws Exception {
ParameterTool parameterTool = ParameterTool.fromPropertiesFile(args[0]);
DataStream lines = FlinkUtils.createKafkaStream(parameterTool, SimpleStringSchema.class);
SingleOutputStreamOperator beanDataStream = lines.process(new ProcessFunction() {
@Override
public void processElement(String value, Context ctx, Collector out) throws Exception {
String[] fields = value.split(",");
String time = fields[0];
String uid = fields[1];
String aid = fields[2];
String eid = fields[3];
ActivityBean bean = ActivityBean.of(time, uid, aid, eid);
out.collect(bean);
}
});
KeyedStream> keyedStream = beanDataStream.keyBy(new KeySelector>() {
@Override
public Tuple2 getKey(ActivityBean value) throws Exception {
return Tuple2.of(value.uid, value.eid);
}
});
SingleOutputStreamOperator result = keyedStream.process(new MyActivityCountFunction());
result.print();
FlinkUtils.env.execute();
}
private static class MyActivityCountFunction extends KeyedProcessFunction, ActivityBean, ActivityBean> {
//按小时未去重的次数
private transient MapState hourUserState;
//按小时重的次数
private transient MapState hourDisUserState;
//判断按小时去重的Bloom过滤器
private transient MapState> hourBloomFilterState;
@Override
public void open(Configuration parameters) throws Exception {
MapStateDescriptor hourStateDescriptor = new MapStateDescriptor("hour-user-count-state", String.class, Long.class);
//(2021-01-24 16 -> 2000)
//(2021-01-24 17 -> 2000)
hourUserState = getRuntimeContext().getMapState(hourStateDescriptor);
MapStateDescriptor hourDisStateDescriptor = new MapStateDescriptor("hour-dis-user-count-state", String.class, Long.class);
//(2021-01-24 16 -> 1000)
//(2021-01-24 17 -> 1000)
hourDisUserState = getRuntimeContext().getMapState(hourDisStateDescriptor);
StateTtlConfig hourBloomStateTtlCOnfig= StateTtlConfig.newBuilder(Time.minutes(90))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
MapStateDescriptor> hourBloomDescriptor = new MapStateDescriptor(
"hour-bloom-state",
TypeInformation.of(String.class),
TypeInformation.of(new TypeHint>() {
})
);
hourBloomDescriptor.enableTimeToLive(hourBloomStateTtlConfig);
hourBloomFilterState = getRuntimeContext().getMapState(hourBloomDescriptor);
}
@Override
public void processElement(ActivityBean value, Context ctx, Collector out) throws Exception {
String uid = value.uid;
//未去重的
String dayAndHour = value.time.substring(0, 13);
Long hourCount = hourUserState.get(dayAndHour);
if (hourCount == null) {
hourCount = 0L;
}
hourCount += 1;
hourUserState.put(dayAndHour, hourCount);
//去重的
BloomFilter hourBloomFilter = hourBloomFilterState.get(dayAndHour);
Long hourDisCount = hourDisUserState.get(dayAndHour);
if (hourBloomFilter == null) {
hourBloomFilter = BloomFilter.create(Funnels.unencodedCharsFunnel(), 100000);
hourDisCount = 0L;
}
//判断这样用户在这个小时内是否存在
if (!hourBloomFilter.mightContain(uid)) {
hourDisCount += 1L;
hourBloomFilter.put(uid);
}
hourDisUserState.put(dayAndHour, hourDisCount);
hourBloomFilterState.put(dayAndHour, hourBloomFilter);
value.time = dayAndHour;
value.count = hourCount;
value.disCount = hourDisCount;
out.collect(value);
}
}
}
将数据进行增量聚合,窗口触发后还要进行排序(reduce,WindowFunction),在WindowFunction使用onTime进行排序
{"userId": "u001", "itemId": "p1001", "categoryId": "c11", type: "pv", "timestamp": "2020-03-08 11:11:11"}
{"userId": "u002", "itemId": "p1001", "categoryId": "c11", type: "pv", "timestamp": "2020-03-08 11:11:11"}
{"userId": "u003", "itemId": "p1001", "categoryId": "c11", type: "pv", "timestamp": "2020-03-08 11:11:11"}
{"userId": "u003", "itemId": "p1001", "categoryId": "c11", type: "cart", "timestamp": "2020-03-08 11:11:11"}
{"userId": "u011", "itemId": "p2222", "categoryId": "c22", type: "pv", "timestamp": "2020-03-08 11:11:11"}
{"userId": "u012", "itemId": "p2222", "categoryId": "c22", type: "pv", "timestamp": "2020-03-08 11:11:11"}
{"userId": "u012", "itemId": "p2222", "categoryId": "c22", type: "pv", "timestamp": "2020-03-08 11:12:01"}
{"userId": "u001", "itemId": "p1001", "categoryId": "c11", type: "pv", "timestamp": "2020-03-08 11:12:11"}
{"userId": "u011", "itemId": "p2222", "categoryId": "c22", type: "pv", "timestamp": "2020-03-08 11:12:11"}
{"userId": "u011", "itemId": "p2222", "categoryId": "c22", type: "pv", "timestamp": "2020-03-08 11:13:01"}
统计最近十分钟的热门商品,1分钟统计一次(按照EventTime划分滑动窗口,窗口长度为十分钟,1分滑动一次)
将数据进行增量聚合,窗口触发后还要进行排序(reduce,WindowFunction),在WindowFunction使用onTime进行排序
{itemId='p1001'type='pv', windowStart=1583636520000 , 2020-03-08 11:02:00.0, windowEnd=1583637120000 , 2020-03-08 11:12:00.0, viewCount=3}
{itemId='p1001'type='cart', windowStart=1583636520000 , 2020-03-08 11:02:00.0, windowEnd=1583637120000 , 2020-03-08 11:12:00.0, viewCount=1}
{itemId='p2222'type='pv', windowStart=1583636520000 , 2020-03-08 11:02:00.0, windowEnd=1583637120000 , 2020-03-08 11:12:00.0, viewCount=2}
{itemId='p1001'type='pv', windowStart=1583636520000 , 2020-03-08 11:02:00.0, windowEnd=1583637120000 , 2020-03-08 11:12:00.0, viewCount=3}
{itemId='p1001'type='cart', windowStart=1583636520000 , 2020-03-08 11:02:00.0, windowEnd=1583637120000 , 2020-03-08 11:12:00.0, viewCount=1}
{itemId='p2222'type='pv', windowStart=1583636520000 , 2020-03-08 11:02:00.0, windowEnd=1583637120000 , 2020-03-08 11:12:00.0, viewCount=2}
[{itemId='p1001'type='cart', windowStart=1583636580000 , 2020-03-08 11:03:00.0, windowEnd=1583637180000 , 2020-03-08 11:13:00.0, viewCount=1}]
[{itemId='p1001'type='pv', windowStart=1583636580000 , 2020-03-08 11:03:00.0, windowEnd=1583637180000 , 2020-03-08 11:13:00.0, viewCount=4}, {itemId='p2222'type='pv', windowStart=1583636580000 , 2020-03-08 11:03:00.0, windowEnd=1583637180000 , 2020-03-08 11:13:00.0, viewCount=4}]
/** Aggregated count of one (item, event type) pair within one sliding window. */
public class ItemEventCount {
    public String itemId;     // product id
    public String type;       // event type ("pv", "cart", ...)
    public long windowStart;  // window start timestamp (ms)
    public long windowEnd;    // window end timestamp (ms)
    public long eventCount;   // number of events for this item in the window

    /** Static factory mirroring Flink's Tuple.of convention. */
    public static ItemEventCount of(String itemId, String type, long windowStart, long windowEnd, long eventCount) {
        ItemEventCount result = new ItemEventCount();
        result.itemId = itemId;
        result.type = type;
        result.windowStart = windowStart;
        result.windowEnd = windowEnd;
        result.eventCount = eventCount;
        return result;
    }

    @Override
    public String toString() {
        // BUG FIX: restored '\'' char literals (garbled to ''').
        // NOTE: no separator between itemId and type — matches the documented sample output.
        return "{" +
                "itemId='" + itemId + '\'' +
                "type='" + type + '\'' +
                ", windowStart=" + windowStart + " , " + new Timestamp(windowStart) +
                ", windowEnd=" + windowEnd + " , " + new Timestamp(windowEnd) +
                ", eventCount=" + eventCount +
                '}';
    }
}
/** One user-behavior event parsed from the input JSON. */
public class MyBehavior {
    public String userId;     // user id
    public String itemId;     // product id
    public String categoryId; // product category id
    public String type;       // behavior type: "pv", "buy", "cart", "fav"
    public long timestamp;    // event time
    public long counts = 1;   // pre-aggregated count, defaults to a single event

    /** Static factory for a single event (counts defaults to 1). */
    public static MyBehavior of(String userId, String itemId, String categoryId, String type, long timestamp) {
        MyBehavior behavior = new MyBehavior();
        behavior.userId = userId;
        behavior.itemId = itemId;
        behavior.categoryId = categoryId;
        behavior.type = type;
        behavior.timestamp = timestamp;
        return behavior;
    }

    /** Static factory with an explicit pre-aggregated count. */
    public static MyBehavior of(String userId, String itemId, String categoryId, String type, long timestamp, long counts) {
        MyBehavior behavior = of(userId, itemId, categoryId, type, timestamp);
        behavior.counts = counts;
        return behavior;
    }

    @Override
    public String toString() {
        // BUG FIX: restored '\'' char literals (garbled to ''').
        return "MyBehavior{" + "userId='" + userId + '\'' + ", itemId='" + itemId + '\'' + ", categoryId='" + categoryId + '\'' + ", type='" + type + '\'' + ", timestamp=" + timestamp + "," + new Timestamp(timestamp) + "counts=" + counts + '}';
    }

    public String getUserId() {
        return userId;
    }

    public String getItemId() {
        return itemId;
    }

    public String getCategoryId() {
        return categoryId;
    }

    public String getType() {
        return type;
    }

    public long getTimestamp() {
        return timestamp;
    }

    public long getCounts() {
        return counts;
    }
}
public class HotGoodCount {
public static void main(String[] args) throws Exception{
ParameterTool parameterTool = ParameterTool.fromPropertiesFile(args[0]);
DataStream lines = FlinkUtils.createKafkaStream(parameterTool, SimpleStringSchema.class);
SingleOutputStreamOperator behaviorDataStream = lines.process(new ProcessFunction() {
@Override
public void processElement(String value, Context ctx, Collector out) throws Exception {
try {
MyBehavior myBehavior = JSON.parseObject(value, MyBehavior.class);
out.collect(myBehavior);
} catch (Exception e) {
//e.printStackTrace();
}
}
});
SingleOutputStreamOperator dataWithTimestamp = behaviorDataStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor(Time.seconds(0)) {
@Override
public long extractTimestamp(MyBehavior element) {
return element.timestamp;
}
});
KeyedStream> keyedStream = dataWithTimestamp.keyBy(new KeySelector>() {
@Override
public Tuple2 getKey(MyBehavior value) throws Exception {
return Tuple2.of(value.itemId, value.type);
}
});
WindowedStream, TimeWindow> window = keyedStream.window(SlidingEventTimeWindows.of(Time.minutes(10), Time.minutes(1)));
SingleOutputStreamOperator aggregated = window.aggregate(new MyGoodAggFunction(), new MyGoodWindowFunction());
KeyedStream> keyedStreamAndWindow = aggregated.keyBy(new KeySelector>() {
@Override
public Tuple3 getKey(ItemEventCount value) throws Exception {
return Tuple3.of(value.type, value.windowStart, value.windowEnd);
}
});
keyedStreamAndWindow.process(new KeyedProcessFunction, ItemEventCount, ItemEventCount>() {
private transient ValueState> valueState;
@Override
public void open(Configuration parameters) throws Exception {
ValueStateDescriptor> stateDescriptor = new ValueStateDescriptor("lst-state", TypeInformation.of(new TypeHint>() {
}));
valueState = getRuntimeContext().getState(stateDescriptor);
}
@Override
public void processElement(ItemEventCount value, Context ctx, Collector out) throws Exception {
List lst = valueState.value();
if(lst == null) {
lst = new ArrayList();
}
lst.add(value);
valueState.update(lst);
//注册一个定时器
ctx.timerService().registerEventTimeTimer(value.windowEnd + 1);
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector out) throws Exception {
List lst = valueState.value();
lst.sort((a, b) -> Long.compare(b.eventCount, a.eventCount));
int loop = Math.min(lst.size(), 3);
for (int i = 0; i {
@Override
public Long createAccumulator() {
return 0L;
}
@Override
public Long add(MyBehavior value, Long accumulator) {
return accumulator += 1;
}
@Override
public Long getResult(Long accumulator) {
return accumulator;
}
@Override
public Long merge(Long a, Long b) {
return null;
}
}
private static class MyGoodWindowFunction implements WindowFunction, TimeWindow> {
@Override
public void apply(Tuple2 key, TimeWindow window, Iterable input, Collector out) throws Exception {
long startTime = window.getStart();
long endTime = window.getEnd();
Long count = input.iterator().next();
String itemId = key.f0;
String type = key.f1;
ItemEventCount itemEventCount = ItemEventCount.of(itemId, type, startTime, endTime, count);
out.collect(itemEventCount);
}
}
}