手写RocketMQ学习涉及RocketMQ的基本概念、安装配置、环境变量设置、消息的创建与发送、接收与消费,以及集群搭建和监控等详细步骤,帮助读者全面掌握RocketMQ的使用。
RocketMQ简介与安装 RocketMQ基本概念RocketMQ是一个分布式消息中间件,由阿里巴巴开源,目前在Apache软件基金会下作为顶级项目孵化。它具有高吞吐量、高可用性、可扩展性强等特性,广泛应用于大规模分布式系统中。RocketMQ的核心组件包括Name Server、Broker和Producer/Consumer。
-
Name Server:Name Server是消息中间件中的服务发现模块,负责维护Broker的路由信息。每个Broker都会向Name Server注册,Name Server则负责接收并维护这些信息,并提供给客户端进行查询。
-
Broker:Broker是消息的存储和转发中心,负责存储消息和转发消息。RocketMQ支持多个Broker集群,每个Broker集群可以分布在网络的不同节点上,实现消息的分布式存储和分发。
-
Producer:Producer是消息的发送者,负责向指定的Topic发送消息。发送消息时,Producer会将消息发送到Broker,同时也可以设置消息的属性,如消息的优先级、延迟级别等。
- Consumer:Consumer是消息的接收者,负责从指定的Topic接收消息并进行处理。RocketMQ支持多种消费模式,如顺序消费、广播消费、集群消费等。
安装RocketMQ环境需要先下载RocketMQ的安装包,可以从其GitHub仓库或者官网下载。下面以Windows环境为例,介绍RocketMQ的安装步骤,其他操作系统类似。
下载RocketMQ
访问RocketMQ的GitHub仓库,下载最新版本的安装包,解压到指定目录。
# 下载最新版本的RocketMQ
wget https://github.com/apache/rocketmq/releases/download/v4.9.1/rocketmq-release-4.9.1-bin.tar.gz
# 解压安装包
tar -zxvf rocketmq-release-4.9.1-bin.tar.gz
cd rocketmq-release-4.9.1
启动NameServer
RocketMQ的NameServer是分布式系统中的服务发现模块,负责维护Broker的路由信息。启动NameServer前,确保已经下载并解压RocketMQ的安装包。
# 启动NameServer
nohup sh bin/mqnamesrv &
启动NameServer后,可以在控制台看到启动成功的信息,同时NameServer会将路由信息保存到本地的配置文件中,以备后续使用。
启动Broker
在启动NameServer之后,需要启动Broker以提供消息的存储和转发服务。
# 启动Broker
nohup sh bin/mqbroker -n localhost:9876 &
启动Broker时,需要指定NameServer的地址,以便Broker能够向NameServer注册自己。启动成功后,控制台会输出启动成功的信息。
配置RocketMQ环境变量为了方便使用RocketMQ的命令行工具,建议配置环境变量。以下是配置环境变量的步骤:
设置环境变量
在系统的环境变量设置中,添加RocketMQ的安装目录到系统的PATH中,确保可以使用RocketMQ的命令行工具。
export PATH=$PATH:/path/to/rocketmq/bin
验证配置
配置完成后,可以通过命令行工具验证设置是否成功。
# 检查RocketMQ命令行工具是否可用
mqadmin
如果配置正确,将会显示RocketMQ命令行工具的帮助信息。
创建与发送消息 创建生产者实例生产者是消息的发送者,负责将消息发送到指定的Topic。在RocketMQ中,生产者需要通过NameServer获取Broker的路由信息,然后将消息发送到Broker。
创建生产者代码示例
创建一个生产者实例,用于发送消息。
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class Producer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.setNamesrvAddr("localhost:9876");
producer.start();
Message msg = new Message("TopicTest", "TagA", ("Hello RocketMQ").getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
producer.shutdown();
}
}
在上述代码中,首先创建了一个生产者实例,设置了NameServer的地址,然后启动生产者。接着,创建了一个消息对象,指定了Topic和Tag,然后发送消息。最后,关闭生产者。
发送同步消息同步消息是指生产者发送消息后,等待Broker返回消息是否发送成功的响应。发送同步消息可以确保消息已被成功发送。
发送同步消息代码示例
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
上述代码中,producer.send(msg)
方法用于发送消息,该方法会阻塞直到收到Broker的响应。如果消息发送成功,sendResult
对象将包含消息的发送状态和消息ID等信息。
异步消息是指生产者发送消息后,不等待Broker返回响应,而是通过回调函数处理发送结果。这种方式可以提高生产者的发送效率。
发送异步消息代码示例
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("%s%n", sendResult);
}
@Override
public void onException(Throwable e) {
e.printStackTrace();
}
});
在上述代码中,producer.send(msg, new SendCallback())
方法用于异步发送消息。当消息发送成功时,会调用onSuccess
方法,当发送失败时,会调用onException
方法。
消费者是消息的接收者,负责从指定的Topic接收消息并进行处理。在RocketMQ中,消费者需要通过NameServer获取Broker的路由信息,然后注册到指定的Topic上。
创建消费者代码示例
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
public class Consumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("TopicTest", "*");
consumer.setMessageModel(MessageModel.CLUSTERING);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.printf("Receive New Message: %s %n", new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}
在上述代码中,首先创建了一个消费者实例,设置了NameServer的地址,并注册到指定的Topic上。接着,设置消息模型为集群消费模式,设置从上次消费的位置开始消费。最后,注册消息监听器,用于处理接收到的消息。
接收并消费消息消费者接收到消息后,会触发消息监听器中的回调函数,消费者在回调函数中对消息进行处理。
消费者消息处理代码示例
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.printf("Receive New Message: %s %n", new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
在上述代码中,msg.getBody()
用于获取消息的内容。消费者将接收到的消息打印到控制台上。
在实际的应用场景中,消息的处理逻辑可能非常复杂,可能需要进行业务逻辑处理、数据库操作、日志记录等。
消息处理逻辑代码示例
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
String body = new String(msg.getBody());
System.out.println("Consume Message: " + body);
// 业务逻辑处理
// processBusinessLogic(body);
// 数据库操作
// saveToDatabase(body);
// 日志记录
// logMessage(body);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
在上述代码中,可以根据具体业务需求编写消息的处理逻辑。例如,可以将消息内容保存到数据库,或者进行日志记录。
RocketMQ集群搭建 配置Nameserver与Broker在生产环境中,通常需要配置多个NameServer和Broker,以提高系统的可靠性和可扩展性。
配置NameServer
配置多个NameServer可以使用多个实例,并在配置文件中指定多个NameServer地址。例如:
# 配置文件示例
namesrv.addr=192.168.1.100:9876;192.168.1.101:9876
配置Broker
配置多个Broker可以使用多个实例,并在配置文件中指定多Broker地址和集群名称。例如:
# 配置文件示例
broker.name=BrokerA
broker.id=0
broker.clusterName=DefaultCluster
broker.addr=192.168.1.102:10911
broker.allowPollIfQueueEmpty=true
namesrv.addr=192.168.1.100:9876;192.168.1.101:9876
启动RocketMQ集群
启动RocketMQ集群需要启动多个NameServer和多个Broker实例,并确保所有实例都能正常启动。
启动NameServer
# 启动NameServer实例1
nohup sh bin/mqnamesrv &
# 启动NameServer实例2
nohup sh bin/mqnamesrv -n 192.168.1.101:9876 &
启动Broker
# 启动Broker实例1
nohup sh bin/mqbroker -n 192.168.1.100:9876;192.168.1.101:9876 -c brokerA.properties &
# 启动Broker实例2
nohup sh bin/mqbroker -n 192.168.1.100:9876;192.168.1.101:9876 -c brokerB.properties &
集群中的消息路由
在集群环境中,RocketMQ通过路由信息来确定消息发送到哪个Broker。每个Broker都会向NameServer注册自己的路由信息,NameServer则维护这些信息,并提供给客户端进行查询。
路由信息示例
# BrokerA的路由信息示例
brokerAddr: 192.168.1.102:10911
brokerName: BrokerA
brokerId: 0
clusterName: DefaultCluster
消息路由示例
当生产者发送消息到Topic时,会从NameServer获取Topic对应的路由信息,并根据路由信息将消息发送到指定的Broker。
Message msg = new Message("TopicTest", "TagA", ("Hello RocketMQ").getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
在上述代码中,生产者会根据NameServer返回的路由信息,将消息发送到指定的Broker。
消息的过滤与重试机制 消息过滤规则消息过滤规则用于在消费者端筛选消息,只消费满足特定条件的消息。
消息过滤规则示例
consumer.subscribe("TopicTest", "*");
在上述代码中,使用*
表示订阅所有Tag的消息。如果需要订阅特定Tag的消息,可以指定Tag名称,例如:
consumer.subscribe("TopicTest", "TagA");
消息重试策略
消息重试策略用于处理发送失败的消息,确保消息最终能够被成功发送和消费。
消息重试策略示例
producer.setMessageQueueSelector(new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object o) {
int index = (int) (o);
return mqs.get(index);
}
});
在上述代码中,MessageQueueSelector
用于选择消息队列,可以实现自定义的消息队列选择逻辑。
消息的幂等性处理用于避免重复消费消息,确保消息只被消费一次。
消息幂等性处理示例
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
String msgId = msg.getMsgId();
if (!context.getMessageIDs().contains(msgId)) {
System.out.printf("Receive New Message: %s %n", new String(msg.getBody()));
context.getMessageIDs().add(msgId);
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
在上述代码中,msg.getMsgId()
用于获取消息的唯一标识,context.getMessageIDs()
用于存储已经消费过的消息ID。通过检查消息ID是否已经消费过,可以避免重复消费消息。
RocketMQ的日志配置用于记录系统运行状态和错误信息,帮助排查问题。
日志配置文件示例
logFile=logs/rocketmqlogs
logLevel=WARN
fileAppend=true
consoleLog=false
在上述配置文件中,logFile
用于指定日志文件的路径,logLevel
用于指定日志级别,fileAppend
用于指定是否追加日志到文件,consoleLog
用于指定是否输出日志到控制台。
日志配置代码示例
import org.apache.rocketmq.remoting.common.RemotingContext;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.netty.NettyRemotingServer;
import org.apache.rocketmq.remoting.netty.NettyRemotingServerFactory;
public class LogConfigurationExample {
public static void main(String[] args) {
// 获取日志配置
Properties properties = RemotingContext.getContext().getProperties();
String logFile = properties.getProperty("rocketmq.client.logFile");
String logLevel = properties.getProperty("rocketmq.client.logLevel");
System.out.println("Log File: " + logFile);
System.out.println("Log Level: " + logLevel);
}
}
使用工具监控RocketMQ
RocketMQ提供了多种监控工具,帮助监控系统运行状态和性能指标。
监控工具示例
RocketMQ自带的监控工具包括Dashboard和JMX。
Dashboard监控工具
Dashboard是一个Web界面,提供RocketMQ的监控视图和操作工具。
# 启动Dashboard
nohup sh bin/mqdashboard &
启动Dashboard后,可以在浏览器中访问http://localhost:8080
来查看RocketMQ的监控视图。
JMX监控工具
JMX用于监控Java应用程序的运行状态和性能指标。
import javax.management.MBeanServer;
import javax.management.ObjectName;
import java.lang.management.ManagementFactory;
public class JMXExample {
public static void main(String[] args) throws Exception {
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
ObjectName name = new ObjectName("com.taobao.rocketmq:type=BrokerStats");
System.out.println(mbs.getAttribute(name, "MessagePutTotal"));
}
}
在上述代码中,通过JMX获取Broker的统计信息。
检查RocketMQ运行状态检查RocketMQ运行状态可以帮助确保系统正常运行。
检查RocketMQ运行状态示例
# 检查NameServer状态
mqadmin clusterList
# 检查Broker状态
mqadmin brokerList
# 检查Topic状态
mqadmin topicList
在上述命令中,mqadmin clusterList
用于检查NameServer的状态,mqadmin brokerList
用于检查Broker的状态,mqadmin topicList
用于检查Topic的状态。
通过上述命令,可以获取系统的运行状态信息,帮助排查问题。
共同學習,寫下你的評論
評論加載中...
作者其他優質文章