我正在嘗試對@KafkaListener 的單元測試進行測試,但我不知道如何導入此類“AcknowledgeingConsumerAwareMessageListener”。@Testpublic void test() throws Exception { ConcurrentMessageListenerContainer<?, ?> container = (ConcurrentMessageListenerContainer<?, ?>) registry .getListenerContainer("EmptyLegCommandFeed_V2"); container.stop(); @SuppressWarnings("unchecked") AcknowledgingConsumerAwareMessageListener<String, String> messageListener = (AcknowledgingConsumerAwareMessageListener<String, String>) container .getContainerProperties().getMessageListener(); CountDownLatch latch = new CountDownLatch(1); container.getContainerProperties() .setMessageListener(new AcknowledgingConsumerAwareMessageListener<String, String>() { @Override public void onMessage(ConsumerRecord<String, String> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer) { messageListener.onMessage(data, acknowledgment, consumer); latch.countDown(); } }); container.start(); template.send("EmptyLegCommandFeed_V2", "foo"); assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();}
1 回答

慕田峪4524236
TA貢獻1875條經驗 獲得超5個贊
spring-kafka
package 中 jar 中的那個類org.springframework.kafka.listener.adapter
。
添加回答
舉報
0/150
提交
取消