3 回答

TA貢獻1789條經驗 獲得超10個贊
你可以嘗試這樣的事情:
public class ConsumerDemoWithThread {
private Logger logger = LoggerFactory.getLogger(ConsumerDemoWithThread.class.getName());
private String bootstrapServers = "127.0.0.1:9092";
private String groupId = "my-first-application";
private String topic = "first-topic";
KafkaConsumer consumer = createConsumer(bootstrapServers, groupId, topic);
private void pollForRecords() {
? ? ExecutorService executor = Executors.newSingleThreadExecutor();
? ? executor.submit(() -> processRecords());
}
private KafkaConsumer createConsumer(String bootstrapServers, String groupId, String topic) {
? ? Properties properties = new Properties();
? ? properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
? ? properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
? ? properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
? ? properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
? ? properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
? ? // create consumer
? ? KafkaConsumer consumer = new KafkaConsumer<String, String>(properties);
? ? // subscribe consumer to our topic(s)
? ? consumer.subscribe(Arrays.asList(topic));
? ? return consumer;
}
private void processRecords() {
? ? try {
? ? ? ? while (true) {
? ? ? ? ? ? ConsumerRecords<String, String> records =
? ? ? ? ? ? ? ? ? ? consumer.poll(Duration.ofMillis(100));
? ? ? ? ? ? for (ConsumerRecord<String, String> record : records) {
? ? ? ? ? ? ? ? logger.info("Key: " + record.key() + ", Value: " + record.value());
? ? ? ? ? ? ? ? logger.info("Partition: " + record.partition() + ", Offset:" + record.offset());
? ? ? ? ? ? }
? ? ? ? }
? ? } catch (WakeupException e) {
? ? ? ? logger.info("Received shutdown signal!");
? ? } finally {
? ? ? ? consumer.close();
? ? }
}
public static void main(String[] args) {
? ? ConsumerDemoWithThread consumerDemoWithThread = new ConsumerDemoWithThread();
? ? consumerDemoWithThread.pollForRecords();
}
}

TA貢獻2021條經驗 獲得超8個贊
您可以使用@KafkaListener,
然而,它也會以無限循環進行輪詢,因為這就是 Kafka 的設計方式——它不是一個隊列,而是一個存儲一段時間記錄的事件總線。沒有通知其消費者的機制。
輪詢不同的線程并以優雅的方式退出循環。

TA貢獻1880條經驗 獲得超4個贊
如果您希望能夠在代碼中同時執行多項操作,則需要后臺線程。
為了更輕松地做到這一點,您可以使用更高級別的 Kafka 庫,例如 Spring(已回答)、Vert.x或Smallrye
這是一個 Vert.x 示例,首先創建一個KafkaConsumer
,然后分配處理程序并訂閱您的主題
consumer.handler(record -> {
? System.out.println("Processing key=" + record.key() + ",value=" + record.value() +
? ? ",partition=" + record.partition() + ",offset=" + record.offset());
});
// subscribe to a single topic
consumer.subscribe("a-single-topic");
添加回答
舉報