摘要: 原创出处 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-first/ 「芋道源码」欢迎转载,保留摘要,谢谢!

  • 1、概述
  • 2、ConsumeQueue 结构
  • 3、ConsumeQueue 存储
  • 4、Broker 提供[拉取消息]接口
  • 5、Broker 提供[更新消费进度]接口
  • 6、Broker 提供[发回消息]接口
  • 7、结尾

阅读源码最好的方式,是使用 IDEA 进行调试 RocketMQ 源码,不然会一脸懵逼。

胖友可以点击「芋道源码」扫码关注,回复 git001 关键字
获得艿艿添加了中文注释的 RocketMQ 源码地址。

阅读源码很孤单,加入源码交流群,一起坚持!

1、概述

本章主要解析 消费 逻辑涉及到的源码。因为篇幅较长,分成上下两篇:

  1. 上篇:Broker 相关源码。
  2. 下篇:Consumer 相关源码。

本文即是上篇。


ok,先看第一张关于消费逻辑的图:

消费逻辑图

再看消费逻辑精简的顺序图(实际情况会略有差别):

Consumer&Broker消费精简图.png

2、ConsumeQueue 结构

ConsumeQueueMappedFileQueueMappedFile 的关系如下:

ConsumeQueue : MappedFileQueue : MappedFile = 1 : 1 : N。

反应到系统文件如下:

Yunai-MacdeMacBook-Pro-2:consumequeue yunai$ pwd
/Users/yunai/store/consumequeue
Yunai-MacdeMacBook-Pro-2:consumequeue yunai$ cd TopicRead3/
Yunai-MacdeMacBook-Pro-2:TopicRead3 yunai$ ls -ls
total 0
0 drwxr-xr-x  3 yunai  staff  102  4 27 21:52 0
0 drwxr-xr-x  3 yunai  staff  102  4 27 21:55 1
0 drwxr-xr-x  3 yunai  staff  102  4 27 21:55 2
0 drwxr-xr-x  3 yunai  staff  102  4 27 21:55 3
Yunai-MacdeMacBook-Pro-2:TopicRead3 yunai$ cd 0/
Yunai-MacdeMacBook-Pro-2:0 yunai$ ls -ls
total 11720
11720 -rw-r--r--  1 yunai  staff  6000000  4 27 21:55 00000000000000000000

ConsumeQueueMappedFileQueueMappedFile 的定义如下:

  • MappedFile :00000000000000000000等文件。
  • MappedFileQueue :MappedFile 所在的文件夹,对MappedFile 进行封装成文件队列,对上层提供可无限使用的文件容量。
    • 每个 MappedFile 统一文件大小。
    • 文件命名方式:fileName[n] = fileName[n - 1] + mappedFileSize。在 ConsumeQueue 里默认为 6000000B。
  • ConsumeQueue :针对 MappedFileQueue 的封装使用。
    • Store : ConsumeQueue = ConcurrentHashMap<String/* topic */, ConcurrentHashMap<Integer/* queueId */, ConsumeQueue>>

ConsumeQueue 存储在 MappedFile 的内容必须大小是 20B( ConsumeQueue.CQ_STORE_UNIT_SIZE ),有两种内容类型:

  1. MESSAGE_POSITION_INFO :消息位置信息。
  2. BLANK : 文件前置空白占位。当历史 Message 被删除时,需要用 BLANK占位被删除的消息。

MESSAGE_POSITION_INFO 在 ConsumeQueue 存储结构:

第几位字段说明数据类型字节数
1offset消息 CommitLog 存储位置Long8
2size消息长度Int4
3tagsCode消息tagsCodeLong8

BLANK 在 ConsumeQueue 存储结构:

第几位字段说明数据类型字节数
1
0Long8
2
Integer.MAX_VALUEInt4
3
0Long8

3、ConsumeQueue 存储

CommitLog重放ConsumeQueue图

主要有两个组件:

  • ReputMessageService :write ConsumeQueue。
  • FlushConsumeQueueService :flush ConsumeQueue。

ReputMessageService

ReputMessageService顺序图

  1class ReputMessageService extends ServiceThread {
  2
  3:     /**
  4:      * 开始重放消息的CommitLog物理位置
  5:      */

  6:     private volatile long reputFromOffset = 0;
  7
  8:     public long getReputFromOffset() {
  9:         return reputFromOffset;
 10:     }
 11
 12:     public void setReputFromOffset(long reputFromOffset) {
 13:         this.reputFromOffset = reputFromOffset;
 14:     }
 15
 16:     @Override
 17:     public void shutdown() {
 18:         for (int i = 0; i < 50 && this.isCommitLogAvailable(); i++) {
 19:             try {
 20:                 Thread.sleep(100);
 21:             } catch (InterruptedException ignored) {
 22:             }
 23:         }
 24
 25:         if (this.isCommitLogAvailable()) {
 26:             log.warn("shutdown ReputMessageService, but commitlog have not finish to be dispatched, CL: {} reputFromOffset: {}",
 27:                 DefaultMessageStore.this.commitLog.getMaxOffset(), this.reputFromOffset);
 28:         }
 29
 30:         super.shutdown();
 31:     }
 32
 33:     /**
 34:      * 剩余需要重放消息字节数
 35:      *
 36:      * @return 字节数
 37:      */

 38:     public long behind() {
 39:         return DefaultMessageStore.this.commitLog.getMaxOffset() - this.reputFromOffset;
 40:     }
 41
 42:     /**
 43:      * 是否commitLog需要重放消息
 44:      *
 45:      * @return 是否
 46:      */

 47:     private boolean isCommitLogAvailable() {
 48:         return this.reputFromOffset < DefaultMessageStore.this.commitLog.getMaxOffset();
 49:     }
 50
 51:     private void doReput() {
 52:         for (boolean doNext = truethis.isCommitLogAvailable() && doNext; ) {
 53
 54:             // TODO 疑问:这个是啥
 55:             if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable() //
 56:                 && this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {
 57:                 break;
 58:             }
 59
 60:             // 获取从reputFromOffset开始的commitLog对应的MappeFile对应的MappedByteBuffer
 61:             SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
 62:             if (result != null) {
 63:                 try {
 64:                     this.reputFromOffset = result.getStartOffset();
 65
 66:                     // 遍历MappedByteBuffer
 67:                     for (int readSize = 0; readSize < result.getSize() && doNext; ) {
 68:                         // 生成重放消息重放调度请求
 69:                         DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), falsefalse);
 70:                         int size = dispatchRequest.getMsgSize(); // 消息长度
 71:                         // 根据请求的结果处理
 72:                         if (dispatchRequest.isSuccess()) { // 读取成功
 73:                             if (size > 0) { // 读取Message
 74:                                 DefaultMessageStore.this.doDispatch(dispatchRequest);
 75:                                 // 通知有新消息
 76:                                 if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
 77:                                     && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {
 78:                                     DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
 79:                                         dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
 80:                                         dispatchRequest.getTagsCode());
 81:                                 }
 82:                                 // FIXED BUG By shijia
 83:                                 this.reputFromOffset += size;
 84:                                 readSize += size;
 85:                                 // 统计
 86:                                 if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
 87:                                     DefaultMessageStore.this.storeStatsService
 88:                                         .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).incrementAndGet();
 89:                                     DefaultMessageStore.this.storeStatsService
 90:                                         .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
 91:                                         .addAndGet(dispatchRequest.getMsgSize());
 92:                                 }
 93:                             } else if (size == 0) { // 读取到MappedFile文件尾
 94:                                 this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
 95:                                 readSize = result.getSize();
 96:                             }
 97:                         } else if (!dispatchRequest.isSuccess()) { // 读取失败
 98:                             if (size > 0) { // 读取到Message却不是Message
 99:                                 log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset);
100:                                 this.reputFromOffset += size;
101:                             } else { // 读取到Blank却不是Blank
102:                                 doNext = false;
103:                                 if (DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) {
104:                                     log.error("[BUG]the master dispatch message to consume queue error, COMMITLOG OFFSET: {}",
105:                                         this.reputFromOffset);
106
107:                                     this.reputFromOffset += result.getSize() - readSize;
108:                                 }
109:                             }
110:                         }
111:                     }
112:                 } finally {
113:                     result.release();
114:                 }
115:             } else {
116:                 doNext = false;
117:             }
118:         }
119:     }
120
121:     @Override
122:     public void run() {
123:         DefaultMessageStore.log.info(this.getServiceName() + " service started");
124
125:         while (!this.isStopped()) {
126:             try {
127:                 Thread.sleep(1);
128:                 this.doReput();
129:             } catch (Exception e) {
130:                 DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
131:             }
132:         }
133
134:         DefaultMessageStore.log.info(this.getServiceName() + " service end");
135:     }
136
137:     @Override
138:     public String getServiceName() {
139:         return ReputMessageService.class.getSimpleName();
140:     }
141
142: }
  • 说明:重放消息线程服务。
    • 该服务不断生成 消息位置信息 到 消费队列(ConsumeQueue)
    • 该服务不断生成 消息索引 到 索引文件(IndexFile)
  • ReputMessageService用例图
    • 第 75 至 81 行 :当 Broker 是主节点 && Broker 开启的是长轮询,通知消费队列有新的消息。NotifyMessageArrivingListener 会 调用 PullRequestHoldService#notifyMessageArriving(...) 方法,详细解析见:PullRequestHoldService
    • 第 61 行 :获取 reputFromOffset 开始的 CommitLog 对应的 MappedFile 对应的 MappedByteBuffer
    • 第 67 行 :遍历 MappedByteBuffer
    • 第 69 行 :生成重放消息重放调度请求 (DispatchRequest) 。请求里主要包含一条消息 (Message) 或者 文件尾 (BLANK) 的基本信息。
    • 第 72 至 96 行 :请求是有效请求,进行逻辑处理。
    • 第 73 至 92 行 :请求对应的是 Message,进行调度,生成 ConsumeQueue 和 IndexFile 对应的内容。详细解析见:
    • 第 93 至 96 行 :请求对应的是 Blank,即文件尾,跳转指向下一个 MappedFile
    • 第 97 至 110 行 :请求是无效请求。出现该情况,基本是一个BUG
  • 第 127 至 128 行 :每 1ms 循环执行重放逻辑。
  • 第 18 至 30 行 :shutdown时,多次 sleep(100) 直到 CommitLog 回放到最新位置。恩,如果未回放完,会输出警告日志。

DefaultMessageStore#doDispatch(...)

  1/**
  2:  * 执行调度请求
  3:  * 1. 非事务消息 或 事务提交消息 建立 消息位置信息 到 ConsumeQueue
  4:  * 2. 建立 索引信息 到 IndexFile
  5:  *
  6:  * @param req 调度请求
  7:  */

  8public void doDispatch(DispatchRequest req) {
  9:     // 非事务消息 或 事务提交消息 建立 消息位置信息 到 ConsumeQueue
 10:     final int tranType = MessageSysFlag.getTransactionValue(req.getSysFlag());
 11:     switch (tranType) {
 12:         case MessageSysFlag.TRANSACTION_NOT_TYPE:
 13:         case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
 14:             DefaultMessageStore.this.putMessagePositionInfo(req.getTopic(), req.getQueueId(), req.getCommitLogOffset(), req.getMsgSize(),
 15:                 req.getTagsCode(), req.getStoreTimestamp(), req.getConsumeQueueOffset());
 16:             break;
 17:         case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
 18:         case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
 19:             break;
 20:     }
 21:     // 建立 索引信息 到 IndexFile
 22:     if (DefaultMessageStore.this.getMessageStoreConfig().isMessageIndexEnable()) {
 23:         DefaultMessageStore.this.indexService.buildIndex(req);
 24:     }
 25: }
 26
 27/**
 28:  * 建立 消息位置信息 到 ConsumeQueue
 29:  *
 30:  * @param topic 主题
 31:  * @param queueId 队列编号
 32:  * @param offset commitLog存储位置
 33:  * @param size 消息长度
 34:  * @param tagsCode 消息tagsCode
 35:  * @param storeTimestamp 存储时间
 36:  * @param logicOffset 队列位置
 37:  */

 38public void putMessagePositionInfo(String topic, int queueId, long offset, int size, long tagsCode, long storeTimestamp,
 39:     long logicOffset)
 
{
 40:     ConsumeQueue cq = this.findConsumeQueue(topic, queueId);
 41:     cq.putMessagePositionInfoWrapper(offset, size, tagsCode, storeTimestamp, logicOffset);
 42: }

ConsumeQueue#putMessagePositionInfoWrapper(...)

  1/**
  2:  * 添加位置信息封装
  3:  *
  4:  * @param offset commitLog存储位置
  5:  * @param size 消息长度
  6:  * @param tagsCode 消息tagsCode
  7:  * @param storeTimestamp 消息存储时间
  8:  * @param logicOffset 队列位置
  9:  */

 10public void putMessagePositionInfoWrapper(long offset, int size, long tagsCode, long storeTimestamp,
 11:     long logicOffset)
 
{
 12:     final int maxRetries = 30;
 13:     boolean canWrite = this.defaultMessageStore.getRunningFlags().isWriteable();
 14:     // 多次循环写,直到成功
 15:     for (int i = 0; i < maxRetries && canWrite; i++) {
 16:         // 调用添加位置信息
 17:         boolean result = this.putMessagePositionInfo(offset, size, tagsCode, logicOffset);
 18:         if (result) {
 19:             // 添加成功,使用消息存储时间 作为 存储check point。
 20:             this.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(storeTimestamp);
 21:             return;
 22:         } else {
 23:             // XXX: warn and notify me
 24:             log.warn("[BUG]put commit log position info to " + topic + ":" + queueId + " " + offset
 25:                 + " failed, retry " + i + " times");
 26
 27:             try {
 28:                 Thread.sleep(1000);
 29:             } catch (InterruptedException e) {
 30:                 log.warn("", e);
 31:             }
 32:         }
 33:     }
 34
 35:     // XXX: warn and notify me 设置异常不可写入
 36:     log.error("[BUG]consume queue can not write, {} {}"this.topic, this.queueId);
 37:     this.defaultMessageStore.getRunningFlags().makeLogicsQueueError();
 38: }
 39
 40/**
 41:  * 添加位置信息,并返回添加是否成功
 42:  *
 43:  * @param offset commitLog存储位置
 44:  * @param size 消息长度
 45:  * @param tagsCode 消息tagsCode
 46:  * @param cqOffset 队列位置
 47:  * @return 是否成功
 48:  */

 49private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
 50:     final long cqOffset)
 
{
 51:     // 如果已经重放过,直接返回成功
 52:     if (offset <= this.maxPhysicOffset) {
 53:         return true;
 54:     }
 55:     // 写入位置信息到byteBuffer
 56:     this.byteBufferIndex.flip();
 57:     this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
 58:     this.byteBufferIndex.putLong(offset);
 59:     this.byteBufferIndex.putInt(size);
 60:     this.byteBufferIndex.putLong(tagsCode);
 61:     // 计算consumeQueue存储位置,并获得对应的MappedFile
 62:     final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;
 63:     MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
 64:     if (mappedFile != null) {
 65:         // 当是ConsumeQueue第一个MappedFile && 队列位置非第一个 && MappedFile未写入内容,则填充前置空白占位
 66:         if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) { // TODO 疑问:为啥这个操作。目前能够想象到的是,一些老的消息很久没发送,突然发送,这个时候刚好满足。
 67:             this.minLogicOffset = expectLogicOffset;
 68:             this.mappedFileQueue.setFlushedWhere(expectLogicOffset);
 69:             this.mappedFileQueue.setCommittedWhere(expectLogicOffset);
 70:             this.fillPreBlank(mappedFile, expectLogicOffset);
 71:             log.info("fill pre blank space " + mappedFile.getFileName() + " " + expectLogicOffset + " "
 72:                 + mappedFile.getWrotePosition());
 73:         }
 74:         // 校验consumeQueue存储位置是否合法。TODO 如果不合法,继续写入会不会有问题?
 75:         if (cqOffset != 0) {
 76:             long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();
 77:             if (expectLogicOffset != currentLogicOffset) {
 78:                 LOG_ERROR.warn(
 79:                     "[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
 80:                     expectLogicOffset,
 81:                     currentLogicOffset,
 82:                     this.topic,
 83:                     this.queueId,
 84:                     expectLogicOffset - currentLogicOffset
 85:                 );
 86:             }
 87:         }
 88:         // 设置commitLog重放消息到ConsumeQueue位置。
 89:         this.maxPhysicOffset = offset;
 90:         // 插入mappedFile
 91:         return mappedFile.appendMessage(this.byteBufferIndex.array());
 92:     }
 93:     return false;
 94: }
 95
 96/**
 97:  * 填充前置空白占位
 98:  *
 99:  * @param mappedFile MappedFile
100:  * @param untilWhere consumeQueue存储位置
101:  */

102private void fillPreBlank(final MappedFile mappedFile, final long untilWhere) {
103:     // 写入前置空白占位到byteBuffer
104:     ByteBuffer byteBuffer = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE);
105:     byteBuffer.putLong(0L);
106:     byteBuffer.putInt(Integer.MAX_VALUE);
107:     byteBuffer.putLong(0L);
108:     // 循环填空
109:     int until = (int) (untilWhere % this.mappedFileQueue.getMappedFileSize());
110:     for (int i = 0; i < until; i += CQ_STORE_UNIT_SIZE) {
111:         mappedFile.appendMessage(byteBuffer.array());
112:     }
113: }
  • #putMessagePositionInfoWrapper(...) 说明 :添加位置信息到ConsumeQueue 的封装,实际需要调用 #putMessagePositionInfo(...) 方法。
    • 第 13 行 :判断 ConsumeQueue 是否允许写入。当发生Bug时,不允许写入。
    • 第 17 行 :调用 #putMessagePositionInfo(...) 方法,添加位置信息。
    • 第 18 至 21 行 :添加成功,使用消息存储时间 作为 存储检查点。StoreCheckpoint 的详细解析见:Store初始化与关闭。
    • 第 22 至 32 行 :添加失败,目前基本可以认为是BUG。
    • 第 35 至 37 行 :写入失败时,标记 ConsumeQueue 写入异常,不允许继续写入。
  • #putMessagePositionInfo(...) 说明 :添加位置信息到 ConsumeQueue,并返回添加是否成功。
    • 这块比较有疑问,如果计算出来的存储位置不合法,不返回添加失败,继续进行添加位置信息,会不会有问题???
    • 这块比较有疑问,什么场景下会需要。猜测产生的原因:一个Topic 长期无消息产生,突然N天后进行发送,Topic 对应的历史消息以及和消费队列数据已经被清理,新生成的MappedFile需要前置占位。
    • 第 51 至 54 行 :如果 offset(存储位置) 小于等于  maxPhysicOffset(CommitLog 消息重放到 ConsumeQueue 最大的 CommitLog 存储位置),表示已经重放过,此时,不再重复写入,直接返回写入成功。
    • 第 55 至 60 行 :写 位置信息到byteBuffer。
    • 第 62 至 63 行 :计算 ConsumeQueue存储位置,并获得对应的MappedFile。
    • 第 65 至 73 行 :当 MappedFile 是 ConsumeQueue 当前第一个文件 && MappedFile 未写入内容 && 重放消息队列位置大于0,则需要进行 MappedFile 填充前置  BLANK
    • 第 74 至 87 行 :校验 ConsumeQueue 存储位置是否合法,不合法则输出日志。
    • 第 89 行 :设置 CommitLog 重放消息到 ConsumeQueue 的最大位置。
    • 第 91 行 :插入消息位置到 MappedFile

FlushConsumeQueueService

  1class FlushConsumeQueueService extends ServiceThread {
  2:     private static final int RETRY_TIMES_OVER = 3;
  3:     /**
  4:      * 最后flush时间戳
  5:      */

  6:     private long lastFlushTimestamp = 0;
  7
  8:     private void doFlush(int retryTimes) {
  9:         int flushConsumeQueueLeastPages = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueLeastPages();
 10
 11:         // retryTimes == RETRY_TIMES_OVER时,进行强制flush。主要用于shutdown时。
 12:         if (retryTimes == RETRY_TIMES_OVER) {
 13:             flushConsumeQueueLeastPages = 0;
 14:         }
 15:         // 当时间满足flushConsumeQueueThoroughInterval时,即使写入的数量不足flushConsumeQueueLeastPages,也进行flush
 16:         long logicsMsgTimestamp = 0;
 17:         int flushConsumeQueueThoroughInterval = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueThoroughInterval();
 18:         long currentTimeMillis = System.currentTimeMillis();
 19:         if (currentTimeMillis >= (this.lastFlushTimestamp + flushConsumeQueueThoroughInterval)) {
 20:             this.lastFlushTimestamp = currentTimeMillis;
 21:             flushConsumeQueueLeastPages = 0;
 22:             logicsMsgTimestamp = DefaultMessageStore.this.getStoreCheckpoint().getLogicsMsgTimestamp();
 23:         }
 24:         // flush消费队列
 25:         ConcurrentHashMap<String, ConcurrentHashMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable;
 26:         for (ConcurrentHashMap<Integer, ConsumeQueue> maps : tables.values()) {
 27:             for (ConsumeQueue cq : maps.values()) {
 28:                 boolean result = false;
 29:                 for (int i = 0; i < retryTimes && !result; i++) {
 30:                     result = cq.flush(flushConsumeQueueLeastPages);
 31:                 }
 32:             }
 33:         }
 34:         // flush 存储 check point
 35:         if (0 == flushConsumeQueueLeastPages) {
 36:             if (logicsMsgTimestamp > 0) {
 37:                 DefaultMessageStore.this.getStoreCheckpoint().setLogicsMsgTimestamp(logicsMsgTimestamp);
 38:             }
 39:             DefaultMessageStore.this.getStoreCheckpoint().flush();
 40:         }
 41:     }
 42
 43:     public void run() {
 44:         DefaultMessageStore.log.info(this.getServiceName() + " service started");
 45
 46:         while (!this.isStopped()) {
 47:             try {
 48:                 int interval = DefaultMessageStore.this.getMessageStoreConfig().getFlushIntervalConsumeQueue();
 49:                 this.waitForRunning(interval);
 50:                 this.doFlush(1);
 51:             } catch (Exception e) {
 52:                 DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
 53:             }
 54:         }
 55
 56:         this.doFlush(RETRY_TIMES_OVER);
 57
 58:         DefaultMessageStore.log.info(this.getServiceName() + " service end");
 59:     }
 60
 61:     @Override
 62:     public String getServiceName() {
 63:         return FlushConsumeQueueService.class.getSimpleName();
 64:     }
 65
 66:     @Override
 67:     public long getJointime() {
 68:         return 1000 * 60;
 69:     }
 70: }
  • 说明 :flush ConsumeQueue(消费队列) 线程服务。
  • 第 11 至 14 行 :当 retryTimes == RETRY_TIMES_OVER 时,进行强制flush。用于 shutdown 时。
  • 第 15 至 23 行 :每 flushConsumeQueueThoroughInterval 周期,执行一次 flush 。因为不是每次循环到都能满足 flushConsumeQueueLeastPages 大小,因此,需要一定周期进行一次强制 flush 。当然,不能每次循环都去执行强制 flush,这样性能较差。
  • 第 24 至 33 行 :flush ConsumeQueue(消费队列)。
    • flush 逻辑:MappedFile#落盘。
  • 第 34 至 40 行 :flush StoreCheckpointStoreCheckpoint 的详细解析见:Store初始化与关闭。
  • 第 43 至 59 行 :每 1000ms 执行一次 flush。如果 wakeup() 时,则会立即进行一次 flush。目前,暂时不存在 wakeup() 的调用。

4、Broker 提供[拉取消息]接口

PullMessageRequestHeader

  1public class PullMessageRequestHeader implements CommandCustomHeader {
  2:     /**
  3:      * 消费者分组
  4:      */

  5:     @CFNotNull
  6:     private String consumerGroup;
  7:     /**
  8:      * Topic
  9:      */

 10:     @CFNotNull
 11:     private String topic;
 12:     /**
 13:      * 队列编号
 14:      */

 15:     @CFNotNull
 16:     private Integer queueId;
 17:     /**
 18:      * 队列开始位置
 19:      */

 20:     @CFNotNull
 21:     private Long queueOffset;
 22:     /**
 23:      * 消息数量
 24:      */

 25:     @CFNotNull
 26:     private Integer maxMsgNums;
 27:     /**
 28:      * 系统标识
 29:      */

 30:     @CFNotNull
 31:     private Integer sysFlag;
 32:     /**
 33:      * 提交消费进度位置
 34:      */

 35:     @CFNotNull
 36:     private Long commitOffset;
 37:     /**
 38:      * 挂起超时时间
 39:      */

 40:     @CFNotNull
 41:     private Long suspendTimeoutMillis;
 42:     /**
 43:      * 订阅表达式
 44:      */

 45:     @CFNullable
 46:     private String subscription;
 47:     /**
 48:      * 订阅版本号
 49:      */

 50:     @CFNotNull
 51:     private Long subVersion;
 52: }
  • 说明:拉取消息请求Header
  • topic +  queueId + queueOffset + maxMsgNums
  • sysFlag :系统标识。
    • 第 0 位 FLAG_COMMIT_OFFSET :标记请求提交消费进度位置,和 commitOffset 配合。
    • 第 1 位 FLAG_SUSPEND :标记请求是否挂起请求,和 suspendTimeoutMillis 配合。当拉取不到消息时, Broker 会挂起请求,直到有消息。最大挂起时间:suspendTimeoutMillis毫秒。
    • 第 2 位 FLAG_SUBSCRIPTION :是否过滤订阅表达式,和 subscription 配置。
  • subVersion :订阅版本号。请求时,如果版本号不对,则无法拉取到消息,需要重新获取订阅信息,使用最新的订阅版本号。

PullMessageProcessor#proce***equest(...)

// ... 文章过长,超过微信限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-first/ 阅读
  • 说明:处理拉取消息请求,返回响应。
  • 第 14 至 19 行 :校验 Broker 是否可读。
  • 第 21 至 33 行 :校验 SubscriptionGroupConfig(订阅分组配置) 是否存在 && 可以消费。
  • 第 35 至 38 行 :处理 PullMessageRequestHeader.sysFlag 对应的标志位。
  • 第 40 至 62 行 :校验 TopicConfig(主题配置) 是否存在 && 可读 && 队列编号正确。
  • 第 64 至 110 行 :校验 SubscriptionData(订阅信息) 是否正确。
  • 第 113 行 :调用 MessageStore#getMessage(...) 获取 GetMessageResult(消息)。详细解析见:MessageStore#getMessage(...)。
  • 第 122 至 152 行 :计算建议拉取消息 brokerId 。
  • 第 154 至 201 行 :
  • 第 204 至 244 行 :Hook 逻辑,#executeConsumeMessageHookBefore(...) 。
  • 第 247 至 283 行 :拉取消息成功,即拉取到消息。
    • 第 255 至 263 行 :方式一 :调用 readGetMessageResult(...) 获取消息内容到堆内内存,设置到 响应body
    • 第 265 至 281 行 :方式二 :基于 zero-copy 实现,直接响应,无需堆内内存,性能更优。TODO :此处等对zero-copy有研究,再补充一些
  • 第 284 至 300 行 :拉取不到消息,当满足条件 (Broker 允许挂起 && 请求要求挂起),执行挂起请求。详细解析见:PullRequestHoldService。
  • 第 304 至 328 行 :TODO :此处等对tools模块研究后再补充
  • 第 339 至 346 :持久化消费进度,当满足 (Broker 非主 && 请求要求持久化进度)。详细解析见:更新消费进度。

MessageStore#getMessage(...)

// ... 文章过长,超过微信限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-first/ 阅读
  • 说明 :根据 消息分组(group) + 主题(Topic) + 队列编号(queueId) + 队列位置(offset) + 订阅信息(subscriptionData) 获取 指定条数(maxMsgNums) 消息(Message)。
  • 第 14 至 18 行 :判断 Store 是否处于关闭状态,若关闭,则无法获取消息。
  • 第 19 至 23 行 :判断当前运行状态是否可读,若不可读,则无法获取消息。
  • 第 37 行 :根据 主题(Topic) + 队列编号(queueId) 获取 消息队列(ConsumeQueue)。
    • #findConsumeQueue(...) :第 159 至 196 行。
  • 第 43 至 58 行 :各种队列位置(offset) 无法读取消息,并针对对应的情况,计算下一次 Client 队列拉取位置。
    • 第 43 至 45 行 :消息队列无消息。
    • 第 46 至 48 行 :查询的消息队列位置(offset) 太小。
    • 第 49 至 51 行 :查询的消息队列位置(offset) 恰好等于 消息队列最大的队列位置。该情况是正常现象,相当于查询最新的消息。
    • 第 52 至 58 行 :查询的消息队列位置(offset) 超过过多。
    • #nextOffsetCorrection(...) :第 198 至 212 行。
  • 第 61 行 :根据 消费队列位置(offset) 获取 对应的MappedFile
  • 第 72 至 128 行 :循环获取 消息位置信息
    • #checkInDiskByCommitOffset(...) :第 214 至 224 行。
    • #isTheBatchFull(...) :第 226 至 264 行。
    • 第 74 至 76 行 :读取每一个 消息位置信息
    • 第 79 至 83 行 :当 offsetPy 小于nextPhyFileStartOffset 时,意味着对 应的 Message 已经移除,所以直接continue,直到可读取的 Message
    • 第 84 至 90 行 :判断是否已经获得足够的消息。
  • 第 92 行 :判断消息是否符合条件。详细解析见:DefaultMessageFilter#isMessageMatched(...)。
  • 第 94 行 :从 CommitLog 获取对应 消息的MappedByteBuffer
  • 第 95 至 99 行 :获取 消息MappedByteBuffer 成功。
  • 第 100 至 106 行 :获取 消息MappedByteBuffer 失败。从 CommitLog 无法读取到消息,说明 该消息对应的文件(MappedFile) 已经删除,此时计算下一个MappedFile的起始位置。该逻辑需要配合(第 79 至 83 行)一起理解。
  • 第 117 至 120 行 :统计剩余可拉取消息字节数。
  • 第 123 行 :计算下次拉取消息的消息队列编号。
  • 第 124 至 128 行 :根据剩余可拉取消息字节数与内存判断是否建议读取从节点。
  • 第 130 行 :释放 bufferConsumeQueue 对 MappedFile 的指向。此处 MappedFile 是 ConsumeQueue 里的文件,不是 CommitLog 下的文件。
  • 第 133 至 136 行 :获得消费队列位置(offset) 获取 对应的MappedFile 为,计算ConsumeQueue 从 offset 开始的下一个 MappedFile 对应的位置。
  • 第 143 至 150 行 :记录统计信息:消耗时间、拉取到消息/未拉取到消息次数。
  • 第 151 至 156 行 :设置返回结果并返回。

DefaultMessageFilter#isMessageMatched(...)

  1public class DefaultMessageFilter implements MessageFilter {
  2
  3:     @Override
  4:     public boolean isMessageMatched(SubscriptionData subscriptionData, Long tagsCode) {
  5:         // 消息tagsCode 空
  6:         if (tagsCode == null) {
  7:             return true;
  8:         }
  9:         // 订阅数据 空
 10:         if (null == subscriptionData) {
 11:             return true;
 12:         }
 13:         // classFilter
 14:         if (subscriptionData.isClassFilterMode())
 15:             return true;
 16:         // 订阅表达式 全匹配
 17:         if (subscriptionData.getSubString().equals(SubscriptionData.SUB_ALL)) {
 18:             return true;
 19:         }
 20:         // 订阅数据code数组 是否包含 消息tagsCode
 21:         return subscriptionData.getCodeSet().contains(tagsCode.intValue());
 22:     }
 23
 24: }
  • 说明 :消息过滤器默认实现。

PullRequestHoldService

// ... 文章过长,超过微信限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-first/ 阅读
  • PullRequestHoldService 说明 :拉取消息请求挂起维护线程服务。
    • 当拉取消息请求获得不了消息时,则会将请求进行挂起,添加到该服务。
    • 当有符合条件信息时 或 挂起超时时,重新执行获取消息逻辑。
  • #suspendPullRequest(...) 说明 :添加拉取消息挂起请求到集合( pullRequestTable )。
  • #run(...) 说明 :定时检查挂起请求是否有需要通知重新拉取消息并进行通知。
    • 第 65 至 70 行 :根据长轮训or短轮训设置不同的等待时间。
    • 第 71 至 77 行 :检查挂起请求是否有需要通知的。
  • #checkHoldRequest(...) 说明 :遍历挂起请求,检查是否有需要通知的。
  • #notifyMessageArriving(...) 说明 :检查指定队列是否有需要通知的请求。
    • 第 139 至 143 行 :如果 maxOffset 过小,重新获取一次最新的。
    • 第 144 至 155 行 :有新的匹配消息,唤醒请求,即再次拉取消息。
    • 第 156 至 165 行 :超过挂起时间,唤醒请求,即再次拉取消息。
    • 第 148 || 159 行 :唤醒请求,再次拉取消息。原先担心拉取消息时间过长,导致影响整个挂起请求的遍历,后面查看#executeRequestWhenWakeup(...),实际是丢到线程池进行一步的消息拉取,不会有性能上的问题。详细解析见:PullMessageProcessor#executeRequestWhenWakeup(...)。
    • 第 166 至 172 行 :不符合唤醒的请求重新添加到集合(pullRequestTable)。

PullMessageProcessor#executeRequestWhenWakeup(...)

  1public void executeRequestWhenWakeup(final Channel channel, final RemotingCommand request) throws RemotingCommandException {
  2:     Runnable run = new Runnable() {
  3:         @Override
  4:         public void run() {
  5:             try {
  6:                 // 调用拉取请求。本次调用,设置不挂起请求。
  7:                 final RemotingCommand response = PullMessageProcessor.this.proce***equest(channel, request, false);
  8
  9:                 if (response != null) {
 10:                     response.setOpaque(request.getOpaque());
 11:                     response.markResponseType();
 12:                     try {
 13:                         channel.writeAndFlush(response).addListener(new ChannelFutureListener() {
 14:                             @Override
 15:                             public void operationComplete(ChannelFuture future) throws Exception {
 16:                                 if (!future.isSuccess()) {
 17:                                     LOG.error("Proce***equestWrapper response to {} failed", future.channel().remoteAddress(), future.cause());
 18:                                     LOG.error(request.toString());
 19:                                     LOG.error(response.toString());
 20:                                 }
 21:                             }
 22:                         });
 23:                     } catch (Throwable e) {
 24:                         LOG.error("Proce***equestWrapper process request over, but response failed", e);
 25:                         LOG.error(request.toString());
 26:                         LOG.error(response.toString());
 27:                     }
 28:                 }
 29:             } catch (RemotingCommandException e1) {
 30:                 LOG.error("ExecuteRequestWhenWakeup run", e1);
 31:             }
 32:         }
 33:     };
 34:     // 提交拉取请求到线程池
 35:     this.brokerController.getPullMessageExecutor().submit(new RequestTask(run, channel, request));
 36: }
  • 说明 :执行请求唤醒,即再次拉取消息。该方法调用线程池,因此,不会阻塞。
  • 第 7 行 :调用拉取消息请求。本次调用,设置即使请求不到消息,也不挂起请求。如果不设置,请求可能被无限挂起,被 Broker 无限循环。
  • 第 35 行 :提交拉取消息请求到线程池

5、Broker 提供[更新消费进度]接口

Yunai-MacdeMacBook-Pro-2:config yunai$ pwd
/Users/yunai/store/config
Yunai-MacdeMacBook-Pro-2:config yunai$ ls -ls
total 40
8 -rw-r--r--  1 yunai  staff    21  4 28 16:58 consumerOffset.json
8 -rw-r--r--  1 yunai  staff    21  4 28 16:58 consumerOffset.json.bak
8 -rw-r--r--  1 yunai  staff    21  4 28 16:58 delayOffset.json
8 -rw-r--r--  1 yunai  staff    21  4 28 16:58 delayOffset.json.bak
8 -rw-r--r--  1 yunai  staff  1401  4 27 21:51 topics.json
Yunai-MacdeMacBook-Pro-2:config yunai$ cat consumerOffset.json
{
 "offsetTable":{
  "%RETRY%please_rename_unique_group_name_4@please_rename_unique_group_name_4":{0:0
  },
  "TopicRead3@please_rename_unique_group_name_4":{1:5
  }
 }
}
  • consumerOffset.json :消费进度存储文件。
  • consumerOffset.json.bak :消费进度存储文件备份。
  • 每次写入 consumerOffset.json,将原内容备份到 consumerOffset.json.bak。实现见:MixAll#string2File(...)。

BrokerController#initialize(...)

  1:this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
  2:    @Override
  3:    public void run() {
  4:        try {
  5:            BrokerController.this.consumerOffsetManager.persist();
  6:        } catch (Throwable e) {
  7:            log.error("schedule persist consumerOffset error.", e);
  8:        }
  9:    }
 10:}, 1000 * 10this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
  • 说明 :每 5s 执行一次持久化逻辑。

ConfigManager

// ... 文章过长,超过微信限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-first/ 阅

MixAll#string2File(...)

// ... 文章过长,超过微信限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-first/ 阅读

ConsumerOffsetManager

// ... 文章过长,超过微信限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-first/ 阅读
  • 说明 :消费进度管理器。

6、Broker 提供[发回消息]接口

大部分逻辑和 Broker 提供[接收消息]接口 类似,可以先看下相关内容。

SendMessageProcessor#consumerSendMsgBack(...)

// ... 文章过长,超过微信限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-first/ 阅读
  • 说明 :当 Consumer 消费某条消息失败时,会调用该接口发回消息。Broker 会存储发回的消息。这样,下次 Consumer 拉取该消息,能够从 CommitLog 和 ConsumeQueue 顺序读取。
  • [x] 因为大多数逻辑和 Broker 接收普通消息 很相似,时候TODO 标记成独有的逻辑。
  • 第 4 至 7 行 :初始化响应。
  • [x] 第 9 至 20 行 :Hook逻辑。
  • [x] 第22 至 30 行 :判断消费分组是否存在。
  • 第 32 至 37 行 :检查 Broker 是否有写入权限。
  • [x] 第 39 至 44 行 :检查重试队列数是否大于0。
  • 第 47 行 :计算 retry topic。
  • [x] 第 50 行 :随机分配队列编号,依赖 retryQueueNums
  • [x] 第 52 至 56 行 :计算 sysFlag
  • 第 58 至 72 行 :获取 TopicConfig。如果不存在,则创建。
  • [x] 第 74 至 80 行 :查询消息。若不存在,返回异常错误。
  • [x] 第 82 至 86 行 :设置 retryTopic 到消息拓展属性。
  • [x] 第 89 行 :设置消息不等待存储完成。
    • 当 Broker 刷盘方式为同步,会导致同步落盘不能批量提交,这样会不会存在问题?有知道的同学麻烦告知下。

    • [x] 第 91 至 116 行 :处理 delayLevel 。
    • 第 118 至 131 行 :创建 MessageExtBrokerInner 。
    • [x] 第 133 至 135 行 :设置原始消息编号到拓展属性。
    • 第 137 至 161 行 :添加消息。

7、结尾

感谢同学们对本文的阅读、收藏、点赞。

如果解析存在问题或者表达误解的,表示抱歉。如果方便的话,可以一起沟通下。让我们来一场 1 :1 交流(搞基)。

再次表示十分感谢。


©著作权归作者所有:来自51CTO博客作者mb5ff80520dfa04的原创作品,如需转载,请注明出处,否则将追究法律责任

更多相关文章

  1. RocketMQ 源码分析 —— 定时消息与消息重试
  2. 消息中间件 RocketMQ 源码解析 —— 调试环境搭建
  3. 分布式消息队列 RocketMQ源码解析:事务消息
  4. 分布式消息队列 RocketMQ源码解析:Filtersrv
  5. 分布式消息队列 RocketMQ 源码分析 —— 定时消息与消息重试
  6. 分布式消息队列 RocketMQ 源码分析 —— 高可用
  7. 分布式消息队列 RocketMQ 源码分析 —— Message 拉取与消费(上)
  8. 分布式消息队列 RocketMQ 源码分析 —— Message 拉取与消费(下)
  9. 贞炸了!上线之后,消息收不到了!

随机推荐

  1. linux_locale的设定中LANG、LC_ALL、LANG
  2. Linux文件权限查看及修改命令chmod
  3. fedora下vim的安装
  4. Centos7 安装部署 Python-3.6.1
  5. 在Linux中使用PAM进行身份验证为什么它可
  6. 删除目录软链接注意事项
  7. linux网卡实现高可用:team链路聚合
  8. Linux网络设备驱动架構學習(三)
  9. 一&#183;创建Linux服务器(基于阿里云)
  10. Linux中动态链接库总结