1、概述

本章主要解析 消费 逻辑涉及到的源码。 因为篇幅较长,分成上下两篇:

  1. 上篇: Broker 相关源码。

  2. 下篇: Consumer 相关源码。

本文即是上篇。


ok,先看第一张关于消费逻辑的图:

再看消费逻辑精简的顺序图(实际情况会略有差别):

2、ConsumeQueue 结构

ConsumeQueueMappedFileQueueMappedFile 的关系如下:

ConsumeQueue : MappedFileQueue : MappedFile = 1 : 1 : N。

反应到系统文件如下:

Yunai-MacdeMacBook-Pro-2:consumequeue yunai$ pwd/Users/yunai/store/consumequeueYunai-MacdeMacBook-Pro-2:consumequeue yunai$ cd TopicRead3/Yunai-MacdeMacBook-Pro-2:TopicRead3 yunai$ ls -lstotal 00 drwxr-xr-x  3 yunai  staff  102  4 27 21:52 00 drwxr-xr-x  3 yunai  staff  102  4 27 21:55 10 drwxr-xr-x  3 yunai  staff  102  4 27 21:55 20 drwxr-xr-x  3 yunai  staff  102  4 27 21:55 3Yunai-MacdeMacBook-Pro-2:TopicRead3 yunai$ cd 0/Yunai-MacdeMacBook-Pro-2:0 yunai$ ls -lstotal 1172011720 -rw-r--r--  1 yunai  staff  6000000  4 27 21:55 00000000000000000000

ConsumeQueueMappedFileQueueMappedFile 的定义如下:

  • MappedFile :00000000000000000000等文件。

  • MappedFileQueue : MappedFile 所在的文件夹,对 MappedFile 进行封装成文件队列,对上层提供可无限使用的文件容量。

    • 每个 MappedFile 统一文件大小。

    • 文件命名方式:fileName[n] = fileName[n - 1] + mappedFileSize。在 ConsumeQueue 里默认为 6000000B。

  • ConsumeQueue :针对 MappedFileQueue 的封装使用。

    • Store:ConsumeQueue=ConcurrentHashMap<String/* topic */,ConcurrentHashMap<Integer/* queueId */,ConsumeQueue>>

ConsumeQueue 存储在 MappedFile 的内容必须大小是 20B( ConsumeQueue.CQ_STORE_UNIT_SIZE ),有两种内容类型:

  1. MESSAGE_POSITION_INFO :消息位置信息。

  2. BLANK : 文件前置空白占位。当历史 Message 被删除时,需要用 BLANK占位被删除的消息。

MESSAGE_POSITION_INFOConsumeQueue 存储结构:

第几位字段说明数据类型字节数
1offset消息 CommitLog 存储位置Long8
2size消息长度Int4
3tagsCode消息tagsCodeLong8

BLANKConsumeQueue 存储结构:

第几位字段说明数据类型字节数
1
0Long8
2
Integer.MAX_VALUEInt4
3
0Long8

3、ConsumeQueue 存储

主要有两个组件:

  • ReputMessageService :write ConsumeQueue。

  • FlushConsumeQueueService :flush ConsumeQueue。

ReputMessageService

  1: class ReputMessageService extends ServiceThread {  2:   3:     /**  4:      * 开始重放消息的CommitLog物理位置  5:      */  6:     private volatile long reputFromOffset = 0;  7:   8:     public long getReputFromOffset() {  9:         return reputFromOffset; 10:     } 11:  12:     public void setReputFromOffset(long reputFromOffset) { 13:         this.reputFromOffset = reputFromOffset; 14:     } 15:  16:     @Override 17:     public void shutdown() { 18:         for (int i = 0; i < 50 && this.isCommitLogAvailable(); i++) { 19:             try { 20:                 Thread.sleep(100); 21:             } catch (InterruptedException ignored) { 22:             } 23:         } 24:  25:         if (this.isCommitLogAvailable()) { 26:             log.warn("shutdown ReputMessageService, but commitlog have not finish to be dispatched, CL: {} reputFromOffset: {}", 27:                 DefaultMessageStore.this.commitLog.getMaxOffset(), this.reputFromOffset); 28:         } 29:  30:         super.shutdown(); 31:     } 32:  33:     /** 34:      * 剩余需要重放消息字节数 35:      * 36:      * @return 字节数 37:      */ 38:     public long behind() { 39:         return DefaultMessageStore.this.commitLog.getMaxOffset() - this.reputFromOffset; 40:     } 41:  42:     /** 43:      * 是否commitLog需要重放消息 44:      * 45:      * @return 是否 46:      */ 47:     private boolean isCommitLogAvailable() { 48:         return this.reputFromOffset < DefaultMessageStore.this.commitLog.getMaxOffset(); 49:     } 50:  51:     private void doReput() { 52:         for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) { 53:  54:             // TODO 疑问:这个是啥 55:             if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable() // 56:                 && this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) { 57:                 break; 58:             } 59:  60:             // 获取从reputFromOffset开始的commitLog对应的MappeFile对应的MappedByteBuffer 61:             SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset); 62:             if (result != null) { 63:                 try { 64:                     this.reputFromOffset = result.getStartOffset(); 65:  66:                     // 遍历MappedByteBuffer 67:                     for (int readSize = 0; readSize < result.getSize() && doNext; ) { 68:                         // 生成重放消息重放调度请求 69:                         DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false); 70:                         int size = dispatchRequest.getMsgSize(); // 消息长度 71:                         // 根据请求的结果处理 72:                         if (dispatchRequest.isSuccess()) { // 读取成功 73:                             if (size > 0) { // 读取Message 74:                                 DefaultMessageStore.this.doDispatch(dispatchRequest); 75:                                 // 通知有新消息 76:                                 if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() 77:                                     && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) { 78:                                     DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(), 79:                                         dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1, 80:                                         dispatchRequest.getTagsCode()); 81:                                 } 82:                                 // FIXED BUG By shijia 83:                                 this.reputFromOffset += size; 84:                                 readSize += size; 85:                                 // 统计 86:                                 if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) { 87:                                     DefaultMessageStore.this.storeStatsService 88:                                         .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).incrementAndGet(); 89:                                     DefaultMessageStore.this.storeStatsService 90:                                         .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic()) 91:                                         .addAndGet(dispatchRequest.getMsgSize()); 92:                                 } 93:                             } else if (size == 0) { // 读取到MappedFile文件尾 94:                                 this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset); 95:                                 readSize = result.getSize(); 96:                             } 97:                         } else if (!dispatchRequest.isSuccess()) { // 读取失败 98:                             if (size > 0) { // 读取到Message却不是Message 99:                                 log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset);100:                                 this.reputFromOffset += size;101:                             } else { // 读取到Blank却不是Blank102:                                 doNext = false;103:                                 if (DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) {104:                                     log.error("[BUG]the master dispatch message to consume queue error, COMMITLOG OFFSET: {}",105:                                         this.reputFromOffset);106: 107:                                     this.reputFromOffset += result.getSize() - readSize;108:                                 }109:                             }110:                         }111:                     }112:                 } finally {113:                     result.release();114:                 }115:             } else {116:                 doNext = false;117:             }118:         }119:     }120: 121:     @Override122:     public void run() {123:         DefaultMessageStore.log.info(this.getServiceName() + " service started");124: 125:         while (!this.isStopped()) {126:             try {127:                 Thread.sleep(1);128:                 this.doReput();129:             } catch (Exception e) {130:                 DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);131:             }132:         }133: 134:         DefaultMessageStore.log.info(this.getServiceName() + " service end");135:     }136: 137:     @Override138:     public String getServiceName() {139:         return ReputMessageService.class.getSimpleName();140:     }141: 142: }
  • 说明:重放消息线程服务。

    • 该服务不断生成 消息位置信息 到 消费队列(ConsumeQueue)

    • 该服务不断生成 消息索引 到 索引文件(IndexFile)

    • 第 75 至 81 行 :当 Broker 是主节点 && Broker 开启的是长轮询,通知消费队列有新的消息。 NotifyMessageArrivingListener 会 调用 PullRequestHoldService#notifyMessageArriving(...)方法,详细解析见:PullRequestHoldService

    • 第 61 行 :获取 reputFromOffset 开始的 CommitLog 对应的 MappedFile 对应的 MappedByteBuffer

    • 第 67 行 :遍历 MappedByteBuffer

    • 第 69 行 :生成重放消息重放调度请求 ( DispatchRequest) 。请求里主要包含一条消息 ( Message) 或者 文件尾 ( BLANK) 的基本信息。

    • 第 72 至 96 行 :请求是有效请求,进行逻辑处理。

    • 第 73 至 92 行 :请求对应的是 Message,进行调度,生成 ConsumeQueue 和 IndexFile 对应的内容。详细解析见:

    • 第 93 至 96 行 :请求对应的是 Blank,即文件尾,跳转指向下一个 MappedFile

    • 第 97 至 110 行 :请求是无效请求。出现该情况,基本是一个BUG

  • 第 127 至 128 行 :每 1ms 循环执行重放逻辑。

  • 第 18 至 30 行 : shutdown时,多次 sleep(100) 直到 CommitLog 回放到最新位置。恩,如果未回放完,会输出警告日志。

DefaultMessageStore#doDispatch(...)

  1: /**  2:  * 执行调度请求  3:  * 1. 非事务消息 或 事务提交消息 建立 消息位置信息 到 ConsumeQueue  4:  * 2. 建立 索引信息 到 IndexFile  5:  *  6:  * @param req 调度请求  7:  */  8: public void doDispatch(DispatchRequest req) {  9:     // 非事务消息 或 事务提交消息 建立 消息位置信息 到 ConsumeQueue 10:     final int tranType = MessageSysFlag.getTransactionValue(req.getSysFlag()); 11:     switch (tranType) { 12:         case MessageSysFlag.TRANSACTION_NOT_TYPE: 13:         case MessageSysFlag.TRANSACTION_COMMIT_TYPE: 14:             DefaultMessageStore.this.putMessagePositionInfo(req.getTopic(), req.getQueueId(), req.getCommitLogOffset(), req.getMsgSize(), 15:                 req.getTagsCode(), req.getStoreTimestamp(), req.getConsumeQueueOffset()); 16:             break; 17:         case MessageSysFlag.TRANSACTION_PREPARED_TYPE: 18:         case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: 19:             break; 20:     } 21:     // 建立 索引信息 到 IndexFile 22:     if (DefaultMessageStore.this.getMessageStoreConfig().isMessageIndexEnable()) { 23:         DefaultMessageStore.this.indexService.buildIndex(req); 24:     } 25: } 26:  27: /** 28:  * 建立 消息位置信息 到 ConsumeQueue 29:  * 30:  * @param topic 主题 31:  * @param queueId 队列编号 32:  * @param offset commitLog存储位置 33:  * @param size 消息长度 34:  * @param tagsCode 消息tagsCode 35:  * @param storeTimestamp 存储时间 36:  * @param logicOffset 队列位置 37:  */ 38: public void putMessagePositionInfo(String topic, int queueId, long offset, int size, long tagsCode, long storeTimestamp, 39:     long logicOffset) { 40:     ConsumeQueue cq = this.findConsumeQueue(topic, queueId); 41:     cq.putMessagePositionInfoWrapper(offset, size, tagsCode, storeTimestamp, logicOffset); 42: }

ConsumeQueue#putMessagePositionInfoWrapper(...)

  1: /**  2:  * 添加位置信息封装  3:  *  4:  * @param offset commitLog存储位置  5:  * @param size 消息长度  6:  * @param tagsCode 消息tagsCode  7:  * @param storeTimestamp 消息存储时间  8:  * @param logicOffset 队列位置  9:  */ 10: public void putMessagePositionInfoWrapper(long offset, int size, long tagsCode, long storeTimestamp, 11:     long logicOffset) { 12:     final int maxRetries = 30; 13:     boolean canWrite = this.defaultMessageStore.getRunningFlags().isWriteable(); 14:     // 多次循环写,直到成功 15:     for (int i = 0; i < maxRetries && canWrite; i++) { 16:         // 调用添加位置信息 17:         boolean result = this.putMessagePositionInfo(offset, size, tagsCode, logicOffset); 18:         if (result) { 19:             // 添加成功,使用消息存储时间 作为 存储check point。 20:             this.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(storeTimestamp); 21:             return; 22:         } else { 23:             // XXX: warn and notify me 24:             log.warn("[BUG]put commit log position info to " + topic + ":" + queueId + " " + offset 25:                 + " failed, retry " + i + " times"); 26:  27:             try { 28:                 Thread.sleep(1000); 29:             } catch (InterruptedException e) { 30:                 log.warn("", e); 31:             } 32:         } 33:     } 34:  35:     // XXX: warn and notify me 设置异常不可写入 36:     log.error("[BUG]consume queue can not write, {} {}", this.topic, this.queueId); 37:     this.defaultMessageStore.getRunningFlags().makeLogicsQueueError(); 38: } 39:  40: /** 41:  * 添加位置信息,并返回添加是否成功 42:  * 43:  * @param offset commitLog存储位置 44:  * @param size 消息长度 45:  * @param tagsCode 消息tagsCode 46:  * @param cqOffset 队列位置 47:  * @return 是否成功 48:  */ 49: private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode, 50:     final long cqOffset) { 51:     // 如果已经重放过,直接返回成功 52:     if (offset <= this.maxPhysicOffset) { 53:         return true; 54:     } 55:     // 写入位置信息到byteBuffer 56:     this.byteBufferIndex.flip(); 57:     this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE); 58:     this.byteBufferIndex.putLong(offset); 59:     this.byteBufferIndex.putInt(size); 60:     this.byteBufferIndex.putLong(tagsCode); 61:     // 计算consumeQueue存储位置,并获得对应的MappedFile 62:     final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE; 63:     MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset); 64:     if (mappedFile != null) { 65:         // 当是ConsumeQueue第一个MappedFile && 队列位置非第一个 && MappedFile未写入内容,则填充前置空白占位 66:         if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) { // TODO 疑问:为啥这个操作。目前能够想象到的是,一些老的消息很久没发送,突然发送,这个时候刚好满足。 67:             this.minLogicOffset = expectLogicOffset; 68:             this.mappedFileQueue.setFlushedWhere(expectLogicOffset); 69:             this.mappedFileQueue.setCommittedWhere(expectLogicOffset); 70:             this.fillPreBlank(mappedFile, expectLogicOffset); 71:             log.info("fill pre blank space " + mappedFile.getFileName() + " " + expectLogicOffset + " " 72:                 + mappedFile.getWrotePosition()); 73:         } 74:         // 校验consumeQueue存储位置是否合法。TODO 如果不合法,继续写入会不会有问题? 75:         if (cqOffset != 0) { 76:             long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset(); 77:             if (expectLogicOffset != currentLogicOffset) { 78:                 LOG_ERROR.warn( 79:                     "[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}", 80:                     expectLogicOffset, 81:                     currentLogicOffset, 82:                     this.topic, 83:                     this.queueId, 84:                     expectLogicOffset - currentLogicOffset 85:                 ); 86:             } 87:         } 88:         // 设置commitLog重放消息到ConsumeQueue位置。 89:         this.maxPhysicOffset = offset; 90:         // 插入mappedFile 91:         return mappedFile.appendMessage(this.byteBufferIndex.array()); 92:     } 93:     return false; 94: } 95:  96: /** 97:  * 填充前置空白占位 98:  * 99:  * @param mappedFile MappedFile100:  * @param untilWhere consumeQueue存储位置101:  */102: private void fillPreBlank(final MappedFile mappedFile, final long untilWhere) {103:     // 写入前置空白占位到byteBuffer104:     ByteBuffer byteBuffer = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE);105:     byteBuffer.putLong(0L);106:     byteBuffer.putInt(Integer.MAX_VALUE);107:     byteBuffer.putLong(0L);108:     // 循环填空109:     int until = (int) (untilWhere % this.mappedFileQueue.getMappedFileSize());110:     for (int i = 0; i < until; i += CQ_STORE_UNIT_SIZE) {111:         mappedFile.appendMessage(byteBuffer.array());112:     }113: }
  • #putMessagePositionInfoWrapper(...) 说明 :添加位置信息到 ConsumeQueue 的封装,实际需要调用 #putMessagePositionInfo(...) 方法。

    • 第 13 行 :判断 ConsumeQueue 是否允许写入。当发生Bug时,不允许写入。

    • 第 17 行 :调用 #putMessagePositionInfo(...) 方法,添加位置信息。

    • 第 18 至 21 行 :添加成功,使用消息存储时间 作为 存储检查点。 StoreCheckpoint 的详细解析见:Store初始化与关闭。

    • 第 22 至 32 行 :添加失败,目前基本可以认为是BUG。

    • 第 35 至 37 行 :写入失败时,标记 ConsumeQueue 写入异常,不允许继续写入。

  • #putMessagePositionInfo(...) 说明 :添加位置信息到 ConsumeQueue,并返回添加是否成功。

    • 这块比较有疑问,如果计算出来的存储位置不合法,不返回添加失败,继续进行添加位置信息,会不会有问题???

    • 第 51 至 54 行 :如果 offset(存储位置) 小于等于 maxPhysicOffsetCommitLog 消息重放到 ConsumeQueue 最大的 CommitLog 存储位置),表示已经重放过,此时,不再重复写入,直接返回写入成功。

    • 第 55 至 60 行 :写 位置信息到byteBuffer。

    • 第 62 至 63 行 :计算 ConsumeQueue存储位置,并获得对应的MappedFile。

    • 第 65 至 73 行 :当 MappedFile 是 ConsumeQueue 当前第一个文件 && MappedFile 未写入内容 && 重放消息队列位置大于0,则需要进行 MappedFile 填充前置 BLANK

    • 这块比较有疑问,什么场景下会需要。猜测产生的原因:一个 Topic 长期无消息产生,突然N天后进行发送, Topic 对应的历史消息以及和消费队列数据已经被清理,新生成的 MappedFile需要前置占位。

    • 第 74 至 87 行 :校验 ConsumeQueue 存储位置是否合法,不合法则输出日志。

    • 第 89 行 :设置 CommitLog 重放消息到 ConsumeQueue 的最大位置。

    • 第 91 行 :插入消息位置到 MappedFile

FlushConsumeQueueService

  1: class FlushConsumeQueueService extends ServiceThread {  2:     private static final int RETRY_TIMES_OVER = 3;  3:     /**  4:      * 最后flush时间戳  5:      */  6:     private long lastFlushTimestamp = 0;  7:   8:     private void doFlush(int retryTimes) {  9:         int flushConsumeQueueLeastPages = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueLeastPages(); 10:  11:         // retryTimes == RETRY_TIMES_OVER时,进行强制flush。主要用于shutdown时。 12:         if (retryTimes == RETRY_TIMES_OVER) { 13:             flushConsumeQueueLeastPages = 0; 14:         } 15:         // 当时间满足flushConsumeQueueThoroughInterval时,即使写入的数量不足flushConsumeQueueLeastPages,也进行flush 16:         long logicsMsgTimestamp = 0; 17:         int flushConsumeQueueThoroughInterval = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueThoroughInterval(); 18:         long currentTimeMillis = System.currentTimeMillis(); 19:         if (currentTimeMillis >= (this.lastFlushTimestamp + flushConsumeQueueThoroughInterval)) { 20:             this.lastFlushTimestamp = currentTimeMillis; 21:             flushConsumeQueueLeastPages = 0; 22:             logicsMsgTimestamp = DefaultMessageStore.this.getStoreCheckpoint().getLogicsMsgTimestamp(); 23:         } 24:         // flush消费队列 25:         ConcurrentHashMap<String, ConcurrentHashMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable; 26:         for (ConcurrentHashMap<Integer, ConsumeQueue> maps : tables.values()) { 27:             for (ConsumeQueue cq : maps.values()) { 28:                 boolean result = false; 29:                 for (int i = 0; i < retryTimes && !result; i++) { 30:                     result = cq.flush(flushConsumeQueueLeastPages); 31:                 } 32:             } 33:         } 34:         // flush 存储 check point 35:         if (0 == flushConsumeQueueLeastPages) { 36:             if (logicsMsgTimestamp > 0) { 37:                 DefaultMessageStore.this.getStoreCheckpoint().setLogicsMsgTimestamp(logicsMsgTimestamp); 38:             } 39:             DefaultMessageStore.this.getStoreCheckpoint().flush(); 40:         } 41:     } 42:  43:     public void run() { 44:         DefaultMessageStore.log.info(this.getServiceName() + " service started"); 45:  46:         while (!this.isStopped()) { 47:             try { 48:                 int interval = DefaultMessageStore.this.getMessageStoreConfig().getFlushIntervalConsumeQueue(); 49:                 this.waitForRunning(interval); 50:                 this.doFlush(1); 51:             } catch (Exception e) { 52:                 DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e); 53:             } 54:         } 55:  56:         this.doFlush(RETRY_TIMES_OVER); 57:  58:         DefaultMessageStore.log.info(this.getServiceName() + " service end"); 59:     } 60:  61:     @Override 62:     public String getServiceName() { 63:         return FlushConsumeQueueService.class.getSimpleName(); 64:     } 65:  66:     @Override 67:     public long getJointime() { 68:         return 1000 * 60; 69:     } 70: }
  • 说明 :flush ConsumeQueue(消费队列) 线程服务。

  • 第 11 至 14 行 :当 retryTimes==RETRY_TIMES_OVER 时,进行强制flush。用于 shutdown 时。

  • 第 15 至 23 行 :每 flushConsumeQueueThoroughInterval 周期,执行一次 flush 。因为不是每次循环到都能满足 flushConsumeQueueLeastPages 大小,因此,需要一定周期进行一次强制 flush 。当然,不能每次循环都去执行强制 flush,这样性能较差。

  • 第 24 至 33 行 :flush ConsumeQueue(消费队列)。

    • flush 逻辑:MappedFile#落盘。

  • 第 34 至 40 行 :flush StoreCheckpoint。 StoreCheckpoint 的详细解析见:Store初始化与关闭。

  • 第 43 至 59 行 :每 1000ms 执行一次 flush。如果 wakeup() 时,则会立即进行一次 flush。目前,暂时不存在 wakeup() 的调用。

4、Broker 提供[拉取消息]接口

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-first/

5、Broker 提供[更新消费进度]接口

Yunai-MacdeMacBook-Pro-2:config yunai$ pwd/Users/yunai/store/configYunai-MacdeMacBook-Pro-2:config yunai$ ls -lstotal 408 -rw-r--r--  1 yunai  staff    21  4 28 16:58 consumerOffset.json8 -rw-r--r--  1 yunai  staff    21  4 28 16:58 consumerOffset.json.bak8 -rw-r--r--  1 yunai  staff    21  4 28 16:58 delayOffset.json8 -rw-r--r--  1 yunai  staff    21  4 28 16:58 delayOffset.json.bak8 -rw-r--r--  1 yunai  staff  1401  4 27 21:51 topics.jsonYunai-MacdeMacBook-Pro-2:config yunai$ cat consumerOffset.json{    "offsetTable":{        "%RETRY%please_rename_unique_group_name_4@please_rename_unique_group_name_4":{0:0        },        "TopicRead3@please_rename_unique_group_name_4":{1:5        }    }}
  • consumerOffset.json :消费进度存储文件。

  • consumerOffset.json.bak :消费进度存储文件备份。

  • 每次写入 consumerOffset.json,将原内容备份到 consumerOffset.json.bak。实现见:MixAll#string2File(...)。

BrokerController#initialize(...)

  1:this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {  2:    @Override  3:    public void run() {  4:        try {  5:            BrokerController.this.consumerOffsetManager.persist();  6:        } catch (Throwable e) {  7:            log.error("schedule persist consumerOffset error.", e);  8:        }  9:    } 10:}, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
  • 说明 :每 5s 执行一次持久化逻辑。

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-first/

6、Broker 提供[发回消息]接口

大部分逻辑和 Broker 提供[接收消息]接口 类似,可以先看下相关内容。

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-first/

7、结尾

感谢同学们对本文的阅读、收藏、点赞。


©著作权归作者所有:来自51CTO博客作者mb5ff80520dfa04的原创作品,如需转载,请注明出处,否则将追究法律责任

更多相关文章

  1. 分布式消息队列 RocketMQ 源码分析 —— Message 拉取与消费(下)
  2. 贞炸了!上线之后,消息收不到了!
  3. 利用深度学习识别滑动验证码缺口位置
  4. 超级好用的RabbitMQ 消息 100% 投递的解决方案!
  5. 毫秒时间戳标识消息导致数据丢失的问题排查
  6. 如何保证消息队列的高可用?
  7. 如何基于Hyperf实现RabbitMQ+WebSocket消息推送
  8. PHP 消息队列 Kafka 使用
  9. 基于 Hyperf + RabbitMQ + WebSocket 实现消息推送

随机推荐

  1. 如何将curl获取到的json对象转成数组
  2. php如何修改数组的值?
  3. PHP在图片中用 imagettftext() 添加水印(
  4. 如何解决php中curl传递数据太慢
  5. php如何设置权限?
  6. php代码如何在html文件里面执行(详解)
  7. 如何解决php中curl_init()函数不可用
  8. php中如何进行小写转换?
  9. PHP以正则表达式验证手机号码
  10. 如何实现php中date只显示年月日