亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

如何在流查詢中使用 from_json 標準函數(在 select 中)?

如何在流查詢中使用 from_json 標準函數(在 select 中)?

搖曳的薔薇 2023-07-19 16:32:33
我使用以下 JSON 結構處理來自 Kafka 的消息:{"unix_time": 1557678233, "category_id": 1000, "ip": "172.10.34.17", "type": "view"}我想打印出我收到的內容。這是我已經完成的代碼片段:JavaSparkContext sc = createJavaSparkContext();JavaStreamingContext streamingContext =                new JavaStreamingContext(sc, Durations.seconds(BATCH_DURATION_IN_SECONDS));SparkSession sparkSession = SparkSession        .builder()        .config(new SparkConf())        .getOrCreate();Dataset<Row> df = sparkSession        .readStream()        .format("kafka")        .option("kafka.bootstrap.servers", CommonUtils.KAFKA_HOST_PORT)        .option("subscribe", KAFKA_TOPIC)        .load();StreamingQuery query = df.selectExpr("CAST(value AS STRING)")            .select(from_json(new Column("value"), getSchema())).as("data").                    select("data.category_id").writeStream().foreach(new ForeachWriter<Row>() {                @Override                public void process(Row value) {                    System.out.println(value);                }                @Override                public void close(Throwable errorOrNull) {                }                @Override                public boolean open(long partitionId, long version) {                    return true;                }            })            .start();    query.awaitTermination();架構方法:private static StructType getSchema() {    return new StructType(new StructField[]{            new StructField(UNIX_TIME, DataTypes.TimestampType, false, Metadata.empty()),            new StructField(CATEGORY_ID, DataTypes.IntegerType, false, Metadata.empty()),            new StructField(IP, DataTypes.StringType, false, Metadata.empty()),            new StructField(TYPE, DataTypes.StringType, false, Metadata.empty()),    });}如何克服這個問題?對此有何建議?
查看完整描述

1 回答

?
慕虎7371278

TA貢獻1802條經驗 獲得超4個贊

異常的這一部分準確地告訴您在哪里尋找答案:

無法解析給定輸入列的“data.category_id”:[jsontostruct(value)]

換句話說,data.category_id可用列中沒有一列只是 1 列jsontostruct(value)。

這意味著僅select在流式查詢中不起作用。原因相當簡單(我可以將其視為拼寫錯誤)——在Column和Datasetas("data")類型上可用的右括號太多。

總之,替換查詢的以下部分:

.select(from_json(new?Column("value"),?getSchema())).as("data")

至以下內容:

.select(from_json(new?Column("value"),?getSchema()).as("data"))

請注意,我將一個右括號移到了末尾。


查看完整回答
反對 回復 2023-07-19
  • 1 回答
  • 0 關注
  • 136 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號