需求
(一) 数据描述
1、数据参数
该数据每日进行采集汇总。数据范围涵盖全国主要省份(港澳台西藏海南暂无数
据)的 180+的大型农产品批发市场,380+的农产品品类(由于季节性和地域性
等特点,每日的数据中不一定会涵盖全部的农产品品类)。
2、数据类型
(二)功能需求
1、农产品市场个数统计
- 统计每个省份的农产品市场总数
- 统计没有农产品市场的省份有哪些
2、农产品种类统计
- 根据农产品类型数量,统计排名前 3 名的省份
- 根据农产品类型数量,统计每个省份排名前 3 名的农产品市场
3、价格区间统计,
- 计算山西省的每种农产品的价格波动趋势,即计算每天价格均值。
- 某种农产品的价格均值计算公式:
PAVG = (PM1+PM2+…+PMn-max§-min§)/(N-2)
其中,P 表示价格,Mn 表示 market,即农产品市场。PM1 表示 M1 农产品
市场的该产品价格,max§表示价格最大值,min§价格最小值。
数据展示
allprovince.txt
河北
山西
辽宁
吉林
黑龙江
江苏
浙江
安徽
福建
江西
山东
河南
...
product.txt
生菜 2.00 2018/1/1 山西汾阳市晋阳农副产品批发市场 山西 汾阳
芹菜 2.40 2018/1/1 山西汾阳市晋阳农副产品批发市场 山西 汾阳
菜花 3.80 2018/1/1 北京朝阳区大洋路综合市场 北京 朝阳
生姜 10.00 2018/1/1 北京朝阳区大洋路综合市场 北京 朝阳
山药 8.00 2018/1/1 北京朝阳区大洋路综合市场 北京 朝阳
芋头 5.50 2018/1/1 北京朝阳区大洋路综合市场 北京 朝阳
小葱 1.50 2018/1/1 北京朝阳区大洋路综合市场 北京 朝阳
...
用RDD算子解法:
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.junit/*** Description :* CreateTime : 2019/10/916:41** @author TuYouXian* @since JDK1.8*/
class Test {val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("test"))val source: RDD[String] = sc.textFile("data/product.txt")/*** 统计每个省份农产品批次市场个数* 已有数据:省份 市场名称* 目标数据:省份 市场数量*/@junit.Testdef product1(): Unit ={//1、读取数据//2、去重、过滤、列裁剪val result: collection.Map[String, Long] = source.filter(_.split("\t").size >= 5).map(line =>{val arr: Array[String] = line.split("\t")val province = arr(4)val product = arr(3)province-> product}).distinct()//3、数据处理//.map(item => item._1 -> 1).reduceByKey(_+_).collect()//.groupByKey().map(item => item._1->item._2.size).countByKey()//4、结果展示println(result)}/*** 统计没有农产品的省份有哪些* 已知数据:有农产品省份* 目标数据:无农产品省份*/@junit.Testdef product2(): Unit ={val allprovinces = sc.textFile("data/allprovince.txt")val provinces = source.filter(_.split("\t").size >= 5).map(line => {val arr = line.split("\t")val province = arr(4)province}).distinct()val noprovinces: Array[String] = allprovinces.subtract(provinces).collect()println(noprovinces.toBuffer)}/*** 根据农产品的类型数量,统计排名前三的省份* 已有数据:农产品类型,省份* 中间数据:农产品类型数量,省份* 目标数据:排名前三省份*/@junit.Testdef product3(): Unit ={val result: Array[(String, Int)] = source.filter(_.split("\t").size >= 5).map(line => {val arr: Array[String] = line.split("\t")val province = arr(4)val productType = arr(0)province -> productType}).distinct().groupByKey().map(item => item._1 -> item._2.size).sortBy(_._2, false).take(3)println(result.toBuffer)}/*** 根据农产品类型数量,统计每个省份排名前三的批发市场* 已有数据:农产品类型,省份,批发市场* 中间数据:(省份,((省份,批发市场),农产品类型数量))* 目标数据:(省份,批发市场,农产品类型数量)前三数据* 也就是要根据省份分区,求每个分区的前三*/@junit.Testdef product4(): Unit ={source.filter(_.split("\t").size >= 5).map(line =>{val arr = line.split("\t")val province = arr(4)val product = arr(3)val productType = arr(0)((province,product),productType)}).distinct().map(item => item._1 -> 1).reduceByKey(_+_).groupBy(_._1._1)//(省份,[((省份,批发市场),农产品类型数量),((省份,批发市场),农产品类型数量)]).flatMap(item =>{//需要压扁操作val province = item._1//val arr = item._2.toArrayval arr = item._2.toBufferarr.sortBy(_._2).reverse.take(3).map(item => (province,item._1._2,item._2))}).foreach(println(_))}/*** 计算山西省的每种农产品的价格波动趋势,即计算每天价格均值* PAVG = (PM1+PM2+...+PMn-max(P)-min(P))/(N-2)* 其中,P 表示价格,Mn 表示 market,即农产品市场。PM1 表示 M1 农产品* 市场的该产品价格,max(P)表示价格最大值,min(P)价格最小值。* 已知数据:农产品类型,批发价格* 目标数据:农产品类型,价格均值* 注意有部分农产品的数量并不能满足上面公式可能会出NAN,无穷数等结果*/@junit.Testdef product5(): Unit ={//1、过滤 去重 列裁剪source.filter(item=> item.split("\t").size>=5 && item.split("\t")(4)=="山西").map(item=>{val arr = item.split("\t")val name = arr(0)val price = arr(1).toDouble//避免后续出现1/3 = 0的情况(name,price)})//2、分组//(面粉,CompactBuffer(3.44, 2.9, 3.44))//(大葱,CompactBuffer(2.8, 2.6, 2.4, 3.0, 3.1, 2.8, 3.0, 2.5)).groupByKey()//.foreach(println(_)).map(item=>{val data: Iterable[Double] = item._2val maxprice = data.maxval minprice = data.minval sumPrice = data.sumval size = data.sizeval avgPrice = (sumPrice-maxprice-minprice)/(size-2)(item._1,avgPrice)})//3、计算平均价格.foreach(println(_))//4、结果}
}
SparkSql解法: