I. Connecting to SQL
Method 1:
package com.njbdqn.linkSql

import java.util.Properties
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql._

object LinkSql {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("apptest").master("local[2]").getOrCreate()
    // 1. JDBC connection properties
    val prop = new Properties()
    prop.setProperty("driver", "com.mysql.jdbc.Driver")
    prop.setProperty("user", "root")
    prop.setProperty("password", "root")
    // 2. read the table into a DataFrame and show it
    val jdbcDF = spark.read.jdbc("jdbc:mysql://192.168.56.111:3306/test", "studentInfo", prop)
    jdbcDF.show(false)
    // 3. append rows to the table
    import spark.implicits._
    val df = spark.createDataFrame(spark.sparkContext.parallelize(Seq(
        (90, "抖抖抖", "男", 23, "sdf", "sdfg@dfg"),
        (8, "抖33", "男", 23, "s444f", "sdfg@dfg"))))
      .toDF("sid", "sname", "sgender", "sage", "saddress", "semail")
    // df.show(false)
    df.write.mode("append").jdbc("jdbc:mysql://192.168.56.111:3306/test", "studentInfo", prop)
  }
}
Method 2:
package com.njbdqn

import org.apache.spark.sql.{DataFrame, SparkSession}

object KMeansTest {
  def readMySQL(spark: SparkSession): DataFrame = {
    val map: Map[String, String] = Map[String, String](
      "url" -> "jdbc:mysql://192.168.56.111:3306/myshops",
      "driver" -> "com.mysql.jdbc.Driver",
      "user" -> "root",
      "password" -> "root",
      "dbtable" -> "customs")
    spark.read.format("jdbc").options(map).load()
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("db").master("local[*]").getOrCreate()
    readMySQL(spark)
      .select("cust_id", "company", "province_id", "city_id", "district_id", "membership_level",
        "create_at", "last_login_time", "idno", "biz_point", "sex", "marital_status",
        "education_id", "login_count", "vocation", "post")
      .show(20)
    spark.stop()
  }
}
Method 3: read the connection settings from a .properties file under resources:
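No code accompanied this method in the original notes; the following is a minimal sketch of the idea, assuming a hypothetical db.properties file under src/main/resources that contains url, driver, user, password and dbtable keys (the file name, object name and helper method here are illustrative, not from the original).

import java.util.Properties
import org.apache.spark.sql.SparkSession

object LinkSqlByProps {
  // load a .properties file from the classpath (src/main/resources is on the classpath)
  def loadProps(resource: String): Properties = {
    val prop = new Properties()
    val in = getClass.getClassLoader.getResourceAsStream(resource)
    prop.load(in)
    in.close()
    prop
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("propstest").master("local[2]").getOrCreate()
    val prop = loadProps("db.properties")
    // the same Properties object carries driver/user/password for the JDBC call
    val jdbcDF = spark.read.jdbc(prop.getProperty("url"), prop.getProperty("dbtable"), prop)
    jdbcDF.show(false)
    spark.stop()
  }
}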
II. Connecting to Hive
(1) Written back in August/September; I did not fully understand it at the time, and it is not well written.
1. Add the resource files
![194923d71277eaeb4d62b0ab05c1b84d.png](https://img.php1.cn/3cd4a/189d8/978/7dbdf0f38ad53545.jpeg)
2. Code
package com.njbdqn.linkSql

import org.apache.spark.sql.SparkSession

object LinkHive {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("apptest").master("local[2]")
      .enableHiveSupport()
      .getOrCreate()
    spark
      // .sql("show databases")
      .sql("select * from storetest.testhive")
      .show(false)
  }
}
Note: if the XML configuration points to the cluster, val df = spark.read.format("csv").load("file:///D:/idea/ideaProjects/spark_projects/myspark8/src/main/scala/com/njbdqn/DSDF/orders.csv") fails, because
>>> Spark can read local data files, but the file must exist on every node. (Verified: on a three-node cluster, with the file present only on the master, textFile kept reporting that the file could not be found; after copying the file to the other two workers, it could be read.)
>>> Fix: remove the cluster configuration (run locally) / upload the file to HDFS (run on the cluster), as sketched below.
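For the second fix, a minimal sketch of reading the same CSV from HDFS instead of the local disk; the namenode address and target path are placeholders, and the sketch assumes the file has already been uploaded:

// upload first, e.g.: hdfs dfs -put orders.csv /data/orders.csv
val df = spark.read.format("csv")
  .load("hdfs://192.168.56.111:9000/data/orders.csv") // placeholder namenode address and path
df.show(5)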
(2) Written on December 25
pom file dependency:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.11</artifactId>
    <version>2.3.4</version>
</dependency>
Add the hive.metastore.uris setting in the code to enable shared access to the metastore metadata:
import org.apache.spark.sql.SparkSession

object EventTrans {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]")
      // address of the metastore server; its service must be running
      .config("hive.metastore.uris", "thrift://192.168.56.115:9083")
      .appName("test")
      .enableHiveSupport().getOrCreate()
    spark.sql("select * from dm_events.dm_final limit 3")
      .show(false)
    spark.close()
  }
}
1) 192.168.56.115 must have the metastore service started:
hive --service metastore
If the service is not started, the following error appears when starting the Spark thriftServer:
org.apache.hadoop.hive.ql.metadata.HiveException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient
2) 192.168.56.115 must be configured with a direct MySQL connection for the metastore.
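For reference, the direct MySQL connection is the usual javax.jdo.option.* block in hive-site.xml on 192.168.56.115; the database name and credentials below are placeholders, not taken from the original:

<property>
    <name>javax.jdo.option.ConnectionURL</name>
    <value>jdbc:mysql://192.168.56.115:3306/hive?createDatabaseIfNotExist=true</value>
</property>
<property>
    <name>javax.jdo.option.ConnectionDriverName</name>
    <value>com.mysql.jdbc.Driver</value>
</property>
<property>
    <name>javax.jdo.option.ConnectionUserName</name>
    <value>root</value>
</property>
<property>
    <name>javax.jdo.option.ConnectionPassword</name>
    <value>root</value>
</property>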
Verification:
![5af84648f888ed9860ecc00b772e6071.png](https://img.php1.cn/3cd4a/1eebe/cd5/1e3db12dd78db092.webp)
III. Operating on HDFS: deletion
import java.net.URI
import org.apache.hadoop.fs.Path

val spark = SparkSession.builder().master("local[*]").appName("app").getOrCreate()
/** Delete the intermediate data left behind by checkpointing */
val path = new Path(HDFSConnection.paramMap("hadoop_url") + "/checkpoint") // the HDFS path to delete
val hadoopConf = spark.sparkContext.hadoopConfiguration
val hdfs = org.apache.hadoop.fs.FileSystem.get(new URI(HDFSConnection.paramMap("hadoop_url") + "/checkpoint"), hadoopConf)
if (hdfs.exists(path)) { // pass true to delete recursively, false otherwise
  hdfs.delete(path, true) // this is intermediate data, so recursive deletion is fine
}
Problem encountered:
Exception in thread "main" java.lang.IllegalArgumentException: Wrong FS: hdfs://h1:9000/out, expected: file:///
at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:381)
at org.apache.hadoop.fs.RawLocalFileSystem.pathToFile(RawLocalFileSystem.java:55)
at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:393)
at org.apache.hadoop.fs.ChecksumFileSystem.delete(ChecksumFileSystem.java:452)
at mapreduce.WordCountApp.main(WordCountApp.java:36)
Solution: pass the HDFS URI to FileSystem.get explicitly, so the HDFS implementation is selected instead of the default local filesystem:
val hdfs = org.apache.hadoop.fs.FileSystem.get(new URI(HDFSConnection.paramMap("hadoop_url") + "/checkpoint"), hadoopConf)
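The underlying cause: without an explicit URI, FileSystem.get falls back to fs.defaultFS from the local configuration (file:///), so any hdfs:// path fails the scheme check. A minimal contrast, reusing the hdfs://h1:9000 address from the error above:

// returns the local file:/// filesystem and later rejects hdfs:// paths with "Wrong FS"
val localFs = org.apache.hadoop.fs.FileSystem.get(hadoopConf)
// selects the HDFS implementation explicitly via the URI
val hdfs = org.apache.hadoop.fs.FileSystem.get(new URI("hdfs://h1:9000"), hadoopConf)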