亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定

手寫RocketMQ學習:入門教程與實踐

概述

手写RocketMQ学习涵盖了从环境搭建到消息发送与接收的全过程,介绍了RocketMQ的基本概念、特点和优势,并详细讲解了生产者和消费者的实现方法。此外,文章还提供了实战演练和性能优化的建议,帮助读者全面掌握RocketMQ的使用技巧。

RocketMQ简介
RocketMQ的基本概念

RocketMQ是一款由阿里巴巴开源的分布式消息中间件,它具有高可用、高可靠、强一致性等特点。RocketMQ主要适用于大规模分布式系统中的异步通信场景,它能够支持多种消息模式和灵活的消息路由机制,确保消息的可靠传输。

RocketMQ的消息模型包括生产者(Producer)、消费者(Consumer)和消息队列(Message Queue)。生产者负责发送消息,消费者负责接收并处理消息。消息队列作为中间层存放消息,确保消息的可靠传输。

RocketMQ的特点与优势
  1. 高可用性和高可靠性:RocketMQ通过主从复制、多副本备份、事务消息等机制,确保消息的可靠传输和系统的高可用性。
  2. 高并发处理能力:RocketMQ能够支持每秒数百万的消息吞吐量,适用于高并发场景。
  3. 多种消息模式:RocketMQ支持多种消息模式,如普通消息、定时消息、延时消息、顺序消息等,满足不同的业务需求。
  4. 灵活的消息路由机制:RocketMQ支持自定义消息路由规则,方便进行消息的分发和处理。
  5. 丰富的管理接口:RocketMQ提供了丰富的管理和监控接口,便于进行日志查看、性能监控等运维操作。
RocketMQ的应用场景
  1. 电商领域:在订单系统中,RocketMQ可以用于订单创建、支付通知等场景,确保消息的可靠传输。
  2. 金融领域:在交易系统中,RocketMQ可以用于交易通知、账户余额更新等场景,保证系统的高可用性。
  3. 物流领域:在物流跟踪系统中,RocketMQ可以用于物流状态更新、通知消息推送等场景,提供实时的消息推送。
  4. 大数据处理:在大数据处理场景中,RocketMQ可以用于数据采集、数据分发等场景,提高数据处理的效率。
环境搭建
快速安装RocketMQ

1. 下载RocketMQ

首先访问Apache RocketMQ下载页面,下载最新版本的RocketMQ。

wget https://mirrors.tuna.tsinghua.edu.cn/apache/rocketmq/4.9.3/apache-rocketmq-4.9.3-bin-release.zip
unzip apache-rocketmq-4.9.3-bin-release.zip
cd apache-rocketmq-4.9.3

2. 启动NameServer

RocketMQ使用NameServer作为服务注册中心,负责管理和分发消息。

nohup sh bin/mqnamesrv &

3. 启动Broker

RocketMQ Broker是消息的存储和转发组件。启动Broker时,需要指定对应的配置文件。

nohup sh bin/mqbroker -n localhost:9876 -c conf/broker-a.properties &

至此,RocketMQ的环境搭建完成,可以通过ps -ef | grep mqnamesrvps -ef | grep mqbroker命令查看服务是否正常启动。

基本配置与启动RocketMQ

RocketMQ的配置文件位于conf目录下,主要的配置文件包括broker-a.propertiesbroker-b.properties。这些配置文件定义了Broker的名称、IP地址、端口等信息。例如:

# broker-a.properties
brokerName=broker-a
brokerId=0
brokerRole=ASYNC_MASTER
namesrvAddr=localhost:9876

在配置文件中,brokerId表示Broker的唯一标识符,brokerRole表示Broker的角色,namesrvAddr表示NameServer的地址。

启动Broker时,可以通过命令行参数指定配置文件:

nohup sh bin/mqbroker -n localhost:9876 -c conf/broker-a.properties &

通过这种方式,可以灵活地配置和启动多个Broker实例。

手写消息发送
创建生产者实例

在RocketMQ中,发送消息需要创建一个生产者实例。生产者实例可以通过RocketMQ提供的DefaultMQProducer类来创建,并配置相关的参数。

import org.apache.rocketmq.client.producer.DefaultMQProducer;

public class Producer {
    public static void main(String[] args) {
        // 创建生产者实例
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
        // 设置NameServer地址
        producer.setNamesrvAddr("localhost:9876");
        // 启动生产者
        producer.start();
    }
}
编写发送消息的代码示例

生产者创建完成后,可以通过send方法发送消息。发送消息时,需要创建一个Message对象,包含消息的主题(Topic)、消息体(Body)等信息。

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

public class Producer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();

        String topic = "TestTopic";
        String message = "Hello RocketMQ";
        Message msg = new Message(topic, message.getBytes());

        // 发送消息
        producer.send(msg);
    }
}

发送消息后,可以通过RocketMQ的管理界面或日志查看消息的发送状态。

消息发送的异步处理

为了提高系统的性能,在发送消息时可以采用异步发送的方式。RocketMQ提供了异步发送的接口,可以通过回调函数来处理发送结果。

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class Producer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();

        String topic = "TestTopic";
        String message = "Hello RocketMQ";
        Message msg = new Message(topic, message.getBytes());

        // 异步发送消息
        producer.send(msg, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println("Message sent successfully");
            }

            @Override
            public void onException(Throwable e) {
                System.out.println("Failed to send message: " + e.getMessage());
            }
        });
    }
}

通过这种方式,发送消息的过程不会阻塞主程序,提高了系统的并发性能。

手写消息接收
创建消费者实例

在RocketMQ中,接收消息需要创建一个消费者实例。消费者实例可以通过RocketMQ提供的DefaultMQPushConsumer类来创建,并配置相关的参数。

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;

public class Consumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");
        consumer.setNamesrvAddr("localhost:9876");

        // 设置从消息队列的末尾开始消费
        consumer.setConsumeFromWhere(ConsumeFromWhere.COMMIT_MSG_ORDERLY);
        consumer.subscribe("TestTopic", "*");

        // 启动消费者
        consumer.start();
    }
}
编写接收消息的代码示例

消费者创建完成后,可以通过messageListener来处理接收到的消息。RocketMQ为消息处理提供了多种监听器接口,可以灵活地处理不同类型的消息。

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderedMessageListener;
import org.apache.rocketmq.client.consumer.listener.MessageQueueListener;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

public class Consumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");
        consumer.setNamesrvAddr("localhost:9876");

        consumer.setConsumeFromWhere(ConsumeFromWhere.COMPENSATE_FROM_FAILED);
        consumer.subscribe("TestTopic", "*");

        // 设置消息处理监听器
        consumer.setMessageListener(new ConsumeOrderedMessageListener() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.printf("Receive new message: %s %n", new String(msg.getBody()));
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

        consumer.start();
    }
}

消费消息时,可以设置不同的消息处理策略,如顺序消费、批量消费等。

消费者的消息回溯处理

在某些情况下,消费者可能需要重新消费之前的消息。RocketMQ提供了消息回溯的功能,可以通过设置pullFromWhere参数来控制消息的回溯位置。

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderedMessageListener;
import org.apache.rocketmq.client.consumer.listener.MessageQueueListener;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.MessageQueue;

public class Consumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");
        consumer.setNamesrvAddr("localhost:9876");

        consumer.setConsumeFromWhere(ConsumeFromWhere.RECONSUME_FROM_STORE_OFFSET);
        consumer.subscribe("TestTopic", "*");

        // 设置消息处理监听器
        consumer.setMessageListener(new ConsumeOrderedMessageListener() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.printf("Receive new message: %s %n", new String(msg.getBody()));
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

        // 设置消息回溯的MessageQueue
        List<MessageQueue> mqs = consumer.fetchMessageQueues("TestTopic");
        MessageQueue mq = mqs.get(0);
        consumer.pull(mq, "*", 0L, 10L);

        consumer.start();
    }
}

通过这种方式,可以控制消费者从指定的位置开始消费消息。

常见问题与解决方案
消息发送失败的原因与处理方法

消息发送失败的原因可能包括网络问题、消息队列已满、生产者配置错误等。在实际使用中,可以通过以下几种方式来处理消息发送失败的问题:

  1. 检查网络连接:确保生产者和NameServer、Broker之间的网络连接正常。
  2. 增加消息队列:如果消息队列已满,可以增加消息队列的数量,提高消息的存储和转发能力。
  3. 优化生产者配置:确保生产者配置正确,例如配置合适的发送超时时间、重试次数等参数。
  4. 使用异步发送:采用异步发送的方式,提高消息发送的效率。
  5. 增加备份Broker:通过主从复制、多副本备份的方式,增强系统的可靠性。
消费者消费不到消息的原因与处理方法

消费者消费不到消息的原因可能包括消息队列为空、订阅主题配置错误、消费者配置问题等。在实际使用中,可以通过以下几种方式来处理消费者消费不到消息的问题:

  1. 检查消息队列:确保消息队列中有待消费的消息,可以通过RocketMQ的管理界面查看消息队列的状态。
  2. 检查订阅主题配置:确保消费者订阅的主题配置正确,可以通过RocketMQ的管理界面查看消费者订阅的主题信息。
  3. 优化消费者配置:确保消费者配置正确,例如配置合适的拉取间隔、拉取的最大消息数等参数。
  4. 增加日志记录:增加消费者日志记录,查看消费者的运行日志,方便排查问题。
  5. 检查网络连接:确保消费者和NameServer、Broker之间的网络连接正常。
消息重复消费与解决策略

在某些情况下,消费者可能会重复消费相同的消息。这可能是由于网络抖动、消费者重启等原因导致的。在实际使用中,可以通过以下几种方式来解决消息重复消费的问题:

  1. 使用唯一标识:为每个消息设置唯一标识,避免重复处理相同的业务逻辑。
  2. 事务消息:使用RocketMQ的事务消息机制,确保消息的发送和消费具有强一致性的语义。
  3. 幂等性处理:在处理消息时,增加幂等性处理的逻辑,确保消息被处理多次时不会产生重复的结果。
  4. 重试机制:增加重试机制,确保消息能够被正确地处理,避免消息丢失或重复消费。
  5. 增加日志记录:增加消费者日志记录,查看消费者的运行日志,方便排查问题。

通过以上方式,可以有效地处理消息发送失败、消费者消费不到消息以及消息重复消费的问题,提高系统的稳定性和可靠性。

实战演练
小项目实战演练

场景描述

假设我们有一个电商平台,需要在订单创建时发送消息到支付系统,以便进行支付通知。具体场景如下:

  1. 订单创建:用户下单后,订单系统会生成一条订单信息。
  2. 消息发送:订单系统将订单信息转换为消息,发送到RocketMQ。
  3. 支付通知:支付系统订阅订单主题,接收消息并进行支付处理。

代码实现

1. 创建订单系统

订单系统负责生成订单信息,并将订单信息发送到RocketMQ。

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class OrderSystem {
    public static void main(String[] args) throws Exception {
        // 初始化生产者
        DefaultMQProducer producer = new DefaultMQProducer("OrderProducer");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();

        // 创建订单信息
        String orderId = "10001";
        String orderInfo = "User 10001 ordered product 12345";

        // 发送消息
        producer.send(new Message("OrderTopic", orderInfo.getBytes()), new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println("Order sent successfully: " + orderId);
            }

            @Override
            public void onException(Throwable e) {
                System.out.println("Failed to send order: " + orderId);
            }
        });

        // 关闭生产者
        producer.shutdown();
    }
}

2. 创建支付系统

支付系统订阅订单主题,接收订单信息并进行支付处理。

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderedMessageListener;

import java.util.List;

public class PaymentSystem {
    public static void main(String[] args) throws Exception {
        // 初始化消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("PaymentConsumer");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.subscribe("OrderTopic", "*");

        // 设置消息处理监听器
        consumer.setMessageListener(new ConsumeOrderedMessageListener() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.printf("Receive new order: %s %n", new String(msg.getBody()));
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

        // 启动消费者
        consumer.start();
    }
}

通过上述代码,可以在订单系统中创建订单信息并发送到RocketMQ,支付系统订阅订单主题并接收消息进行处理。

代码部署与调试技巧

部署RocketMQ

RocketMQ的部署步骤如下:

  1. 编译RocketMQ:将RocketMQ源码下载后,进行编译。

    mvn clean install -DskipTests
  2. 启动NameServer

    nohup sh bin/mqnamesrv &
  3. 启动Broker

    nohup sh bin/mqbroker -n localhost:9876 -c conf/broker-a.properties &
  4. 部署应用:将订单系统和支付系统部署到相应的服务器上,并启动相关服务。

调试技巧

  1. 日志查看:RocketMQ提供了丰富的日志,可以通过日志查看消息的发送和接收情况。

    tail -f logs/rocketmq.log
  2. 消息追踪:RocketMQ提供了消息追踪功能,可以通过消息ID查看消息的流转情况。

    sh bin/mqadmin topicList -n localhost:9876
    sh bin/mqadmin consumeStats -n localhost:9876 -t OrderTopic -c PaymentConsumer
  3. 性能监控:RocketMQ提供了性能监控接口,可以通过这些接口查看系统的运行状态。

    sh bin/mqadmin brokerList -n localhost:9876
    sh bin/mqadmin topicConfig -n localhost:9876 -t OrderTopic

通过以上步骤,可以有效地部署RocketMQ,并进行相关的调试和监控。

性能优化与调优建议

性能优化

  1. 增加消息队列:增加消息队列的数量可以提高系统的消息存储和转发能力。

    # broker-a.properties
    brokerName=broker-a
    brokerId=0
    brokerRole=ASYNC_MASTER
    namesrvAddr=localhost:9876
    messageQueueNums=16
  2. 优化网络配置:优化网络配置,提高网络传输的效率。

    # broker-a.properties
    brokerName=broker-a
    brokerId=0
    brokerRole=ASYNC_MASTER
    namesrvAddr=localhost:9876
    bindIp=0.0.0.0
    listenPort=10911
  3. 使用异步发送:使用异步发送的方式,提高消息发送的效率。

    producer.send(new Message("OrderTopic", orderInfo.getBytes()), new SendCallback() {
       @Override
       public void onSuccess(SendResult sendResult) {
           System.out.println("Order sent successfully: " + orderId);
       }
    
       @Override
       public void onException(Throwable e) {
           System.out.println("Failed to send order: " + orderId);
       }
    });

调优建议

  1. 调整生产者配置:根据消息发送的频率和延迟要求,调整生产者的超时时间、重试次数等参数。

    # producer.properties
    producerName=OrderProducer
    namesrvAddr=localhost:9876
    sendMsgTimeout=30000
    retryTimesWhenSendFailed=0
  2. 调整消费者配置:根据消息消费的频率和延迟要求,调整消费者的拉取间隔、拉取的最大消息数等参数。

    # consumer.properties
    consumerName=PaymentConsumer
    namesrvAddr=localhost:9876
    pullInterval=1000
    maxMsgNums=10
  3. 增加备份Broker:通过主从复制、多副本备份的方式,增强系统的可靠性。

    # broker-b.properties
    brokerName=broker-b
    brokerId=1
    brokerRole=SLAVE
    namesrvAddr=localhost:9876

通过以上优化和调优建议,可以提高RocketMQ的性能,确保系统的稳定性和可靠性。

综合以上内容,通过从环境搭建、消息发送与接收、常见问题解决方案到实战演练和性能优化,可以全面掌握RocketMQ的使用和优化技巧。希望这些内容对您的学习和实践有所帮助。

點擊查看更多內容
TA 點贊

若覺得本文不錯,就分享一下吧!

評論

作者其他優質文章

正在加載中
  • 推薦
  • 評論
  • 收藏
  • 共同學習,寫下你的評論
感謝您的支持,我會繼續努力的~
掃碼打賞,你說多少就多少
贊賞金額會直接到老師賬戶
支付方式
打開微信掃一掃,即可進行掃碼打賞哦
今天注冊有機會得

100積分直接送

付費專欄免費學

大額優惠券免費領

立即參與 放棄機會
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號

舉報

0/150
提交
取消