我們有以下用例:從主題讀?。A期吞吐量是一個鍵每 2 秒記錄一次),groupByKey 并執行 30 分鐘窗口的窗口聚合,跳躍周期為 1 分鐘。聚合只是附加收到的記錄。當應用程序啟動時,一切正常,但在后期階段,當聚合大小增加時,應用程序會變慢并滯后拓撲結構:KStream<String, Foo> numericStream = builder.stream("topic", Consumed.with(Serdes.String(), FooSerde));static Duration WINDOW_MS = Duration.ofMinutes(30);static Duration ADVANCE_MS = Duration.ofMinutes(15);KStream<Windowed<String>, Foo1> windowedStream = numericStream.peek((key, value) -> System.out.println(value.getDateTime())) .groupByKey() .windowedBy((TimeWindows.of(WINDOW_MS).advanceBy(ADVANCE_MS)).grace(Duration.ofMillis(30))) .aggregate(new Initializer<Foo1>() { @Override public Foo1 apply() { return new Foo1(); }}, (key, value, aggregate) -> { aggregate.append(value); return aggregate; }, Materialized.<String, Foo1, WindowStore<Bytes,byte[]>>as("some_name").withValueSerde(Foo1Serde)) .toStream() .peek((key, value) -> System.out.println(" Key: "+key+ " Start: "+getISTTime(key.window().start()) + " End: "+ getISTTime(key.window().end()) +" Count: " + value.getCount() ));每條記錄的大小約為20KB。當聚合大小超過 10MB 左右時,記錄的處理時間會超過 2 秒,因此會出現滯后。COMMIT_INTERVAL_MS_CONFIG 設置為 0,因為狀態存儲應始終與最新數據包保持同步,并且狀態存儲會被查詢并且間隔不同。如何消除應用程序的延遲,是否與 RocksDB I/O 操作有關?因為計數操作而不是聚合操作沒有任何滯后每個主題有 3 個分區,但是具有相同鍵的記錄會轉到同一分區,那么線程/多個實例會有幫助嗎?我們也在考慮在不使用窗口的情況下執行此操作,窗口是否會對較大的聚合產生這種滯后?
1 回答

慕田峪7331174
TA貢獻1828條經驗 獲得超13個贊
由于您向 RocksDB 寫入和讀取越來越大的數據,因此可能會減慢處理速度。
是的,在一個實例中使用三個線程或啟動三個實例各一個線程也可能在這種情況下有所幫助。通過您的拓撲和三個分區,處理分布在三個任務上。如果只有一個實例和一個線程,則所有三個任務將由同一線程運行。您可以通過指定一個具有三個線程的實例來進行縱向擴展,也可以通過在不同的計算節點上啟動三個實例(每個實例具有一個線程)來進行橫向擴展。兩個實例之間的設置(一個具有兩個線程,另一個具有一個線程)也可以工作。
如果沒有窗口,聚合將永遠不會過期,也永遠不會從狀態存儲中刪除。因此,狀態存儲中的數據將無限增長,并且可能會減慢狀態存儲的速度。
如果使用交互式查詢來查詢狀態存儲,則無需將 COMMIT_INTERVAL_MS_CONFIG 設置為 0,因為交互式查詢還會查詢狀態存儲前面的緩存。實際上,將 COMMIT_INTERVAL_MS_CONFIG 設置為零也可能會減慢處理速度,因為它會增加磁盤 I/O,因為您不斷地將數據寫入磁盤。
添加回答
舉報
0/150
提交
取消