2 回答

TA貢獻1828條經驗 獲得超4個贊
您可以使用自定義“移動”時間戳TimestampExtractor
- 在將結果寫回輸出主題之前,您可以使用 aTransformer
并通過context.forward(key, value, To.all().withTimestamps())
.
功能請求票:https ://issues.apache.org/jira/browse/KAFKA-7911

TA貢獻2051條經驗 獲得超10個贊
因此,為了解決這個問題,我創建了自定義TimestampExtractor并使用它來更改流窗口創建時間以記錄來自有效負載的時間,如下所示。
public class RecordTimeStampExtractor implements TimestampExtractor {
@Override
public long extract(ConsumerRecord<Object, Object> record, long previousTimestamp) {
JsonObject data = (JsonObject) new JsonParser().parse(record.value().toString());
Timestamp recordTimestamp = Timestamp.valueOf(data.get(Constant.SLOT).getAsString());
return recordTimestamp.getTime();
}
}
所以現在我已經用我的本地時區測試了它,因為昨天是 IST 05:30,它的工作正常,kafka 流也正在根據記錄時間戳創建窗口。也將使用其他時區進行測試并更新答案
添加回答
舉報