大家好,我是君哥。
今天来分享 RocketMQ 中一个非常重要又不太好理解的知识点-ProcessQueue。
一句话概括,ProcessQueue 就是 MessageQueue 的消费快照。看下面这张图:
RocketMQ 客户端启动时,会开启一个 rebalance 线程,代码如下:
复制
//MQClientInstance.javapublic void start() throws MQClientException { synchronized (this) { switch (this.serviceState) { case CREATE_JUST://...// Start rebalance service this.rebalanceService.start(); //... } }}
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
这个线程会不停的做重平衡操作,对 ProcessQueue 进行维护。在重平衡线程类 RebalanceImpl 定义了一个变量 processQueueTable,数据结构如下:
可以看到,在 processQueueTable 这个数据结构上维护了 MessageQueue 和 ProcessQueue 的映射。
下面看一下维护 processQueueTable 的代码:
复制
private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet, final boolean isOrder) { boolean changed = false; Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator(); while (it.hasNext()) { Entry<MessageQueue, ProcessQueue> next = it.next(); MessageQueue mq = next.getKey(); ProcessQueue pq = next.getValue(); if (mq.getTopic().equals(topic)) { if (!mqSet.contains(mq)) {//从processQueueTable上移除 } else if (pq.isPullExpired()) {switch (this.consumeType()) { case CONSUME_ACTIVELY://拉模式 break; case CONSUME_PASSIVELY://推模式 //从processQueueTable上移除 break; default: break;} } } }//创建ProcessQueue并放到processQueueTable List<PullRequest> pullRequestList = new ArrayList<PullRequest>(); for (MessageQueue mq : mqSet) { if (!this.processQueueTable.containsKey(mq)) { //... ProcessQueue pq = new ProcessQueue(); long nextOffset = -1L; try {nextOffset = this.computePullFromWhereWithException(mq); } catch (Exception e) {log.info("doRebalance, {}, compute offset failed, {}", consumerGroup, mq);continue; } if (nextOffset >= 0) {ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);if (pre != null) { log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);} else {//封装好processQueueTable后再创建一个PullRequest进行消息拉取 log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq); PullRequest pullRequest = new PullRequest(); pullRequest.setConsumerGroup(consumerGroup); pullRequest.setNextOffset(nextOffset); pullRequest.setMessageQueue(mq); pullRequest.setProcessQueue(pq); pullRequestList.add(pullRequest); changed = true;} } else {log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq); } } } this.dispatchPullRequest(pullRequestList); return changed;}
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
49.
50.
51.
52.
53.
54.
55.
56.
57.
58.
59.
60.
61.
62.
63.
64.
65.
66.
上一节中构建 ProcessQueue 后,会再创建一个 PullRequest,这个 PullRequest 封装了 MessageQueue 和 ProcessQueue,创建成功后被放到了 PullMessageService 中的 pullRequestQueue 变量:
复制
//PullMessageService.javaprivate final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<PullRequest>();public void executePullRequestImmediately(final PullRequest pullRequest) { try { this.pullRequestQueue.put(pullRequest); } catch (InterruptedException e) { log.error("executePullRequestImmediately pullRequestQueue.put", e); }}
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
这里以 RocketMQ 的推模式为例,Consumer 拉取到消息后,会进行如下处理:
对拉取到的消息根据 TAG 再次
进行过滤;
更新 PullRequest 下次拉取的偏移量 nextOffset;
把拉取的消息封装到 ProcessQueue 的 msgTreeMap(
放到 msgTreeMap 之前首先要获取到写锁 treeMapLock
);
封装 ConsumeRequest 进行消息消费;
封装消息拉取请求再次进行拉取。
代码如下:
复制
//DefaultMQPushConsumerImpl.javapublic void onSuccess(PullResult pullResult) { if (pullResult != null) { //1. 对拉取到的消息根据 TAG 再次进行过滤 pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult, subscriptionData); switch (pullResult.getPullStatus()) { case FOUND://2. 更新 PullRequest 下次拉取的偏移量 nextOffset pullRequest.setNextOffset(pullResult.getNextBeginOffset()); if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) { DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);} else { //3. 把拉取的消息封装到 ProcessQueue 的 msgTreeMap boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList()); //4. 封装 ConsumeRequest 进行消息消费 DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest( pullResult.getMsgFoundList(), processQueue, pullRequest.getMessageQueue(), dispatchToConsume);//5. 封装消息拉取请求 if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) { DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval()); } else { DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); }}break; //... } }}
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
在上一节提到过,拉取到消息后,会把消息封装成一个 ConsumeRequest,这个线程类会调用消费者定义的 MessageListener 进行消费处理。看一下源代码:
复制
//ConsumeMessageConcurrentlyService.ConsumeRequestpublic void run() { if (this.processQueue.isDropped()) { log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue); return; } MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener; ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue); ConsumeConcurrentlyStatus status = null; try { status = listener.consumeMessage(Collections.unmodifiableList(msgs), context); }//... if (!processQueue.isDropped()) { ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this); }}
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
消息消费成功后,会调用 processConsumeResult 方法进行结果处理。对于广播模式,发送失败后不会做重试,相当于把消息丢弃,而对于集群模式,消费失败的消息会发送到 Broker 端等待消费者重新拉取进行重试。
消费结果处理完后,消费成功的消息会从 ProcessQueue 的 msgTreeMap 中移除(需要获取到写锁 treeMapLock),同时从 msgTreeMap 中获取最小的 Offset 来更新对应 MessageQueue 的偏移量。这个逻辑可以参考下面代码:
复制
public void processConsumeResult( final ConsumeConcurrentlyStatus status, final ConsumeConcurrentlyContext context, final ConsumeRequest consumeRequest) { int ackIndex = context.getAckIndex(); switch (status) { case CONSUME_SUCCESS: if (ackIndex >= consumeRequest.getMsgs().size()) {ackIndex = consumeRequest.getMsgs().size() - 1; } int ok = ackIndex + 1; break; //... } switch (this.defaultMQPushConsumer.getMessageModel()) { case BROADCASTING: //... break; case CLUSTERING: List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size()); for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {MessageExt msg = consumeRequest.getMsgs().get(i);//消费失败的,发送回Brokerboolean result = this.sendMessageBack(msg, context);//... } break; default: break; }//从msgTreeMap中移除并返回msgTreeMap第一条消息的offset long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs()); if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) { this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true); }}
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
如果消费者缓存的消息数量大于 RocketMQ 配置的阈值(默认 1000),就会触发延迟拉取,而消费者缓存的消息数量就来自 ProcessQueue,看下面代码:
复制
long cachedMessageCount = processQueue.getMsgCount().get();if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) { this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); return;}
1.
2.
3.
4.
5.
如果消费者缓存的消息大小大于 RocketMQ 配置的阈值(默认 100M),就会触发延迟拉取,而消费者缓存的消息大小就来自 ProcessQueue,看下面代码:
复制
long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024); if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) { this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); return; }
1.
2.
3.
4.
5.
对于普通消息,如果消费偏移量间隔大于配置的阈值(默认 2000),就会触发延迟拉取,而消息间隔就来自 ProcessQueue,看下面代码:
复制
if (!this.consumeOrderly) { if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) { this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); return; }}
1.
2.
3.
4.
5.
6.
对于顺序消息,如果获取锁失败,也会触发延迟拉取,而判断获取锁是否成功,也是在 ProcessQueue,看下面代码:
复制
if (processQueue.isLocked()) { //...} else { this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);}
1.
2.
3.
4.
5.
ProcessQueue 是 MessageQueue 的消费快照,可以协助消费者进行消息拉取、消息消费、更新偏移量、限流。最后,看一下 ProcessQueue 的数据结构: