⬆︎
×
TOC
Hyplus目录

RocketMQ消息中间件基础教程

RocketMQ是阿里巴巴开源的分布式消息中间件,2017年成为Apache顶级项目,基于Java开发,具备高性能、高可靠、高实时、易扩展的核心优势。其核心设计目标是解决分布式系统中的异步通信、流量削峰和服务解耦问题,广泛应用于电商、金融、物联网等场景。

官网:rocketmq.apache.org
文档:GitHubGitee

1 RocketMQ概述

RocketMQ凭借其高性能、高可靠和丰富的消息模型,成为分布式系统中异步通信的首选中间件。通过合理设计Topic/Queue结构、选择合适的消息类型和刷盘策略,开发者可高效构建可扩展的事件驱动架构。建议结合官方文档和社区资源深入学习高级特性(如流计算、云原生部署),以充分发挥RocketMQ的潜力。

核心特点:

  1. 分布式架构与弹性扩展:采用Shared-nothing架构,支持水平扩展和动态负载均衡,单机可支撑万亿级消息吞吐量,兼容Kubernetes等云原生环境。
  2. 金融级可靠性保障
    • 多副本存储:主从架构结合同步/异步刷盘策略,同步刷盘模式下实现消息零丢失。
    • 自动故障转移:主节点宕机后从节点无缝接管服务,保障业务连续性。
    • 事务消息:通过两阶段提交和状态回查机制,确保分布式事务最终一致性。
  3. 丰富的消息类型与灵活路由:支持普通、顺序、延迟(18级预定义)、定时及事务消息,通过Tag过滤和SQL92表达式实现精准订阅。
  4. 多语言支持与生态集成:提供7种语言SDK,支持HTTP协议接入,深度整合Spring Cloud、Flink等生态系统,构建端到端事件驱动架构。
  5. 高实时性与低延迟:通过零拷贝技术(MappedFile)和异步刷盘优化磁盘IO,端到端延迟达毫秒级,支撑亿级消息堆积仍保持高性能。
  6. 完善的监控与运维体系:内置RocketMQ Console和Prometheus+Grafana监控方案,实时追踪消息轨迹、消费进度及集群健康状态。
  7. 云原生与全球化能力:支持K8s原生部署和跨地域消息同步,通过全球路由功能实现异地多活场景下的数据一致性。

2 核心概念

RocketMQ架构图如下:

RocketMQ架构图

相关概念:

  1. Producer(生产者):负责发送消息到指定Topic,支持同步、异步和单向发送模式。同一Producer实例可归属同一Producer Group,用于事务消息的提交与回滚。
  2. Consumer(消费者):订阅Topic并消费消息,支持集群模式(每条消息仅被一个消费者实例处理)和广播模式(所有消费者实例均处理)。消费者通过Consumer Group进行负载均衡和故障恢复。
  3. Broker:消息存储与转发的核心节点,管理多个Topic的消息队列,支持主从架构(Broker Master/Slave)和消息持久化。Broker通过NameServer获取路由信息,并向Producer/Consumer提供服务。
  4. NameServer:轻量级元数据管理服务器,维护Broker集群的路由信息和Topic队列分布。客户端通过NameServer发现可用Broker地址,实现无状态服务发现。
  5. Topic:消息的逻辑分类,生产者将消息发送到Topic,消费者通过订阅Topic获取消息。一个Topic可包含多个Queue,用于实现水平扩展和负载均衡。
  6. Message Queue(消息队列):Topic的物理分片,消息按队列进行存储和分发。同一队列中的消息保证顺序性,不同队列间消息并行处理。

3 安装与启动

  1. 环境准备
    • 安装JDK 1.8+,配置JAVA_HOME环境变量。
    • 下载RocketMQ二进制包(如rocketmq-all-5.1.0-bin-release.zip),解压至指定目录。
  2. 启动服务

    • 启动NameServer:

      # Linux/macOS
      sh bin/mqnamesrv
      
      # Windows
      cmd /c start mqnamesrv.cmd
    • 启动Broker:

      # Linux/macOS(默认配置)
      sh bin/mqbroker -n localhost:9876
      
      # Windows
      cmd /c start mqbroker.cmd -n localhost:9876
  3. 验证安装:访问RocketMQ控制台(如http://localhost:8080)查看Broker和Topic状态,或通过命令行工具mqadmin验证服务运行。

4 快速入门(Java版)

  1. Maven依赖:在pom.xml中添加RocketMQ客户端依赖
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>5.1.0</version>
</dependency>
  1. 生产者(发送普通消息)
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class SimpleProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("example-producer-group");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();

        for (int i = 0; i < 10; i++) {
            Message msg = new Message("example-topic",
                                      "TagA",
                                      ("Hello RocketMQ " + i).getBytes());
            SendResult result = producer.send(msg);
            System.out.printf("Send result: %s%n", result);
        }

        producer.shutdown();
    }
}
  1. 消费者(接收消息)
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

public class SimpleConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example-consumer-group");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.subscribe("example-topic", "*");   // 订阅所有Tag

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.printf("Received message: %s%n", new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.println("Consumer started.");
    }
}

5 核心功能详解

消息类型:

  1. 普通消息:默认类型,支持同步/异步发送,适用于多数场景。
  2. 顺序消息:保证同一队列中消息按发送顺序消费,通过MessageQueueSelector实现分片。
  3. 事务消息:支持分布式事务的最终一致性,通过两阶段提交和回查机制实现。
  4. 延迟消息:消息在指定时间后投递,支持18个预定义延迟等级(如1s、5s、10min等)。

消息过滤方式:

  1. Tag过滤:通过设置消息的tag字段,消费者可订阅特定标签的消息。
  2. SQL过滤:支持SQL92表达式过滤消息属性,需在Broker配置中开启enablePropertyFilter

可用性策略:

  1. 主从架构:Broker采用主从复制(异步/同步刷盘),主节点宕机后自动切换到从节点。
  2. 刷盘策略:
    • 同步刷盘:消息写入磁盘后返回ACK,数据零丢失,性能较低。
    • 异步刷盘:消息写入内存后返回ACK,后台线程异步刷盘,性能高但可能丢失少量数据。

6 应用示例

  1. 消息幂等性:消费者需通过业务唯一标识(如订单ID)实现幂等处理,避免重复消费导致数据不一致。
  2. 流量控制::Producer可通过sendTimeoutretryTimesWhenSendFailed配置控制发送行为,避免因网络波动导致性能下降。
  3. 监控与报警:使用RocketMQ自带的Admin API或第三方工具(如Prometheus+Grafana)监控Broker、Producer/Consumer的运行状态,设置消息堆积、延迟等报警规则。
  4. 性能优化
    • 批量发送消息(producer.send(Collection<Message>))提升吞吐量。
    • 合理设置Queue数量(建议为Broker数量的2-4倍),避免队列竞争。
    • 使用SSD硬盘和内存映射(mmap)优化磁盘IO性能。

7 常见问题

  1. 消息丢失处理
    • 确保Broker配置flushDiskType=SYNC_FLUSH(同步刷盘)。
    • 生产者启用sendMsgTimeoutretry机制,消费者设置maxReconsumeTimes
  2. 消息重复消费
    • 业务层实现幂等性,如数据库唯一约束、状态机控制。
    • 消费者设置consumeConcurrentlyMaxSpan控制消息消费间隔。
  3. Broker启动失败
    • 检查内存是否充足(默认堆内存4GB,可通过conf/runbroker.sh调整)。
    • 确认NameServer地址正确,网络端口(9876、10911)未被占用。

发表评论