环境:SpringBoot2.7.9
生产者丢失消息
生产者发出的数据由于网络原因没有到底MQ Server丢失
MQ Server丢消息
由于消息队列没有持久化或者是消息没有持久化,在Server重启后消息丢失
消费者丢消息
接收到消息后,业务还没有处理完成,服务宕机(当你是自动ACK)。
通过事务(不推荐)
确认机制(推荐)
这里只讲如何通过确认机制保证生产者不丢失消息
引入依赖
复制
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId></dependency>
1.
2.
3.
4.
声明交换机及队列
复制
@Beanpublic TopicExchange topicExchange() { return new TopicExchange("akf.exchange", true, false) ; }@Beanpublic Queue queue() { return new Queue("akf.queue", true, false, false) ; }@Beanpublic Binding binding() { return BindingBuilder.bind(queue()).to(topicExchange()).with("akf.#") ; }
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
RabbitMQ配置
复制
spring: rabbitmq:host: localhostport: 5672username: guestpassword: guestvirtualHost: testpublisherConfirmType: correlated publisherReturns: truetemplate: mandatory: true
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
注意:spring.rabbitmq.publisher-confirm-type及spring.rabbitmq.publisher-returns 的配置值。
接下来是为RabbitTemplate配置对应的Callback,Publisher确认回调,Publisher返回回调。
确认回调
当消息发送到了交换机则ack=true,当消息无法发送到交换机则ack=false。
返回回调
当消息能够发送到交换机,但是不能路由到队列则会调用该return回调。
RabbitTemplate是单例的可以通过两种方式配置对应的回调。
自定义RabbitTemplate。
通过AWare接口获取RabbitTemplate配置。
这里只讲通过AWare接口配置回调。
配置Callback
复制
@Componentpublic class ConfigRabbitTemplate implements ApplicationContextAware { @Override public void setApplicationContext(ApplicationContext context) throws BeansException {RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class) ;rabbitTemplate.setConfirmCallback(new ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("correlation: " + correlationData) ;if (ack) { System.out.println("消息发送到交换机") ; } else { System.out.println("消息发送失败 - " + ", cause" + cause) ; } } });rabbitTemplate.setReturnsCallback(new ReturnsCallback() { @Override public void returnedMessage(ReturnedMessage returned) {System.out.println(returned.getExchange() + ", " + returned.getRoutingKey() + ", " + returned.getReplyCode() + ", " + returned.getMessage().toString()) ; } }); } }
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
使用错误的交换机和错误的路由key分别测试即可以看到上面的输出信息了。
在通过@Bean声明交换机和队列时设置持久性,在消息上设置持久化。
复制
@Beanpublic TopicExchange topicExchange() { // 这里的第二个参数就是设置是否持久化,如果设置为false,当服务重启交换机将丢失 // 第三个参数是否自动删除,当不再使用该交换机时会自动删除该交换机 return new TopicExchange("akf.exchange", true, false) ; }@Beanpublic Queue queue() { // 第二个参数true设置队列是持久化的,当服务重启队列不会丢失 return new Queue("akf.queue", true, false, false) ; }
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
设置消息持久化。
复制
Message message = MessageBuilder.withBody("Hello".getBytes()) // 设置消息投递模式为持久化的(默认不设置就是持久化的) .setDeliveryMode(MessageDeliveryMode.PERSISTENT) .build() ;
1.
2.
3.
4.
关闭自动应答机制。
默认是自动应答,当消息监听方法中没有异常时则正常应答,当发生异常时,在默认情况下会重新入队列(这样就会出现死循环)。
复制
spring: rabbitmq:host: localhostport: 5672username: guestpassword: guestvirtualHost: testpublisherConfirmType: correlatedpublisherReturns: truelistener: simple:acknowledgeMode: manual #设置为手动应答
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
消息监听。
复制
@RabbitListener(queues = {"akf.queue"})public void onMessage(Message message, Channel channel) throws Exception { try {System.out.println("接收到消息: " + new String(message.getBody()));// ... 这里处理我们的业务代码// 当消费者把消息消费成功,再手动应答RabbitMQchannel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) {// 如果发生了异常,我们一般的处理是直接扔掉死信队列,一般这里出现错误都是消息有问题// 如果消息出现问题,你重试再入队列是无意义的 } }
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
如果消息消费时出现错误,你又希望能够通过重试来尽可能的处理掉该消息,Spring也提供了相应的重试机制。
修改配置:
复制
spring: rabbitmq:host: localhostport: 5672username: guestpassword: guestvirtualHost: testpublisherConfirmType: correlatedpublisherReturns: truelistener: simple:acknowledgeMode: autoconcurrency: 1retry: # 开启重试 enabled: true # 延迟1s后开始重试 initialInterval: 1000 # 每次消息重试的间隔乘数 multiplier: 3 # 2次间的重试最大间隔时间 maxInterval: 20000 maxAttempts: 4 #重试4次,1s, 3s, 9s stateless: true #如果消息处理中存在事务则需要将其设置为false
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
如果只是做上面的配置,重试指定次数后消息将会被丢弃,这是默认行为。Spring提供了 MessageRecoverer接口来决定消息如何处理。默认Spring提供如下几种实现:
ImmediateRequeueMessageRecoverer
RejectAndDontRequeueRecoverer
RepublishMessageRecoverer
我们只需要定义一个Bean为MessageRecoverer即可,这里我们就用Spring提供的RepublishMessageRecoverer重新发布消息。
复制
@Beanpublic MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) { return new RepublishMessageRecoverer(rabbitTemplate, "error.exchange", "error") ; }
1.
2.
3.
4.
这里将消息重新发布一个专门的队列(重试指定次数后)。