Spark Row to JSON
我想从 Spark v.1.6(使用 scala)数据帧创建 JSON。我知道有做 df.toJSON.
的简单解决方案
但是,我的问题看起来有点不同。例如,考虑具有以下列的数据框:
1
2 3 4 |
| A | B | C1 | C2 | C3 |
——————————————- | 1 | test | ab | 22 | TRUE | | 2 | mytest | gh | 17 | FALSE | |
我希望最后有一个带有
的数据框
1
2 3 4 |
| A | B | C |
—————————————————————- | 1 | test | {“c1” :“ab”,“c2” : 22,“c3” : TRUE } | | 2 | mytest | {“c1” :“gh”,“c2” : 17,“c3” : FALSE } | |
其中 C 是包含 C1、C2、C3 的 JSON。不幸的是,我在编译时不知道数据框是什么样子(除了总是”固定”的列 A 和 B)。
至于我需要这个的原因:我正在使用 Protobuf 来发送结果。不幸的是,我的数据框有时包含比预期更多的列,我仍然会通过 Protobuf 发送这些列,但我不想在定义中指定所有列。
我怎样才能做到这一点?
Spark 2.1 应该对这个用例提供原生支持(参见 #15354)。
1
2 |
- struct 在 Java 中是 StructType,你能给我 java 实现吗
- 在使用上述解决方案时,我得到如下带有额外字符的 json 结果 ** \\\\\\”SIGNAL\\\\\\”:[{\\\\\\”TIME\\\\\\”:1569382072016,\\\\\\”VALUE\\\\ \\”:-9}],\\\\\\”SIGNAL0??1\\\\\\”:[{\\\\\\”TIME\\\\\\”:15693??82099654,\\\\\\”VALUE\\\\\\”:8 ??.0}]}
我用这个命令解决to_json问题:
1
|
output_df = (df.select(to_json(struct(col(“*”))).alias(“content”)))
|
这里没有 JSON 解析器,它会适应你的模式:
1
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
import org.apache.spark.sql.functions.{col, concat, concat_ws, lit}
df.select( |
- 是的,是的。如果你问我,JSON 通常是 hacky。
首先让我们将 C 转换为 struct:
1
|
这个结构可以像以前一样使用 toJSON 转换为 JSONL:
1
2 3 4 |
dfStruct.toJSON.collect
// Array[String] = Array( // {“A”:1,”B”:”test”,”C”:{“C1″:”ab”,”C2″:22,”C3″:true}}, // {“A”:2,”B”:”mytest”,”C”:{“C1″:”gh”,”C2″:17,”C3″:false}}) |
我不知道任何可以转换单个列的内置方法,但您可以单独转换它和 join 或在 UDF 中使用您最喜欢的 JSON 解析器。
1
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
case class C(C1: String, C2: Int, C3: Boolean)
object CJsonizer { implicit val formats = Serialization.formats(org.json4s.NoTypeHints) def toJSON(c1: String, c2: Int, c3: Boolean) = write(C(c1, c2, c3))
val cToJSON = udf((c1: String, c2: Int, c3: Boolean) => df.withColumn(“c_json”, cToJSON($“C1”, $“C2”, $“C3”)) |
- 实际上,我的问题实际上是关于如何将各个列转换为 JSON 的第二部分。您提到的是 join-ing 列,但这并不能真正起作用,因为我一方面有一个 RDD[String],另一方面有一个 DataFrame
- 正如他所说,只需使用 UDF。您甚至不必在 UDF 中使用成熟的 JSON 解析器——您可以使用 map 和 mkString 即时制作 JSON 字符串。您可能需要使用 DataFrame.columns 或可能需要使用 DataFrame.dtypes 来制作 select 语句并作为 UDF 中 map 的基础。
- 我同意@DavidGriffin – udf 可能是这里最简单的解决方案。并且 Jackson 和 json4s 已经被其他依赖项拖了。
- 我看到的所有 JSON 解析器的问题是,您需要提前知道架构是什么样的——就像你的解决方案 @zero323 一样——它只适用于那些特定的列。如果名字不同怎么办?如果有超过 3 列怎么办?
- 我看到的唯一问题是 Row 是极其丑陋的数据结构。否则,您可以简单地使用 Lift / json4s 构建任意复杂的 AST 并将其转换为 JSON。但说实话,将其放入 SO 答案中需要付出很多努力。
- Row 丑陋的原因与我讨厌在 Scala 中处理 JSON 的原因相同——它是文化的冲突 Loosey、goosey vs strong、静态类型。 SQL 很松散——你离定义一个新类型还差一个 select ——因此 Row 很乱。 Avro 的 GenericRecord 也有同样的问题。
来源:https://www.codenong.com/36157810/