摘要: 原创出处 http://www.iocoder.cn/RocketMQ/message-schedule-and-retry/ 「芋道源码」欢迎转载,保留摘要,谢谢!

  • 1. 概述
  • 2. 定时消息
    • 2.1 延迟级别
    • 2.2 Producer 发送定时消息
    • 2.3 Broker 存储定时消息
    • 2.4 Broker 发送定时消息
    • 2.5 Broker 持久化定时发送进度
  • 3. 消息重试

阅读源码最好的方式,是使用 IDEA 进行调试 RocketMQ 源码,不然会一脸懵逼。

胖友可以点击「芋道源码」扫码关注,回复 git001 关键字
获得艿艿添加了中文注释的 RocketMQ 源码地址。

阅读源码很孤单,加入源码交流群,一起坚持!

1. 概述

建议前置阅读内容:

  • 《RocketMQ 源码分析 —— Message 发送与接收》
  • 《RocketMQ 源码分析 —— Message 拉取与消费(下)》

为什么把定时消息消息重试放在一起?你猜。
你猜我猜不猜。

2. 定时消息

定时消息是指消息发到 Broker 后,不能立刻被 Consumer 消费,要到特定的时间点或者等待特定的时间后才能被消费。

下图是定时消息的处理逻辑图:

定时消息逻辑图.png

2.1 延迟级别

RocketMQ 目前只支持固定精度的定时消息。官方说法如下:

如果要支持任意的时间精度,在 Broker 层面,必须要做消息排序,如果再涉及到持久化,那么消息排序要不可避免的产生巨大性能开销。

  • 延迟级别:
延迟级别时间
11s
25s
310s
430s
51m
62m
73m
84m
95m
106m
117m
128m
139m
1410m
1520m
1630m
171h
182h
  • 核心源码如下:

      1// ⬇️⬇️⬇️【MessageStoreConfig.java】
      2/**
      3:  * 消息延迟级别字符串配置
      4:  */

      5private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
      6
      7// ⬇️⬇️⬇️【ScheduleMessageService.java】
      8/**
      9:  * 解析延迟级别
     10:  *
     11:  * @return 是否解析成功
     12:  */

     13public boolean parseDelayLevel() {
     14:     HashMap<String, Long> timeUnitTable = new HashMap<>();
     15:     timeUnitTable.put("s"1000L);
     16:     timeUnitTable.put("m"1000L * 60);
     17:     timeUnitTable.put("h"1000L * 60 * 60);
     18:     timeUnitTable.put("d"1000L * 60 * 60 * 24);
     19
     20:     String levelString = this.defaultMessageStore.getMessageStoreConfig().getMessageDelayLevel();
     21:     try {
     22:         String[] levelArray = levelString.split(" ");
     23:         for (int i = 0; i < levelArray.length; i++) {
     24:             String value = levelArray[i];
     25:             String ch = value.substring(value.length() - 1);
     26:             Long tu = timeUnitTable.get(ch);
     27
     28:             int level = i + 1;
     29:             if (level > this.maxDelayLevel) {
     30:                 this.maxDelayLevel = level;
     31:             }
     32:             long num = Long.parseLong(value.substring(0, value.length() - 1));
     33:             long delayTimeMillis = tu * num;
     34:             this.delayLevelTable.put(level, delayTimeMillis);
     35:         }
     36:     } catch (Exception e) {
     37:         log.error("parseDelayLevel exception", e);
     38:         log.info("levelString String = {}", levelString);
     39:         return false;
     40:     }
     41
     42:     return true;
     43: }

2.2 Producer 发送定时消息

  • 发送时,设置消息的延迟级别
Message msg = new Message(...);
msg.setDelayTimeLevel(level);

2.3 Broker 存储定时消息

  • 存储消息时,延迟消息进入 Topic 为SCHEDULE_TOPIC_XXXX
  • 延迟级别 与 消息队列编号 做固定映射:QueueId = DelayLevel - 1

核心代码如下:

  1// ⬇️⬇️⬇️【CommitLog.java】
  2/**
  3:  * 添加消息,返回消息结果
  4:  *
  5:  * @param msg 消息
  6:  * @return 结果
  7:  */

  8public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
  9:     // ....(省略代码) 
 10
 11:     // 定时消息处理
 12:     final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
 13:     if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE//
 14:         || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
 15:         // Delay Delivery
 16:         if (msg.getDelayTimeLevel() > 0) {
 17:             if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
 18:                 msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
 19:             }
 20
 21:             // 存储消息时,延迟消息进入 `Topic` 为 `SCHEDULE_TOPIC_XXXX` 。
 22:             topic = ScheduleMessageService.SCHEDULE_TOPIC;
 23
 24:             // 延迟级别 与 消息队列编号 做固定映射
 25:             queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
 26
 27:             // Backup real topic, queueId
 28:             MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
 29:             MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
 30:             msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
 31
 32:             msg.setTopic(topic);
 33:             msg.setQueueId(queueId);
 34:         }
 35:     }
 36
 37:     // ....(省略代码) 
 38: }
 39
 40// ⬇️⬇️⬇️【ScheduleMessageService.java】
 41/**
 42:  * 根据 延迟级别 计算 消息队列编号
 43:  * QueueId = DelayLevel - 1
 44:  *
 45:  * @param delayLevel 延迟级别
 46:  * @return 消息队列编号
 47:  */

 48public static int delayLevel2QueueId(final int delayLevel) {
 49:     return delayLevel - 1;
 50: }

  •  生成 ConsumeQueue 时,每条消息的 tagsCode 使用【消息计划消费时间】。这样,ScheduleMessageService 在轮询 ConsumeQueue 时,可以使用 tagsCode 进行过滤。

核心代码如下:

  1// ⬇️⬇️⬇️【CommitLog.java】
  2/**
  3:  * check the message and returns the message size
  4:  *
  5:  * @return 0 Come the end of the file // >0 Normal messages // -1 Message checksum failure
  6:  */

  7public DispatchRequest checkMessageAndReturnSize(ByteBuffer byteBuffer, final boolean checkCRC, final boolean readBody) {
  8:     try {
  9:         // // ....(省略代码)
 10
 11:         // 17 properties
 12:         short propertiesLength = byteBuffer.getShort();
 13:         if (propertiesLength > 0) {
 14:             // ....(省略代码)
 15:             String tags = propertiesMap.get(MessageConst.PROPERTY_TAGS);
 16:             if (tags != null && tags.length() > 0) {
 17:                 tagsCode = MessageExtBrokerInner.tagsString2tagsCode(MessageExt.parseTopicFilterType(sysFlag), tags);
 18:             }
 19
 20:             // Timing message processing
 21:             {
 22:                 String t = propertiesMap.get(MessageConst.PROPERTY_DELAY_TIME_LEVEL);
 23:                 if (ScheduleMessageService.SCHEDULE_TOPIC.equals(topic) && t != null) {
 24:                     int delayLevel = Integer.parseInt(t);
 25
 26:                     if (delayLevel > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
 27:                         delayLevel = this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel();
 28:                     }
 29
 30:                     if (delayLevel > 0) {
 31:                         tagsCode = this.defaultMessageStore.getScheduleMessageService().computeDeliverTimestamp(delayLevel,
 32:                             storeTimestamp);
 33:                     }
 34:                 }
 35:             }
 36:         }
 37
 38:         // ....(省略代码)
 39
 40:         return new DispatchRequest(//
 41:             topic, // 1
 42:             queueId, // 2
 43:             physicOffset, // 3
 44:             totalSize, // 4
 45:             tagsCode, // 5
 46:             storeTimestamp, // 6
 47:             queueOffset, // 7
 48:             keys, // 8
 49:             uniqKey, //9
 50:             sysFlag, // 9
 51:             preparedTransactionOffset// 10
 52:         );
 53:     } catch (Exception e) {
 54:     }
 55
 56:     return new DispatchRequest(-1false /* success */);
 57: }
 58
 59// ⬇️⬇️⬇️【ScheduleMessageService.java】
 60/**
 61:  * 计算 投递时间【计划消费时间】
 62:  *
 63:  * @param delayLevel 延迟级别
 64:  * @param storeTimestamp 存储时间
 65:  * @return 投递时间【计划消费时间】
 66:  */

 67public long computeDeliverTimestamp(final int delayLevel, final long storeTimestamp) {
 68:     Long time = this.delayLevelTable.get(delayLevel);
 69:     if (time != null) {
 70:         return time + storeTimestamp;
 71:     }
 72
 73:     return storeTimestamp + 1000;
 74: }

2.4 Broker 发送定时消息

  •  对 SCHEDULE_TOPIC_XXXX 每条消费队列对应单独一个定时任务进行轮询,发送 到达投递时间【计划消费时间】 的消息。

下图是发送定时消息的处理逻辑图:

定时消息定时逻辑

实现代码如下:

  1/**
  2:  * ⬇️⬇️⬇️ 发送(投递)延迟消息定时任务
  3:  */

  4class DeliverDelayedMessageTimerTask extends TimerTask {
  5:     /**
  6:      * 延迟级别
  7:      */

  8:     private final int delayLevel;
  9:     /**
 10:      * 位置
 11:      */

 12:     private final long offset;
 13
 14:     public DeliverDelayedMessageTimerTask(int delayLevel, long offset) {
 15:         this.delayLevel = delayLevel;
 16:         this.offset = offset;
 17:     }
 18
 19:     @Override
 20:     public void run() {
 21:         try {
 22:             this.executeOnTimeup();
 23:         } catch (Exception e) {
 24:             // XXX: warn and notify me
 25:             log.error("ScheduleMessageService, executeOnTimeup exception", e);
 26:             ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
 27:                 this.delayLevel, this.offset), DELAY_FOR_A_PERIOD);
 28:         }
 29:     }
 30
 31:     /**
 32:      * 纠正可投递时间。
 33:      * 因为发送级别对应的发送间隔可以调整,如果超过当前间隔,则修正成当前配置,避免后面的消息无法发送。
 34:      *
 35:      * @param now 当前时间
 36:      * @param deliverTimestamp 投递时间
 37:      * @return 纠正结果
 38:      */

 39:     private long correctDeliverTimestamp(final long now, final long deliverTimestamp) {
 40:         long result = deliverTimestamp;
 41
 42:         long maxTimestamp = now + ScheduleMessageService.this.delayLevelTable.get(this.delayLevel);
 43:         if (deliverTimestamp > maxTimestamp) {
 44:             result = now;
 45:         }
 46
 47:         return result;
 48:     }
 49
 50:     public void executeOnTimeup() {
 51:         ConsumeQueue cq = ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC,  delayLevel2QueueId(delayLevel));
 52
 53:         long failScheduleOffset = offset;
 54
 55:         if (cq != null) {
 56:             SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
 57:             if (bufferCQ != null) {
 58:                 try {
 59:                     long nextOffset = offset;
 60:                     int i = 0;
 61:                     for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
 62:                         long offsetPy = bufferCQ.getByteBuffer().getLong();
 63:                         int sizePy = bufferCQ.getByteBuffer().getInt();
 64:                         long tagsCode = bufferCQ.getByteBuffer().getLong();
 65
 66:                         long now = System.currentTimeMillis();
 67:                         long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
 68
 69:                         nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
 70
 71:                         long countdown = deliverTimestamp - now;
 72
 73:                         if (countdown <= 0) { // 消息到达可发送时间
 74:                             MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy);
 75:                             if (msgExt != null) {
 76:                                 try {
 77:                                     // 发送消息
 78:                                     MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
 79:                                     PutMessageResult putMessageResult = ScheduleMessageService.this.defaultMessageStore.putMessage(msgInner);
 80:                                     if (putMessageResult != null && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) { // 发送成功
 81:                                         continue;
 82:                                     } else { // 发送失败
 83:                                         // XXX: warn and notify me
 84:                                         log.error("ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}", msgExt.getTopic(), msgExt.getMsgId());
 85
 86:                                         // 安排下一次任务
 87:                                         ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), DELAY_FOR_A_PERIOD);
 88
 89:                                         // 更新进度
 90:                                         ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
 91:                                         return;
 92:                                     }
 93:                                 } catch (Exception e) {
 94:                                     // XXX: warn and notify me
 95:                                     log.error("ScheduleMessageService, messageTimeup execute error, drop it. msgExt="
 96:                                             + msgExt + ", nextOffset=" + nextOffset + ",offsetPy=" + offsetPy + ",sizePy=" + sizePy, e);
 97:                                 }
 98:                             }
 99:                         } else {
100:                             // 安排下一次任务
101:                             ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), countdown);
102
103:                             // 更新进度
104:                             ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
105:                             return;
106:                         }
107:                     } // end of for
108
109:                     nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
110
111:                     // 安排下一次任务
112:                     ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
113
114:                     // 更新进度
115:                     ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
116:                     return;
117:                 } finally {
118:                     bufferCQ.release();
119:                 }
120:             } // end of if (bufferCQ != null)
121:             else { // 消费队列已经被删除部分,跳转到最小的消费进度
122:                 long cqMinOffset = cq.getMinOffsetInQueue();
123:                 if (offset < cqMinOffset) {
124:                     failScheduleOffset = cqMinOffset;
125:                     log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset="
126:                         + cqMinOffset + ", queueId=" + cq.getQueueId());
127:                 }
128:             }
129:         } // end of if (cq != null)
130
131:         ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, failScheduleOffset), DELAY_FOR_A_WHILE);
132:     }
133
134:     /**
135:      * 设置消息内容
136:      *
137:      * @param msgExt 消息
138:      * @return 消息
139:      */

140:     private MessageExtBrokerInner messageTimeup(MessageExt msgExt) {
141:         MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
142:         msgInner.setBody(msgExt.getBody());
143:         msgInner.setFlag(msgExt.getFlag());
144:         MessageAccessor.setProperties(msgInner, msgExt.getProperties());
145
146:         TopicFilterType topicFilterType = MessageExt.parseTopicFilterType(msgInner.getSysFlag());
147:         long tagsCodeValue =
148:             MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags());
149:         msgInner.setTagsCode(tagsCodeValue);
150:         msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
151
152:         msgInner.setSysFlag(msgExt.getSysFlag());
153:         msgInner.setBornTimestamp(msgExt.getBornTimestamp());
154:         msgInner.setBornHost(msgExt.getBornHost());
155:         msgInner.setStoreHost(msgExt.getStoreHost());
156:         msgInner.setReconsumeTimes(msgExt.getReconsumeTimes());
157
158:         msgInner.setWaitStoreMsgOK(false);
159:         MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
160
161:         msgInner.setTopic(msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC));
162
163:         String queueIdStr = msgInner.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID);
164:         int queueId = Integer.parseInt(queueIdStr);
165:         msgInner.setQueueId(queueId);
166
167:         return msgInner;
168:     }
169: }

2.5 Broker 持久化定时发送进度

  • 定时消息发送进度存储在文件(../config/delayOffset.json)里
  •  每 10s 定时持久化发送进度。

核心代码如下:

  1// ⬇️⬇️⬇️【ScheduleMessageService.java】
  2/**
  3: public void start() {
  4:     // 定时发送消息
  5:     for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
  6:         Integer level = entry.getKey();
  7:         Long timeDelay = entry.getValue();
  8:         Long offset = this.offsetTable.get(level);
  9:         if (null == offset) {
 10:             offset = 0L;
 11:         }
 12: 
 13:         if (timeDelay != null) {
 14:             this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
 15:         }
 16:     }
 17: 
 18:     // 定时持久化发送进度
 19:     this.timer.scheduleAtFixedRate(new TimerTask() {
 20: 
 21:         @Override
 22:         public void run() {
 23:             try {
 24:                 ScheduleMessageService.this.persist();
 25:             } catch (Exception e) {
 26:                 log.error("scheduleAtFixedRate flush exception", e);
 27:             }
 28:         }
 29:     }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
 30: }

3. 消息重试

Consumer 消费消息失败后,要提供一种重试机制,令消息再消费一次。

  • Consumer 将消费失败的消息发回 Broker,进入延迟消息队列。即,消费失败的消息,不会立即消费。

核心代码如下:

  1// ⬇️⬇️⬇️【SendMessageProcessor.java】
  2/**
  3:  * 消费者发回消息
  4:  *
  5:  * @param ctx ctx
  6:  * @param request 请求
  7:  * @return 响应
  8:  * @throws RemotingCommandException 当远程调用异常
  9:  */

 10private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, final RemotingCommand request)
 11:     throws RemotingCommandException 
{
 12:     // ....(省略代码)
 13:     // 处理 delayLevel(独有)。
 14:     int delayLevel = requestHeader.getDelayLevel();
 15:     int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
 16:     if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
 17:         maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
 18:     }
 19:     if (msgExt.getReconsumeTimes() >= maxReconsumeTimes//
 20:     // ....(省略代码)
 21:     } else {
 22:         if (0 == delayLevel) {
 23:             delayLevel = 3 + msgExt.getReconsumeTimes();
 24:         }
 25:         msgExt.setDelayTimeLevel(delayLevel);
 26:     }
 27
 28:     // ....(省略代码)
 29:     return response;
 30: }


©著作权归作者所有:来自51CTO博客作者mb5ff80520dfa04的原创作品,如需转载,请注明出处,否则将追究法律责任

更多相关文章

  1. Java 8 日期/时间 API
  2. 消息中间件 RocketMQ 源码解析 —— 调试环境搭建
  3. 老艿艿说:关于时间管理的分享
  4. 分布式消息队列 RocketMQ源码解析:事务消息
  5. 分布式消息队列 RocketMQ源码解析:Filtersrv
  6. 分布式消息队列 RocketMQ 源码分析 —— 定时消息与消息重试
  7. Windows 服务器安装远程桌面及破解120天时间限制授权
  8. 理解算法的时间复杂度[每日前端夜话0x82]
  9. 分布式消息队列 RocketMQ 源码分析 —— 高可用

随机推荐

  1. Android重温
  2. 腾讯Android面经
  3. Android中对同一个TextView设置不同字体
  4. 用 Golang 开发 Android 应用(六)—— Came
  5. 【10.0.1】ArcGIS Runtime for Android之
  6. android用户界面之Widget教程实例汇总
  7. android 局域网聊天工具(可发送文字/语音
  8. Android 内核简单分析
  9. 在mac 上配置AndroidStudio碰到的坑
  10. android HAL介绍