我真的很難編寫一個測試來檢查當消息發送到它的指定主題時我的 Kafka 消費者是否被正確調用。我的消費者:@Service@Slf4j@AllArgsConstructor(onConstructor = @__(@Autowired))public class ProcessingConsumer { private AppService appService; @KafkaListener( topics = "${topic}", containerFactory = "processingConsumerContainerFactory") public void listen(ConsumerRecord<Key, Value> message, Acknowledgment ack) { try { appService.processMessage(message); ack.acknowledge(); } catch (Throwable t) { log.error("error while processing message!", t); } }}我的消費者配置:@EnableKafka@Configurationpublic class ProcessingCosumerConfig { @Value("${spring.kafka.schema-registry-url}") private String schemaRegistryUrl; private KafkaProperties props; public ProcessingCosumerConfig(KafkaProperties kafkaProperties) { this.props = kafkaProperties; } public Map<String, Object> deserializerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true); props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl); return props; } private KafkaAvroDeserializer getKafkaAvroDeserializer(Boolean isKey) { KafkaAvroDeserializer kafkaAvroDeserializer = new KafkaAvroDeserializer(); kafkaAvroDeserializer.configure(deserializerConfigs(), isKey); return kafkaAvroDeserializer; } private DefaultKafkaConsumerFactory consumerFactory() { return new DefaultKafkaConsumerFactory<>( props.buildConsumerProperties(), getKafkaAvroDeserializer(true), getKafkaAvroDeserializer(false)); }
添加回答
舉報
0/150
提交
取消