RabbitMQ 如何实现延迟队列?

实现 RabbitMQ 延迟队列目前主流的实现方式,是采用官方提供的延迟插件来实现。而延迟插件需要先
首页 新闻资讯 行业资讯 RabbitMQ 如何实现延迟队列?

b9e0240765bf018e46156539b67eca8bf51cc0.png

延迟队列是指当消息被发送以后,并不是立即执行,而是等待特定的时间后,消费者才会执行该消息。延迟队列的使用场景有以下几种:

  1. 未按时支付的订单,30 分钟过期之后取消订单。

  2. 给活跃度比较低的用户间隔 N 天之后推送消息,提高活跃度。

  3. 新注册会员的用户,等待几分钟之后发送欢迎邮件等。

一、如何实现延迟队列?

延迟队列有以下两种实现方式:

  • 通过消息过期后进入死信交换器,再由交换器转发到延迟消费队列,实现延迟功能。

  • 使用官方提供的延迟插件实现延迟功能。

早期,大部分公司都会采用第一种方式,而随着 RabbitMQ 3.5.7(2015 年底发布)的延迟插件的发布,因为其使用更简单、更方便,所以它现在才是大家普通会采用的,实现延迟队列的方式,所以本文也只讲第二种方式。

二、实现延迟队列

1、安装并启动延迟队列

(1)下载延迟插件

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

注意:需要根据你自己的 RabbitMQ 服务器端版本选择相同版本的延迟插件,可以在 RabbitMQ 控制台查看:

b4892750810f0fc26b3302860f2a2ff4180447.jpg

57e764130c4bc31390f443fe66322a4d748bd3.jpg

(2)将插件放到插件目录

接下来,将上一步下载的插件放到 RabbitMQ 服务器安装目录,如果是 docker,使用一下命令复制:

docker cp 宿主机文件 容器名称或ID:容器目录。

如下图所示:

546c65c3600fa0013e350846c8c97e8daa8f41.jpg

之后,进入 docker 容器,查看插件中是否包含延迟队列:

docker exec -it 容器名称或ID /bin/bash rabbitmq-plugins list

如下图所示:

430001e770cd56d098743298b8e8e354ce850a.jpg

(3)启动插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

如下图所示:

7456392404e4a99af70424acf2ef01f8481825.jpg

(4)重启RabbitMQ服务

安装完 RabbitMQ 插件之后,需要重启 RabbitMQ 服务才能生效。如果使用的是 Docker,只需要重启 Docker 容器即可:

docker restart 容器名称或ID

如下图所示:

c181147284ce2348c89464cc19662e72922d9a.jpg

(5)验收结果

在 RabbitMQ 控制台查看,新建交换机时是否有延迟消息选项,如果有就说明延迟消息插件已经正常运行了,如下图所示:

55e415e413fb9ddd1ac72688bf77b0bc68e7a9.jpg

(6)手动创建延迟交换器(可选)

此步骤可选(非必须),因为某些版本下通过程序创建延迟交换器可能会出错,如果出错了,手动创建延迟队列即可,如下图所示:

a3720b073b962450b739501b61bc97d2c79adc.jpg

2、编写延迟消息实现代码

(1)配置交换器和队列

importorg.springframework.context.annotation.Configuration;importorg.springframework.amqp.core.*;importorg.springframework.context.annotation.Bean;/**
 * 延迟交换器和队列
 */@ConfigurationpublicclassDelayedExchangeConfig{publicstaticfinalStringEXCHANGE_NAME="myDelayedExchange";publicstaticfinalStringQUEUE_NAME="delayed.queue";publicstaticfinalStringROUTING_KEY="delayed.routing.key";@BeanpublicCustomExchangedelayedExchange(){returnnewCustomExchange(EXCHANGE_NAME,"x-delayed-message",// 消息类型true,// 是否持久化false);// 是否自动删除}@BeanpublicQueuedelayedQueue(){returnQueueBuilder.durable(QUEUE_NAME).withArgument("x-delayed-type","direct").build();}@BeanpublicBindingdelayedBinding(QueuedelayedQueue,CustomExchangedelayedExchange){returnBindingBuilder.bind(delayedQueue()).to(delayedExchange()).with(ROUTING_KEY).noargs();}}

(2)定义消息发送方法

importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.scheduling.annotation.Scheduled;importorg.springframework.stereotype.Component;@ComponentpublicclassDelayedMessageProducer{@AutowiredprivateRabbitTemplaterabbitTemplate;@Scheduled(fixedDelay=5000)publicvoidsendDelayedMessage(Stringmessage){rabbitTemplate.convertAndSend(DelayedExchangeConfig.EXCHANGE_NAME,DelayedExchangeConfig.ROUTING_KEY,message,messagePostProcessor->{messagePostProcessor.getMessageProperties().setDelay(10000);// 设置延迟时间,单位毫秒returnmessagePostProcessor;});}}

(3)发送延迟消息

importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.web.bind.annotation.GetMapping;importorg.springframework.web.bind.annotation.RequestMapping;importorg.springframework.web.bind.annotation.RequestParam;importorg.springframework.web.bind.annotation.RestController;@RestController@RequestMapping("/delayed")publicclassDelayedMessageController{@AutowiredprivateDelayedMessageProducerdelayedMessageProducer;@GetMapping("/send")publicStringsendDirectMessage(@RequestParamStringmessage){delayedMessageProducer.sendDelayedMessage(message);return"Delayed message sent to Exchange: "+message;}}

(4)接收延迟消息

importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Component;@ComponentpublicclassDelayedMessageConsumer{@RabbitListener(queues=DelayedExchangeConfig.QUEUE_NAME)publicvoidreceiveDelayedMessage(Stringmessage){System.out.println("Received delayed message: "+message);}}

PS:获取本文延迟队列的实现 Demo,请加我:GG_Stone【备注:延迟队列】

小结

实现 RabbitMQ 延迟队列目前主流的实现方式,是采用官方提供的延迟插件来实现。而延迟插件需要先下载插件、然后配置并重启 RabbitMQ 服务,之后就可以通过编写代码的方式实现延迟队列了。

18    2023-09-05 15:48:14    RabbitMQ 延迟队列