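I'm trying to create a Kafka Streams application that counts unique devices per platform within a time window.

The event class:

```java
import java.time.ZonedDateTime;

public class Event {
    private String eventId;
    private String deviceId;
    private String platform;
    private ZonedDateTime createdAt;
    // getters omitted
}
```

I need the time windows to respect the event's `createdAt`, so I wrote the following `TimestampExtractor` implementation:

```java
import java.time.ZonedDateTime;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

// `log` is assumed to be an SLF4J logger (e.g. from Lombok's @Slf4j)
public class EventTimestampExtractor implements TimestampExtractor {
    @Override
    public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
        final Event event = (Event) record.value();
        final ZonedDateTime eventCreationTime = event.getCreatedAt();
        final long timestamp = eventCreationTime.toEpochSecond(); // seconds since the epoch
        log.trace("Event ({}) yielded timestamp: {}", event.getEventId(), timestamp);
        return timestamp;
    }
}
```

Finally, here is my streams application code:

```java
final KStream<String, Event> eventStream = builder.stream("events_ingestion");
eventStream
    .selectKey((key, event) -> {
        final String platform = event.getPlatform();
        final String deviceId = event.getDeviceId();
        return String.join("::", platform, deviceId);
    })
    .groupByKey()
    .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(15)))
    .count(Materialized.as(COUNT_STORE));
```

When I push events to the `events_ingestion` topic, I can see the timestamps in the application log, and data is being written to the count store. When I iterate over the count store, I see the following:

```
Key: [ANDROID::1@1539000000/1539900000], Value: 2
```

Although my time window is 15 minutes, the key spans 10 days. If I remove my `TimestampExtractor` implementation from the streams config (thus going back to processing time), the key spans 15 minutes as expected:

```
Key: [ANDROID::1@1539256500000/1539257400000], Value: 1
```

What am I doing wrong here? Any ideas?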
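The 10-day span is consistent with a unit mismatch: Kafka Streams treats the value returned by `TimestampExtractor.extract` as milliseconds since the epoch, while `ZonedDateTime.toEpochSecond()` returns seconds. A window of 900000 (15 minutes in milliseconds) applied to second-based timestamps covers 900000 seconds, roughly 10.4 days, which is exactly the width of `1539000000/1539900000`. The standalone sketch below (no Kafka dependency) reproduces both keys from the question. The `windowStart` helper is hypothetical and only mimics how tumbling windows are aligned to multiples of the window size, and the `createdAt` value is a made-up instant chosen to match the processing-time key.

```java
import java.time.Duration;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;

class TimestampUnitDemo {

    // Hypothetical helper: tumbling windows start at multiples of the window
    // size counted from the epoch, so the start is the timestamp rounded down.
    static long windowStart(long timestamp, long windowSize) {
        return timestamp - (timestamp % windowSize);
    }

    public static void main(String[] args) {
        // Made-up event time, chosen to match the question's processing-time key.
        ZonedDateTime createdAt = ZonedDateTime.of(2018, 10, 11, 11, 15, 0, 0, ZoneOffset.UTC);

        long seconds = createdAt.toEpochSecond();           // what the extractor returns
        long millis = createdAt.toInstant().toEpochMilli(); // milliseconds since the epoch

        // Same value the question passes to TimeWindows.of: 900000.
        long windowSize = Duration.ofMinutes(15).toMillis();

        long secStart = windowStart(seconds, windowSize);
        long msStart = windowStart(millis, windowSize);

        System.out.println("seconds: [" + secStart + "/" + (secStart + windowSize) + "]");
        System.out.println("millis:  [" + msStart + "/" + (msStart + windowSize) + "]");
        // Interpreting the 900000-unit window as seconds gives whole days:
        System.out.println("second-based window spans "
                + Duration.ofSeconds(windowSize).toDays() + " full days");
    }
}
```

Running it prints `[1539000000/1539900000]` for the second-based timestamp and `[1539256500000/1539257400000]` for the millisecond one, matching the two keys above. If that diagnosis holds, returning `eventCreationTime.toInstant().toEpochMilli()` from the extractor instead of `toEpochSecond()` should yield 15-minute windows.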