亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

如何處理 Kafka Streams 中的不同時區?

如何處理 Kafka Streams 中的不同時區?

HUWWW 2022-05-25 10:54:00
因此,我正在評估 Kafka Streams 以及它可以做些什么來查看它是否適合我的用例,因為我需要每隔 15 分鐘、每小時、每天聚合一次傳感器的數據,并且由于它的 Windowing 功能而發現它很有用。因為我可以通過應用創建窗口,windowedBy()但KGroupedStream問題是窗口是在 UTC 中創建的,我希望我的數據按其原始時區而不是按 UTC 時區分組,因為它阻礙了聚合,所以任何人都可以幫助我解決這個問題。
查看完整描述

2 回答

?
明月笑刀無情

TA貢獻1828條經驗 獲得超4個贊

您可以使用自定義“移動”時間戳TimestampExtractor- 在將結果寫回輸出主題之前,您可以使用 aTransformer并通過context.forward(key, value, To.all().withTimestamps()).

功能請求票:https ://issues.apache.org/jira/browse/KAFKA-7911


查看完整回答
反對 回復 2022-05-25
?
侃侃無極

TA貢獻2051條經驗 獲得超10個贊

因此,為了解決這個問題,我創建了自定義TimestampExtractor并使用它來更改流窗口創建時間以記錄來自有效負載的時間,如下所示。


public class RecordTimeStampExtractor implements TimestampExtractor {


    @Override

    public long extract(ConsumerRecord<Object, Object> record, long previousTimestamp) {

        JsonObject data = (JsonObject) new JsonParser().parse(record.value().toString());

        Timestamp recordTimestamp = Timestamp.valueOf(data.get(Constant.SLOT).getAsString());

        return recordTimestamp.getTime();

    }


}

所以現在我已經用我的本地時區測試了它,因為昨天是 IST 05:30,它的工作正常,kafka 流也正在根據記錄時間戳創建窗口。也將使用其他時區進行測試并更新答案


查看完整回答
反對 回復 2022-05-25
  • 2 回答
  • 0 關注
  • 187 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號