Author: 乐橙味_367 | Source: Internet | 2023-09-10 15:51
package com.bigdata
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkContext, SparkConf}
/**
 * Filters the records in a.txt against a user blacklist in b.txt:
 * any record whose first field (the user id) appears in the blacklist is dropped.
 */
object BlackWhite {
  def main(args: Array[String]) {
    createContext()(process)
  }

  // Loan pattern: create a SparkContext, lend it to the given
  // function, and guarantee the context is stopped even on failure.
  def createContext()(func: SparkContext => Unit) {
    val conf = new SparkConf().setAppName("BlackWhite").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      func(sc)
    } finally {
      sc.stop()
    }
  }
  // Process the data: keep only the records whose user id is NOT in the blacklist.
  def process(sc: SparkContext): Unit = {
    val aRdd = sc.textFile("/spark/a.txt") // records, e.g. "userId value"
    val bRdd = sc.textFile("/spark/b.txt") // user blacklist, one id per line
    // Key each record by its first field (the user id).
    val a = aRdd.map(_.split(" ")).map(arr => (arr(0), arr))
    val b = bRdd.map((_, true))
    // leftOuterJoin keeps every record of a; the Option is defined
    // only when the user id also exists in the blacklist.
    val aJoinb: RDD[(String, (Array[String], Option[Boolean]))] = a.leftOuterJoin(b)
    val result: RDD[(String, String)] = aJoinb.filter {
      case (_, (_, opt)) => opt.isEmpty
    }.map(abc => (abc._2._1(0), abc._2._1(1)))
    println(result.collect().mkString(" "))
  }
}
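
To make the join logic concrete, below is a minimal self-contained sketch of the same leftOuterJoin filtering run on hypothetical in-memory data (the post does not show the contents of a.txt or b.txt, so the "userId value" record format is an assumption):

import org.apache.spark.{SparkConf, SparkContext}

object BlackWhiteDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("BlackWhiteDemo").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      // Hypothetical records in the "userId value" format assumed above.
      val aRdd = sc.parallelize(Seq("u1 click", "u2 view", "u3 buy"))
      // Hypothetical blacklist: user u2 is blocked.
      val bRdd = sc.parallelize(Seq("u2"))

      val a = aRdd.map(_.split(" ")).map(arr => (arr(0), arr))
      val b = bRdd.map((_, true))

      // Keys missing from b come back as None, so filtering on isEmpty
      // keeps exactly the non-blacklisted users.
      val result = a.leftOuterJoin(b)
        .filter { case (_, (_, opt)) => opt.isEmpty }
        .map { case (_, (arr, _)) => (arr(0), arr(1)) }

      // Prints (order may vary across partitions): (u1,click) (u3,buy)
      println(result.collect().mkString(" "))
    } finally {
      sc.stop()
    }
  }
}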