我有两个数据框df1
,df2
并且想在称为的高基数字段上多次连接这些表visitor_id
。我只想执行一次初始改组,并进行所有联接,而无需在Spark执行程序之间改组/交换数据。
为此,我创建了另一个列visitor_partition
,该列为每个visitor_id始终分配一个介于之间的随机值[0, 1000)
。我使用了一个自定义分区程序来确保对df1
和df2
进行精确分区,以使每个分区仅包含来自的一个值的行visitor_partition
。最初的重新分区是我唯一想改组数据的时间。
我已将每个数据帧保存到s3中的镶木地板中,并按访问者分区进行分区-对于每个数据帧,这将创建以df1/visitor_partition=0
,df1/visitor_partition=1
...形式组织的1000个文件df1/visitor_partition=999
。
现在,我从镶木地板中加载每个数据帧,并通过df1.createOrReplaceTempView('df1')
(与df2相同)将它们注册为tempview ,然后运行以下查询
SELECT ... FROM df1 FULL JOIN df1 ON df1.visitor_partition = df2.visitor_partition AND df1.visitor_id = df2.visitor_id
从理论上讲,查询执行计划者应该意识到这里不需要进行改组。例如,单个执行程序可以从中加载数据df1/visitor_partition=1
并df2/visitor_partition=2
在其中联接行。但是,在实践中,spark 2.4.4的查询计划程序会在此处执行完整的数据重排。
有什么办法可以防止这种洗牌的发生?