How to efficiently group every k rows in spark dataset?
我创建了一个 spark Dataset[Row],Row 是 Row(x: Vector)。 x 这里是一个 1xp 向量。
是否可以 1) 每 k 行分组 2) 将这些行连接成一个 k x p 矩阵 – mX 即,将 Dateset[Row(Vector)] 更改为 Dateset[Row(Matrix)] ?
这是我目前的解决方案,将此 Dataset[Row] 转换为 RDD,并使用 zipWithIndex 和 aggregateByKey 连接每 k 行。
1
2 3 |
val dataRDD = data_df.rdd.zipWithIndex
.map { case (line, index) => (index/k, line) } .aggregateByKey(…) (…, …) |
但是好像效率不是很高,有没有更高效的方法呢?
提前致谢。
您的方法存在两个性能问题:
如果您绝对需要全局排序,从第 1 行开始,并且您不能将数据分成多个分区,那么 Spark 必须通过单个核心移动所有数据。您可以通过找到一种拥有多个分区的方法来加快该部分的速度。
您可以通过使用 mapPartitions:
一次处理一个分区的数据来避免洗牌
1
2 3 4 5 6 7 8 9 10 |
spark.range(1, 20).coalesce(1).mapPartitions(_.grouped(5)).show
+——————–+ |
请注意,上面的 coalesce(1) 将所有 20 行强制为一个分区。
- 嗨,西姆,谢谢你的评论。我实际上不需要全球订购。 k 组也不必要地需要改组。我将 rdd.repartition(…).mapPartitions(_.grouped(5)) 应用于我的代码以避免全局分组。我只是想知道上面的代码是否会发生两次改组?即,一个用于重新分区,另一个用于 mapPartiion。
- repartition() 会随机播放,mapPartitions() 不会。但是,除非您需要分发数据,否则您不需要 repartition()。 coalesce() 不会随机播放。
这是一个将 N 条记录分组为列的解决方案:
从RDD生成到DF,如下图处理。
g 是组,k 是记录 g 内重复的编号的关键。 v 是您的记录内容。
输入是一个 6 行的文件,我在这里使用了 3 组。
唯一的缺点是行的余数小于分组 N。
1
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
import org.apache.spark.sql.functions._
import org.apache.spark.mllib.rdd.RDDFunctions._ val dfsFilename =”/FileStore/tables/7dxa9btd1477497663691/Text_File_01-880f5.txt” val df2 = df.withColumn(“vke”, explode($”vk”)).drop(“vk”) val result = df3 result.show() |
返回:
1
2 3 4 5 6 |
+—+——————–+——————–+——————–+
| g| 0| 1| 2| +—+——————–+——————–+——————–+ | 0|The quick brown f…|Here he lays I te…|Gone are the days…| | 1| Gosh, what to say.|Hallo, hallo, how…| I am fine.| +—+——————–+——————–+——————–+ |
来源:https://www.codenong.com/53146855/