热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

使用spark与MySQL进行数据交互的方法

在项目中,遇到一个场景是,需要从Hive数据仓库中拉取数据,进行过滤、裁剪或者聚合之后生成中间结果导入MySQL。对于这样一个极其普通的离线计算场景,有多种技术选型可以实现。例如,sqoop,MR,H

在项目中,遇到一个场景是,需要从Hive数据仓库中拉取数据,进行过滤、裁剪或者聚合之后生成中间结果导入MySQL。

对于这样一个极其普通的离线计算场景,有多种技术选型可以实现。例如,sqoop,MR,HSQL。

 

我们这里使用的spark,优点来说是两个:一是灵活性高,二是代码简洁。

1)灵活性高

相比sqoop和HSQL,spark可以更灵活的控制过滤和裁剪逻辑,甚至你可以通过外部的配置或者参数,来动态的调整spark的计算行为,提供定制化。

2)代码简洁

相比MR来说,代码量上少了很多。也无需实现MySQL客户端。

 

我抽象了一下需求,做了如下一个demo。

涉及的数据源有两个:Hive&MySQL;计算引擎:spark&spark-sql。我们的demo中分为两个步骤:

1)从Hive中读取数据,交给spark计算,最终输出到MySQL;

2)从MySQL中读取数据,交给spark计算,最终再输出到MySQL另一张表。

 

1、 数据准备

 

创建了Hive外部分区表

关于分区和外部表这里不说了。

CREATE EXTERNAL TABLE `gulfstream_test.accounts`(
`
id` string COMMENT '用户id',
`order_id`
string COMMENT '订单id',
`status` bigint COMMENT
'用户状态',
`count`
decimal(16,9) COMMENT '订单数')
COMMENT
'用户信息'
PARTITIONED BY (
`year`
string,
`month`
string,
`day`
string)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY
'\t'
STORED AS INPUTFORMAT
'org.autonavi.udf.CustomInputFormat'
OUTPUTFORMAT
'org.autonavi.udf.CustomHiveOutputFormat'
LOCATION
'hdfs://mycluster-tj/***/acounts'
TBLPROPERTIES (
'LEVEL'='1',
'TTL'='60',
'last_modified_by'='yangfan',
'last_modified_time'='2017-10-23',
'transient_lastDdlTime'='1508746808')

 

建立分区,并指定分区路径

这里分区使用的年月日三级分区。通过下面的命令将year=2017/mOnth=10/day=23这个Hive分区的数据指向了location=hdfs://mycluster-tj/***/acounts/2017/10/23

hive> alter table gulfstream_test.accounts add partition(year='2017', mOnth='10', day='23') location 'hdfs://mycluster-tj/***/acounts/2017/10/23';

 

查询一下分区是否建立成功

可以看到分区已经有了。

show partitions gulfstream_test.accounts;
OK
partition
year
=2017/mOnth=10/day=23

 

上传本地测试数据到hdfs

hadoop fs -put a.txt  hdfs://mycluster-tj/***/acounts/2017/10/23

看一下数据,取了前10行,原谅我数据比较假。

[data_monitor@bigdata-arch-client10 target]$ hadoop fs -cat hdfs://mycluster-tj/***/acounts/2017/10/23/a | head -10
0 0 0 0
1 1 1 1
2 2 2 2
3 3 3 3
4 4 4 4
5 5 5 5
6 6 6 6
7 7 7 7
8 8 8 8
9 9 9 9

在Hive中,也查一下前10条,是一样的。只是多了分区字段。

hive (default)> select * from gulfstream_test.accounts where year=2017 and mOnth=10 and day=23 limit 10;
OK
accounts.
id accounts.order_id accounts.status accounts.count accounts.year accounts.month accounts.day
0 0 0 0 2017 10 23
1 1 1 1 2017 10 23
2 2 2 2 2017 10 23
3 3 3 3 2017 10 23
4 4 4 4 2017 10 23
5 5 5 5 2017 10 23
6 6 6 6 2017 10 23
7 7 7 7 2017 10 23
8 8 8 8 2017 10 23
9 9 9 9 2017 10 23
Time taken:
1.38 seconds, Fetched: 10 row(s)

至此,测试数据准备好了。一共1000000条,1百万。

 

2、代码

 

1)POM依赖

可以通过pom依赖来看一下笔者使用的组件版本。

这里就不赘述了。


org.apache.spark
spark-core_2.10
1.6.0
provided


org.apache.spark
spark-hive_2.10
1.6.0
provided


org.apache.spark
spark-sql_2.10
1.6.0
provided

打包方式




maven-assembly-plugin




com.kangaroo.studio.algorithms.filter.LoadDB




jar-with-dependencies




make-assembly
package

single





org.apache.maven.plugins
maven-compiler-plugin

1.6
1.6



 

2)java spark代码

先贴上代码,再说明

package com.kangaroo.studio.algorithms.filter;


import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.hive.HiveContext;

import java.io.Serializable;
import java.util.Properties;


public class LoadDB implements Serializable {

private SparkConf sparkConf;
private JavaSparkContext javaSparkContext;
private HiveContext hiveContext;
private SQLContext sqlContext;

/*
* 初始化Load
* 创建sparkContext, sqlContext, hiveContext
*
*/
public LoadDB() {
initSparckContext();
initSQLContext();
initHiveContext();
}

/*
* 创建sparkContext
*
*/
private void initSparckContext() {
String warehouseLocation
= System.getProperty("user.dir");
sparkConf
= new SparkConf()
.setAppName(
"from-to-mysql")
.set(
"spark.sql.warehouse.dir", warehouseLocation)
.setMaster(
"yarn-client");
javaSparkContext
= new JavaSparkContext(sparkConf);
}

/*
* 创建hiveContext
* 用于读取Hive中的数据
*
*/
private void initHiveContext() {
hiveContext
= new HiveContext(javaSparkContext);
}

/*
* 创建sqlContext
* 用于读写MySQL中的数据
*
*/
private void initSQLContext() {
sqlContext
= new SQLContext(javaSparkContext);
}

/*
* 使用spark-sql从hive中读取数据, 然后写入mysql对应表.
*
*/
public void hive2db() {
String url
= "jdbc:mysql://10.93.84.53:3306/big_data?characterEncoding=UTF-8";
String table
= "accounts";
Properties props
= new Properties();
props.put(
"user", "root");
props.put(
"password", "1234");
String query
= "select * from gulfstream_test.accounts where year=2017 and mOnth=10 and day=23";
DataFrame rows
= hiveContext.sql(query).select("id", "order_id", "status", "count");;
rows.write().mode(SaveMode.Append).jdbc(url, table, props);
}

/*
* 使用spark-sql从db中读取数据, 处理后再回写到db
*
*/
public void db2db() {
String url
= "jdbc:mysql://10.93.84.53:3306/big_data?characterEncoding=UTF-8";
String fromTable
= "accounts";
String toTable
= "accountsPart";
Properties props
= new Properties();
props.put(
"user", "root");
props.put(
"password", "1234");
DataFrame rows
= sqlContext.read().jdbc(url, fromTable, props).where("count <1000");
rows.write().mode(SaveMode.Append).jdbc(url, toTable, props);
}


public static void main(String[] args) {
LoadDB loadDB
= new LoadDB();
System.out.println(
" ---------------------- start hive2db ------------------------");
loadDB.hive2db();
System.out.println(
" ---------------------- finish hive2db ------------------------");
System.out.println(
" ---------------------- start db2db ------------------------");
loadDB.db2db();
System.out.println(
" ---------------------- finish db2db ------------------------");
}
}

说明:

  • hive2db

核心动作是使用hiveContext.sql(query)执行了hiveSQL,过滤出Hive表中year=2017/mOnth=10/day=23分钟的数据,返回一个DataFrame对象。

DataFrame是spark-sql数据处理的核心。对DataFrame的操作推荐这样一篇博客。你可以去使用这些方法,实现复杂的逻辑。

对DataFrame对象,我们使用了select裁剪了其中4列数据(id, order_id, status, count)出来,不过不裁剪的话,会有7列(加上分区的year,month,day)。

然后将数据以SaveMode.Append的方式,写入了mysql中的accounts表。

SaveMode.Append方式,数据会追加,而不会覆盖。如果想覆盖,还有一个常用的SaveMode.Overwrite。推荐这样一篇博客。

最终accounts中的数据有1000000条,百万。

 

  • db2db

db2db从刚刚生成的MySQL表accounts中读取出数据,也是返回了一个dataframe对象,通过执行where过滤除了其中id<1000的数据,这里正好是1000条。

然后写入了accountsPart。最终accountsPart数据应该有1000条。

 

3)编译和执行

 编译完成后,生成jar包from-to-mysql-1.0-SNAPSHOT-jar-with-dependencies.jar 

使用默认参数提交到yarn队列。

spark-submit --queue=root.zhiliangbu_prod_datamonitor from-to-mysql-1.0-SNAPSHOT-jar-with-dependencies.jar 

 片刻之后,观察输出。已经全部finish了。

 

4)查看一下结果

我们到mysql中瞅一瞅。

 

accounts表

有没有注意到,其实不用建立mysql表!这个过程会自动给你创建,相当于if not exists。

细心的你可能已经注意到了,hive里的string类型,到了MySQL中变成了Text。有个兄弟说,如果你手动创建了表,并且字段设置为String会报错,我没有试,只是记录了一下。

CREATE TABLE `accounts` (
`id` text,
`order_id` text,
`status` bigint(
20) DEFAULT NULL,
`count` decimal(
16,9) DEFAULT NULL
) ENGINE
=InnoDB DEFAULT CHARSET=utf8

简单看一下里面有多少数据。1百万

MariaDB [big_data]> select count(1) from accounts ;    
+----------+
| count(1) |
+----------+
| 1000000 |
+----------+
1 row in set (0.32 sec)

 

acountsPart表

 CREATE TABLE `accountsPart` (
`
id` text,
`order_id` text,
`status` bigint(
20) DEFAULT NULL,
`count`
decimal(16,9) DEFAULT NULL
) ENGINE
=InnoDB DEFAULT CHARSET=utf8

查看有多少数据,1000条,果然是没有问题的

MariaDB [big_data]> select count(1) from accountsPart;
+----------+
| count(1) |
+----------+
| 1000 |
+----------+
1 row in set (0.00 sec)

 

到此为止。

 


推荐阅读
  • 在分析Android的Audio系统时,我们对mpAudioPolicy->get_input进行了详细探讨,发现其背后涉及的机制相当复杂。本文将详细介绍这一过程及其背后的实现细节。 ... [详细]
  • 本文介绍了如何使用Flume从Linux文件系统收集日志并存储到HDFS,然后通过MapReduce清洗数据,使用Hive进行数据分析,并最终通过Sqoop将结果导出到MySQL数据库。 ... [详细]
  • 属性类 `Properties` 是 `Hashtable` 类的子类,用于存储键值对形式的数据。该类在 Java 中广泛应用于配置文件的读取与写入,支持字符串类型的键和值。通过 `Properties` 类,开发者可以方便地进行配置信息的管理,确保应用程序的灵活性和可维护性。此外,`Properties` 类还提供了加载和保存属性文件的方法,使其在实际开发中具有较高的实用价值。 ... [详细]
  • 本文介绍了如何利用Shell脚本高效地部署MHA(MySQL High Availability)高可用集群。通过详细的脚本编写和配置示例,展示了自动化部署过程中的关键步骤和注意事项。该方法不仅简化了集群的部署流程,还提高了系统的稳定性和可用性。 ... [详细]
  • Java Socket 关键参数详解与优化建议
    Java Socket 的 API 虽然被广泛使用,但其关键参数的用途却鲜为人知。本文详细解析了 Java Socket 中的重要参数,如 backlog 参数,它用于控制服务器等待连接请求的队列长度。此外,还探讨了其他参数如 SO_TIMEOUT、SO_REUSEADDR 等的配置方法及其对性能的影响,并提供了优化建议,帮助开发者提升网络通信的稳定性和效率。 ... [详细]
  • Python 伦理黑客技术:深入探讨后门攻击(第三部分)
    在《Python 伦理黑客技术:深入探讨后门攻击(第三部分)》中,作者详细分析了后门攻击中的Socket问题。由于TCP协议基于流,难以确定消息批次的结束点,这给后门攻击的实现带来了挑战。为了解决这一问题,文章提出了一系列有效的技术方案,包括使用特定的分隔符和长度前缀,以确保数据包的准确传输和解析。这些方法不仅提高了攻击的隐蔽性和可靠性,还为安全研究人员提供了宝贵的参考。 ... [详细]
  • 本文详细介绍了 Pentaho Kettle 中 RowMetaInterface.writeMeta 方法的使用,并提供了多个代码示例,帮助开发者更好地理解和应用该方法。 ... [详细]
  • 本文详细介绍了在 CentOS 7 系统中配置 fstab 文件以实现开机自动挂载 NFS 共享目录的方法,并解决了常见的配置失败问题。 ... [详细]
  • 零拷贝技术是提高I/O性能的重要手段,常用于Java NIO、Netty、Kafka等框架中。本文将详细解析零拷贝技术的原理及其应用。 ... [详细]
  • oracle c3p0 dword 60,web_day10 dbcp c3p0 dbutils
    createdatabasemydbcharactersetutf8;alertdatabasemydbcharactersetutf8;1.自定义连接池为了不去经常创建连接和释放 ... [详细]
  • [转]doc,ppt,xls文件格式转PDF格式http:blog.csdn.netlee353086articledetails7920355确实好用。需要注意的是#import ... [详细]
  • 字节流(InputStream和OutputStream),字节流读写文件,字节流的缓冲区,字节缓冲流
    字节流抽象类InputStream和OutputStream是字节流的顶级父类所有的字节输入流都继承自InputStream,所有的输出流都继承子OutputStreamInput ... [详细]
  • 在CentOS 7环境中安装配置Redis及使用Redis Desktop Manager连接时的注意事项与技巧
    在 CentOS 7 环境中安装和配置 Redis 时,需要注意一些关键步骤和最佳实践。本文详细介绍了从安装 Redis 到配置其基本参数的全过程,并提供了使用 Redis Desktop Manager 连接 Redis 服务器的技巧和注意事项。此外,还探讨了如何优化性能和确保数据安全,帮助用户在生产环境中高效地管理和使用 Redis。 ... [详细]
  • PTArchiver工作原理详解与应用分析
    PTArchiver工作原理及其应用分析本文详细解析了PTArchiver的工作机制,探讨了其在数据归档和管理中的应用。PTArchiver通过高效的压缩算法和灵活的存储策略,实现了对大规模数据的高效管理和长期保存。文章还介绍了其在企业级数据备份、历史数据迁移等场景中的实际应用案例,为用户提供了实用的操作建议和技术支持。 ... [详细]
  • 基于Net Core 3.0与Web API的前后端分离开发:Vue.js在前端的应用
    本文介绍了如何使用Net Core 3.0和Web API进行前后端分离开发,并重点探讨了Vue.js在前端的应用。后端采用MySQL数据库和EF Core框架进行数据操作,开发环境为Windows 10和Visual Studio 2019,MySQL服务器版本为8.0.16。文章详细描述了API项目的创建过程、启动步骤以及必要的插件安装,为开发者提供了一套完整的开发指南。 ... [详细]
author-avatar
yolo_bean
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有