RocketMQ 源码分析 —— Message 拉取与消费(下)
16lz
2021-01-22
摘要: 原创出处 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/ 「芋道源码」欢迎转载,保留摘要,谢谢!
- 1、概述
- 2、Consumer
- 3、PushConsumer 一览
- 4、PushConsumer 订阅
- 5、PushConsumer 消息队列分配
- 5、PushConsumer 消费进度读取
- 6、PushConsumer 拉取消息
- 6、PushConsumer 消费消息
- 7、PushConsumer 发回消费失败消息
- 8、Consumer 消费进度
- 9、结尾
阅读源码最好的方式,是使用 IDEA 进行调试 RocketMQ 源码,不然会一脸懵逼。
胖友可以点击「芋道源码」扫码关注,回复 git001 关键字
获得艿艿添加了中文注释的 RocketMQ 源码地址。阅读源码很孤单,加入源码交流群,一起坚持!
1、概述
本文接:《RocketMQ 源码分析 —— Message 拉取与消费(上)》。
主要解析 Consumer
在 消费 逻辑涉及到的源码。
2、Consumer
MQ 提供了两类消费者:
- PushConsumer:
- 在大多数场景下使用。
- 名字虽然是
Push
开头,实际在实现时,使用Pull
方式实现。通过Pull
不断不断不断轮询Broker
获取消息。当不存在新消息时,Broker
会挂起请求,直到有新消息产生,取消挂起,返回新消息。这样,基本和Broker
主动Push
做到接近的实时性(当然,还是有相应的实时性损失)。原理类似 长轮询(Long-Polling
)。
- PullConsumer
本文主要讲解PushConsumer
,部分讲解PullConsumer
,跳过顺序消费
。
本文主要讲解PushConsumer
,部分讲解PullConsumer
,跳过顺序消费
。
本文主要讲解PushConsumer
,部分讲解PullConsumer
,跳过顺序消费
。
3、PushConsumer 一览
先看一张 PushConsumer
包含的组件以及组件之间的交互图:
PushConsumer手绘图.png
RebalanceService
:均衡消息队列服务,负责分配当前Consumer
可消费的消息队列(MessageQueue
)。当有新的Consumer
的加入或移除,都会重新分配消息队列。PullMessageService
:拉取消息服务,不断不断不断从Broker
拉取消息,并提交消费任务到ConsumeMessageService
。ConsumeMessageService
:消费消息服务,不断不断不断消费消息,并处理消费结果。RemoteBrokerOffsetStore
:Consumer
消费进度管理,负责从Broker
获取消费进度,同步消费进度到Broker
。ProcessQueue
:消息处理队列。MQClientInstance
:封装对Namesrv
,Broker
的 API调用,提供给Producer
、Consumer
使用。
4、PushConsumer 订阅
DefaultMQPushConsumerImpl#subscribe(...)
1: public void subscribe(String topic, String subExpression) throws MQClientException {
2: try {
3: // 创建订阅数据
4: SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), //
5: topic, subExpression);
6: this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
7: // 通过心跳同步Consumer信息到Broker
8: if (this.mQClientFactory != null) {
9: this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
10: }
11: } catch (Exception e) {
12: throw new MQClientException("subscription exception", e);
13: }
14: }
- 说明 :订阅
Topic
。 - 第 3 至 6 行 :创建订阅数据。详细解析见:FilterAPI.buildSubscriptionData(...)。
- 第 7 至 10 行 :通过心跳同步
Consumer
信息到Broker
。
FilterAPI.buildSubscriptionData(...)
1: public static SubscriptionData buildSubscriptionData(final String consumerGroup, String topic,
2: String subString) throws Exception {
3: SubscriptionData subscriptionData = new SubscriptionData();
4: subscriptionData.setTopic(topic);
5: subscriptionData.setSubString(subString);
6: // 处理订阅表达式
7: if (null == subString || subString.equals(SubscriptionData.SUB_ALL) || subString.length() == 0) {
8: subscriptionData.setSubString(SubscriptionData.SUB_ALL);
9: } else {
10: String[] tags = subString.split("\\|\\|");
11: if (tags.length > 0) {
12: for (String tag : tags) {
13: if (tag.length() > 0) {
14: String trimString = tag.trim();
15: if (trimString.length() > 0) {
16: subscriptionData.getTagsSet().add(trimString);
17: subscriptionData.getCodeSet().add(trimString.hashCode());
18: }
19: }
20: }
21: } else {
22: throw new Exception("subString split error");
23: }
24: }
25:
26: return subscriptionData;
27: }
- 说明 :根据
Topic
和 订阅表达式 创建订阅数据 - subscriptionData.subVersion = System.currentTimeMillis()。
DefaultMQPushConsumer#registerMessageListener(...)
1: public void registerMessageListener(MessageListenerConcurrently messageListener) {
2: this.messageListener = messageListener;
3: this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
4: }
- 说明 :注册消息监听器。
5、PushConsumer 消息队列分配
RebalanceService&PushConsumer分配队列
RebalanceService
1: public class RebalanceService extends ServiceThread {
2:
3: /**
4: * 等待间隔,单位:毫秒
5: */
6: private static long waitInterval =
7: Long.parseLong(System.getProperty(
8: "rocketmq.client.rebalance.waitInterval", "20000"));
9:
10: private final Logger log = ClientLogger.getLog();
11: /**
12: * MQClient对象
13: */
14: private final MQClientInstance mqClientFactory;
15:
16: public RebalanceService(MQClientInstance mqClientFactory) {
17: this.mqClientFactory = mqClientFactory;
18: }
19:
20: @Override
21: public void run() {
22: log.info(this.getServiceName() + " service started");
23:
24: while (!this.isStopped()) {
25: this.waitForRunning(waitInterval);
26: this.mqClientFactory.doRebalance();
27: }
28:
29: log.info(this.getServiceName() + " service end");
30: }
31:
32: @Override
33: public String getServiceName() {
34: return RebalanceService.class.getSimpleName();
35: }
36: }
- 说明 :均衡消息队列服务,负责分配当前
Consumer
可消费的消息队列(MessageQueue
)。 - 第 26 行 :调用
MQClientInstance#doRebalance(...)
分配消息队列。目前有三种情况情况下触发:- 如
第 25 行
等待超时,每 20s 调用一次。 PushConsumer
启动时,调用rebalanceService#wakeup(...)
触发。Broker
通知Consumer
加入 或 移除时,Consumer
响应通知,调用rebalanceService#wakeup(...)
触发。
- 如
详细解析见:MQClientInstance#doRebalance(...)。
MQClientInstance#doRebalance(...)
1: public void doRebalance() {
2: for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
3: MQConsumerInner impl = entry.getValue();
4: if (impl != null) {
5: try {
6: impl.doRebalance();
7: } catch (Throwable e) {
8: log.error("doRebalance exception", e);
9: }
10: }
11: }
12: }
- 说明 :遍历当前
Client
包含的consumerTable
(Consumer
集合 ),执行消息队列分配。 疑问:目前代码调试下来,
consumerTable
只包含Consumer
自己。AllocateMessageQueueStrategy类图DefaultMQPushConsumerImpl拉取消息DefaultMQPushConsumerImpl消费消息OffsetStore类图.png
- 有大大对这个疑问有解答的,烦请解答下。
- 第 6 行 :调用
MQConsumerInner#doRebalance(...)
进行队列分配。DefaultMQPushConsumerImpl
、DefaultMQPullConsumerImpl
分别对该接口方法进行了实现。DefaultMQPushConsumerImpl#doRebalance(...)
详细解析见:DefaultMQPushConsumerImpl#doRebalance(...)。 DefaultMQPushConsumerImpl#doRebalance(...)
1: public void doRebalance() {
2: if (!this.pause) {
3: this.rebalanceImpl.doRebalance(this.isConsumeOrderly());
4: }
5: }- 说明:执行消息队列分配。
- 第 3 行 :调用
RebalanceImpl#doRebalance(...)
进行队列分配。详细解析见:RebalancePushImpl#doRebalance(...)。 RebalanceImpl#doRebalance(...)
1: /**
2: * 执行分配消息队列
3: *
4: * @param isOrder 是否顺序消息
5: */
6: public void doRebalance(final boolean isOrder) {
7: // 分配每个 topic 的消息队列
8: Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
9: if (subTable != null) {
10: for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
11: final String topic = entry.getKey();
12: try {
13: this.rebalanceByTopic(topic, isOrder);
14: } catch (Throwable e) {
15: if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
16: log.warn("rebalanceByTopic Exception", e);
17: }
18: }
19: }
20: }
21: // 移除未订阅的topic对应的消息队列
22: this.truncateMessageQueueNotMyTopic();
23: }
24:
25: /**
26: * 移除未订阅的消息队列
27: */
28: private void truncateMessageQueueNotMyTopic() {
29: Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
30: for (MessageQueue mq : this.processQueueTable.keySet()) {
31: if (!subTable.containsKey(mq.getTopic())) {
32:
33: ProcessQueue pq = this.processQueueTable.remove(mq);
34: if (pq != null) {
35: pq.setDropped(true);
36: log.info("doRebalance, {}, truncateMessageQueueNotMyTopic remove unnecessary mq, {}", consumerGroup, mq);
37: }
38: }
39: }
40: }#doRebalance(...)
说明 :执行分配消息队列。- 第 7 至 20 行 :循环订阅主题集合(
subscriptionInner
),分配每一个Topic
的消息队列。 - 第 22 行 :移除未订阅的
Topic
的消息队列。
- 第 7 至 20 行 :循环订阅主题集合(
#truncateMessageQueueNotMyTopic(...)
说明 :移除未订阅的消息队列。当调用DefaultMQPushConsumer#unsubscribe(topic)
时,只移除订阅主题集合(subscriptionInner
),对应消息队列移除在该方法。RebalanceImpl#rebalanceByTopic(...)
// ... 文章过长,超过微信限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/ 阅读
#rebalanceByTopic(...)
说明 :分配Topic
的消息队列。- 第 21 至 40 行 :获取
Topic
对应的消息队列和消费者们,并对其进行排序。因为各Consumer
是在本地分配消息队列,排序后才能保证各Consumer
顺序一致。 - 第 42 至 61 行 :根据 队列分配策略(
AllocateMessageQueueStrategy
) 分配消息队列。详细解析见:AllocateMessageQueueStrategy。 - 第 63 至 72 行 :更新
Topic
对应的消息队列。 - 第 3 至 19 行 :广播模式(
BROADCASTING
) 下,分配Topic
对应的所有消息队列。 - 第 20 至 74 行 :集群模式(
CLUSTERING
) 下,分配Topic
对应的部分消息队列。
- 第 21 至 40 行 :获取
#updateProcessQueueTableInRebalance(...)
说明 :当分配队列时,更新Topic
对应的消息队列,并返回是否有变更。- 第 132 至 135 行 :
顺序消费
相关跳过,详细解析见:《RocketMQ 源码分析 —— Message 顺序发送与消费》。 - 第 137 行 :移除消息队列的消费进度。
- 第 139 行 :获取队列消费进度。详细解析见:RebalancePushImpl#computePullFromWhere(...)。
- 第 140 至 156 行 :添加新消费处理队列,添加消费拉取消息请求。
- 第 103 行 :移除不需要的消息队列。详细解析见:RebalancePushImpl#removeUnnecessaryMessageQueue(...)。
- 第 108 至 120 行 :队列拉取超时,即
当前时间 - 最后一次拉取消息时间 > 120s
( 120s 可配置),判定发生 BUG,过久未进行消息拉取,移除消息队列。移除后,下面**#新增队列逻辑#**可以重新加入新的该消息队列。 - 第 93 至 126 行 :移除不存在于分配的消息队列(
mqSet
) 的 消息处理队列(processQueueTable
)。 - 第 128 至 158 行 :增加 分配的消息队列(
mqSet
) 新增的消息队列。 - 第 161 行 :发起新增的消息队列消息拉取请求。详细解析见:RebalancePushImpl#dispatchPullRequest(...)。
- 第 132 至 135 行 :
RebalanceImpl#removeUnnecessaryMessageQueue(...)
RebalancePushImpl#removeUnnecessaryMessageQueue(...)
1: public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) {
2: // 同步队列的消费进度,并移除之。
3: this.defaultMQPushConsumerImpl.getOffsetStore().persist(mq);
4: this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq);
5: // TODO 顺序消费
6: if (this.defaultMQPushConsumerImpl.isConsumeOrderly()
7: && MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) {
8: try {
9: if (pq.getLockConsume().tryLock(1000, TimeUnit.MILLISECONDS)) {
10: try {
11: return this.unlockDelay(mq, pq);
12: } finally {
13: pq.getLockConsume().unlock();
14: }
15: } else {
16: log.warn("[WRONG]mq is consuming, so can not unlock it, {}. maybe hanged for a while, {}", //
17: mq, //
18: pq.getTryUnlockTimes());
19:
20: pq.incTryUnlockTimes();
21: }
22: } catch (Exception e) {
23: log.error("removeUnnecessaryMessageQueue Exception", e);
24: }
25:
26: return false;
27: }
28: return true;
29: }- 说明 :移除不需要的消息队列相关的信息,并返回是否移除成功。
- 第 2 至 4 行 :同步队列的消费进度,并移除之。
- 第 5 至 27 行 :
顺序消费
相关跳过,详细解析见:《RocketMQ 源码分析 —— Message 顺序发送与消费》。 [PullConsumer]
RebalancePullImpl#removeUnnecessaryMessageQueue(...)1: public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) {
2: this.defaultMQPullConsumerImpl.getOffsetStore().persist(mq);
3: this.defaultMQPullConsumerImpl.getOffsetStore().removeOffset(mq);
4: return true;
5: }- 说明 :移除不需要的消息队列相关的信息,并返回移除成功。和
RebalancePushImpl#removeUnnecessaryMessageQueue(...)
基本一致。 RebalancePushImpl#dispatchPullRequest(...)
1: public void dispatchPullRequest(List<PullRequest> pullRequestList) {
2: for (PullRequest pullRequest : pullRequestList) {
3: this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
4: log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest);
5: }
6: }- 说明 :发起消息拉取请求。该调用是
PushConsumer
不断不断不断拉取消息的起点。 DefaultMQPushConsumerImpl#executePullRequestImmediately(...)
1: public void executePullRequestImmediately(final PullRequest pullRequest) {
2: this.mQClientFactory.getPullMessageService().executePullRequestImmediately(pullRequest);
3: }- 说明 :提交拉取请求。提交后,
PullMessageService
异步执行,非阻塞。详细解析见:PullMessageService。 AllocateMessageQueueStrategy
AllocateMessageQueueAveragely
1: public class AllocateMessageQueueAveragely implements AllocateMessageQueueStrategy {
2: private final Logger log = ClientLogger.getLog();
3:
4: @Override
5: public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
6: List<String> cidAll) {
7: // 校验参数是否正确
8: if (currentCID == null || currentCID.length() < 1) {
9: throw new IllegalArgumentException("currentCID is empty");
10: }
11: if (mqAll == null || mqAll.isEmpty()) {
12: throw new IllegalArgumentException("mqAll is null or mqAll empty");
13: }
14: if (cidAll == null || cidAll.isEmpty()) {
15: throw new IllegalArgumentException("cidAll is null or cidAll empty");
16: }
17:
18: List<MessageQueue> result = new ArrayList<>();
19: if (!cidAll.contains(currentCID)) {
20: log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
21: consumerGroup,
22: currentCID,
23: cidAll);
24: return result;
25: }
26: // 平均分配
27: int index = cidAll.indexOf(currentCID); // 第几个consumer。
28: int mod = mqAll.size() % cidAll.size(); // 余数,即多少消息队列无法平均分配。
29: int averageSize =
30: mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
31: + 1 : mqAll.size() / cidAll.size());
32: int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod; // 有余数的情况下,[0, mod) 平分余数,即每consumer多分配一个节点;第index开始,跳过前mod余数。
33: int range = Math.min(averageSize, mqAll.size() - startIndex); // 分配队列数量。之所以要Math.min()的原因是,mqAll.size() <= cidAll.size(),部分consumer分配不到消息队列。
34: for (int i = 0; i < range; i++) {
35: result.add(mqAll.get((startIndex + i) % mqAll.size()));
36: }
37: return result;
38: }
39:
40: @Override
41: public String getName() {
42: return "AVG";
43: }
44: }- 说明 :平均分配队列策略。
- 第 7 至 25 行 :参数校验。
- 第 26 至 36 行 :平均分配消息队列。
[ 0, mod )
:mqAll.size() / cidAll.size() + 1
。前面mod
个Consumer
平分余数,多获得 1 个消息队列。[ mod, cidAll.size() )
:mqAll.size() / cidAll.size()
。- 第 27 行 :
index
:当前Consumer
在消费集群里是第几个。这里就是为什么需要对传入的cidAll
参数必须进行排序的原因。如果不排序,Consumer
在本地计算出来的index
无法一致,影响计算结果。 - 第 28 行 :
mod
:余数,即多少消息队列无法平均分配。 - 第 29 至 31 行 :
averageSize
:代码可以简化成(mod > 0 && index < mod ? mqAll.size() / cidAll.size() + 1 : mqAll.size() / cidAll.size())
。 - 第 32 行 :
startIndex
:Consumer
分配消息队列开始位置。 - 第 33 行 :
range
:分配队列数量。之所以要Math#min(...)
的原因:当mqAll.size() <= cidAll.size()
时,最后几个Consumer
分配不到消息队列。 - 第 34 至 36 行 :生成分配消息队列结果。
- 举个例子:
固定消息队列长度为4。
Consumer * 2 可以整除 Consumer * 3 不可整除 Consumer * 5 无法都分配 消息队列[0] Consumer[0] Consumer[0] Consumer[0] 消息队列[1] Consumer[0] Consumer[0] Consumer[1] 消息队列[2] Consumer[1] Consumer[1] Consumer[2] 消息队列[3] Consumer[1] Consumer[2] Consumer[3] AllocateMessageQueueByMachineRoom
1: public class AllocateMessageQueueByMachineRoom implements AllocateMessageQueueStrategy {
2: /**
3: * 消费者消费brokerName集合
4: */
5: private Set<String> consumeridcs;
6:
7: @Override
8: public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
9: List<String> cidAll) {
10: // 参数校验
11: List<MessageQueue> result = new ArrayList<MessageQueue>();
12: int currentIndex = cidAll.indexOf(currentCID);
13: if (currentIndex < 0) {
14: return result;
15: }
16: // 计算符合当前配置的消费者数组('consumeridcs')对应的消息队列
17: List<MessageQueue> premqAll = new ArrayList<MessageQueue>();
18: for (MessageQueue mq : mqAll) {
19: String[] temp = mq.getBrokerName().split("@");
20: if (temp.length == 2 && consumeridcs.contains(temp[0])) {
21: premqAll.add(mq);
22: }
23: }
24: // 平均分配
25: int mod = premqAll.size() / cidAll.size();
26: int rem = premqAll.size() % cidAll.size();
27: int startIndex = mod * currentIndex;
28: int endIndex = startIndex + mod;
29: for (int i = startIndex; i < endIndex; i++) {
30: result.add(mqAll.get(i));
31: }
32: if (rem > currentIndex) {
33: result.add(premqAll.get(currentIndex + mod * cidAll.size()));
34: }
35: return result;
36: }
37:
38: @Override
39: public String getName() {
40: return "MACHINE_ROOM";
41: }
42:
43: public Set<String> getConsumeridcs() {
44: return consumeridcs;
45: }
46:
47: public void setConsumeridcs(Set<String> consumeridcs) {
48: this.consumeridcs = consumeridcs;
49: }
50: }- 说明 :平均分配可消费的
Broker
对应的消息队列。 - 第 7 至 15 行 :参数校验。
- 第 16 至 23 行 :计算可消费的
Broker
对应的消息队列。 - 第 25 至 34 行 :平均分配消息队列。该平均分配方式和
AllocateMessageQueueAveragely
略有不同,其是将多余的结尾部分分配给前rem
个Consumer
。 - 疑问:使用该分配策略时,
Consumer
和Broker
分配需要怎么配置。©著作权归作者所有:来自51CTO博客作者mb5ff80520dfa04的原创作品,如需转载,请注明出处,否则将追究法律责任更多相关文章
- 面试官再问我如何保证 RocketMQ 不丢失消息,这回我笑了!
- RocketMQ 源码分析 —— 定时消息与消息重试
- 消息中间件 RocketMQ 源码解析 —— 调试环境搭建
- 分布式消息队列 RocketMQ源码解析:事务消息
- 分布式消息队列 RocketMQ源码解析:Filtersrv
- 分布式消息队列 RocketMQ 源码分析 —— 定时消息与消息重试
- 分布式消息队列 RocketMQ 源码分析 —— 高可用
- 分布式消息队列 RocketMQ 源码分析 —— Message 拉取与消费(上)
- 分布式消息队列 RocketMQ 源码分析 —— Message 拉取与消费(下)
随机推荐