作者:血流的风霜_565 | 来源:互联网 | 2023-07-22 19:03
1.工程maven依赖包
<!-- NOTE(review): the blog platform stripped the XML tags from this pom.xml
     fragment; the structure below is reconstructed from the surviving
     groupId/artifactId/version tokens. The property names for the stray
     versions 5.5.2 and 1.2.28 are not recoverable from the original text —
     verify them against the real project pom. -->
<properties>
    <spark_version>2.3.1</spark_version>
    <elasticsearch-spark.version>6.3.2</elasticsearch-spark.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>${spark_version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>${spark_version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-yarn_2.11</artifactId>
        <version>${spark_version}</version>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch-spark-20_2.11</artifactId>
        <version>${elasticsearch-spark.version}</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.46</version>
    </dependency>
</dependencies>
2.spark加载数据库中数据
1 public class GoodsFromMySQL {
2
3 /**
4 * 加载数据库数据
5 *
6 * @param sc spark context
7 * @param sparkSession spark session
8 */
9 public static void loadGoodsInfo(SparkContext sc, SparkSession sparkSession) {
10 String url = "jdbc:mysql://x.x.x.x:3306/db-test";
11
12 String sql = "(SELECT item_name as itemName, goods_category as goodsCategory FROM goods where dict_type='100203' and item_name " +
13 "is not null) as my-goods";
14
15 SQLContext sqlCOntext= SQLContext.getOrCreate(sc);
16 DataFrameReader reader = sqlContext.read().format("jdbc").
17 option("url", url).option("dbtable", sql).
18 option("driver", "com.mysql.jdbc.Driver").
19 option("user", "root").
20 option("password", "xxxxx");
21
22
23 Dataset goodsDataSet = reader.load();
24
25 // Looks the schema of this DataFrame.
26 goodsDataSet.printSchema();
27
28 goodsDataSet.write().mode(SaveMode.Overwrite).json("/data/app/source_new.json");
29 }
30
31
32 public static void main(String[] args) {
33 SparkConf cOnf= new SparkConf().setAppName("my-app");
34 SparkContext sc = new SparkContext(conf);
35
36 SparkSession sparkSession = new SparkSession(sc);
37
38 loadGoodsInfo(sc, sparkSession);
39 }
40 }3.spark支持加载多种数据库,仅需要用户依赖不同的数据库驱动包,并且代码进行微调即可
根据以上 Java 代码,仅需调整 option("driver", ...) 所在的那一行,把驱动类换成目标数据库对应的驱动(例如 PostgreSQL 的 org.postgresql.Driver),并同步修改 url 中的 JDBC 连接串即可。