订阅关系
控制台上没有订阅关系信息:Topic、过滤规则均为空。
消费者状态
没有消费者实例信息,消息在不断堆积。
为了便于表达和理解,我们只关注与该问题有关的部分逻辑。
因为消息堆积量不断在增加,所以判断该Group ID已经在Broker上有了订阅关系,很可能是使用该Group ID的Consumer实例下线后没有取消订阅关系导致的,如图:
初步判断-Consumer未取消订阅关系
在正常情况下,控制台上可以看到Group ID的【订阅关系】及【消费者状态】,如图:
正常情况-订阅关系
正常情况-消费者状态
异常之后就变成了【问题描述】中的样子,此时我们不清楚:
该GID订阅了哪个topic
该GID被哪个应用消费者使用后出现的异常
该GID对应的消息生产者是哪个
在以上事情没有弄清楚之前,也不敢对该GID做取消订阅、删除之类的操作。
消息堆积是通过消费者的offset信息统计的,该信息存储在Broker上的store/config/consumerOffset.json中,consumerOffset.json格式如图:
Broker - consumerOffset.json
我们在consumerOffset.json文件中找到了GID对应的topic,此处有个细节(后面代码处有解释):
该GID在groupTopicMap中没有重试队列Topic
该GID在offsetTable中没有重试队列Topic上的offset
查询消息
通过上面的查询无法直接定位到ECS,我们可以通过Message ID计算出ECS IP,方法如下:
复制
String ip = MessageClientIDSetter.getIPStrFromID(Message ID)
1.
如果懒得写代码,也可以使用arthas来查询:
arthas查询消息
此时整个链路逐渐清晰起来了,还缺少最关键的Consumer信息。
查询了近期发版的所有代码,没有找到与该GID相关的信息。
我们试图通过Broker端的日志来确认两件事情:
该GID的Consumer在什么时候从哪些IP建立了与Broker的交互
该GID的Consumer在什么时候从哪些IP断开了与Broker的交互
Broker - Consumer心跳处理逻辑
通过以上代码打印的日志,我们可以过滤出该GID与Broker建立交互时候的相关信息。
在Consumer实例shutdown的时候,会向Broker发送unregisterClient请求,会调用ConsumerManager中相应的unregisterConsumer方法:
Broker - Consumer取消注册
通过以上代码打印的日志,我们可以过滤出该GID与Broker断开交互时候的相关信息。
理想是美好的,现实是残酷的Broker端最多保留了不到2天的日志,所以这条路也走不通了。
同时我们也在想:除了程序,还有其他途径变更这种订阅关系吗?答案是有的。
命令方式-重置消费位点
控制台 - 重置消费位点
到这里估计您已经知道引起这次消息堆积的原因了。
完善监控告警、提高应急响应能力
最小权限原则
RocketMQ控制台是否应该增加操作记录的功能?