热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

【Flink】基于Flink的电商用户行为分析(一)

1、项目整体介绍电商的用户行为电商平台中的用户行为频繁且较复杂,系统上线运行一段时间后,可以收集到大量的用户行为数据,进而利用大数据技

1、项目整体介绍


电商的用户行为

电商平台中的用户行为频繁且较复杂,系统上线运行一段时间后,可以收集到大量的用户行为数据,进而利用大数据技术进行深入挖掘和分析,得到感兴趣的商业指标并增强对风险的控制。
电商用户行为数据多样,整体可以分为用户行为习惯数据和业务行为数据两大类。用户的行为习惯数据包括了用户的登录方式、上线的时间点及时长、点击和浏览页面、页面停留时间以及页面跳转等等,我们可以从中进行流量统计和热门商品的统计,也可以深入挖掘用户的特征;这些数据往往可以从 web 服务器日志中直接读取到。而业务行为数据就是用户在电商平台中针对每个业务(通常是某个具体商品)所作的操作,我们一般会在业务系统中相应的位置埋点,然后收集日志进行分析。业务行为数据又可以简单分为两类:一类是能够明显地表现出用户兴趣的行为,比如对商品的收藏、喜欢、评分和评价,我们可以从中对数据进行深入分析,得到用户画像,进而对用户给出个性化的推荐商品列表,这个过程往往会用到机器学习相关的算法;另一类则是常规的业务操作,但需要着重关注一些异常状况以做好风
控,比如登录和订单支付。

项目主要模块

基于对电商用户行为数据的基本分类,我们可以发现主要有以下三个分析方向:

  1. 热门统计
    利用用户的点击浏览行为,进行流量统计、近期热门商品统计等。
  2. 偏好统计
    利用用户的偏好行为,比如收藏、喜欢、评分等,进行用户画像分析,给出个性化的商品推荐列表。
  3. 风险控制
    利用用户的常规业务行为,比如登录、下单、支付等,分析数据,对异常情况进行报警提示。
    本项目限于数据,我们只实现热门统计和风险控制中的部分内容,将包括以下四大模块:实时热门商品统计、实时流量统计、恶意登录监控和订单支付失效监控。


由于对实时性要求较高,我们会用 flink 作为数据处理的框架。在项目中,我们将综合运用 flink 的各种 API,基于 EventTime 去处理基本的业务需求,并且灵活地使用底层的 processFunction,基于状态编程和 CEP 去处理更加复杂的情形。

数据源解析

我们准备了一份淘宝用户行为数据集,保存为 csv 文件。本数据集包含了淘宝上某一天随机一百万用户的所有行为(包括点击、购买、收藏、喜欢)。数据集的每一行表示一条用户行为,由用户 ID、商品 ID、商品类目 ID、行为类型和时间戳组成,并以逗号分隔。关于数据集中每一列的详细描述如下:

字段名数据类型说明
userIdLong加密后的用户 ID
itemIdLong加密后的商品 ID
categoryIdInt加密后的商品所属类别 ID
behaviorString用户行为类型,包括(‘pv’, ‘’buy, ‘cart’,‘fav’)
timestampLong行为发生的时间戳,单位秒

另外,我们还可以拿到 web 服务器的日志数据,这里以 apache 服务器的一份 log 为例,每一行日志记录了访问者的 IP、userId、访问时间、访问方法以及访问的 url,具体描述如下:

字段名数据类型说明
ipString访问的 IP
userIdLong访问的 user ID
eventTimeLong访问时间
methodString访问方法 GET/POST/PUT/DELETE
urlString访问的 url

由于行为数据有限,在实时热门商品统计模块中可以使用 UserBehavior 数据集,而对于恶意登录监控和订单支付失效监控,我们只以示例数据来做演示。

2、实时热门商品统计

首先要实现的是实时热门商品统计,我们将会基于 UserBehavior 数据集来进行分析。
项目主体用 Scala 编写,采用 IDEA 作为开发环境进行项目编写,采用 maven 作为项目构建和管理工具。首先我们需要搭建项目框架。

项目框架搭建

打开 IDEA,创建一个 maven 项目,命名为 UserBehaviorAnalysis。由于包含了多个模块,我们可以以 UserBehaviorAnalysis 作为父项目,并在其下建一个名为 HotItemsAnalysis 的子项目,用于实时统计热门 topN 商品。
在 UserBehaviorAnalysis 下新 建一个 maven module 作 为子项 目,命名为 HotItemsAnalysis。
父项目只是为了规范化项目结构,方便依赖管理,本身是不需要代码实现的,所以 UserBehaviorAnalysis 下的 src 文件夹可以删掉。

声明项目中工具的版本信息

我们整个项目需要的工具的不同版本可能会对程序运行造成影响,所以应该在最外层的 UserBehaviorAnalysis 中声明所有子模块共用的版本信息。
在 pom.xml 中加入以下配置:
UserBehaviorAnalysis/pom.xml

1.7.22.112.2.0


添加项目依赖

对于整个项目而言,所有模块都会用到 flink 相关的组件,所以我们在 UserBehaviorAnalysis 中引入公有依赖:
UserBehaviorAnalysis/pom.xml

org.apache.flinkflink-scala_${scala.binary.version}${flink.version}org.apache.flinkflink-streaming-scala_${scala.binary.version}${flink.version}org.apache.kafkakafka_${scala.binary.version}${kafka.version}org.apache.flinkflink-connector-kafka_${scala.binary.version}${flink.version}

同样,对于 maven 项目的构建,可以引入公有的插件:

net.alchim31.mavenscala-maven-plugin3.4.6testCompileorg.apache.maven.pluginsmaven-assembly-plugin3.0.0jar-with-dependenciesmake-assemblypackagesingle

在 HotItemsAnalysis 子模块中,我们并没有引入更多的依赖,所以不需要改动 pom 文件。

数据准备

在 src/main/目录下,可以看到已有的默认源文件目录是 java,我们可以将其改名为 scala。将数据文件 UserBehavior.csv 复制到资源文件目录 src/main/resources 下,我们将从这里读取数据。
至此,我们的准备工作都已完成,接下来可以写代码了。

模块代码实现

我们将实现一个“实时热门商品”的需求,可以将“实时热门商品”翻译成程序员更好理解的需求:每隔 5 分钟输出最近一小时内点击量最多的前 N 个商品。将这个需求进行分解我们大概要做这么几件事情:

  • 抽取出业务时间戳,告诉 Flink 框架基于业务时间做窗口
  • 过滤出点击行为数据
  • 按一小时的窗口大小,每 5 分钟统计一次,做滑动窗口聚合(Sliding Window)
  • 按每个窗口聚合,输出每个窗口中点击量前 N 名的商品

程序主体

在 src/main/scala 下创建 HotItems.scala 文件,新建一个单例对象。定义样例类 UserBehavior 和 ItemViewCount,在 main 函数中创建 StreamExecutionEnvironment 并做配置,然后从 UserBehavior.csv 文件中读取数据,并包装成 UserBehavior 类型。
代码如下:
HotItemsAnalysis/src/main/scala/HotItems.scala

case class UserBehavior(userId: Long, itemId: Long, categoryId: Int, behavior: String, timestamp: Long)
case class ItemViewCount(itemId: Long, windowEnd: Long, count: Long)object HotItems {def main(args: Array[String]): Unit = {// 创建一个 StreamExecutionEnvironmentval env = StreamExecutionEnvironment.getExecutionEnvironment// 设定 Time 类型为 EventTimeenv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)// 为了打印到控制台的结果不乱序,我们配置全局的并发为 1 ,这里改变并发对结果正确性没有影响env.setParallelism(1)val stream = env// 以 window 下为例,需替换成自己的路径.readTextFile("YOUR_PATH\\resources\\UserBehavior.csv").map(line => {val linearray = line.split(",")UserBehavior(linearray(0).toLong, linearray(1).toLong, linearray(2).toInt, linearray(3), linearray(4).toLong)})// 指定时间戳和 watermark.assignAscendingTimestamps(_.timestamp * 1000)env.execute("Hot Items Job")
}

这里注意,我们需要统计业务时间上的每小时的点击量,所以要基于 EventTime 来处理。那么如果让 Flink 按照我们想要的业务时间来处理呢?这里主要有两件事情要做。
第一件是告诉 Flink 我们现在按照 EventTime 模式进行处理,Flink 默认使用 ProcessingTime 处理,所以我们要显式设置如下:

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

第二件事情是指定如何获得业务时间,以及生成 Watermark。Watermark 是用来追踪业务事件的概念,可以理解成 EventTime 世界中的时钟,用来指示当前处理到什么时刻的数据了。由于我们的数据源的数据已经经过整理,没有乱序,即事件的时间戳是单调递增的,所以可以将每条数据的业务时间就当做 Watermark。这里我们用 assignAscendingTimestamps 来实现时间戳的抽取和 Watermark 的生成。
注:真实业务场景一般都是乱序的,所以一般不用assignAscendingTimestamps,而是使用 BoundedOutOfOrdernessTimestampExtractor。

.assignAscendingTimestamps(_.timestamp * 1000)

这样我们就得到了一个带有时间标记的数据流了,后面就能做一些窗口的操作。

过滤出点击事件

在开始窗口操作之前,先回顾下需求“每隔 5 分钟输出过去一小时内点击量最多的前 N 个商品”。由于原始数据中存在点击、购买、收藏、喜欢各种行为的数据,但是我们只需要统计点击量,所以先使用 filter 将点击行为数据过滤出来。

.filter(_.behavior == "pv")

设置滑动窗口,统计点击量

由于要每隔 5 分钟统计一次最近一小时每个商品的点击量,所以窗口大小是一小时,每隔 5 分钟滑动一次。即分别要统计[09:00, 10:00), [09:05, 10:05), [09:10,10:10)…等窗口的商品点击量。是一个常见的滑动窗口需求(Sliding Window)。

.keyBy("itemId")
.timeWindow(Time.minutes(60), Time.minutes(5))
.aggregate(new CountAgg(), new WindowResultFunction());

我们使用.keyBy(“itemId”)对商品进行分组,使用.timeWindow(Time size, Time slide)对每个商品做滑动窗口(1 小时窗口,5 分钟滑动一次)。然后我们使用 .aggregate(AggregateFunction af, WindowFunction wf) 做增量的聚合操作,它能使用 AggregateFunction 提 前 聚 合 掉 数 据 , 减 少 state 的 存 储 压 力 。 较之 .apply(WindowFunction wf) 会将窗口中的数据都存储下来,最后一起计算要高效得多。这里的 CountAgg 实现了 AggregateFunction 接口,功能是统计窗口中的条数,即遇到一条数据就加一。

// COUNT 统计的聚合函数实现,每出现一条记录就加一
class CountAgg extends AggregateFunction[UserBehavior, Long, Long] {override def createAccumulator(): Long = 0Loverride def add(userBehavior: UserBehavior, acc: Long): Long = acc + 1override def getResult(acc: Long): Long = accoverride def merge(acc1: Long, acc2: Long): Long = acc1 + acc2
}

聚合操作.aggregate(AggregateFunction af, WindowFunction wf)的第二个参数 WindowFunction 将每个 key 每个窗口聚合后的结果带上其他信息进行输出。我们这里实现的 WindowResultFunction 将 <主键商品 ID&#xff0c;窗 口 &#xff0c; 点击量>封装成了 ItemViewCount 进行输出。

// 商品点击量&#xff08;窗口操作的输出类型&#xff09;
case class ItemViewCount(itemId: Long, windowEnd: Long, count: Long)

代码如下&#xff1a;

// 用于输出窗口的结果
class WindowResultFunction extends WindowFunction[Long, ItemViewCount, Tuple, TimeWindow] {override def apply(key: Tuple, window: TimeWindow, aggregateResult: Iterable[Long], collector: Collector[ItemViewCount]) : Unit &#61; {val itemId: Long &#61; key.asInstanceOf[Tuple1[Long]].f0val count &#61; aggregateResult.iterator.nextcollector.collect(ItemViewCount(itemId, window.getEnd, count))}
}

现在我们就得到了每个商品在每个窗口的点击量的数据流。

计算最热门 Top N 商品

为了统计每个窗口下最热门的商品&#xff0c;我们需要再次按窗口进行分组&#xff0c;这里根据 ItemViewCount 中的 windowEnd 进行 keyBy()操作。然后使用ProcessFunction 实现一个自定义的 TopN 函数 TopNHotItems 来计算点击量排名前 3 名的商品&#xff0c;并将排名结果格式化成字符串&#xff0c;便于后续输出。

.keyBy("windowEnd")
.process(new TopNHotItems(3)); // 求点击量前 3 名的商品

ProcessFunction 是 Flink 提供的一个 low-level API&#xff0c;用于实现更高级的功能。它主要提供了定时器 timer 的功能&#xff08;支持 EventTime 或 ProcessingTime&#xff09;。本案例中我们将利用 timer 来判断何时收齐了某个 window 下所有商品的点击量数据。由于 Watermark 的进度是全局的&#xff0c;在 processElement 方法中&#xff0c;每当收到一条数据 ItemViewCount&#xff0c;我们就注册一个 windowEnd&#43;1 的定时器&#xff08;Flink 框架会自动忽略同一时间的重复注册&#xff09;。windowEnd&#43;1 的定时器被触发时&#xff0c;意味着收到了 windowEnd&#43;1
的 Watermark&#xff0c;即收齐了该 windowEnd 下的所有商品窗口统计值。我们在 onTimer() 中处理将收集的所有商品及点击量进行排序&#xff0c;选出 TopN&#xff0c;并将排名信息格式化成字符串后进行输出。
这里我们还使用了 ListState来存储收到的每条 ItemViewCount 消息&#xff0c;保证在发生故障时&#xff0c;状态数据的不丢失和一致性。ListState 是 Flink 提供的类似 Java List 接口的 State API&#xff0c;它集成了框架的 checkpoint 机制&#xff0c;自动做到了 exactly-once 的语义保证。

// 求某个窗口中前 N 名的热门点击商品&#xff0c;key 为窗口时间戳&#xff0c;输出为 TopN 的结果字符串
class TopNHotItems(topSize: Int) extends KeyedProcessFunction[Tuple, ItemViewCount, String] {private var itemState : ListState[ItemViewCount] &#61; _override def open(parameters: Configuration): Unit &#61; {super.open(parameters)// 命名状态变量的名字和状态变量的类型val itemsStateDesc &#61; new ListStateDescriptor[ItemViewCount]("itemState-state", classOf[ItemViewCount])// 定义状态变量itemState &#61; getRuntimeContext.getListState(itemsStateDesc)}override def processElement(input: ItemViewCount, context: KeyedProcessFunction[Tuple, ItemViewCount, String]#Context, collector: Collector[String]): Unit &#61; {// 每条数据都保存到状态中itemState.add(input)// 注册 windowEnd&#43;1 的 EventTime Timer, 当触发时&#xff0c;说明收齐了属于windowEnd 窗口的所有商品数据// 也就是当程序看到 windowend &#43; 1 的水位线 watermark 时&#xff0c;触发 onTimer 回调函数context.timerService.registerEventTimeTimer(input.windowEnd &#43; 1)}override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Tuple, ItemViewCount, String]#OnTimerContext, out: Collector[String]): Unit &#61; {// 获取收到的所有商品点击量val allItems: ListBuffer[ItemViewCount] &#61; ListBuffer()import scala.collection.JavaConversions._for (item <- itemState.get) {allItems &#43;&#61; item}// 提前清除状态中的数据&#xff0c;释放空间itemState.clear()// 按照点击量从大到小排序val sortedItems &#61; allItems.sortBy(_.count)(Ordering.Long.reverse).take(topSize)// 将排名信息格式化成 String, 便于打印val result: StringBuilder &#61; new StringBuilderresult.append("&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;\n")result.append(" 时间: ").append(new Timestamp(timestamp - 1)).append("\n")for(i <- sortedItems.indices){val currentItem: ItemViewCount &#61; sortedItems(i)// e.g. No1 &#xff1a; 商品 ID&#61;12224 浏览量 &#61;2413result.append("No").append(i&#43;1).append(":").append(" 商品 ID&#61;").append(currentItem.itemId).append(" 浏览量&#61;").append(currentItem.count).append("\n")}result.append("&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;\n\n")// 控制输出频率&#xff0c;模拟实时滚动结果Thread.sleep(1000)out.collect(result.toString)}
}

最后我们可以在 main 函数中将结果打印输出到控制台&#xff0c;方便实时观测&#xff1a;
.print();至此整个程序代码全部完成&#xff0c;我们直接运行 main 函数&#xff0c;就可以在控制台看到不断输出的各个时间点统计出的热门商品。

完整代码

最终完整代码如下&#xff1a;

case class UserBehavior(userId: Long, itemId: Long, categoryId: Int, behavior: String, timestamp: Long)
case class ItemViewCount(itemId: Long, windowEnd: Long, count: Long)object HotItems {def main(args: Array[String]): Unit &#61; {val env &#61; StreamExecutionEnvironment.getExecutionEnvironmentenv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)env.setParallelism(1)val stream &#61; env.readTextFile("YOUR_PATH\\resources\\UserBehavior.csv").map(line &#61;> {val linearray &#61; line.split(",")UserBehavior(linearray(0).toLong, linearray(1).toLong, linearray(2).toInt, linearray(3), linearray(4).toLong)}).assignAscendingTimestamps(_.timestamp * 1000).filter(_.behavior&#61;&#61;"pv").keyBy("itemId").timeWindow(Time.minutes(60), Time.minutes(5)).aggregate(new CountAgg(), new WindowResultFunction()).keyBy(1).process(new TopNHotItems(3)).print()env.execute("Hot Items Job")}// COUNT 统计的聚合函数实现&#xff0c;每出现一条记录加一class CountAgg extends AggregateFunction[UserBehavior, Long, Long] {override def createAccumulator(): Long &#61; 0Loverride def add(userBehavior: UserBehavior, acc: Long): Long &#61; acc &#43; 1override def getResult(acc: Long): Long &#61; accoverride def merge(acc1: Long, acc2: Long): Long &#61; acc1 &#43; acc2}// 用于输出窗口的结果class WindowResultFunction extends WindowFunction[Long, ItemViewCount, Tuple, TimeWindow] {override def apply(key: Tuple, window: TimeWindow, aggregateResult: Iterable[Long], collector: Collector[ItemViewCount]) : Unit &#61; {val itemId: Long &#61; key.asInstanceOf[Tuple1[Long]].f0val count &#61; aggregateResult.iterator.nextcollector.collect(ItemViewCount(itemId, window.getEnd, count))}}// 求某个窗口中前 N 名的热门点击商品&#xff0c;key 为窗口时间戳&#xff0c;输出为 TopN 的结果字符串class TopNHotItems(topSize: Int) extends KeyedProcessFunction[Tuple, ItemViewCount, String] {private var itemState : ListState[ItemViewCount] &#61; _override def open(parameters: Configuration): Unit &#61; {super.open(parameters)// 命名状态变量的名字和状态变量的类型val itemsStateDesc &#61; new ListStateDescriptor[ItemViewCount]("itemState-state", classOf[ItemViewCount])// 从运行时上下文中获取状态并赋值itemState &#61; getRuntimeContext.getListState(itemsStateDesc)}override def processElement(input: ItemViewCount, context: KeyedProcessFunction[Tuple, ItemViewCount, String]#Context, collector: Collector[String]): Unit &#61; {// 每条数据都保存到状态中itemState.add(input)// 注册 windowEnd&#43;1 的 EventTime Timer&#xff0c;当触发时&#xff0c;说明收齐了属于windowEnd 窗口的所有商品数据// 也就是当程序看到 windowend &#43; 1 的水位线 watermark 时&#xff0c;触发 onTimer 回调函数context.timerService.registerEventTimeTimer(input.windowEnd &#43; 1)}override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Tuple, ItemViewCount, String]#OnTimerContext, out: Collector[String]): Unit &#61; {// 获取收到的所有商品点击量val allItems: ListBuffer[ItemViewCount] &#61; ListBuffer()import scala.collection.JavaConversions._for (item <- itemState.get) {allItems &#43;&#61; item}// 提前清除状态中的数据&#xff0c;释放空间itemState.clear()// 按照点击量从大到小排序val sortedItems &#61; allItems.sortBy(_.count)(Ordering.Long.reverse).take(topSize)// 将排名信息格式化成 String, 便于打印val result: StringBuilder &#61; new StringBuilder()result.append("&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;\n")result.append(" 时间: ").append(new Timestamp(timestamp - 1)).append("\n")for(i <- sortedItems.indices){val currentItem: ItemViewCount &#61; sortedItems(i)// e.g. No1&#xff1a; 商品 ID&#61;12224 浏览量 &#61;2413result.append("No").append(i&#43;1).append(":").append(" 商品 ID&#61;").append(currentItem.itemId).append(" 浏览量&#61;").append(currentItem.count).append("\n")}result.append("&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;\n\n")// 控制输出频率&#xff0c;模拟实时滚动结果Thread.sleep(1000)out.collect(result.toString)}}
}

更换 Kafka 作为数据源

实际生产环境中&#xff0c;我们的数据流往往是从 Kafka 获取到的。如果要让代码更贴近生产实际&#xff0c;我们只需将 source 更换为 Kafka 即可&#xff1a;

val properties &#61; new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "consumer-group")
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("auto.offset.reset", "latest")
val env &#61; StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
val stream &#61; env.addSource(new FlinkKafkaConsumer[String]("hotitems", new SimpleStringSchema(),properties))

当然&#xff0c;根据实际的需要&#xff0c;我们还可以将 Sink 指定为 Kafka、ES、Redis 或其它存储&#xff0c;这里就不一一展开实现了。

3、实时流量统计


模块创建和数据准备

在 UserBehaviorAnalysis 下新 建一个 maven module 作 为子项 目&#xff0c; 命名为 NetworkFlowAnalysis。在这个子模块中&#xff0c;我们同样并没有引入更多的依赖&#xff0c;所以也不需要改动 pom 文件。
在 src/main/目录下&#xff0c;将默认源文件目录 java 改名为 scala。将 apache 服务器的日志文件 apache.log 复制到资源文件目录 src/main/resources 下&#xff0c;我们将从这里读取数据。
当然&#xff0c;我们也可以仍然用 UserBehavior.csv 作为数据源&#xff0c;这时我们分析的就不是每一次对服务器的访问请求了&#xff0c;而是具体的页面浏览&#xff08;“pv”&#xff09;操作。

基于服务器 log 的热门页面浏览量统计

我们现在要实现的模块是 “实时流量统计”。对于一个电商平台而言&#xff0c;用户登录的入口流量、不同页面的访问流量都是值得分析的重要数据&#xff0c;而这些数据&#xff0c;可以简单地从 web 服务器的日志中提取出来。
我们在这里先实现“热门页面浏览数”的统计&#xff0c;也就是读取服务器日志中的每一行 log&#xff0c;统计在一段时间内用户访问每一个 url 的次数&#xff0c;然后排序输出显示。
具体做法为&#xff1a;每隔 5 秒&#xff0c;输出最近 10 分钟内访问量最多的前 N 个 URL。可以看出&#xff0c;这个需求与之前“实时热门商品统计”非常类似&#xff0c;所以我们完全可以借鉴此前的代码。
在 src/main/scala 下创建 NetworkFlow.scala 文件&#xff0c;新建一个单例对象。定义样例类 ApacheLogEvent&#xff0c;这是输入的日志数据流&#xff1b;另外还有 UrlViewCount&#xff0c;这是窗口操作统计的输出数据类型。在 main 函数中创建 StreamExecutionEnvironment 并做配置&#xff0c;然后从 apache.log 文件中读取数据&#xff0c;并包装成 ApacheLogEvent 类型。
需要注意的是&#xff0c;原始日志中的时间是“dd/MM/yyyy:HH:mm:ss”的形式&#xff0c;需要定义一个 DateTimeFormat 将其转换为我们需要的时间戳格式&#xff1a;

.map(line &#61;> {val linearray &#61; line.split(" ")val sdf &#61; new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")val timestamp &#61; sdf.parse(linearray(3)).getTimeApacheLogEvent(linearray(0), linearray(2), timestamp, linearray(5), linearray(6))
})

完整代码如下&#xff1a;
NetworkFlowAnalysis/src/main/scala/NetworkFlow.scala

case class ApacheLogEvent(ip: String, userId: String, eventTime: Long, method: String, url: String)
case class UrlViewCount(url: String, windowEnd: Long, count: Long)object NetworkFlow{def main(args: Array[String]): Unit &#61; {val env &#61; StreamExecutionEnvironment.getExecutionEnvironmentenv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)env.setParallelism(1)val stream &#61; env// 以 window 下为例&#xff0c;需替换成自己的路径.readTextFile("YOUR_PATH\\resources\\apache.log").map(line &#61;> {val linearray &#61; line.split(" ")val simpleDateFormat &#61; new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")val timestamp &#61; simpleDateFormat.parse(linearray(3)).getTimeApacheLogEvent(linearray(0), linearray(2), timestamp, linearray(5), linearray(6))}).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[ApacheLogEvent](Time.milliseconds(1000)) {override def extractTimestamp(t: ApacheLogEvent): Long &#61; {t.eventTime}}).filter( data &#61;> {val pattern &#61; "^((?!\\.(css|js)$).)*$".r(pattern findFirstIn data.url).nonEmpty}).keyBy("url").timeWindow(Time.minutes(10), Time.seconds(5)).aggregate(new CountAgg(), new WindowResultFunction()).keyBy(1).process(new TopNHotUrls(5)).print()env.execute("Network Flow Job")}class CountAgg extends AggregateFunction[ApacheLogEvent, Long, Long] {override def createAccumulator(): Long &#61; 0Loverride def add(apacheLogEvent: ApacheLogEvent, acc: Long): Long &#61; acc &#43; 1override def getResult(acc: Long): Long &#61; accoverride def merge(acc1: Long, acc2: Long): Long &#61; acc1 &#43; acc2}class WindowResultFunction extends WindowFunction[Long, UrlViewCount, Tuple, TimeWindow] {override def apply(key: Tuple, window: TimeWindow, aggregateResult: Iterable[Long], collector: Collector[UrlViewCount]) : Unit &#61; {val url: String &#61; key.asInstanceOf[Tuple1[String]].f0val count &#61; aggregateResult.iterator.nextcollector.collect(UrlViewCount(url, window.getEnd, count))}}class TopNHotUrls(topsize: Int) extends KeyedProcessFunction[Tuple, UrlViewCount, String] {private var urlState : ListState[UrlViewCount] &#61; _override def open(parameters: Configuration): Unit &#61; {super.open(parameters)val urlStateDesc &#61; new ListStateDescriptor[UrlViewCount]("urlState-state", classOf[UrlViewCount])urlState &#61; getRuntimeContext.getListState(urlStateDesc)}override def processElement(input: UrlViewCount, context: KeyedProcessFunction[Tuple, UrlViewCount, String]#Context, collector: Collector[String]): Unit &#61; {// 每条数据都保存到状态中urlState.add(input)context.timerService.registerEventTimeTimer(input.windowEnd &#43; 1)}override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Tuple, UrlViewCount, String]#OnTimerContext, out: Collector[String]): Unit &#61; {// 获取收到的所有 URL 访问量val allUrlViews: ListBuffer[UrlViewCount] &#61; ListBuffer()import scala.collection.JavaConversions._for (urlView <- urlState.get) {allUrlViews &#43;&#61; urlView}// 提前清除状态中的数据&#xff0c;释放空间urlState.clear()// 按照访问量从大到小排序val sortedUrlViews &#61; allUrlViews.sortBy(_.count)(Ordering.Long.reverse).take(topSize)// 将排名信息格式化成 String&#xff0c;便于打印var result: StringBuilder &#61; new StringBuilderresult.append("&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;\n")result.append(" 时间: ").append(new Timestamp(timestamp - 1)).append("\n")for (i <- sortedUrlViews.indices) {val currentUrlView: UrlViewCount &#61; sortedUrlViews(i)// e.g. No1&#xff1a; URL &#61;/blog/tags/firefox?flav&#61;rss20 流量 &#61;55result.append("No").append(i&#43;1).append(":").append(" URL&#61;").append(currentUrlView.url).append(" 流量&#61;").append(currentUrlView.count).append("\n")}result.append("&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;\n\n")// 控制输出频率&#xff0c;模拟实时滚动结果Thread.sleep(1000)out.collect(result.toString)}}
}

基于埋点日志数据的网络流量统计

我们发现&#xff0c;从 web 服务器 log 中得到的 url&#xff0c;往往更多的是请求某个资源地址&#xff08;/.js、/.css&#xff09;&#xff0c;如果要针对页面进行统计往往还需要进行过滤。而在实际电商应用中&#xff0c;相比每个单独页面的访问量&#xff0c;我们可能更加关心整个电商网站的网络流量。
这个指标&#xff0c;除了合并之前每个页面的统计结果之外&#xff0c;还可以通过统计埋点日志数据中的“pv”行为来得到。

网站总浏览量&#xff08;PV &#xff09;的统计

衡量网站流量一个最简单的指标&#xff0c;就是网站的页面浏览量&#xff08;Page View&#xff0c;PV&#xff09;。用户每次打开一个页面便记录 1 次 PV&#xff0c;多次打开同一页面则浏览量累计。一般来说&#xff0c;PV 与来访者的数量成正比&#xff0c;但是 PV 并不直接决定页面的真实来访者数量&#xff0c;如同一个来访者通过不断的刷新页面&#xff0c;也可以制造出非常高的 PV。
我们知道&#xff0c;用户浏览页面时&#xff0c;会从浏览器向网络服务器发出一个请求&#xff08;Request&#xff09;&#xff0c;网络服务器接到这个请求后&#xff0c;会将该请求对应的一个网页&#xff08;Page&#xff09;发送给浏览器&#xff0c;从而产生了一个 PV。所以我们的统计方法&#xff0c;可以是从 web 服务器的日志中去提取对应的页面访问然后统计&#xff0c;就向上一节中的做法一样&#xff1b;也可以直接从埋点日志中提取用户发来的页面请求&#xff0c;从而统计出总浏览量。
所以&#xff0c;接下来我们用 UserBehavior.csv 作为数据源&#xff0c;实现一个网站总浏览量的统计。我们可以设置滚动时间窗口&#xff0c;实时统计每小时内的网站 PV。
在 src/main/scala 下创建 PageView.scala 文件&#xff0c;具体代码如下&#xff1a;
NetworkFlowAnalysis/src/main/scala/PageView.scala

case class UserBehavior(userId: Long, itemId: Long, categoryId: Int, behavior: String, timestamp: Long)object PageView {def main(args: Array[String]): Unit &#61; {val resourcesPath &#61; getClass.getResource("/UserBehaviorTest.csv")val env &#61; StreamExecutionEnvironment.getExecutionEnvironmentenv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)env.setParallelism(1)val stream &#61; env.readTextFile(resourcesPath.getPath).map(data &#61;> {val dataArray &#61; data.split(",")UserBehavior(dataArray(0).toLong, dataArray(1).toLong, dataArray(2).toInt, dataArray(3), dataArray(4).toLong)}).assignAscendingTimestamps(_.timestamp * 1000).filter(_.behavior &#61;&#61; "pv").map(x &#61;> ("pv", 1)).keyBy(_._1).timeWindow(Time.seconds(60 * 60)).sum(1).print()env.execute("Page View Job")}
}

网站独立访客数 &#xff08;UV &#xff09;的统计

在上节的例子中&#xff0c;我们统计的是所有用户对页面的所有浏览行为&#xff0c;也就是说&#xff0c;同一用户的浏览行为会被重复统计。而在实际应用中&#xff0c;我们往往还会关注&#xff0c;在一段时间内到底有多少不同的用户访问了网站。
另外一个统计流量的重要指标是网站的独立访客数&#xff08;Unique Visitor&#xff0c;UV&#xff09;。UV 指的是一段时间&#xff08;比如一小时&#xff09;内访问网站的总人数&#xff0c;1 天内同一访客的多次访问只记录为一个访客。通过 IP 和 COOKIE 一般是判断 UV 值的两种方式。当客户端第一次访问某个网站服务器的时候&#xff0c;网站服务器会给这个客户端的电脑发出一个 COOKIE&#xff0c;通常放在这个客户端电脑的 C盘当中。在这个 COOKIE中会分配一个独一无二的编号&#xff0c;这其中会记录一些访问服务器的信息&#xff0c;如访问时间&#xff0c;访问了哪些页面等等。当你下次再访问这个服务器的时候&#xff0c;服务器就可以直接从你的电脑中找到上一次放进去的 COOKIE 文件&#xff0c;并且对其进行一些更新&#xff0c;但那个独一无二的编号是不会变的。
当然&#xff0c;对于 UserBehavior 数据源来说&#xff0c;我们直接可以根据 userId 来区分不同的用户。
在 src/main/scala 下创建 UniqueVisitor.scala 文件&#xff0c;具体代码如下&#xff1a;
NetworkFlowAnalysis/src/main/scala/UniqueVisitor.scala

case class UvCount(windowEnd: Long, count: Long)object UniqueVisitor {def main(args: Array[String]): Unit &#61; {val resourcesPath &#61; getClass.getResource("/UserBehaviorTest.csv")val env &#61; StreamExecutionEnvironment.getExecutionEnvironmentenv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)env.setParallelism(1)val stream &#61; env.readTextFile(resourcesPath.getPath).map(line &#61;> {val linearray &#61; line.split(",")UserBehavior(linearray(0).toLong, linearray(1).toLong, linearray(2).toInt,linearray(3), linearray(4).toLong)}).assignAscendingTimestamps(_.timestamp * 1000).filter(_.behavior &#61;&#61; "pv").timeWindowAll(Time.seconds(60 * 60)).apply(new UvCountByWindow()).print()env.execute("Unique Visitor Job")}
}class UvCountByWindow extends AllWindowFunction[UserBehavior, UvCount, TimeWindow] {override def apply(window: TimeWindow, input: Iterable[UserBehavior], out: Collector[UvCount]): Unit &#61; {val s: collection.mutable.Set[Long] &#61; collection.mutable.Set()var idSet &#61; Set[Long]()for ( userBehavior <- input) {idSet &#43;&#61; userBehavior.userId}out.collect(UvCount(window.getEnd, idSet.size))}
}

使用布隆过滤器的 UV 统计

在上节的例子中&#xff0c;我们把所有数据的 userId 都存在了窗口计算的状态里&#xff0c;在窗口收集数据的过程中&#xff0c;状态会不断增大。一般情况下&#xff0c;只要不超出内存的承受范围&#xff0c;这种做法也没什么问题&#xff1b;但如果我们遇到的数据量很大呢&#xff1f;
把所有数据暂存放到内存里&#xff0c;显然不是一个好注意。我们会想到&#xff0c;可以利用 redis 这种内存级 k-v 数据库&#xff0c;为我们做一个缓存。但如果我们遇到的情况非常极端&#xff0c;数据大到惊人呢&#xff1f;比如上亿级的用户&#xff0c;要去重计算 UV。
如果放到 redis 中&#xff0c;亿级的用户 id&#xff08;每个 20 字节左右的话&#xff09;可能需要几 G 甚至几十 G 的空间来存储。当然放到 redis 中&#xff0c;用集群进行扩展也不是不可以&#xff0c;但明显代价太大了。
一个更好的想法是&#xff0c;其实我们不需要完整地存储用户 ID 的信息&#xff0c;只要知道他在不在就行了。所以其实我们可以进行压缩处理&#xff0c;用一位&#xff08;bit&#xff09;就可以表示一个用户的状态。这个思想的具体实现就是布隆过滤器&#xff08;Bloom Filter&#xff09;。
本质上布隆过滤器是一种数据结构&#xff0c;比较巧妙的概率型数据结构&#xff08;probabilistic data structure&#xff09;&#xff0c;特点是高效地插入和查询&#xff0c;可以用来告诉你 “某样东西一定不存在或者可能存在”。
它本身是一个很长的二进制向量&#xff0c;既然是二进制的向量&#xff0c;那么显而易见的&#xff0c;存放的不是 0&#xff0c;就是 1。相比于传统的 List、Set、Map 等数据结构&#xff0c;它更高效、占用空间更少&#xff0c;但是缺点是其返回的结果是概率性的&#xff0c;而不是确切的。
我们的目标就是&#xff0c;利用某种方法&#xff08;一般是 Hash 函数&#xff09;把每个数据&#xff0c;对应到一个位图的某一位上去&#xff1b;如果数据存在&#xff0c;那一位就是 1&#xff0c;不存在则为 0。
接下来我们就来具体实现一下。
注意这里我们用到了 redis 连接存取数据&#xff0c;所以需要加入 redis 客户端的依赖&#xff1a;

redis.clientsjedis2.8.1

在 src/main/scala 下创建 UniqueVisitor.scala 文件&#xff0c;具体代码如下&#xff1a;
NetworkFlowAnalysis/src/main/scala/UvWithBloom.scala

object UvWithBloomFilter {def main(args: Array[String]): Unit &#61; {val env &#61; StreamExecutionEnvironment.getExecutionEnvironmentenv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)env.setParallelism(1)val resourcesPath &#61; getClass.getResource("/UserBehaviorTest.csv")val stream &#61; env.readTextFile(resourcesPath.getPath).map(data &#61;> {val dataArray &#61; data.split(",")UserBehavior(dataArray(0).toLong, dataArray(1).toLong, dataArray(2).toInt, dataArray(3), dataArray(4).toLong)}).assignAscendingTimestamps(_.timestamp * 1000).filter(_.behavior &#61;&#61; "pv").map(data &#61;> ("dummyKey", data.userId)).keyBy(_._1).timeWindow(Time.seconds(60 * 60)).trigger(new MyTrigger()) // 自定义窗口触发规则.process(new UvCountWithBloom()) // 自定义窗口处理规则stream.print()env.execute("Unique Visitor with bloom Job")}
}// 自定义触发器
class MyTrigger() extends Trigger[(String, Long), TimeWindow] {override def onEventTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult &#61; {TriggerResult.CONTINUE}override def onProcessingTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult &#61; {TriggerResult.CONTINUE}override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit &#61; {}override def onElement(element: (String, Long), timestamp: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult &#61; {// 每来一条数据&#xff0c;就触发窗口操作并清空TriggerResult.FIRE_AND_PURGE}
}// 自定义窗口处理函数
class UvCountWithBloom() extends ProcessWindowFunction[(String, Long), UvCount, String, TimeWindow] {// 创建 redis 连接lazy val jedis &#61; new Jedis("localhost", 6379)lazy val bloom &#61; new Bloom(1 <<29)override def process(key: String, context: Context, elements: Iterable[(String,Long)], out: Collector[UvCount]): Unit &#61; {val storeKey &#61; context.window.getEnd.toStringvar count &#61; 0Lif (jedis.hget("count", storeKey) !&#61; null) {count &#61; jedis.hget("count", storeKey).toLong}val userId &#61; elements.last._2.toStringval offset &#61; bloom.hash(userId, 61)val isExist &#61; jedis.getbit(storeKey, offset)if (!isExist) {jedis.setbit(storeKey, offset, true)jedis.hset("count", storeKey, (count &#43; 1).toString)out.collect(UvCount(storeKey.toLong, count &#43; 1))} else {out.collect(UvCount(storeKey.toLong, count))}}
}// 定义一个布隆过滤器
class Bloom(size: Long) extends Serializable {private val cap &#61; sizedef hash(value: String, seed: Int): Long &#61; {var result &#61; 0for (i <- 0 until value.length) {// 最简单的 hash 算法&#xff0c;每一位字符的 ascii 码值&#xff0c;乘以 seed 之后&#xff0c;做叠加result &#61; result * seed &#43; value.charAt(i)}(cap - 1) & result}
}


推荐阅读
  • 利用Visual Basic开发SAP接口程序初探的方法与原理
    本文介绍了利用Visual Basic开发SAP接口程序的方法与原理,以及SAP R/3系统的特点和二次开发平台ABAP的使用。通过程序接口自动读取SAP R/3的数据表或视图,在外部进行处理和利用水晶报表等工具生成符合中国人习惯的报表样式。具体介绍了RFC调用的原理和模型,并强调本文主要不讨论SAP R/3函数的开发,而是针对使用SAP的公司的非ABAP开发人员提供了初步的接口程序开发指导。 ... [详细]
  • 如何实现织梦DedeCms全站伪静态
    本文介绍了如何通过修改织梦DedeCms源代码来实现全站伪静态,以提高管理和SEO效果。全站伪静态可以避免重复URL的问题,同时通过使用mod_rewrite伪静态模块和.htaccess正则表达式,可以更好地适应搜索引擎的需求。文章还提到了一些相关的技术和工具,如Ubuntu、qt编程、tomcat端口、爬虫、php request根目录等。 ... [详细]
  • 本文介绍了如何使用php限制数据库插入的条数并显示每次插入数据库之间的数据数目,以及避免重复提交的方法。同时还介绍了如何限制某一个数据库用户的并发连接数,以及设置数据库的连接数和连接超时时间的方法。最后提供了一些关于浏览器在线用户数和数据库连接数量比例的参考值。 ... [详细]
  • 本文详细介绍了云服务器API接口的概念和作用,以及如何使用API接口管理云上资源和开发应用程序。通过创建实例API、调整实例配置API、关闭实例API和退还实例API等功能,可以实现云服务器的创建、配置修改和销毁等操作。对于想要学习云服务器API接口的人来说,本文提供了详细的入门指南和使用方法。如果想进一步了解相关知识或阅读更多相关文章,请关注编程笔记行业资讯频道。 ... [详细]
  • [译]技术公司十年经验的职场生涯回顾
    本文是一位在技术公司工作十年的职场人士对自己职业生涯的总结回顾。她的职业规划与众不同,令人深思又有趣。其中涉及到的内容有机器学习、创新创业以及引用了女性主义者在TED演讲中的部分讲义。文章表达了对职业生涯的愿望和希望,认为人类有能力不断改善自己。 ... [详细]
  • 如何用UE4制作2D游戏文档——计算篇
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了如何用UE4制作2D游戏文档——计算篇相关的知识,希望对你有一定的参考价值。 ... [详细]
  • 本文介绍了使用PHP实现断点续传乱序合并文件的方法和源码。由于网络原因,文件需要分割成多个部分发送,因此无法按顺序接收。文章中提供了merge2.php的源码,通过使用shuffle函数打乱文件读取顺序,实现了乱序合并文件的功能。同时,还介绍了filesize、glob、unlink、fopen等相关函数的使用。阅读本文可以了解如何使用PHP实现断点续传乱序合并文件的具体步骤。 ... [详细]
  • 本文介绍了Hyperledger Fabric外部链码构建与运行的相关知识,包括在Hyperledger Fabric 2.0版本之前链码构建和运行的困难性,外部构建模式的实现原理以及外部构建和运行API的使用方法。通过本文的介绍,读者可以了解到如何利用外部构建和运行的方式来实现链码的构建和运行,并且不再受限于特定的语言和部署环境。 ... [详细]
  • 图解redis的持久化存储机制RDB和AOF的原理和优缺点
    本文通过图解的方式介绍了redis的持久化存储机制RDB和AOF的原理和优缺点。RDB是将redis内存中的数据保存为快照文件,恢复速度较快但不支持拉链式快照。AOF是将操作日志保存到磁盘,实时存储数据但恢复速度较慢。文章详细分析了两种机制的优缺点,帮助读者更好地理解redis的持久化存储策略。 ... [详细]
  • 计算机存储系统的层次结构及其优势
    本文介绍了计算机存储系统的层次结构,包括高速缓存、主存储器和辅助存储器三个层次。通过分层存储数据可以提高程序的执行效率。计算机存储系统的层次结构将各种不同存储容量、存取速度和价格的存储器有机组合成整体,形成可寻址存储空间比主存储器空间大得多的存储整体。由于辅助存储器容量大、价格低,使得整体存储系统的平均价格降低。同时,高速缓存的存取速度可以和CPU的工作速度相匹配,进一步提高程序执行效率。 ... [详细]
  • 在重复造轮子的情况下用ProxyServlet反向代理来减少工作量
    像不少公司内部不同团队都会自己研发自己工具产品,当各个产品逐渐成熟,到达了一定的发展瓶颈,同时每个产品都有着自己的入口,用户 ... [详细]
  • 本文介绍了在Linux下安装和配置Kafka的方法,包括安装JDK、下载和解压Kafka、配置Kafka的参数,以及配置Kafka的日志目录、服务器IP和日志存放路径等。同时还提供了单机配置部署的方法和zookeeper地址和端口的配置。通过实操成功的案例,帮助读者快速完成Kafka的安装和配置。 ... [详细]
  • 深入理解Kafka服务端请求队列中请求的处理
    本文深入分析了Kafka服务端请求队列中请求的处理过程,详细介绍了请求的封装和放入请求队列的过程,以及处理请求的线程池的创建和容量设置。通过场景分析、图示说明和源码分析,帮助读者更好地理解Kafka服务端的工作原理。 ... [详细]
  • Oracle优化新常态的五大禁止及其性能隐患
    本文介绍了Oracle优化新常态中的五大禁止措施,包括禁止外键、禁止视图、禁止触发器、禁止存储过程和禁止JOB,并分析了这些禁止措施可能带来的性能隐患。文章还讨论了这些禁止措施在C/S架构和B/S架构中的不同应用情况,并提出了解决方案。 ... [详细]
  • mac php错误日志配置方法及错误级别修改
    本文介绍了在mac环境下配置php错误日志的方法,包括修改php.ini文件和httpd.conf文件的操作步骤。同时还介绍了如何修改错误级别,以及相应的错误级别参考链接。 ... [详细]
author-avatar
罗kowalske
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有