亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定

RocketMQ底層原理項目實戰詳解

概述

本文深入探讨了RocketMQ底层原理,并通过项目实战案例详细介绍了RocketMQ的环境搭建、核心概念、消息发送与接收机制以及消息存储和消费过程,旨在帮助读者全面理解RocketMQ底层原理项目实战。

RocketMQ简介与环境搭建
RocketMQ简介

RocketMQ是由阿里巴巴研发的一款高性能、可扩展的分布式消息中间件。它支持亿级并发量的消息处理能力,具有高可用、高可靠、分布式部署等特点。RocketMQ的设计目标是为企业提供低延迟、高吞吐量、高可用性的消息中间件服务。其核心功能包括消息传输、事务消息、定时消息、顺序消息等。RocketMQ广泛应用于电商、金融、物流等领域,支持大规模分布式环境下的消息传递和应用解耦。

开发环境搭建

安装Java

RocketMQ基于Java开发,因此需要确保开发环境中已经安装了Java环境。以下是安装步骤:

  1. 访问Oracle官方网站或开源中国的Java开发人员页面下载最新版本的Java JDK。
  2. 解压下载的JDK安装包,例如tar -zxf jdk-8uXXX-linux-x64.tar.gz
  3. 设置环境变量,编辑/etc/profile文件,添加以下内容:
    export JAVA_HOME=/path/to/jdk
    export PATH=$JAVA_HOME/bin:$PATH
    export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
  4. 更新环境变量,运行source /etc/profile

安装RocketMQ

  1. 访问RocketMQ的GitHub仓库下载RocketMQ的源码包或者二进制包。
  2. 解压下载的RocketMQ包,例如tar -zxf rocketmq-all-4.9.2-bin-release.tar.gz
  3. 进入RocketMQ目录,例如cd rocketmq-all-4.9.2
  4. 执行以下命令启动名称服务器(Name Server):
    nohup sh bin/mqnamesrv &
  5. 启动Broker,进入RocketMQ目录并执行以下命令:
    nohup sh bin/mqbroker -n localhost:9876 &
快速开始RocketMQ

创建生产者和消费者

先创建一个简单的生产者和消费者示例。生产者将消息发送到指定的主题,消费者从主题中消费消息。

  1. 创建生产者程序:

    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    
    public class Producer {
       public static void main(String[] args) throws MQClientException, InterruptedException {
           DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
           producer.setNamesrvAddr("localhost:9876");
           producer.start();
    
           Message msg = new Message("TopicTest", // topic
                   "TagA", // tag
                   "OrderID188", // key
                   ("Hello RocketMQ").getBytes(RemotingHelper.DEFAULT_CHARSET)); // body
    
           SendResult sendResult = producer.send(msg);
           producer.shutdown();
    
           System.out.println(sendResult);
       }
    }
  2. 创建消费者程序:

    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyListener;
    import org.apache.rocketmq.client.consumer.listener.MessageQueueListener;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
    import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyMessageContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
    import org.apache.rocketmq.common.consumer.ConsumeOrderlyStatus;
    import org.apache.rocketmq.common.message.MessageExt;
    
    import java.util.List;
    
    public class Consumer {
       public static void main(String[] args) throws Exception {
           DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");
           consumer.setNamesrvAddr("localhost:9876");
           consumer.subscribe("TopicTest", "*");
    
           consumer.registerMessageListener((msgs, context) -> {
               for (MessageExt msg : msgs) {
                   System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msg);
               }
               return ConsumeOrderlyStatus.SUCCESS;
           });
    
           consumer.start();
       }
    }
  3. 运行生产者,生成的消息将被发送到TopicTest主题。运行消费者,消费者将从TopicTest主题中接收并处理消息。

消息发送和接收的流程

生产者通过DefaultMQProducer实例发送消息,首先通过producer.start()启动生产者,设置名称服务器地址,然后创建Message对象,设置消息的主题、标签、键和内容。调用producer.send()方法发送消息,最后通过调用producer.shutdown()关闭生产者。

消费者通过DefaultMQPushConsumer实例接收消息,首先通过consumer.start()启动消费者,设置名称服务器地址,然后通过consumer.subscribe()订阅指定主题的消息。注册消息监听器,监听并处理消息。最后通过调用consumer.shutdown()关闭消费者。

RocketMQ核心概念解析
消息模型与分类

RocketMQ提供了多种类型的消息模型,包括普通消息、事务消息、定时消息和顺序消息。

普通消息

普通消息是最基本的消息类型,消息发送方将消息发送到Broker,Broker负责将消息持久化并分发给订阅者。

事务消息

事务消息是一种特殊的事务消息,它支持消息发送方和接收方之间的一致性事务。消息发送方可以执行本地事务,然后通过事务消息插件发送消息,接收方接收到消息后执行相应的业务逻辑。事务消息插件会根据业务逻辑的执行结果决定消息的最终状态(提交或回滚)。

定时消息

定时消息允许发送者设置一个消息的延迟时间,消息将在指定的时间之后发送给接收者。发送者可以指定消息的延迟时间范围,例如10秒、30秒、1分钟、2分钟、3分钟、4分钟、5分钟、6分钟、7分钟、8分钟、9分钟、10分钟、30分钟、1小时、2小时。

顺序消息

顺序消息是一种特殊的事务消息,它确保消息在消费端按照发送顺序进行消费。发送者可以将消息分成多个批次,每个批次的消息按照发送顺序进行消费。发送者和接收者之间需要使用相同的事务ID来保证顺序消费。

主要组件介绍

RocketMQ由多个核心组件组成,包括消息发送者(Producer)、消息接收者(Consumer)、名称服务器(Name Server)和Broker。

消息发送者(Producer)

消息发送者负责将消息发送到Broker。RocketMQ提供了多种消息发送方式,例如同步发送、异步发送和单向发送。发送者可以配置消息的属性,例如消息的主题、标签、键和内容。发送者还可以配置消息的发送策略,例如消息的重试次数和时间间隔。

消息接收者(Consumer)

消息接收者负责从Broker接收消息并处理消息。RocketMQ提供了多种消息接收方式,例如顺序消费和并发消费。接收者可以配置消息的消费策略,例如消息的消费模式、消费线程数和消费超时时间。

名称服务器(Name Server)

名称服务器负责维护Broker的连接信息,提供Broker的地址信息给消息发送者和接收者。名称服务器通过心跳机制与Broker保持联系,如果Broker出现故障,名称服务器会将Broker从连接列表中移除,并通知消息发送者和接收者。

Broker

Broker是消息的传输和存储中心,负责接收消息发送者发送的消息,并将消息存储到磁盘或内存中。Broker还负责将消息分发给订阅者,并提供消息的查询和删除功能。Broker支持集群模式,多个Broker可以组成一个集群,提供高可用性和负载均衡。

名称服务器与Broker作用

名称服务器的作用是维护Broker的连接信息,提供Broker的地址信息给消息发送者和接收者。名称服务器通过心跳机制与Broker保持联系,如果Broker出现故障,名称服务器会将Broker从连接列表中移除,并通知消息发送者和接收者。

Broker的作用是接收消息发送者发送的消息,并将消息存储到磁盘或内存中。Broker还负责将消息分发给订阅者,并提供消息的查询和删除功能。Broker支持集群模式,多个Broker可以组成一个集群,提供高可用性和负载均衡。

RocketMQ消息发送与接收
发送消息的基本流程

创建Producer实例

首先创建一个DefaultMQProducer实例,设置Producer的名称、消息发送策略和异常处理策略。

DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.setNamesrvAddr("localhost:9876");
producer.setSendMsgTimeout(3000); // 消息发送超时时间
producer.setRetryTimesWhenSendFailed(2); // 消息发送失败时的重试次数

启动Producer实例

通过调用producer.start()方法启动Producer实例。

producer.start();

创建Message实例

创建一个Message实例,设置消息的属性,例如消息的主题、标签、键和内容。

Message msg = new Message("TopicTest", // topic
        "TagA", // tag
        "OrderID188", // key
        ("Hello RocketMQ").getBytes(RemotingHelper.DEFAULT_CHARSET)); // body

发送消息

调用producer.send()方法发送消息。

SendResult sendResult = producer.send(msg);

关闭Producer实例

通过调用producer.shutdown()方法关闭Producer实例。

producer.shutdown();
接收消息的基本流程

创建Consumer实例

首先创建一个DefaultMQPushConsumer实例,设置Consumer的名称、消息接收策略和异常处理策略。

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");
consumer.setNamesrvAddr("localhost:9876");
consumer.setConsumeMessageBatchMaxSize(100); // 消息批处理的最大条数
consumer.setConsumeTimeout(1000); // 消息消费超时时间

订阅主题

通过调用consumer.subscribe()方法订阅指定主题的消息。

consumer.subscribe("TopicTest", "*"); // 订阅TopicTest主题下的所有标签

配置消息监听器

通过调用consumer.registerMessageListener()方法注册消息监听器,监听并处理消息。

consumer.registerMessageListener((msgs, context) -> {
    for (MessageExt msg : msgs) {
        System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msg);
    }
    return ConsumeOrderlyStatus.SUCCESS;
});

启动Consumer实例

通过调用consumer.start()方法启动Consumer实例。

consumer.start();

关闭Consumer实例

通过调用consumer.shutdown()方法关闭Consumer实例。

consumer.shutdown();
常见消息发送与接收模式

普通消息发送模式

普通消息发送模式是最基本的消息发送模式,消息发送者将消息发送到Broker,Broker负责将消息持久化并分发给订阅者。

Message msg = new Message("TopicTest", // topic
        "TagA", // tag
        "OrderID188", // key
        ("Hello RocketMQ").getBytes(RemotingHelper.DEFAULT_CHARSET)); // body
SendResult sendResult = producer.send(msg);

事务消息发送模式

事务消息发送模式支持消息发送方和接收方之间的一致性事务。消息发送者可以执行本地事务,然后通过事务消息插件发送消息,接收者接收到消息后执行相应的业务逻辑。事务消息插件会根据业务逻辑的执行结果决定消息的最终状态(提交或回滚)。

TransactionSendResult sendResult = producer.sendMessageInTransaction(msg, new LocalTransactionChecker() {
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt message) {
        // 执行本地事务
        return LocalTransactionState.COMMIT_MESSAGE;
    }
});

定时消息发送模式

定时消息发送模式允许发送者设置一个消息的延迟时间,消息将在指定的时间之后发送给接收者。发送者可以指定消息的延迟时间范围,例如10秒、30秒、1分钟、2分钟、3分钟、4分钟、5分钟、6分钟、7分钟、8分钟、9分钟、10分钟、30分钟、1小时、2小时。

Message msg = new Message("TopicTest", // topic
        "TagA", // tag
        "OrderID188", // key
        ("Hello RocketMQ").getBytes(RemotingHelper.DEFAULT_CHARSET)); // body
msg.setDelayTimeLevel(3); // 设置消息的延迟时间范围
SendResult sendResult = producer.send(msg);

顺序消息发送模式

顺序消息发送模式是一种特殊的事务消息,它确保消息在消费端按照发送顺序进行消费。发送者可以将消息分成多个批次,每个批次的消息按照发送顺序进行消费。发送者和接收者之间需要使用相同的事务ID来保证顺序消费。

TransactionSendResult sendResult = producer.sendMessageInTransaction(msg, new LocalTransactionChecker() {
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt message) {
        // 执行本地事务
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}, "TransactionID");

消息接收模式

RocketMQ支持顺序消费和并发消费两种消息接收模式。顺序消费模式确保消息在消费端按照发送顺序进行消费,而并发消费模式允许多个消费线程并行处理消息。

consumer.registerMessageListener((msgs, context) -> {
    for (MessageExt msg : msgs) {
        System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msg);
    }
    return ConsumeOrderlyStatus.SUCCESS;
});
RocketMQ消息存储机制
消息的物理存储结构

RocketMQ的消息存储结构主要由两个部分组成:消息文件和索引文件。

消息文件

消息文件是RocketMQ存储消息的物理文件,每个Broker都有独立的消息文件。消息文件是一个二进制文件,文件名为message_index,文件路径为storePathRootDir下的message_index子目录。消息文件采用了追加写入的方式,每次写入一条消息时,都会追加到文件末尾。消息文件的格式如下:

字段名称 字段长度 字段描述
magic 4 魔数,标识文件格式
version 4 版本号
bodySize 4 消息体长度
body variable 消息体
tag variable 消息标签
key variable 消息键
topic variable 消息主题
queueId 4 队列ID
offset 8 消息偏移量
sysFlag 2 系统标志位
bornHost 4 生产者IP地址
bornTimestamp 8 生产时间戳
msgId variable 消息ID
storeHost 4 存储节点IP地址
storeTimestamp 8 存储时间戳
queueOffset 8 队列偏移量
sysPropertiesSize 4 系统属性长度
sysProperties variable 系统属性

索引文件

索引文件是RocketMQ存储消息索引的文件,每个Broker都有独立的索引文件。索引文件是一个二进制文件,文件名为index,文件路径为storePathRootDir下的message_index子目录。索引文件是一个内存映射文件,文件格式如下:

字段名称 字段长度 字段描述
magic 4 魔数,标识文件格式
version 4 版本号
indexTotalSize 8 索引总大小
indexSize 8 索引大小
indexKeySize 4 索引键大小
indexValueSize 4 索引值大小
indexKeys variable 索引键
indexValues variable 索引值

索引文件中的indexKeys表示消息的键,indexValues表示消息的偏移量。通过索引文件可以快速查找消息的偏移量,提高消息读取效率。

消息的逻辑存储结构

RocketMQ的消息逻辑存储结构是指消息在Broker中的存储形式。RocketMQ采用逻辑队列的方式存储消息,每个主题对应一个或多个逻辑队列。消息存储在逻辑队列中,每个逻辑队列有一个唯一的队列ID。每个逻辑队列可以分布在多个物理Broker上,每个Broker都有独立的消息文件和索引文件。

消息的逻辑队列

RocketMQ的消息逻辑队列是指消息在Broker中的存储形式。每个主题对应一个或多个逻辑队列,每个逻辑队列有一个唯一的队列ID。每个逻辑队列可以分布在多个物理Broker上,每个Broker都有独立的消息文件和索引文件。

消息的存储模式

RocketMQ支持多种消息存储模式,例如顺序存储和随机存储。顺序存储模式是指消息按照发送顺序存储在逻辑队列中,而随机存储模式是指消息按照随机的方式存储在逻辑队列中。

消息的删除机制

RocketMQ提供了多种消息删除机制,例如消息过期删除和手动删除。消息过期删除是指消息在过期时间之后会被自动删除,而手动删除是指用户可以通过API手动删除消息。

消息的读取过程

RocketMQ的消息读取过程包括以下几个步骤:

读取索引文件

首先读取索引文件,通过索引文件中的indexKeysindexValues快速查找消息的偏移量。

File indexFile = new File("message_index");
RandomAccessFile raf = new RandomAccessFile(indexFile, "r");
long fileSize = raf.length();
long indexTotalSize = raf.readInt();
long indexSize = raf.readLong();
long indexKeySize = raf.readInt();
long indexValueSize = raf.readInt();
long[] indexKeys = new long[(indexSize / (indexKeySize + indexValueSize)) * indexKeySize];
long[] indexValues = new long[(indexSize / (indexKeySize + indexValueSize)) * indexValueSize];
for (int i = 0; i < indexSize / (indexKeySize + indexValueSize); i++) {
    for (int j = 0; j < indexKeySize; j++) {
        indexKeys[i * indexKeySize + j] = raf.readLong();
    }
    for (int j = 0; j < indexValueSize; j++) {
        indexValues[i * indexValueSize + j] = raf.readLong();
    }
}
raf.close();

读取消息文件

通过索引文件中的indexValues快速定位到消息文件中的offset,然后读取消息文件中的offset位置的数据。

File messageFile = new File("message_index", "message_file");
RandomAccessFile raf = new RandomAccessFile(messageFile, "r");
long offset = indexValues[0];
raf.seek(offset);
long magic = raf.readInt();
long version = raf.readInt();
int bodySize = raf.readInt();
byte[] body = new byte[bodySize];
raf.readFully(body, 0, bodySize);
String bodyStr = new String(body, RemotingHelper.DEFAULT_CHARSET);
raf.close();

解析消息

根据消息文件中的数据解析消息的属性,例如消息的主题、标签、键和内容。

String topic = "";
String tag = "";
String key = "";
String body = "";
// 根据消息文件中的数据解析消息的属性

返回消息

将解析后的消息返回给用户。

MessageExt message = new MessageExt();
message.setTopic(topic);
message.setTag(tag);
message.setKey(key);
message.setBody(body);
return message;
RocketMQ消息消费机制
消费模型详解

RocketMQ支持多种消息消费模型,例如顺序消费和并发消费。顺序消费模型是指消息按照发送顺序依次消费,而并发消费模型允许多个消费线程并行消费消息。

顺序消费模型

顺序消费模型是指消息按照发送顺序依次消费。RocketMQ通过为每个消息设置一个唯一的消费顺序ID来保证消息的顺序消费。每个消息的消费顺序ID由消息的生产者IP地址、生产时间戳和消息ID组成。

consumer.registerMessageListener((msgs, context) -> {
    for (MessageExt msg : msgs) {
        System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msg);
    }
    return ConsumeOrderlyStatus.SUCCESS;
});

并发消费模型

并发消费模型允许多个消费线程并行消费消息。RocketMQ通过为每个消息设置一个唯一的消费并行ID来保证消息的并发消费。每个消息的消费并行ID由消息的生产者IP地址、生产时间戳和消息ID组成。

consumer.registerMessageListener((msgs, context) -> {
    for (MessageExt msg : msgs) {
        System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msg);
    }
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
消费者的启动与停止

RocketMQ提供了丰富的API来启动和停止消费者。通过调用consumer.start()方法启动消费者,通过调用consumer.shutdown()方法停止消费者。

consumer.start();
// 消费者接收消息
consumer.shutdown();
消费者容错机制

RocketMQ提供了多种消费者容错机制,例如消息重试和消息回溯。消息重试是指当消费者消费消息失败时,RocketMQ会自动将消息重新发送给消费者。消息回溯是指当消费者消费消息失败时,RocketMQ会自动将消息回溯到消息队列的末尾,等待重新消费。

消息重试

当消费者消费消息失败时,RocketMQ会自动将消息重新发送给消费者。消费者可以通过配置消息的重试次数来控制消息的重试次数。

consumer.setRetryTimesWhenSendFailed(2); // 消息发送失败时的重试次数

消息回溯

当消费者消费消息失败时,RocketMQ会自动将消息回溯到消息队列的末尾,等待重新消费。消费者可以通过配置消息的回溯策略来控制消息的回溯行为。

consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); // 从消息队列的末尾开始消费
RocketMQ项目实战案例
实战需求分析

假设我们正在开发一个在线教育平台,该平台需要实现以下几个功能:

  • 学生注册功能,将学生信息存储到数据库中,并发送注册成功消息到消息队列。
  • 学生登录功能,验证学生信息是否正确,并发送登录成功消息到消息队列。
  • 学生购买课程功能,将学生的购买记录存储到数据库中,并发送购买成功消息到消息队列。

为了实现上述功能,我们需要使用RocketMQ来实现消息队列功能,将学生注册、登录和购买课程等操作的异步通知实现。

实战步骤详解

创建RocketMQ生产者

首先创建一个RocketMQ生产者,设置生产者名称、消息发送策略和异常处理策略。

DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.setNamesrvAddr("localhost:9876");
producer.setSendMsgTimeout(3000); // 消息发送超时时间
producer.setRetryTimesWhenSendFailed(2); // 消息发送失败时的重试次数
producer.start();

创建RocketMQ消费者

然后创建一个RocketMQ消费者,设置消费者名称、消息接收策略和异常处理策略。

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");
consumer.setNamesrvAddr("localhost:9876");
consumer.setConsumeMessageBatchMaxSize(100); // 消息批处理的最大条数
consumer.setConsumeTimeout(1000); // 消息消费超时时间
consumer.subscribe("TopicTest", "*"); // 订阅TopicTest主题下的所有标签
consumer.registerMessageListener((msgs, context) -> {
    for (MessageExt msg : msgs) {
        System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msg);
    }
    return ConsumeOrderlyStatus.SUCCESS;
});
consumer.start();

实现学生注册功能

实现学生注册功能,将学生信息存储到数据库中,并发送注册成功消息到消息队列。

public void registerStudent(Student student) throws MQClientException {
    // 存储学生信息到数据库
    // ...
    // 发送注册成功消息到消息队列
    Message msg = new Message("TopicTest", // topic
            "RegisterMessage", // tag
            "StudentID1", // key
            ("Hello RocketMQ, Student Registered").getBytes(RemotingHelper.DEFAULT_CHARSET)); // body
    SendResult sendResult = producer.send(msg);
}

实现学生登录功能

实现学生登录功能,验证学生信息是否正确,并发送登录成功消息到消息队列。

public void loginStudent(Student student) throws MQClientException {
    // 验证学生信息是否正确
    // ...
    // 发送注册成功消息到消息队列
    Message msg = new Message("TopicTest", // topic
            "LoginMessage", // tag
            "StudentID1", // key
            ("Hello RocketMQ, Student Logged In").getBytes(RemotingHelper.DEFAULT_CHARSET)); // body
    SendResult sendResult = producer.send(msg);
}

实现学生购买课程功能

实现学生购买课程功能,将学生的购买记录存储到数据库中,并发送购买成功消息到消息队列。

public void purchaseCourse(Student student) throws MQClientException {
    // 存储学生的购买记录到数据库
    // ...
    // 发送注册成功消息到消息队列
    Message msg = new Message("TopicTest", // topic
            "PurchaseMessage", // tag
            "StudentID1", // key
            ("Hello RocketMQ, Student Purchased Course").getBytes(RemotingHelper.DEFAULT_CHARSET)); // body
    SendResult sendResult = producer.send(msg);
}
实战代码实现与部署

实战代码实现

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class StudentService {
    private DefaultMQProducer producer;

    public StudentService() throws MQClientException {
        producer = new DefaultMQProducer("ProducerGroupName");
        producer.setNamesrvAddr("localhost:9876");
        producer.setSendMsgTimeout(3000);
        producer.setRetryTimesWhenSendFailed(2);
        producer.start();
    }

    public void registerStudent(Student student) throws MQClientException {
        // 存储学生信息到数据库
        // ...
        // 发送注册成功消息到消息队列
        Message msg = new Message("TopicTest", // topic
                "RegisterMessage", // tag
                "StudentID1", // key
                ("Hello RocketMQ, Student Registered").getBytes(RemotingHelper.DEFAULT_CHARSET)); // body
        SendResult sendResult = producer.send(msg);
    }

    public void loginStudent(Student student) throws MQClientException {
        // 验证学生信息是否正确
        // ...
        // 发送注册成功消息到消息队列
        Message msg = new Message("TopicTest", // topic
                "LoginMessage", // tag
                "StudentID1", // key
                ("Hello RocketMQ, Student Logged In").getBytes(RemotingHelper.DEFAULT_CHARSET)); // body
        SendResult sendResult = producer.send(msg);
    }

    public void purchaseCourse(Student student) throws MQClientException {
        // 存储学生的购买记录到数据库
        // ...
        // 发送注册成功消息到消息队列
        Message msg = new Message("TopicTest", // topic
                "PurchaseMessage", // tag
                "StudentID1", // key
                ("Hello RocketMQ, Student Purchased Course").getBytes(RemotingHelper.DEFAULT_CHARSET)); // body
        SendResult sendResult = producer.send(msg);
    }

    public void shutdown() {
        producer.shutdown();
    }
}
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderedListener;
import org.apache.rocketmq.client.consumer.listener.MessageQueueListener;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.consumer.ConsumeOrderlyStatus;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class MessageConsumer {
    private DefaultMQPushConsumer consumer;

    public MessageConsumer() throws Exception {
        consumer = new DefaultMQPushConsumer("ConsumerGroupName");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.setConsumeMessageBatchMaxSize(100);
        consumer.setConsumeTimeout(1000);
        consumer.subscribe("TopicTest", "*");
        consumer.registerMessageListener((msgs, context) -> {
            for (MessageExt msg : msgs) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msg);
            }
            return ConsumeOrderlyStatus.SUCCESS;
        });
        consumer.start();
    }

    public void shutdown() {
        consumer.shutdown();
    }
}

实战代码部署

部署RocketMQ生产者和消费者时,需要确保RocketMQ服务已经启动并且配置正确。RocketMQ生产者和消费者可以通过配置文件或者代码方式启动。

  1. 创建生产者和消费者实例,并设置相应的配置。
  2. 启动生产者和消费者实例。
  3. 注册相应的消息监听器,监听和处理消息。
  4. 通过发送消息或者消费消息实现相应的业务逻辑。

public class Main {
    public static void main(String[] args) throws Exception {
        StudentService studentService = new StudentService();
        MessageConsumer messageConsumer = new MessageConsumer();

        // 注册学生
        studentService.registerStudent(new Student());

        // 登录学生
        studentService.loginStudent(new Student());

        // 购买课程
        studentService.purchaseCourse(new Student());

        // 等待一段时间,确保消息已经发送并被消费
        Thread.sleep(10000);

        // 关闭生产者和消费者
        studentService.shutdown();
        messageConsumer.shutdown();
    }
}
``
點擊查看更多內容
TA 點贊

若覺得本文不錯,就分享一下吧!

評論

作者其他優質文章

正在加載中
  • 推薦
  • 評論
  • 收藏
  • 共同學習,寫下你的評論
感謝您的支持,我會繼續努力的~
掃碼打賞,你說多少就多少
贊賞金額會直接到老師賬戶
支付方式
打開微信掃一掃,即可進行掃碼打賞哦
今天注冊有機會得

100積分直接送

付費專欄免費學

大額優惠券免費領

立即參與 放棄機會
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號

舉報

0/150
提交
取消