摘要: 原创出处 http://www.iocoder.cn/RocketMQ/message-send-and-receive/ 「芋道源码」欢迎转载,保留摘要,谢谢!

  • 1、概述
  • 2、Producer 发送消息
    • DefaultMQProducer#send(Message)
    • DefaultMQProducerImpl#sendDefaultImpl()
  • 3、Broker 接收消息
    • SendMessageProcessor#sendMessage
    • DefaultMessageStore#putMessage
  • 4、某种结尾

阅读源码最好的方式,是使用 IDEA 进行调试 RocketMQ 源码,不然会一脸懵逼。

胖友可以点击「芋道源码」扫码关注,回复 git001 关键字
获得艿艿添加了中文注释的 RocketMQ 源码地址。

阅读源码很孤单,加入源码交流群,一起坚持!

1、概述

  1. Producer 发送消息。主要是同步发送消息源码,涉及到 异步/Oneway发送消息,事务消息会跳过。
  2. Broker 接收消息。(存储消息在《RocketMQ 源码分析 —— Message 存储》解析)
Producer发送消息全局顺序图

2、Producer 发送消息

Producer发送消息顺序图

DefaultMQProducer#send(Message)

  1@Override
  2public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
  3:     return this.defaultMQProducerImpl.send(msg);
  4: }
  • 说明:发送同步消息,DefaultMQProducer#send(Message) 对 DefaultMQProducerImpl#send(Message) 进行封装。

DefaultMQProducerImpl#sendDefaultImpl()

  1public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
  2:     return send(msg, this.defaultMQProducer.getSendMsgTimeout());
  3: }
  4
  5public SendResult send(Message msg, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
  6:     return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
  7: }
  8
  9private SendResult sendDefaultImpl(//
 10:     Message msg, //
 11:     final CommunicationMode communicationMode, //
 12:     final SendCallback sendCallback, //
 13:     final long timeout//
 14: )
 throws MQClientException, RemotingException, MQBrokerException, InterruptedException 
{
 15:     // 校验 Producer 处于运行状态
 16:     this.makeSureStateOK();
 17:     // 校验消息格式
 18:     Validators.checkMessage(msg, this.defaultMQProducer);
 19:     //
 20:     final long invokeID = random.nextLong(); // 调用编号;用于下面打印日志,标记为同一次发送消息
 21:     long beginTimestampFirst = System.currentTimeMillis();
 22:     long beginTimestampPrev = beginTimestampFirst;
 23:     long endTimestamp = beginTimestampFirst;
 24:     // 获取 Topic路由信息
 25:     TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
 26:     if (topicPublishInfo != null && topicPublishInfo.ok()) {
 27:         MessageQueue mq = null// 最后选择消息要发送到的队列
 28:         Exception exception = null;
 29:         SendResult sendResult = null// 最后一次发送结果
 30:         int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1// 同步多次调用
 31:         int times = 0// 第几次发送
 32:         String[] brokersSent = new String[timesTotal]; // 存储每次发送消息选择的broker名
 33:         // 循环调用发送消息,直到成功
 34:         for (; times < timesTotal; times++) {
 35:             String lastBrokerName = null == mq ? null : mq.getBrokerName();
 36:             MessageQueue tmpmq = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); // 选择消息要发送到的队列
 37:             if (tmpmq != null) {
 38:                 mq = tmpmq;
 39:                 brokersSent[times] = mq.getBrokerName();
 40:                 try {
 41:                     beginTimestampPrev = System.currentTimeMillis();
 42:                     // 调用发送消息核心方法
 43:                     sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);
 44:                     endTimestamp = System.currentTimeMillis();
 45:                     // 更新Broker可用性信息
 46:                     this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
 47:                     switch (communicationMode) {
 48:                         case ASYNC:
 49:                             return null;
 50:                         case ONEWAY:
 51:                             return null;
 52:                         case SYNC:
 53:                             if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
 54:                                 if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) { // 同步发送成功但存储有问题时 && 配置存储异常时重新发送开关 时,进行重试
 55:                                     continue;
 56:                                 }
 57:                             }
 58:                             return sendResult;
 59:                         default:
 60:                             break;
 61:                     }
 62:                 } catch (RemotingException e) { // 打印异常,更新Broker可用性信息,更新继续循环
 63:                     endTimestamp = System.currentTimeMillis();
 64:                     this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
 65:                     log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
 66:                     log.warn(msg.toString());
 67:                     exception = e;
 68:                     continue;
 69:                 } catch (MQClientException e) { // 打印异常,更新Broker可用性信息,继续循环
 70:                     endTimestamp = System.currentTimeMillis();
 71:                     this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
 72:                     log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
 73:                     log.warn(msg.toString());
 74:                     exception = e;
 75:                     continue;
 76:                 } catch (MQBrokerException e) { // 打印异常,更新Broker可用性信息,部分情况下的异常,直接返回,结束循环
 77:                     endTimestamp = System.currentTimeMillis();
 78:                     this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
 79:                     log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
 80:                     log.warn(msg.toString());
 81:                     exception = e;
 82:                     switch (e.getResponseCode()) {
 83:                         // 如下异常continue,进行发送消息重试
 84:                         case ResponseCode.TOPIC_NOT_EXIST:
 85:                         case ResponseCode.SERVICE_NOT_AVAILABLE:
 86:                         case ResponseCode.SYSTEM_ERROR:
 87:                         case ResponseCode.NO_PERMISSION:
 88:                         case ResponseCode.NO_BUYER_ID:
 89:                         case ResponseCode.NOT_IN_CURRENT_UNIT:
 90:                             continue;
 91:                         // 如果有发送结果,进行返回,否则,抛出异常;
 92:                         default:
 93:                             if (sendResult != null) {
 94:                                 return sendResult;
 95:                             }
 96:                             throw e;
 97:                     }
 98:                 } catch (InterruptedException e) {
 99:                     endTimestamp = System.currentTimeMillis();
100:                     this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
101:                     log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
102:                     log.warn(msg.toString());
103:                     throw e;
104:                 }
105:             } else {
106:                 break;
107:             }
108:         }
109:         // 返回发送结果
110:         if (sendResult != null) {
111:             return sendResult;
112:         }
113:         // 根据不同情况,抛出不同的异常
114:         String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s", times, System.currentTimeMillis() - beginTimestampFirst,
115:                 msg.getTopic(), Arrays.toString(brokersSent)) + FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);
116:         MQClientException mqClientException = new MQClientException(info, exception);
117:         if (exception instanceof MQBrokerException) {
118:             mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
119:         } else if (exception instanceof RemotingConnectException) {
120:             mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
121:         } else if (exception instanceof RemotingTimeoutException) {
122:             mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
123:         } else if (exception instanceof MQClientException) {
124:             mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
125:         }
126:         throw mqClientException;
127:     }
128:     // Namesrv找不到异常
129:     List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();
130:     if (null == nsList || nsList.isEmpty()) {
131:         throw new MQClientException(
132:             "No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION);
133:     }
134:     // 消息路由找不到异常
135:     throw new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
136:         null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
137: }
  • 说明 :发送消息。步骤:获取消息路由信息,选择要发送到的消息队列,执行消息发送核心方法,并对发送结果进行封装返回。
  • 第 1  至 7 行:对sendsendDefaultImpl(...)进行封装。
  • 第 20 行 :invokeID仅仅用于打印日志,无实际的业务用途。
  • 第 25 行 :获取 Topic路由信息, 详细解析见:DefaultMQProducerImpl#tryToFindTopicPublishInfo()
  • 第 30 & 34 行 :计算调用发送消息到成功为止的最大次数,并进行循环。同步或异步发送消息会调用多次,默认配置为3次。
  • 第 36 行 :选择消息要发送到的队列,详细解析见:MQFaultStrategy
  • 第 43 行 :调用发送消息核心方法,详细解析见:DefaultMQProducerImpl#sendKernelImpl()
  • 第 46 行 :更新Broker可用性信息。在选择发送到的消息队列时,会参考Broker发送消息的延迟,详细解析见:MQFaultStrategy
  • 第 62 至 68 行:当抛出RemotingException时,如果进行消息发送失败重试,则可能导致消息发送重复。例如,发送消息超时(RemotingTimeoutException),实际Broker接收到该消息并处理成功。因此,Consumer在消费时,需要保证幂等性。

DefaultMQProducerImpl#tryToFindTopicPublishInfo()

  1private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
  2:     // 缓存中获取 Topic发布信息
  3:     TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
  4:     // 当无可用的 Topic发布信息时,从Namesrv获取一次
  5:     if (null == topicPublishInfo || !topicPublishInfo.ok()) {
  6:         this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
  7:         this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
  8:         topicPublishInfo = this.topicPublishInfoTable.get(topic);
  9:     }
 10:     // 若获取的 Topic发布信息时候可用,则返回
 11:     if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
 12:         return topicPublishInfo;
 13:     } else { // 使用 {@link DefaultMQProducer#createTopicKey} 对应的 Topic发布信息。用于 Topic发布信息不存在 && Broker支持自动创建Topic
 14:         this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, truethis.defaultMQProducer);
 15:         topicPublishInfo = this.topicPublishInfoTable.get(topic);
 16:         return topicPublishInfo;
 17:     }
 18: }
  • 说明 :获得 Topic发布信息。优先从缓存topicPublishInfoTable,其次从Namesrv中获得。
  • 第 3 行 :从缓存topicPublishInfoTable中获得 Topic发布信息。
  • 第 5 至 9 行 :从 Namesrv 中获得 Topic发布信息。
  • 第 13 至 17 行 :当从 Namesrv 无法获取时,使用 {@link DefaultMQProducer#createTopicKey} 对应的 Topic发布信息。目的是当 Broker 开启自动创建 Topic开关时,Broker 接收到消息后自动创建Topic,详细解析见《RocketMQ 源码分析 —— Topic》。

MQFaultStrategy

Latency类图

MQFaultStrategy

  1public class MQFaultStrategy {
  2:     private final static Logger log = ClientLogger.getLog();
  3
  4:     /**
  5:      * 延迟故障容错,维护每个Broker的发送消息的延迟
  6:      * key:brokerName
  7:      */

  8:     private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl();
  9:     /**
 10:      * 发送消息延迟容错开关
 11:      */

 12:     private boolean sendLatencyFaultEnable = false;
 13:     /**
 14:      * 延迟级别数组
 15:      */

 16:     private long[] latencyMax = {50L100L550L1000L2000L3000L15000L};
 17:     /**
 18:      * 不可用时长数组
 19:      */

 20:     private long[] notAvailableDuration = {0L0L30000L60000L120000L180000L600000L};
 21
 22:     /**
 23:      * 根据 Topic发布信息 选择一个消息队列
 24:      *
 25:      * @param tpInfo Topic发布信息
 26:      * @param lastBrokerName brokerName
 27:      * @return 消息队列
 28:      */

 29:     public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
 30:         if (this.sendLatencyFaultEnable) {
 31:             try {
 32:                 // 获取 brokerName=lastBrokerName && 可用的一个消息队列
 33:                 int index = tpInfo.getSendWhichQueue().getAndIncrement();
 34:                 for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
 35:                     int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
 36:                     if (pos < 0)
 37:                         pos = 0;
 38:                     MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
 39:                     if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
 40:                         if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
 41:                             return mq;
 42:                     }
 43:                 }
 44:                 // 选择一个相对好的broker,并获得其对应的一个消息队列,不考虑该队列的可用性
 45:                 final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
 46:                 int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
 47:                 if (writeQueueNums > 0) {
 48:                     final MessageQueue mq = tpInfo.selectOneMessageQueue();
 49:                     if (notBestBroker != null) {
 50:                         mq.setBrokerName(notBestBroker);
 51:                         mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
 52:                     }
 53:                     return mq;
 54:                 } else {
 55:                     latencyFaultTolerance.remove(notBestBroker);
 56:                 }
 57:             } catch (Exception e) {
 58:                 log.error("Error occurred when selecting message queue", e);
 59:             }
 60:             // 选择一个消息队列,不考虑队列的可用性
 61:             return tpInfo.selectOneMessageQueue();
 62:         }
 63:         // 获得 lastBrokerName 对应的一个消息队列,不考虑该队列的可用性
 64:         return tpInfo.selectOneMessageQueue(lastBrokerName);
 65:     }
 66
 67:     /**
 68:      * 更新延迟容错信息
 69:      *
 70:      * @param brokerName brokerName
 71:      * @param currentLatency 延迟
 72:      * @param isolation 是否隔离。当开启隔离时,默认延迟为30000。目前主要用于发送消息异常时
 73:      */

 74:     public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
 75:         if (this.sendLatencyFaultEnable) {
 76:             long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
 77:             this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
 78:         }
 79:     }
 80
 81:     /**
 82:      * 计算延迟对应的不可用时间
 83:      *
 84:      * @param currentLatency 延迟
 85:      * @return 不可用时间
 86:      */

 87:     private long computeNotAvailableDuration(final long currentLatency) {
 88:         for (int i = latencyMax.length - 1; i >= 0; i--) {
 89:             if (currentLatency >= latencyMax[i])
 90:                 return this.notAvailableDuration[i];
 91:         }
 92:         return 0;
 93:     }
  • 说明 :Producer消息发送容错策略。默认情况下容错策略关闭,即sendLatencyFaultEnable=false

  • 第 30 至 62 行 :容错策略选择消息队列逻辑。优先获取可用队列,其次选择一个broker获取队列,最差返回任意broker的一个队列。

  • 第 64 行 :未开启容错策略选择消息队列逻辑。

  • 第 74 至 79 行 :更新延迟容错信息。当 Producer 发送消息时间过长,则逻辑认为N秒内不可用。按照latencyMaxnotAvailableDuration的配置,对应如下:

    Producer发送消息消耗时长Broker不可用时长
    >= 15000 ms600 * 1000 ms
    >= 3000 ms180 * 1000 ms
    >= 2000 ms120 * 1000 ms
    >= 1000 ms60 * 1000 ms
    >= 550 ms30 * 1000 ms
    >= 100 ms0 ms
    >= 50 ms0 ms

LatencyFaultTolerance

  1public interface LatencyFaultTolerance<T{
  2
  3:     /**
  4:      * 更新对应的延迟和不可用时长
  5:      *
  6:      * @param name 对象
  7:      * @param currentLatency 延迟
  8:      * @param notAvailableDuration 不可用时长
  9:      */

 10:     void updateFaultItem(final T name, final long currentLatency, final long notAvailableDuration);
 11
 12:     /**
 13:      * 对象是否可用
 14:      *
 15:      * @param name 对象
 16:      * @return 是否可用
 17:      */

 18:     boolean isAvailable(final T name);
 19
 20:     /**
 21:      * 移除对象
 22:      *
 23:      * @param name 对象
 24:      */

 25:     void remove(final T name);
 26
 27:     /**
 28:      * 获取一个对象
 29:      *
 30:      * @return 对象
 31:      */

 32:     pickOneAtLeast();
 33: }
  • 说明 :延迟故障容错接口

LatencyFaultToleranceImpl

  1public class LatencyFaultToleranceImpl implements LatencyFaultTolerance<String{
  2
  3:     /**
  4:      * 对象故障信息Table
  5:      */

  6:     private final ConcurrentHashMap<String, FaultItem> faultItemTable = new ConcurrentHashMap<>(16);
  7:     /**
  8:      * 对象选择Index
  9:      * @see #pickOneAtLeast()
 10:      */

 11:     private final ThreadLocalIndex whichItemWorst = new ThreadLocalIndex();
 12
 13:     @Override
 14:     public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
 15:         FaultItem old = this.faultItemTable.get(name);
 16:         if (null == old) {
 17:             // 创建对象
 18:             final FaultItem faultItem = new FaultItem(name);
 19:             faultItem.setCurrentLatency(currentLatency);
 20:             faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
 21:             // 更新对象
 22:             old = this.faultItemTable.putIfAbsent(name, faultItem);
 23:             if (old != null) {
 24:                 old.setCurrentLatency(currentLatency);
 25:                 old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
 26:             }
 27:         } else { // 更新对象
 28:             old.setCurrentLatency(currentLatency);
 29:             old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
 30:         }
 31:     }
 32
 33:     @Override
 34:     public boolean isAvailable(final String name) {
 35:         final FaultItem faultItem = this.faultItemTable.get(name);
 36:         if (faultItem != null) {
 37:             return faultItem.isAvailable();
 38:         }
 39:         return true;
 40:     }
 41
 42:     @Override
 43:     public void remove(final String name) {
 44:         this.faultItemTable.remove(name);
 45:     }
 46
 47:     /**
 48:      * 选择一个相对优秀的对象
 49:      *
 50:      * @return 对象
 51:      */

 52:     @Override
 53:     public String pickOneAtLeast() {
 54:         // 创建数组
 55:         final Enumeration<FaultItem> elements = this.faultItemTable.elements();
 56:         List<FaultItem> tmpList = new LinkedList<>();
 57:         while (elements.hasMoreElements()) {
 58:             final FaultItem faultItem = elements.nextElement();
 59:             tmpList.add(faultItem);
 60:         }
 61:         //
 62:         if (!tmpList.isEmpty()) {
 63:             // 打乱 + 排序。TODO 疑问:应该只能二选一。猜测Collections.shuffle(tmpList)去掉。
 64:             Collections.shuffle(tmpList);
 65:             Collections.sort(tmpList);
 66:             // 选择顺序在前一半的对象
 67:             final int half = tmpList.size() / 2;
 68:             if (half <= 0) {
 69:                 return tmpList.get(0).getName();
 70:             } else {
 71:                 final int i = this.whichItemWorst.getAndIncrement() % half;
 72:                 return tmpList.get(i).getName();
 73:             }
 74:         }
 75:         return null;
 76:     }
 77: }
  • 说明 :延迟故障容错实现。维护每个对象的信息。

FaultItem

  1class FaultItem implements Comparable<FaultItem{
  2:     /**
  3:      * 对象名
  4:      */

  5:     private final String name;
  6:     /**
  7:      * 延迟
  8:      */

  9:     private volatile long currentLatency;
 10:     /**
 11:      * 开始可用时间
 12:      */

 13:     private volatile long startTimestamp;
 14
 15:     public FaultItem(final String name) {
 16:         this.name = name;
 17:     }
 18
 19:     /**
 20:      * 比较对象
 21:      * 可用性 > 延迟 > 开始可用时间
 22:      *
 23:      * @param other other
 24:      * @return 升序
 25:      */

 26:     @Override
 27:     public int compareTo(final FaultItem other) {
 28:         if (this.isAvailable() != other.isAvailable()) {
 29:             if (this.isAvailable())
 30:                 return -1;
 31
 32:             if (other.isAvailable())
 33:                 return 1;
 34:         }
 35
 36:         if (this.currentLatency < other.currentLatency)
 37:             return -1;
 38:         else if (this.currentLatency > other.currentLatency) {
 39:             return 1;
 40:         }
 41
 42:         if (this.startTimestamp < other.startTimestamp)
 43:             return -1;
 44:         else if (this.startTimestamp > other.startTimestamp) {
 45:             return 1;
 46:         }
 47
 48:         return 0;
 49:     }
 50
 51:     /**
 52:      * 是否可用:当开始可用时间大于当前时间
 53:      *
 54:      * @return 是否可用
 55:      */

 56:     public boolean isAvailable() {
 57:         return (System.currentTimeMillis() - startTimestamp) >= 0;
 58:     }
 59
 60:     @Override
 61:     public int hashCode() {
 62:         int result = getName() != null ? getName().hashCode() : 0;
 63:         result = 31 * result + (int) (getCurrentLatency() ^ (getCurrentLatency() >>> 32));
 64:         result = 31 * result + (int) (getStartTimestamp() ^ (getStartTimestamp() >>> 32));
 65:         return result;
 66:     }
 67
 68:     @Override
 69:     public boolean equals(final Object o) {
 70:         if (this == o)
 71:             return true;
 72:         if (!(o instanceof FaultItem))
 73:             return false;
 74
 75:         final FaultItem faultItem = (FaultItem) o;
 76
 77:         if (getCurrentLatency() != faultItem.getCurrentLatency())
 78:             return false;
 79:         if (getStartTimestamp() != faultItem.getStartTimestamp())
 80:             return false;
 81:         return getName() != null ? getName().equals(faultItem.getName()) : faultItem.getName() == null;
 82
 83:     }
 84: }
  • 说明 :对象故障信息。维护对象的名字、延迟、开始可用的时间。

DefaultMQProducerImpl#sendKernelImpl()

  1private SendResult sendKernelImpl(final Message msg, //
  2:     final MessageQueue mq, //
  3:     final CommunicationMode communicationMode, //
  4:     final SendCallback sendCallback, //
  5:     final TopicPublishInfo topicPublishInfo, //
  6:     final long timeout)
 throws MQClientException, RemotingException, MQBrokerException, InterruptedException 
{
  7:     // 获取 broker地址
  8:     String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
  9:     if (null == brokerAddr) {
 10:         tryToFindTopicPublishInfo(mq.getTopic());
 11:         brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
 12:     }
 13:     //
 14:     SendMessageContext context = null;
 15:     if (brokerAddr != null) {
 16:         // 是否使用broker vip通道。broker会开启两个端口对外服务。
 17:         brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);
 18:         byte[] prevBody = msg.getBody(); // 记录消息内容。下面逻辑可能改变消息内容,例如消息压缩。
 19:         try {
 20:             // 设置唯一编号
 21:             MessageClientIDSetter.setUniqID(msg);
 22:             // 消息压缩
 23:             int sysFlag = 0;
 24:             if (this.tryToCompressMessage(msg)) {
 25:                 sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
 26:             }
 27:             // 事务
 28:             final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
 29:             if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
 30:                 sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
 31:             }
 32:             // hook:发送消息校验
 33:             if (hasCheckForbiddenHook()) {
 34:                 CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
 35:                 checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
 36:                 checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
 37:                 checkForbiddenContext.setCommunicationMode(communicationMode);
 38:                 checkForbiddenContext.setBrokerAddr(brokerAddr);
 39:                 checkForbiddenContext.setMessage(msg);
 40:                 checkForbiddenContext.setMq(mq);
 41:                 checkForbiddenContext.setUnitMode(this.isUnitMode());
 42:                 this.executeCheckForbiddenHook(checkForbiddenContext);
 43:             }
 44:             // hook:发送消息前逻辑
 45:             if (this.hasSendMessageHook()) {
 46:                 context = new SendMessageContext();
 47:                 context.setProducer(this);
 48:                 context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
 49:                 context.setCommunicationMode(communicationMode);
 50:                 context.setBornHost(this.defaultMQProducer.getClientIP());
 51:                 context.setBrokerAddr(brokerAddr);
 52:                 context.setMessage(msg);
 53:                 context.setMq(mq);
 54:                 String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
 55:                 if (isTrans != null && isTrans.equals("true")) {
 56:                     context.setMsgType(MessageType.Trans_Msg_Half);
 57:                 }
 58:                 if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
 59:                     context.setMsgType(MessageType.Delay_Msg);
 60:                 }
 61:                 this.executeSendMessageHookBefore(context);
 62:             }
 63:             // 构建发送消息请求
 64:             SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
 65:             requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
 66:             requestHeader.setTopic(msg.getTopic());
 67:             requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
 68:             requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
 69:             requestHeader.setQueueId(mq.getQueueId());
 70:             requestHeader.setSysFlag(sysFlag);
 71:             requestHeader.setBornTimestamp(System.currentTimeMillis());
 72:             requestHeader.setFlag(msg.getFlag());
 73:             requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
 74:             requestHeader.setReconsumeTimes(0);
 75:             requestHeader.setUnitMode(this.isUnitMode());
 76:             if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { // 消息重发Topic
 77:                 String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
 78:                 if (reconsumeTimes != null) {
 79:                     requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
 80:                     MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
 81:                 }
 82:                 String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
 83:                 if (maxReconsumeTimes != null) {
 84:                     requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
 85:                     MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
 86:                 }
 87:             }
 88:             // 发送消息
 89:             SendResult sendResult = null;
 90:             switch (communicationMode) {
 91:                 case ASYNC:
 92:                     sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(//
 93:                         brokerAddr, // 1
 94:                         mq.getBrokerName(), // 2
 95:                         msg, // 3
 96:                         requestHeader, // 4
 97:                         timeout, // 5
 98:                         communicationMode, // 6
 99:                         sendCallback, // 7
100:                         topicPublishInfo, // 8
101:                         this.mQClientFactory, // 9
102:                         this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(), // 10
103:                         context, //
104:                         this);
105:                     break;
106:                 case ONEWAY:
107:                 case SYNC:
108:                     sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
109:                         brokerAddr,
110:                         mq.getBrokerName(),
111:                         msg,
112:                         requestHeader,
113:                         timeout,
114:                         communicationMode,
115:                         context,
116:                         this);
117:                     break;
118:                 default:
119:                     assert false;
120:                     break;
121:             }
122:             // hook:发送消息后逻辑
123:             if (this.hasSendMessageHook()) {
124:                 context.setSendResult(sendResult);
125:                 this.executeSendMessageHookAfter(context);
126:             }
127:             // 返回发送结果
128:             return sendResult;
129:         } catch (RemotingException e) {
130:             if (this.hasSendMessageHook()) {
131:                 context.setException(e);
132:                 this.executeSendMessageHookAfter(context);
133:             }
134:             throw e;
135:         } catch (MQBrokerException e) {
136:             if (this.hasSendMessageHook()) {
137:                 context.setException(e);
138:                 this.executeSendMessageHookAfter(context);
139:             }
140:             throw e;
141:         } catch (InterruptedException e) {
142:             if (this.hasSendMessageHook()) {
143:                 context.setException(e);
144:                 this.executeSendMessageHookAfter(context);
145:             }
146:             throw e;
147:         } finally {
148:             msg.setBody(prevBody);
149:         }
150:     }
151:     // broker为空抛出异常
152:     throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist"null);
153: }
  • 说明 :发送消息核心方法。该方法真正发起网络请求,发送消息给 Broker
  • 第 21 行 :生产消息编号,详细解析见《RocketMQ 源码分析 —— Message 基础》。
  • 第 64 至 121 行 :构建发送消息请求SendMessageRequestHeader
  • 第 107 至 117 行 :执行 MQClientInstance#sendMessage(...)发起网络请求。

3、Broker 接收消息

接收发送消息API顺序图

SendMessageProcessor#sendMessage

  1@Override
  2public RemotingCommand proce***equest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
  3:     SendMessageContext mqtraceContext;
  4:     switch (request.getCode()) {
  5:         case RequestCode.CONSUMER_SEND_MSG_BACK:
  6:             return this.consumerSendMsgBack(ctx, request);
  7:         default:
  8:             // 解析请求
  9:             SendMessageRequestHeader requestHeader = parseRequestHeader(request);
 10:             if (requestHeader == null) {
 11:                 return null;
 12:             }
 13:             // 发送请求Context。在 hook 场景下使用
 14:             mqtraceContext = buildMsgContext(ctx, requestHeader);
 15:             // hook:处理发送消息前逻辑
 16:             this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
 17:             // 处理发送消息逻辑
 18:             final RemotingCommand response = this.sendMessage(ctx, request, mqtraceContext, requestHeader);
 19:             // hook:处理发送消息后逻辑
 20:             this.executeSendMessageHookAfter(response, mqtraceContext);
 21:             return response;
 22:     }
 23: }
 24
 25private RemotingCommand sendMessage(final ChannelHandlerContext ctx, //
 26:     final RemotingCommand request, //
 27:     final SendMessageContext sendMessageContext, //
 28:     final SendMessageRequestHeader requestHeader)
 throws RemotingCommandException 
{
 29
 30:     // 初始化响应
 31:     final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
 32:     final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader();
 33:     response.setOpaque(request.getOpaque());
 34:     response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());
 35:     response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));
 36
 37:     if (log.isDebugEnabled()) {
 38:         log.debug("receive SendMessage request command, {}", request);
 39:     }
 40
 41:     // 如果未开始接收消息,抛出系统异常
 42:     @SuppressWarnings("SpellCheckingInspection")
 43:     final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();
 44:     if (this.brokerController.getMessageStore().now() < startTimstamp) {
 45:         response.setCode(ResponseCode.SYSTEM_ERROR);
 46:         response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimstamp)));
 47:         return response;
 48:     }
 49
 50:     // 消息配置(Topic配置)校验
 51:     response.setCode(-1);
 52:     super.msgCheck(ctx, requestHeader, response);
 53:     if (response.getCode() != -1) {
 54:         return response;
 55:     }
 56
 57:     final byte[] body = request.getBody();
 58
 59:     // 如果队列小于0,从可用队列随机选择
 60:     int queueIdInt = requestHeader.getQueueId();
 61:     TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
 62:     if (queueIdInt < 0) {
 63:         queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums();
 64:     }
 65
 66:     //
 67:     int sysFlag = requestHeader.getSysFlag();
 68:     if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) {
 69:         sysFlag |= MessageSysFlag.MULTI_TAGS_FLAG;
 70:     }
 71
 72:     // 对RETRY类型的消息处理。如果超过最大消费次数,则topic修改成"%DLQ%" + 分组名,即加入 死信队列(Dead Letter Queue)
 73:     String newTopic = requestHeader.getTopic();
 74:     if (null != newTopic && newTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
 75:         // 获取订阅分组配置
 76:         String groupName = newTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());
 77:         SubscriptionGroupConfig subscriptionGroupConfig =
 78:             this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(groupName);
 79:         if (null == subscriptionGroupConfig) {
 80:             response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
 81:             response.setRemark("subscription group not exist, " + groupName + " " + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
 82:             return response;
 83:         }
 84:         // 计算最大可消费次数
 85:         int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
 86:         if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
 87:             maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
 88:         }
 89:         int reconsumeTimes = requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes();
 90:         if (reconsumeTimes >= maxReconsumeTimes) { // 超过最大消费次数
 91:             newTopic = MixAll.getDLQTopic(groupName);
 92:             queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;
 93:             topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic, //
 94:                 DLQ_NUMS_PER_GROUP, //
 95:                 PermName.PERM_WRITE, 0
 96:             );
 97:             if (null == topicConfig) {
 98:                 response.setCode(ResponseCode.SYSTEM_ERROR);
 99:                 response.setRemark("topic[" + newTopic + "] not exist");
100:                 return response;
101:             }
102:         }
103:     }
104
105:     // 创建MessageExtBrokerInner
106:     MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
107:     msgInner.setTopic(newTopic);
108:     msgInner.setBody(body);
109:     msgInner.setFlag(requestHeader.getFlag());
110:     MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
111:     msgInner.setPropertiesString(requestHeader.getProperties());
112:     msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(topicConfig.getTopicFilterType(), msgInner.getTags()));
113:     msgInner.setQueueId(queueIdInt);
114:     msgInner.setSysFlag(sysFlag);
115:     msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
116:     msgInner.setBornHost(ctx.channel().remoteAddress());
117:     msgInner.setStoreHost(this.getStoreHost());
118:     msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
119
120:     // 校验是否不允许发送事务消息
121:     if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
122:         String traFlag = msgInner.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
123:         if (traFlag != null) {
124:             response.setCode(ResponseCode.NO_PERMISSION);
125:             response.setRemark(
126:                 "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending transaction message is forbidden");
127:             return response;
128:         }
129:     }
130
131:     // 添加消息
132:     PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
133:     if (putMessageResult != null) {
134:         boolean sendOK = false;
135
136:         switch (putMessageResult.getPutMessageStatus()) {
137:             // Success
138:             case PUT_OK:
139:                 sendOK = true;
140:                 response.setCode(ResponseCode.SUCCESS);
141:                 break;
142:             case FLUSH_DISK_TIMEOUT:
143:                 response.setCode(ResponseCode.FLUSH_DISK_TIMEOUT);
144:                 sendOK = true;
145:                 break;
146:             case FLUSH_SLAVE_TIMEOUT:
147:                 response.setCode(ResponseCode.FLUSH_SLAVE_TIMEOUT);
148:                 sendOK = true;
149:                 break;
150:             case SLAVE_NOT_AVAILABLE:
151:                 response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);
152:                 sendOK = true;
153:                 break;
154
155:             // Failed
156:             case CREATE_MAPEDFILE_FAILED:
157:                 response.setCode(ResponseCode.SYSTEM_ERROR);
158:                 response.setRemark("create mapped file failed, server is busy or broken.");
159:                 break;
160:             case MESSAGE_ILLEGAL:
161:             case PROPERTIES_SIZE_EXCEEDED:
162:                 response.setCode(ResponseCode.MESSAGE_ILLEGAL);
163:                 response.setRemark(
164:                     "the message is illegal, maybe msg body or properties length not matched. msg body length limit 128k, msg properties length limit 32k.");
165:                 break;
166:             case SERVICE_NOT_AVAILABLE:
167:                 response.setCode(ResponseCode.SERVICE_NOT_AVAILABLE);
168:                 response.setRemark(
169:                     "service not available now, maybe disk full, " + diskUtil() + ", maybe your broker machine memory too small.");
170:                 break;
171:             case OS_PAGECACHE_BUSY:
172:                 response.setCode(ResponseCode.SYSTEM_ERROR);
173:                 response.setRemark("[PC_SYNCHRONIZED]broker busy, start flow control for a while");
174:                 break;
175:             case UNKNOWN_ERROR:
176:                 response.setCode(ResponseCode.SYSTEM_ERROR);
177:                 response.setRemark("UNKNOWN_ERROR");
178:                 break;
179:             default:
180:                 response.setCode(ResponseCode.SYSTEM_ERROR);
181:                 response.setRemark("UNKNOWN_ERROR DEFAULT");
182:                 break;
183:         }
184
185:         String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);
186:         if (sendOK) {
187:             // 统计
188:             this.brokerController.getBrokerStatsManager().incTopicPutNums(msgInner.getTopic());
189:             this.brokerController.getBrokerStatsManager().incTopicPutSize(msgInner.getTopic(), putMessageResult.getAppendMessageResult().getWroteBytes());
190:             this.brokerController.getBrokerStatsManager().incBrokerPutNums();
191
192:             // 响应
193:             response.setRemark(null);
194:             responseHeader.setMsgId(putMessageResult.getAppendMessageResult().getMsgId());
195:             responseHeader.setQueueId(queueIdInt);
196:             responseHeader.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset());
197:             doResponse(ctx, request, response);
198
199:             // hook:设置发送成功到context
200:             if (hasSendMessageHook()) {
201:                 sendMessageContext.setMsgId(responseHeader.getMsgId());
202:                 sendMessageContext.setQueueId(responseHeader.getQueueId());
203:                 sendMessageContext.setQueueOffset(responseHeader.getQueueOffset());
204
205:                 int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount();
206:                 int wroteSize = putMessageResult.getAppendMessageResult().getWroteBytes();
207:                 int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT) * commercialBaseCount;
208
209:                 sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_SUCCESS);
210:                 sendMessageContext.setCommercialSendTimes(incValue);
211:                 sendMessageContext.setCommercialSendSize(wroteSize);
212:                 sendMessageContext.setCommercialOwner(owner);
213:             }
214:             return null;
215:         } else {
216:             // hook:设置发送失败到context
217:             if (hasSendMessageHook()) {
218:                 int wroteSize = request.getBody().length;
219:                 int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT);
220
221:                 sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_FAILURE);
222:                 sendMessageContext.setCommercialSendTimes(incValue);
223:                 sendMessageContext.setCommercialSendSize(wroteSize);
224:                 sendMessageContext.setCommercialOwner(owner);
225:             }
226:         }
227:     } else {
228:         response.setCode(ResponseCode.SYSTEM_ERROR);
229:         response.setRemark("store putMessage return null");
230:     }
231
232:     return response;
233: }
  • #proce***equest() 说明 :处理消息请求。
  • #sendMessage() 说明 :发送消息,并返回发送消息结果。
  • 第 51 至 55 行 :消息配置(Topic配置)校验,详细解析见:AbstractSendMessageProcessor#msgCheck()。
  • 第 60 至 64 行 :消息队列编号小于0时,Broker 可以设置随机选择一个消息队列。
  • 第 72 至 103 行 :对RETRY类型的消息处理。如果超过最大消费次数,则topic修改成"%DLQ%" + 分组名, 即加  死信队 (Dead Letter Queue),详细解析见:《RocketMQ 源码分析 —— Topic》。
  • 第 105 至 118 行 :创建MessageExtBrokerInner
  • 第 132 :存储消息,详细解析见:DefaultMessageStore#putMessage()。
  • 第 133 至 183 行 :处理消息发送结果,设置响应结果和提示。
  • 第 186 至 214 行 :发送成功,响应。这里doResponse(ctx, request, response)进行响应,最后return null,原因是:响应给 Producer 可能发生异常,#doResponse(ctx, request, response)捕捉了该异常并输出日志。这样做的话,我们进行排查 Broker 接收消息成功后响应是否存在异常会方便很多。

AbstractSendMessageProcessor#msgCheck

  1protected RemotingCommand msgCheck(final ChannelHandlerContext ctx,
  2:                                    final SendMessageRequestHeader requestHeader, final RemotingCommand response)
 
{
  3:     // 检查 broker 是否有写入权限
  4:     if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())
  5:         && this.brokerController.getTopicConfigManager().isOrderTopic(requestHeader.getTopic())) {
  6:         response.setCode(ResponseCode.NO_PERMISSION);
  7:         response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
  8:             + "] sending message is forbidden");
  9:         return response;
 10:     }
 11:     // 检查topic是否可以被发送。目前是{@link MixAll.DEFAULT_TOPIC}不被允许发送
 12:     if (!this.brokerController.getTopicConfigManager().isTopicCanSendMessage(requestHeader.getTopic())) {
 13:         String errorMsg = "the topic[" + requestHeader.getTopic() + "] is conflict with system reserved words.";
 14:         log.warn(errorMsg);
 15:         response.setCode(ResponseCode.SYSTEM_ERROR);
 16:         response.setRemark(errorMsg);
 17:         return response;
 18:     }
 19:     TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
 20:     if (null == topicConfig) { // 不能存在topicConfig,则进行创建
 21:         int topicSysFlag = 0;
 22:         if (requestHeader.isUnitMode()) {
 23:             if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
 24:                 topicSysFlag = TopicSysFlag.buildSysFlag(falsetrue);
 25:             } else {
 26:                 topicSysFlag = TopicSysFlag.buildSysFlag(truefalse);
 27:             }
 28:         }
 29:         // 创建topic配置
 30:         log.warn("the topic {} not exist, producer: {}", requestHeader.getTopic(), ctx.channel().remoteAddress());
 31:         topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageMethod(//
 32:             requestHeader.getTopic(), //
 33:             requestHeader.getDefaultTopic(), //
 34:             RemotingHelper.parseChannelRemoteAddr(ctx.channel()), //
 35:             requestHeader.getDefaultTopicQueueNums(), topicSysFlag);
 36:         if (null == topicConfig) {
 37:             if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
 38:                 topicConfig =
 39:                     this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
 40:                         requestHeader.getTopic(), 1, PermName.PERM_WRITE | PermName.PERM_READ,
 41:                         topicSysFlag);
 42:             }
 43:         }
 44:         // 如果没配置
 45:         if (null == topicConfig) {
 46:             response.setCode(ResponseCode.TOPIC_NOT_EXIST);
 47:             response.setRemark("topic[" + requestHeader.getTopic() + "] not exist, apply first please!"
 48:                 + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
 49:             return response;
 50:         }
 51:     }
 52:     // 队列编号是否正确
 53:     int queueIdInt = requestHeader.getQueueId();
 54:     int idValid = Math.max(topicConfig.getWriteQueueNums(), topicConfig.getReadQueueNums());
 55:     if (queueIdInt >= idValid) {
 56:         String errorInfo = String.format("request queueId[%d] is illegal, %s Producer: %s",
 57:             queueIdInt,
 58:             topicConfig.toString(),
 59:             RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
 60:         log.warn(errorInfo);
 61:         response.setCode(ResponseCode.SYSTEM_ERROR);
 62:         response.setRemark(errorInfo);
 63:         return response;
 64:     }
 65:     return response;
 66: }
  • 说明:校验消息是否正确,主要是Topic配置方面,例如:Broker 是否有写入权限,topic配置是否存在,队列编号是否正确。
  • 第 11 至 18 行 :检查Topic是否可以被发送。目前是 {@link MixAll.DEFAULT_TOPIC} 不被允许发送。
  • 第 20 至 51 行 :当找不到Topic配置,则进行创建。当然,创建会存在不成功的情况,例如说:defaultTopic 的Topic配置不存在,又或者是 存在但是不允许继承,详细解析见《RocketMQ 源码分析 —— Topic》。

DefaultMessageStore#putMessage

  1public PutMessageResult putMessage(MessageExtBrokerInner msg) {
  2:     if (this.shutdown) {
  3:         log.warn("message store has shutdown, so putMessage is forbidden");
  4:         return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
  5:     }
  6
  7:     // 从节点不允许写入
  8:     if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
  9:         long value = this.printTimes.getAndIncrement();
 10:         if ((value % 50000) == 0) {
 11:             log.warn("message store is slave mode, so putMessage is forbidden ");
 12:         }
 13
 14:         return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
 15:     }
 16
 17:     // store是否允许写入
 18:     if (!this.runningFlags.isWriteable()) {
 19:         long value = this.printTimes.getAndIncrement();
 20:         if ((value % 50000) == 0) {
 21:             log.warn("message store is not writeable, so putMessage is forbidden " + this.runningFlags.getFlagBits());
 22:         }
 23
 24:         return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
 25:     } else {
 26:         this.printTimes.set(0);
 27:     }
 28
 29:     // 消息过长
 30:     if (msg.getTopic().length() > Byte.MAX_VALUE) {
 31:         log.warn("putMessage message topic length too long " + msg.getTopic().length());
 32:         return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
 33:     }
 34
 35:     // 消息附加属性过长
 36:     if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) {
 37:         log.warn("putMessage message properties length too long " + msg.getPropertiesString().length());
 38:         return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);
 39:     }
 40
 41:     if (this.isOSPageCacheBusy()) {
 42:         return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);
 43:     }
 44
 45:     long beginTime = this.getSystemClock().now();
 46:     // 添加消息到commitLog
 47:     PutMessageResult result = this.commitLog.putMessage(msg);
 48
 49:     long eclipseTime = this.getSystemClock().now() - beginTime;
 50:     if (eclipseTime > 500) {
 51:         log.warn("putMessage not in lock eclipse time(ms)={}, bodyLength={}", eclipseTime, msg.getBody().length);
 52:     }
 53:     this.storeStatsService.setPutMessageEntireTimeMax(eclipseTime);
 54
 55:     if (null == result || !result.isOk()) {
 56:         this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
 57:     }
 58
 59:     return result;
 60: }
  • 说明:存储消息封装,最终存储需要 CommitLog 实现。
  • 第 7 至 27 行 :校验 Broker 是否可以写入。
  • 第 29 至 39 行 :消息格式与大小校验。
  • 第 47 行 :调用 CommitLong 进行存储,详细逻辑见:《RocketMQ 源码分析 —— Message 存储》

4、某种结尾

感谢阅读、收藏、点赞本文的工程师同学。

阅读源码是件令自己很愉悦的事情,编写源码解析是让自己脑细胞死伤无数的过程,痛并快乐着。

如果有内容写的存在错误,或是不清晰的地方,见笑了,。我们一起探讨,共进步。

再次感谢阅读、收藏、点赞本文的工程师同学。


©著作权归作者所有:来自51CTO博客作者mb5ff80520dfa04的原创作品,如需转载,请注明出处,否则将追究法律责任

更多相关文章

  1. 消息中间件 RocketMQ 源码解析 —— 调试环境搭建
  2. 分布式消息队列 RocketMQ源码解析:事务消息
  3. 分布式消息队列 RocketMQ源码解析:Filtersrv
  4. 分布式消息队列 RocketMQ 源码分析 —— 定时消息与消息重试
  5. 分布式消息队列 RocketMQ 源码分析 —— 高可用
  6. 分布式消息队列 RocketMQ 源码分析 —— Message 拉取与消费(上)
  7. 分布式消息队列 RocketMQ 源码分析 —— Message 拉取与消费(下)
  8. 贞炸了!上线之后,消息收不到了!
  9. 数据结构之优先队列和堆

随机推荐

  1. Android Studio下载及离线升级方法
  2. Android用户界面 UI组件--自动提示输入框
  3. Android xml中 @和?区别,style和attr小结
  4. Android中的5大布局和网格布局
  5. Android(安卓)Studio简单设置
  6. LoganSquare——快速解析和序列化JSON对
  7. Android Gesture Detector
  8. android的ProgressBar设置为水平样式
  9. 【Android UI设计与开发】之具体解释Acti
  10. Android 动画中共用的属性!