亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

對使用 kstream 連接的 kafka 拓撲進行單元測試

對使用 kstream 連接的 kafka 拓撲進行單元測試

DIEA 2022-07-27 09:47:42
我有一個做兩個 kstream 連接的拓撲,我面臨的問題是當嘗試使用 TopologyTestDriver 進行單元測試時,發送幾個帶有 pipeInput 然后 readOutput 的 ConsumerRecords。它似乎不起作用。我認為這可能是因為聯接在我們在測試中不使用的實際 kafka 中使用了內部的 Rocksdb。所以我一直在尋找解決方案,但找不到任何解決方案。注意:這種測試方法在移除 kstream-kstream 連接時效果很好。
查看完整描述

2 回答

?
ITMISS

TA貢獻1871條經驗 獲得超8個贊

我有一個做兩個 kstream 連接的拓撲,我面臨的問題是當嘗試使用 TopologyTestDriver 進行單元測試時,發送幾個帶有 pipeInput 然后 readOutput 的 ConsumerRecords。它似乎不起作用。

按照設計,但不幸的是,在您的情況下,這TopologyTestDriver并不是 Kafka Streams 引擎在運行時如何工作的 100% 準確模型。值得注意的是,新的傳入事件的處理順序存在一些差異。

例如,在嘗試測試某些連接時,這確實會導致問題,因為這些操作依賴于特定的處理順序(例如,在流表連接中,表應該在流之前已經有一個鍵 'alice' 的條目- 'alice' 的輔助事件到達,否則流端 'alice' 的連接輸出將不包含任何表端數據)。

所以我一直在尋找解決方案,但找不到任何解決方案。

我建議使用啟動嵌入式 Kafka 集群的測試,然后使用“真正的”Kafka Streams 引擎(即,不是TopologyTestDriver. 實際上,這意味著您正在將測試從單元測試更改為集成/系統測試:您的測試將啟動一個成熟的 Kafka Streams 拓撲,該拓撲與與您的測試在同一臺機器上運行的嵌入式 Kafka 集群通信。

請參閱 Apache Kafka 項目中的 Kafka Streams 集成測試,其中EmbeddedKafkaClusterIntegrationTestUtils是工具的核心部分。連接的具體測試示例是StreamTableJoinIntegrationTest(有一些與連接相關的集成測試)及其父級AbstractJoinIntegrationTest。(對于它的價值,在https://github.com/confluentinc/kafka-streams-examples#examples-integration-tests有進一步的集成測試示例,其中包括在使用 Apache Avro 作為您的數據格式等)

但是,除非我弄錯了,否則集成測試及其工具包含在 Kafka Streams 的測試實用程序工件中(即,org.apache.kafka:kafka-streams-test-utils)。因此,您必須將一些復制粘貼到您自己的代碼庫中。


查看完整回答
反對 回復 2022-07-27
?
aluckdog

TA貢獻1847條經驗 獲得超7個贊

你看過 Kafka Streams 單元測試 [1] 嗎?這是關于輸入數據并使用模擬處理器檢查最終結果。


例如對于以下流連接:


        stream1 = builder.stream(topic1, consumed);

        stream2 = builder.stream(topic2, consumed);

        joined = stream1.outerJoin(

            stream2,

            MockValueJoiner.TOSTRING_JOINER,

            JoinWindows.of(ofMillis(100)),

            StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String()));

        joined.process(supplier);

然后,您可以開始將輸入項通過管道傳輸到第一個或第二個主題中,并檢查每個連續的輸入管道,處理器可以檢查什么:


// push two items to the primary stream; the other window is empty

            // w1 = {}

            // w2 = {}

            // --> w1 = { 0:A0, 1:A1 }

            //     w2 = {}

            for (int i = 0; i < 2; i++) {

                inputTopic1.pipeInput(expectedKeys[i], "A" + expectedKeys[i]);

            }

            processor.checkAndClearProcessResult(EMPTY);


            // push two items to the other stream; this should produce two items

            // w1 = { 0:A0, 1:A1 }

            // w2 = {}

            // --> w1 = { 0:A0, 1:A1 }

            //     w2 = { 0:a0, 1:a1 }

            for (int i = 0; i < 2; i++) {

                inputTopic2.pipeInput(expectedKeys[i], "a" + expectedKeys[i]);

            }

            processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "A0+a0", 0),

                new KeyValueTimestamp<>(1, "A1+a1", 0));

我希望這有幫助。


參考文獻:[1] https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java#L279


查看完整回答
反對 回復 2022-07-27
  • 2 回答
  • 0 關注
  • 199 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號