1. 概述

建议前置阅读内容:

  • 《RocketMQ 源码分析 —— Message 发送与接收》

  • 《RocketMQ 源码分析 —— Message 拉取与消费(下)》

为什么把定时消息消息重试放在一起?你猜。
 你猜我猜不猜。

2. 定时消息

定时消息是指消息发到 Broker 后,不能立刻被 Consumer 消费,要到特定的时间点或者等待特定的时间后才能被消费。

下图是定时消息的处理逻辑图:

2.1 延迟级别

RocketMQ 目前只支持固定精度的定时消息。官方说法如下:

如果要支持任意的时间精度,在 Broker 层面,必须要做消息排序,如果再涉及到持久化,那么消息排序要不可避免的产生巨大性能开销。

  • 延迟级别:

延迟级别时间
11s
25s
310s
430s
51m
62m
73m
84m
95m
106m
117m
128m
139m
1410m
1520m
1630m
171h
182h

  • 核心源码如下:



  1: // ⬇️⬇️⬇️【MessageStoreConfig.java】

2: /**
3: * 消息延迟级别字符串配置
4: */
5: private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
6:
7: // 【ScheduleMessageService.java】
8: /**
9: * 解析延迟级别
10: *
11: * @return 是否解析成功
12: */
13: public boolean parseDelayLevel() {
14: HashMap<String, Long> timeUnitTable = new HashMap<>();
15: timeUnitTable.put("s", 1000L);
16: timeUnitTable.put("m", 1000L * 60);
17: timeUnitTable.put("h", 1000L * 60 * 60);
18: timeUnitTable.put("d", 1000L * 60 * 60 * 24);
19:
20: String levelString = this.defaultMessageStore.getMessageStoreConfig().getMessageDelayLevel();
21: try {
22: String levelArray = levelString.split(" ");
23: for (int i = 0; i < levelArray.length; i++) {
24: String value = levelArray[i];
25: String ch = value.substring(value.length() - 1);
26: Long tu = timeUnitTable.get(ch);
27:
28: int level = i + 1;
29: if (level > this.maxDelayLevel) {
30: this.maxDelayLevel = level;
31: }
32: long num = Long.parseLong(value.substring(0, value.length() - 1));
33: long delayTimeMillis = tu * num;
34: this.delayLevelTable.put(level, delayTimeMillis);
35: }
36: } catch (Exception e) {
37: log.error("parseDelayLevel exception", e);
38: log.info("levelString String = {}", levelString);
39: return false;
40: }
41:
42: return true;
43: }




2.2 Producer 发送定时消息



  • 发送时,设置消息的延迟级别




Message msg = new Message(...);msg.setDelayTimeLevel(level);

2.3 Broker 存储定时消息

  • 存储消息时,延迟消息进入 Topic 为 SCHEDULE_TOPIC_XXXX

  •  延迟级别 与 消息队列编号 做固定映射:QueueId = DelayLevel - 1

核心代码如下:

  1: // ⬇️⬇️⬇️【CommitLog.java】  2: /**  3:  * 添加消息,返回消息结果  4:  *  5:  * @param msg 消息  6:  * @return 结果  7:  */  8: public PutMessageResult putMessage(final MessageExtBrokerInner msg) {  9:     // ....(省略代码)  10:  11:     // 定时消息处理 12:     final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag()); 13:     if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE// 14:         || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) { 15:         // Delay Delivery 16:         if (msg.getDelayTimeLevel() > 0) { 17:             if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) { 18:                 msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()); 19:             } 20:  21:             // 存储消息时,延迟消息进入 `Topic` 为 `SCHEDULE_TOPIC_XXXX` 。 22:             topic = ScheduleMessageService.SCHEDULE_TOPIC; 23:  24:             // 延迟级别 与 消息队列编号 做固定映射 25:             queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel()); 26:  27:             // Backup real topic, queueId 28:             MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic()); 29:             MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId())); 30:             msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties())); 31:  32:             msg.setTopic(topic); 33:             msg.setQueueId(queueId); 34:         } 35:     } 36:  37:     // ....(省略代码)  38: } 39:  40: // ⬇️⬇️⬇️【ScheduleMessageService.java】 41: /** 42:  * 根据 延迟级别 计算 消息队列编号 43:  * QueueId = DelayLevel - 1 44:  * 45:  * @param delayLevel 延迟级别 46:  * @return 消息队列编号 47:  */ 48: public static int delayLevel2QueueId(final int delayLevel) { 49:     return delayLevel - 1; 50: }

  • 生成 ConsumeQueue 时,每条消息的 tagsCode 使用【消息计划消费时间】。这样, ScheduleMessageService在轮询 ConsumeQueue 时,可以使用 tagsCode 进行过滤。

核心代码如下:

  1: // ⬇️⬇️⬇️【CommitLog.java】  2: /**  3:  * check the message and returns the message size  4:  *  5:  * @return 0 Come the end of the file // >0 Normal messages // -1 Message checksum failure  6:  */  7: public DispatchRequest checkMessageAndReturnSize(ByteBuffer byteBuffer, final boolean checkCRC, final boolean readBody) {  8:     try {  9:         // // ....(省略代码) 10:  11:         // 17 properties 12:         short propertiesLength = byteBuffer.getShort(); 13:         if (propertiesLength > 0) { 14:             // ....(省略代码) 15:             String tags = propertiesMap.get(MessageConst.PROPERTY_TAGS); 16:             if (tags != null && tags.length() > 0) { 17:                 tagsCode = MessageExtBrokerInner.tagsString2tagsCode(MessageExt.parseTopicFilterType(sysFlag), tags); 18:             } 19:  20:             // Timing message processing 21:             { 22:                 String t = propertiesMap.get(MessageConst.PROPERTY_DELAY_TIME_LEVEL); 23:                 if (ScheduleMessageService.SCHEDULE_TOPIC.equals(topic) && t != null) { 24:                     int delayLevel = Integer.parseInt(t); 25:  26:                     if (delayLevel > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) { 27:                         delayLevel = this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel(); 28:                     } 29:  30:                     if (delayLevel > 0) { 31:                         tagsCode = this.defaultMessageStore.getScheduleMessageService().computeDeliverTimestamp(delayLevel, 32:                             storeTimestamp); 33:                     } 34:                 } 35:             } 36:         } 37:  38:         // ....(省略代码) 39:  40:         return new DispatchRequest(// 41:             topic, // 1 42:             queueId, // 2 43:             physicOffset, // 3 44:             totalSize, // 4 45:             tagsCode, // 5 46:             storeTimestamp, // 6 47:             queueOffset, // 7 48:             keys, // 8 49:             uniqKey, //9 50:             sysFlag, // 9 51:             preparedTransactionOffset// 10 52:         ); 53:     } catch (Exception e) { 54:     } 55:  56:     return new DispatchRequest(-1, false /* success */); 57: } 58:  59: // ⬇️⬇️⬇️【ScheduleMessageService.java】 60: /** 61:  * 计算 投递时间【计划消费时间】 62:  * 63:  * @param delayLevel 延迟级别 64:  * @param storeTimestamp 存储时间 65:  * @return 投递时间【计划消费时间】 66:  */ 67: public long computeDeliverTimestamp(final int delayLevel, final long storeTimestamp) { 68:     Long time = this.delayLevelTable.get(delayLevel); 69:     if (time != null) { 70:         return time + storeTimestamp; 71:     } 72:  73:     return storeTimestamp + 1000; 74: }

2.4 Broker 发送定时消息

  •  对 SCHEDULE_TOPIC_XXXX 每条消费队列对应单独一个定时任务进行轮询,发送 到达投递时间【计划消费时间】 的消息。

下图是发送定时消息的处理逻辑图:

实现代码如下:

  1: /**  2:  * ⬇️⬇️⬇️ 发送(投递)延迟消息定时任务  3:  */  4: class DeliverDelayedMessageTimerTask extends TimerTask {  5:     /**  6:      * 延迟级别  7:      */  8:     private final int delayLevel;  9:     /** 10:      * 位置 11:      */ 12:     private final long offset; 13:  14:     public DeliverDelayedMessageTimerTask(int delayLevel, long offset) { 15:         this.delayLevel = delayLevel; 16:         this.offset = offset; 17:     } 18:  19:     @Override 20:     public void run() { 21:         try { 22:             this.executeOnTimeup(); 23:         } catch (Exception e) { 24:             // XXX: warn and notify me 25:             log.error("ScheduleMessageService, executeOnTimeup exception", e); 26:             ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask( 27:                 this.delayLevel, this.offset), DELAY_FOR_A_PERIOD); 28:         } 29:     } 30:  31:     /** 32:      * 纠正可投递时间。 33:      * 因为发送级别对应的发送间隔可以调整,如果超过当前间隔,则修正成当前配置,避免后面的消息无法发送。 34:      * 35:      * @param now 当前时间 36:      * @param deliverTimestamp 投递时间 37:      * @return 纠正结果 38:      */ 39:     private long correctDeliverTimestamp(final long now, final long deliverTimestamp) { 40:         long result = deliverTimestamp; 41:  42:         long maxTimestamp = now + ScheduleMessageService.this.delayLevelTable.get(this.delayLevel); 43:         if (deliverTimestamp > maxTimestamp) { 44:             result = now; 45:         } 46:  47:         return result; 48:     } 49:  50:     public void executeOnTimeup() { 51:         ConsumeQueue cq = ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC,  delayLevel2QueueId(delayLevel)); 52:  53:         long failScheduleOffset = offset; 54:  55:         if (cq != null) { 56:             SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset); 57:             if (bufferCQ != null) { 58:                 try { 59:                     long nextOffset = offset; 60:                     int i = 0; 61:                     for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) { 62:                         long offsetPy = bufferCQ.getByteBuffer().getLong(); 63:                         int sizePy = bufferCQ.getByteBuffer().getInt(); 64:                         long tagsCode = bufferCQ.getByteBuffer().getLong(); 65:  66:                         long now = System.currentTimeMillis(); 67:                         long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode); 68:  69:                         nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE); 70:  71:                         long countdown = deliverTimestamp - now; 72:  73:                         if (countdown <= 0) { // 消息到达可发送时间 74:                             MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy); 75:                             if (msgExt != null) { 76:                                 try { 77:                                     // 发送消息 78:                                     MessageExtBrokerInner msgInner = this.messageTimeup(msgExt); 79:                                     PutMessageResult putMessageResult = ScheduleMessageService.this.defaultMessageStore.putMessage(msgInner); 80:                                     if (putMessageResult != null && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) { // 发送成功 81:                                         continue; 82:                                     } else { // 发送失败 83:                                         // XXX: warn and notify me 84:                                         log.error("ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}", msgExt.getTopic(), msgExt.getMsgId()); 85:  86:                                         // 安排下一次任务 87:                                         ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), DELAY_FOR_A_PERIOD); 88:  89:                                         // 更新进度 90:                                         ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset); 91:                                         return; 92:                                     } 93:                                 } catch (Exception e) { 94:                                     // XXX: warn and notify me 95:                                     log.error("ScheduleMessageService, messageTimeup execute error, drop it. msgExt=" 96:                                             + msgExt + ", nextOffset=" + nextOffset + ",offsetPy=" + offsetPy + ",sizePy=" + sizePy, e); 97:                                 } 98:                             } 99:                         } else {100:                             // 安排下一次任务101:                             ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), countdown);102: 103:                             // 更新进度104:                             ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);105:                             return;106:                         }107:                     } // end of for108: 109:                     nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);110: 111:                     // 安排下一次任务112:                     ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);113: 114:                     // 更新进度115:                     ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);116:                     return;117:                 } finally {118:                     bufferCQ.release();119:                 }120:             } // end of if (bufferCQ != null)121:             else { // 消费队列已经被删除部分,跳转到最小的消费进度122:                 long cqMinOffset = cq.getMinOffsetInQueue();123:                 if (offset < cqMinOffset) {124:                     failScheduleOffset = cqMinOffset;125:                     log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset="126:                         + cqMinOffset + ", queueId=" + cq.getQueueId());127:                 }128:             }129:         } // end of if (cq != null)130: 131:         ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, failScheduleOffset), DELAY_FOR_A_WHILE);132:     }133: 134:     /**135:      * 设置消息内容136:      *137:      * @param msgExt 消息138:      * @return 消息139:      */140:     private MessageExtBrokerInner messageTimeup(MessageExt msgExt) {141:         MessageExtBrokerInner msgInner = new MessageExtBrokerInner();142:         msgInner.setBody(msgExt.getBody());143:         msgInner.setFlag(msgExt.getFlag());144:         MessageAccessor.setProperties(msgInner, msgExt.getProperties());145: 146:         TopicFilterType topicFilterType = MessageExt.parseTopicFilterType(msgInner.getSysFlag());147:         long tagsCodeValue =148:             MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags());149:         msgInner.setTagsCode(tagsCodeValue);150:         msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));151: 152:         msgInner.setSysFlag(msgExt.getSysFlag());153:         msgInner.setBornTimestamp(msgExt.getBornTimestamp());154:         msgInner.setBornHost(msgExt.getBornHost());155:         msgInner.setStoreHost(msgExt.getStoreHost());156:         msgInner.setReconsumeTimes(msgExt.getReconsumeTimes());157: 158:         msgInner.setWaitStoreMsgOK(false);159:         MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_DELAY_TIME_LEVEL);160: 161:         msgInner.setTopic(msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC));162: 163:         String queueIdStr = msgInner.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID);164:         int queueId = Integer.parseInt(queueIdStr);165:         msgInner.setQueueId(queueId);166: 167:         return msgInner;168:     }169: }

2.5 Broker 持久化定时发送进度

  •  定时消息发送进度存储在文件( ../config/delayOffset.json)里

  •  每 10s 定时持久化发送进度。

核心代码如下:

  1: // ⬇️⬇️⬇️【ScheduleMessageService.java】  2: /**  3: public void start() {  4:     // 定时发送消息  5:     for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {  6:         Integer level = entry.getKey();  7:         Long timeDelay = entry.getValue();  8:         Long offset = this.offsetTable.get(level);  9:         if (null == offset) { 10:             offset = 0L; 11:         } 12:  13:         if (timeDelay != null) { 14:             this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME); 15:         } 16:     } 17:  18:     // 定时持久化发送进度 19:     this.timer.scheduleAtFixedRate(new TimerTask() { 20:  21:         @Override 22:         public void run() { 23:             try { 24:                 ScheduleMessageService.this.persist(); 25:             } catch (Exception e) { 26:                 log.error("scheduleAtFixedRate flush exception", e); 27:             } 28:         } 29:     }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval()); 30: }

3. 消息重试

Consumer 消费消息失败后,要提供一种重试机制,令消息再消费一次。

  •  Consumer 将消费失败的消息发回 Broker,进入延迟消息队列。即,消费失败的消息,不会立即消费。

核心代码如下:

  1: // ⬇️⬇️⬇️【SendMessageProcessor.java】  2: /**  3:  * 消费者发回消息  4:  *  5:  * @param ctx ctx  6:  * @param request 请求  7:  * @return 响应  8:  * @throws RemotingCommandException 当远程调用异常  9:  */ 10: private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, final RemotingCommand request) 11:     throws RemotingCommandException { 12:     // ....(省略代码) 13:     // 处理 delayLevel(独有)。 14:     int delayLevel = requestHeader.getDelayLevel(); 15:     int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes(); 16:     if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) { 17:         maxReconsumeTimes = requestHeader.getMaxReconsumeTimes(); 18:     } 19:     if (msgExt.getReconsumeTimes() >= maxReconsumeTimes// 20:     // ....(省略代码) 21:     } else { 22:         if (0 == delayLevel) { 23:             delayLevel = 3 + msgExt.getReconsumeTimes(); 24:         } 25:         msgExt.setDelayTimeLevel(delayLevel); 26:     } 27:  28:     // ....(省略代码) 29:     return response; 30: }


©著作权归作者所有:来自51CTO博客作者mb5ff80520dfa04的原创作品,如需转载,请注明出处,否则将追究法律责任

更多相关文章

  1. 理解算法的时间复杂度[每日前端夜话0x82]
  2. 分布式消息队列 RocketMQ 源码分析 —— 高可用
  3. 分布式消息队列 RocketMQ 源码分析 —— Message 拉取与消费(上)
  4. 分布式消息队列 RocketMQ 源码分析 —— Message 拉取与消费(下)
  5. 贞炸了!上线之后,消息收不到了!
  6. PMP备考复盘,可供学习时间有限的同学参考
  7. AngularJS 日期时间选择组件(附详细使用方法)
  8. GMT UTC CST ISO 夏令时 时间戳,都是些什么鬼?
  9. 美团面试官:生成订单后一段时间不支付订单会自动关闭的功能该如何

随机推荐

  1. 黑科技神器-uTools
  2. Drawable、Bitmap、Canvas和Paint的区别
  3. 栈的应用(运算问题)
  4. 性能优化技巧:外键序号化
  5. RocketMQ 源码分析 —— Message 顺序发
  6. 芋道 Spring Boot SpringMVC 入门
  7. 芋道 Spring Boot 参数校验 Validation
  8. 程序员写简历时必须注意的技术词汇拼写(持
  9. 推荐几个炫酷的Python开源项目
  10. HTML5 WebGL 水波荡漾动画,超逼真