My Spark Structured Streaming program reads JSON data from Kafka and writes it to HDFS in JSON format. Saving the JSON to HDFS works, but every record is nested under a "jsontostructs(CAST(value AS STRING))" key, as below:
{"jsontostructs(CAST(value AS STRING))":{"age":42,"name":"John"}}
How can I save only {"age":42,"name":"John"}?
StructType schema = kafkaPrimerRow.schema();
// Read JSON from Kafka. The JSON is: {"age":42,"name":"John"}
Dataset<Row> df = spark
    .readStream()
    .format("kafka")
    .option("kafka.bootstrap.servers", input_bootstrap_server)
    .option("subscribe", topics[0])
    .load();
// Save the stream to HDFS
StreamingQuery ds = df
    .select(functions.from_json(col("value").cast(DataTypes.StringType), schema))
    .writeStream()
    .format("json")
    .outputMode(OutputMode.Append())
    .option("path", destPath)
    .option("checkpointLocation", checkpoint)
    .start();
1 Answer
BIG阳
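Aliasing the parsed struct with .as("data") and then expanding it with .select("data.*") does the trick. The expansion turns each field of the struct (age, name) into a top-level column, so the JSON sink writes {"age":42,"name":"John"} instead of nesting it under the generated column name.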
StreamingQuery ds = df
    // Parse the Kafka value and name the resulting struct column "data"
    .select(functions.from_json(col("value").cast(DataTypes.StringType), schema).as("data"))
    // Expand the struct so that age and name become top-level columns
    .select("data.*")
    .writeStream()
    .format("json")
    .outputMode(OutputMode.Append())
    .option("path", destPath)
    .option("checkpointLocation", checkpoint)
    .start();
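Why the alias and the expansion change the output can be illustrated without Spark. The sketch below is a toy model, not Spark's API: it assumes a row can be represented as an ordered map from column name to value, and that the JSON sink writes each row as one object keyed by column name. The class JsonSinkModel and its methods toJson and expandStruct are hypothetical names chosen for this illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Toy model (not Spark code): a row is an ordered map from column name to value,
 * and the "JSON sink" writes each row as one JSON object keyed by column name.
 */
public class JsonSinkModel {

    // Serialize a value: nested maps become nested JSON objects, strings are quoted,
    // anything else uses String.valueOf. No escaping, so this is for illustration only.
    static String toJson(Object value) {
        if (value instanceof Map) {
            Map<?, ?> map = (Map<?, ?>) value;
            StringBuilder sb = new StringBuilder("{");
            boolean first = true;
            for (Map.Entry<?, ?> e : map.entrySet()) {
                if (!first) {
                    sb.append(',');
                }
                first = false;
                sb.append('"').append(e.getKey()).append("\":").append(toJson(e.getValue()));
            }
            return sb.append('}').toString();
        }
        if (value instanceof String) {
            return "\"" + value + "\"";
        }
        return String.valueOf(value);
    }

    // Hypothetical model of select("data.*"): replace a struct column with its fields
    // as top-level columns, keeping all other columns unchanged.
    static Map<String, Object> expandStruct(Map<String, Object> row, String column) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : row.entrySet()) {
            if (e.getKey().equals(column) && e.getValue() instanceof Map) {
                Map<?, ?> struct = (Map<?, ?>) e.getValue();
                for (Map.Entry<?, ?> field : struct.entrySet()) {
                    out.put(String.valueOf(field.getKey()), field.getValue());
                }
            } else {
                out.put(e.getKey(), e.getValue());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // The parsed Kafka record, i.e. the struct produced by from_json
        Map<String, Object> parsed = new LinkedHashMap<>();
        parsed.put("age", 42);
        parsed.put("name", "John");

        // Without an alias, the single struct column carries the generated expression name
        Map<String, Object> before = new LinkedHashMap<>();
        before.put("jsontostructs(CAST(value AS STRING))", parsed);
        System.out.println(toJson(before));

        // With .as("data") the column is renamed; expanding "data.*" lifts its fields
        Map<String, Object> aliased = new LinkedHashMap<>();
        aliased.put("data", parsed);
        System.out.println(toJson(expandStruct(aliased, "data")));
    }
}
```

Running main prints the two shapes from the question: first the record nested under the generated column name, then the flat {"age":42,"name":"John"}. In the real query, the generated name comes from the unaliased from_json expression, .as("data") replaces it, and .select("data.*") plays the role that expandStruct plays in this model.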
