Spark Parquet Loader: Reduce number of jobs involved in listing a dataframe’s files
我正在通过
将 parquet 数据加载到数据框中
1
|
spark.read.parquet(‘hdfs:///path/goes/here/…’)
|
由于 parquet 分区,该路径中有大约 50k 个文件。当我运行该命令时,spark 会生成数十个小作业,这些小作业总体上需要几分钟才能完成。以下是 spark UI 中作业的外观:
如您所见,虽然每个作业有大约 2100 个任务,但它们执行速度很快,大约 2 秒。启动这么多”迷你作业”效率低下,并导致此文件列出步骤大约需要 10 分钟(其中集群资源大部分处于空闲状态,并且集群主要处理散乱的任务或管理作业/任务的开销)。
如何将这些任务整合到更少的工作中,每个工作都有更多的任务?
也适用于 pyspark 的解决方案的奖励积分。
我正在通过 pyspark 在 hadoop 2.8.3 上运行 spark 2.2.1。
- 我相信您遇到了一个错误,我的一位前同事为此提交了工单并打开了拉取请求(仍然没有反馈)。你可以在这里查看:issues.apache.org/jira/browse/SPARK-21056 如果它适合你的问题,你最好的选择可能是投票赞成这个问题,并在邮件列表上引起一些噪音。 :)
- 你的转换是否包括任何洗牌。如果洗牌默认涉及 spark.sql.shuffle.partitions 是 200。这就是为什么你会看到每个作业有那么多任务
- 我可以举个例子说明你正在做什么转换
- @saipradeepkumarkotha:无论我执行什么后续转换,都会出现这个问题。这是 spark 急切地执行工作的特殊情况之一——这些工作甚至在我对 DAG 执行任何操作之前就已经运行。
- @stefanobaghino:我想你已经解释了发生了什么。看起来 bbossys 的讨论表明调整 spark.sql.sources.parallelPartitionDiscovery.threshold 参数可能会有所帮助。如果您写下调整该参数如何解决问题,我会接受您的答案作为解决方案(您将获得赏金)。
- 目前正在编辑我的答案。
- 好的,应该完成了。
我相信您遇到了一个错误,我的一位前同事已为此提交了票证并打开了拉取请求。你可以在这里查看。如果它适合您的问题,那么您最好的选择可能是对问题进行投票并在邮件列表中对此发表一些意见。
您可能想要做的是以适合您工作的方式调整 spark.sql.sources.parallelPartitionDiscovery.threshold 和 spark.sql.sources.parallelPartitionDiscovery.parallelism 配置参数(在链接的票证中引用了前者)。
您可以在此处和此处查看配置密钥的使用方式。为了完整起见,我将在这里分享相关的片段。
spark.sql.sources.parallelPartitionDiscovery.threshold
1
2 3 4 5 6 |
// Short-circuits parallel listing when serial listing is likely to be faster.
if (paths.size <= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) { return paths.map { path => (path, listLeafFiles(path, hadoopConf, filter, Some(sparkSession))) } } |
spark.sql.sources.parallelPartitionDiscovery.parallelism
1
2 3 |
// Set the number of parallelism to prevent following file listing from generating many tasks
// in case of large #defaultParallelism. val numParallelism = Math.min(paths.size, parallelPartitionDiscoveryParallelism) |
此配置的默认值是 32 为阈值,10000 为并行度(相关代码在这里)。
在您的情况下,我会说您可能想要做的是设置阈值,以便进程在不产生并行作业的情况下运行。
笔记
链接的来源来自撰写本文时可用的最新标记版本 2.3.0。
- 感谢您的精彩回答!我赞成这个问题,并希望其他人也会。
- 我试过你的建议。将阈值修改为更高会导致驱动程序长时间执行阻塞-也许然后spark会尝试仅发现驱动程序上的所有文件?我玩 spark.sql.sources.parallelPartitionDiscovery.parallelism 的运气更好。奇怪的是,当我将其设置为 250 时,它会产生很多工作(~50),当我将其设置为 1000 时,它会产生很多工作(再次~50),但是当我将其设置为 500 时,它使用的工作更少( ?一打)并更快地完成。
相对于对象存储,即使列出和调用 getFileStatus 也非常昂贵,并且由于这是在分区期间完成的,因此可以大大扩展工作。
与 mapreduce.input.fileinputformat.list-status.num-threads 一起玩,看看添加更多线程是否会加快速度,比如 20-30
- 我尝试将该参数设置为 25,但没有帮助。我觉得奇怪的一件事是,这些工作中的每一个都有大约 2100 个任务。知道这个数字来自哪里,以及如何增加每个工作的任务数量以减少工作吗?
- 每个文件至少有一个任务,如果数据可以分区,则更多
来源:https://www.codenong.com/49133228/