Read.javapublic class Read { public static void main(String[] args) { String conn = "db_url"; String username = "*****"; String pwd = "*****"; String sql = "INSERT INTO table (coloumn) values (?)"; Properties props = new Properties(); props.put("bootstrap.servers", "10.247.36.174:3306"); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); rops.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer = null; try (Connection con = DriverManager.getConnection(conn, username, pwd); PreparedStatement ps = con.prepareStatement(sql); BufferedReader br = new BufferedReader(new FileReader("All.log"));) { String line = null; processMessages(line, br, listparam, ps); break; } } catch (Exception ex) { System.err.println("Error in \n" + ex); } finally { producer = new KafkaProducer<>(props); String msg = "Done"; producer.send(new ProducerRecord<String, String>("HelloKafka", msg)); System.out.println("Sent: " + msg); } producer.close(); }我是卡夫卡的新手。有人可以告訴我哪一部分做錯了嗎?我不知道如何在java中使用Kafka。在 ReadLg.java 中,我想從日志文件中讀取并將其插入到數據庫中,然后在完成后我想向 RetrieveData.java 發送一條消息,以便它可以啟動。檢索數據將運行,但空閑等待來自 ReadLg.java 的消息。這是一個不好的方法嗎?還是舊方法?有什么建議或幫助解決這個問題嗎?我不斷收到錯誤“無法連接到 IP 地址”,
1 回答

MYYA
TA貢獻1868條經驗 獲得超4個贊
總結:要使用 Kafka Consumer/Producer,首先必須啟動 Zookeeper 和 Kafka Broker。
為了測試或開發目的,您可以自己使用以下命令啟動它:
如果您的 Kafka 已準備就緒,您就可以開始使用它了。您必須記住設置正確的值bootstrap.server
(對于本地使用通常是localhost:9092
)
添加回答
舉報
0/150
提交
取消