摘要: 原创出处 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/ 「芋道源码」欢迎转载,保留摘要,谢谢!

  • 1、概述
  • 2、Consumer
  • 3、PushConsumer 一览
  • 4、PushConsumer 订阅
  • 5、PushConsumer 消息队列分配
  • 5、PushConsumer 消费进度读取
  • 6、PushConsumer 拉取消息
  • 6、PushConsumer 消费消息
  • 7、PushConsumer 发回消费失败消息
  • 8、Consumer 消费进度
  • 9、结尾

阅读源码最好的方式,是使用 IDEA 进行调试 RocketMQ 源码,不然会一脸懵逼。

胖友可以点击「芋道源码」扫码关注,回复 git001 关键字
获得艿艿添加了中文注释的 RocketMQ 源码地址。

阅读源码很孤单,加入源码交流群,一起坚持!

1、概述

本文接:《RocketMQ 源码分析 —— Message 拉取与消费(上)》。

主要解析 Consumer 在 消费 逻辑涉及到的源码。

2、Consumer

MQ 提供了两类消费者:

  • PushConsumer:
    • 在大多数场景下使用。
    • 名字虽然是 Push 开头,实际在实现时,使用 Pull 方式实现。通过 Pull 不断不断不断轮询 Broker 获取消息。当不存在新消息时,Broker 会挂起请求,直到有新消息产生,取消挂起,返回新消息。这样,基本和 Broker 主动 Push 做到接近的实时性(当然,还是有相应的实时性损失)。原理类似 长轮询( Long-Polling )
  • PullConsumer

本文主要讲解PushConsumer,部分讲解PullConsumer,跳过顺序消费
本文主要讲解PushConsumer,部分讲解PullConsumer,跳过顺序消费
本文主要讲解PushConsumer,部分讲解PullConsumer,跳过顺序消费

3、PushConsumer 一览

先看一张 PushConsumer 包含的组件以及组件之间的交互图:

PushConsumer手绘图.png

  • RebalanceService:均衡消息队列服务,负责分配当前 Consumer 可消费的消息队列( MessageQueue )。当有新的 Consumer 的加入或移除,都会重新分配消息队列。
  • PullMessageService:拉取消息服务,不断不断不断Broker 拉取消息,并提交消费任务到ConsumeMessageService
  • ConsumeMessageService:消费消息服务,不断不断不断消费消息,并处理消费结果。
  • RemoteBrokerOffsetStoreConsumer 消费进度管理,负责从 Broker 获取消费进度,同步消费进度到 Broker
  • ProcessQueue :消息处理队列。
  • MQClientInstance :封装对 NamesrvBroker 的 API调用,提供给 ProducerConsumer 使用。

4、PushConsumer 订阅

DefaultMQPushConsumerImpl#subscribe(...)

  1public void subscribe(String topic, String subExpression) throws MQClientException {
  2:     try {
  3:         // 创建订阅数据
  4:         SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), //
  5:             topic, subExpression);
  6:         this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
  7:         // 通过心跳同步Consumer信息到Broker
  8:         if (this.mQClientFactory != null) {
  9:             this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
 10:         }
 11:     } catch (Exception e) {
 12:         throw new MQClientException("subscription exception", e);
 13:     }
 14: }
  • 说明 :订阅 Topic 。
  • 第 3 至 6 行 :创建订阅数据。详细解析见:FilterAPI.buildSubscriptionData(...)。
  • 第 7 至 10 行 :通过心跳同步 Consumer 信息到 Broker

FilterAPI.buildSubscriptionData(...)

  1public static SubscriptionData buildSubscriptionData(final String consumerGroup, String topic,
  2:     String subString)
 throws Exception 
{
  3:     SubscriptionData subscriptionData = new SubscriptionData();
  4:     subscriptionData.setTopic(topic);
  5:     subscriptionData.setSubString(subString);
  6:     // 处理订阅表达式
  7:     if (null == subString || subString.equals(SubscriptionData.SUB_ALL) || subString.length() == 0) {
  8:         subscriptionData.setSubString(SubscriptionData.SUB_ALL);
  9:     } else {
 10:         String[] tags = subString.split("\\|\\|");
 11:         if (tags.length > 0) {
 12:             for (String tag : tags) {
 13:                 if (tag.length() > 0) {
 14:                     String trimString = tag.trim();
 15:                     if (trimString.length() > 0) {
 16:                         subscriptionData.getTagsSet().add(trimString);
 17:                         subscriptionData.getCodeSet().add(trimString.hashCode());
 18:                     }
 19:                 }
 20:             }
 21:         } else {
 22:             throw new Exception("subString split error");
 23:         }
 24:     }
 25
 26:     return subscriptionData;
 27: }
  • 说明 :根据 Topic 和 订阅表达式 创建订阅数据
  • subscriptionData.subVersion = System.currentTimeMillis()。

DefaultMQPushConsumer#registerMessageListener(...)

  1public void registerMessageListener(MessageListenerConcurrently messageListener) {
  2:     this.messageListener = messageListener;
  3:     this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
  4: }
  • 说明 :注册消息监听器。

5、PushConsumer 消息队列分配

RebalanceService&PushConsumer分配队列

RebalanceService

  1public class RebalanceService extends ServiceThread {
  2
  3:     /**
  4:      * 等待间隔,单位:毫秒
  5:      */

  6:     private static long waitInterval =
  7:         Long.parseLong(System.getProperty(
  8:             "rocketmq.client.rebalance.waitInterval""20000"));
  9
 10:     private final Logger log = ClientLogger.getLog();
 11:     /**
 12:      * MQClient对象
 13:      */

 14:     private final MQClientInstance mqClientFactory;
 15
 16:     public RebalanceService(MQClientInstance mqClientFactory) {
 17:         this.mqClientFactory = mqClientFactory;
 18:     }
 19
 20:     @Override
 21:     public void run() {
 22:         log.info(this.getServiceName() + " service started");
 23
 24:         while (!this.isStopped()) {
 25:             this.waitForRunning(waitInterval);
 26:             this.mqClientFactory.doRebalance();
 27:         }
 28
 29:         log.info(this.getServiceName() + " service end");
 30:     }
 31
 32:     @Override
 33:     public String getServiceName() {
 34:         return RebalanceService.class.getSimpleName();
 35:     }
 36: }
  • 说明 :均衡消息队列服务,负责分配当前 Consumer 可消费的消息队列( MessageQueue )。
  • 第 26 行 :调用 MQClientInstance#doRebalance(...) 分配消息队列。目前有三种情况情况下触发:
    • 如 第 25 行 等待超时,每 20s 调用一次。
    • PushConsumer 启动时,调用 rebalanceService#wakeup(...) 触发。
    • Broker 通知 Consumer 加入 或 移除时,Consumer 响应通知,调用 rebalanceService#wakeup(...) 触发。

详细解析见:MQClientInstance#doRebalance(...)。

MQClientInstance#doRebalance(...)

  1public void doRebalance() {
  2:     for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
  3:         MQConsumerInner impl = entry.getValue();
  4:         if (impl != null) {
  5:             try {
  6:                 impl.doRebalance();
  7:             } catch (Throwable e) {
  8:                 log.error("doRebalance exception", e);
  9:             }
 10:         }
 11:     }
 12: }
  • 说明 :遍历当前 Client 包含的 consumerTableConsumer集合 ),执行消息队列分配。
  • 疑问:目前代码调试下来,consumerTable 只包含 Consumer自己。

  • AllocateMessageQueueStrategy类图DefaultMQPushConsumerImpl拉取消息DefaultMQPushConsumerImpl消费消息OffsetStore类图.png

  • 有大大对这个疑问有解答的,烦请解答下。
  • 第 6 行 :调用 MQConsumerInner#doRebalance(...) 进行队列分配。DefaultMQPushConsumerImplDefaultMQPullConsumerImpl 分别对该接口方法进行了实现。DefaultMQPushConsumerImpl#doRebalance(...) 详细解析见:DefaultMQPushConsumerImpl#doRebalance(...)。
  • DefaultMQPushConsumerImpl#doRebalance(...)

  •   1public void doRebalance() {
      2:     if (!this.pause) {
      3:         this.rebalanceImpl.doRebalance(this.isConsumeOrderly());
      4:     }
      5: }
  • 说明:执行消息队列分配。
  • 第 3 行 :调用 RebalanceImpl#doRebalance(...) 进行队列分配。详细解析见:RebalancePushImpl#doRebalance(...)。
  • RebalanceImpl#doRebalance(...)

  •   1/**
      2:  * 执行分配消息队列
      3:  *
      4:  * @param isOrder 是否顺序消息
      5:  */

      6public void doRebalance(final boolean isOrder) {
      7:     // 分配每个 topic 的消息队列
      8:     Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
      9:     if (subTable != null) {
     10:         for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
     11:             final String topic = entry.getKey();
     12:             try {
     13:                 this.rebalanceByTopic(topic, isOrder);
     14:             } catch (Throwable e) {
     15:                 if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
     16:                     log.warn("rebalanceByTopic Exception", e);
     17:                 }
     18:             }
     19:         }
     20:     }
     21:     // 移除未订阅的topic对应的消息队列
     22:     this.truncateMessageQueueNotMyTopic();
     23: }
     24
     25/**
     26:  * 移除未订阅的消息队列
     27:  */

     28private void truncateMessageQueueNotMyTopic() {
     29:     Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
     30:     for (MessageQueue mq : this.processQueueTable.keySet()) {
     31:         if (!subTable.containsKey(mq.getTopic())) {
     32
     33:             ProcessQueue pq = this.processQueueTable.remove(mq);
     34:             if (pq != null) {
     35:                 pq.setDropped(true);
     36:                 log.info("doRebalance, {}, truncateMessageQueueNotMyTopic remove unnecessary mq, {}", consumerGroup, mq);
     37:             }
     38:         }
     39:     }
     40: }
  • #doRebalance(...) 说明 :执行分配消息队列。
    • 第 7 至 20 行 :循环订阅主题集合( subscriptionInner ),分配每一个 Topic 的消息队列。
    • 第 22 行 :移除未订阅的 Topic 的消息队列。
  • #truncateMessageQueueNotMyTopic(...) 说明 :移除未订阅的消息队列。当调用DefaultMQPushConsumer#unsubscribe(topic) 时,只移除订阅主题集合( subscriptionInner ),对应消息队列移除在该方法。
  • RebalanceImpl#rebalanceByTopic(...)

  • // ... 文章过长,超过微信限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/ 阅读
  • #rebalanceByTopic(...) 说明 :分配 Topic 的消息队列。
    • 第 21 至 40 行 :获取 Topic 对应的消息队列和消费者们,并对其进行排序。因为各 Consumer 是在本地分配消息队列,排序后才能保证各 Consumer 顺序一致。
    • 第 42 至 61 行 :根据 队列分配策略( AllocateMessageQueueStrategy ) 分配消息队列。详细解析见:AllocateMessageQueueStrategy。
    • 第 63 至 72 行 :更新 Topic 对应的消息队列。
    • 第 3 至 19 行 :广播模式( BROADCASTING ) 下,分配 Topic对应的所有消息队列。
    • 第 20 至 74 行 :集群模式( CLUSTERING ) 下,分配 Topic对应的部分消息队列。
  • #updateProcessQueueTableInRebalance(...) 说明 :当分配队列时,更新 Topic 对应的消息队列,并返回是否有变更。
    • 第 132 至 135 行 :顺序消费 相关跳过,详细解析见:《RocketMQ 源码分析 —— Message 顺序发送与消费》。
    • 第 137 行 :移除消息队列的消费进度。
    • 第 139 行 :获取队列消费进度。详细解析见:RebalancePushImpl#computePullFromWhere(...)。
    • 第 140 至 156 行 :添加新消费处理队列,添加消费拉取消息请求
    • 第 103 行 :移除不需要的消息队列。详细解析见:RebalancePushImpl#removeUnnecessaryMessageQueue(...)。
    • 第 108 至 120 行 :队列拉取超时,即 当前时间 - 最后一次拉取消息时间 > 120s ( 120s 可配置),判定发生 BUG,过久未进行消息拉取,移除消息队列。移除后,下面**#新增队列逻辑#**可以重新加入新的该消息队列。
    • 第 93 至 126 行 :移除不存在于分配的消息队列( mqSet ) 的 消息处理队列( processQueueTable )。
    • 第 128 至 158 行 :增加 分配的消息队列( mqSet ) 新增的消息队列。
    • 第 161 行 :发起新增的消息队列消息拉取请求。详细解析见:RebalancePushImpl#dispatchPullRequest(...)。
  • RebalanceImpl#removeUnnecessaryMessageQueue(...)

  • RebalancePushImpl#removeUnnecessaryMessageQueue(...)

  •   1public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) {
      2:     // 同步队列的消费进度,并移除之。
      3:     this.defaultMQPushConsumerImpl.getOffsetStore().persist(mq);
      4:     this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq);
      5:     // TODO 顺序消费
      6:     if (this.defaultMQPushConsumerImpl.isConsumeOrderly()
      7:         && MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) {
      8:         try {
      9:             if (pq.getLockConsume().tryLock(1000, TimeUnit.MILLISECONDS)) {
     10:                 try {
     11:                     return this.unlockDelay(mq, pq);
     12:                 } finally {
     13:                     pq.getLockConsume().unlock();
     14:                 }
     15:             } else {
     16:                 log.warn("[WRONG]mq is consuming, so can not unlock it, {}. maybe hanged for a while, {}"//
     17:                     mq, //
     18:                     pq.getTryUnlockTimes());
     19
     20:                 pq.incTryUnlockTimes();
     21:             }
     22:         } catch (Exception e) {
     23:             log.error("removeUnnecessaryMessageQueue Exception", e);
     24:         }
     25
     26:         return false;
     27:     }
     28:     return true;
     29: }
  • 说明 :移除不需要的消息队列相关的信息,并返回是否移除成功。
  • 第 2 至 4 行 :同步队列的消费进度,并移除之。
  • 第 5 至 27 行 :顺序消费 相关跳过,详细解析见:《RocketMQ 源码分析 —— Message 顺序发送与消费》。
  • [PullConsumer]RebalancePullImpl#removeUnnecessaryMessageQueue(...)

  •   1public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) {
      2:     this.defaultMQPullConsumerImpl.getOffsetStore().persist(mq);
      3:     this.defaultMQPullConsumerImpl.getOffsetStore().removeOffset(mq);
      4:     return true;
      5: }
  • 说明 :移除不需要的消息队列相关的信息,并返回移除成功。RebalancePushImpl#removeUnnecessaryMessageQueue(...)基本一致。
  • RebalancePushImpl#dispatchPullRequest(...)

  •   1public void dispatchPullRequest(List<PullRequest> pullRequestList) {
      2:     for (PullRequest pullRequest : pullRequestList) {
      3:         this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
      4:         log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest);
      5:     }
      6: }
  • 说明 :发起消息拉取请求。该调用是PushConsumer不断不断不断拉取消息的起点
  • DefaultMQPushConsumerImpl#executePullRequestImmediately(...)

  •   1public void executePullRequestImmediately(final PullRequest pullRequest) {
      2:     this.mQClientFactory.getPullMessageService().executePullRequestImmediately(pullRequest);
      3: }
  • 说明 :提交拉取请求。提交后,PullMessageService 异步执行非阻塞。详细解析见:PullMessageService。
  • AllocateMessageQueueStrategy

  • AllocateMessageQueueAveragely

  •   1public class AllocateMessageQueueAveragely implements AllocateMessageQueueStrategy {
      2:     private final Logger log = ClientLogger.getLog();
      3
      4:     @Override
      5:     public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
      6:         List<String> cidAll)
     
    {
      7:         // 校验参数是否正确
      8:         if (currentCID == null || currentCID.length() < 1) {
      9:             throw new IllegalArgumentException("currentCID is empty");
     10:         }
     11:         if (mqAll == null || mqAll.isEmpty()) {
     12:             throw new IllegalArgumentException("mqAll is null or mqAll empty");
     13:         }
     14:         if (cidAll == null || cidAll.isEmpty()) {
     15:             throw new IllegalArgumentException("cidAll is null or cidAll empty");
     16:         }
     17
     18:         List<MessageQueue> result = new ArrayList<>();
     19:         if (!cidAll.contains(currentCID)) {
     20:             log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
     21:                 consumerGroup,
     22:                 currentCID,
     23:                 cidAll);
     24:             return result;
     25:         }
     26:         // 平均分配
     27:         int index = cidAll.indexOf(currentCID); // 第几个consumer。
     28:         int mod = mqAll.size() % cidAll.size(); // 余数,即多少消息队列无法平均分配。
     29:         int averageSize =
     30:             mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
     31:                 + 1 : mqAll.size() / cidAll.size());
     32:         int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod; // 有余数的情况下,[0, mod) 平分余数,即每consumer多分配一个节点;第index开始,跳过前mod余数。
     33:         int range = Math.min(averageSize, mqAll.size() - startIndex); // 分配队列数量。之所以要Math.min()的原因是,mqAll.size() <= cidAll.size(),部分consumer分配不到消息队列。
     34:         for (int i = 0; i < range; i++) {
     35:             result.add(mqAll.get((startIndex + i) % mqAll.size()));
     36:         }
     37:         return result;
     38:     }
     39
     40:     @Override
     41:     public String getName() {
     42:         return "AVG";
     43:     }
     44: }
  • 说明 :平均分配队列策略。
  • 第 7 至 25 行 :参数校验。
  • 第 26 至 36 行 :平均分配消息队列。
    • [ 0, mod ) :mqAll.size() / cidAll.size() + 1。前面mod 个 Consumer 平分余数,多获得 1 个消息队列。
    • [ mod, cidAll.size() ) :mqAll.size() / cidAll.size()
    • 第 27 行 :index :当前 Consumer 在消费集群里是第几个。这里就是为什么需要对传入的 cidAll 参数必须进行排序的原因。如果不排序,Consumer 在本地计算出来的index 无法一致,影响计算结果。
    • 第 28 行 :mod :余数,即多少消息队列无法平均分配。
    • 第 29 至 31 行 :averageSize :代码可以简化成 (mod > 0 && index < mod ? mqAll.size() / cidAll.size() + 1 : mqAll.size() / cidAll.size())
    • 第 32 行 :startIndex :Consumer 分配消息队列开始位置。
    • 第 33 行 :range :分配队列数量。之所以要Math#min(...) 的原因:当 mqAll.size() <= cidAll.size() 时,最后几个 Consumer 分配不到消息队列。
    • 第 34 至 36 行 :生成分配消息队列结果。
  • 举个例子:
  • 固定消息队列长度为4


  • Consumer * 2 可以整除Consumer * 3 不可整除Consumer * 5 无法都分配
    消息队列[0]Consumer[0]Consumer[0]Consumer[0]
    消息队列[1]Consumer[0]Consumer[0]Consumer[1]
    消息队列[2]Consumer[1]Consumer[1]Consumer[2]
    消息队列[3]Consumer[1]Consumer[2]Consumer[3]
  • AllocateMessageQueueByMachineRoom

  •   1public class AllocateMessageQueueByMachineRoom implements AllocateMessageQueueStrategy {
      2:     /**
      3:      * 消费者消费brokerName集合
      4:      */

      5:     private Set<String> consumeridcs;
      6
      7:     @Override
      8:     public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
      9:         List<String> cidAll)
     
    {
     10:         // 参数校验
     11:         List<MessageQueue> result = new ArrayList<MessageQueue>();
     12:         int currentIndex = cidAll.indexOf(currentCID);
     13:         if (currentIndex < 0) {
     14:             return result;
     15:         }
     16:         // 计算符合当前配置的消费者数组('consumeridcs')对应的消息队列
     17:         List<MessageQueue> premqAll = new ArrayList<MessageQueue>();
     18:         for (MessageQueue mq : mqAll) {
     19:             String[] temp = mq.getBrokerName().split("@");
     20:             if (temp.length == 2 && consumeridcs.contains(temp[0])) {
     21:                 premqAll.add(mq);
     22:             }
     23:         }
     24:         // 平均分配
     25:         int mod = premqAll.size() / cidAll.size();
     26:         int rem = premqAll.size() % cidAll.size();
     27:         int startIndex = mod * currentIndex;
     28:         int endIndex = startIndex + mod;
     29:         for (int i = startIndex; i < endIndex; i++) {
     30:             result.add(mqAll.get(i));
     31:         }
     32:         if (rem > currentIndex) {
     33:             result.add(premqAll.get(currentIndex + mod * cidAll.size()));
     34:         }
     35:         return result;
     36:     }
     37
     38:     @Override
     39:     public String getName() {
     40:         return "MACHINE_ROOM";
     41:     }
     42
     43:     public Set<String> getConsumeridcs() {
     44:         return consumeridcs;
     45:     }
     46
     47:     public void setConsumeridcs(Set<String> consumeridcs) {
     48:         this.consumeridcs = consumeridcs;
     49:     }
     50: }
  • 说明 :平均分配可消费的 Broker 对应的消息队列。
  • 第 7 至 15 行 :参数校验。
  • 第 16 至 23 行 :计算可消费的 Broker 对应的消息队列。
  • 第 25 至 34 行 :平均分配消息队列。该平均分配方式和 AllocateMessageQueueAveragely 略有不同,其是将多余的结尾部分分配给前 rem 个 Consumer
  • 疑问:使用该分配策略时,Consumer 和 Broker 分配需要怎么配置。©著作权归作者所有:来自51CTO博客作者mb5ff80520dfa04的原创作品,如需转载,请注明出处,否则将追究法律责任

    更多相关文章

    1. 面试官再问我如何保证 RocketMQ 不丢失消息,这回我笑了!
    2. RocketMQ 源码分析 —— 定时消息与消息重试
    3. 消息中间件 RocketMQ 源码解析 —— 调试环境搭建
    4. 分布式消息队列 RocketMQ源码解析:事务消息
    5. 分布式消息队列 RocketMQ源码解析:Filtersrv
    6. 分布式消息队列 RocketMQ 源码分析 —— 定时消息与消息重试
    7. 分布式消息队列 RocketMQ 源码分析 —— 高可用
    8. 分布式消息队列 RocketMQ 源码分析 —— Message 拉取与消费(上)
    9. 分布式消息队列 RocketMQ 源码分析 —— Message 拉取与消费(下)

    随机推荐

    1. Android按键消息传播流程(WindowManagerSe
    2. android 笔记 --- Android中Menu应用
    3. OrmLite - Lightweight Java ORM Support
    4. TextView的跑马灯效果
    5. Android 4.0 用户输入子系统
    6. Activity 模版样式简介
    7. Android中ExpandableListView的使用
    8. android 对话框(Dialog)使用
    9. Android Sensor Shake(WeChat)
    10. 分享七个非常有用的Android开发工具和工