亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

使用 kafka 流根據消息密鑰將消息發送到主題

使用 kafka 流根據消息密鑰將消息發送到主題

躍然一笑 2023-08-04 15:23:09
我希望能夠根據消息鍵的鍵將 Kafkastream 中的所有記錄發送到不同的主題。前任。Kafka 中的流包含名稱作為鍵,記錄作為值。我想根據記錄的鍵將這些記錄分散到不同的主題數據:(jhon -> {jhonsRecord}),(sean -> {seansRecord}),(mary -> {marysRecord}),(jhon -> {jhonsRecord2}),預期topic1 :名稱: jhon ->(jhon -> {jhonsRecord}),(jhon -> {jhonsRecord2})主題2:sean->(sean -> {seansRecord})主題3:瑪麗 ->(瑪麗 -> {marysRecord})下面是我現在執行此操作的方式,但由于名稱列表是 hudge,所以速度很慢。另外,即使記錄了幾個名字,我也需要遍歷整個列表請提出修復建議 for( String name : names )     {         recordsByName.filterNot(( k, v ) -> k.equalsIgnoreCase(name)).to(name);     }
查看完整描述

3 回答

?
神不在的星期二

TA貢獻1963條經驗 獲得超6個贊

我認為你應該使用KStream::to(final TopicNameExtractor<K, V> topicExtractor)函數。它使您能夠計算每條消息的主題名稱。

示例代碼:

final KStream<String, String> stream = ???;
stream.to((key, value, recordContext) -> key);


查看完整回答
反對 回復 2023-08-04
?
墨色風雨

TA貢獻1853條經驗 獲得超6個贊

如果您需要為每個用戶生成聚合數據,則無需為每個用戶寫入單獨的主題。您最好在源流上編寫聚合。這樣,您就不會最終得到每個鍵一個主題,但您仍然可以獨立地對每個用戶運行操作。

Serde<UserRecord>?recordSerde?=?...
KStream<Stream,?UserAggregate>?aggregateByName?=?recordsByName
???.groupByKey(Grouped.with(Serdes.String(),?recordSerde))
???.aggregate(...)
???.toStream()

這種方法將擴展到數百萬用戶,這是您目前無法通過每個用戶一個主題的方法實現的。


查看完整回答
反對 回復 2023-08-04
?
慕田峪9158850

TA貢獻1794條經驗 獲得超7個贊

我想你正在尋找的是KStream#branch。

以下未經測試,但顯示了總體思路

// get a list of predicates to branch a topic on

final List<String> names = Arrays.asList("jhon", "sean", "mary");

final Predicate[] predicates = names.stream()

? ? .map((Function<String, Predicate<String, Object>>) n -> (s, o) -> s.equals(n))

? ? .toArray(Predicate[]::new);


// example input

final KStream<Object, Object> stream = new StreamsBuilder().stream("names");


// split the topic

KStream<String, Object>[] branches = stream.branch(predicates);

for (int i = 0; i < names.size(); i++) {

? ? branches[i].to(names.get(i));

}


// KStream branches[0] contains all records whose keys are "jhon"

// KStream branches[1] contains all records whose keys are "sean"

...


查看完整回答
反對 回復 2023-08-04
  • 3 回答
  • 0 關注
  • 182 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號