本文将详细介绍如何手写RocketMQ的关键数据结构和组件设计,包括消息、主题、标签、生产者和消费者等重要概念,并深入探讨消息发送和接收模块的实现细节。此外,还将介绍手写RocketMQ的运行步骤和测试方法,确保消息的可靠传输和高效处理。
RocketMQ简介 RocketMQ是什么RocketMQ是阿里开源的一款分布式消息中间件,它基于Java语言开发,能够支持大规模分布式系统中的异步通信和大规模数据传输。RocketMQ在阿里巴巴集团内部被广泛应用于各种业务场景中,如订单交易、物流跟踪、广告推送等,其高性能和高可靠性得到了充分验证。
RocketMQ的设计目标是实现高吞吐量、高可用性和高可靠性的消息传递,提供了丰富的消息路由、消息过滤和消息重试等功能。此外,RocketMQ还支持多种消息模型,包括点对点消息模型和发布订阅消息模型,可以根据不同的应用场景灵活选择。
RocketMQ的作用及优势作用
- 解耦系统: RocketMQ可以帮助系统之间的解耦,使得生产者和消费者之间无需直接调用对方的接口,通过消息中间件进行通信。
- 异步处理: 支持异步消息处理,提高系统的响应速度和灵活性。
- 流量削峰: 在高并发场景下,通过消息队列进行流量削峰,避免系统被瞬间的大流量冲击而崩溃。
- 日志收集与分析: 用于系统间的数据交换,特别是日志收集和分析场景。
- 任务调度: RocketMQ可以用来实现任务调度,例如定时任务的触发和执行。
优势
- 高吞吐量: RocketMQ支持每秒百万级的消息吞吐量,适用于高并发场景。
- 高可用性: RocketMQ通过分布式集群部署,提供了极高的可用性,即使部分节点失效,也能保证服务的连续性。
- 高可靠性: RocketMQ具备消息重复发送、消息顺序性保证等功能,确保消息不丢失。
- 多语言支持: RocketMQ支持Java、C++、Python等多种语言客户端,具有良好的跨平台性。
- 插件扩展性: RocketMQ具有良好的插件扩展性,可以通过插件进行扩展,例如监控插件、审计插件等。
RocketMQ采用分布式消息中间件架构,其核心组件包括:
- NameServer: RocketMQ中的NameServer负责维护broker和topic的元数据信息,提供了broker的地址信息注册和查询服务。
- Broker: RocketMQ中的Broker负责消息的存储和转发。RocketMQ支持主备模式的Broker集群部署,主备Broker之间可以实现消息同步,从而保证了消息的可靠性和一致性。
- Producer: 消息生产者,负责发送消息到Broker,可以配置消息的topic、tag、key等信息。
- Consumer: 消息消费者,负责从Broker拉取消息。RocketMQ支持按主题订阅和按标签订阅,可以根据不同的场景选择合适的订阅策略。
- 客户端组件: RocketMQ的客户端组件包括消息发送端(Producer)和消息接收端(Consumer),实现了与NameServer和Broker之间的通信协议。
RocketMQ使用主备模式的Broker集群,确保了系统的高可用性。同时,RocketMQ支持多种消息模型,包括发布/订阅模型和点对点模型,可以根据业务需求选择合适的消息模型。
NameServer实现示例
public class NameServer {
private Map<String, String> brokerAddressMap;
public NameServer() {
this.brokerAddressMap = new HashMap<>();
}
public void registerBroker(String brokerName, String address) {
this.brokerAddressMap.put(brokerName, address);
}
public String getBrokerAddress(String brokerName) {
return this.brokerAddressMap.get(brokerName);
}
}
Broker实现示例
public class Broker {
private String name;
private Map<String, List<Message>> topicMessagesMap;
public Broker(String name) {
this.name = name;
this.topicMessagesMap = new HashMap<>();
}
public void addMessage(String topic, Message message) {
this.topicMessagesMap.computeIfAbsent(topic, k -> new ArrayList<>()).add(message);
}
public List<Message> getMessages(String topic) {
return this.topicMessagesMap.getOrDefault(topic, Collections.emptyList());
}
}
Producer实现示例
public class Producer {
private String name;
private String brokerAddress;
private List<Message> messages;
public Producer(String name, String brokerAddress) {
this.name = name;
this.brokerAddress = brokerAddress;
this.messages = new ArrayList<>();
}
public void sendMessage(Message message) {
// 将消息发送到Broker
this.messages.add(message);
System.out.println("Producer " + name + " sent message: " + message.getMessageBody());
}
public String getBrokerAddress() {
return brokerAddress;
}
public String getName() {
return name;
}
public List<Message> getMessages() {
return messages;
}
}
Consumer实现示例
public class Consumer {
private String name;
private String brokerAddress;
private List<Message> messages;
public Consumer(String name, String brokerAddress) {
this.name = name;
this.brokerAddress = brokerAddress;
this.messages = new ArrayList<>();
}
public void receiveMessage(Message message) {
// 从Broker拉取消息
this.messages.add(message);
System.out.println("Consumer " + name + " received message: " + message.getMessageBody());
}
public String getBrokerAddress() {
return brokerAddress;
}
public String getName() {
return name;
}
public List<Message> getMessages() {
return messages;
}
}
开发环境搭建
Java开发环境配置
安装JDK
- 访问JDK官方网站下载最新版本的JDK。
- 解压下载的JDK安装包到指定目录,例如
/usr/local/java/jdk1.8.0_181
。 - 设置环境变量,编辑
/etc/profile
文件,加入以下内容:export JAVA_HOME=/usr/local/java/jdk1.8.0_181 export PATH=$JAVA_HOME/bin:$PATH export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
- 使环境变量生效:
source /etc/profile
- 验证JDK安装:
java -version
输出类似信息,表明安装成功:
java version "1.8.0_181" Java(TM) SE Runtime Environment (build 1.8.0_181-b13) Java HotSpot(TM) 64-Bit Server VM (build 25.181-b13, mixed mode)
安装Java开发工具包
安装Java开发工具包(JDK)主要包括JRE(Java Runtime Environment)和JDK(Java Development Kit)。为了开发RocketMQ,我们只需要JDK。
安装Maven
- 访问Maven官方网站下载最新版本的Maven。
- 解压下载的Maven安装包到指定目录,例如
/usr/local/maven/apache-maven-3.6.3
。 - 设置环境变量,编辑
/etc/profile
文件,加入以下内容:export MAVEN_HOME=/usr/local/maven/apache-maven-3.6.3 export PATH=$MAVEN_HOME/bin:$PATH
- 使环境变量生效:
source /etc/profile
- 验证Maven安装:
mvn -v
输出类似信息,表明安装成功:
Apache Maven 3.6.3 (cecedd343002696d0abb6dd0b81e15b3f82b1b92; 2019-03-25T14:59:27+08:00) Maven home: /usr/local/maven/apache-maven-3.6.3 Java version: 1.8.0_181, vendor: Oracle Corporation, runtime: /usr/local/java/jdk1.8.0_181/jre Default locale: en_US, platform encoding: UTF-8 OS name: "linux", version: "4.15.0-124-generic", arch: "amd64", family: "unix"
Maven项目构建工具安装
通过上述步骤安装Maven后,还需要配置Maven,确保其正确运行。
-
创建Maven配置文件
settings.xml
,通常位于$MAVEN_HOME/conf/settings.xml
。配置文件示例如下:<settings> <localRepository>/path/to/local/repo</localRepository> <mirrors> <mirror> <id>aliyun</id> <name>Aliyun Maven Mirror</name> <url>https://maven.aliyun.com/repository/public</url> <mirrorOf>central</mirrorOf> </mirror> </mirrors> <profiles> <profile> <id>aliyun</id> <repositories> <repository> <id>aliyun</id> <url>https://maven.aliyun.com/repository/public</url> <releases> <enabled>true</enabled> </releases> <snapshots> <enabled>true</enabled> </snapshots> </repository> </repositories> </profile> </profiles> </settings>
-
创建Maven项目目录结构:
mkdir -p ~/workspace/maven-project cd ~/workspace/maven-project mvn archetype:generate -DgroupId=com.example -DartifactId=maven-project -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false
- 进入项目目录并构建项目:
cd maven-project mvn clean install
下载RocketMQ源码
- 访问RocketMQ官方GitHub仓库。
- 点击绿色的
Code
按钮,选择Download ZIP
下载压缩包。 - 解压下载的压缩包到指定目录,例如
/usr/local/src/rocketmq
。
解析RocketMQ源码
RocketMQ的源码结构如下:
bin
: 包含RocketMQ的启动脚本。conf
: 包含RocketMQ的配置文件。src/main/java
: 包含RocketMQ的核心Java代码。src/main/resources
: 包含RocketMQ的资源文件。
编译RocketMQ源码
- 进入RocketMQ源码目录:
cd /usr/local/src/rocketmq
- 使用Maven编译RocketMQ源码:
mvn clean install -DskipTests
运行RocketMQ示例
RocketMQ提供了一系列示例代码,位于RocketMQ_HOME/src/main/java/org/apache/rocketmq/example
目录下。可以运行这些示例代码来熟悉RocketMQ的使用方法。
- 启动NameServer:
nohup sh bin/mqnamesrv &
- 启动Broker:
nohup sh bin/mqbroker -n localhost:9876 &
- 运行示例代码:
cd /usr/local/src/rocketmq/src/main/java/org/apache/rocketmq/example java -cp ./target/rocketmq-examples-4.5.2.jar org.apache.rocketmq.example.quickstart.Producer java -cp ./target/rocketmq-examples-4.5.2.jar org.apache.rocketmq.example.quickstart.Consumer
手写RocketMQ需要定义一些关键的数据结构,包括消息、主题、标签、生产者、消费者等。
消息Message
消息是RocketMQ中最基本的数据单位,它由以下几个部分组成:
messageId
: 消息ID,用于唯一标识一条消息。messageBody
: 消息内容,通常是一个字符串或字节数组。topic
: 主题,表示消息所属的分类。tags
: 标签,用于进一步分类和过滤消息。key
: 键,用于消息的唯一标识。timestamp
: 消息发送的时间戳。properties
: 消息的附加属性,可以用来存储额外的信息。
定义消息结构的示例代码如下:
public class Message {
private String messageId;
private String messageBody;
private String topic;
private String tags;
private String key;
private long timestamp;
private Map<String, String> properties;
public Message(String topic, String tags, String key, String messageBody) {
this.messageId = UUID.randomUUID().toString();
this.messageBody = messageBody;
this.topic = topic;
this.tags = tags;
this.key = key;
this.timestamp = System.currentTimeMillis();
this.properties = new HashMap<>();
}
public String getMessageId() {
return messageId;
}
public void setMessageId(String messageId) {
this.messageId = messageId;
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public String getTags() {
return tags;
}
public void setTags(String tags) {
this.tags = tags;
}
public String getKey() {
return key;
}
public void setKey(String key) {
this.key = key;
}
public String getMessageBody() {
return messageBody;
}
public void setMessageBody(String messageBody) {
this.messageBody = messageBody;
}
public long getTimestamp() {
return timestamp;
}
public Map<String, String> getProperties() {
return properties;
}
public void setProperties(Map<String, String> properties) {
this.properties = properties;
}
}
主题Topic
主题用于对消息进行分类,一个主题可以包含多个标签和多个消息。
定义主题结构的示例代码如下:
public class Topic {
private String name;
private Map<String, List<Message>> tagMessagesMap;
public Topic(String name) {
this.name = name;
this.tagMessagesMap = new HashMap<>();
}
public void addMessage(String tag, Message message) {
tagMessagesMap.computeIfAbsent(tag, k -> new ArrayList<>()).add(message);
}
public List<Message> getMessagesByTag(String tag) {
return tagMessagesMap.getOrDefault(tag, Collections.emptyList());
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public Map<String, List<Message>> getTagMessagesMap() {
return tagMessagesMap;
}
}
标签Tag
标签用于进一步分类消息,一个主题下可以有多个标签。
定义标签结构的示例代码如下:
public class Tag {
private String name;
private List<Message> messageList;
public Tag(String name) {
this.name = name;
this.messageList = new ArrayList<>();
}
public void addMessage(Message message) {
this.messageList.add(message);
}
public List<Message> getMessageList() {
return messageList;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
生产者Producer
生产者负责发送消息到Broker。
定义生产者结构的示例代码如下:
public class Producer {
private String name;
private String brokerAddress;
private List<Message> messages;
public Producer(String name, String brokerAddress) {
this.name = name;
this.brokerAddress = brokerAddress;
this.messages = new ArrayList<>();
}
public void sendMessage(Message message) {
// 将消息发送到Broker
this.messages.add(message);
System.out.println("Producer " + name + " sent message: " + message.getMessageBody());
}
public String getBrokerAddress() {
return brokerAddress;
}
public String getName() {
return name;
}
public List<Message> getMessages() {
return messages;
}
}
消费者Consumer
消费者负责从Broker拉取消息。
定义消费者结构的示例代码如下:
public class Consumer {
private String name;
private String brokerAddress;
private List<Message> messages;
public Consumer(String name, String brokerAddress) {
this.name = name;
this.brokerAddress = brokerAddress;
this.messages = new ArrayList<>();
}
public void receiveMessage(Message message) {
// 从Broker拉取消息
this.messages.add(message);
System.out.println("Consumer " + name + " received message: " + message.getMessageBody());
}
public String getBrokerAddress() {
return brokerAddress;
}
public String getName() {
return name;
}
public List<Message> getMessages() {
return messages;
}
}
主要组件设计思路
Broker
Broker是消息的存储和转发中心,负责接收生产者发送的消息并将其存储到指定的主题和标签下。当消费者请求拉取消息时,Broker会从存储中取出相应消息并返回给消费者。
设计Broker的关键点包括:
- 消息存储:使用哈希表或树结构存储消息,可以按照主题和标签进行索引。
- 消息转发:当消费者请求拉取消息时,Broker会从存储中取出相应消息并返回。
- 消息持久化:将消息持久化到磁盘,确保消息不丢失。
- 消息过滤:支持按主题、标签、键等进行消息过滤。
NameServer
NameServer负责维护Broker和主题的元数据信息,提供了Broker的地址信息注册和查询服务。当生产者或消费者需要连接到Broker时,会首先查询NameServer获取Broker的地址。
设计NameServer的关键点包括:
- 元数据存储:使用内存中的哈希表或树结构存储Broker和主题的元数据信息。
- 注册服务:接收Broker的注册请求,并将Broker的地址信息存储到内存中。
- 查询服务:提供Broker地址信息的查询接口,供生产者和消费者使用。
- 心跳机制:与Broker建立心跳机制,确保NameServer知道Broker的状态。
Producer
Producer是消息的发送者,负责将消息发送到指定的Broker。Producer可以配置消息的topic、tag、key等信息,并将消息发送到对应的Broker。
设计Producer的关键点包括:
- 发送消息:将消息发送到指定的Broker。
- 重试机制:当发送消息失败时,自动重试发送。
- 批量发送:支持批量发送消息,提高发送效率。
- 消息过滤:支持按主题、标签、键等进行消息过滤。
Consumer
Consumer是消息的接收者,负责从指定的Broker拉取消息。Consumer可以按主题、标签、键等订阅消息,并从Broker处拉取消息。
设计Consumer的关键点包括:
- 拉取消息:从指定的Broker拉取消息。
- 批量拉取:支持批量拉取消息,提高拉取效率。
- 消息过滤:支持按主题、标签、键等进行消息过滤。
- 消息消费确认:在消费完消息后,向Broker确认消息已消费。
消息模型
RocketMQ支持两种消息模型:点对点消息模型和发布/订阅消息模型。
- 点对点消息模型:每条消息只能被一个消费者消费。RocketMQ中的每个消息都有一个唯一的键(key),确保消息的唯一性。
- 发布/订阅消息模型:每条消息可以被多个消费者订阅并消费。RocketMQ中的每个消息有主题(topic)和标签(tag),消费者可以按主题或标签订阅消息。
消息发送模式
RocketMQ支持同步和异步两种消息发送模式。
- 同步发送:生产者发送消息后需要等待Broker的响应,确认消息是否成功发送。
- 异步发送:生产者发送消息后不需要等待Broker的响应,消息发送结果通过回调函数返回。
消息消费模式
RocketMQ支持独占消费和广播消费两种消息消费模式。
- 独占消费:每个消费者只消费一条消息。RocketMQ中的每个消费者都有一个唯一的消费者ID(consumerId),确保消息的唯一性。
- 广播消费:每条消息被所有消费者消费。RocketMQ中的每个消费者都可以消费所有消息。
消息重试机制
RocketMQ支持消息重试机制,当消息发送失败时,会自动重试发送。RocketMQ提供了多种重试策略,包括固定间隔重试、指数退避重试等。
消息顺序性保障
RocketMQ支持消息顺序性保障,确保消息按照发送顺序被消费。RocketMQ通过消息键(key)和消息队列(queue)来实现消息顺序性保障。
消息过滤机制
RocketMQ支持多种消息过滤机制,包括主题过滤、标签过滤、键过滤等。RocketMQ中的每个消息都有主题(topic)、标签(tag)和键(key),消费者可以按主题、标签、键等进行消息过滤。
消息持久化策略
RocketMQ支持多种消息持久化策略,包括内存缓存、磁盘持久化等。RocketMQ的Broker将消息持久化到磁盘,确保消息不丢失。
消息路由机制
RocketMQ支持多种消息路由机制,包括广播路由、集群路由、分区路由等。RocketMQ的NameServer负责维护Broker的路由信息,并提供Broker地址信息查询服务。
消息发送模块手写 消息发送流程概述消息发送流程包括以下几个步骤:
- 初始化生产者:创建生产者实例,并配置消息的topic、tag、key等信息。
- 发送消息:调用生产者的发送消息接口,将消息发送到指定的Broker。
- 消息存储:Broker接收消息后,将其存储到指定的topic和tag下。
- 消息转发:当消费者请求拉取消息时,Broker会从存储中取出相应消息并返回给消费者。
创建生产者实例
首先,需要创建生产者实例,并配置消息的topic、tag、key等信息。
public class Producer {
private String name;
private String brokerAddress;
private List<Message> messages;
public Producer(String name, String brokerAddress) {
this.name = name;
this.brokerAddress = brokerAddress;
this.messages = new ArrayList<>();
}
public void sendMessage(Message message) {
// 将消息发送到Broker
this.messages.add(message);
System.out.println("Producer " + name + " sent message: " + message.getMessageBody());
}
public String getBrokerAddress() {
return brokerAddress;
}
public String getName() {
return name;
}
public List<Message> getMessages() {
return messages;
}
}
创建消息对象
接下来,需要创建消息对象,并设置消息的topic、tag、key等信息。
public class Message {
private String messageId;
private String messageBody;
private String topic;
private String tags;
private String key;
private long timestamp;
private Map<String, String> properties;
public Message(String topic, String tags, String key, String messageBody) {
this.messageId = UUID.randomUUID().toString();
this.messageBody = messageBody;
this.topic = topic;
this.tags = tags;
this.key = key;
this.timestamp = System.currentTimeMillis();
this.properties = new HashMap<>();
}
public String getMessageId() {
return messageId;
}
public void setMessageId(String messageId) {
this.messageId = messageId;
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public String getTags() {
return tags;
}
public void setTags(String tags) {
this.tags = tags;
}
public String getKey() {
return key;
}
public void setKey(String key) {
this.key = key;
}
public String getMessageBody() {
return messageBody;
}
public void setMessageBody(String messageBody) {
this.messageBody = messageBody;
}
public long getTimestamp() {
return timestamp;
}
public Map<String, String> getProperties() {
return properties;
}
public void setProperties(Map<String, String> properties) {
this.properties = properties;
}
}
发送消息
最后,调用生产者的发送消息接口,将消息发送到指定的Broker。
public class Main {
public static void main(String[] args) {
Producer producer = new Producer("Producer1", "localhost:9876");
Message message = new Message("Topic1", "Tag1", "Key1", "Hello, World!");
producer.sendMessage(message);
}
}
消息发送异常处理机制
处理网络异常
当发送消息时,如果网络异常导致消息发送失败,需要进行异常处理。
public class Producer {
private String name;
private String brokerAddress;
private List<Message> messages;
public Producer(String name, String brokerAddress) {
this.name = name;
this.brokerAddress = brokerAddress;
this.messages = new ArrayList<>();
}
public void sendMessage(Message message) {
try {
// 将消息发送到Broker
this.messages.add(message);
System.out.println("Producer " + name + " sent message: " + message.getMessageBody());
} catch (IOException e) {
// 网络异常导致消息发送失败
System.err.println("Network error when sending message");
e.printStackTrace();
}
}
public String getBrokerAddress() {
return brokerAddress;
}
public String getName() {
return name;
}
public List<Message> getMessages() {
return messages;
}
}
处理消息格式异常
当发送消息时,如果消息格式异常导致消息发送失败,需要进行异常处理。
public class Producer {
private String name;
private String brokerAddress;
private List<Message> messages;
public Producer(String name, String brokerAddress) {
this.name = name;
this.brokerAddress = brokerAddress;
this.messages = new ArrayList<>();
}
public void sendMessage(Message message) {
try {
// 检查消息格式
if (message.getMessageBody() == null || message.getMessageBody().isEmpty()) {
throw new IllegalArgumentException("Message body cannot be empty");
}
// 将消息发送到Broker
this.messages.add(message);
System.out.println("Producer " + name + " sent message: " + message.getMessageBody());
} catch (IllegalArgumentException e) {
// 消息格式异常导致消息发送失败
System.err.println("Message format error when sending message");
e.printStackTrace();
}
}
public String getBrokerAddress() {
return brokerAddress;
}
public String getName() {
return name;
}
public List<Message> getMessages() {
return messages;
}
}
处理消息队列满异常
当发送消息时,如果消息队列满导致消息发送失败,需要进行异常处理。
public class Producer {
private String name;
private String brokerAddress;
private List<Message> messages;
public Producer(String name, String brokerAddress) {
this.name = name;
this.brokerAddress = brokerAddress;
this.messages = new ArrayList<>();
}
public void sendMessage(Message message) {
try {
// 将消息发送到Broker
this.messages.add(message);
System.out.println("Producer " + name + " sent message: " + message.getMessageBody());
} catch (IllegalStateException e) {
// 消息队列满导致消息发送失败
System.err.println("Message queue full when sending message");
e.printStackTrace();
}
}
public String getBrokerAddress() {
return brokerAddress;
}
public String getName() {
return name;
}
public List<Message> getMessages() {
return messages;
}
}
消息接收模块手写
消息接收流程分析
消息接收流程包括以下几个步骤:
- 初始化消费者:创建消费者实例,并配置消息的topic、tag、key等信息。
- 拉取消息:调用消费者拉取消息接口,从指定的Broker拉取消息。
- 消息消费:消费者从Broker拉取消息后,执行消息消费逻辑。
- 消息确认:在消费完消息后,向Broker确认消息已消费。
创建消费者实例
首先,需要创建消费者实例,并配置消息的topic、tag、key等信息。
public class Consumer {
private String name;
private String brokerAddress;
private List<Message> messages;
public Consumer(String name, String brokerAddress) {
this.name = name;
this.brokerAddress = brokerAddress;
this.messages = new ArrayList<>();
}
public void receiveMessage(Message message) {
// 从Broker拉取消息
this.messages.add(message);
System.out.println("Consumer " + name + " received message: " + message.getMessageBody());
}
public String getBrokerAddress() {
return brokerAddress;
}
public String getName() {
return name;
}
public List<Message> getMessages() {
return messages;
}
}
拉取消息
接下来,调用消费者拉取消息接口,从指定的Broker拉取消息。
public class Main {
public static void main(String[] args) {
Consumer consumer = new Consumer("Consumer1", "localhost:9876");
Message message = new Message("Topic1", "Tag1", "Key1", "Hello, World!");
consumer.receiveMessage(message);
}
}
消息消费
消费者从Broker拉取消息后,执行消息消费逻辑。
public class Consumer {
private String name;
private String brokerAddress;
private List<Message> messages;
public Consumer(String name, String brokerAddress) {
this.name = name;
this.brokerAddress = brokerAddress;
this.messages = new ArrayList<>();
}
public void receiveMessage(Message message) {
// 从Broker拉取消息
this.messages.add(message);
System.out.println("Consumer " + name + " received message: " + message.getMessageBody());
// 执行消息消费逻辑
consumeMessage(message);
}
private void consumeMessage(Message message) {
// 消息消费逻辑
System.out.println("Message consumed: " + message.getMessageBody());
}
public String getBrokerAddress() {
return brokerAddress;
}
public String getName() {
return name;
}
public List<Message> getMessages() {
return messages;
}
}
消息接收中的常见问题与解决方案
消息重复消费问题
当消费者从Broker拉取消息后,如果Broker还没有收到消息消费确认,那么Broker会认为消息还未消费,可能会再次拉取该消息,导致消息重复消费。
解决方案:在消息消费完成后,向Broker发送消息确认请求,确保消息只被消费一次。
public class Consumer {
private String name;
private String brokerAddress;
private List<Message> messages;
public Consumer(String name, String brokerAddress) {
this.name = name;
this.brokerAddress = brokerAddress;
this.messages = new ArrayList<>();
}
public void receiveMessage(Message message) {
// 从Broker拉取消息
this.messages.add(message);
System.out.println("Consumer " + name + " received message: " + message.getMessageBody());
// 执行消息消费逻辑
consumeMessage(message);
}
private void consumeMessage(Message message) {
// 消息消费逻辑
System.out.println("Message consumed: " + message.getMessageBody());
// 发送消息确认请求
confirmMessageConsumption(message);
}
private void confirmMessageConsumption(Message message) {
// 发送消息确认请求
System.out.println("Message confirmed: " + message.getMessageBody());
}
public String getBrokerAddress() {
return brokerAddress;
}
public String getName() {
return name;
}
public List<Message> getMessages() {
return messages;
}
}
消息丢失问题
当消费者从Broker拉取消息后,如果消费者崩溃或重启,可能会导致消息丢失。
解决方案:在消息消费完成后,向Broker发送消息确认请求,保证消息已被可靠消费,即使消费者崩溃或重启也不会丢失消息。
public class Consumer {
private String name;
private String brokerAddress;
private List<Message> messages;
public Consumer(String name, String brokerAddress) {
this.name = name;
this.brokerAddress = brokerAddress;
this.messages = new ArrayList<>();
}
public void receiveMessage(Message message) {
// 从Broker拉取消息
this.messages.add(message);
System.out.println("Consumer " + name + " received message: " + message.getMessageBody());
// 执行消息消费逻辑
consumeMessage(message);
}
private void consumeMessage(Message message) {
// 消息消费逻辑
System.out.println("Message consumed: " + message.getMessageBody());
// 发送消息确认请求
confirmMessageConsumption(message);
}
private void confirmMessageConsumption(Message message) {
// 发送消息确认请求
System.out.println("Message confirmed: " + message.getMessageBody());
}
public String getBrokerAddress() {
return brokerAddress;
}
public String getName() {
return name;
}
public List<Message> getMessages() {
return messages;
}
}
消息延迟问题
当消费者从Broker拉取消息后,如果消息处理时间较长,可能会导致消息延迟。
解决方案:在消息消费完成后,向Broker发送消息确认请求,确保消息处理完毕后才会确认。
public class Consumer {
private String name;
private String brokerAddress;
private List<Message> messages;
public Consumer(String name,.
private void confirmMessageConsumption(Message message) {
// 发送消息确认请求
System.out.println("Message confirmed: " + message.getMessageBody());
}
public String getBrokerAddress() {
return brokerAddress;
}
public String getName() {
return name;
}
public List<Message> getMessages() {
return messages;
}
}
``
# 运行与测试
## 手写RocketMQ的运行步骤
1. **启动NameServer**:确保NameServer启动正常。
2. **启动Broker**:确保Broker启动正常,并注册到NameServer。
3. **启动生产者**:创建生产者实例,并配置消息的topic、tag、key等信息。
4. **启动消费者**:创建消费者实例,并配置消息的topic、tag、key等信息。
5. **发送消息**:调用生产者的发送消息接口,将消息发送到指定的Broker。
6. **拉取消息**:调用消费者拉取消息接口,从指定的Broker拉取消息。
7. **消息消费**:消费者从Broker拉取消息后,执行消息消费逻辑。
### 启动NameServer
```bash
nohup sh bin/mqnamesrv &
启动Broker
nohup sh bin/mqbroker -n localhost:9876 &
启动生产者
public class Main {
public static void main(String[] args) {
Producer producer = new Producer("Producer1", "localhost:9876");
Message message = new Message("Topic1", "Tag1", "Key1", "Hello, World!");
producer.sendMessage(message);
}
}
启动消费者
public class Main {
public static void main(String[] args) {
Consumer consumer = new Consumer("Consumer1", "localhost:9876");
Message message = new Message("Topic1", "Tag1", "Key1", "Hello, World!");
consumer.receiveMessage(message);
}
}
消息发送接收测试
测试发送消息
public class Main {
public static void main(String[] args) {
Producer producer = new Producer("Producer1", "localhost:9876");
Message message = new Message("Topic1", "Tag1", "Key1", "Hello, World!");
producer.sendMessage(message);
}
}
测试接收消息
public class Main {
public static void main(String[] args) {
Consumer consumer = new Consumer("Consumer1", "localhost:9876");
Message message = new Message("Topic1", "Tag1", "Key1", "Hello, World!");
consumer.receiveMessage(message);
}
}
故障排查与性能优化
故障排查
- 检查NameServer日志:查看NameServer的启动日志,确保NameServer启动正常。
- 检查Broker日志:查看Broker的启动日志,确保Broker启动正常,并注册到NameServer。
- 检查生产者日志:查看生产者的日志,确保生产者发送消息正常。
- 检查消费者日志:查看消费者的日志,确保消费者拉取消息正常。
性能优化
- 批量发送消息:使用批量发送消息的方式提高发送效率。
- 批量拉取消息:使用批量拉取消息的方式提高拉取效率。
- 消息压缩:对消息进行压缩,减少网络传输的开销。
- 消息压缩:对消息进行压缩,减少存储空间的开销。
共同學習,寫下你的評論
評論加載中...
作者其他優質文章