亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

將 JSON 保存到 HDFS 的結構化流

將 JSON 保存到 HDFS 的結構化流

繁星淼淼 2023-04-26 14:18:58
我的 Structured Spark Streaming 程序是從 Kafka 讀取 JSON 數據并以 JSON 格式寫入 HDFS。我能夠將 JSON 保存到 HDFS,但它保存了 JSON 字符串: "jsontostructs(CAST(value AS STRING))"key as below: {"jsontostructs(CAST(value AS STRING))":{"age":42,"name":"John"}}.如何只保存{"age":42,"name":"John"}?StructType schema = kafkaPrimerRow.schema();//Read json from kafka. JSON is: {"age":42,"name":"John"}Dataset<Row> df = spark                    .readStream()                    .format("kafka")                    .option("kafka.bootstrap.servers", input_bootstrap_server)                    .option("subscribe", topics[0])                    .load();    //Save Stream to HDFS    StreamingQuery ds = df             .select(functions.from_json(col("value").cast(DataTypes.StringType),schema)) .writeStream().format("json").outputMode(OutputMode.Append()).option("path", destPath).option("checkpointLocation", checkpoint).start();
查看完整描述

1 回答

?
BIG陽

TA貢獻1859條經驗 獲得超6個贊

以下 .select("data.*") 達到了目的。

StreamingQuery ds = df
                        .select(functions.from_json(col("value").cast(DataTypes.StringType),schema).as("data"))
                        .select("data.*")
                        .writeStream()
                        .format("json")
                        .outputMode(OutputMode.Append())
                        .option("path", destPath)
                        .option("checkpointLocation", checkpoint)
                        .start();


查看完整回答
反對 回復 2023-04-26
  • 1 回答
  • 0 關注
  • 203 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號