热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

spark将jdbc查询的数据封装成DataFrame

简述spark在2.2.0版本是不支持通过jdbc的方式直接访问hive数据的,需要修改部分源码实现spark直接通过jdbc的方式读取hive数据,就在之前写的文章中的方法二里。


简述

spark在2.2.0版本是不支持通过jdbc的方式直接访问hive数据的,需要修改部分源码实现spark直接通过jdbc的方式读取hive数据,就在之前写的文章中的方法二里。

https://blog.csdn.net/qq_42213403/article/details/117557610?spm=1001.2014.3001.5501

还有一种方法不用重写源码,是先通过jdbc获取数据,再用spark封装成dataframe的方式操作的

 


实现过程

首先使用jdbc查询的方式获取hive表数据


def getResult()={
val properties = new Properties
properties.setProperty("url", "jdbc:hive2://192.168.5.61:10000/")
properties.setProperty("user", "hive")
properties.setProperty("password", "")
properties.setProperty("driver", "org.apache.hive.jdbc.HiveDriver")
val cOnnection= getConnection(properties)
val statement = connection.createStatement
val resultSet = statement.executeQuery("select * from test.user_info")
resultSet
}
def getConnection(prop: Properties): COnnection= try {
Class.forName(prop.getProperty("driver"))
cOnn= DriverManager.getConnection(prop.getProperty("url"), prop.getProperty("user"), prop.getProperty("password"))
conn
} catch {
case e: Exception =>
e.printStackTrace()
null
}

 

把查出的ResultSet转换成DataFrame


def createStructField(name:String,colType:String):StructField={
colType match {
case "java.lang.String" =>{StructField(name,StringType,true)}
case "java.lang.Integer" =>{StructField(name,IntegerType,true)}
case "java.lang.Long" =>{StructField(name,LongType,true)}
case "java.lang.Boolean" =>{StructField(name,BooleanType,true)}
case "java.lang.Double" =>{StructField(name,DoubleType,true)}
case "java.lang.Float" =>{StructField(name,FloatType,true)}
case "java.sql.Date" =>{StructField(name,DateType,true)}
case "java.sql.Time" =>{StructField(name,TimestampType,true)}
case "java.sql.Timestamp" =>{StructField(name,TimestampType,true)}
case "java.math.BigDecimal" =>{StructField(name,DecimalType(10,0),true)}
}
}
/**
* 把查出的ResultSet转换成DataFrame
*/
def createResultSetToDF(rs:ResultSet,sparkSession: SparkSession):DataFrame= {
val rsmd = rs.getMetaData
val columnTypeList = new util.ArrayList[String]
val rowSchemaList = new util.ArrayList[StructField]
for (i <- 1 to rsmd.getColumnCount) {
var temp = rsmd.getColumnClassName(i)
temp = temp.substring(temp.lastIndexOf(".") + 1)
if ("Integer".equals(temp)) {
temp = "Int";
}
columnTypeList.add(temp)
rowSchemaList.add(createStructField(rsmd.getColumnName(i), rsmd.getColumnClassName(i)))
}
val rowSchema = StructType(rowSchemaList)
//ResultSet反射类对象
val rsClass = rs.getClass
var count = 1
val resultList = new util.ArrayList[Row]
var totalDF = sparkSession.createDataFrame(new util.ArrayList[Row], rowSchema)
while (rs.next()) {
count = count + 1
// val temp = new util.ArrayList[Object]
val buffer = new ArrayBuffer[Any]()
for (i <- 0 to columnTypeList.size() - 1) {
val method = rsClass.getMethod("get" + columnTypeList.get(i), "aa".getClass)
buffer+=method.invoke(rs, rsmd.getColumnName(i + 1))
}
resultList.add(Row(buffer: _*))
if (count % 100000 == 0) {
val tempDF = sparkSession.createDataFrame(resultList, rowSchema)
totalDF = totalDF.union(tempDF).distinct()
resultList.clear()
}
}
val tempDF = sparkSession.createDataFrame(resultList, rowSchema)
totalDF = totalDF.union(tempDF)
totalDF
}

运行代码

val spark = SparkSession.builder()
.master("local[2]")
.appName("test")
.getOrCreate()

val df = createResultSetToDF(getResult(),spark)
df.show()

结果

 

完整代码

import java.sql.{Connection, DriverManager, ResultSet}
import java.util
import java.util.Properties
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{BooleanType, DateType, DecimalType, DoubleType, FloatType, IntegerType, LongType, StringType, StructField, StructType, TimestampType}
import scala.collection.mutable.ArrayBuffer
/**
* @Author: fcy
* @Date: 2021/6/4 4:16 下午
*/
object SparkJDBCHiveDataFrame {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.master("local[2]")
.appName("test")
.getOrCreate()
val df = createResultSetToDF(getResult(),spark)
df.show()
}
def getResult()={
val properties = new Properties
properties.setProperty("url", "jdbc:hive2://xxx:10000/")
properties.setProperty("user", "hive")
properties.setProperty("password", xxx)
properties.setProperty("driver", "org.apache.hive.jdbc.HiveDriver")
val cOnnection= getConnection(properties)
val statement = connection.createStatement
val resultSet = statement.executeQuery("select * from test.user_info")
resultSet
}
def getConnection(prop: Properties): COnnection= try {
Class.forName(prop.getProperty("driver"))
cOnn= DriverManager.getConnection(prop.getProperty("url"), prop.getProperty("user"), prop.getProperty("password"))
conn
} catch {
case e: Exception =>
e.printStackTrace()
null
}
def createStructField(name:String,colType:String):StructField={
colType match {
case "java.lang.String" =>{StructField(name,StringType,true)}
case "java.lang.Integer" =>{StructField(name,IntegerType,true)}
case "java.lang.Long" =>{StructField(name,LongType,true)}
case "java.lang.Boolean" =>{StructField(name,BooleanType,true)}
case "java.lang.Double" =>{StructField(name,DoubleType,true)}
case "java.lang.Float" =>{StructField(name,FloatType,true)}
case "java.sql.Date" =>{StructField(name,DateType,true)}
case "java.sql.Time" =>{StructField(name,TimestampType,true)}
case "java.sql.Timestamp" =>{StructField(name,TimestampType,true)}
case "java.math.BigDecimal" =>{StructField(name,DecimalType(10,0),true)}
}
}
/**
* 把查出的ResultSet转换成DataFrame
*/
def createResultSetToDF(rs:ResultSet,sparkSession: SparkSession):DataFrame= {
val rsmd = rs.getMetaData
val columnTypeList = new util.ArrayList[String]
val rowSchemaList = new util.ArrayList[StructField]
for (i <- 1 to rsmd.getColumnCount) {
var temp = rsmd.getColumnClassName(i)
temp = temp.substring(temp.lastIndexOf(".") + 1)
if ("Integer".equals(temp)) {
temp = "Int";
}
columnTypeList.add(temp)
rowSchemaList.add(createStructField(rsmd.getColumnName(i), rsmd.getColumnClassName(i)))
}
val rowSchema = StructType(rowSchemaList)
//ResultSet反射类对象
val rsClass = rs.getClass
var count = 1
val resultList = new util.ArrayList[Row]
var totalDF = sparkSession.createDataFrame(new util.ArrayList[Row], rowSchema)
while (rs.next()) {
count = count + 1
// val temp = new util.ArrayList[Object]
val buffer = new ArrayBuffer[Any]()
for (i <- 0 to columnTypeList.size() - 1) {
val method = rsClass.getMethod("get" + columnTypeList.get(i), "aa".getClass)
buffer+=method.invoke(rs, rsmd.getColumnName(i + 1))
}
resultList.add(Row(buffer: _*))
if (count % 100000 == 0) {
val tempDF = sparkSession.createDataFrame(resultList, rowSchema)
totalDF = totalDF.union(tempDF).distinct()
resultList.clear()
}
}
val tempDF = sparkSession.createDataFrame(resultList, rowSchema)
totalDF = totalDF.union(tempDF)
totalDF
}
}

 

 


欢迎留言讨论和指正

推荐阅读
  • 本文介绍了在开发Android新闻App时,搭建本地服务器的步骤。通过使用XAMPP软件,可以一键式搭建起开发环境,包括Apache、MySQL、PHP、PERL。在本地服务器上新建数据库和表,并设置相应的属性。最后,给出了创建new表的SQL语句。这个教程适合初学者参考。 ... [详细]
  • 本文详细介绍了在ASP.NET中获取插入记录的ID的几种方法,包括使用SCOPE_IDENTITY()和IDENT_CURRENT()函数,以及通过ExecuteReader方法执行SQL语句获取ID的步骤。同时,还提供了使用这些方法的示例代码和注意事项。对于需要获取表中最后一个插入操作所产生的ID或马上使用刚插入的新记录ID的开发者来说,本文提供了一些有用的技巧和建议。 ... [详细]
  • 本文介绍了Android 7的学习笔记总结,包括最新的移动架构视频、大厂安卓面试真题和项目实战源码讲义。同时还分享了开源的完整内容,并提醒读者在使用FileProvider适配时要注意不同模块的AndroidManfiest.xml中配置的xml文件名必须不同,否则会出现问题。 ... [详细]
  • 在Android开发中,使用Picasso库可以实现对网络图片的等比例缩放。本文介绍了使用Picasso库进行图片缩放的方法,并提供了具体的代码实现。通过获取图片的宽高,计算目标宽度和高度,并创建新图实现等比例缩放。 ... [详细]
  • 这是原文链接:sendingformdata许多情况下,我们使用表单发送数据到服务器。服务器处理数据并返回响应给用户。这看起来很简单,但是 ... [详细]
  • 向QTextEdit拖放文件的方法及实现步骤
    本文介绍了在使用QTextEdit时如何实现拖放文件的功能,包括相关的方法和实现步骤。通过重写dragEnterEvent和dropEvent函数,并结合QMimeData和QUrl等类,可以轻松实现向QTextEdit拖放文件的功能。详细的代码实现和说明可以参考本文提供的示例代码。 ... [详细]
  • 本文介绍了数据库的存储结构及其重要性,强调了关系数据库范例中将逻辑存储与物理存储分开的必要性。通过逻辑结构和物理结构的分离,可以实现对物理存储的重新组织和数据库的迁移,而应用程序不会察觉到任何更改。文章还展示了Oracle数据库的逻辑结构和物理结构,并介绍了表空间的概念和作用。 ... [详细]
  • 目录实现效果:实现环境实现方法一:基本思路主要代码JavaScript代码总结方法二主要代码总结方法三基本思路主要代码JavaScriptHTML总结实 ... [详细]
  • CSS3选择器的使用方法详解,提高Web开发效率和精准度
    本文详细介绍了CSS3新增的选择器方法,包括属性选择器的使用。通过CSS3选择器,可以提高Web开发的效率和精准度,使得查找元素更加方便和快捷。同时,本文还对属性选择器的各种用法进行了详细解释,并给出了相应的代码示例。通过学习本文,读者可以更好地掌握CSS3选择器的使用方法,提升自己的Web开发能力。 ... [详细]
  • 本文讨论了一个关于cuowu类的问题,作者在使用cuowu类时遇到了错误提示和使用AdjustmentListener的问题。文章提供了16个解决方案,并给出了两个可能导致错误的原因。 ... [详细]
  • Java String与StringBuffer的区别及其应用场景
    本文主要介绍了Java中String和StringBuffer的区别,String是不可变的,而StringBuffer是可变的。StringBuffer在进行字符串处理时不生成新的对象,内存使用上要优于String类。因此,在需要频繁对字符串进行修改的情况下,使用StringBuffer更加适合。同时,文章还介绍了String和StringBuffer的应用场景。 ... [详细]
  • 本文介绍了一个在线急等问题解决方法,即如何统计数据库中某个字段下的所有数据,并将结果显示在文本框里。作者提到了自己是一个菜鸟,希望能够得到帮助。作者使用的是ACCESS数据库,并且给出了一个例子,希望得到的结果是560。作者还提到自己已经尝试了使用"select sum(字段2) from 表名"的语句,得到的结果是650,但不知道如何得到560。希望能够得到解决方案。 ... [详细]
  • 本文介绍了一些Java开发项目管理工具及其配置教程,包括团队协同工具worktil,版本管理工具GitLab,自动化构建工具Jenkins,项目管理工具Maven和Maven私服Nexus,以及Mybatis的安装和代码自动生成工具。提供了相关链接供读者参考。 ... [详细]
  • GreenDAO快速入门
    前言之前在自己做项目的时候,用到了GreenDAO数据库,其实对于数据库辅助工具库从OrmLite,到litePal再到GreenDAO,总是在不停的切换,但是没有真正去了解他们的 ... [详细]
  • 如何在php文件中添加图片?
    本文详细解答了如何在php文件中添加图片的问题,包括插入图片的代码、使用PHPword在载入模板中插入图片的方法,以及使用gd库生成不同类型的图像文件的示例。同时还介绍了如何生成一个正方形文件的步骤。希望对大家有所帮助。 ... [详细]
author-avatar
手机用户2502918237
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有