本文深入探讨了RocketMQ底层原理,并通过项目实战案例详细介绍了RocketMQ的环境搭建、核心概念、消息发送与接收机制以及消息存储和消费过程,旨在帮助读者全面理解RocketMQ底层原理项目实战。
RocketMQ简介与环境搭建 RocketMQ简介RocketMQ是由阿里巴巴研发的一款高性能、可扩展的分布式消息中间件。它支持亿级并发量的消息处理能力,具有高可用、高可靠、分布式部署等特点。RocketMQ的设计目标是为企业提供低延迟、高吞吐量、高可用性的消息中间件服务。其核心功能包括消息传输、事务消息、定时消息、顺序消息等。RocketMQ广泛应用于电商、金融、物流等领域,支持大规模分布式环境下的消息传递和应用解耦。
开发环境搭建安装Java
RocketMQ基于Java开发,因此需要确保开发环境中已经安装了Java环境。以下是安装步骤:
- 访问Oracle官方网站或开源中国的Java开发人员页面下载最新版本的Java JDK。
- 解压下载的JDK安装包,例如
tar -zxf jdk-8uXXX-linux-x64.tar.gz
。 - 设置环境变量,编辑
/etc/profile
文件,添加以下内容:export JAVA_HOME=/path/to/jdk export PATH=$JAVA_HOME/bin:$PATH export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
- 更新环境变量,运行
source /etc/profile
。
安装RocketMQ
- 访问RocketMQ的GitHub仓库下载RocketMQ的源码包或者二进制包。
- 解压下载的RocketMQ包,例如
tar -zxf rocketmq-all-4.9.2-bin-release.tar.gz
。 - 进入RocketMQ目录,例如
cd rocketmq-all-4.9.2
。 - 执行以下命令启动名称服务器(Name Server):
nohup sh bin/mqnamesrv &
- 启动Broker,进入RocketMQ目录并执行以下命令:
nohup sh bin/mqbroker -n localhost:9876 &
创建生产者和消费者
先创建一个简单的生产者和消费者示例。生产者将消息发送到指定的主题,消费者从主题中消费消息。
-
创建生产者程序:
import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException { DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); producer.setNamesrvAddr("localhost:9876"); producer.start(); Message msg = new Message("TopicTest", // topic "TagA", // tag "OrderID188", // key ("Hello RocketMQ").getBytes(RemotingHelper.DEFAULT_CHARSET)); // body SendResult sendResult = producer.send(msg); producer.shutdown(); System.out.println(sendResult); } }
-
创建消费者程序:
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyListener; import org.apache.rocketmq.client.consumer.listener.MessageQueueListener; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyMessageContext; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; import org.apache.rocketmq.common.consumer.ConsumeOrderlyStatus; import org.apache.rocketmq.common.message.MessageExt; import java.util.List; public class Consumer { public static void main(String[] args) throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName"); consumer.setNamesrvAddr("localhost:9876"); consumer.subscribe("TopicTest", "*"); consumer.registerMessageListener((msgs, context) -> { for (MessageExt msg : msgs) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msg); } return ConsumeOrderlyStatus.SUCCESS; }); consumer.start(); } }
- 运行生产者,生成的消息将被发送到
TopicTest
主题。运行消费者,消费者将从TopicTest
主题中接收并处理消息。
消息发送和接收的流程
生产者通过DefaultMQProducer
实例发送消息,首先通过producer.start()
启动生产者,设置名称服务器地址,然后创建Message
对象,设置消息的主题、标签、键和内容。调用producer.send()
方法发送消息,最后通过调用producer.shutdown()
关闭生产者。
消费者通过DefaultMQPushConsumer
实例接收消息,首先通过consumer.start()
启动消费者,设置名称服务器地址,然后通过consumer.subscribe()
订阅指定主题的消息。注册消息监听器,监听并处理消息。最后通过调用consumer.shutdown()
关闭消费者。
RocketMQ提供了多种类型的消息模型,包括普通消息、事务消息、定时消息和顺序消息。
普通消息
普通消息是最基本的消息类型,消息发送方将消息发送到Broker,Broker负责将消息持久化并分发给订阅者。
事务消息
事务消息是一种特殊的事务消息,它支持消息发送方和接收方之间的一致性事务。消息发送方可以执行本地事务,然后通过事务消息插件发送消息,接收方接收到消息后执行相应的业务逻辑。事务消息插件会根据业务逻辑的执行结果决定消息的最终状态(提交或回滚)。
定时消息
定时消息允许发送者设置一个消息的延迟时间,消息将在指定的时间之后发送给接收者。发送者可以指定消息的延迟时间范围,例如10秒、30秒、1分钟、2分钟、3分钟、4分钟、5分钟、6分钟、7分钟、8分钟、9分钟、10分钟、30分钟、1小时、2小时。
顺序消息
顺序消息是一种特殊的事务消息,它确保消息在消费端按照发送顺序进行消费。发送者可以将消息分成多个批次,每个批次的消息按照发送顺序进行消费。发送者和接收者之间需要使用相同的事务ID来保证顺序消费。
主要组件介绍RocketMQ由多个核心组件组成,包括消息发送者(Producer)、消息接收者(Consumer)、名称服务器(Name Server)和Broker。
消息发送者(Producer)
消息发送者负责将消息发送到Broker。RocketMQ提供了多种消息发送方式,例如同步发送、异步发送和单向发送。发送者可以配置消息的属性,例如消息的主题、标签、键和内容。发送者还可以配置消息的发送策略,例如消息的重试次数和时间间隔。
消息接收者(Consumer)
消息接收者负责从Broker接收消息并处理消息。RocketMQ提供了多种消息接收方式,例如顺序消费和并发消费。接收者可以配置消息的消费策略,例如消息的消费模式、消费线程数和消费超时时间。
名称服务器(Name Server)
名称服务器负责维护Broker的连接信息,提供Broker的地址信息给消息发送者和接收者。名称服务器通过心跳机制与Broker保持联系,如果Broker出现故障,名称服务器会将Broker从连接列表中移除,并通知消息发送者和接收者。
Broker
Broker是消息的传输和存储中心,负责接收消息发送者发送的消息,并将消息存储到磁盘或内存中。Broker还负责将消息分发给订阅者,并提供消息的查询和删除功能。Broker支持集群模式,多个Broker可以组成一个集群,提供高可用性和负载均衡。
名称服务器与Broker作用名称服务器的作用是维护Broker的连接信息,提供Broker的地址信息给消息发送者和接收者。名称服务器通过心跳机制与Broker保持联系,如果Broker出现故障,名称服务器会将Broker从连接列表中移除,并通知消息发送者和接收者。
Broker的作用是接收消息发送者发送的消息,并将消息存储到磁盘或内存中。Broker还负责将消息分发给订阅者,并提供消息的查询和删除功能。Broker支持集群模式,多个Broker可以组成一个集群,提供高可用性和负载均衡。
RocketMQ消息发送与接收 发送消息的基本流程创建Producer实例
首先创建一个DefaultMQProducer
实例,设置Producer的名称、消息发送策略和异常处理策略。
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.setNamesrvAddr("localhost:9876");
producer.setSendMsgTimeout(3000); // 消息发送超时时间
producer.setRetryTimesWhenSendFailed(2); // 消息发送失败时的重试次数
启动Producer实例
通过调用producer.start()
方法启动Producer实例。
producer.start();
创建Message实例
创建一个Message
实例,设置消息的属性,例如消息的主题、标签、键和内容。
Message msg = new Message("TopicTest", // topic
"TagA", // tag
"OrderID188", // key
("Hello RocketMQ").getBytes(RemotingHelper.DEFAULT_CHARSET)); // body
发送消息
调用producer.send()
方法发送消息。
SendResult sendResult = producer.send(msg);
关闭Producer实例
通过调用producer.shutdown()
方法关闭Producer实例。
producer.shutdown();
接收消息的基本流程
创建Consumer实例
首先创建一个DefaultMQPushConsumer
实例,设置Consumer的名称、消息接收策略和异常处理策略。
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");
consumer.setNamesrvAddr("localhost:9876");
consumer.setConsumeMessageBatchMaxSize(100); // 消息批处理的最大条数
consumer.setConsumeTimeout(1000); // 消息消费超时时间
订阅主题
通过调用consumer.subscribe()
方法订阅指定主题的消息。
consumer.subscribe("TopicTest", "*"); // 订阅TopicTest主题下的所有标签
配置消息监听器
通过调用consumer.registerMessageListener()
方法注册消息监听器,监听并处理消息。
consumer.registerMessageListener((msgs, context) -> {
for (MessageExt msg : msgs) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msg);
}
return ConsumeOrderlyStatus.SUCCESS;
});
启动Consumer实例
通过调用consumer.start()
方法启动Consumer实例。
consumer.start();
关闭Consumer实例
通过调用consumer.shutdown()
方法关闭Consumer实例。
consumer.shutdown();
常见消息发送与接收模式
普通消息发送模式
普通消息发送模式是最基本的消息发送模式,消息发送者将消息发送到Broker,Broker负责将消息持久化并分发给订阅者。
Message msg = new Message("TopicTest", // topic
"TagA", // tag
"OrderID188", // key
("Hello RocketMQ").getBytes(RemotingHelper.DEFAULT_CHARSET)); // body
SendResult sendResult = producer.send(msg);
事务消息发送模式
事务消息发送模式支持消息发送方和接收方之间的一致性事务。消息发送者可以执行本地事务,然后通过事务消息插件发送消息,接收者接收到消息后执行相应的业务逻辑。事务消息插件会根据业务逻辑的执行结果决定消息的最终状态(提交或回滚)。
TransactionSendResult sendResult = producer.sendMessageInTransaction(msg, new LocalTransactionChecker() {
@Override
public LocalTransactionState checkLocalTransaction(MessageExt message) {
// 执行本地事务
return LocalTransactionState.COMMIT_MESSAGE;
}
});
定时消息发送模式
定时消息发送模式允许发送者设置一个消息的延迟时间,消息将在指定的时间之后发送给接收者。发送者可以指定消息的延迟时间范围,例如10秒、30秒、1分钟、2分钟、3分钟、4分钟、5分钟、6分钟、7分钟、8分钟、9分钟、10分钟、30分钟、1小时、2小时。
Message msg = new Message("TopicTest", // topic
"TagA", // tag
"OrderID188", // key
("Hello RocketMQ").getBytes(RemotingHelper.DEFAULT_CHARSET)); // body
msg.setDelayTimeLevel(3); // 设置消息的延迟时间范围
SendResult sendResult = producer.send(msg);
顺序消息发送模式
顺序消息发送模式是一种特殊的事务消息,它确保消息在消费端按照发送顺序进行消费。发送者可以将消息分成多个批次,每个批次的消息按照发送顺序进行消费。发送者和接收者之间需要使用相同的事务ID来保证顺序消费。
TransactionSendResult sendResult = producer.sendMessageInTransaction(msg, new LocalTransactionChecker() {
@Override
public LocalTransactionState checkLocalTransaction(MessageExt message) {
// 执行本地事务
return LocalTransactionState.COMMIT_MESSAGE;
}
}, "TransactionID");
消息接收模式
RocketMQ支持顺序消费和并发消费两种消息接收模式。顺序消费模式确保消息在消费端按照发送顺序进行消费,而并发消费模式允许多个消费线程并行处理消息。
consumer.registerMessageListener((msgs, context) -> {
for (MessageExt msg : msgs) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msg);
}
return ConsumeOrderlyStatus.SUCCESS;
});
RocketMQ消息存储机制
消息的物理存储结构
RocketMQ的消息存储结构主要由两个部分组成:消息文件和索引文件。
消息文件
消息文件是RocketMQ存储消息的物理文件,每个Broker都有独立的消息文件。消息文件是一个二进制文件,文件名为message_index
,文件路径为storePathRootDir
下的message_index
子目录。消息文件采用了追加写入的方式,每次写入一条消息时,都会追加到文件末尾。消息文件的格式如下:
字段名称 | 字段长度 | 字段描述 |
---|---|---|
magic | 4 | 魔数,标识文件格式 |
version | 4 | 版本号 |
bodySize | 4 | 消息体长度 |
body | variable | 消息体 |
tag | variable | 消息标签 |
key | variable | 消息键 |
topic | variable | 消息主题 |
queueId | 4 | 队列ID |
offset | 8 | 消息偏移量 |
sysFlag | 2 | 系统标志位 |
bornHost | 4 | 生产者IP地址 |
bornTimestamp | 8 | 生产时间戳 |
msgId | variable | 消息ID |
storeHost | 4 | 存储节点IP地址 |
storeTimestamp | 8 | 存储时间戳 |
queueOffset | 8 | 队列偏移量 |
sysPropertiesSize | 4 | 系统属性长度 |
sysProperties | variable | 系统属性 |
索引文件
索引文件是RocketMQ存储消息索引的文件,每个Broker都有独立的索引文件。索引文件是一个二进制文件,文件名为index
,文件路径为storePathRootDir
下的message_index
子目录。索引文件是一个内存映射文件,文件格式如下:
字段名称 | 字段长度 | 字段描述 |
---|---|---|
magic | 4 | 魔数,标识文件格式 |
version | 4 | 版本号 |
indexTotalSize | 8 | 索引总大小 |
indexSize | 8 | 索引大小 |
indexKeySize | 4 | 索引键大小 |
indexValueSize | 4 | 索引值大小 |
indexKeys | variable | 索引键 |
indexValues | variable | 索引值 |
索引文件中的indexKeys
表示消息的键,indexValues
表示消息的偏移量。通过索引文件可以快速查找消息的偏移量,提高消息读取效率。
RocketMQ的消息逻辑存储结构是指消息在Broker中的存储形式。RocketMQ采用逻辑队列的方式存储消息,每个主题对应一个或多个逻辑队列。消息存储在逻辑队列中,每个逻辑队列有一个唯一的队列ID。每个逻辑队列可以分布在多个物理Broker上,每个Broker都有独立的消息文件和索引文件。
消息的逻辑队列
RocketMQ的消息逻辑队列是指消息在Broker中的存储形式。每个主题对应一个或多个逻辑队列,每个逻辑队列有一个唯一的队列ID。每个逻辑队列可以分布在多个物理Broker上,每个Broker都有独立的消息文件和索引文件。
消息的存储模式
RocketMQ支持多种消息存储模式,例如顺序存储和随机存储。顺序存储模式是指消息按照发送顺序存储在逻辑队列中,而随机存储模式是指消息按照随机的方式存储在逻辑队列中。
消息的删除机制
RocketMQ提供了多种消息删除机制,例如消息过期删除和手动删除。消息过期删除是指消息在过期时间之后会被自动删除,而手动删除是指用户可以通过API手动删除消息。
消息的读取过程RocketMQ的消息读取过程包括以下几个步骤:
读取索引文件
首先读取索引文件,通过索引文件中的indexKeys
和indexValues
快速查找消息的偏移量。
File indexFile = new File("message_index");
RandomAccessFile raf = new RandomAccessFile(indexFile, "r");
long fileSize = raf.length();
long indexTotalSize = raf.readInt();
long indexSize = raf.readLong();
long indexKeySize = raf.readInt();
long indexValueSize = raf.readInt();
long[] indexKeys = new long[(indexSize / (indexKeySize + indexValueSize)) * indexKeySize];
long[] indexValues = new long[(indexSize / (indexKeySize + indexValueSize)) * indexValueSize];
for (int i = 0; i < indexSize / (indexKeySize + indexValueSize); i++) {
for (int j = 0; j < indexKeySize; j++) {
indexKeys[i * indexKeySize + j] = raf.readLong();
}
for (int j = 0; j < indexValueSize; j++) {
indexValues[i * indexValueSize + j] = raf.readLong();
}
}
raf.close();
读取消息文件
通过索引文件中的indexValues
快速定位到消息文件中的offset
,然后读取消息文件中的offset
位置的数据。
File messageFile = new File("message_index", "message_file");
RandomAccessFile raf = new RandomAccessFile(messageFile, "r");
long offset = indexValues[0];
raf.seek(offset);
long magic = raf.readInt();
long version = raf.readInt();
int bodySize = raf.readInt();
byte[] body = new byte[bodySize];
raf.readFully(body, 0, bodySize);
String bodyStr = new String(body, RemotingHelper.DEFAULT_CHARSET);
raf.close();
解析消息
根据消息文件中的数据解析消息的属性,例如消息的主题、标签、键和内容。
String topic = "";
String tag = "";
String key = "";
String body = "";
// 根据消息文件中的数据解析消息的属性
返回消息
将解析后的消息返回给用户。
MessageExt message = new MessageExt();
message.setTopic(topic);
message.setTag(tag);
message.setKey(key);
message.setBody(body);
return message;
RocketMQ消息消费机制
消费模型详解
RocketMQ支持多种消息消费模型,例如顺序消费和并发消费。顺序消费模型是指消息按照发送顺序依次消费,而并发消费模型允许多个消费线程并行消费消息。
顺序消费模型
顺序消费模型是指消息按照发送顺序依次消费。RocketMQ通过为每个消息设置一个唯一的消费顺序ID来保证消息的顺序消费。每个消息的消费顺序ID由消息的生产者IP地址、生产时间戳和消息ID组成。
consumer.registerMessageListener((msgs, context) -> {
for (MessageExt msg : msgs) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msg);
}
return ConsumeOrderlyStatus.SUCCESS;
});
并发消费模型
并发消费模型允许多个消费线程并行消费消息。RocketMQ通过为每个消息设置一个唯一的消费并行ID来保证消息的并发消费。每个消息的消费并行ID由消息的生产者IP地址、生产时间戳和消息ID组成。
consumer.registerMessageListener((msgs, context) -> {
for (MessageExt msg : msgs) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msg);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
消费者的启动与停止
RocketMQ提供了丰富的API来启动和停止消费者。通过调用consumer.start()
方法启动消费者,通过调用consumer.shutdown()
方法停止消费者。
consumer.start();
// 消费者接收消息
consumer.shutdown();
消费者容错机制
RocketMQ提供了多种消费者容错机制,例如消息重试和消息回溯。消息重试是指当消费者消费消息失败时,RocketMQ会自动将消息重新发送给消费者。消息回溯是指当消费者消费消息失败时,RocketMQ会自动将消息回溯到消息队列的末尾,等待重新消费。
消息重试
当消费者消费消息失败时,RocketMQ会自动将消息重新发送给消费者。消费者可以通过配置消息的重试次数来控制消息的重试次数。
consumer.setRetryTimesWhenSendFailed(2); // 消息发送失败时的重试次数
消息回溯
当消费者消费消息失败时,RocketMQ会自动将消息回溯到消息队列的末尾,等待重新消费。消费者可以通过配置消息的回溯策略来控制消息的回溯行为。
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); // 从消息队列的末尾开始消费
RocketMQ项目实战案例
实战需求分析
假设我们正在开发一个在线教育平台,该平台需要实现以下几个功能:
- 学生注册功能,将学生信息存储到数据库中,并发送注册成功消息到消息队列。
- 学生登录功能,验证学生信息是否正确,并发送登录成功消息到消息队列。
- 学生购买课程功能,将学生的购买记录存储到数据库中,并发送购买成功消息到消息队列。
为了实现上述功能,我们需要使用RocketMQ来实现消息队列功能,将学生注册、登录和购买课程等操作的异步通知实现。
实战步骤详解创建RocketMQ生产者
首先创建一个RocketMQ生产者,设置生产者名称、消息发送策略和异常处理策略。
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.setNamesrvAddr("localhost:9876");
producer.setSendMsgTimeout(3000); // 消息发送超时时间
producer.setRetryTimesWhenSendFailed(2); // 消息发送失败时的重试次数
producer.start();
创建RocketMQ消费者
然后创建一个RocketMQ消费者,设置消费者名称、消息接收策略和异常处理策略。
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");
consumer.setNamesrvAddr("localhost:9876");
consumer.setConsumeMessageBatchMaxSize(100); // 消息批处理的最大条数
consumer.setConsumeTimeout(1000); // 消息消费超时时间
consumer.subscribe("TopicTest", "*"); // 订阅TopicTest主题下的所有标签
consumer.registerMessageListener((msgs, context) -> {
for (MessageExt msg : msgs) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msg);
}
return ConsumeOrderlyStatus.SUCCESS;
});
consumer.start();
实现学生注册功能
实现学生注册功能,将学生信息存储到数据库中,并发送注册成功消息到消息队列。
public void registerStudent(Student student) throws MQClientException {
// 存储学生信息到数据库
// ...
// 发送注册成功消息到消息队列
Message msg = new Message("TopicTest", // topic
"RegisterMessage", // tag
"StudentID1", // key
("Hello RocketMQ, Student Registered").getBytes(RemotingHelper.DEFAULT_CHARSET)); // body
SendResult sendResult = producer.send(msg);
}
实现学生登录功能
实现学生登录功能,验证学生信息是否正确,并发送登录成功消息到消息队列。
public void loginStudent(Student student) throws MQClientException {
// 验证学生信息是否正确
// ...
// 发送注册成功消息到消息队列
Message msg = new Message("TopicTest", // topic
"LoginMessage", // tag
"StudentID1", // key
("Hello RocketMQ, Student Logged In").getBytes(RemotingHelper.DEFAULT_CHARSET)); // body
SendResult sendResult = producer.send(msg);
}
实现学生购买课程功能
实现学生购买课程功能,将学生的购买记录存储到数据库中,并发送购买成功消息到消息队列。
public void purchaseCourse(Student student) throws MQClientException {
// 存储学生的购买记录到数据库
// ...
// 发送注册成功消息到消息队列
Message msg = new Message("TopicTest", // topic
"PurchaseMessage", // tag
"StudentID1", // key
("Hello RocketMQ, Student Purchased Course").getBytes(RemotingHelper.DEFAULT_CHARSET)); // body
SendResult sendResult = producer.send(msg);
}
实战代码实现与部署
实战代码实现
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class StudentService {
private DefaultMQProducer producer;
public StudentService() throws MQClientException {
producer = new DefaultMQProducer("ProducerGroupName");
producer.setNamesrvAddr("localhost:9876");
producer.setSendMsgTimeout(3000);
producer.setRetryTimesWhenSendFailed(2);
producer.start();
}
public void registerStudent(Student student) throws MQClientException {
// 存储学生信息到数据库
// ...
// 发送注册成功消息到消息队列
Message msg = new Message("TopicTest", // topic
"RegisterMessage", // tag
"StudentID1", // key
("Hello RocketMQ, Student Registered").getBytes(RemotingHelper.DEFAULT_CHARSET)); // body
SendResult sendResult = producer.send(msg);
}
public void loginStudent(Student student) throws MQClientException {
// 验证学生信息是否正确
// ...
// 发送注册成功消息到消息队列
Message msg = new Message("TopicTest", // topic
"LoginMessage", // tag
"StudentID1", // key
("Hello RocketMQ, Student Logged In").getBytes(RemotingHelper.DEFAULT_CHARSET)); // body
SendResult sendResult = producer.send(msg);
}
public void purchaseCourse(Student student) throws MQClientException {
// 存储学生的购买记录到数据库
// ...
// 发送注册成功消息到消息队列
Message msg = new Message("TopicTest", // topic
"PurchaseMessage", // tag
"StudentID1", // key
("Hello RocketMQ, Student Purchased Course").getBytes(RemotingHelper.DEFAULT_CHARSET)); // body
SendResult sendResult = producer.send(msg);
}
public void shutdown() {
producer.shutdown();
}
}
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderedListener;
import org.apache.rocketmq.client.consumer.listener.MessageQueueListener;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.consumer.ConsumeOrderlyStatus;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class MessageConsumer {
private DefaultMQPushConsumer consumer;
public MessageConsumer() throws Exception {
consumer = new DefaultMQPushConsumer("ConsumerGroupName");
consumer.setNamesrvAddr("localhost:9876");
consumer.setConsumeMessageBatchMaxSize(100);
consumer.setConsumeTimeout(1000);
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener((msgs, context) -> {
for (MessageExt msg : msgs) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msg);
}
return ConsumeOrderlyStatus.SUCCESS;
});
consumer.start();
}
public void shutdown() {
consumer.shutdown();
}
}
实战代码部署
部署RocketMQ生产者和消费者时,需要确保RocketMQ服务已经启动并且配置正确。RocketMQ生产者和消费者可以通过配置文件或者代码方式启动。
- 创建生产者和消费者实例,并设置相应的配置。
- 启动生产者和消费者实例。
- 注册相应的消息监听器,监听和处理消息。
- 通过发送消息或者消费消息实现相应的业务逻辑。
public class Main {
public static void main(String[] args) throws Exception {
StudentService studentService = new StudentService();
MessageConsumer messageConsumer = new MessageConsumer();
// 注册学生
studentService.registerStudent(new Student());
// 登录学生
studentService.loginStudent(new Student());
// 购买课程
studentService.purchaseCourse(new Student());
// 等待一段时间,确保消息已经发送并被消费
Thread.sleep(10000);
// 关闭生产者和消费者
studentService.shutdown();
messageConsumer.shutdown();
}
}
``
共同學習,寫下你的評論
評論加載中...
作者其他優質文章