热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

企业版SparkDatabricks+企业版KafkaConfluent联合高效挖掘数据价值

前提条件已注册阿里云账号,详情请参见阿里云账号注册流程已开通Databricks数据洞察服务已开通OSS对象存储服务已开通Confluent流数据服务创建Data

前提条件


  • 已注册阿里云账号,详情请参见阿里云账号注册流程
  • 已开通 Databricks 数据洞察服务
  • 已开通 OSS 对象存储服务
  • 已开通 Confluent 流数据服务

创建Databricks集群 & Confluent集群


  1. 登录Confluent管理控制台,创建Confluent集群,并开启公网服务
  2. 登录Databricks管理控制台,创建Databricks集群

Databricks Worker节点公网访问

Databricks的worker节点暂时不支持公网访问,为了能访问Confluent的公网地址,请联系Databricks的开发人员添加NAT网关。


案例:出租车数据入湖及分析

出租车和网约车在每天的运行中持续产生行驶轨迹和交易数据,这些数据对于车辆调度,流量预测,安全监控等场景有着极大的价值。

本案例中我们使用纽约市的出租车数据来模拟网约车数据从产生,发布到流数据服务Confluent,通过Databricks Structured Streaming进行实时数据处理,并存储到LakeHouse的整个流程。数据存储到LakeHouse后,我们使用spark和spark sql对数据进行分析,并使用Spark的MLlib进行机器学习训练。

前置准备:


  1. 创建topic:
    登录Confluent的control center,在左侧选中Topics,点击Add a topic按钮,创建一个名为nyc_taxi_data的topic,将partition设置为3,其他配置保持默认。
  2. 创建OSS bucket:
    在和Databricks同一Region的OSS中,创建bucket,bucket命名为:databricks-confluent-integration进入到Bucket列表页,点击创建bucket按钮创建好bucket之后,在该bucket创建目录:checkpoint_dir和data/nyc_taxi_data两个目录
  3. 收集url,用户名,密码,路径等以便后续使用
    1. confluent集群ID:在csp的管控界面,集群详情页获取
    2. Confluent Control Center的用户名和密码
    3. 路径:

  • Databricks Structured Streaming的checkpoint存储目录
  • 采集的数据的存储目录

下面是我们后续会使用到的一些变量:

# 集群管控界面获取
confluent_cluster_id = "your_confluent_cluster_id"
# 使用confluent集群ID拼接得到
confluent_server = "rb-{confluent_cluster_id}.csp.aliyuncs.com:9092"
control_center_username = "your_confluent_control_center_username"
control_center_password = "your_confluent_control_center_password"topic = "nyc_taxi_data"checkpoint_location = "oss://databricks-confluent-integration/checkpoint_dir"
taxi_data_delta_lake = "oss://databricks-confluent-integration/data/nyc_taxi_data"

数据的产生

在本案例中,我们使用Kaggle上的NYC出租车数据集来模拟数据产生。


  • 我们先安装confluent的python客户端,其他语言的客户端参考confluent官网

pip install confluent_kafka

  • 构造用于创建Kafka Producer的基础信息,如:bootstrap-server,control center的username,password等

conf = {'bootstrap.servers': confluent_server,'key.serializer': StringSerializer('utf_8'),'value.serializer': StringSerializer('utf_8'),'client.id': socket.gethostname(),'security.protocol': 'SASL_SSL','sasl.mechanism': 'PLAIN','sasl.username': control_center_username,'sasl.password': control_center_password
}

  • 创建Producer:

producer = Producer(conf)

  • 向Kafka中发送消息(模拟数据的产生):

with open("/Path/To/train.csv", "rt") as f:float_field = ['fare_amount', 'pickup_longitude', 'pickup_latitude', 'dropoff_longitude', 'dropoff_latitude']for row in reader:i += 1try:for field in float_field:row[field] = float(row[field])row['passenger_count'] = int(row['passenger_count'])producer.produce(topic=topic, value=json.dumps(row))if i % 1000 == 0:producer.flush()if i == 200000:breakexcept ValueError: # discard null/NAN datacontinue

Kafka中的partition和offset

在使用spark读取Kafka中的数据之前,我们回顾一下Kafka中的概念:partition和offset


  • partition:kafka为了能并行进行数据的写入,将每个topic的数据分为多个partition,每个partition由一个Broker负责,向partition写入数据时,负责该partition的Broker将消息复制给它的follower
  • offset:Kafka会为每条写入partition里的消息进行编号,消息的编号即为offset

我们在读取Kafka中的数据时,需要指定我们想要读取的数据,该指定需要从两个维度:partition的维度 + offset的维度。


  • Earliest:从每个partition的offset 0开始读取和加载
  • Latest:从每个partition最新的数据开始读取
  • 自定义:指定每个partition的开始offset和结束offset
    • 读取topic1 partition 0 offset 23和partition 0 offset -2之后的数据:"""{"topic1":{"0":23,"1":-2}}"""

除了指定start offset,我们还可以通过endingOffsets参数指定读取到什么位置为止。

将数据存储到LakeHouse:Spark集成Confluent

理解上述概念后,Databricks和Confluent的集成非常简单,只需要对spark session的readStream参数进行简单的设置就可以将Kafka中的实时流数据转换为Spark中的Dataframe:

lines = (spark.readStream# 指定数据源: kafka.format("kafka")# 指定kafka bootstrap server的URL.option("kafka.bootstrap.servers", confluent_server)# 指定订阅的topic.option("subscribe", topic)# 指定想要读取的数据的offset,earliest表示从每个partition的起始点开始读取.option("startingOffsets", "earliest")# 指定认证协议.option("kafka.security.protocol", "SASL_SSL").option("kafka.sasl.mechanism", "PLAIN")# 指定confluent的用户名和密码.option("kafka.sasl.jaas.config",f"""org.apache.kafka.common.security.plain.PlainLoginModule required username="{control_center_username}" password="{control_center_password}";""").load())

从kafka中读取的数据格式如下:

root|-- key: binary (nullable = true)|-- value: binary (nullable = true)|-- topic: string (nullable = true)|-- partition: integer (nullable = true)|-- offset: long (nullable = true)|-- timestamp: timestamp (nullable = true)|-- timestampType: integer (nullable = true)

由于key和value都是binary格式的,我们需要将value(json)由binary转换为string格式,并定义schema,提取出Json中的数据,并转换为对应的格式:

schema = (StructType().add('key', TimestampType()).add('fare_amount', FloatType()).add('pickup_datetime', TimestampType()).add('pickup_longitude', FloatType()).add('pickup_latitude', FloatType()).add('dropoff_longitude', FloatType()).add('dropoff_latitude', FloatType()).add('passenger_count', IntegerType()))# 将json中的列提取出来
lines = (lines.withColumn('data', from_json(col('value').cast('string'), # binary 转 stringschema)) # 解析为schema.select(col('data.*'))) # select value中的所有列

过滤掉错误,为空,NaN的数据:

lines = (lines.filter(col('pickup_longitude') != 0).filter(col('pickup_latitude') != 0).filter(col('dropoff_longitude') != 0).filter(col('dropoff_latitude') != 0).filter(col('fare_amount') != 0).filter(col('passenger_count') != 0))

最后,我们将解析出来的数据输出到LakeHouse中,以进行后续的分析和机器学习模型训练:

# lakehouse 的存储格式为 delta
query = (lines.writeStream.format('delta').option('checkpointLocation', checkpoint_location).option('path', taxi_data_delta_lake).start())
# 执行job,直到出现异常(如果只想执行该Job一段时间,可以指定timeout参数)
query.awaitTermination()

数据分析

我们先将LakeHouse中的数据使用Spark加载进来:

然后,我们对该Dataframe创建一个Table View,并探索fare_amount的分布:

可以看到fare_amount的最小值是负数,这显然是一条错误的数据,我们将这些错误的数据过滤,并探索fare_amount的分布:

然后我们探索价格和年份,月份,星期,打车时间的关系:

从上面可以看出两点:


  • 出租车的价格和年份有很大关系,从09年到15年呈不断增长的态势
  • 在中午和凌晨打车比上午和下午打车更贵一些。

我们再进一步探索价格和乘客数量的关系:

此外,出租车价格的另一个影响因素就是距离,这里我们借助python的geopy包和Spark的UDF来计算给定两个位置的距离,然后再分析费用和距离的关系。

经纬度的范围为[-90, 90],因此,我们第一步是清除错误的数据:

然后,我们增加一列数据:出租车行驶的距离,并将距离离散化,进行后续的分析:

统计打车距离的分布:

从上图可以看出:打车距离分布在区间[0, 15]miles内,我们继续统计在该区间内,打车价格和打车距离的关系:

如上图所示:打车价格和打车距离呈现出线性增长的趋势。

机器学习建模

在上一小节的数据分析中,我们已经提取了和出租车相关联的一些特征,根据这些特征,我们建立一个简单的线性回归模型:

打车费用 ~ (年份,打车时间,乘客数,距离)

先将特征和目标值提取出来:

对特征做归一化:

分割训练集和测试集:

建立线性回归模型进行训练:

训练结果统计:

使用Evaluator对模型进行评价:


总结

我们在本文中介绍了如何使用阿里云的Confluent Cloud和Databricks来构建您的数据流和LakeHouse,并介绍了如何使用Databricks提供的能力来挖掘数据价值,使用Spark MLlib构建您的机器学习模型。有了Confluent Cloud和Databricks,您可以轻松实现数据入湖,及时在最新的数据上进行探索,挖掘您的数据价值。欢迎您试用阿里云Confluent和Databricks。

原文链接

本文为阿里云原创内容,未经允许不得转载。


推荐阅读
  • 什么是大数据lambda架构
    一、什么是Lambda架构Lambda架构由Storm的作者[NathanMarz]提出,根据维基百科的定义,Lambda架构的设计是为了在处理大规模数 ... [详细]
  • SpringBoot uri统一权限管理的实现方法及步骤详解
    本文详细介绍了SpringBoot中实现uri统一权限管理的方法,包括表结构定义、自动统计URI并自动删除脏数据、程序启动加载等步骤。通过该方法可以提高系统的安全性,实现对系统任意接口的权限拦截验证。 ... [详细]
  • Java String与StringBuffer的区别及其应用场景
    本文主要介绍了Java中String和StringBuffer的区别,String是不可变的,而StringBuffer是可变的。StringBuffer在进行字符串处理时不生成新的对象,内存使用上要优于String类。因此,在需要频繁对字符串进行修改的情况下,使用StringBuffer更加适合。同时,文章还介绍了String和StringBuffer的应用场景。 ... [详细]
  • 李逍遥寻找仙药的迷阵之旅
    本文讲述了少年李逍遥为了救治婶婶的病情,前往仙灵岛寻找仙药的故事。他需要穿越一个由M×N个方格组成的迷阵,有些方格内有怪物,有些方格是安全的。李逍遥需要避开有怪物的方格,并经过最少的方格,找到仙药。在寻找的过程中,他还会遇到神秘人物。本文提供了一个迷阵样例及李逍遥找到仙药的路线。 ... [详细]
  • 本文介绍了一种轻巧方便的工具——集算器,通过使用集算器可以将文本日志变成结构化数据,然后可以使用SQL式查询。集算器利用集算语言的优点,将日志内容结构化为数据表结构,SPL支持直接对结构化的文件进行SQL查询,不再需要安装配置第三方数据库软件。本文还详细介绍了具体的实施过程。 ... [详细]
  • 本文介绍了在实现了System.Collections.Generic.IDictionary接口的泛型字典类中如何使用foreach循环来枚举字典中的键值对。同时还讨论了非泛型字典类和泛型字典类在foreach循环中使用的不同类型,以及使用KeyValuePair类型在foreach循环中枚举泛型字典类的优势。阅读本文可以帮助您更好地理解泛型字典类的使用和性能优化。 ... [详细]
  • 开发笔记:Spark Java API 之 CountVectorizer
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了SparkJavaAPI之CountVectorizer相关的知识,希望对你有一定的参考价值。 ... [详细]
  • Spring特性实现接口多类的动态调用详解
    本文详细介绍了如何使用Spring特性实现接口多类的动态调用。通过对Spring IoC容器的基础类BeanFactory和ApplicationContext的介绍,以及getBeansOfType方法的应用,解决了在实际工作中遇到的接口及多个实现类的问题。同时,文章还提到了SPI使用的不便之处,并介绍了借助ApplicationContext实现需求的方法。阅读本文,你将了解到Spring特性的实现原理和实际应用方式。 ... [详细]
  • 本文详细介绍了在ASP.NET中获取插入记录的ID的几种方法,包括使用SCOPE_IDENTITY()和IDENT_CURRENT()函数,以及通过ExecuteReader方法执行SQL语句获取ID的步骤。同时,还提供了使用这些方法的示例代码和注意事项。对于需要获取表中最后一个插入操作所产生的ID或马上使用刚插入的新记录ID的开发者来说,本文提供了一些有用的技巧和建议。 ... [详细]
  • 本文介绍了一个在线急等问题解决方法,即如何统计数据库中某个字段下的所有数据,并将结果显示在文本框里。作者提到了自己是一个菜鸟,希望能够得到帮助。作者使用的是ACCESS数据库,并且给出了一个例子,希望得到的结果是560。作者还提到自己已经尝试了使用"select sum(字段2) from 表名"的语句,得到的结果是650,但不知道如何得到560。希望能够得到解决方案。 ... [详细]
  • 本文详细介绍了Spring的JdbcTemplate的使用方法,包括执行存储过程、存储函数的call()方法,执行任何SQL语句的execute()方法,单个更新和批量更新的update()和batchUpdate()方法,以及单查和列表查询的query()和queryForXXX()方法。提供了经过测试的API供使用。 ... [详细]
  • 个人学习使用:谨慎参考1Client类importcom.thoughtworks.gauge.Step;importcom.thoughtworks.gauge.T ... [详细]
  • 开发笔记:实验7的文件读写操作
    本文介绍了使用C++的ofstream和ifstream类进行文件读写操作的方法,包括创建文件、写入文件和读取文件的过程。同时还介绍了如何判断文件是否成功打开和关闭文件的方法。通过本文的学习,读者可以了解如何在C++中进行文件读写操作。 ... [详细]
  • 怎么在PHP项目中实现一个HTTP断点续传功能发布时间:2021-01-1916:26:06来源:亿速云阅读:96作者:Le ... [详细]
  • 本文介绍了在Cpp中将字符串形式的数值转换为int或float等数值类型的方法,主要使用了strtol、strtod和strtoul函数。这些函数可以将以null结尾的字符串转换为long int、double或unsigned long类型的数值,且支持任意进制的字符串转换。相比之下,atoi函数只能转换十进制数值且没有错误返回。 ... [详细]
author-avatar
aarongwang56_972
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有