作者:小小小菜鸡 | 来源:互联网 | 2023-08-31 09:09
spark将数据写入redis时调用以下代码会报org.apache.spark.SparkException:Tasknotserializableimportcom.redis
spark将数据写入redis时调用以下代码会报 org.apache.spark.SparkException: Task not serializable
import com.redis.RedisClient
val r = new RedisClient("192.168.1.101", 6379)
val perhit = rdd.map(x => {
val arr = x.split(" ")
val k = arr(0).toInt
val v = arr(1).toInt
r.rpush(k, v)
(k, v)
})
原因是:在spark,rdd的方法里比如这里的map,方法里的数据会被序列化,并且分发到executors 去执行。这就需要rdd方法里的所有元素是可被序列化的这里的redis连接是不可被序列化的,所以会报Task not serializable异常
解决这个问题的方法是在executors中创建连接对象,这里介绍两种方法
1)rdd.mapPartitions 这个方法允许一次处理整个partitons的数据,在此方法中创建连接:
val rdd = rdd.mapPartitions{partition =>
val r = new RedisClient("192.168.1.101", 6379)
val res = partition.map{ x =>
...
val refStr = r.rpush(...)
}
r.close
res
}
2)用可序列化的单例模式来管理连接,让连接用lazy的方式创建
object RedisConnection extends Serializable {
lazy val conn: RedisClient = new RedisClient("192.168.1.101", 6379)
}
val rdd = rdd.map{x =>
... ...
val refStr = RedisConnection.conn.rpush(...)
}
这里主要是给出在处理rdd数据时,获得redis连接的方法,同样的,操作其他数据库道理是一样的,这里是以redis为例