亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

如何在反序列化過程中不使用無限循環的情況下編寫kafka消費者?

如何在反序列化過程中不使用無限循環的情況下編寫kafka消費者?

慕桂英4014372 2023-07-28 09:49:58
如何在java中編寫kafka消費者而不使用無限循環進行輪詢?在處理傳入記錄函數中編寫了 while(true) 循環,其中輪詢新事件。如果我在我的項目中使用它,除了這個我無法做任何其他事情。有沒有辦法避免使用這種無限循環來獲取新事件??public static void main(String[] str) throws InterruptedException {? ? System.out.println("Starting? AtMostOnceConsumer ...");? ? execute();}private static void execute() throws InterruptedException {? ? KafkaConsumer<String, Event> consumer = createConsumer();? ? // Subscribe to all partition in that topic. 'assign' could be used here? ? // instead of 'subscribe' to subscribe to specific partition.? ? consumer.subscribe(Arrays.asList("topic"));? ? processRecords(consumer);}private static KafkaConsumer<String, Event> createConsumer() {? ? Properties props = new Properties();? ? String consumeGroup = "group_id";? ? props.put("group.id", consumeGroup);? ? props.put("org.slf4j.simpleLogger.defaultLogLevel", "INFO");? ? props.put("client.id", "clientId");? ? props.put("security.protocol", "SASL_SSL");? ? props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "servers");? ? props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");? ? props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");? ? props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username="" + "username" + " password="" + "password";");? ? props.put("enable.auto.commit", "true");? ? // Auto commit interval, kafka would commit offset at this interval.? ? props.put("auto.commit.interval.ms", "101");? ? // This is how to control number of records being read in each poll? ? props.put("max.partition.fetch.bytes", "135");? ? // Set this if you want to always read from beginning.? ? // props.put("auto.offset.reset", "earliest");? ? props.put("heartbeat.interval.ms", "3000");? ? props.put("session.timeout.ms", "6001");}有人可以幫我修改這個,以便我可以避免while(true)循環并且可以只監聽我傳入的事件嗎?
查看完整描述

3 回答

?
至尊寶的傳說

TA貢獻1789條經驗 獲得超10個贊

你可以嘗試這樣的事情:


public class ConsumerDemoWithThread {

private Logger logger = LoggerFactory.getLogger(ConsumerDemoWithThread.class.getName());

private String bootstrapServers = "127.0.0.1:9092";

private String groupId = "my-first-application";

private String topic = "first-topic";


KafkaConsumer consumer = createConsumer(bootstrapServers, groupId, topic);


private void pollForRecords() {

? ? ExecutorService executor = Executors.newSingleThreadExecutor();

? ? executor.submit(() -> processRecords());

}



private KafkaConsumer createConsumer(String bootstrapServers, String groupId, String topic) {

? ? Properties properties = new Properties();

? ? properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

? ? properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

? ? properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

? ? properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);

? ? properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

? ? // create consumer

? ? KafkaConsumer consumer = new KafkaConsumer<String, String>(properties);

? ? // subscribe consumer to our topic(s)

? ? consumer.subscribe(Arrays.asList(topic));

? ? return consumer;

}



private void processRecords() {

? ? try {

? ? ? ? while (true) {

? ? ? ? ? ? ConsumerRecords<String, String> records =

? ? ? ? ? ? ? ? ? ? consumer.poll(Duration.ofMillis(100));


? ? ? ? ? ? for (ConsumerRecord<String, String> record : records) {

? ? ? ? ? ? ? ? logger.info("Key: " + record.key() + ", Value: " + record.value());

? ? ? ? ? ? ? ? logger.info("Partition: " + record.partition() + ", Offset:" + record.offset());

? ? ? ? ? ? }

? ? ? ? }

? ? } catch (WakeupException e) {

? ? ? ? logger.info("Received shutdown signal!");

? ? } finally {

? ? ? ? consumer.close();

? ? }

}


public static void main(String[] args) {

? ? ConsumerDemoWithThread consumerDemoWithThread = new ConsumerDemoWithThread();

? ? consumerDemoWithThread.pollForRecords();

}

}

查看完整回答
反對 回復 2023-07-28
?
寶慕林4294392

TA貢獻2021條經驗 獲得超8個贊

您可以使用@KafkaListener,然而,它也會以無限循環進行輪詢,因為這就是 Kafka 的設計方式——它不是一個隊列,而是一個存儲一段時間記錄的事件總線。沒有通知其消費者的機制。

輪詢不同的線程并以優雅的方式退出循環。


查看完整回答
反對 回復 2023-07-28
?
慕村225694

TA貢獻1880條經驗 獲得超4個贊

如果您希望能夠在代碼中同時執行多項操作,則需要后臺線程。

為了更輕松地做到這一點,您可以使用更高級別的 Kafka 庫,例如 Spring(已回答)、Vert.x或Smallrye

這是一個 Vert.x 示例,首先創建一個KafkaConsumer,然后分配處理程序并訂閱您的主題

consumer.handler(record -> {

? System.out.println("Processing key=" + record.key() + ",value=" + record.value() +

? ? ",partition=" + record.partition() + ",offset=" + record.offset());

});


// subscribe to a single topic

consumer.subscribe("a-single-topic");


查看完整回答
反對 回復 2023-07-28
  • 3 回答
  • 0 關注
  • 162 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號