热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

SparkSQL用UDF实现按列特征重分区

SparkSQL用UDF实现按列特征重分区浪尖浪尖聊大数据欢迎关注,浪尖公众号,bigdatatip,建议置顶。这两天,球友又问了我一个比较有意思的问题:解决问题之前,要先了解一下


Spark SQL用UDF实现按列特征重分区


浪尖 浪尖聊大数据


欢迎关注,浪尖公众号,bigdatatip,建议置顶。


这两天,球友又问了我一个比较有意思的问题:



解决问题之前,要先了解一下Spark 原理,要想进行相同数据归类到相同分区,肯定要有产生shuffle步骤。



比如,F到G这个shuffle过程,那么如何决定数据到哪个分区去的呢?这就有一个分区器的概念,默认是hash分区器。


假如,我们能在分区这个地方着手的话肯定能实现我们的目标。


那么,在没有看Spark Dataset的接口之前,浪尖也不知道Spark Dataset有没有给我门提供这种类型的API,抱着试一试的心态,可以去Dataset类看一下,这个时候会发现有一个函数叫做repartition。


/**
* Returns a new Dataset partitioned by the given partitioning expressions, using
* `spark.sql.shuffle.partitions` as number of partitions.
* The resulting Dataset is hash partitioned.
*
* This is the same operation as "DISTRIBUTE BY" in SQL (Hive QL).
*
* @group typedrel
* @since 2.0.0
*/
@scala.annotation.varargs
def repartition(partitionExprs: Column*): Dataset[T] = {
repartition(sparkSession.sessionState.conf.numShufflePartitions, partitionExprs: _*)
}

可以传入列表达式来进行重新分区,产生的新的Dataset的分区数是由参数spark.sql.shuffle.partitions决定,那么是不是可以满足我们的需求呢?


明显,直接用是不行的,可以间接使用UDF来实现该功能。


方式一-简单重分区


首先,实现一个UDF截取列值共同前缀,当然根据业务需求来写该udf


val substring = udf{(str: String) => {
str.substring(0,str.length-1)
}}

注册UDF


spark.udf.register("substring",substring)

创建Dataset


val sales = spark.createDataFrame(Seq(
("Warsaw1", 2016, 100),
("Warsaw2", 2017, 200),
("Warsaw3", 2016, 100),
("Warsaw4", 2017, 200),
("Beijing1", 2017, 200),
("Beijing2", 2017, 200),
("Warsaw4", 2017, 200),
("Boston1", 2015, 50),
("Boston2", 2016, 150)
)).toDF("city", "year", "amount")

执行充分去操作


val res = sales.repartition(substring(col("city")))

打印分区ID及对应的输出结果


res.foreachPartition(partition=>{
println("---------------------> Partition start ")
println("partitionID is "+TaskContext.getPartitionId())
partition.foreach(println)
println("=====================> Partition stop ")
})

浪尖这里spark.sql.shuffle.partitions设置的数值为10.


输出结果截图如下:




方式二-SQL实现


对于Dataset的repartition产生的shuffle是不需要进行聚合就可以产生shuffle使得按照字段值进行归类到某些分区。


SQL的实现要实现重分区要使用group by,然后udf跟上面一样,需要进行聚合操作。


完整代码如下:


val sales = spark.createDataFrame(Seq(
("Warsaw1", 2016, 100),
("Warsaw2", 2017, 200),
("Warsaw3", 2016, 100),
("Warsaw4", 2017, 200),
("Beijing1", 2017, 200),
("Beijing2", 2017, 200),
("Warsaw4", 2017, 200),
("Boston1", 2015, 50),
("Boston2", 2016, 150)
)).toDF("city", "year", "amount")
sales.registerTempTable("temp");
val substring = udf{(str: String) => {
str.substring(0,str.length-1)
}}
spark.udf.register("substring",substring)
val res = spark.sql("select sum(amount) from temp group by substring(city)")
//
res.foreachPartition(partition=>{
println("---------------------> Partition start ")
println("partitionID is "+TaskContext.getPartitionId())
partition.foreach(println)
println("=====================> Partition stop ")
})

输出结果如下:




由上面的结果也可以看到task执行结束时间是无序的。


浪尖在这里主要是讲了Spark SQL 如何实现按照自己的需求对某列重分区。


那么,浪尖在这里就顺带问一下,如何用Spark Core实现该功能呢?




推荐阅读
  • Struts2+Sring+Hibernate简单配置
    2019独角兽企业重金招聘Python工程师标准Struts2SpringHibernate搭建全解!Struts2SpringHibernate是J2EE的最 ... [详细]
  • 本文介绍了在Win10上安装WinPythonHadoop的详细步骤,包括安装Python环境、安装JDK8、安装pyspark、安装Hadoop和Spark、设置环境变量、下载winutils.exe等。同时提醒注意Hadoop版本与pyspark版本的一致性,并建议重启电脑以确保安装成功。 ... [详细]
  • android listview OnItemClickListener失效原因
    最近在做listview时发现OnItemClickListener失效的问题,经过查找发现是因为button的原因。不仅listitem中存在button会影响OnItemClickListener事件的失效,还会导致单击后listview每个item的背景改变,使得item中的所有有关焦点的事件都失效。本文给出了一个范例来说明这种情况,并提供了解决方法。 ... [详细]
  • t-io 2.0.0发布-法网天眼第一版的回顾和更新说明
    本文回顾了t-io 1.x版本的工程结构和性能数据,并介绍了t-io在码云上的成绩和用户反馈。同时,还提到了@openSeLi同学发布的t-io 30W长连接并发压力测试报告。最后,详细介绍了t-io 2.0.0版本的更新内容,包括更简洁的使用方式和内置的httpsession功能。 ... [详细]
  • 标题: ... [详细]
  • JSP内置对象之application的作用范围和获取方式
    本文介绍了JSP内置对象之application的作用时间范围、可以在不同浏览器获取的特点,以及获取application对象的方法。通过示例代码展示了在JSP中设置和在servlet中获取application对象的步骤。对于学习JSP内置对象的读者来说,本文具有一定的参考价值。摘要长度为163字。 ... [详细]
  • 本文介绍了使用Spark实现低配版高斯朴素贝叶斯模型的原因和原理。随着数据量的增大,单机上运行高斯朴素贝叶斯模型会变得很慢,因此考虑使用Spark来加速运行。然而,Spark的MLlib并没有实现高斯朴素贝叶斯模型,因此需要自己动手实现。文章还介绍了朴素贝叶斯的原理和公式,并对具有多个特征和类别的模型进行了讨论。最后,作者总结了实现低配版高斯朴素贝叶斯模型的步骤。 ... [详细]
  • REVERT权限切换的操作步骤和注意事项
    本文介绍了在SQL Server中进行REVERT权限切换的操作步骤和注意事项。首先登录到SQL Server,其中包括一个具有很小权限的普通用户和一个系统管理员角色中的成员。然后通过添加Windows登录到SQL Server,并将其添加到AdventureWorks数据库中的用户列表中。最后通过REVERT命令切换权限。在操作过程中需要注意的是,确保登录名和数据库名的正确性,并遵循安全措施,以防止权限泄露和数据损坏。 ... [详细]
  • Asp.net Mvc Framework 七 (Filter及其执行顺序) 的应用示例
    本文介绍了在Asp.net Mvc中应用Filter功能进行登录判断、用户权限控制、输出缓存、防盗链、防蜘蛛、本地化设置等操作的示例,并解释了Filter的执行顺序。通过示例代码,详细说明了如何使用Filter来实现这些功能。 ... [详细]
  • Activiti7流程定义开发笔记
    本文介绍了Activiti7流程定义的开发笔记,包括流程定义的概念、使用activiti-explorer和activiti-eclipse-designer进行建模的方式,以及生成流程图的方法。还介绍了流程定义部署的概念和步骤,包括将bpmn和png文件添加部署到activiti数据库中的方法,以及使用ZIP包进行部署的方式。同时还提到了activiti.cfg.xml文件的作用。 ... [详细]
  • 大数据Hadoop生态(20)MapReduce框架原理OutputFormat的开发笔记
    本文介绍了大数据Hadoop生态(20)MapReduce框架原理OutputFormat的开发笔记,包括outputFormat接口实现类、自定义outputFormat步骤和案例。案例中将包含nty的日志输出到nty.log文件,其他日志输出到other.log文件。同时提供了一些相关网址供参考。 ... [详细]
  • Apache Shiro 身份验证绕过漏洞 (CVE202011989) 详细解析及防范措施
    本文详细解析了Apache Shiro 身份验证绕过漏洞 (CVE202011989) 的原理和影响,并提供了相应的防范措施。Apache Shiro 是一个强大且易用的Java安全框架,常用于执行身份验证、授权、密码和会话管理。在Apache Shiro 1.5.3之前的版本中,与Spring控制器一起使用时,存在特制请求可能导致身份验证绕过的漏洞。本文还介绍了该漏洞的具体细节,并给出了防范该漏洞的建议措施。 ... [详细]
  • Sleuth+zipkin链路追踪SpringCloud微服务的解决方案
    在庞大的微服务群中,随着业务扩展,微服务个数增多,系统调用链路复杂化。Sleuth+zipkin是解决SpringCloud微服务定位和追踪的方案。通过TraceId将不同服务调用的日志串联起来,实现请求链路跟踪。通过Feign调用和Request传递TraceId,将整个调用链路的服务日志归组合并,提供定位和追踪的功能。 ... [详细]
  • 1.Listener是Servlet的监听器,它可以监听客户端的请求、服务端的操作等。通过监听器,可以自动激发一些操作,比如监听在线的用户的数量。当增加一个HttpSession时 ... [详细]
  • 原文地址http://balau82.wordpress.com/2010/02/28/hello-world-for-bare-metal-arm-using-qemu/最开始时 ... [详细]
author-avatar
莪乜子12
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有