Spark:在数组类型列上连接两个数据框 | 珊瑚贝

Spark: Join two dataframes on an array type column


我有一个简单的用例
我有两个数据框 df1 和 df2,我正在寻找一种有效的方式来加入它们?

df1:包含我的主要数据框(数十亿条记录)

1
2
3
4
5
6
+——–+———–+————–+
|doc_id  |doc_name   |doc_type_id   |
+——–+———–+————–+
|   1    |doc_name_1 |[1,4]         |
|   2    |doc_name_2 |[3,2,6]       |
+——–+———–+————–+

df2:包含文档类型的标签(40000条记录),因为它是一个小的我正在广播它。

1
2
3
4
5
6
7
8
9
10
+————+—————-+
|doc_type_id |doc_type_name   |
+————+—————-+
|   1        |doc_type_1      |
|   2        |doc_type_2      |
|   3        |doc_type_3      |
|   4        |doc_type_4      |
|   5        |doc_type_5      |
|   6        |doc_type_5      |
+————+—————-+

我想加入这两个数据框以产生如下结果:

1
2
3
4
5
6
+——–+————+————–+—————————————-+
|doc_id  |doc_name    |doc_type_id   |doc_type_name                           |
+——–+————+————–+—————————————-+
|   1    |doc_name_1  |[1,4]         |[“doc_type_1″,”doc_type_4”]             |
|   2    |doc_name_2  |[3,2,6]       |[“doc_type_3″,”doc_type_2″,”doc_type_6”]|
+——–+————+————–+—————————————-+

谢谢


对于这种情况,我们可以使用 array_contains groupBy collect_list 函数。

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
val df1=Seq((“1″,”doc_name_1”,Seq(1,4)),(“2″,”doc_name_2”,Seq(3,2,6))).toDF(“doc_id”,”doc_name”,”doc_type_id”)

val df2=Seq((“1″,”doc_type_1”),(“2″,”doc_type_2”),(“3″,”doc_type_3”),(“4″,”doc_type_4”),(“5″,”doc_type_5”),(“6″,”doc_type_6”)).toDF(“doc_type_id”,”doc_type_name”)

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

df1.createOrReplaceTempView(“tbl”)
df2.createOrReplaceTempView(“tbl2”)

spark.sql(“select a.doc_id,a.doc_name,a.doc_type_id,collect_list(b.doc_type_name) doc_type_name from tbl a join tbl2 b on array_contains(a.doc_type_id,int(b.doc_type_id)) = TRUE group by a.doc_id,a.doc_name,a.doc_type_id”).show(false)

//+——+———-+———–+————————————+
//|doc_id|doc_name  |doc_type_id|doc_type_name                       |
//+——+———-+———–+————————————+
//|2     |doc_name_2|[3, 2, 6]  |[doc_type_2, doc_type_3, doc_type_6]|
//|1     |doc_name_1|[1, 4]     |[doc_type_1, doc_type_4]            |
//+——+———-+———–+————————————+

其他实现方式是使用 explode join collect_list:

1
2
3
4
5
6
7
8
9
10
11
12
13
val df3=df1.withColumn(“arr”,explode(col(“doc_type_id”)))

df3.join(df2,df2.col(“doc_type_id”) === df3.col(“arr”),”inner”).
groupBy(df3.col(“doc_id”),df3.col(“doc_type_id”),df3.col(“doc_name”)).
agg(collect_list(df2.col(“doc_type_name”)).alias(“doc_type_name”)).
show(false)

//+——+———–+———-+————————————+
//|doc_id|doc_type_id|doc_name  |doc_type_name                       |
//+——+———–+———-+————————————+
//|1     |[1, 4]     |doc_name_1|[doc_type_1, doc_type_4]            |
//|2     |[3, 2, 6]  |doc_name_2|[doc_type_2, doc_type_3, doc_type_6]|
//+——+———–+———-+————————————+

  • 感谢您的回答,第一种方法有效。作为参考,还有另一篇包含更多选项和详细信息的帖子:stackoverflow.com/questions/59534351/…


来源:https://www.codenong.com/61312786/

微信公众号
手机浏览(小程序)

Warning: get_headers(): SSL operation failed with code 1. OpenSSL Error messages: error:14090086:SSL routines:ssl3_get_server_certificate:certificate verify failed in /mydata/web/wwwshanhubei/web/wp-content/themes/shanhuke/single.php on line 57

Warning: get_headers(): Failed to enable crypto in /mydata/web/wwwshanhubei/web/wp-content/themes/shanhuke/single.php on line 57

Warning: get_headers(https://static.shanhubei.com/qrcode/qrcode_viewid_9234.jpg): failed to open stream: operation failed in /mydata/web/wwwshanhubei/web/wp-content/themes/shanhuke/single.php on line 57
0
分享到:
没有账号? 忘记密码?