2 回答
TA貢獻1871條經驗 獲得超8個贊
我有一個做兩個 kstream 連接的拓撲,我面臨的問題是當嘗試使用 TopologyTestDriver 進行單元測試時,發送幾個帶有 pipeInput 然后 readOutput 的 ConsumerRecords。它似乎不起作用。
按照設計,但不幸的是,在您的情況下,這TopologyTestDriver并不是 Kafka Streams 引擎在運行時如何工作的 100% 準確模型。值得注意的是,新的傳入事件的處理順序存在一些差異。
例如,在嘗試測試某些連接時,這確實會導致問題,因為這些操作依賴于特定的處理順序(例如,在流表連接中,表應該在流之前已經有一個鍵 'alice' 的條目- 'alice' 的輔助事件到達,否則流端 'alice' 的連接輸出將不包含任何表端數據)。
所以我一直在尋找解決方案,但找不到任何解決方案。
我建議使用啟動嵌入式 Kafka 集群的測試,然后使用“真正的”Kafka Streams 引擎(即,不是TopologyTestDriver. 實際上,這意味著您正在將測試從單元測試更改為集成/系統測試:您的測試將啟動一個成熟的 Kafka Streams 拓撲,該拓撲與與您的測試在同一臺機器上運行的嵌入式 Kafka 集群通信。
請參閱 Apache Kafka 項目中的 Kafka Streams 集成測試,其中EmbeddedKafkaCluster和IntegrationTestUtils是工具的核心部分。連接的具體測試示例是StreamTableJoinIntegrationTest(有一些與連接相關的集成測試)及其父級AbstractJoinIntegrationTest。(對于它的價值,在https://github.com/confluentinc/kafka-streams-examples#examples-integration-tests有進一步的集成測試示例,其中包括在使用 Apache Avro 作為您的數據格式等)
但是,除非我弄錯了,否則集成測試及其工具不包含在 Kafka Streams 的測試實用程序工件中(即,org.apache.kafka:kafka-streams-test-utils)。因此,您必須將一些復制粘貼到您自己的代碼庫中。
TA貢獻1847條經驗 獲得超7個贊
你看過 Kafka Streams 單元測試 [1] 嗎?這是關于輸入數據并使用模擬處理器檢查最終結果。
例如對于以下流連接:
stream1 = builder.stream(topic1, consumed);
stream2 = builder.stream(topic2, consumed);
joined = stream1.outerJoin(
stream2,
MockValueJoiner.TOSTRING_JOINER,
JoinWindows.of(ofMillis(100)),
StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String()));
joined.process(supplier);
然后,您可以開始將輸入項通過管道傳輸到第一個或第二個主題中,并檢查每個連續的輸入管道,處理器可以檢查什么:
// push two items to the primary stream; the other window is empty
// w1 = {}
// w2 = {}
// --> w1 = { 0:A0, 1:A1 }
// w2 = {}
for (int i = 0; i < 2; i++) {
inputTopic1.pipeInput(expectedKeys[i], "A" + expectedKeys[i]);
}
processor.checkAndClearProcessResult(EMPTY);
// push two items to the other stream; this should produce two items
// w1 = { 0:A0, 1:A1 }
// w2 = {}
// --> w1 = { 0:A0, 1:A1 }
// w2 = { 0:a0, 1:a1 }
for (int i = 0; i < 2; i++) {
inputTopic2.pipeInput(expectedKeys[i], "a" + expectedKeys[i]);
}
processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "A0+a0", 0),
new KeyValueTimestamp<>(1, "A1+a1", 0));
我希望這有幫助。
參考文獻:[1] https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java#L279
添加回答
舉報
