RocketMQ-没有消费者的消息堆积场景分析

为了便于表达和理解,我们只关注与该问题有关的部分逻辑。因为消息堆积量不断在增加,所以判断该Group
首页 新闻资讯 行业资讯 RocketMQ-没有消费者的消息堆积场景分析

问题描述

029fd6258f33dee6eba13540f922ddfbbfd1d1.png

订阅关系

控制台上没有订阅关系信息:Topic、过滤规则均为空。

957bfe6166759e88e3154729f917c3dbec54b5.png

消费者状态

没有消费者实例信息,消息在不断堆积。

分析过程

初步判断

为了便于表达和理解,我们只关注与该问题有关的部分逻辑。
因为消息堆积量不断在增加,所以判断该Group ID已经在Broker上有了订阅关系,很可能是使用该Group ID的Consumer实例下线后没有取消订阅关系导致的,如图:

139e09e18ae9976975d500a01b95fac0f3dfce.png

初步判断-Consumer未取消订阅关系

正常运行

在正常情况下,控制台上可以看到Group ID的【订阅关系】及【消费者状态】,如图:

b729b8b85adcf9245661042bf0a48eaee496dd.png

正常情况-订阅关系

32e833430d6ac5c7bcd139819aea8a6782bc60.png

正常情况-消费者状态

异常之后

异常之后就变成了【问题描述】中的样子,此时我们不清楚:

  • 该GID订阅了哪个topic

  • 该GID被哪个应用消费者使用后出现的异常

  • 该GID对应的消息生产者是哪个

在以上事情没有弄清楚之前,也不敢对该GID做取消订阅、删除之类的操作。

确定topic

消息堆积是通过消费者的offset信息统计的,该信息存储在Broker上的store/config/consumerOffset.json中,consumerOffset.json格式如图:

097a1a06076bbb417438320b0b9ce2cf5d4981.png

Broker - consumerOffset.json

我们在consumerOffset.json文件中找到了GID对应的topic,此处有个细节(后面代码处有解释):

  • 该GID在groupTopicMap中没有重试队列Topic

  • 该GID在offsetTable中没有重试队列Topic上的offset

确定Producer

通过Topic查询Message

f1781d1220825176414158d3118155ada90f11.png

查询消息

通过MessageID确定ECS IP

通过上面的查询无法直接定位到ECS,我们可以通过Message ID计算出ECS IP,方法如下:

复制

String ip = MessageClientIDSetter.getIPStrFromID(Message ID)
  • 1.

如果懒得写代码,也可以使用arthas来查询:

52eab2789fb57a859464911ee799cefadb186b.png

arthas查询消息

此时整个链路逐渐清晰起来了,还缺少最关键的Consumer信息。

确定Consumer

代码Review

查询了近期发版的所有代码,没有找到与该GID相关的信息。

Broker端找线索

我们试图通过Broker端的日志来确认两件事情:

  • 该GID的Consumer在什么时候从哪些IP建立了与Broker的交互

  • 该GID的Consumer在什么时候从哪些IP断开了与Broker的交互

Broker heartBeat

b9588202293921d859d096a481fa4a063b2f1d.png

Broker - Consumer心跳处理逻辑

通过以上代码打印的日志,我们可以过滤出该GID与Broker建立交互时候的相关信息。

Broker unregisterClient

在Consumer实例shutdown的时候,会向Broker发送unregisterClient请求,会调用ConsumerManager中相应的unregisterConsumer方法:

662eabc477942b8446a2524e75d1f35543c960.png

Broker - Consumer取消注册

通过以上代码打印的日志,我们可以过滤出该GID与Broker断开交互时候的相关信息。

理想是美好的,现实是残酷的Broker端最多保留了不到2天的日志,所以这条路也走不通了。

柳暗花明

同时我们也在想:除了程序,还有其他途径变更这种订阅关系吗?答案是有的。

命令行

568718789da2b3eb7bc27507abc108b0084079.png

命令方式-重置消费位点

控制台

f9730a2849fdb4ec6b582436f29942bfffc7a9.png

控制台 - 重置消费位点

到这里估计您已经知道引起这次消息堆积的原因了。

经验总结

  • 完善监控告警、提高应急响应能力

  • 最小权限原则

  • RocketMQ控制台是否应该增加操作记录的功能?