关于scala:Spark Row to JSON | 珊瑚贝

Spark Row to JSON


我想从 Spark v.1.6(使用 scala)数据帧创建 JSON。我知道有做 df.toJSON.

的简单解决方案

但是,我的问题看起来有点不同。例如,考虑具有以下列的数据框:

1
2
3
4
|  A  |     B     |  C1  | C2  |    C3   |
——————————————-
|  1  | test      |  ab  |  22  |  TRUE   |
|  2  | mytest    |  gh  |  17  |  FALSE  |

我希望最后有一个带有

的数据框

1
2
3
4
|  A  |     B     |                        C                   |
—————————————————————-
|  1  | test      | {“c1” :“ab”,“c2” : 22,“c3” : TRUE }    |
|  2  | mytest    | {“c1” :“gh”,“c2” : 17,“c3” : FALSE }   |

其中 C 是包含 C1、C2、C3 的 JSON。不幸的是,我在编译时不知道数据框是什么样子(除了总是”固定”的列 A 和 B)。

至于我需要这个的原因:我正在使用 Protobuf 来发送结果。不幸的是,我的数据框有时包含比预期更多的列,我仍然会通过 Protobuf 发送这些列,但我不想在定义中指定所有列。

我怎样才能做到这一点?


Spark 2.1 应该对这个用例提供原生支持(参见 #15354)。

1
2
import org.apache.spark.sql.functions.to_json
df.select(to_json(struct($“c1”, $“c2”, $“c3”)))

  • struct 在 Java 中是 StructType,你能给我 java 实现吗
  • 在使用上述解决方案时,我得到如下带有额外字符的 json 结果 ** \\\\\\”SIGNAL\\\\\\”:[{\\\\\\”TIME\\\\\\”:1569382072016,\\\\\\”VALUE\\\\ \\”:-9}],\\\\\\”SIGNAL0??1\\\\\\”:[{\\\\\\”TIME\\\\\\”:15693??82099654,\\\\\\”VALUE\\\\\\”:8 ??.0}]}


我用这个命令解决to_json问题:

1
output_df = (df.select(to_json(struct(col(“*”))).alias(“content”)))

这里没有 JSON 解析器,它会适应你的模式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import org.apache.spark.sql.functions.{col, concat, concat_ws, lit}

df.select(
  col(df.columns(0)),
  col(df.columns(1)),
  concat(
    lit(“{“),
    concat_ws(“,”,df.dtypes.slice(2, df.dtypes.length).map(dt => {
      val c = dt._1;
      val t = dt._2;
      concat(
        lit(“”” + c +”“:” + (if (t ==“StringType”)“”“; else”“)  ),
        col(c),
        lit(if(t==”
StringType“)”“”; else“”)
      )
    }):_*),
    lit(“}”)
  ) as“C”
).collect()

  • 是的,是的。如果你问我,JSON 通常是 hacky。


首先让我们将 C 转换为 struct:

1
val dfStruct = df.select($“A”, $“B”, struct($“C1”, $“C2”, $“C3”).alias(“C”))

这个结构可以像以前一样使用 toJSON 转换为 JSONL:

1
2
3
4
dfStruct.toJSON.collect
// Array[String] = Array(
//   {“A”:1,”B”:”test”,”C”:{“C1″:”ab”,”C2″:22,”C3″:true}},
//   {“A”:2,”B”:”mytest”,”C”:{“C1″:”gh”,”C2″:17,”C3″:false}})

我不知道任何可以转换单个列的内置方法,但您可以单独转换它和 join 或在 UDF 中使用您最喜欢的 JSON 解析器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
case class C(C1: String, C2: Int, C3: Boolean)

object CJsonizer {
  import org.json4s._
  import org.json4s.JsonDSL._
  import org.json4s.jackson.Serialization
  import org.json4s.jackson.Serialization.write

  implicit val formats = Serialization.formats(org.json4s.NoTypeHints)

  def toJSON(c1: String, c2: Int, c3: Boolean) = write(C(c1, c2, c3))
}

val cToJSON = udf((c1: String, c2: Int, c3: Boolean) =>
  CJsonizer.toJSON(c1, c2, c3))

df.withColumn(“c_json”, cToJSON($“C1”, $“C2”, $“C3”))

  • 实际上,我的问题实际上是关于如何将各个列转换为 JSON 的第二部分。您提到的是 join-ing 列,但这并不能真正起作用,因为我一方面有一个 RDD[String],另一方面有一个 DataFrame
  • 正如他所说,只需使用 UDF。您甚至不必在 UDF 中使用成熟的 JSON 解析器——您可以使用 map 和 mkString 即时制作 JSON 字符串。您可能需要使用 DataFrame.columns 或可能需要使用 DataFrame.dtypes 来制作 select 语句并作为 UDF 中 map 的基础。
  • 我同意@DavidGriffin – udf 可能是这里最简单的解决方案。并且 Jackson 和 json4s 已经被其他依赖项拖了。
  • 我看到的所有 JSON 解析器的问题是,您需要提前知道架构是什么样的——就像你的解决方案 @zero323 一样——它只适用于那些特定的列。如果名字不同怎么办?如果有超过 3 列怎么办?
  • 我看到的唯一问题是 Row 是极其丑陋的数据结构。否则,您可以简单地使用 Lift / json4s 构建任意复杂的 AST 并将其转换为 JSON。但说实话,将其放入 SO 答案中需要付出很多努力。
  • Row 丑陋的原因与我讨厌在 Scala 中处理 JSON 的原因相同——它是文化的冲突 Loosey、goosey vs strong、静态类型。 SQL 很松散——你离定义一个新类型还差一个 select ——因此 Row 很乱。 Avro 的 GenericRecord 也有同样的问题。


来源:https://www.codenong.com/36157810/

微信公众号
手机浏览(小程序)

Warning: get_headers(): SSL operation failed with code 1. OpenSSL Error messages: error:14090086:SSL routines:ssl3_get_server_certificate:certificate verify failed in /mydata/web/wwwshanhubei/web/wp-content/themes/shanhuke/single.php on line 57

Warning: get_headers(): Failed to enable crypto in /mydata/web/wwwshanhubei/web/wp-content/themes/shanhuke/single.php on line 57

Warning: get_headers(https://static.shanhubei.com/qrcode/qrcode_viewid_9558.jpg): failed to open stream: operation failed in /mydata/web/wwwshanhubei/web/wp-content/themes/shanhuke/single.php on line 57
0
分享到:
没有账号? 忘记密码?