热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

spark保存计算结果

4.数据读取与保存Spark的数据读取及数据保存可以从两个维度来作区分:文件格式以及文件系统。文件格式分为:Text文件、Json文件、Csv文件、S
4.数据读取与保存

  Spark 的数据读取及数据保存可以从两个维度来作区分:文件格式以及文件系统。

文件格式分为:Text 文件、Json 文件、Csv 文件、Sequence 文件以及 Object 文件;

文件系统分为:本地文件系统、HDFS、HBASE 以及数据库。

 

 

 

1)数据读取:textFile(String) 

scala> val hdfsFile = sc.textFile("hdfs://hadoop102:9000/fruit.txt")
hdfsFile: org.apache.spark.rdd.RDD[String] = hdfs://hadoop102:9000/fruit.txt MapPartitionsRDD[21] at textFile at :24

 

 

 

2)数据保存: saveAsTextFile(String)

scala> hdfsFile.saveAsTextFile("/fruitOut")

 

 

 

 

4.1.2 Json 文件 

  如果 JSON 文件中每一行就是一个 JSON 记录,那么可以通过将 JSON 文件当做文本

文件来读取,然后利用相关的 JSON 库对每一条数据进行 JSON 解析。

 

  注意:使用 RDD 读取 JSON 文件处理很复杂,同时 SparkSQL 集成了很好的处理

JSON 文件的方式,所以应用中多是采用 SparkSQL 处理 JSON 文件。 

 

(1)导入解析 json 所需的包 

scala> import scala.util.parsing.json.JSON

 

(2)上传 json 文件到 HDFS

[lxl@hadoop102 spark]$ hadoop fs -put ./examples/src/main/resources/people.json /

 

(3)读取文件

scala> val json = sc.textFile("/people.json")
json: org.apache.spark.rdd.RDD[String] = /people.json MapPartitionsRDD[8] at textFile at :24

 

(4)解析 json 数据

scala> val result = json.map(JSON.parseFull)
result: org.apache.spark.rdd.RDD[Option[Any]] = MapPartitionsRDD[10] at map at :27

 

(5)打印

scala> result.collect
res11: Array[Option[Any]] = Array(Some(Map(name -> Michael)), Some(Map(name -> Andy, age -> 30.0)), Some(Map(name -> Justin, age -> 19.0)))

 

 

 

4.1.3 Sequence 文件 

  SequenceFile 文件是 Hadoop 用来存储二进制形式的 key-value 对而设计的一种平面

文件(Flat File)。Spark 有专门用来读取 SequenceFile 的接口。在 SparkContext 中,可以

调用 sequenceFile[ keyClass, valueClass](path)。

注意:SequenceFile 文件只针对 PairRDD 

 

(1)创建一个 RDD

scala> val rdd = sc.parallelize(Array((1,2),(3,4),(5,6)))
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[13] at parallelize at :24

 

(2)将 RDD 保存为 Sequence 文件

scala> rdd.saveAsSequenceFile("file:///opt/module/spark/seqFile")

 

(3)查看该文件 

[lxl@hadoop102 seqFile]$ pwd
/opt/module/spark/seqFile[lxl@hadoop102 seqFile]$ ll
总用量 8
-rw-r--r-- 1 atguigu atguigu 108 10 月 9 10:29 part-00000
-rw-r--r-- 1 atguigu atguigu 124 10 月 9 10:29 part-00001
-rw-r--r-- 1 atguigu atguigu 0 10 月 9 10:29 _SUCCESS[lxl@hadoop102 seqFile]$ cat part-00000
SEQ org.apache.hadoop.io.IntWritable org.apache.hadoop.io.IntWritableط

 

(4)读取 Sequence 文件 

scala> val seq = sc.sequenceFile[Int,Int]("file:///opt/module/spark/seqFile")
seq: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[18] at sequenceFile at :24

 

(5)打印读取后的 Sequence 文件

scala> seq.collect
res14: Array[(Int, Int)] = Array((1,2), (3,4), (5,6))

 

 

 

4.1.4 对象文件 (objectFile)

  对象文件是将对象序列化后保存的文件,采用 Java 的序列化机制。可以通过

objectFile[k,v](path) 函数接收一个路径,读取对象文件,返回对应的 RDD,也可以通过调

用 saveAsObjectFile() 实现对对象文件的输出。因为是序列化所以要指定类型。 

 

(1)创建一个 RDD

scala> val rdd = sc.parallelize(Array(1,2,3,4))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[19] at parallelize at :24

 

(2)将 RDD 保存为 Object 文件 

scala> rdd.saveAsObjectFile("file:///opt/module/spark/objectFile")

 

(3)查看该文件

[lxl@hadoop102 object]$ pwd
/opt/module/spark/object

  [lxl@hadoop102 object]$ ll
  总用量 16
  -rw-r--r-- 1 lxl lxl 138 7月 8 03:12 part-00000
  -rw-r--r-- 1 lxl lxl 138 7月 8 03:12 part-00001
  -rw-r--r-- 1 lxl lxl 138 7月 8 03:12 part-00002
  -rw-r--r-- 1 lxl lxl 142 7月 8 03:12 part-00003
  -rw-r--r-- 1 lxl lxl 0 7月 8 03:12 _SUCCESS

  [lxl@hadoop102 object]$ cat part-00000
  SEQ!org.apache.hadoop.io.NullWritable"org.apache.hadoop.io.BytesWritabley.)a¬촲[IMº`&v겥xp

 

(4)读取 Object 文件 

scala> val objFile = sc.objectFile[(Int)]("file:///opt/module/spark/objectFile")
objFile: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[31] at objectFile at :24

 

(5)打印读取后的 Sequence 文件 

scala> objFile.collect
res19: Array[Int] = Array(1, 2, 3, 4)

 

 

 

4.2 文件系统类数据读取与保存 


4.2.1 HDFS 

  Spark 的整个生态系统与 Hadoop 是完全兼容的,所以对于 Hadoop 所支持的文件类型

或者数据库类型,Spark 也同样支持.另外,由于 Hadoop 的 API 有新旧两个版本,所以 Spark 为

了能够兼容 Hadoop 所有的版本,也提供了两套创建操作接口.对于外部存储创建操作而

言,hadoopRDD 和 newHadoopRDD 是最为抽象的两个函数接口,主要包含以下四个参数.

 

  1)输入格式(InputFormat): 制定数据输入的类型,如 TextInputFormat 等,新旧两个版本

所引用的版本分别是 org.apache.hadoop.mapred.InputFormat 和

org.apache.hadoop.mapreduce.InputFormat(NewInputFormat)

  2)键类型: 指定[K,V]键值对中 K 的类型

  3)值类型: 指定[K,V]键值对中 V 的类型

  4)分区值: 指定由外部存储生成的 RDD 的 partition 数量的最小值,如果没有指定,系

统会使用默认值 defaultMinSplits

 

注意:其他创建操作的 API 接口都是为了方便最终的 Spark 程序开发者而设置的,是这两个

接口的高效实现版本.例如,对于 textFile 而言,只有 path 这个指定文件路径的参数,其他参数

在系统内部指定了默认值。

 

  1.在 Hadoop 中以压缩形式存储的数据,不需要指定解压方式就能够进行读取,因为

Hadoop 本身有一个解压器会根据压缩文件的后缀推断解压算法进行解压.

  2.如果用 Spark 从 Hadoop 中读取某种类型的数据不知道怎么读取的时候,上网查找一个

使用 map-reduce 的时候是怎么读取这种这种数据的,然后再将对应的读取方式改写成上面的

hadoopRDD 和 newAPIHadoopRDD 两个类就行了 

 

 

 

 

4.2.2 MySQL 数据库连接 

支持通过 Java JDBC 访问关系型数据库。需要通过 JdbcRDD 进行,示例如下:

 

(1)添加依赖 

mysqlmysql-connector-java5.1.27

 

(2)Mysql 读取: 

package com.lxlimport java.sql.DriverManager
import org.apache.spark.rdd.JdbcRDD
import org.apache.spark.{SparkConf, SparkContext}object MysqlRDD {def main(args: Array[String]): Unit &#61; {//1.创建 spark 配置信息val sparkConf: SparkConf &#61; newSparkConf().setMaster("local[*]").setAppName("JdbcRDD")//2.创建 SparkContextval sc &#61; new SparkContext(sparkConf)//3.定义连接 mysql 的参数val driver &#61; "com.mysql.jdbc.Driver"val url &#61; "jdbc:mysql://hadoop102:3306/rdd"val userName &#61; "root"val passWd &#61; "000000"//创建 JdbcRDDval rdd &#61; new JdbcRDD(sc, () &#61;> {Class.forName(driver)DriverManager.getConnection(url, userName, passWd)},"select * from &#96;rddtable&#96; where &#96;id&#96; >&#61; ? and id <&#61; ?;",1,10,1,r &#61;> (r.getInt(1), r.getString(2)))//打印最后结果println(rdd.count())rdd.foreach(println)sc.stop()}
}

 

Mysql 写入&#xff1a; 

def main(args: Array[String]) {val sparkConf &#61; new SparkConf().setMaster("local[2]").setAppName("HBaseApp")val sc &#61; new SparkContext(sparkConf)val data &#61; sc.parallelize(List("Female", "Male","Female"))data.foreachPartition(insertData)
}def insertData(iterator: Iterator[String]): Unit &#61; {Class.forName ("com.mysql.jdbc.Driver").newInstance()val conn &#61; java.sql.DriverManager.getConnection("jdbc:mysql://master01:3306/rdd", "root","hive")iterator.foreach(data &#61;> {val ps &#61; conn.prepareStatement("insert into rddtable(name) values (?)")ps.setString(1, data)ps.executeUpdate()})
}

 

 

spark-shell 中使用 JDBC 连接 Mysql&#xff1a;

 

[lxl&#64;hadoop102 spark]$ cp /opt/module/hive/lib/mysql-connector-java-5.1.27-bin.jar ./jars/

 

 

 

 

scala> val rdd &#61; new org.apache.spark.rdd.JdbcRDD(sc, () &#61;> {| Class.forName("com.mysql.jdbc.Driver")| java.sql.DriverManager.getConnection("jdbc:mysql://hadoop102:3306/rdd", "root", "000000") | },| "select * from &#96;rddtable&#96; where id >&#61; ? and id <&#61; ?;",| 1,| 10,| 1,| r &#61;> (r.getInt(1), r.getString(2))| )
rdd: org.apache.spark.rdd.JdbcRDD[(Int, String)] &#61; JdbcRDD[1] at JdbcRDD at :24scala> println(rdd.count())
3scala> rdd.foreach(println)
(1,zhangsan)
(2,lisi)
(3,wangwu)

 

 

 

 

 

4.2.3 HBase 数据库 

  由于 org.apache.hadoop.hbase.mapreduce.TableInputFormat 类的实现&#xff0c;Spark 可以通过

Hadoop 输入格式访问 HBase。这个输入格式会返回键值对数据&#xff0c;其中键的类型为 org.

apache.hadoop.hbase.io.ImmutableBytesWritable&#xff0c;而值的类型为 org.apache.hadoop.hbase.client.

Result。 

 

&#xff08;1&#xff09;添加依赖 

org.apache.hbasehbase-server1.3.1
org.apache.hbasehbase-client1.3.1

 

&#xff08;2&#xff09;从 HBase 读取数据 

package com.lxlimport org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.hadoop.hbase.util.Bytesobject HBaseSpark {def main(args: Array[String]): Unit &#61; {//创建 spark 配置信息val sparkConf: SparkConf &#61; new SparkConf().setMaster("local[*]").setAppName("JdbcRDD")//创建 SparkContextval sc &#61; new SparkContext(sparkConf)//构建 HBase 配置信息val conf: Configuration &#61; HBaseConfiguration.create()conf.set("hbase.zookeeper.quorum", "hadoop102,hadoop103,hadoop104")conf.set(TableInputFormat.INPUT_TABLE, "rddtable")//从 HBase 读取数据形成 RDDval hbaseRDD: RDD[(ImmutableBytesWritable, Result)] &#61; sc.newAPIHadoopRDD(conf,classOf[TableInputFormat],classOf[ImmutableBytesWritable],classOf[Result])val count: Long &#61; hbaseRDD.count()println(count)//对 hbaseRDD 进行处理hbaseRDD.foreach {case (_, result) &#61;>val key: String &#61; Bytes.toString(result.getRow)val name: String &#61; Bytes.toString(result.getValue(Bytes.toBytes("info"),Bytes.toBytes("name")))val color: String &#61; Bytes.toString(result.getValue(Bytes.toBytes("info"),Bytes.toBytes("color")))println("RowKey:" &#43; key &#43; ",Name:" &#43; name &#43; ",Color:" &#43; color)}//关闭连接sc.stop()}
}

 

3&#xff09;往 HBase 写入 

def main(args: Array[String]) {//获取 Spark 配置信息并创建与 spark 的连接val sparkConf &#61; new SparkConf().setMaster("local[*]").setAppName("HBaseApp")val sc &#61; new SparkContext(sparkConf)//创建 HBaseConfval conf &#61; HBaseConfiguration.create()val jobConf &#61; new JobConf(conf)jobConf.setOutputFormat(classOf[TableOutputFormat[ImmutableBytesWritable]])jobConf.set(TableOutputFormat.OUTPUT_TABLE, "fruit_spark")//构建 Hbase 表描述器val fruitTable &#61; TableName.valueOf("fruit_spark")val tableDescr &#61; new HTableDescriptor(fruitTable)tableDescr.addFamily(new HColumnDescriptor("info".getBytes))//创建 Hbase 表val admin &#61; new HBaseAdmin(conf)if (admin.tableExists(fruitTable)) {admin.disableTable(fruitTable)admin.deleteTable(fruitTable)}admin.createTable(tableDescr)//定义往 Hbase 插入数据的方法def convert(triple: (Int, String, Int)) &#61; {val put &#61; new Put(Bytes.toBytes(triple._1))put.addImmutable(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(triple._2))put.addImmutable(Bytes.toBytes("info"), Bytes.toBytes("price"), Bytes.toBytes(triple._3))(new ImmutableBytesWritable, put)}//创建一个 RDDval initialRDD &#61; sc.parallelize(List((1,"apple",11), (2,"banana",12), (3,"pear",13)))//将 RDD 内容写到 HBaseval localData &#61; initialRDD.map(convert)localData.saveAsHadoopDataset(jobConf)
}


推荐阅读
  • Spring源码解密之默认标签的解析方式分析
    本文分析了Spring源码解密中默认标签的解析方式。通过对命名空间的判断,区分默认命名空间和自定义命名空间,并采用不同的解析方式。其中,bean标签的解析最为复杂和重要。 ... [详细]
  • 本文介绍了在Win10上安装WinPythonHadoop的详细步骤,包括安装Python环境、安装JDK8、安装pyspark、安装Hadoop和Spark、设置环境变量、下载winutils.exe等。同时提醒注意Hadoop版本与pyspark版本的一致性,并建议重启电脑以确保安装成功。 ... [详细]
  • 本文讨论了一个关于cuowu类的问题,作者在使用cuowu类时遇到了错误提示和使用AdjustmentListener的问题。文章提供了16个解决方案,并给出了两个可能导致错误的原因。 ... [详细]
  • 大数据Hadoop生态(20)MapReduce框架原理OutputFormat的开发笔记
    本文介绍了大数据Hadoop生态(20)MapReduce框架原理OutputFormat的开发笔记,包括outputFormat接口实现类、自定义outputFormat步骤和案例。案例中将包含nty的日志输出到nty.log文件,其他日志输出到other.log文件。同时提供了一些相关网址供参考。 ... [详细]
  • Java如何导入和导出Excel文件的方法和步骤详解
    本文详细介绍了在SpringBoot中使用Java导入和导出Excel文件的方法和步骤,包括添加操作Excel的依赖、自定义注解等。文章还提供了示例代码,并将代码上传至GitHub供访问。 ... [详细]
  • pythonMatplotlib(二)
    Matplotlib+pandas作图一、对csv文件进行提取ruixi.csv对上述表格进行提取并做图画出图像二、对.xlsx进行提取:rui ... [详细]
  • 动量|收益率_基于MT策略的实战分析
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了基于MT策略的实战分析相关的知识,希望对你有一定的参考价值。基于MT策略的实战分析 ... [详细]
  • 一、Hadoop来历Hadoop的思想来源于Google在做搜索引擎的时候出现一个很大的问题就是这么多网页我如何才能以最快的速度来搜索到,由于这个问题Google发明 ... [详细]
  • 1,关于死锁的理解死锁,我们可以简单的理解为是两个线程同时使用同一资源,两个线程又得不到相应的资源而造成永无相互等待的情况。 2,模拟死锁背景介绍:我们创建一个朋友 ... [详细]
  • 后台获取视图对应的字符串
    1.帮助类后台获取视图对应的字符串publicclassViewHelper{将View输出为字符串(注:不会执行对应的ac ... [详细]
  • XML介绍与使用的概述及标签规则
    本文介绍了XML的基本概念和用途,包括XML的可扩展性和标签的自定义特性。同时还详细解释了XML标签的规则,包括标签的尖括号和合法标识符的组成,标签必须成对出现的原则以及特殊标签的使用方法。通过本文的阅读,读者可以对XML的基本知识有一个全面的了解。 ... [详细]
  • 《数据结构》学习笔记3——串匹配算法性能评估
    本文主要讨论串匹配算法的性能评估,包括模式匹配、字符种类数量、算法复杂度等内容。通过借助C++中的头文件和库,可以实现对串的匹配操作。其中蛮力算法的复杂度为O(m*n),通过随机取出长度为m的子串作为模式P,在文本T中进行匹配,统计平均复杂度。对于成功和失败的匹配分别进行测试,分析其平均复杂度。详情请参考相关学习资源。 ... [详细]
  • 在重复造轮子的情况下用ProxyServlet反向代理来减少工作量
    像不少公司内部不同团队都会自己研发自己工具产品,当各个产品逐渐成熟,到达了一定的发展瓶颈,同时每个产品都有着自己的入口,用户 ... [详细]
  • 本文介绍了一个在线急等问题解决方法,即如何统计数据库中某个字段下的所有数据,并将结果显示在文本框里。作者提到了自己是一个菜鸟,希望能够得到帮助。作者使用的是ACCESS数据库,并且给出了一个例子,希望得到的结果是560。作者还提到自己已经尝试了使用"select sum(字段2) from 表名"的语句,得到的结果是650,但不知道如何得到560。希望能够得到解决方案。 ... [详细]
  • Activiti7流程定义开发笔记
    本文介绍了Activiti7流程定义的开发笔记,包括流程定义的概念、使用activiti-explorer和activiti-eclipse-designer进行建模的方式,以及生成流程图的方法。还介绍了流程定义部署的概念和步骤,包括将bpmn和png文件添加部署到activiti数据库中的方法,以及使用ZIP包进行部署的方式。同时还提到了activiti.cfg.xml文件的作用。 ... [详细]
author-avatar
丢失的面包树
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有