热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

在Spark上基于Minhash计算jaccard相似度

问题引入在风控领域常会面临一种场景:随着安全策略的打击,部分已经显露的账号用户会被稽核、处置,要么被动地被封停,要么被坏人

问题引入

在风控领域常会面临一种场景:随着安全策略的打击,部分已经显露的账号/用户会被稽核、处置,要么被动地被封停,要么被坏人干脆舍弃掉。坏人会重新注册新的账号进行活跃。而这些新老账号之间很可能没有直接的交易关系,甚至连登陆设备也不同,就较难发现其关联性。但有一点是较难隐藏的:上下游的关系链。因此,可以尝试通过关系网络结构上的相似性来量化两个账号之间的关联度,从而对于风险用户关联分析起到一个补充作用。

可用下图来辅助说明,x的交易流入方集合为{a,b,c,d}, y的交易流入方集合为{b,c,d},一个很自然的想法就是用Jaccard相似度来计算两个集合之间的相似度,也即:


常用解法


  • 暴力计算

比较容易想到的就是该表与自己做JOIN,求出交集。然后再分别计算一个节点入度数,用 x的节点入度数 + y的节点入度 - 交集节点数 得到并集的大小,那么交集大小/并集大小就得到了结果。但是以蚂蚁的数据体量而言,动辄N亿的关系对,这个计算几乎是不可行的。


  • 借鉴倒排索引

在使用表的JOIN操作时,默认是不知道哪两个节点有交集的,所以会进行暴力的两两配对计算。这里可以借鉴自然语言处理中的倒排索引方法,将每个流入节点node看作一个词,得到一个索引表, 该表中的账号就有共同的流入节点,它们两两之间的交集统计值就可以+1,遍历所有的流入节点,就汇总出了两两节点之间的交集数量。


  • Minhash

前面的方法都是实打实地计算,但有时候一种“足够好”的近似求解结果也是可以接受的,尤其是工程上有较大的效率提升时。Minhash就常用于近似求解Jaccard相似度。现在Spark中也有现成的包可以用,使用成本就比较可控了。


实践代码

Spark官方文档中有一段样例代码可以参考:https://spark.apache.org/docs/3.0.0/ml-features.html#minhash-for-jaccard-distance,但DEMO距离落地的成品还是有开发成本的,需要我们把数据预处理成人家指定的格式,即是说,把节点集合的向量,变成0,1 值的向量。这里有点类似于文本处理中的bag of word方法,沿着这个思路去找到spark中的CountVectorizer类,但默认是统计的频数,通过指定.setBinary(true) 实现0-1值的转换。

基于阿里的ODPS平台,完整版本的代码如下:

import com.aliyun.odps.TableSchema
import com.aliyun.odps.data.Record
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.ml.feature.{CountVectorizer, CountVectorizerModel, MinHashLSH, MinHashLSHModel}
import org.apache.spark.ml.linalg.{ SparseVector}
import org.apache.spark.odps.OdpsOps
import org.apache.spark.rdd.RDDobject minhashJaccardCal {def main(args:Array[String]) = {val spark = SparkSession.builder().enableOdpsSupport().enableHiveSupport().appName("minhashJaccardCal").getOrCreate()//输入参数val inputProject = args(0)val inputTable = args(1)val outputProject = args(2)val outputTable = args(3)val usage = s"""Usage: """if(args.length <4){println("参数错误")sys.error(usage)sys.exit(-1)}try{val odpsOps = new OdpsOps(spark.sparkContext)val pair: RDD[(String, String)] = odpsOps.readTable(inputProject,inputTable, (r: Record, _: TableSchema) => (r.getString(0),r.getString(1)) ,100)//-----------------------------------------------------------------------------//计算流入节点的重合度,用MINHASH的方法来近似计算,总共分成两步//1. 数据预处理成bag of word形式的0-1向量,且用sparse向量来表示//2. 调用org.apache.spark.ml.feature.MinHashLSH 来近似计算jaccard距离//下面执行第1步val inputNodeVector: RDD[(String, List[String])] = pair.map(_.swap).combineByKey((v : String) => List(v),(c : List[String], v : String) => v :: c,(c1 : List[String], c2 : List[String]) => c1 ::: c2).repartition(100)val inputNodeVectorDF = spark.createDataFrame(inputNodeVector).toDF("node","neighbors")val cvModel: CountVectorizerModel = new CountVectorizer().setInputCol("neighbors").setOutputCol("features").setBinary(true).fit(inputNodeVectorDF)val inputNodeVectorDFSparse: DataFrame = cvModel.transform(inputNodeVectorDF).select("node","features")val inputNodeVectorDFSparseFilter = spark.createDataFrame(inputNodeVectorDFSparse.rdd.map(row => (row.getAs[String]("node") ,row.getAs[SparseVector]("features"))).map(x => (x._1,x._2,x._2.numNonzeros)).filter(x => x._3 >= 1).map(x => (x._1,x._2))).toDF("node","features")//下面执行第2步val mh = new MinHashLSH().setNumHashTables(100).setInputCol("features").setOutputCol("hashes")val model: MinHashLSHModel = mh.fit(inputNodeVectorDFSparseFilter)val inputNodeDistance: DataFrame = model.approxSimilarityJoin(inputNodeVectorDFSparseFilter, inputNodeVectorDFSparseFilter, 0.7, "JaccardDistance").select(col("datasetA.node").alias("node1"),col("datasetB.node").alias("node2"),col("JaccardDistance"))val inputNodeOverlapRatio = inputNodeDistance.rdd.map(x => {val node1 = x.getString(0)val node2 = x.getString(1)val overlapRatio = 1 - x.getDouble(2)if(node1 x._1._1 != x._1._2)//-----------------------------------------------------------------------------//计算流出节点的重合度, 思路与上相同val outputNodeVector: RDD[(String, List[String])] = pair.combineByKey((v : String) => List(v),(c : List[String], v : String) => v :: c,(c1 : List[String], c2 : List[String]) => c1 ::: c2)val outputNodeVectorDF = spark.createDataFrame(outputNodeVector).toDF("node","neighbors")val cvModelOutput: CountVectorizerModel = new CountVectorizer().setInputCol("neighbors").setOutputCol("features").setBinary(true).fit(outputNodeVectorDF)val outputNodeVectorDFSparse: DataFrame = cvModelOutput.transform(outputNodeVectorDF).select("node","features")val outputNodeVectorDFSparseFilter: DataFrame = spark.createDataFrame(outputNodeVectorDFSparse.rdd.map(row => (row.getAs[String]("node") ,row.getAs[SparseVector]("features"))).map(x => (x._1,x._2,x._2.numNonzeros)).filter(x => x._3 >= 1).map(x => (x._1,x._2))).toDF("node","features")//下面执行第2步val mh2 = new MinHashLSH().setNumHashTables(100).setInputCol("features").setOutputCol("hashes")val outputModel: MinHashLSHModel = mh2.fit(outputNodeVectorDFSparseFilter)val outputNodeOverlapRatio = outputModel.approxSimilarityJoin(outputNodeVectorDFSparseFilter, outputNodeVectorDFSparseFilter, 0.7, "JaccardDistance").select(col("datasetA.node").alias("node1"),col("datasetB.node").alias("node2"),col("JaccardDistance")).rdd.map(x => {val node1 = x.getString(0)val node2 = x.getString(1)val overlapRatio = 1 - x.getDouble(2)if(node1 x._1._1 != x._1._2)//-----------------------------------------------------------------------------//合并到一起val jaccardValuePair: RDD[(String, String, Double, Double)] = inputNodeOverlapRatio.fullOuterJoin(outputNodeOverlapRatio,100).map{case ((x,y),(inValue, outValue)) =>(x,y,inValue.getOrElse(0.0),outValue.getOrElse(0.0))}.filter(x => x._1 != x._2).distinct(100)// 写入结果表val saveTransfer = (v:Tuple4[String, String, Double, Double] , record:Record, schema: TableSchema) => {record.set("srcid", v._1)record.set("tarid", v._2)record.set("invalue", v._3)record.set("outvalue", v._4)}odpsOps.saveToTable(outputProject,outputTable,jaccardValuePair,saveTransfer,isOverWrite = true)}catch {case ex: Exception => {throw ex}} finally {spark.stop()}}
}

 


推荐阅读
  • 在多线程并发环境中,普通变量的操作往往是线程不安全的。本文通过一个简单的例子,展示了如何使用 AtomicInteger 类及其核心的 CAS 无锁算法来保证线程安全。 ... [详细]
  • 单元测试:使用mocha和should.js搭建nodejs的单元测试
    2019独角兽企业重金招聘Python工程师标准BDD测试利器:mochashould.js众所周知对于任何一个项目来说,做好单元测试都是必不可少 ... [详细]
  • 在JavaWeb开发中,文件上传是一个常见的需求。无论是通过表单还是其他方式上传文件,都必须使用POST请求。前端部分通常采用HTML表单来实现文件选择和提交功能。后端则利用Apache Commons FileUpload库来处理上传的文件,该库提供了强大的文件解析和存储能力,能够高效地处理各种文件类型。此外,为了提高系统的安全性和稳定性,还需要对上传文件的大小、格式等进行严格的校验和限制。 ... [详细]
  • 如何使用 `org.eclipse.rdf4j.query.impl.MapBindingSet.getValue()` 方法及其代码示例详解 ... [详细]
  • 深入解析CAS机制:全面替代传统锁的底层原理与应用
    本文深入探讨了CAS(Compare-and-Swap)机制,分析了其作为传统锁的替代方案在并发控制中的优势与原理。CAS通过原子操作确保数据的一致性,避免了传统锁带来的性能瓶颈和死锁问题。文章详细解析了CAS的工作机制,并结合实际应用场景,展示了其在高并发环境下的高效性和可靠性。 ... [详细]
  • Java学习第10天:深入理解Map接口及其应用 ... [详细]
  • 第二章:Kafka基础入门与核心概念解析
    本章节主要介绍了Kafka的基本概念及其核心特性。Kafka是一种分布式消息发布和订阅系统,以其卓越的性能和高吞吐量而著称。最初,Kafka被设计用于LinkedIn的活动流和运营数据处理,旨在高效地管理和传输大规模的数据流。这些数据主要包括用户活动记录、系统日志和其他实时信息。通过深入解析Kafka的设计原理和应用场景,读者将能够更好地理解其在现代大数据架构中的重要地位。 ... [详细]
  • 本文探讨了利用Java实现WebSocket实时消息推送技术的方法。与传统的轮询、长连接或短连接等方案相比,WebSocket提供了一种更为高效和低延迟的双向通信机制。通过建立持久连接,服务器能够主动向客户端推送数据,从而实现真正的实时消息传递。此外,本文还介绍了WebSocket在实际应用中的优势和应用场景,并提供了详细的实现步骤和技术细节。 ... [详细]
  • 本文作为“实现简易版Spring系列”的第五篇,继前文深入探讨了Spring框架的核心技术之一——控制反转(IoC)之后,将重点转向另一个关键技术——面向切面编程(AOP)。对于使用Spring框架进行开发的开发者来说,AOP是一个不可或缺的概念。了解AOP的背景及其基本原理,对于掌握这一技术至关重要。本文将通过具体示例,详细解析AOP的实现机制,帮助读者更好地理解和应用这一技术。 ... [详细]
  • 成功实现Asp.Net MVC3网站与MongoDB数据库的高效集成
    我们成功地构建了一个基于Asp.NET MVC3框架的网站,并实现了与MongoDB数据库的高效集成。此次更新不仅完善了基本的创建和显示功能,还全面实现了数据的增删改查操作。在创建功能方面,我们修复了之前代码中的错误,确保每个属性都能正确生成。此外,我们还对数据模型进行了优化,以提高系统的性能和稳定性。 ... [详细]
  • 技术日志:深入探讨Spark Streaming与Spark SQL的融合应用
    技术日志:深入探讨Spark Streaming与Spark SQL的融合应用 ... [详细]
  • VB.net 进程通信中FindWindow、FindWindowEX、SendMessage函数的理解
    目录一、代码背景二、主要工具三、函数解析1、FindWindow:2、FindWindowEx:3、SendMessage: ... [详细]
  • async/await 是现代 JavaScript 中非常强大的异步编程工具,可以极大地简化异步代码的编写。本文将详细介绍 async 和 await 的用法及其背后的原理。 ... [详细]
  • Java中不同类型的常量池(字符串常量池、Class常量池和运行时常量池)的对比与关联分析
    在研究Java虚拟机的过程中,笔者发现存在多种类型的常量池,包括字符串常量池、Class常量池和运行时常量池。通过查阅CSDN、博客园等相关资料,对这些常量池的特性、用途及其相互关系进行了详细探讨。本文将深入分析这三种常量池的差异与联系,帮助读者更好地理解Java虚拟机的内部机制。 ... [详细]
  • 在Spring框架中,基于Schema的异常通知与环绕通知的实现方法具有重要的实践价值。首先,对于异常通知,需要创建一个实现ThrowsAdvice接口的通知类。尽管ThrowsAdvice接口本身不包含任何方法,但开发者需自定义方法来处理异常情况。此外,环绕通知则通过实现MethodInterceptor接口来实现,允许在方法调用前后执行特定逻辑,从而增强功能或进行必要的控制。这两种通知机制的结合使用,能够有效提升应用程序的健壮性和灵活性。 ... [详细]
author-avatar
手机用户2502870457
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有