热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

记一次spark数据倾斜的优化

记一次解决spark处理性

一、问题的背景

    最近在flink上开发了用户和物品的实时统计特征,线上推荐服务请求的时候通过查询redis进行数据交互,每次推荐结果返回的最终会落地到HDFS文件系统中,供离线训练模型使用。

    在离线训练的时候由于各种各样的原因(无能狂怒),无法在采样的过程中将样本和realtimeFeature直接抽取出来(待优化),实际采用的是采样样本和特征整合分离的两阶段方案。

    对于离线特征整合 join 得到, 再根据item作为key, join  最终得到, 由于用户特征和物品特征都是天级离线特征这样做虽然不够牛逼(正确做法应该还是通过返回结果附带特征的方案)但是也不会出现大的纰漏(小纰漏还是不少,有优化空间)。

    但是到了实时特征的时候会出现问题,不论是用户实时特征还是物品实时特征都不断在变化,不是一个feature, 对于样本需要找到user在t1时刻的userFeature,item在t1时刻的itemFeature。


二、不太合理但是勉强能用的方案

    举用户实时特征为例,样本关联实时特征。

    我们落地了如果直接拿user+time作为key进行关联岂不是美滋滋,还是由于上诉吐槽的各种各样的原因,user+sampleTime未必能完美关联上user+featureTime,需要查找样本sampleTime最近的用户特征featureTime。

    被拆解成为两份数据:

    1、根据user group by 的;

   2、以user+featureTime为key的特征数据;

    整体设计分成了两个阶段:1、查找最近时间,2、关联具体特征数据;


三、时间查找优化

  首先我们根据uid关联, 遍历featureTimeList 找到和sampleTime距离最近的featureTime。

    最开始版本非常low:

val featureTime = timeList
.map { t1 => (Math.abs(t1 - sampleTime), t1) }
.minBy(v => v._1)._2

    因为热门的用户和物品落地的实时特征特别多,导致timeList非常大,timeList理论上最大值是24*3600=86400, 在spark上处理的时候会发现卡在99/100的过程中(假设有100个partition)。

    优化的做法是将timeList进行排序,然后采用二分查找:

val featureTime = biSearch(featureTimeList, sampleTime)
def biSearch(data:Array[Long], input:Long):LOng= {
var start = 0
var end = data.length - 1
var retIndex = 0
if(data(start) <= input ){
data(start)
}else if(data(end) >= input){
data(end)
}else{
while(start <end){
val index = (start + end)/2
if(data(index) > input && data(index+1) > input){
start = index
}else if(data(index) data(index + 1)
end = index
}else if(data(index) > input && data(index + 1)
val c = data(index) + data(index + 1) -2*input
if(c > 0){
retIndex = index + 1
}else{
retIndex = index
}
start = end
}else if(data(index) == input){
retIndex = index
start = end
}else if(data(index + 1) == input){
retIndex = index + 1
start = end
}
}
data(retIndex)
}
}

    毕竟不是做ACM懒得考虑featureTimeList边界条件,在使用的时候只对size > 10的列表使用二分查找,其他的就直接遍历。


四、关联优化

    经过上述的二分查找后,查找效率提升了不少,接下来我们愉快的将进行关联希望得到 , 在实际使用的过程中又遇到了99/100的阻塞问题。

    经过排查发现并不是所有的用户或者物品都有实时特征,在时间查找过程中featureTimeList为空(其实连对应的user key也都没有)当时的做法是给这部分user+featureTime赋值为空,查看日志的时候这样做会导致大量的数据倾斜。

    无论是""还是user+featureTime的作用就只是关联,对后续的数据处理没有什么影响,因此就考虑将上一阶段关联不上被设置为""的key打散到100个分区里规避数据倾斜的问题:

def searchTimeKey_user(keyArrayJson: RDD[(String, (Array[String], JSONObject))],
onlineTime: RDD[(String, Array[Long])]): RDD[(String, (Array[String], JSONObject))] = {
keyArrayJson.leftOuterJoin(onlineTime, 100)
.map { case (id, ((arr, itemFeature), timeList)) =>
val t0 = TimeUtil.getSecondTimeStamp(arr(3))
var key = String.valueOf(Random.nextInt(100)) // 当关联不上的时候key反正没用,打散了防止数据倾斜
if (timeList.isDefined) {
if(timeList.get.length > 10){
val t1 = biSearch(timeList.get, t0)
key = id + String.valueOf(t1)
}else{
val t1 = timeList.get.map { t1 => (Math.abs(t1 - t0), t1) }.minBy(v => v._1)._2
key = id + String.valueOf(t1)
}
}
(key, (arr, itemFeature))
}
}


五、小结

    以上关于样本和特征这样关联是否合理和进一步改进优化我们不做讨论,本着先用为敬的原则后续再去优化。

    由于太久没有遇到spark执行性能的问题并且文章也好久没有更新了,这次有点小激动就随手记录一笔。

    spark在执行过程中遇到的性能问题大概率都是shuffle后的数据倾斜问题,解决方案也无非就是小表broadcast, 大表join小表,打散join key等等方法,美团技术博客在这方面已经讲得很透彻了,温故而知新吧,毕竟程序员这样的手艺人长时间没有操作容易忘记和手生。

https://tech.meituan.com/2016/04/29/spark-tuning-basic.html

Spark性能优化指南——基础篇


https://tech.meituan.com/2016/05/12/spark-tuning-pro.html

Spark性能优化指南——高级篇




推荐阅读
author-avatar
UTOB
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有