本文详细介绍了Kafka重复消费项目实战,包括环境搭建、消息生产和消费、重复消费问题的产生及解决方案。通过具体案例解析了如何使用数据去重和消息幂等性处理来避免重复消费问题。
Kafka简介与环境搭建 Kafka的基本概念Apache Kafka 是一个分布式的流处理平台,最初由 LinkedIn 开发,后捐赠给 Apache 基金会。Kafka 被设计用于构建实时数据管道和流应用。它提供了高吞吐量、持久化消息、容错性以及实时处理等功能。Kafka 是一个分布式系统,这意味着它可以在多个计算机节点上运行,每个节点称为一个 broker,处理数据分发和存储。
在 Kafka 中,数据以主题(topic)的形式发布和订阅。主题是对消息流的抽象,可以看作是发布/订阅系统的等效概念。每一个主题可以被多个消费者订阅,消费者可以从主题中获取事件流。
Kafka 的消息是持久化的,这意味着消息不会在传递到消费者后立即被丢弃。相反,每个消息都会被存储在磁盘上,直到达到预定的保留期限。这种持久化特性使得 Kafka 能够支持流处理应用程序,这些应用程序需要处理历史数据。
Kafka 还提供了分区(partition)的概念,每个主题可以有多个分区。分区将主题的消息流分成多个并行的流,这样就可以在多个消费者之间进行负载均衡。每个分区都是一个顺序的日志文件,其中包含一系列消息。这使得 Kafka 能够支持并行数据处理,并且可以保证消息的顺序。
安装与配置Kafka环境安装 Kafka 需要 Java 环境,因此首先需要安装 JDK。安装步骤如下:
-
下载并安装 JDK。下载地址为 https://www.oracle.com/java/technologies/javase-downloads.html 或者使用 openjdk,下载地址为 https://openjdk.java.net/install/。
-
安装完成后,设置环境变量。编辑
~/.bashrc
文件,添加以下内容:export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64 export PATH=$JAVA_HOME/bin:$PATH
然后运行
source ~/.bashrc
使环境变量生效。 -
下载 Kafka。下载地址为 https://kafka.apache.org/downloads。
-
解压 Kafka 压缩包:
tar -xzf kafka_2.13-3.1.0.tgz cd kafka_2.13-3.1.0
-
配置 Kafka。修改
config/server.properties
文件,设置broker.id
和log.dirs
参数。broker.id
是 Kafka 服务器的唯一标识符,log.dirs
是日志文件的存储位置。broker.id=0 log.dirs=/tmp/kafka-logs
Kafka 可以以单节点或多节点模式运行。以下步骤演示如何启动一个简单的单节点 Kafka 服务器:
-
启动 Kafka 服务器:
bin/zookeeper-server-start.sh config/zookeeper.properties & bin/kafka-server-start.sh config/server.properties &
-
创建一个主题:
bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
-
启动一个 Kafka 消费者,用来查看生产者发送的消息:
bin/kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server localhost:9092
-
启动一个 Kafka 生产者,发送消息到主题
test
:bin/kafka-console-producer.sh --topic test --bootstrap-server localhost:9092
现在,可以向生产者发送消息到 test
主题,消费者会实时接收并显示这些消息。
生产者是将数据发送到 Kafka 主题的客户端。生产者发送消息到 Kafka 消费者可以订阅的主题。生产者通过客户端 API 发送消息并选择消息发送到哪个分区。Kafka 提供了生产者客户端库的 Java 实现。以下是一个简单的 Java 生产者示例:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class SimpleProducer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 100; i++) {
producer.send(new ProducerRecord<String, String>("test", "key" + i, "value" + i));
}
producer.close();
}
}
这段代码创建了一个 Kafka 生产者,将消息发送到主题 test
。消息的键和值都是字符串类型。
消费者是订阅一个或多个 Kafka 主题的客户端。消费者读取消息并可以根据需要处理它们。Kafka 的消费者 API 也提供了一个 Java 实现。以下是一个简单的 Java 消费者示例:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class SimpleConsumer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
}
}
这段代码创建了一个 Kafka 消费者,订阅了主题 test
并持续读取消息。消费者会将接收到的消息打印到控制台。
Kafka 生产者和消费者有许多配置参数。以下是一些常见的配置及其作用:
-
bootstrap.servers
:用于连接到 Kafka 代理的地址,格式为host1:port1,host2:port2
。 -
key.serializer
和value.serializer
:键和值序列化器的类名。这些类负责将键和值转换为字节,以便它们可以被发送到 Kafka 主题。 -
key.deserializer
和value.deserializer
:键和值反序列化器的类名。这些类负责将键和值从字节数组转换为对象。 -
group.id
:消费组标识符。每个消费者必须属于一个消费组,消费组是 Kafka 为实现负载均衡而设计的概念。 -
auto.offset.reset
:当 Kafka 中没有可消费的位置时,消费者的初始偏移量。可能的值有latest
(开头位置)和earliest
(结尾位置)。 enable.auto.commit
:控制消费者是否自动提交偏移量。如果设置为true
,则消费者会定期自动提交其偏移量。
在分布式系统中,重复消费问题是一个常见的问题。在 Kafka 中,重复消费可能由以下原因引起:
- 消费者重启:当消费者崩溃并重启时,它可能会多次消费相同的消息。
- 消息重试:当消息处理失败时,为了保证消息的可靠性,系统可能会自动重试,但如果重试机制没有得到正确的实现,可能会导致消息被重复消费。
- 系统容错:当系统在处理消息时出现故障,进行故障恢复后,可能会导致重复消费。
重复消费可能导致数据的不一致性,比如更新操作被多次执行,这将导致数据的重复更新。例如,一个账户余额更新操作被重复执行,最终导致账户余额增加过多。此外,重复消费还可能导致系统状态的不一致,比如计算结果被多次计算,最终导致计算结果的错误。重复消费还可能导致系统资源的浪费,比如消息被多次处理,导致系统资源的浪费。例如,一个日志处理任务被重复执行,导致日志文件被多次读取和处理。
解决重复消费的策略 数据去重方法数据去重是解决重复消费问题的一种常见方法。数据去重的方法有很多,比如使用数据库的唯一约束,使用消息的唯一标识符等。
使用数据库的唯一约束是最简单的一种数据去重方法。当消息被处理时,可以将消息存储到数据库中,并设置唯一约束。这样,当消息被重复消费时,唯一约束会阻止消息被多次存储到数据库中。
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
public class DataDeDuplication {
public void handleMessage(String message) {
String insertQuery = "INSERT INTO messages (id, content) VALUES (?, ?)";
String selectQuery = "SELECT * FROM messages WHERE id = ?";
try (Connection connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/kafka", "user", "password");
PreparedStatement insert = connection.prepareStatement(insertQuery);
PreparedStatement select = connection.prepareStatement(selectQuery);
ResultSet resultSet = select.executeQuery(selectQuery)) {
if (!resultSet.next()) {
insert.setString(1, message);
insert.setString(2, message);
insert.executeUpdate();
}
} catch (SQLException e) {
e.printStackTrace();
}
}
}
这段代码使用了一个简单的插入查询,将消息存储到数据库中。在插入之前,它首先检查消息是否已经存在于数据库中。如果消息已经存在,那么它会被忽略。
序列化与唯一标识使用消息的唯一标识符也是一种有效的方法。每个消息都有一个唯一的标识符,当消息被处理时,可以使用这个标识符来检查消息是否已经被处理过。如果消息已经被处理过,那么它会被忽略。
以下是一个使用唯一标识符的例子:
public class UniqueIdentifier {
private Map<String, Boolean> processedMessages = new HashMap<>();
public void handleMessage(String message) {
if (!processedMessages.containsKey(message)) {
// Process the message
processedMessages.put(message, true);
}
}
}
这段代码使用了一个简单的哈希表来存储已经处理过的消息。当消息被处理时,它会检查消息是否已经存在于哈希表中。如果消息已经存在,那么它会被忽略。
消息幂等性处理消息幂等性处理是一种确保消息被最多处理一次的方法。消息幂等性处理的方法有很多,比如使用数据库的唯一约束,使用消息的唯一标识符等。
以下是一个使用数据库唯一约束的例子:
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
public class Idempotence {
public void handleMessage(String message) {
String insertQuery = "INSERT INTO messages (id, content, processed) VALUES (?, ?, ?)";
String selectQuery = "SELECT * FROM messages WHERE id = ?";
try (Connection connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/kafka", "user", "password");
PreparedStatement insert = connection.prepareStatement(insertQuery);
PreparedStatement select = connection.prepareStatement(selectQuery);
ResultSet resultSet = select.executeQuery(selectQuery)) {
if (!resultSet.next()) {
insert.setString(1, message);
insert.setString(2, message);
insert.setString(3, "false");
insert.executeUpdate();
} else {
// The message has already been processed
return;
}
} catch (SQLException e) {
e.printStackTrace();
}
// Process the message
try (Connection connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/kafka", "user", "password");
PreparedStatement update = connection.prepareStatement("UPDATE messages SET processed = 'true' WHERE id = ?")) {
update.setString(1, message);
update.executeUpdate();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
这段代码使用了一个简单的插入查询,将消息存储到数据库中。在插入之前,它首先检查消息是否已经存在于数据库中。如果消息已经存在,那么它会被忽略。如果消息不存在,它会被插入数据库,并设置为未处理。处理完消息后,状态会被更新为已处理。
实战案例解析 搭建测试环境为了演示如何处理重复消费问题,我们需要搭建一个简单的测试环境。以下是搭建测试环境的步骤:
-
配置 Kafka 生产者和消费者。
# producer.properties bootstrap.servers=localhost:9092 key.serializer=org.apache.kafka.common.serialization.StringSerializer value.serializer=org.apache.kafka.common.serialization.StringSerializer # consumer.properties bootstrap.servers=localhost:9092 group.id=my-group key.deserializer=org.apache.kafka.common.serialization.StringDeserializer value.deserializer=org.apache.kafka.common.serialization.StringDeserializer auto.offset.reset=earliest
-
在 Kafka 中创建一个主题
test
。bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
-
启动一个 Kafka 生产者,将消息发送到主题
test
。import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; public class SimpleProducer { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); KafkaProducer<String, String> producer = new KafkaProducer<>(props); for (int i = 0; i < 100; i++) { producer.send(new ProducerRecord<String, String>("test", "key" + i, "value" + i)); } producer.close(); } }
-
启动一个 Kafka 消费者,从主题
test
中消费消息。import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.time.Duration; import java.util.Arrays; import java.util.Properties; public class SimpleConsumer { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "my-group"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("test")); while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) { System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } } } }
-
启动一个 Kafka 消费者,从主题
test
中消费消息,但使用数据库的唯一约束来处理重复消息。import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.time.Duration; import java.util.Arrays; import java.util.Properties; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; public class DeDupeConsumer { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "my-group"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("test")); while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) { if (!handleMessage(record.key(), record.value())) { continue; } System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } } } private boolean handleMessage(String key, String value) { String insertQuery = "INSERT INTO messages (id, content) VALUES (?, ?)"; String selectQuery = "SELECT * FROM messages WHERE id = ?"; try (Connection connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/kafka", "user", "password"); PreparedStatement insert = connection.prepareStatement(insertQuery); PreparedStatement select = connection.prepareStatement(selectQuery); ResultSet resultSet = select.executeQuery(selectQuery)) { if (!resultSet.next()) { insert.setString(1, key); insert.setString(2, value); insert.executeUpdate(); return true; } } catch (SQLException e) { e.printStackTrace(); } return false; } }
在以上测试环境中,我们需要实现一个简单的重复消息处理机制。以下是实现的具体步骤:
-
创建一个简单的生产者,将消息发送到 Kafka 主题。
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; public class SimpleProducer { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); KafkaProducer<String, String> producer = new KafkaProducer<>(props); for (int i = 0; i < 100; i++) { producer.send(new ProducerRecord<String, String>("test", "key" + i, "value" + i)); } producer.close(); } }
-
创建一个简单的消费者,从 Kafka 主题中消费消息。
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.time.Duration; import java.util.Arrays; import java.util.Properties; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; public class DeDupeConsumer { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "my-group"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("test")); while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) { if (!handleMessage(record.key(), record.value())) { continue; } System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } } } private boolean handleMessage(String key, String value) { String insertQuery = "INSERT INTO messages (id, content) VALUES (?, ?)"; String selectQuery = "SELECT * FROM messages WHERE id = ?"; try (Connection connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/kafka", "user", "password"); PreparedStatement insert = connection.prepareStatement(insertQuery); PreparedStatement select = connection.prepareStatement(selectQuery); ResultSet resultSet = select.executeQuery(selectQuery)) { if (!resultSet.next()) { insert.setString(1, key); insert.setString(2, value); insert.executeUpdate(); return true; } } catch (SQLException e) { e.printStackTrace(); } return false; } }
- 在消费者中,使用数据库的唯一约束来处理重复消息。当接收到一条消息时,消费者会检查消息是否已经存在于数据库中。如果消息已经存在,那么它会被忽略。
为了确保我们的实现是正确的,我们需要运行测试,并确认结果。以下是运行测试的具体步骤:
-
启动一个 Kafka 生产者,将消息发送到主题
test
。java -cp kafka_2.13-3.1.0/libs/kafka-clients-3.1.0.jar:./SimpleProducer.jar SimpleProducer
-
启动一个 Kafka 消费者,从主题
test
中消费消息,并使用数据库的唯一约束来处理重复消息。java -cp kafka_2.13-3.1.0/libs/kafka-clients-3.1.0.jar:./DeDupeConsumer.jar DeDupeConsumer
-
确认数据库中存储的消息是唯一的。可以通过查询数据库来验证消息是否已经被处理过。
SELECT * FROM messages;
- 关闭消费者,并重启消费者,验证消息是否被重复处理。
在本项目中,我们学习了如何搭建 Kafka 环境,并使用 Kafka 生产者和消费者发送和接收消息。我们还学习了如何处理重复消费问题,包括使用数据库的唯一约束,使用消息的唯一标识符等。最后,我们实现了一个简单的重复消息处理机制,并运行了测试以验证我们的实现。
Kafka进阶学习建议对于想要更深入学习 Kafka 的读者,建议学习以下主题:
- Kafka 的高级特性,比如事务、幂等性、消息时间戳等。
- Kafka 的架构,比如消费者组、分区、副本等。
- Kafka 的扩展性,比如如何扩展 Kafka 集群,如何使用 Kafka 集群进行流处理等。
- Kafka 的监控和管理,比如如何监控 Kafka 的性能,如何管理 Kafka 的集群等。
可以通过慕课网 (http://www.xianlaiwan.cn/) 学习更多关于 Kafka 的课程,或者阅读 Kafka 的官方文档 (https://kafka.apache.org/documentation.html)。
共同學習,寫下你的評論
評論加載中...
作者其他優質文章