手写RocketMQ学习涵盖了从环境搭建到消息发送与接收的全过程,介绍了RocketMQ的基本概念、特点和优势,并详细讲解了生产者和消费者的实现方法。此外,文章还提供了实战演练和性能优化的建议,帮助读者全面掌握RocketMQ的使用技巧。
RocketMQ简介 RocketMQ的基本概念RocketMQ是一款由阿里巴巴开源的分布式消息中间件,它具有高可用、高可靠、强一致性等特点。RocketMQ主要适用于大规模分布式系统中的异步通信场景,它能够支持多种消息模式和灵活的消息路由机制,确保消息的可靠传输。
RocketMQ的消息模型包括生产者(Producer)、消费者(Consumer)和消息队列(Message Queue)。生产者负责发送消息,消费者负责接收并处理消息。消息队列作为中间层存放消息,确保消息的可靠传输。
RocketMQ的特点与优势- 高可用性和高可靠性:RocketMQ通过主从复制、多副本备份、事务消息等机制,确保消息的可靠传输和系统的高可用性。
- 高并发处理能力:RocketMQ能够支持每秒数百万的消息吞吐量,适用于高并发场景。
- 多种消息模式:RocketMQ支持多种消息模式,如普通消息、定时消息、延时消息、顺序消息等,满足不同的业务需求。
- 灵活的消息路由机制:RocketMQ支持自定义消息路由规则,方便进行消息的分发和处理。
- 丰富的管理接口:RocketMQ提供了丰富的管理和监控接口,便于进行日志查看、性能监控等运维操作。
- 电商领域:在订单系统中,RocketMQ可以用于订单创建、支付通知等场景,确保消息的可靠传输。
- 金融领域:在交易系统中,RocketMQ可以用于交易通知、账户余额更新等场景,保证系统的高可用性。
- 物流领域:在物流跟踪系统中,RocketMQ可以用于物流状态更新、通知消息推送等场景,提供实时的消息推送。
- 大数据处理:在大数据处理场景中,RocketMQ可以用于数据采集、数据分发等场景,提高数据处理的效率。
1. 下载RocketMQ
首先访问Apache RocketMQ下载页面,下载最新版本的RocketMQ。
wget https://mirrors.tuna.tsinghua.edu.cn/apache/rocketmq/4.9.3/apache-rocketmq-4.9.3-bin-release.zip
unzip apache-rocketmq-4.9.3-bin-release.zip
cd apache-rocketmq-4.9.3
2. 启动NameServer
RocketMQ使用NameServer作为服务注册中心,负责管理和分发消息。
nohup sh bin/mqnamesrv &
3. 启动Broker
RocketMQ Broker是消息的存储和转发组件。启动Broker时,需要指定对应的配置文件。
nohup sh bin/mqbroker -n localhost:9876 -c conf/broker-a.properties &
至此,RocketMQ的环境搭建完成,可以通过ps -ef | grep mqnamesrv
和ps -ef | grep mqbroker
命令查看服务是否正常启动。
RocketMQ的配置文件位于conf
目录下,主要的配置文件包括broker-a.properties
和broker-b.properties
。这些配置文件定义了Broker的名称、IP地址、端口等信息。例如:
# broker-a.properties
brokerName=broker-a
brokerId=0
brokerRole=ASYNC_MASTER
namesrvAddr=localhost:9876
在配置文件中,brokerId
表示Broker的唯一标识符,brokerRole
表示Broker的角色,namesrvAddr
表示NameServer的地址。
启动Broker时,可以通过命令行参数指定配置文件:
nohup sh bin/mqbroker -n localhost:9876 -c conf/broker-a.properties &
通过这种方式,可以灵活地配置和启动多个Broker实例。
手写消息发送 创建生产者实例在RocketMQ中,发送消息需要创建一个生产者实例。生产者实例可以通过RocketMQ提供的DefaultMQProducer
类来创建,并配置相关的参数。
import org.apache.rocketmq.client.producer.DefaultMQProducer;
public class Producer {
public static void main(String[] args) {
// 创建生产者实例
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
// 设置NameServer地址
producer.setNamesrvAddr("localhost:9876");
// 启动生产者
producer.start();
}
}
编写发送消息的代码示例
生产者创建完成后,可以通过send
方法发送消息。发送消息时,需要创建一个Message
对象,包含消息的主题(Topic)、消息体(Body)等信息。
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
public class Producer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.setNamesrvAddr("localhost:9876");
producer.start();
String topic = "TestTopic";
String message = "Hello RocketMQ";
Message msg = new Message(topic, message.getBytes());
// 发送消息
producer.send(msg);
}
}
发送消息后,可以通过RocketMQ的管理界面或日志查看消息的发送状态。
消息发送的异步处理为了提高系统的性能,在发送消息时可以采用异步发送的方式。RocketMQ提供了异步发送的接口,可以通过回调函数来处理发送结果。
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
public class Producer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.setNamesrvAddr("localhost:9876");
producer.start();
String topic = "TestTopic";
String message = "Hello RocketMQ";
Message msg = new Message(topic, message.getBytes());
// 异步发送消息
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("Message sent successfully");
}
@Override
public void onException(Throwable e) {
System.out.println("Failed to send message: " + e.getMessage());
}
});
}
}
通过这种方式,发送消息的过程不会阻塞主程序,提高了系统的并发性能。
手写消息接收 创建消费者实例在RocketMQ中,接收消息需要创建一个消费者实例。消费者实例可以通过RocketMQ提供的DefaultMQPushConsumer
类来创建,并配置相关的参数。
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
public class Consumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");
consumer.setNamesrvAddr("localhost:9876");
// 设置从消息队列的末尾开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.COMMIT_MSG_ORDERLY);
consumer.subscribe("TestTopic", "*");
// 启动消费者
consumer.start();
}
}
编写接收消息的代码示例
消费者创建完成后,可以通过messageListener
来处理接收到的消息。RocketMQ为消息处理提供了多种监听器接口,可以灵活地处理不同类型的消息。
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderedMessageListener;
import org.apache.rocketmq.client.consumer.listener.MessageQueueListener;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
public class Consumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");
consumer.setNamesrvAddr("localhost:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.COMPENSATE_FROM_FAILED);
consumer.subscribe("TestTopic", "*");
// 设置消息处理监听器
consumer.setMessageListener(new ConsumeOrderedMessageListener() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
for (MessageExt msg : msgs) {
System.out.printf("Receive new message: %s %n", new String(msg.getBody()));
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
}
}
消费消息时,可以设置不同的消息处理策略,如顺序消费、批量消费等。
消费者的消息回溯处理在某些情况下,消费者可能需要重新消费之前的消息。RocketMQ提供了消息回溯的功能,可以通过设置pullFromWhere
参数来控制消息的回溯位置。
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderedMessageListener;
import org.apache.rocketmq.client.consumer.listener.MessageQueueListener;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.MessageQueue;
public class Consumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");
consumer.setNamesrvAddr("localhost:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.RECONSUME_FROM_STORE_OFFSET);
consumer.subscribe("TestTopic", "*");
// 设置消息处理监听器
consumer.setMessageListener(new ConsumeOrderedMessageListener() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
for (MessageExt msg : msgs) {
System.out.printf("Receive new message: %s %n", new String(msg.getBody()));
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
// 设置消息回溯的MessageQueue
List<MessageQueue> mqs = consumer.fetchMessageQueues("TestTopic");
MessageQueue mq = mqs.get(0);
consumer.pull(mq, "*", 0L, 10L);
consumer.start();
}
}
通过这种方式,可以控制消费者从指定的位置开始消费消息。
常见问题与解决方案 消息发送失败的原因与处理方法消息发送失败的原因可能包括网络问题、消息队列已满、生产者配置错误等。在实际使用中,可以通过以下几种方式来处理消息发送失败的问题:
- 检查网络连接:确保生产者和NameServer、Broker之间的网络连接正常。
- 增加消息队列:如果消息队列已满,可以增加消息队列的数量,提高消息的存储和转发能力。
- 优化生产者配置:确保生产者配置正确,例如配置合适的发送超时时间、重试次数等参数。
- 使用异步发送:采用异步发送的方式,提高消息发送的效率。
- 增加备份Broker:通过主从复制、多副本备份的方式,增强系统的可靠性。
消费者消费不到消息的原因可能包括消息队列为空、订阅主题配置错误、消费者配置问题等。在实际使用中,可以通过以下几种方式来处理消费者消费不到消息的问题:
- 检查消息队列:确保消息队列中有待消费的消息,可以通过RocketMQ的管理界面查看消息队列的状态。
- 检查订阅主题配置:确保消费者订阅的主题配置正确,可以通过RocketMQ的管理界面查看消费者订阅的主题信息。
- 优化消费者配置:确保消费者配置正确,例如配置合适的拉取间隔、拉取的最大消息数等参数。
- 增加日志记录:增加消费者日志记录,查看消费者的运行日志,方便排查问题。
- 检查网络连接:确保消费者和NameServer、Broker之间的网络连接正常。
在某些情况下,消费者可能会重复消费相同的消息。这可能是由于网络抖动、消费者重启等原因导致的。在实际使用中,可以通过以下几种方式来解决消息重复消费的问题:
- 使用唯一标识:为每个消息设置唯一标识,避免重复处理相同的业务逻辑。
- 事务消息:使用RocketMQ的事务消息机制,确保消息的发送和消费具有强一致性的语义。
- 幂等性处理:在处理消息时,增加幂等性处理的逻辑,确保消息被处理多次时不会产生重复的结果。
- 重试机制:增加重试机制,确保消息能够被正确地处理,避免消息丢失或重复消费。
- 增加日志记录:增加消费者日志记录,查看消费者的运行日志,方便排查问题。
通过以上方式,可以有效地处理消息发送失败、消费者消费不到消息以及消息重复消费的问题,提高系统的稳定性和可靠性。
实战演练 小项目实战演练场景描述
假设我们有一个电商平台,需要在订单创建时发送消息到支付系统,以便进行支付通知。具体场景如下:
- 订单创建:用户下单后,订单系统会生成一条订单信息。
- 消息发送:订单系统将订单信息转换为消息,发送到RocketMQ。
- 支付通知:支付系统订阅订单主题,接收消息并进行支付处理。
代码实现
1. 创建订单系统
订单系统负责生成订单信息,并将订单信息发送到RocketMQ。
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
public class OrderSystem {
public static void main(String[] args) throws Exception {
// 初始化生产者
DefaultMQProducer producer = new DefaultMQProducer("OrderProducer");
producer.setNamesrvAddr("localhost:9876");
producer.start();
// 创建订单信息
String orderId = "10001";
String orderInfo = "User 10001 ordered product 12345";
// 发送消息
producer.send(new Message("OrderTopic", orderInfo.getBytes()), new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("Order sent successfully: " + orderId);
}
@Override
public void onException(Throwable e) {
System.out.println("Failed to send order: " + orderId);
}
});
// 关闭生产者
producer.shutdown();
}
}
2. 创建支付系统
支付系统订阅订单主题,接收订单信息并进行支付处理。
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderedMessageListener;
import java.util.List;
public class PaymentSystem {
public static void main(String[] args) throws Exception {
// 初始化消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("PaymentConsumer");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("OrderTopic", "*");
// 设置消息处理监听器
consumer.setMessageListener(new ConsumeOrderedMessageListener() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
for (MessageExt msg : msgs) {
System.out.printf("Receive new order: %s %n", new String(msg.getBody()));
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
// 启动消费者
consumer.start();
}
}
通过上述代码,可以在订单系统中创建订单信息并发送到RocketMQ,支付系统订阅订单主题并接收消息进行处理。
代码部署与调试技巧部署RocketMQ
RocketMQ的部署步骤如下:
-
编译RocketMQ:将RocketMQ源码下载后,进行编译。
mvn clean install -DskipTests
-
启动NameServer
nohup sh bin/mqnamesrv &
-
启动Broker
nohup sh bin/mqbroker -n localhost:9876 -c conf/broker-a.properties &
- 部署应用:将订单系统和支付系统部署到相应的服务器上,并启动相关服务。
调试技巧
-
日志查看:RocketMQ提供了丰富的日志,可以通过日志查看消息的发送和接收情况。
tail -f logs/rocketmq.log
-
消息追踪:RocketMQ提供了消息追踪功能,可以通过消息ID查看消息的流转情况。
sh bin/mqadmin topicList -n localhost:9876 sh bin/mqadmin consumeStats -n localhost:9876 -t OrderTopic -c PaymentConsumer
-
性能监控:RocketMQ提供了性能监控接口,可以通过这些接口查看系统的运行状态。
sh bin/mqadmin brokerList -n localhost:9876 sh bin/mqadmin topicConfig -n localhost:9876 -t OrderTopic
通过以上步骤,可以有效地部署RocketMQ,并进行相关的调试和监控。
性能优化与调优建议性能优化
-
增加消息队列:增加消息队列的数量可以提高系统的消息存储和转发能力。
# broker-a.properties brokerName=broker-a brokerId=0 brokerRole=ASYNC_MASTER namesrvAddr=localhost:9876 messageQueueNums=16
-
优化网络配置:优化网络配置,提高网络传输的效率。
# broker-a.properties brokerName=broker-a brokerId=0 brokerRole=ASYNC_MASTER namesrvAddr=localhost:9876 bindIp=0.0.0.0 listenPort=10911
-
使用异步发送:使用异步发送的方式,提高消息发送的效率。
producer.send(new Message("OrderTopic", orderInfo.getBytes()), new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.println("Order sent successfully: " + orderId); } @Override public void onException(Throwable e) { System.out.println("Failed to send order: " + orderId); } });
调优建议
-
调整生产者配置:根据消息发送的频率和延迟要求,调整生产者的超时时间、重试次数等参数。
# producer.properties producerName=OrderProducer namesrvAddr=localhost:9876 sendMsgTimeout=30000 retryTimesWhenSendFailed=0
-
调整消费者配置:根据消息消费的频率和延迟要求,调整消费者的拉取间隔、拉取的最大消息数等参数。
# consumer.properties consumerName=PaymentConsumer namesrvAddr=localhost:9876 pullInterval=1000 maxMsgNums=10
-
增加备份Broker:通过主从复制、多副本备份的方式,增强系统的可靠性。
# broker-b.properties brokerName=broker-b brokerId=1 brokerRole=SLAVE namesrvAddr=localhost:9876
通过以上优化和调优建议,可以提高RocketMQ的性能,确保系统的稳定性和可靠性。
综合以上内容,通过从环境搭建、消息发送与接收、常见问题解决方案到实战演练和性能优化,可以全面掌握RocketMQ的使用和优化技巧。希望这些内容对您的学习和实践有所帮助。
共同學習,寫下你的評論
評論加載中...
作者其他優質文章