老弟问我,RocketMQ 中的 ProcessQueue 怎么理解?

ProcessQueue 是 MessageQueue 的消费快照,可以协助消费者进行消息拉取、消息
首页 新闻资讯 行业资讯 老弟问我,RocketMQ 中的 ProcessQueue 怎么理解?

大家好,我是君哥。

今天来分享 RocketMQ 中一个非常重要又不太好理解的知识点-ProcessQueue。

一句话概括,ProcessQueue 就是 MessageQueue 的消费快照。看下面这张图:

图片

1 ProcessQueue 构建

RocketMQ 客户端启动时,会开启一个 rebalance 线程,代码如下:


复制

//MQClientInstance.javapublic void start() throws MQClientException {
 synchronized (this) {
  switch (this.serviceState) {
   case CREATE_JUST://...// Start rebalance service
    this.rebalanceService.start();
   //...  }
 }}
  • 1.

  • 2.

  • 3.

  • 4.

  • 5.

  • 6.

  • 7.

  • 8.

  • 9.

  • 10.

  • 11.

  • 12.


这个线程会不停的做重平衡操作,对 ProcessQueue 进行维护。在重平衡线程类 RebalanceImpl 定义了一个变量 processQueueTable,数据结构如下:

图片

可以看到,在 processQueueTable 这个数据结构上维护了 MessageQueue 和 ProcessQueue 的映射。

下面看一下维护 processQueueTable 的代码:


复制

private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
 final boolean isOrder) {
 boolean changed = false;

 Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
 while (it.hasNext()) {
  Entry<MessageQueue, ProcessQueue> next = it.next();
  MessageQueue mq = next.getKey();
  ProcessQueue pq = next.getValue();

  if (mq.getTopic().equals(topic)) {
   if (!mqSet.contains(mq)) {//从processQueueTable上移除   } else if (pq.isPullExpired()) {switch (this.consumeType()) { case CONSUME_ACTIVELY://拉模式
      break; case CONSUME_PASSIVELY://推模式      //从processQueueTable上移除
      break; default:  break;}
   }
  }
 }//创建ProcessQueue并放到processQueueTable
 List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
 for (MessageQueue mq : mqSet) {
  if (!this.processQueueTable.containsKey(mq)) {
   //...
   ProcessQueue pq = new ProcessQueue();

   long nextOffset = -1L;
   try {nextOffset = this.computePullFromWhereWithException(mq);
   } catch (Exception e) {log.info("doRebalance, {}, compute offset failed, {}", consumerGroup, mq);continue;
   }

   if (nextOffset >= 0) {ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);if (pre != null) { log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);} else {//封装好processQueueTable后再创建一个PullRequest进行消息拉取
     log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq); PullRequest pullRequest = new PullRequest(); pullRequest.setConsumerGroup(consumerGroup); pullRequest.setNextOffset(nextOffset); pullRequest.setMessageQueue(mq); pullRequest.setProcessQueue(pq); pullRequestList.add(pullRequest); changed = true;}
   } else {log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
   }
  }
 }

 this.dispatchPullRequest(pullRequestList);

 return changed;}
  • 1.

  • 2.

  • 3.

  • 4.

  • 5.

  • 6.

  • 7.

  • 8.

  • 9.

  • 10.

  • 11.

  • 12.

  • 13.

  • 14.

  • 15.

  • 16.

  • 17.

  • 18.

  • 19.

  • 20.

  • 21.

  • 22.

  • 23.

  • 24.

  • 25.

  • 26.

  • 27.

  • 28.

  • 29.

  • 30.

  • 31.

  • 32.

  • 33.

  • 34.

  • 35.

  • 36.

  • 37.

  • 38.

  • 39.

  • 40.

  • 41.

  • 42.

  • 43.

  • 44.

  • 45.

  • 46.

  • 47.

  • 48.

  • 49.

  • 50.

  • 51.

  • 52.

  • 53.

  • 54.

  • 55.

  • 56.

  • 57.

  • 58.

  • 59.

  • 60.

  • 61.

  • 62.

  • 63.

  • 64.

  • 65.

  • 66.


2 拉取消息

上一节中构建 ProcessQueue 后,会再创建一个 PullRequest,这个 PullRequest 封装了 MessageQueue 和 ProcessQueue,创建成功后被放到了 PullMessageService 中的 pullRequestQueue 变量:


复制

//PullMessageService.javaprivate final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<PullRequest>();public void executePullRequestImmediately(final PullRequest pullRequest) {
 try {
  this.pullRequestQueue.put(pullRequest);
 } catch (InterruptedException e) {
  log.error("executePullRequestImmediately pullRequestQueue.put", e);
 }}
  • 1.

  • 2.

  • 3.

  • 4.

  • 5.

  • 6.

  • 7.

  • 8.

  • 9.

  • 10.


这里以 RocketMQ 的推模式为例,Consumer 拉取到消息后,会进行如下处理:

  1. 对拉取到的消息根据 TAG 再次
    进行过滤;

  2. 更新 PullRequest 下次拉取的偏移量 nextOffset;

  3. 把拉取的消息封装到 ProcessQueue 的 msgTreeMap(

    放到 msgTreeMap 之前首先要获取到写锁 treeMapLock

    );

  4. 封装 ConsumeRequest 进行消息消费;

  5. 封装消息拉取请求再次进行拉取。

代码如下:


复制

//DefaultMQPushConsumerImpl.javapublic void onSuccess(PullResult pullResult) {
 if (pullResult != null) { //1. 对拉取到的消息根据 TAG 再次进行过滤
  pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
   subscriptionData);

  switch (pullResult.getPullStatus()) {
   case FOUND://2. 更新 PullRequest 下次拉取的偏移量 nextOffset
    pullRequest.setNextOffset(pullResult.getNextBeginOffset());
    if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) { DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);} else { //3. 把拉取的消息封装到 ProcessQueue 的 msgTreeMap     boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList()); //4. 封装 ConsumeRequest 进行消息消费
     DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(  pullResult.getMsgFoundList(),  processQueue,  pullRequest.getMessageQueue(),  dispatchToConsume);//5. 封装消息拉取请求
     if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {  DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,   DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval()); } else {  DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); }}break;
   //...  }
 }}
  • 1.

  • 2.

  • 3.

  • 4.

  • 5.

  • 6.

  • 7.

  • 8.

  • 9.

  • 10.

  • 11.

  • 12.

  • 13.

  • 14.

  • 15.

  • 16.

  • 17.

  • 18.

  • 19.

  • 20.

  • 21.

  • 22.

  • 23.

  • 24.

  • 25.

  • 26.

  • 27.

  • 28.

  • 29.

  • 30.

  • 31.

  • 32.

  • 33.

  • 34.

  • 35.

  • 36.


3 消费消息

在上一节提到过,拉取到消息后,会把消息封装成一个 ConsumeRequest,这个线程类会调用消费者定义的 MessageListener 进行消费处理。看一下源代码:


复制

//ConsumeMessageConcurrentlyService.ConsumeRequestpublic void run() {
 if (this.processQueue.isDropped()) {
  log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
  return;
 }

 MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
 ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
 ConsumeConcurrentlyStatus status = null;

 try {
  status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
 }//...

 if (!processQueue.isDropped()) {
  ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
 }}
  • 1.

  • 2.

  • 3.

  • 4.

  • 5.

  • 6.

  • 7.

  • 8.

  • 9.

  • 10.

  • 11.

  • 12.

  • 13.

  • 14.

  • 15.

  • 16.

  • 17.

  • 18.

  • 19.


消息消费成功后,会调用 processConsumeResult 方法进行结果处理。对于广播模式,发送失败后不会做重试,相当于把消息丢弃,而对于集群模式,消费失败的消息会发送到 Broker 端等待消费者重新拉取进行重试。

消费结果处理完后,消费成功的消息会从 ProcessQueue 的 msgTreeMap 中移除(需要获取到写锁 treeMapLock),同时从 msgTreeMap 中获取最小的 Offset 来更新对应 MessageQueue 的偏移量。这个逻辑可以参考下面代码:


复制

public void processConsumeResult(
 final ConsumeConcurrentlyStatus status,
 final ConsumeConcurrentlyContext context,
 final ConsumeRequest consumeRequest) {
 int ackIndex = context.getAckIndex();

 switch (status) {
  case CONSUME_SUCCESS:
   if (ackIndex >= consumeRequest.getMsgs().size()) {ackIndex = consumeRequest.getMsgs().size() - 1;
   }
   int ok = ackIndex + 1;
   break;
  //... }
 switch (this.defaultMQPushConsumer.getMessageModel()) {
  case BROADCASTING:
   //...
   break;
  case CLUSTERING:
   List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
   for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {MessageExt msg = consumeRequest.getMsgs().get(i);//消费失败的,发送回Brokerboolean result = this.sendMessageBack(msg, context);//...   }

   break;
  default:
   break;
 }//从msgTreeMap中移除并返回msgTreeMap第一条消息的offset long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
 if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
  this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
 }}
  • 1.

  • 2.

  • 3.

  • 4.

  • 5.

  • 6.

  • 7.

  • 8.

  • 9.

  • 10.

  • 11.

  • 12.

  • 13.

  • 14.

  • 15.

  • 16.

  • 17.

  • 18.

  • 19.

  • 20.

  • 21.

  • 22.

  • 23.

  • 24.

  • 25.

  • 26.

  • 27.

  • 28.

  • 29.

  • 30.

  • 31.

  • 32.

  • 33.

  • 34.

  • 35.

  • 36.

  • 37.

  • 38.

  • 39.


4 消费者限流

4.1 缓存消息数量

如果消费者缓存的消息数量大于 RocketMQ 配置的阈值(默认 1000),就会触发延迟拉取,而消费者缓存的消息数量就来自 ProcessQueue,看下面代码:


复制

long cachedMessageCount = processQueue.getMsgCount().get();if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
 this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
 return;}
  • 1.

  • 2.

  • 3.

  • 4.

  • 5.


4.2 缓存的消息大小

如果消费者缓存的消息大小大于 RocketMQ 配置的阈值(默认 100M),就会触发延迟拉取,而消费者缓存的消息大小就来自 ProcessQueue,看下面代码:


复制

long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
 this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
 return;
}
  • 1.

  • 2.

  • 3.

  • 4.

  • 5.


4.3 消息间隔

对于普通消息,如果消费偏移量间隔大于配置的阈值(默认 2000),就会触发延迟拉取,而消息间隔就来自 ProcessQueue,看下面代码:


复制

if (!this.consumeOrderly) {
 if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
  this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
  return;
 }}
  • 1.

  • 2.

  • 3.

  • 4.

  • 5.

  • 6.


4.4 获取锁失败

对于顺序消息,如果获取锁失败,也会触发延迟拉取,而判断获取锁是否成功,也是在 ProcessQueue,看下面代码:


复制

if (processQueue.isLocked()) {
 //...} else {
 this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);}
  • 1.

  • 2.

  • 3.

  • 4.

  • 5.


5 总结

ProcessQueue 是 MessageQueue 的消费快照,可以协助消费者进行消息拉取、消息消费、更新偏移量、限流。最后,看一下 ProcessQueue 的数据结构:

图片

24    2023-03-14 08:45:25    RocketMQ 消息 消费