Extracting value using Window and Partition
我在 pyspark 中有一个数据框
1
2 3 4 5 6 7 8 9 10 |
id | value
1 0 |
我想提取同一 id 组中 value 列中第一次出现 1 之后的所有行。我创建了带有 Id 分区的窗口,但不知道如何获取值 1 之后存在的行。
我期待结果是
1
2 3 4 5 6 7 |
id | value
1 1 |
- 你有定义窗口内排序的东西吗?否则我认为结果将是不确定的
- 我只能按 id 列订购。
- 底层数据模型是一个集合,而不是一个列表,例如对于 id=1,值 0、1 和 0 可以按任何顺序处理。如果没有洗牌,顺序保持不变,但不会基于该假设构建任何东西
以下解决方案可能与此相关(它对于小数据非常有效,但如果 id 在多个分区上可能会导致大数据出现问题)
1
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 |
df = sqlContext.createDataFrame([
[1, 0], [1, 1], [1, 0], [2, 1], [2, 0], [3, 0], [3, 0], [3, 1] ], [‘id’, ‘Value’] ) df.show() +—+—–+ | id|Value| +—+—–+ | 1| 0| | 1| 1| | 1| 0| | 2| 1| | 2| 0| | 3| 0| | 3| 0| | 3| 1| +—+—–+ #importing Libraries #This way we can generate a cumulative sum for values #Filter all those which are having sum > 0 +—+—–+—–+ |
Before running this you must be sure that data related to ID should be partitioned and no id can be on 2 partitions.
理想情况下,您需要:
AFAIK,Spark 的窗口中没有查找功能。但是,您可以遵循这个想法并解决一些问题。让我们首先创建数据并导入函数和窗口。
1
2 3 4 5 |
import pyspark.sql.functions as F
from pyspark.sql.window import Window l = [(1, 0), (1, 1), (1, 0), (2, 1), (2, 0), (3, 0), (3, 0), (3, 1)] |
然后,让我们在数据框上添加一个索引(它是免费的),以便能够对窗口进行排序。
1
|
indexedDf = df.withColumn(“index”, F.monotonically_increasing_id())
|
然后我们创建一个窗口,它只查看当前行之前的值,按该索引排序并按 id 分区。
1
|
w = Window.partitionBy(“id”).orderBy(“index”).rowsBetween(Window.unboundedPreceding, 0)
|
最后,我们使用该窗口收集每行的前面值集合,并过滤掉不包含 1 的值。可选地,我们按 index 排序,因为窗口化不保留按 id 列的顺序。
1
2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
indexedDf\\
.withColumn(‘set’, F.collect_set(F.col(‘value’)).over(w))\\ .where(F.array_contains(F.col(‘set’), 1))\\ .orderBy(“index”)\\ .select(“id”,”value”).show() +—+—–+ |
来源:https://www.codenong.com/56072653/