3 回答

TA貢獻1963條經驗 獲得超6個贊
我認為你應該使用KStream::to(final TopicNameExtractor<K, V> topicExtractor)
函數。它使您能夠計算每條消息的主題名稱。
示例代碼:
final KStream<String, String> stream = ???; stream.to((key, value, recordContext) -> key);

TA貢獻1853條經驗 獲得超6個贊
如果您需要為每個用戶生成聚合數據,則無需為每個用戶寫入單獨的主題。您最好在源流上編寫聚合。這樣,您就不會最終得到每個鍵一個主題,但您仍然可以獨立地對每個用戶運行操作。
Serde<UserRecord>?recordSerde?=?... KStream<Stream,?UserAggregate>?aggregateByName?=?recordsByName ???.groupByKey(Grouped.with(Serdes.String(),?recordSerde)) ???.aggregate(...) ???.toStream()
這種方法將擴展到數百萬用戶,這是您目前無法通過每個用戶一個主題的方法實現的。

TA貢獻1794條經驗 獲得超7個贊
我想你正在尋找的是KStream#branch
。
以下未經測試,但顯示了總體思路
// get a list of predicates to branch a topic on
final List<String> names = Arrays.asList("jhon", "sean", "mary");
final Predicate[] predicates = names.stream()
? ? .map((Function<String, Predicate<String, Object>>) n -> (s, o) -> s.equals(n))
? ? .toArray(Predicate[]::new);
// example input
final KStream<Object, Object> stream = new StreamsBuilder().stream("names");
// split the topic
KStream<String, Object>[] branches = stream.branch(predicates);
for (int i = 0; i < names.size(); i++) {
? ? branches[i].to(names.get(i));
}
// KStream branches[0] contains all records whose keys are "jhon"
// KStream branches[1] contains all records whose keys are "sean"
...
添加回答
舉報