摘要: 原创出处 http://www.iocoder.cn/RocketMQ/spring-boot-integration/ 「芋道源码」欢迎转载,保留摘要,谢谢!

  • 1. 概述
  • 2. 调试环境搭建
  • 3. 项目结构一览
  • 5. annotation 包
  • 6. autoconfigure 包
  • 7. config 包
  • 8. support 包
  • 9. core 包
  • 666. 彩蛋

1. 概述

在开始分享 https://github.com/apache/rocketmq-spring 项目(RocketMQ 集成到 Spring Boot 中),我们先恶趣味的看一段历史:

  • 2014-08 Spring Boot 1 正式发布。
  • 2018-03 Spring Boot 2 正式发布。
  • 2018-12 RocketMQ 团队发布 RocketMQ 集成到 Spring Boot 的解决方案,并且提供了中文文档。

在阅读本文之前,希望胖友能够先熟读 中文文档 。最好呢,当然不强制,可以操练下每个 Demo 。

2. 调试环境搭建

在读源码之前,我们当然是先把调试环境搭建起来。

2.1 依赖工具

  • JDK :1.8+
  • Maven
  • IntelliJ IDEA

2. 源码拉取

从官方仓库 https://github.com/apache/rocketmq-spring Fork 出属于自己的仓库。为什么要 Fork ?既然开始阅读、调试源码,我们可能会写一些注释,有了自己的仓库,可以进行自由的提交。

使用 IntelliJ IDEA 从 Fork 出来的仓库拉取代码。拉取完成后,Maven 会下载依赖包,可能会花费一些时间,耐心等待下。


在等待的过程中,我来简单说下,搭建调试环境的过程:

  1. 启动 RocketMQ Namesrv
  2. 启动 RocketMQ Broker
  3. 启动 RocketMQ Spring Boot Producer
  4. 启动 RocketMQ Spring Boot Consumer

最小化的 RocketMQ 的环境,暂时不考虑 Namesrv 集群、Broker 集群、Consumer 集群。

另外,本文使用的 rocketmq-spring 版本是 2.0.2-SNAPSHOT 。

2.3 启动 RocketMQ Namesrv

方式一,可以参考 《RocketMQ 源码解析 —— 调试环境搭建》 的 「3. 启动 RocketMQ Namesrv」 的方式,进行启动 RocketMQ Namesrv 。

方式一,可以方便调试 RocketMQ Namesrv 的代码。

方式二,也可以按照 《Apache RocketMQ —— Quick Start》 的 「Start Name Server」 的方式,进行启动 RocketMQ Namesrv 。

方式二,比较方便。

2.4 启动 RocketMQ Broker

方式一,可以参考 《RocketMQ 源码解析 —— 调试环境搭建》 的 「4. 启动 RocketMQ Broker」 的方式,进行启动 RocketMQ Broker 。

  • 需要注意的是,要删除 org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener 和 org.apache.rocketmq.broker.transaction.TransactionalMessageService两个 SPI 配置文件,否则事务功能,无法正常使用。

方式二,也可以按照 《Apache RocketMQ —— Quick Start》 的 「Start Broker」 的方式,进行启动 RocketMQ Broker 。

2.5 启动 RocketMQ Spring Boot Producer

第一步,打开根目录的 pom.xml 文件,将 rocketmq-spring-boot-samples 示例项目的注释去掉。如下:

<!-- pom -->

<modules>
    <module>rocketmq-spring-boot-parent</module>
    <module>rocketmq-spring-boot</module>
    <module>rocketmq-spring-boot-starter</module>
    <!-- Note: The samples need to mvn compiple in its own directory
            <module>rocketmq-spring-boot-samples</module>
    -->

    <module>rocketmq-spring-boot-samples</module>
</modules>

此时,Maven 又会下载依赖包,可能会花费一些时间,耐心等待下。

第二步,右键运行 rocketmq-produce-demo 的 ProducerApplication 的 #main(String[] args) 方法,Producer 就启动完成了。输出日志如下图:

此时,可能会报 Intellij IDEA 报错:Error : java 不支持发行版本5。可以参考 《Intellij idea 报错:Error : java 不支持发行版本5》 文章,进行解决。

2.6 启动 RocketMQ Spring Boot Consumer

右键运行 rocketmq-consumer-demo 的 ConsumerApplication 的 #main(String[] args) 方法,Consumer 就启动完成了。输出日志如下图:

后面,我们就可以愉快的各种调试玩耍了~

3. 项目结构一览

本文主要分享 rocketmq-spring 的 项目结构
希望通过本文能让胖友对 rocketmq-spring 的整体项目有个简单的了解。

项目结构一览

3.1 代码统计

这里先分享一个小技巧。笔者在开始源码学习时,会首先了解项目的代码量。

第一种方式,使用 IDEA Statistic 插件,统计整体代码量。

Statistic 统计代码量

我们可以粗略的看到,总的代码量在 1700 行。这其中还包括单元测试,示例等等代码。
所以,不慌,一点不慌~

第二种方式,使用 Shell 脚本命令逐个 Maven 模块统计 。

一般情况下,笔者使用 find . -name "*.java"|xargs cat|grep -v -e ^$ -e ^\s*\/\/.*$|wc -l 。这个命令只过滤了部分注释,所以相比 IDEA Statistic 会偏多

当然,考虑到准确性,胖友需要手动 cd 到每个 Maven 项目的 src/main/java  目录下,以达到排除单元测试的代码量。

Shell 脚本统计代码量

统计完后,是不是更加不慌了。哈哈哈哈。

3.2 rocketmq-spring-boot-parent 模块

rocketmq-spring-boot-parent 模块,无具体代码,作为其它项目的 Maven Parent 项目,例如定义了依赖版本号。

3.3 rocketmq-spring-boot-starter 模块

rocketmq-spring-boot-starter 模块,无具体代码,作为 Spring Boot RocketMQ Starter 模块。其 pom.xml 的代码如下:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-parent</artifactId>
        <version>2.0.2-SNAPSHOT</version>
        <relativePath>../rocketmq-spring-boot-parent/pom.xml</relativePath>
    </parent>

    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <packaging>jar</packaging>

    <name>RocketMQ Spring Boot Starter</name>
    <description>SRocketMQ Spring Boot Starter</description>
    <url>https://github.com/apache/rocketmq-spring</url>

    <dependencies>
        <!-- Spring Boot RocketMQ 具体实现 -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot</artifactId>
        </dependency>
        <!-- Spring Boot Starter -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <!-- 提供 Validation 功能 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-validation</artifactId>
        </dependency>
    </dependencies>
    
</project>

3.4 rocketmq-spring-boot 模块

rocketmq-spring-boot 模块,1979 行代码,提供了 Spring Boot RocketMQ 的具体实现。其每个 package 包的功能,分别如下:

  • annotation :注解和注解相关的枚举。

  • autoconfigure :自动配置。

  • config :配置类。

    有点难解释。等后面直接撸源码。

  • core :核心实现。

  • support :提供支持,例如说工具类。

    有点难解释。等后面直接撸源码。

3.5 rocketmq-spring-boot-samples 模块

rocketmq-spring-boot-samples 模块,435 行代码,提供示例。*rocketmq-consume-demo 模块,提供消费者示例。* rocketmq-produce-demo 模块,提供生产者示例。

艿艿:后面的小节,我们开始看具体的源码。

5. annotation 包

5.1 @RocketMQMessageListener

org.apache.rocketmq.spring.annotation.@RocketMQMessageListener 注解,声明指定 Bean 是 RocketMQ 消费者的 MessageListener 。代码如下:

// RocketMQMessageListener.java

@Target(ElementType.TYPE) // 表名,注解在类上
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RocketMQMessageListener {

    /**
     * 消费分组
     *
     * Consumers of the same role is required to have exactly same subscriptions and consumerGroup to correctly achieve
     * load balance. It's required and needs to be globally unique.
     *
     *
     * See <a href="http://rocketmq.apache.org/docs/core-concept/">here</a> for further discussion.
     */

    String consumerGroup();

    /**
     * 消费主体
     *
     * Topic name.
     */

    String topic();

    /**
     * 选择消息的方式
     *
     * Control how to selector message.
     *
     * @see SelectorType
     */

    SelectorType selectorType() default SelectorType.TAG;

    /**
     * 选择消息的表达式
     *
     * Control which message can be select. Grammar please see {@link SelectorType#TAG} and {@link SelectorType#SQL92}
     */

    String selectorExpression() default "*";

    /**
     * 消费模式
     *
     * Control consume mode, you can choice receive message concurrently or orderly.
     */

    ConsumeMode consumeMode() default ConsumeMode.CONCURRENTLY;

    /**
     * 消费模型
     *
     * Control message mode, if you want all subscribers receive message all message, broadcasting is a good choice.
     */

    MessageModel messageModel() default MessageModel.CLUSTERING;

    /**
     * 消费线程数
     *
     * Max consumer thread number.
     */

    int consumeThreadMax() default 64;

}
  • 具体使用,见示例 OrderPaidEventConsumer 。

  • selectorType 属性,org.apache.rocketmq.spring.annotation.SelectorType 枚举,选择消息的方式。代码如下:

    // SelectorType.java

    public enum SelectorType {

        /**
         * @see ExpressionType#TAG
         *
         * 标签
         */

        TAG,

        /**
         * @see ExpressionType#SQL92
         *
         * SQL
         */

        SQL92

    }
  • consumeMode 属性,org.apache.rocketmq.spring.annotation.ConsumeMode ,消费模式。代码如下:

    // ConsumeMode.java

    public enum ConsumeMode {

        /**
         * Receive asynchronously delivered messages concurrently
         *
         *  并发消费
         */

        CONCURRENTLY,

        /**
         * Receive asynchronously delivered messages orderly. one queue, one thread
         *
         * 顺序消费
         */

        ORDERLY

    }
  • messageModel 属性,org.apache.rocketmq.spring.annotation.MessageModel ,消费模型。代码如下:

    // MessageModel.java

    public enum MessageModel {

        /**
         * 广播消费
         */

        BROADCASTING("BROADCASTING"),
        /**
         * 集群消费
         */

        CLUSTERING("CLUSTERING");

        private final String modeCN;

        MessageModel(String modeCN) {
            this.modeCN = modeCN;
        }

        public String getModeCN() {
            return this.modeCN;
        }

    }

5.2 @RocketMQTransactionListener

org.apache.rocketmq.spring.annotatio.@RocketMQTransactionListener 注解,声明指定 Bean 是 RocketMQ 生产者的 RocketMQLocalTransactionListener 。代码如下:

// RocketMQTransactionListener.java

@Target({ElementType.TYPE, ElementType.ANNOTATION_TYPE})  // 表名,注解在类上
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Component // 默认带了 @Component 注解,所以只要添加到了类上,就会注册成 Spring Bean 对象
public @interface RocketMQTransactionListener {

    /**
     * 生产者分组
     *
     * Declare the txProducerGroup that is used to relate callback event to the listener, rocketMQTemplate must send a
     * transactional message with the declared txProducerGroup.
     * <p>
     * <p>It is suggested to use the default txProducerGroup if your system only needs to define a TransactionListener class.
     */

    String txProducerGroup() default RocketMQConfigUtils.ROCKETMQ_TRANSACTION_DEFAULT_GLOBAL_NAME;

    /**
     * Set ExecutorService params -- corePoolSize
     */

    int corePoolSize() default 1;

    /**
     * Set ExecutorService params -- maximumPoolSize
     */

    int maximumPoolSize() default 1;

    /**
     * Set ExecutorService params -- keepAliveTime
     */

    long keepAliveTime() default 1000 * 60//60ms

    /**
     * Set ExecutorService params -- blockingQueueSize
     */

    int blockingQueueSize() default 2000;

}

// RocketMQConfigUtils.java

public static final String ROCKETMQ_TRANSACTION_DEFAULT_GLOBAL_NAME = "rocketmq_transaction_default_global_name";

6. autoconfigure 包

6.1 RocketMQProperties

org.apache.rocketmq.spring.autoconfigure.RocketMQProperties,RocketMQ 客户端的 Properties 对象。代码如下:

// RocketMQProperties.java

@ConfigurationProperties(prefix = "rocketmq"// 配置文件中 rocketmq 前缀
public class RocketMQProperties {

    /**
     * The name server for rocketMQ, formats: `host:port;host:port`.
     *
     * Namesrv 地址
     */

    private String nameServer;

    /**
     * Producer 配置
     */

    private Producer producer;

    // ... 省略 setting/getting 方法

    public static class Producer {

        /**
         * Name of producer.
         */

        private String group;

        /**
         * Millis of send message timeout.
         */

        private int sendMessageTimeout = 3000;

        /**
         * Compress message body threshold, namely, message body larger than 4k will be compressed on default.
         */

        private int compressMessageBodyThreshold = 1024 * 4;

        /**
         * Maximum number of retry to perform internally before claiming sending failure in synchronous mode.
         * This may potentially cause message duplication which is up to application developers to resolve.
         */

        private int retryTimesWhenSendFailed = 2;

        /**
         * <p> Maximum number of retry to perform internally before claiming sending failure in asynchronous mode. </p>
         * This may potentially cause message duplication which is up to application developers to resolve.
         */

        private int retryTimesWhenSendAsyncFailed = 2;

        /**
         * Indicate whether to retry another broker on sending failure internally.
         */

        private boolean retryNextServer = false;

        /**
         * Maximum allowed message size in bytes.
         */

        private int maxMessageSize = 1024 * 1024 * 4;

        // ... 省略 setting/getting 方法
    }

}

6.2 RocketMQAutoConfiguration

org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration ,RocketMQ 自动配置类。代码如下:

// RocketMQAutoConfiguration.java

@Configuration // 标识是配置类
@EnableConfigurationProperties(RocketMQProperties.class) // 指定 RocketMQProperties 自动配置
@ConditionalOnClass({ MQAdmin.class, ObjectMapper.class }) // 要求有 MQAdmin、ObjectMapper 类
@ConditionalOnProperty(prefix = "rocketmq", value = "name-server"// 要求有 rocketmq 开头,且 name-server 的配置
@Import({ JacksonFallbackConfiguration.class, ListenerContainerConfiguration.class }) // 引入 JacksonFallbackConfiguration 和 ListenerContainerConfiguration 配置类
@AutoConfigureAfter(JacksonAutoConfiguration.class) // 在 JacksonAutoConfiguration 之后初始化
public class RocketMQAutoConfiguration {

    // ... 省略配置方法
    
}

6.2.1 defaultMQProducer

#defaultMQProducer() 方法,创建 DefaultMQProducer Bean 对象。代码如下:

// RocketMQAutoConfiguration.java

@Bean
@ConditionalOnMissingBean(DefaultMQProducer.class) // 不存在 DefaultMQProducer Bean 对象
@ConditionalOnProperty(prefix = "rocketmq", value = {"name-server""producer.group"}) // 要求有 rocketmq 开头,且 name-server、producer.group 的配置
public DefaultMQProducer defaultMQProducer(RocketMQProperties rocketMQProperties) {
    // 校验配置
    RocketMQProperties.Producer producerConfig = rocketMQProperties.getProducer();
    String nameServer = rocketMQProperties.getNameServer();
    String groupName = producerConfig.getGroup();
    Assert.hasText(nameServer, "[rocketmq.name-server] must not be null");
    Assert.hasText(groupName, "[rocketmq.producer.group] must not be null");

    // 创建 DefaultMQProducer 对象
    DefaultMQProducer producer = new DefaultMQProducer(groupName);
    // 将 RocketMQProperties.Producer 配置,设置到 producer 中
    producer.setNamesrvAddr(nameServer);
    producer.setSendMsgTimeout(producerConfig.getSendMessageTimeout());
    producer.setRetryTimesWhenSendFailed(producerConfig.getRetryTimesWhenSendFailed());
    producer.setRetryTimesWhenSendAsyncFailed(producerConfig.getRetryTimesWhenSendAsyncFailed());
    producer.setMaxMessageSize(producerConfig.getMaxMessageSize());
    producer.setCompressMsgBodyOverHowmuch(producerConfig.getCompressMessageBodyThreshold());
    producer.setRetryAnotherBrokerWhenNotStoreOK(producerConfig.isRetryNextServer());

    return producer;
}

6.2.2 rocketMQTemplate

#rocketMQTemplate(DefaultMQProducer mqProducer, ObjectMapper rocketMQMessageObjectMapper) 方法,创建 RocketMQTemplate Bean 对象。代码如下:

// RocketMQAutoConfiguration.java

@Bean(destroyMethod = "destroy"// 声明了销毁时,调用 destroy 方法
@ConditionalOnBean(DefaultMQProducer.class) // 有 DefaultMQProducer Bean 的情况下
@ConditionalOnMissingBean(RocketMQTemplate.class) // 不存在 RocketMQTemplate Bean 对象
public RocketMQTemplate rocketMQTemplate(DefaultMQProducer mqProducer, ObjectMapper rocketMQMessageObjectMapper) {
    // 创建 RocketMQTemplate 对象
    RocketMQTemplate rocketMQTemplate = new RocketMQTemplate();
    // 设置其属性
    rocketMQTemplate.setProducer(mqProducer);
    rocketMQTemplate.setObjectMapper(rocketMQMessageObjectMapper);
    return rocketMQTemplate;
}
  • 关于 RocketMQTemplate 类,详细解析,见 「9.4 RocketMQTemplate」 中。

6.2.3 transactionHandlerRegistry

#transactionHandlerRegistry(RocketMQTemplate template) 方法,创建 TransactionHandlerRegistry Bean 对象。代码如下:

// RocketMQAutoConfiguration.java

@Bean
@ConditionalOnBean(RocketMQTemplate.class) // 有 RocketMQTemplate Bean 的情况下
@ConditionalOnMissingBean(TransactionHandlerRegistry.class) // 不存在 TransactionHandlerRegistry Bean 对象
public TransactionHandlerRegistry transactionHandlerRegistry(RocketMQTemplate template) {
    // 创建 TransactionHandlerRegistry 对象
    return new TransactionHandlerRegistry(template);
}
  • 详细解析,见 「7.2 TransactionHandlerRegistry」 中。

6.2.4 transactionAnnotationProcessor

#transactionAnnotationProcessor(TransactionHandlerRegistry transactionHandlerRegistry) 方法,创建 RocketMQTransactionAnnotationProcessor Bean 对象。代码如下:

// RocketMQAutoConfiguration.java

@Bean(name = RocketMQConfigUtils.ROCKETMQ_TRANSACTION_ANNOTATION_PROCESSOR_BEAN_NAME) // Bean 的名字
@ConditionalOnBean(TransactionHandlerRegistry.class) // 有 TransactionHandlerRegistry Bean 的情况下
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public static RocketMQTransactionAnnotationProcessor transactionAnnotationProcessor(TransactionHandlerRegistry transactionHandlerRegistry) {
    // 创建 RocketMQTransactionAnnotationProcessor 对象
    return new RocketMQTransactionAnnotationProcessor(transactionHandlerRegistry);
}

// RocketMQConfigUtils.java

/**
 * The bean name of the internally managed RocketMQ transaction annotation processor.
 */

public static final String ROCKETMQ_TRANSACTION_ANNOTATION_PROCESSOR_BEAN_NAME = "org.springframework.rocketmq.spring.starter.internalRocketMQTransAnnotationProcessor";
  • 详细解析,见 「7.3 RocketMQTransactionAnnotationProcessor」 中。

6.3 JacksonFallbackConfiguration

org.apache.rocketmq.spring.autoconfigure.JacksonFallbackConfiguration ,创建 ObjectMapper Bean 对象的配置类。代码如下:

// JacksonFallbackConfiguration.java

import com.fasterxml.jackson.databind.ObjectMapper;

@Configuration
@ConditionalOnMissingBean(ObjectMapper.class) // 不存在 ObjectMapper Bean 时
class JacksonFallbackConfiguration {

    @Bean
    public ObjectMapper rocketMQMessageObjectMapper() {
        return new ObjectMapper();
    }

}
  • com.fasterxml.jackson.databind.ObjectMapper ,是 Jackson 提供的 JSON 序列化工具类。
    • 生产者发送消息时,将消息使用 Jackson 进行序列化。
    • 消费者拉取消息时,将消息使用 Jackson 进行反序列化。

6.4 ListenerContainerConfiguration

org.apache.rocketmq.spring.autoconfigure.ListenerContainerConfiguration ,实现 ApplicationContextAware、SmartInitializingSingleton 接口,给每个带有注解的  @RocketMQMessageListener Bean 对象,生成对应的 DefaultRocketMQListenerContainer Bean 对象。

而 DefaultRocketMQListenerContainer 类,正如其名,是 DefaultRocketMQListener(RocketMQ 消费者的监听器)容器,负责创建 DefaultRocketMQListener 对象,并启动其对应的 DefaultMQPushConsumer(消费者),从而消费消息。

6.4.1 构造方法

// ListenerContainerConfiguration.java

@Configuration
public class ListenerContainerConfiguration implements ApplicationContextAwareSmartInitializingSingleton {

    private static final Logger log = LoggerFactory.getLogger(ListenerContainerConfiguration.class);

    private ConfigurableApplicationContext applicationContext;

    /**
     * 计数器,用于在 {@link #registerContainer(String, Object)} 方法中,创建 DefaultRocketMQListenerContainer Bean 时,生成 Bean 的名字。
     */

    private AtomicLong counter = new AtomicLong(0);

    private StandardEnvironment environment;

    private RocketMQProperties rocketMQProperties;

    private ObjectMapper objectMapper;

    public ListenerContainerConfiguration(ObjectMapper rocketMQMessageObjectMapper, StandardEnvironment environment, RocketMQProperties rocketMQProperties) {
        this.objectMapper = rocketMQMessageObjectMapper;
        this.environment = environment;
        this.rocketMQProperties = rocketMQProperties;
    }

    @Override // 实现自 ApplicationContextAware 接口
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = (ConfigurableApplicationContext) applicationContext;
    }
    
}
  • 严格来说,ListenerContainerConfiguration 并不能说是一个 Configuration 类。这么写的原因,猜测是为了提供给 RocketMQAutoConfiguration 类,进行引入。
  • 当然,如果我们将 @Configuration 注解,修改成 @Component注解,也是能良好的运行。并且 @Configuration 注解,本身自带 @Component 注解。

6.4.2 afterSingletonsInstantiated

#afterSingletonsInstantiated() 方法,给每个带有注解的  @RocketMQMessageListener Bean 对象,生成对应的 DefaultRocketMQListenerContainer Bean 对象。代码如下:

// ListenerContainerConfiguration.java

@Override // 实现自 SmartInitializingSingleton 接口
public void afterSingletonsInstantiated() {
    // <1> 获得所有 @RocketMQMessageListener 注解的 Bean 们
    Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class);
    // 遍历 beans 数组,生成(注册)对应的 DefaultRocketMQListenerContainer Bean 对象。
    if (Objects.nonNull(beans)) {
        beans.forEach(this::registerContainer);
    }
}
  • <1> 处,获得所有 @RocketMQMessageListener 注解的 Bean 们。
  • <2> 处,遍历 beans 数组,调用 #registerContainer(String beanName, Object bean) 方法,生成(注册)对应的 DefaultRocketMQListenerContainer Bean 对象。详细解析,见 「6.4.3 registerContainer」 中。

6.4.3 registerContainer

#registerContainer(String beanName, Object bean) 方法,生成(注册)对应的 DefaultRocketMQListenerContainer Bean 对象。代码如下:

// ListenerContainerConfiguration.java

private void registerContainer(String beanName, Object bean) {
    // <1.1> 获得 Bean 对应的 Class 类名。因为有可能被 AOP 代理过。
    Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);
    // <1.2> 如果未实现 RocketMQListener 接口,直接抛出 IllegalStateException 异常。
    if (!RocketMQListener.class.isAssignableFrom(bean.getClass())) {
        throw new IllegalStateException(clazz + " is not instance of " + RocketMQListener.class.getName());
    }
    // <1.3> 获得 @RocketMQMessageListener 注解
    RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class);
    // <1.4> 校验注解配置
    validate(annotation);

    // <2.1> 生成 DefaultRocketMQListenerContainer Bean 的名字
    String containerBeanName = String.format("%s_%s", DefaultRocketMQListenerContainer.class.getName(), counter.incrementAndGet());
    GenericApplicationContext genericApplicationContext = (GenericApplicationContext) applicationContext;
    // <2.2> 创建 DefaultRocketMQListenerContainer Bean 对象,并注册到 Spring 容器中。
    genericApplicationContext.registerBean(containerBeanName, DefaultRocketMQListenerContainer.class,
            () -> createRocketMQListenerContainer(bean, annotation));

    // <3.1> 从 Spring 容器中,获得刚注册的 DefaultRocketMQListenerContainer Bean 对象
    DefaultRocketMQListenerContainer container = genericApplicationContext.getBean(containerBeanName, DefaultRocketMQListenerContainer.class);
    // <3.2> 如果未启动,则进行启动
    if (!container.isRunning()) {
        try {
            container.start();
        } catch (Exception e) {
            log.error("Started container failed. {}", container, e);
            throw new RuntimeException(e);
        }
    }

    // 打印日志
    log.info("Register the listener to container, listenerBeanName:{}, containerBeanName:{}", beanName, containerBeanName);
}