Spark: Join two dataframes on an array type column
我有一个简单的用例
我有两个数据框 df1 和 df2,我正在寻找一种有效的方式来加入它们?
df1:包含我的主要数据框(数十亿条记录)
1
2 3 4 5 6 |
+——–+———–+————–+
|doc_id |doc_name |doc_type_id | +——–+———–+————–+ | 1 |doc_name_1 |[1,4] | | 2 |doc_name_2 |[3,2,6] | +——–+———–+————–+ |
df2:包含文档类型的标签(40000条记录),因为它是一个小的我正在广播它。
1
2 3 4 5 6 7 8 9 10 |
+————+—————-+
|doc_type_id |doc_type_name | +————+—————-+ | 1 |doc_type_1 | | 2 |doc_type_2 | | 3 |doc_type_3 | | 4 |doc_type_4 | | 5 |doc_type_5 | | 6 |doc_type_5 | +————+—————-+ |
我想加入这两个数据框以产生如下结果:
1
2 3 4 5 6 |
+——–+————+————–+—————————————-+
|doc_id |doc_name |doc_type_id |doc_type_name | +——–+————+————–+—————————————-+ | 1 |doc_name_1 |[1,4] |[“doc_type_1″,”doc_type_4”] | | 2 |doc_name_2 |[3,2,6] |[“doc_type_3″,”doc_type_2″,”doc_type_6”]| +——–+————+————–+—————————————-+ |
谢谢
对于这种情况,我们可以使用 array_contains groupBy collect_list 函数。
示例:
1
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
val df1=Seq((“1″,”doc_name_1”,Seq(1,4)),(“2″,”doc_name_2”,Seq(3,2,6))).toDF(“doc_id”,”doc_name”,”doc_type_id”)
val df2=Seq((“1″,”doc_type_1”),(“2″,”doc_type_2”),(“3″,”doc_type_3”),(“4″,”doc_type_4”),(“5″,”doc_type_5”),(“6″,”doc_type_6”)).toDF(“doc_type_id”,”doc_type_name”) import org.apache.spark.sql.functions._ df1.createOrReplaceTempView(“tbl”) spark.sql(“select a.doc_id,a.doc_name,a.doc_type_id,collect_list(b.doc_type_name) doc_type_name from tbl a join tbl2 b on array_contains(a.doc_type_id,int(b.doc_type_id)) = TRUE group by a.doc_id,a.doc_name,a.doc_type_id”).show(false) //+——+———-+———–+————————————+ |
其他实现方式是使用 explode join collect_list:
1
2 3 4 5 6 7 8 9 10 11 12 13 |
val df3=df1.withColumn(“arr”,explode(col(“doc_type_id”)))
df3.join(df2,df2.col(“doc_type_id”) === df3.col(“arr”),”inner”). //+——+———–+———-+————————————+ |
- 感谢您的回答,第一种方法有效。作为参考,还有另一篇包含更多选项和详细信息的帖子:stackoverflow.com/questions/59534351/…
来源:https://www.codenong.com/61312786/