摘要: 原创出处 http://www.iocoder.cn/RocketMQ/install/ 「芋道源码」欢迎转载,保留摘要,谢谢!

  • 1. 概述
    • 1.1 基本概念
    • 1.2 整体架构
    • 1.3 整体流程
    • 1.4 更多文档
  • 2. 单机部署
  • 3. 集群部署
  • 4. Web Console 控制台
  • 5. 简单示例
  • 6. Spring Boot 使用示例
  • 7. Spring Cloud 使用示例
  • 666. 彩蛋

推荐阅读如下 RocketMQ 文章:

  • 《芋道 Spring Boot 分布式消息队列 RocketMQ 入门》
  • 《芋道 Spring Cloud Alibaba 消息队列 RocketMQ 入门》
  • 《芋道 Spring Cloud Alibaba 事件总线 Bus RocketMQ 入门》
  • 《性能测试 —— RocketMQ 基准测试》

1. 概述

在开始搭建 RocketMQ 服务之前,我们先来对它做下简单的了解。

RocketMQ 是阿里巴巴在 2012 年开源的分布式消息中间件,目前已经捐赠给 Apache 软件基金会,并于 2017 年 9 月 25 日成为 Apache 的顶级项目。作为经历过多次阿里巴巴双十一这种“超级工程”的洗礼并有稳定出色表现的国产中间件,以其高性能、低延时和高可靠等特性近年来已经也被越来越多的国内企业使用。

如下是 RocketMQ 产生的原因:

淘宝内部的交易系统使用了淘宝自主研发的 Notify 消息中间件,使用 MySQL 作为消息存储媒介,可完全水平扩容。

为了进一步降低成本,我们认为存储部分可以进一步优化。2011 年初,Linkin 开源了 Kafka 这个优秀的消息中间件,淘宝中间件团队在对 Kafka 做过充分 Review 之后, Kafka 无限消息堆积,高效的持久化速度吸引了我们。

但是,同时发现这个消息系统主要定位于日志传输,对于使用在淘宝交易、订单、充值等场景下还有诸多特性不满足,为此我们重新用 Java 语言编写了 RocketMQ ,定位于非日志的可靠消息传输(日志场景也 OK)。

目前 RocketMQ 在阿里集团被广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理, binglog 分发等场景。

1.1 基本概念

在开始之前,胖友先认真阅读如下两个文档:

  • 概念(Concept):介绍 RocketMQ 的基本概念模型。
  • 特性(Features):介绍 RocketMQ 实现的功能特性。

1.2 整体架构

如下图所示:

  • 生产者(Producer):负责产生消息,生产者向消息服务器发送由业务应用程序系统生成的消息。
  • 消费者(Consumer):负责消费消息,消费者从消息服务器拉取信息并将其输入用户应用程序。
  • 消息服务器(Broker):是消息存储中心,主要作用是接收来自 Producer 的消息并存储, Consumer 从这里取得消息。
  • 名称服务器(NameServer):用来保存 Broker 相关 Topic 等元信息并给 Producer ,提供 Consumer 查找 Broker 信息。

1.3 整体流程

整体流程

  • 1、启动 Namesrv,Namesrv起 来后监听端口,等待 Broker、Producer、Consumer 连上来,相当于一个路由控制中心。

  • 2、Broker 启动,跟所有的 Namesrv 保持长连接,定时发送心跳包。

    心跳包中,包含当前 Broker 信息(IP+端口等)以及存储所有 Topic 信息。注册成功后,Namesrv 集群中就有 Topic 跟 Broker 的映射关系。

    • 3、收发消息前,先创建 Topic 。创建 Topic 时,需要指定该 Topic 要存储在哪些 Broker上。也可以在发送消息时自动创建Topic。
  • 4、Producer 发送消息。

    启动时,先跟 Namesrv 集群中的其中一台建立长连接,并从Namesrv 中获取当前发送的 Topic 存在哪些 Broker 上,然后跟对应的 Broker 建立长连接,直接向 Broker 发消息。

  • 5、Consumer 消费消息。

    Consumer 跟 Producer 类似。跟其中一台 Namesrv 建立长连接,获取当前订阅 Topic 存在哪些 Broker 上,然后直接跟 Broker 建立连接通道,开始消费消息。

1.4 更多文档

目前 RocketMQ 4 的中文文档很少,所以英文不太好的胖友,后续推荐看看如下资料:

  • 《RocketMQ 用户指南》 基于 RocketMQ 3 的版本。
  • 《RocketMQ 原理简介》 基于 RocketMQ 3 的版本。
  • 《RocketMQ 最佳实践》 基于 RocketMQ 3 的版本。
  • 《RocketMQ 开发者指南》 基于 RocketMQ 4 的版本。
  • 《阿里云 —— 消息队列 MQ》 阿里云的消息队列,就是 RocketMQ 的云服务。

2. 单机部署

可以参考 《Apache RocketMQ —— Quick Start》 文章。

本小节,我们会部署一套 RocketMQ 最小化的单机环境,包括一个 RocketMQ Namesrv 和 Broker 服务。部署完成之后,我们会测试消息的发送与消费。下面,让我们逐步开始。

2.1 前置条件

需要安装如下软件:

  • JDK 8+
  • Maven 3.2.X+

因为我们准备直接编译 RocketMQ 源码,构建出 RocketMQ 软件包。

2.2 下载源码

打开 RocketMQ release_notes 页面,我们可以看到 RocketMQ 所有的发布版本。这里,我们选择最新的 RocketMQ 4.6.0 版本。点击进入该版本的发布页面后,我们可以看到两种发布版本:

  • Source: rocketmq-all-4.6.0-source-release.zip
  • Binary: rocketmq-all-4.6.0-bin-release.zip

一般情况下,我们可以直接使用 Binary 版本,它是 RocketMQ 已经编译好,可以直接使用的 RocketMQ 软件包。

这里,我们想带着胖友们编译一次 RocketMQ 源码,所以使用 Source 版本。下面,我们开始下载 RocketMQ 4.6.0 Source 源码。命令行操作如下:

# 下载
$ wget wget http://mirror.bit.edu.cn/apache/rocketmq/4.6.0/rocketmq-all-4.6.0-source-release.zip

# 解压
$ unzip rocketmq-all-4.6.0-source-release.zip

2.2 编译源码

使用 Maven 编译 RocketMQ 源码。命令行操作如下:

# 进入 RocketMQ 源码目录
cd rocketmq-all-4.6.0-source-release

# Maven 编译 RocketMQ ,并跳过测试。耐心等待...
$ mvn -Prelease-all -DskipTests clean install -U

编译完成,在我们进入 distribution 目录下,就可以看到 RocketMQ 的发布包了。命令行操作如下:

# 进入 distribution 目录下
cd distribution/target/rocketmq-4.6.0/rocketmq-4.6.0

# 打印目录
$ ls
40 -rwxr-xr-x   1 yunai  staff  17336 Nov 19 20:59 LICENSE
 8 -rwxr-xr-x   1 yunai  staff   1338 Nov 19 20:59 NOTICE
16 -rwxr-xr-x   1 yunai  staff   4225 Nov 19 20:59 README.md
 0 drwxr-xr-x   6 yunai  staff    192 Dec  3 12:48 benchmark # 性能基准测试
 0 drwxr-xr-x  30 yunai  staff    960 Nov 19 20:59 bin # 执行脚本
 0 drwxr-xr-x  12 yunai  staff    384 Nov 19 20:59 conf # 配置文件
 0 drwxr-xr-x  36 yunai  staff   1152 Dec  3 12:48 lib # RocketMQ jar 包

2.3 启动 Namesrv

启动一个 RocketMQ Namesrv 服务。命令行操作如下:

nohup sh bin/mqnamesrv &

启动完成后,查看日志。

# 查看 Namesrv 日志。
$ tail -f ~/logs/rocketmqlogs/namesrv.log

2019-12-03 12:58:04 INFO main - The Name Server boot success. serializeType=JSON
  • 默认情况下,Namesrv 日志文件所在地址为 ~/logs/rocketmqlogs/namesrv.log 。如果想要自定义,可以通过 conf/logback_namesrv.xml 配置文件来进行修改。

2.4 启动 Broker

在 conf 目录下,RocketMQ 提供了多种 Broker 的配置文件:

  • broker.conf :单主,异步刷盘。
  • 2m/ :双主,异步刷盘。
  • 2m-2s-async/ :两主两从,异步复制,异步刷盘。
  • 2m-2s-sync/ :两主两从,同步复制,异步刷盘。
  • dledger/ :Dledger 集群,至少三节点。

这里,我们只启动一个 RocketMQ Broker 服务,所以使用 broker.conf 配置文件。命令行操作如下:

nohup sh bin/mqbroker -c conf/broker.conf  -n 127.0.0.1:9876 &
  • 通过 -c 参数,配置读取的主 Broker 配置。

  • 通过 -n 参数,设置 RocketMQ Namesrv 地址。

  • 如果胖友的服务器的存相对小,可以修改下 bin/runbroker.sh脚本,将 Broker JVM 内存调小。如下:

    JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g"

启动完成后,查看日志。

tail -f ~/logs/rocketmqlogs/broker.log

2019-12-03 14:27:07 INFO main - The broker[broker-a, 192.168.3.44:10911] boot success. serializeType=JSON and name server is 127.0.0.1:9876
  • 默认情况下,Broker 日志文件所在地址为 ~/logs/rocketmqlogs/broker.log 。如果想要自定义,可以通过 conf/logback_broker.xml 配置文件来进行修改。

至此,我们已经完成了 RocketMQ 单机部署。下面,我们开始进行下消息的发送和消费的测试。

2.5 测试发送消息

通过使用 bin/tools.sh 工具类,实现测试发送消息。命令行操作如下:

# 设置 Namesrv 服务器的地址
export NAMESRV_ADDR=127.0.0.1:9876

# 执行生产者 Producer 发送测试消息
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

如果发送成功,我们会看到大量成功的发送日志。

SendResult [sendStatus=SEND_OK, msgId=FE800000000000004F2B5386138462F500000D7163610D67E7F100F4, offsetMsgId=C0A8032C00002A9F000000000000D7EE, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=0], queueOffset=61]
SendResult [sendStatus=SEND_OK, msgId=FE800000000000004F2B5386138462F500000D7163610D67E7F200F5, offsetMsgId=C0A8032C00002A9F000000000000D8D1, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=1], queueOffset=61]
  • 通过发送结果为 sendStatus=SEND_OK 状态,说明消息都发送成功了。

2.6 测试消费消息

通过使用 bin/tools.sh 工具类,实现测试消费消息。命令行操作如下:

# 设置 Namesrv 服务器的地址
export NAMESRV_ADDR=127.0.0.1:9876

# 执行消费者 Consumer 消费测试消息
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

如果消费成功,我们会看到大量成功的消费日志。

ConsumeMessageThread_4 Receive New Messages: [MessageExt [queueId=2, storeSize=227, queueOffset=131, sysFlag=0, bornTimestamp=1575354513732, bornHost=/192.168.3.44:55510, storeTimestamp=1575354513733, storeHost=/192.168.3.44:10911, msgId=C0A8032C00002A9F000000000001D1FC, commitLogOffset=119292, bodyCRC=1549304357, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=145, CONSUME_START_TIME=1575354867104, UNIQ_KEY=FE800000000000004F2B5386138462F500000D7163610D67E944020E, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72101108108111328211199107101116778132535054], transactionId='null'}]]
ConsumeMessageThread_3 Receive New Messages: [MessageExt [queueId=2, storeSize=227, queueOffset=130, sysFlag=0, bornTimestamp=1575354513729, bornHost=/192.168.3.44:55510, storeTimestamp=1575354513729, storeHost=/192.168.3.44:10911, msgId=C0A8032C00002A9F000000000001CE70, commitLogOffset=118384, bodyCRC=1530218044, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=145, CONSUME_START_TIME=1575354867103, UNIQ_KEY=FE800000000000004F2B5386138462F500000D7163610D67E941020A, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72101108108111328211199107101116778132535050], transactionId='null'}]]
  • 通过 ConsumeMessageThread_4 和 ConsumeMessageThread_3 线程名,我们可以看出,目前是进行并发消费消息。

3. 集群部署

在生产环境下,必须搭建 RocketMQ 高可用集群,不然简直是找死。艿艿有个项目抠门了下,只搭建了一主一从,在一次主挂掉之后,因为 RocketMQ 不支持主从切换,就发生了线上事故。一般 RocketMQ 的集群部署方案推荐如下:

  • 如果对高性能有比较强的诉求,使用两主两从,异步复制,异步刷盘。
  • 如果对可靠性有比较强的诉求,建议使用 Dledger 集群,至少三节点。

因为在 《性能测试 —— RocketMQ 基准测试》 的 「5. 搭建集群」 小节中,我们已经详细描述了如何搭建一个一主一从的 RocketMQ 单集群。胖友可以参考该文,搭建一个二主两从的 RocketMQ 双集群。

下面,艿艿额外放送下 RocketMQ 实现高可用的原理。感兴趣的胖友,可以瞅一瞅。

RocketMQ 集群

1. Producer

  • 1、Producer 自身在应用中,所以无需考虑高可用。
  • 2、Producer 配置多个 Namesrv 列表,从而保证 Producer 和 Namesrv 的连接高可用。并且,会从 Namesrv 定时拉取最新的 Topic 信息。
  • 3、Producer 会和所有 Consumer 直连,在发送消息时,会选择一个 Broker 进行发送。如果发送失败,则会使用另外一个 Broker 。
  • 4、Producer 会定时向 Broker 心跳,证明其存活。而 Broker 会定时检测,判断是否有 Producer 异常下线。

2. Consumer

  • 1、Consumer 需要部署多个节点,以保证 Consumer 自身的高可用。当相同消费者分组中有新的 Consumer 上线,或者老的 Consumer 下线,会重新分配 Topic 的 Queue 到目前消费分组的 Consumer 们。
  • 2、Consumer 配置多个 Namesrv 列表,从而保证 Consumer 和 Namesrv 的连接高可用。并且,会从 Consumer 定时拉取最新的 Topic 信息。
  • 3、Consumer 会和所有 Broker 直连,消费相应分配到的 Queue 的消息。如果消费失败,则会发回消息到 Broker 中。
  • 4、Consumer 会定时向 Broker 心跳,证明其存活。而 Broker 会定时检测,判断是否有 Consumer 异常下线。

3. Namesrv

  • 1、Namesrv 需要部署多个节点,以保证 Namesrv 的高可用。
  • 2、Namesrv 本身是无状态,不产生数据的存储,是通过 Broker 心跳将 Topic 信息同步到 Namesrv 中。
  • 3、多个 Namesrv 之间不会有数据的同步,是通过 Broker 向多个 Namesrv 多写。

 4. Broker

  • 1、多个 Broker 可以形成一个 Broker 分组。每个 Broker 分组存在一个 Master 和多个 Slave 节点。
    • Master 节点,可提供读和写功能。Slave 节点,可提供读功能。
    • Master 节点会不断发送新的 CommitLog 给 Slave节点。Slave 节点不断上报本地的 CommitLog 已经同步到的位置给 Master 节点。
    • Slave 节点会从 Master 节点拉取消费进度、Topic 配置等等。
  • 2、多个 Broker 分组,形成 Broker 集群。
    • Broker 集群和集群之间,不存在通信与数据同步。
  • 3、Broker 可以配置同步刷盘或异步刷盘,根据消息的持久化的可靠性来配置。

4. Web Console 控制台

在 RocketMQ 拓展项目(rocketmq-externals) 中,包含了 RocketMQ Console 项目,是 RocketMQ 的图形化管理控制台,提供 Broker 集群信息查看,Topic 管理,Producer、Consumer 信息展示,消息查询等等常用功能。

虽然说,我们也可以使用 RocketMQ 提供的 CLI Admin Tool 工具,实现上述的查询与管理的功能,但是命令行的方式对操作人员的要求稍高一些。当然,在 RocketMQ Console 无法满足我们更精细化的管理的需求的时候,我们还是会使用 CLI Admin Tool 工具。

下面,让我们来搭建一个 RocketMQ Console 控制台。

4.1 克隆代码

将 rocketmq-externals 仓库的代码,克隆到本地。操作流程如下:

# 克隆代码
$ git clone https://github.com/apache/rocketmq-externals.git

# 进入 Console 目录
cd rocketmq-console

4.2 配置文件

如果胖友需要自定义 RocketMQ Console 的配置,可以进入该项目下的 src/main/resources/ 目录下,进行相应的配置文件修改。例如说,设置 RocketMQ Namesrv 地址,开启 RocketMQ Console 的登陆访问。

这里,我们修改 src/main/resources/application.properties 配置文件,通过设置 rocketmq.config.namesrvAddr=127.0.0.1:9876配置项,设置 RocketMQ Namesrv 的地址。

4.3 编译源码

使用 Maven 编译 RocketMQ Console 源码。命令行操作如下:

# 编译
$ mvn clean package -Dmaven.test.skip=true

4.4 启动控制台

直接以 jar 的方式,启动控制台。注意,控制台使用 8080 端口。命令行操作如下:

nohup java -jar target/rocketmq-console-ng-1.0.1.jar &

启动完成后,查看日志。

$ tail -f nohup.out

[2019-12-03 16:05:19.349]  INFO Tomcat started on port(s): 8080 (http)
[2019-12-03 16:05:19.354]  INFO Started App in 5.341 seconds (JVM running for 6.104)
  • 当看到如上日志,说明 Console 控制台启动完成。

4.5 简单使用

使用浏览器,访问 http://127.0.0.1:8080/ 地址,我们就可以看到 RocketMQ Console 的界面。如下图:

更多的使用指南,胖友可以后续看看 《RocketMQ Console —— 使用文档》 。

5. 简单示例

本小节,我们来看看如何使用生产者 Producer 发送消息,和消费者 Consumer 消费消息。

在 Rocketmq 仓库的 example 目录下,提供了 RocketMQ 示例。本小节,我们主要来看看 qucikstart 这个最简示例。

5.1 Producer

Producer 类,提供生产者 Producer 发送消息的最简示例。代码如下:

// Producer.java

public class Producer {

    public static void main(String[] args) throws MQClientException, InterruptedException {

        /*
         * Instantiate with a producer group name.
         */

        // <1.1> 创建 DefaultMQProducer 对象
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        // <1.2> 设置 RocketMQ Namesrv 地址
        producer.setNamesrvAddr("127.0.0.1:9876");

        /*
         * Specify name server addresses.
         * <p/>
         *
         * Alternatively, you may specify name server addresses via exporting environmental variable: NAMESRV_ADDR
         * <pre>
         * {@code
         * producer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");
         * }
         * </pre>
         */


        /*
         * Launch the instance.
         */

        // <1.3> 启动 producer 生产者
        producer.start();

        for (int i = 0; i < 1000; i++) {
            try {

                /*
                 * Create a message instance, specifying topic, tag and message body.
                 */

                // <2.1> 创建 Message 消息
                Message msg = new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );

                /*
                 * Call send message to deliver message to one of brokers.
                 */

                // <2.2> 同步发送消息
                SendResult sendResult = producer.send(msg);

                // <2.3> 打印发送结果
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }

        /*
         * Shut down once the producer instance is not longer in use.
         */

        // <3> 关闭 producer 生产者
        producer.shutdown();
    }

}
  • <1> 处,初始化一个 Producer 生产者。
    • <1.1> 处,创建 DefaultMQProducer 对象,这里设置的生产者分组是 "please_rename_unique_group_name" 。
    • <1.2> 处,设置 设置 producer 的 RocketMQ Namesrv 地址。这里,是艿艿额外添加的代码。
    • <1.3> 处,启动 producer 生产者。
  • <2> 处,使用 Producer 发送 1000 条消息。
    • <2.1> 处,创建 Message 消息。这里设置了其 Topic 为 "TopicTest",Tag 为 TagA、消息体 Body 为 "Hello RocketMQ" 的二进制数组。
    • <2.2> 处,调用生产者的  #send(Message msg) 方法,同步发送消息,等待发送结果。RocketMQ Producer 一共有三种发送消息的方式,除了我们这里看到的同步发送消息之外,还有异步发送消息(可见 AsyncProducer 示例),和 Oneway 发送消息。
    • <2.3> 处,打印发送结果。
  • <3> 处,关闭 producer 生产者。

执行 #main(args) 方法,开始发送消息。在控制台上,可以看到如下内容:

# 发送日志,省略另外 999 条日志
SendResult [sendStatus=SEND_OK, msgId=240E00E0F0931BB3FCEC071C1CDE61A8000018B4AAC20E79E06A03E7, offsetMsgId=C0A82BF000002A9F000000000008EE72, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=0], queueOffset=645]

# 关闭 Producer 日志
19:27:48.339 [NettyClientSelector_1] INFO  RocketmqRemoting - closeChannel: close the connection to remote address[127.0.0.1:9876] result: true
19:27:48.340 [NettyClientSelector_1] INFO  RocketmqRemoting - closeChannel: close the connection to remote address[192.168.43.240:10911] result: true

5.2 Consumer

Consumer 类,提供消费者 Consumer 消费消息的最简示例。代码如下:

// Consumer.java

public class Consumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {

        /*
         * Instantiate with specified consumer group name.
         */

        // <1> 创建 DefaultMQPushConsumer 对象
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
        // <2> 设置 RocketMQ Namesrv 地址
        consumer.setNamesrvAddr("127.0.0.1:9876");

        /*
         * Specify name server addresses.
         * <p/>
         *
         * Alternatively, you may specify name server addresses via exporting environmental variable: NAMESRV_ADDR
         * <pre>
         * {@code
         * consumer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");
         * }
         * </pre>
         */


        /*
         * Specify where to start in case the specified consumer group is a brand new one.
         */

        // <3> 设置消费进度,从 Topic 最初位置开始
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        /*
         * Subscribe one more more topics to consume.
         */

        // <4> 订阅 TopicTest 主题
        consumer.subscribe("TopicTest""*");

        /*
         *  Register callback to execute on arrival of messages fetched from brokers.
         */

        // <5> 添加消息监听器
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context)
 
{
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                // 返回成功
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }

        });

        /*
         *  Launch the consumer instance.
         */

        // <6> 启动 producer 消费者
        consumer.start();

        // 打印 Consumer 启动完成
        System.out.printf("Consumer Started.%n");
    }

}