亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

如何在 Spring Kafka 中接收來自 KSQL 的流式響應?

如何在 Spring Kafka 中接收來自 KSQL 的流式響應?

喵喵時光機 2022-05-12 15:34:15
java - 如何在java spring boot應用程序中接收來自kafka KSQL服務器的分塊響應?當我對端點進行休息呼叫時,/query我只得到 1 行并且連接關閉。如何保持連接打開并接收多行?醫生說響應流回,直到達到語句中指定的 LIMIT,或者客戶端關閉連接。在java中實現這一點的方法是什么?即使對于 KTable,我也只能得到 1 行作為回報。https://docs.confluent.io/current/ksql/docs/developer-guide/api.html#run-a-query-and-stream-back-the-output
查看完整描述

1 回答

?
千巷貓影

TA貢獻1829條經驗 獲得超7個贊

我能夠解決的方法如下:


以字符串形式獲取響應

逐行解析 JSON 對象(KafkaQueryResponse是代表 1 行的對象)


    ResponseEntity<String> result = template.exchange("/query",

        HttpMethod.POST,

        new HttpEntity<>(params, headers),

        String.class);


    List<KafkaQueryResponse> array = new ArrayList<>();

    JsonFactory jsonFactory = new JsonFactory();

    try(BufferedReader br = new BufferedReader(new StringReader(result.getBody()))) {

        Iterator<KafkaQueryResponse> value = objectMapper.readValues(jsonFactory.createParser(br), KafkaQueryResponse.class);

        value.forEachRemaining(e -> {

            if (e.getRow() != null) {

                array.add(e);

            }

        });

    }

    array <----  this is the list of JSON objects

KafkaQueryResponse


    @Data

    @JsonIgnoreProperties(ignoreUnknown = true)

    public class KafkaQueryResponse {

        private KafkaQueryRow row;

        private String finalMessage;

        private String errorMessage;


        @Data

        @JsonIgnoreProperties(ignoreUnknown = true)

        public static class KafkaQueryRow {

            private List<Object> columns;

        }

    }

此解決方案不允許以塊的形式讀取流式響應。它等待整個響應到達客戶端,然后關閉連接,然后解析所有 json 對象。


查看完整回答
反對 回復 2022-05-12
  • 1 回答
  • 0 關注
  • 102 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號