作者:手机用户2502918767 | 来源:互联网 | 2023-08-26 15:44
我大约有2.5 k个JSON文件,每个JSON文件代表1行。使用这些文件,我需要执行一些非常简单的ETL,并将它们移到我的数据湖的curated
部分中。
我遍历数据湖,并通过一个简单的.read
调用来调用JSON文件,这是我事先定义的JSON模式。
然后我执行ETL并尝试将这些文件写入到我的数据湖的单独部分中,但是写入过程非常慢,只花了15分钟的时间就写入了一个文件。几百kb?
rp = spark.read.json(paths,multiLine=True,schema=json_s).withColumn('path',F.input_file_name())
for iter in iterable:
#do stuff
# filter my sparkDF with .filter
SparkDF_F = sparkDF.filter(...)
sparkDF_F.write('path/filename.parquet')
我尝试使用“ OPTIMIZE”并在“路径”上调用它
%sql
OPTIMIZE delta.'dbfs:/mnt/raw/data/table'
这将引发以下错误。
Error in SQL statement: ParseException:
mismatched input 'dbfs:/mnt/raw/data/table' expecting {'SELECT','FROM','
ADD','AS','TIMESTAMP','VERSION','ALL','ANY','DISTINCT','WHERE','GROUP','BY','GROUPING','SETS','CUBE','ROLLUP','ORDER....
有人能引导我了解我在这里的误解吗?
设置
- Azure Databricks
- 6.0
- 火花2.4
- Python 3.6
- 具有12个内核的42GB群集。
- 4个节点
- Azure Gen1 DataLake。
两件事:
-
如果2.5k JSON文件存储在同一文件夹中。您可以使用相同的文件夹路径直接读取它们:
rp = spark.read.json(path_common,multiLine = True,schema = json_s).withColumn('path',F.input_file_name())
然后,您可以将rp.filter应用于整个数据帧,因为它只是一个(不需要对每个文件进行迭代)
- 关于Delta的文档,您只能优化表(存储在dbfs中),而不能直接优化DBFS文件。因此,您可以使用dbfs中指向的目录创建表,并按照文档此处的建议使用优化:https://docs.databricks.com/spark/latest/spark-sql/language-manual/optimize.html
希望这会有所帮助