热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

java利用友盟计算pvuv_Flink计算PV,UV的案例及问题分析

PV(访问量):即PageView,即页面浏览量或点击量,用户每次刷新即被计算一次。UV(独立访客):即UniqueVisitor,访问您

PV(访问量):即Page View, 即页面浏览量或点击量,用户每次刷新即被计算一次。

UV(独立访客):即Unique Visitor,访问您网站的一台电脑客户端为一个访客。00:00-24:00内相同的客户端只被计算一次。

一个UV可以用很多PV,一个PV也只能对应一个IP

没有这些数据的支持,意味着你不知道产品的发展情况,用户获取成本,UV,PV,注册转化率;没有这些数据做参考,你不会知道接下来提供什么建议给领导采纳,也推测不出领导为啥烦忧,那么就么有任何表现的机会。

举两个UV计算的场景:

1. 实时计算当天零点起,到当前时间的uv。

2. 实时计算当天每个小时的UV。0点...12点...24点

请问这个用spark streaming如何实现呢?是不是很难有好的思路呢?

今天主要是想给大家用flink来实现一下,在这方面flink确实比较优秀了。

主要技术点就在group by的使用。

下面就是完整的案例:

package org.table.uv;

import org.apache.flink.api.common.typeinfo.Types;

import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.streaming.api.TimeCharacteristic;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import org.apache.flink.table.api.Table;

import org.apache.flink.table.api.TableEnvironment;

import org.apache.flink.table.api.java.StreamTableEnvironment;

import org.apache.flink.table.descriptors.Json;

import org.apache.flink.table.descriptors.Kafka;

import org.apache.flink.table.descriptors.Rowtime;

import org.apache.flink.table.descriptors.Schema;

import org.apache.flink.types.Row;

public class ComputeUVDay {

public static void main(String[] args) throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

tEnv.registerFunction("DateUtil",new DateUtil());

tEnv.connect(

new Kafka()

.version("0.10")

// "0.8", "0.9", "0.10", "0.11", and "universal"

.topic("jsontest")

.property("bootstrap.servers", "localhost:9092")

.property("group.id","test")

.startFromLatest()

)

.withFormat(

new Json()

.failOnMissingField(false)

.deriveSchema()

)

.withSchema(

new Schema()

.field("rowtime", Types.SQL_TIMESTAMP)

.rowtime(new Rowtime()

.timestampsFromField("eventtime")

.watermarksPeriodicBounded(2000)

)

.field("fruit", Types.STRING)

.field("number", Types.INT)

)

.inAppendMode()

.registerTableSource("source");

// 計算天級別的uv

// Table table = tEnv.sqlQuery("select DateUtil(rowtime),count(distinct fruit) from source group by DateUtil(rowtime)");

// 计算小时级别uv

Table table = tEnv.sqlQuery("select DateUtil(rowtime,'yyyyMMddHH'),count(distinct fruit) from source group by DateUtil(rowtime,'yyyyMMddHH')");

tEnv.toRetractStream(table, Row.class).addSink(new SinkFunction>() {

@Override

public void invoke(Tuple2 value, Context context) throws Exception {

System.out.println(value.f1.toString());

}

});

System.out.println(env.getExecutionPlan());

env.execute("ComputeUVDay");

}

}

其中DateUtil类如下:

package org.table.uv;

import org.apache.flink.table.functions.ScalarFunction;

import java.sql.Timestamp;

import java.text.DateFormat;

import java.text.SimpleDateFormat;

public class DateUtil extends ScalarFunction {

public static String eval(long timestamp){

String result = "null";

try {

DateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");

result = sdf.format(new Timestamp(timestamp));

} catch (Exception e) {

e.printStackTrace();

}

return result;

}

public static String eval(long ts, String format) {

String result = "null";

try {

DateFormat sdf = new SimpleDateFormat(format);

result = sdf.format(ts);

} catch (Exception e) {

e.printStackTrace();

}

return result;

}

public static void main(String[] args) {

String eval = eval(System.currentTimeMillis(),"yyyyMMddHH");

System.out.println(eval);

}

}

代码里面的案例,是可以用于生产中的吗?

假如数据量小可以直接使用,每秒数据量大的话,就比较麻烦。因为你看group by后面的维度,只有当天date 这个维度,这样就会导致计算状态超级集中而使得内存占用超大进而引发oom。

这种情况解决办法就是将状态打散,然后再次聚合即可,典型的分治思想。

具体做法作为福利分享给球友吧。

还有一个问题就是由于存在全局去重及分组操作,flink内部必然要维护一定的状态信息,那么这些状态信息肯定不是要一直保存的,比如uv,我们只需要更新今天,最多昨天的状态,这个点之前的状态要删除的,不能让他白白占着内存,而导致任务内存消耗巨大,甚至因oom而挂掉。

StreamQueryConfig streamQueryConfig = tEnv.queryConfig();

streamQueryConfig.withIdleStateRetentionTime(Time.minutes(10),Time.minutes(15));

tEnv.sqlUpdate(sql,streamQueryConfig);

再有就是能使用事件时间吗?事件时间假如事件严重超时了,比如,我们状态保留时间设置的是两天,两天之后状态清除,那么这时候来了事件时间刚刚好是两天之前的,由于已经没有状态就会重新计算uv覆盖已经生成的值,就导致值错误了,这个问题如何解决呢?

这算是一个疑问吧?



推荐阅读
  • 本文介绍了闭包的定义和运转机制,重点解释了闭包如何能够接触外部函数的作用域中的变量。通过词法作用域的查找规则,闭包可以访问外部函数的作用域。同时还提到了闭包的作用和影响。 ... [详细]
  • 本文介绍了解决Netty拆包粘包问题的一种方法——使用特殊结束符。在通讯过程中,客户端和服务器协商定义一个特殊的分隔符号,只要没有发送分隔符号,就代表一条数据没有结束。文章还提供了服务端的示例代码。 ... [详细]
  • Java容器中的compareto方法排序原理解析
    本文从源码解析Java容器中的compareto方法的排序原理,讲解了在使用数组存储数据时的限制以及存储效率的问题。同时提到了Redis的五大数据结构和list、set等知识点,回忆了作者大学时代的Java学习经历。文章以作者做的思维导图作为目录,展示了整个讲解过程。 ... [详细]
  • javascript  – 概述在Firefox上无法正常工作
    我试图提出一些自定义大纲,以达到一些Web可访问性建议.但我不能用Firefox制作.这就是它在Chrome上的外观:而那个图标实际上是一个锚点.在Firefox上,它只概述了整个 ... [详细]
  • 展开全部下面的代码是创建一个立方体Thisexamplescreatesanddisplaysasimplebox.#Thefirstlineloadstheinit_disp ... [详细]
  • 不同优化算法的比较分析及实验验证
    本文介绍了神经网络优化中常用的优化方法,包括学习率调整和梯度估计修正,并通过实验验证了不同优化算法的效果。实验结果表明,Adam算法在综合考虑学习率调整和梯度估计修正方面表现较好。该研究对于优化神经网络的训练过程具有指导意义。 ... [详细]
  • C++字符字符串处理及字符集编码方案
    本文介绍了C++中字符字符串处理的问题,并详细解释了字符集编码方案,包括UNICODE、Windows apps采用的UTF-16编码、ASCII、SBCS和DBCS编码方案。同时说明了ANSI C标准和Windows中的字符/字符串数据类型实现。文章还提到了在编译时需要定义UNICODE宏以支持unicode编码,否则将使用windows code page编译。最后,给出了相关的头文件和数据类型定义。 ... [详细]
  • 本文介绍了使用Spark实现低配版高斯朴素贝叶斯模型的原因和原理。随着数据量的增大,单机上运行高斯朴素贝叶斯模型会变得很慢,因此考虑使用Spark来加速运行。然而,Spark的MLlib并没有实现高斯朴素贝叶斯模型,因此需要自己动手实现。文章还介绍了朴素贝叶斯的原理和公式,并对具有多个特征和类别的模型进行了讨论。最后,作者总结了实现低配版高斯朴素贝叶斯模型的步骤。 ... [详细]
  • 大数据Hadoop生态(20)MapReduce框架原理OutputFormat的开发笔记
    本文介绍了大数据Hadoop生态(20)MapReduce框架原理OutputFormat的开发笔记,包括outputFormat接口实现类、自定义outputFormat步骤和案例。案例中将包含nty的日志输出到nty.log文件,其他日志输出到other.log文件。同时提供了一些相关网址供参考。 ... [详细]
  • 本文讨论了在shiro java配置中加入Shiro listener后启动失败的问题。作者引入了一系列jar包,并在web.xml中配置了相关内容,但启动后却无法正常运行。文章提供了具体引入的jar包和web.xml的配置内容,并指出可能的错误原因。该问题可能与jar包版本不兼容、web.xml配置错误等有关。 ... [详细]
  • Java如何导入和导出Excel文件的方法和步骤详解
    本文详细介绍了在SpringBoot中使用Java导入和导出Excel文件的方法和步骤,包括添加操作Excel的依赖、自定义注解等。文章还提供了示例代码,并将代码上传至GitHub供访问。 ... [详细]
  • 本文介绍了解决java开源项目apache commons email简单使用报错的方法,包括使用正确的JAR包和正确的代码配置,以及相关参数的设置。详细介绍了如何使用apache commons email发送邮件。 ... [详细]
  • TableAPI报一下异常:FieldtypesofqueryresultandregisteredTableSink
    报错信息如下:Exceptioninthread“main”org.apache.flink.table.api.ValidationException:Fieldtypesofq ... [详细]
  • mac php错误日志配置方法及错误级别修改
    本文介绍了在mac环境下配置php错误日志的方法,包括修改php.ini文件和httpd.conf文件的操作步骤。同时还介绍了如何修改错误级别,以及相应的错误级别参考链接。 ... [详细]
  • 本文介绍了禅道作为一款国产开源免费的测试管理工具的特点和功能,并提供了禅道的搭建和调试方法。禅道是一款B/S结构的项目管理工具,可以实现组织管理、后台管理、产品管理、项目管理和测试管理等功能。同时,本文还介绍了其他软件测试相关工具,如功能自动化工具和性能自动化工具,以及白盒测试工具的使用。通过本文的阅读,读者可以了解禅道的基本使用方法和优势,从而更好地进行测试管理工作。 ... [详细]
author-avatar
fangsi155_7827d9
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有