1. 概述

本文主要解析 NamesrvBroker 如何实现高可用, ProducerConsumer 怎么与它们通信保证高可用。

2. Namesrv 高可用

启动多个 Namesrv 实现高可用。
相较于 ZookeeperConsulEtcd 等, Namesrv 是一个超轻量级的注册中心,提供命名服务

2.1 Broker 注册到 Namesrv

  • 多个 Namesrv 之间,没有任何关系(不存在类似 Zookeeper 的 LeaderFollower 等角色),不进行通信与数据同步。通过 Broker 循环注册多个 Namesrv

  1: // ⬇️⬇️⬇️【BrokerOuterAPI.java】  2: public RegisterBrokerResult registerBrokerAll(  3:     final String clusterName,  4:     final String brokerAddr,  5:     final String brokerName,  6:     final long brokerId,  7:     final String haServerAddr,  8:     final TopicConfigSerializeWrapper topicConfigWrapper,  9:     final List<String> filterServerList, 10:     final boolean oneway, 11:     final int timeoutMills) { 12:     RegisterBrokerResult registerBrokerResult = null; 13:  14:     List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList(); 15:     if (nameServerAddressList != null) { 16:         for (String namesrvAddr : nameServerAddressList) { // 循环多个 Namesrv 17:             try { 18:                 RegisterBrokerResult result = this.registerBroker(namesrvAddr, clusterName, brokerAddr, brokerName, brokerId, 19:                     haServerAddr, topicConfigWrapper, filterServerList, oneway, timeoutMills); 20:                 if (result != null) { 21:                     registerBrokerResult = result; 22:                 } 23:  24:                 log.info("register broker to name server {} OK", namesrvAddr); 25:             } catch (Exception e) { 26:                 log.warn("registerBroker Exception, {}", namesrvAddr, e); 27:             } 28:         } 29:     } 30:  31:     return registerBrokerResult; 32: }

2.2 Producer、Consumer 访问 Namesrv

  • Producer、 Consumer 从 Namesrv列表选择一个可连接的进行通信。

  1: // ⬇️⬇️⬇️【NettyRemotingClient.java】  2: private Channel getAndCreateNameserverChannel() throws InterruptedException {  3:     // 返回已选择、可连接Namesrv  4:     String addr = this.namesrvAddrChoosed.get();  5:     if (addr != null) {  6:         ChannelWrapper cw = this.channelTables.get(addr);  7:         if (cw != null && cw.isOK()) {  8:             return cw.getChannel();  9:         } 10:     } 11:     // 12:     final List<String> addrList = this.namesrvAddrList.get(); 13:     if (this.lockNamesrvChannel.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { 14:         try { 15:             // 返回已选择、可连接的Namesrv 16:             addr = this.namesrvAddrChoosed.get(); 17:             if (addr != null) { 18:                 ChannelWrapper cw = this.channelTables.get(addr); 19:                 if (cw != null && cw.isOK()) { 20:                     return cw.getChannel(); 21:                 } 22:             } 23:             // 从【Namesrv列表】中选择一个连接的返回 24:             if (addrList != null && !addrList.isEmpty()) { 25:                 for (int i = 0; i < addrList.size(); i++) { 26:                     int index = this.namesrvIndex.incrementAndGet(); 27:                     index = Math.abs(index); 28:                     index = index % addrList.size(); 29:                     String newAddr = addrList.get(index); 30:  31:                     this.namesrvAddrChoosed.set(newAddr); 32:                     Channel channelNew = this.createChannel(newAddr); 33:                     if (channelNew != null) 34:                         return channelNew; 35:                 } 36:             } 37:         } catch (Exception e) { 38:             log.error("getAndCreateNameserverChannel: create name server channel exception", e); 39:         } finally { 40:             this.lockNamesrvChannel.unlock(); 41:         } 42:     } else { 43:         log.warn("getAndCreateNameserverChannel: try to lock name server, but timeout, {}ms", LOCK_TIMEOUT_MILLIS); 44:     } 45:  46:     return null; 47: }

3. Broker 高可用

启动多个 Broker分组 形成 集群 实现高可用。
Broker分组 = Master节点x1 + Slave节点xN。
类似 MySQLMaster节点 提供读写服务, Slave节点 只提供服务。

3.2 Broker 主从

  • 每个分组, Master节点 不断发送新的 CommitLog 给 Slave节点。 Slave节点 不断上报本地的 CommitLog 已经同步到的位置给 Master节点。

  • Broker分组 与 Broker分组 之间没有任何关系,不进行通信与数据同步。

  • 消费进度 目前不支持 MasterSlave 同步。

集群内, Master节点 有两种类型: Master_SYNCMaster_ASYNC:前者在 Producer 发送消息时,等待 Slave节点 存储完毕后再返回发送结果,而后者不需要等待。

3.1.1 配置

目前官方提供三套配置:

  • 2m-2s-async

brokerClusterNamebrokerNamebrokerRolebrokerId
DefaultClusterbroker-aASYNC_MASTER0
DefaultClusterbroker-aSLAVE1
DefaultClusterbroker-bASYNC_MASTER0
DefaultClusterbroker-bSLAVE1



  • 2m-2s-sync








brokerClusterNamebrokerNamebrokerRolebrokerId
------------
DefaultClusterbroker-aSYNC_MASTER0
DefaultClusterbroker-aSLAVE1
DefaultClusterbroker-bSYNC_MASTER0
DefaultClusterbroker-bSLAVE1



  • 2m-noslave








brokerClusterNamebrokerNamebrokerRolebrokerId
------------
DefaultClusterbroker-aASYNC_MASTER0
DefaultClusterbroker-bASYNC_MASTER0

3.1.2 组件

再看具体实现代码之前,我们来看看 Master/ Slave节点 包含的组件:

  • Master节点

    • ReadSocketService :来自 Slave节点 的数据。

    • WriteSocketService :到往 Slave节点 的数据。

    • AcceptSocketService :接收 Slave节点 连接。

    • HAConnection

  • Slave节点

    • HAClient :对 Master节点 连接、读写数据。

3.1.3 通信协议

Master节点 与 Slave节点 通信协议很简单,只有如下两条。

对象用途第几位字段数据类型字节数说明
Slave=>Master上报CommitLog已经同步到的物理位置






0maxPhyOffsetLong8CommitLog最大物理位置
Master=>Slave传输新的 CommitLog 数据






0fromPhyOffsetLong8CommitLog开始物理位置


1sizeInt4传输CommitLog数据长度


2bodyBytessize传输CommitLog数据

3.1.4 Slave


  • Slave 主循环,实现了不断不断不断从 Master 传输 CommitLog 数据,上传 Master 自己本地的 CommitLog 已经同步物理位置。

  1: // ⬇️⬇️⬇️【HAClient.java】  2: public void run() {  3:     log.info(this.getServiceName() + " service started");  4:   5:     while (!this.isStopped()) {  6:         try {  7:             if (this.connectMaster()) {  8:                 // 若到满足上报间隔,上报到Master进度  9:                 if (this.isTimeToReportOffset()) { 10:                     boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset); 11:                     if (!result) { 12:                         this.closeMaster(); 13:                     } 14:                 } 15:  16:                 this.selector.select(1000); 17:  18:                 // 处理读取事件 19:                 boolean ok = this.proce***eadEvent(); 20:                 if (!ok) { 21:                     this.closeMaster(); 22:                 } 23:  24:                 // 若进度有变化,上报到Master进度 25:                 if (!reportSlaveMaxOffsetPlus()) { 26:                     continue; 27:                 } 28:  29:                 // Master过久未返回数据,关闭连接 30:                 long interval = HAService.this.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp; 31:                 if (interval > HAService.this.getDefaultMessageStore().getMessageStoreConfig() 32:                     .getHaHousekeepingInterval()) { 33:                     log.warn("HAClient, housekeeping, found this connection[" + this.masterAddress 34:                         + "] expired, " + interval); 35:                     this.closeMaster(); 36:                     log.warn("HAClient, master not response some time, so close connection"); 37:                 } 38:             } else { 39:                 this.waitForRunning(1000 * 5); 40:             } 41:         } catch (Exception e) { 42:             log.warn(this.getServiceName() + " service has exception. ", e); 43:             this.waitForRunning(1000 * 5); 44:         } 45:     } 46:  47:     log.info(this.getServiceName() + " service end"); 48: }
  • 第 8 至 14 行 :固定间隔(默认5s)向 Master 上报 Slave 本地 CommitLog 已经同步到的物理位置。该操作还有心跳的作用。

  • 第 16 至 22 行 :处理 Master 传输 Slave 的 CommitLog 数据。


  • 我们来看看 #dispatchReadRequest(...) 与 #reportSlaveMaxOffset(...) 是怎么实现的。

  1: // 【HAClient.java】  2: /**  3:  * 读取Master传输的CommitLog数据,并返回是异常  4:  * 如果读取到数据,写入CommitLog  5:  * 异常原因:  6:  *   1. Master传输来的数据offset 不等于 Slave的CommitLog数据最大offset  7:  *   2. 上报到Master进度失败  8:  *  9:  * @return 是否异常 10:  */ 11: private boolean dispatchReadRequest() { 12:     final int msgHeaderSize = 8 + 4; // phyoffset + size 13:     int readSocketPos = this.byteBufferRead.position(); 14:  15:     while (true) { 16:         // 读取到请求 17:         int diff = this.byteBufferRead.position() - this.dispatchPostion; 18:         if (diff >= msgHeaderSize) { 19:             // 读取masterPhyOffset、bodySize。使用dispatchPostion的原因是:处理数据“粘包”导致数据读取不完整。 20:             long masterPhyOffset = this.byteBufferRead.getLong(this.dispatchPostion); 21:             int bodySize = this.byteBufferRead.getInt(this.dispatchPostion + 8); 22:             // 校验 Master传输来的数据offset 是否和 Slave的CommitLog数据最大offset 是否相同。 23:             long slavePhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset(); 24:             if (slavePhyOffset != 0) { 25:                 if (slavePhyOffset != masterPhyOffset) { 26:                     log.error("master pushed offset not equal the max phy offset in slave, SLAVE: " 27:                         + slavePhyOffset + " MASTER: " + masterPhyOffset); 28:                     return false; 29:                 } 30:             } 31:             // 读取到消息 32:             if (diff >= (msgHeaderSize + bodySize)) { 33:                 // 写入CommitLog 34:                 byte[] bodyData = new byte[bodySize]; 35:                 this.byteBufferRead.position(this.dispatchPostion + msgHeaderSize); 36:                 this.byteBufferRead.get(bodyData); 37:                 HAService.this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData); 38:                 // 设置处理到的位置 39:                 this.byteBufferRead.position(readSocketPos); 40:                 this.dispatchPostion += msgHeaderSize + bodySize; 41:                 // 上报到Master进度 42:                 if (!reportSlaveMaxOffsetPlus()) { 43:                     return false; 44:                 } 45:                 // 继续循环 46:                 continue; 47:             } 48:         } 49:  50:         // 空间写满,重新分配空间 51:         if (!this.byteBufferRead.hasRemaining()) { 52:             this.reallocateByteBuffer(); 53:         } 54:  55:         break; 56:     } 57:  58:     return true; 59: } 60:  61: /** 62:  * 上报进度 63:  * 64:  * @param maxOffset 进度 65:  * @return 是否上报成功 66:  */ 67: private boolean reportSlaveMaxOffset(final long maxOffset) { 68:     this.reportOffset.position(0); 69:     this.reportOffset.limit(8); 70:     this.reportOffset.putLong(maxOffset); 71:     this.reportOffset.position(0); 72:     this.reportOffset.limit(8); 73:  74:     for (int i = 0; i < 3 && this.reportOffset.hasRemaining(); i++) { 75:         try { 76:             this.socketChannel.write(this.reportOffset); 77:         } catch (IOException e) { 78:             log.error(this.getServiceName() 79:                 + "reportSlaveMaxOffset this.socketChannel.write exception", e); 80:             return false; 81:         } 82:     } 83:  84:     return !this.reportOffset.hasRemaining(); 85: }

3.1.5 Master

  • ReadSocketService 逻辑同 HAClient#proce***eadEvent(...) 基本相同,我们直接看代码。

  1: // ⬇️⬇️⬇️【ReadSocketService.java】  2: private boolean proce***eadEvent() {  3:     int readSizeZeroTimes = 0;  4:   5:     // 清空byteBufferRead  6:     if (!this.byteBufferRead.hasRemaining()) {  7:         this.byteBufferRead.flip();  8:         this.processPostion = 0;  9:     } 10:  11:     while (this.byteBufferRead.hasRemaining()) { 12:         try { 13:             int readSize = this.socketChannel.read(this.byteBufferRead); 14:             if (readSize > 0) { 15:                 readSizeZeroTimes = 0; 16:  17:                 // 设置最后读取时间 18:                 this.lastReadTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now(); 19:  20:                 if ((this.byteBufferRead.position() - this.processPostion) >= 8) { 21:                     // 读取Slave 请求来的CommitLog的最大位置 22:                     int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8); 23:                     long readOffset = this.byteBufferRead.getLong(pos - 8); 24:                     this.processPostion = pos; 25:  26:                     // 设置Slave CommitLog的最大位置 27:                     HAConnection.this.slaveAckOffset = readOffset; 28:  29:                     // 设置Slave 第一次请求的位置 30:                     if (HAConnection.this.slaveRequestOffset < 0) { 31:                         HAConnection.this.slaveRequestOffset = readOffset; 32:                         log.info("slave[" + HAConnection.this.clientAddr + "] request offset " + readOffset); 33:                     } 34:  35:                     // 通知目前Slave进度。主要用于Master节点为同步类型的。 36:                     HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset); 37:                 } 38:             } else if (readSize == 0) { 39:                 if (++readSizeZeroTimes >= 3) { 40:                     break; 41:                 } 42:             } else { 43:                 log.error("read socket[" + HAConnection.this.clientAddr + "] < 0"); 44:                 return false; 45:             } 46:         } catch (IOException e) { 47:             log.error("proce***eadEvent exception", e); 48:             return false; 49:         } 50:     } 51:  52:     return true; 53: }

  • WriteSocketService 计算 Slave开始同步的位置后,不断向 Slave 传输新的 CommitLog数据。

  1: // ⬇️⬇️⬇️【WriteSocketService.java】  2: @Override  3: public void run() {  4:     HAConnection.log.info(this.getServiceName() + " service started");  5:   6:     while (!this.isStopped()) {  7:         try {  8:             this.selector.select(1000);  9:  10:             // 未获得Slave读取进度请求,sleep等待。 11:             if (-1 == HAConnection.this.slaveRequestOffset) { 12:                 Thread.sleep(10); 13:                 continue; 14:             } 15:  16:             // 计算初始化nextTransferFromWhere 17:             if (-1 == this.nextTransferFromWhere) { 18:                 if (0 == HAConnection.this.slaveRequestOffset) { 19:                     long masterOffset = HAConnection.this.haService.getDefaultMessageStore().getCommitLog().getMaxOffset(); 20:                     masterOffset = masterOffset - (masterOffset % HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getMapedFileSizeCommitLog()); 21:                     if (masterOffset < 0) { 22:                         masterOffset = 0; 23:                     } 24:  25:                     this.nextTransferFromWhere = masterOffset; 26:                 } else { 27:                     this.nextTransferFromWhere = HAConnection.this.slaveRequestOffset; 28:                 } 29:  30:                 log.info("master transfer data from " + this.nextTransferFromWhere + " to slave[" + HAConnection.this.clientAddr 31:                     + "], and slave request " + HAConnection.this.slaveRequestOffset); 32:             } 33:  34:             if (this.lastWriteOver) { 35:                 long interval = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp; 36:                 if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaSendHeartbeatInterval()) { // 心跳 37:  38:                     // Build Header 39:                     this.byteBufferHeader.position(0); 40:                     this.byteBufferHeader.limit(headerSize); 41:                     this.byteBufferHeader.putLong(this.nextTransferFromWhere); 42:                     this.byteBufferHeader.putInt(0); 43:                     this.byteBufferHeader.flip(); 44:  45:                     this.lastWriteOver = this.transferData(); 46:                     if (!this.lastWriteOver) 47:                         continue; 48:                 } 49:             } else { // 未传输完成,继续传输 50:                 this.lastWriteOver = this.transferData(); 51:                 if (!this.lastWriteOver) 52:                     continue; 53:             } 54:  55:             // 选择新的CommitLog数据进行传输 56:             SelectMappedBufferResult selectResult = 57:                 HAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere); 58:             if (selectResult != null) { 59:                 int size = selectResult.getSize(); 60:                 if (size > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) { 61:                     size = HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize(); 62:                 } 63:  64:                 long thisOffset = this.nextTransferFromWhere; 65:                 this.nextTransferFromWhere += size; 66:  67:                 selectResult.getByteBuffer().limit(size); 68:                 this.selectMappedBufferResult = selectResult; 69:  70:                 // Build Header 71:                 this.byteBufferHeader.position(0); 72:                 this.byteBufferHeader.limit(headerSize); 73:                 this.byteBufferHeader.putLong(thisOffset); 74:                 this.byteBufferHeader.putInt(size); 75:                 this.byteBufferHeader.flip(); 76:  77:                 this.lastWriteOver = this.transferData(); 78:             } else { // 没新的消息,挂起等待 79:                 HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100); 80:             } 81:         } catch (Exception e) { 82:  83:             HAConnection.log.error(this.getServiceName() + " service has exception.", e); 84:             break; 85:         } 86:     } 87:  88:     // 断开连接 & 暂停写线程 & 暂停读线程 & 释放CommitLog 89:     if (this.selectMappedBufferResult != null) { 90:         this.selectMappedBufferResult.release(); 91:     } 92:  93:     this.makeStop(); 94:  95:     readSocketService.makeStop(); 96:  97:     haService.removeConnection(HAConnection.this); 98:  99:     SelectionKey sk = this.socketChannel.keyFor(this.selector);100:     if (sk != null) {101:         sk.cancel();102:     }103: 104:     try {105:         this.selector.close();106:         this.socketChannel.close();107:     } catch (IOException e) {108:         HAConnection.log.error("", e);109:     }110: 111:     HAConnection.log.info(this.getServiceName() + " service end");112: }113: 114: /**115:  * 传输数据116:  */117: private boolean transferData() throws Exception {118:     int writeSizeZeroTimes = 0;119:     // Write Header120:     while (this.byteBufferHeader.hasRemaining()) {121:         int writeSize = this.socketChannel.write(this.byteBufferHeader);122:         if (writeSize > 0) {123:             writeSizeZeroTimes = 0;124:             this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();125:         } else if (writeSize == 0) {126:             if (++writeSizeZeroTimes >= 3) {127:                 break;128:             }129:         } else {130:             throw new Exception("ha master write header error < 0");131:         }132:     }133: 134:     if (null == this.selectMappedBufferResult) {135:         return !this.byteBufferHeader.hasRemaining();136:     }137: 138:     writeSizeZeroTimes = 0;139: 140:     // Write Body141:     if (!this.byteBufferHeader.hasRemaining()) {142:         while (this.selectMappedBufferResult.getByteBuffer().hasRemaining()) {143:             int writeSize = this.socketChannel.write(this.selectMappedBufferResult.getByteBuffer());144:             if (writeSize > 0) {145:                 writeSizeZeroTimes = 0;146:                 this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();147:             } else if (writeSize == 0) {148:                 if (++writeSizeZeroTimes >= 3) {149:                     break;150:                 }151:             } else {152:                 throw new Exception("ha master write body error < 0");153:             }154:         }155:     }156: 157:     boolean result = !this.byteBufferHeader.hasRemaining() && !this.selectMappedBufferResult.getByteBuffer().hasRemaining();158: 159:     if (!this.selectMappedBufferResult.getByteBuffer().hasRemaining()) {160:         this.selectMappedBufferResult.release();161:         this.selectMappedBufferResult = null;162:     }163: 164:     return result;165: }

3.1.6 Master_SYNC

  • Producer 发送消息时, Master_SYNC节点 会等待 Slave节点 存储完毕后再返回发送结果。

核心代码如下:

  1: // ⬇️⬇️⬇️【CommitLog.java】  2: public PutMessageResult putMessage(final MessageExtBrokerInner msg) {  3:     // ....省略处理发送代码   4:     // Synchronous write double 如果是同步Master,同步到从节点  5:     if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {  6:         HAService service = this.defaultMessageStore.getHaService();  7:         if (msg.isWaitStoreMsgOK()) {  8:             // Determine whether to wait  9:             if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) { 10:                 if (null == request) { 11:                     request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes()); 12:                 } 13:                 service.putRequest(request); 14:  15:                 // 唤醒WriteSocketService 16:                 service.getWaitNotifyObject().wakeupAll(); 17:  18:                 boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout()); 19:                 if (!flushOK) { 20:                     log.error("do sync transfer other node, wait return, but failed, topic: " + msg.getTopic() + " tags: " 21:                         + msg.getTags() + " client address: " + msg.getBornHostString()); 22:                     putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT); 23:                 } 24:             } 25:             // Slave problem 26:             else { 27:                 // Tell the producer, slave not available 28:                 putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE); 29:             } 30:         } 31:     } 32:  33:     return putMessageResult; 34: }
  • 第 16 行 :唤醒 WriteSocketService

    • 唤醒后, WriteSocketService 挂起等待新消息结束, Master 传输 Slave 新的 CommitLog 数据。

    • Slave 收到数据后,立即上报最新的 CommitLog 同步进度到 Master。 ReadSocketService 唤醒第 18 行: request#waitForFlush(...)

我们来看下 GroupTransferService 的核心逻辑代码:

  1: // ⬇️⬇️⬇️【GroupTransferService.java】  2: private void doWaitTransfer() {  3:     synchronized (this.requestsRead) {  4:         if (!this.requestsRead.isEmpty()) {  5:             for (CommitLog.GroupCommitRequest req : this.requestsRead) {  6:                 // 等待Slave上传进度  7:                 boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();  8:                 for (int i = 0; !transferOK && i < 5; i++) {  9:                     this.notifyTransferObject.waitForRunning(1000); // 唤醒 10:                     transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset(); 11:                 } 12:  13:                 if (!transferOK) { 14:                     log.warn("transfer messsage to slave timeout, " + req.getNextOffset()); 15:                 } 16:  17:                 // 唤醒请求,并设置是否Slave同步成功 18:                 req.wakeupCustomer(transferOK); 19:             } 20:  21:             this.requestsRead.clear(); 22:         } 23:     } 24: }

3.2 Producer 发送消息

  • Producer 发送消息时,会对 Broker集群 的所有队列进行选择。

核心代码如下:

  1: // ⬇️⬇️⬇️【DefaultMQProducerImpl.java】  2: private SendResult sendDefaultImpl(//  3:     Message msg, //  4:     final CommunicationMode communicationMode, //  5:     final SendCallback sendCallback, //  6:     final long timeout//  7: ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {  8:     // .... 省略:处理【校验逻辑】  9:     // 获取 Topic路由信息 10:     TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic()); 11:     if (topicPublishInfo != null && topicPublishInfo.ok()) { 12:         MessageQueue mq = null; // 最后选择消息要发送到的队列 13:         Exception exception = null; 14:         SendResult sendResult = null; // 最后一次发送结果 15:         int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1; // 同步多次调用 16:         int times = 0; // 第几次发送 17:         String[] brokersSent = new String[timesTotal]; // 存储每次发送消息选择的broker名 18:         // 循环调用发送消息,直到成功 19:         for (; times < timesTotal; times++) { 20:             String lastBrokerName = null == mq ? null : mq.getBrokerName(); 21:             MessageQueue tmpmq = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); // 选择消息要发送到的队列 22:             if (tmpmq != null) { 23:                 mq = tmpmq; 24:                 brokersSent[times] = mq.getBrokerName(); 25:                 try { 26:                     beginTimestampPrev = System.currentTimeMillis(); 27:                     // 调用发送消息核心方法 28:                     sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout); 29:                     endTimestamp = System.currentTimeMillis(); 30:                     // 更新Broker可用性信息 31:                     this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false); 32:                     // .... 省略:处理【发送返回结果】 33:                     } 34:                 } catch (e) { // .... 省略:处理【异常】 35:                      36:                 } 37:             } else { 38:                 break; 39:             } 40:         } 41:         // .... 省略:处理【发送返回结果】 42:     } 43:     // .... 省略:处理【找不到消息路由】 44: }

如下是调试 #sendDefaultImpl(...)TopicPublishInfo 的结果, Producer 获得到了 broker-a, broker-b两个 Broker分组 的消息队列:

3.3 Consumer 消费消息

  • Consumer 消费消息时,会对 Broker集群 的所有队列进行选择。

4. 总结

©著作权归作者所有:来自51CTO博客作者mb5ff80520dfa04的原创作品,如需转载,请注明出处,否则将追究法律责任

更多相关文章

  1. 数据库中间件 Sharding-JDBC 源码分析 —— 事务(一)之BED
  2. 数据库中间件 Sharding-JDBC 源码分析 —— JDBC实现与读写分离
  3. 数据库分库分表中间件 Sharding-JDBC 源码分析 —— SQL 改写
  4. 数据库分库分表中间件 Sharding-JDBC 源码分析 —— SQL 解析(六)
  5. 数据库[分库分表]中间件 Sharding-JDBC 源码分析 —— SQL 解析(
  6. 数据库中间件 Sharding-JDBC 源码分析 —— SQL 解析(三)之查询SQL
  7. 数据库中间件 MyCAT 源码分析 —— SQL ON MongoDB
  8. 数据库分库分表中间件 Sharding-JDBC 源码分析 —— SQL 路由(一)
  9. 数据库读写分离这个坑,让刚入职的我一脸懵逼!

随机推荐

  1. XML实体扩展攻击代码实例分享
  2. XML开发环境的建立详解
  3. xml模版,自定义按钮背景
  4. 详解lxml处理xml时的字符编码问题
  5. XML概述以及Pull解析的示例代码
  6. 详细介绍Linq读取XML的示例代码
  7. XML的简介与开发详解(图文)
  8. XML文件解析汇总SAX/DOM/PULL的详细介绍
  9. XmlDocument操作xml文档的示例代码
  10. XML解析中的namespace初步了解