亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定

Rocketmq安裝項目實戰教程

概述

本文详细介绍了Rocketmq安装项目实战的全过程,包括环境准备、安装JDK、下载解压RocketMQ、启动RocketMQ服务等步骤。此外,还包含了RocketMQ的基本操作和配置优化方法,以及通过一个简单项目实战展示了RocketMQ在电商系统中的应用。

RocketMQ简介

RocketMQ 是一款基于 Java 语言开发的分布式消息中间件,由阿里巴巴集团开源,并捐献给 Apache 基金会,成为 Apache 软件基金会的顶级项目。它不仅支持传统的消息队列功能,还支持发布/订阅模式,是云原生环境中常用的中间件之一。RocketMQ 以其高可用性、高吞吐量、低延时等特性,被广泛应用于金融、零售、物流等行业中。

主要特点

  1. 高吞吐量:RocketMQ 采用异步通信模式,能够支持每秒百万级的消息吞吐量。
  2. 高性能:通过消息批量处理、异步通信模式、顺序消息等特性,RocketMQ 实现了高效的消息传递。
  3. 高可用性:通过主从模式的部署方式,RocketMQ 可以保证在主节点故障时能快速切换到从节点,保持服务的连续性。
  4. 支持多种消息模型:RocketMQ 支持队列模式、发布/订阅模式、事务消息、顺序消息等多种消息模型。
  5. 消息追踪:RocketMQ 提供了消息追踪功能,可以轻松定位消息传输过程中的问题。
  6. 多语言支持:RocketMQ 支持 Java、C++、Python 等多种编程语言,便于不同技术栈下的开发者使用。

使用场景

RocketMQ 适用于多种场景,如日志收集、订单处理、数据同步、缓存更新、事件通知等。特别是对于需要高效处理海量数据、保证服务稳定性的应用来说,RocketMQ 是一个不错的选择。

安装RocketMQ环境准备

在安装 RocketMQ 之前,需要确认你的开发环境已经满足以下条件:

  • 操作系统:RocketMQ 可以在 Linux、Windows 和 macOS 上运行。本教程在 Ubuntu 操作系统上进行演示。
  • Java 版本:RocketMQ 需要 Java 8 或更高版本的 Java 运行环境。可以通过以下命令检查 Java 版本:
    java -version
  • 内存资源:RocketMQ 需要一定量的内存资源。建议为每个 RocketMQ 进程分配至少 1G 内存。

安装JDK

首先,确保你的系统中安装了 Java 开发工具包(JDK)。如果没有安装,可以通过以下步骤安装:

  1. 更新软件包列表
    sudo apt-get update
  2. 安装 JDK
    sudo apt-get install openjdk-8-jdk
  3. 验证安装
    java -version

安装依赖软件

RocketMQ 依赖其他软件包,如 wgetunzip 等。可以通过以下命令安装:

sudo apt-get install wget unzip
下载与解压RocketMQ

下载RocketMQ

首先,访问 RocketMQ 的官方 GitHub 仓库(https://github.com/apache/rocketmq),找到最近的稳定版本并下载。假设我们使用的是 4.9.3 版本的 RocketMQ,可以通过以下命令下载:

wget https://archive.apache.org/dist/rocketmq/4.9.3/apache-rocketmq-4.9.3-bin.tar.gz

解压RocketMQ

下载完成后,使用以下命令解压 tar 包:

tar -xvf apache-rocketmq-4.9.3-bin.tar.gz

解压完成后,目录结构如下:

apache-rocketmq-4.9.3
├── bin
├── demo
├── lib
└── namesrv
  • bin:包含启动 RocketMQ 的脚本文件。
  • demo:包含 RocketMQ 的演示代码。
  • lib:包含 RocketMQ 所需的各种依赖库。
  • namesrv:名服务相关的配置文件和脚本。
启动RocketMQ服务

启动NameServer

RocketMQ 的 NameServer 是提供了一种服务发现机制的组件,用于管理 Broker 的注册、发现和路由信息。启动 NameServer 需要执行以下步骤:

  1. 进入 RocketMQ 根目录
    cd apache-rocketmq-4.9.3
  2. 启动 NameServer
    nohup sh bin/mqnamesrv &
  3. 检查 NameServer 是否启动成功
    查看日志文件 logs/namesrv.log,找到类似以下的日志信息:
    The Name Server boot success. serializeType=JSON

启动Broker

Broker 是 RocketMQ 的消息处理节点,负责消息的发布、订阅、过滤、路由等操作。启动 Broker 需要执行以下步骤:

  1. 复制 broker 配置文件
    cp -r conf/2m-n1-c1/ conf/broker
    cd conf/broker
  2. 修改 broker.properties 文件
    例如,可以修改 brokerId、brokerName、brokerClusterName、namesrvAddr 等属性,这些属性的具体含义参见 RocketMQ 官方文档。
    # brokerId, 0 or 1,两者必须互不相同
    brokerId=0
    # broker 名称,建议格式为 clusterName + brokerId
    brokerName=broker-a
    # broker 所属的集群名称
    brokerClusterName=DefaultCluster
    # NameServer 地址列表,多个地址用逗号隔开
    namesrvAddr=127.0.0.1:9876
    # 是否作为 CommitLog 持久化磁盘
    fileReservedTime=1440
  3. 启动 Broker
    nohup sh ../bin/mqbroker -n 127.0.0.1:9876 -c ../conf/broker/broker.properties &
  4. 检查 Broker 是否启动成功
    查看日志文件 logs/broker.log,找到类似以下的日志信息:
    The broker boot success. serializeType=JSON

测试启动是否成功

可以通过发送一条测试消息来验证 NameServer 和 Broker 是否启动成功。具体方法如下:

  1. 创建生产者代码

    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.RPCHook;
    
    public class SimpleProducer {
       public static void main(String[] args) throws Exception {
           // 创建一个生产者实例
           DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
           // 设置 NameServer 地址
           producer.setNamesrvAddr("127.0.0.1:9876");
           // 启动生产者
           producer.start();
           // 创建一条消息
           Message msg = new Message("TestTopic", "TagA", "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));
           // 发送消息
           SendResult sendResult = producer.send(msg);
           System.out.printf("%s%n", sendResult);
           // 关闭生产者
           producer.shutdown();
       }
    }
  2. 创建消费者代码

    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeOrderedSuccess;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
    import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
    import org.apache.rocketmq.common.message.MessageExt;
    
    public class SimpleConsumer {
       public static void main(String[] args) throws Exception {
           DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");
           consumer.setNamesrvAddr("127.0.0.1:9876");
           consumer.subscribe("TestTopic", "*");
           consumer.setMessageModel(MessageModel.BROADCASTING);
           consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
           consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {
               for (MessageExt msg : msgs) {
                   System.out.printf("consume=%s%n", new String(msg.getBody()));
               }
               return ConsumeOrderedSuccess.instance();
           });
           consumer.start();
           System.out.printf("Consumer Started.%n");
       }
    }
  3. 启动生产者和消费者
    编译并运行生产者和消费者代码,确保它们能够正常运行。生产者发送的消息应该被消费者接收到并打印输出。
RocketMQ基本操作及配置

创建Topic

在 RocketMQ 中,Topic 是消息的逻辑分类,类似于数据库中的表。可以通过 RocketMQ 的控制台或命令行工具来创建 Topic。具体步骤如下:

  1. 创建 Topic
    使用 mqadmin 工具创建一个新的 Topic:

    sh bin/mqadmin updateTopic -n 127.0.0.1:9876 -t TestTopic

    这条命令会在 NameServer 中创建一个名为 TestTopic 的 Topic。

  2. 验证 Topic 创建
    查看 Topic 列表,确认 TestTopic 已经创建成功:
    sh bin/mqadmin topicList -n 127.0.0.1:9876

发布和订阅消息

在 RocketMQ 中,可以通过生产者发送消息,通过消费者订阅并接收消息。以下是如何创建生产者和消费者并进行消息的发布和订阅:

生产者

  1. 创建生产者代码

    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.RPCHook;
    
    public class Producer {
       public static void main(String[] args) throws Exception {
           DefaultMQProducer producer = new DefaultMQProducer("ProducerGroup");
           producer.setNamesrvAddr("127.0.0.1:9876");
           producer.start();
           Message msg = new Message("TestTopic", "TagA", "Hello RocketMQ".getBytes());
           SendResult sendResult = producer.send(msg);
           System.out.printf("sendResult=%s%n", sendResult);
           producer.shutdown();
       }
    }
  2. 编译和运行生产者
    使用你喜欢的 Java 编译器编译并运行上面的代码,确保没有编译错误或运行时异常。

消费者

  1. 创建消费者代码

    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeOrderedSuccess;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
    import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
    import org.apache.rocketmq.common.message.MessageExt;
    
    public class Consumer {
       public static void main(String[] args) throws Exception {
           DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroup");
           consumer.setNamesrvAddr("127.0.0.1:9876");
           consumer.subscribe("TestTopic", "*");
           consumer.setMessageModel(MessageModel.BROADCASTING);
           consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
           consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {
               for (MessageExt msg : msgs) {
                   System.out.printf("consume=%s%n", new String(msg.getBody()));
               }
               return ConsumeOrderedSuccess.instance();
           });
           consumer.start();
           System.out.printf("Consumer Started.%n");
       }
    }
  2. 编译和运行消费者
    使用你喜欢的 Java 编译器编译并运行上面的代码,确保没有编译错误或运行时异常。

配置优化

RocketMQ 提供了大量的配置选项,以满足不同场景下的需求。以下是一些常见的配置优化示例:

消息存储配置

RocketMQ 的消息存储分为 CommitLog 和 ConsumeQueue 两部分。CommitLog 是所有消息的持久化存储,而 ConsumeQueue 用于快速定位消息。要优化消息存储配置,可以修改 broker.properties 文件中的相关参数:

# CommitLog 文件大小,单位 MB,默认值为 1G
fileReservedTime=1440
# CommitLog 同步刷盘的条数,单位 MB,默认值为 1M
flushDiskType=ASYNC_FLUSH

消费者配置

通过调整消费者的配置,可以优化消息消费的性能。以下是一些常用的配置选项:

# 消费者线程池大小
consumerThreadsNum=20
# 消费者心跳间隔,单位秒,默认值为 30 秒
heartbeatBrokerInterval=30000
# 消费者拉取消息的时间间隔,单位毫秒,默认值为 500 毫秒
pullThreadSleepTime=1000

消息路由配置

RocketMQ 支持多种消息路由策略,可以根据具体需求进行配置。例如,可以通过修改 broker.properties 文件中的 clusterName 参数来调整 Broker 的集群名称:

# Broker 所属的集群名称
brokerClusterName=DefaultCluster
RocketMQ简单项目实战

案例背景

假设你正在开发一个电子商务系统,该系统需要实现订单生成、库存同步、支付通知等功能。为了保证系统的稳定性和可扩展性,你需要使用 RocketMQ 来实现消息的异步处理。

案例需求

  1. 订单生成:当用户下单后,生成一条订单消息并发送到 RocketMQ 的订单 Topic。
  2. 库存同步:订阅订单 Topic 中的消息,在接收到订单消息后,更新库存信息。
  3. 支付通知:订阅订单 Topic 中的消息,在接收到订单消息后,发送支付请求到支付系统。

案例步骤

1. 创建订单服务

订单服务的主要功能包括生成订单、发送订单消息到 RocketMQ。

  1. 创建订单生产者代码

    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.RPCHook;
    
    public class OrderService {
       public static void main(String[] args) throws Exception {
           DefaultMQProducer producer = new DefaultMQProducer("OrderProducer");
           producer.setNamesrvAddr("127.0.0.1:9876");
           producer.start();
           Message msg = new Message("OrderTopic", "TagOrder", "OrderID:12345".getBytes());
           SendResult sendResult = producer.send(msg);
           System.out.printf("Order sent successfully, msgId=%s%n", sendResult.getMessageId());
           producer.shutdown();
       }
    }
  2. 编译和运行订单生产者代码
    确保代码正确编译并运行成功,生成的订单消息应该能够发送到 RocketMQ 的 OrderTopic

2. 创建库存同步服务

库存同步服务的主要功能是在接收到订单消息后,更新库存信息。

  1. 创建库存同步消费者代码

    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeOrderedSuccess;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
    import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
    import org.apache.rocketmq.common.message.MessageExt;
    
    public class InventorySyncService {
       public static void main(String[] args) throws Exception {
           DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("InventorySyncConsumer");
           consumer.setNamesrvAddr("127.0.0.1:9876");
           consumer.subscribe("OrderTopic", "TagOrder");
           consumer.setMessageModel(MessageModel.BROADCASTING);
           consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
           consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {
               for (MessageExt msg : msgs) {
                   System.out.printf("Inventory_SYNC: %s%n", new String(msg.getBody()));
                   // 更新库存逻辑
               }
               return ConsumeOrderedSuccess.instance();
           });
           consumer.start();
           System.out.printf("InventorySyncService Started.%n");
       }
    }
  2. 编译和运行库存同步消费者代码
    确保库存同步服务能够正常接收订单消息并更新库存信息。

3. 创建支付通知服务

支付通知服务的主要功能是在接收到订单消息后,发送支付请求到支付系统。

  1. 创建支付通知消费者代码

    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeOrderedSuccess;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
    import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
    import org.apache.rocketmq.common.message.MessageExt;
    
    public class PaymentNotifyService {
       public static void main(String[] args) throws Exception {
           DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("PaymentNotifyConsumer");
           consumer.setNamesrvAddr("127.0.0.1:9876");
           consumer.subscribe("OrderTopic", "TagOrder");
           consumer.setMessageModel(MessageModel.BROADCASTING);
           consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
           consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {
               for (MessageExt msg : msgs) {
                   System.out.printf("Payment_NOTIFY: %s%n", new String(msg.getBody()));
                   // 发送支付请求逻辑
               }
               return ConsumeOrderedSuccess.instance();
           });
           consumer.start();
           System.out.printf("PaymentNotifyService Started.%n");
       }
    }
  2. 编译和运行支付通知消费者代码
    确保支付通知服务能够正常接收订单消息并发送支付请求。

总结

通过以上步骤,你已经成功地搭建了一个使用 RocketMQ 进行消息异步处理的简单电子商务系统。这个示例展示了如何使用 RocketMQ 实现订单生成、库存同步、支付通知等功能。通过这种方式,可以有效地提高系统的性能和稳定性。

在实际应用中,你可能需要根据具体需求进一步优化 RocketMQ 的配置和代码逻辑,以满足更高的性能和可靠性要求。此外,还可以利用 RocketMQ 的其他高级特性(如事务消息、顺序消息等)来进一步增强系统的功能和灵活性。

點擊查看更多內容
TA 點贊

若覺得本文不錯,就分享一下吧!

評論

作者其他優質文章

正在加載中
  • 推薦
  • 評論
  • 收藏
  • 共同學習,寫下你的評論
感謝您的支持,我會繼續努力的~
掃碼打賞,你說多少就多少
贊賞金額會直接到老師賬戶
支付方式
打開微信掃一掃,即可進行掃碼打賞哦
今天注冊有機會得

100積分直接送

付費專欄免費學

大額優惠券免費領

立即參與 放棄機會
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號

舉報

0/150
提交
取消