消息丢失的可能
消息从生产者到消费者的每一步都可能丢失消息。
发送消息时:
生产者发送消息时连接MQ失败。
生产者发送消息到达MQ后未找到Exchange
。
生产者发送消息到达MQ的Exchange
后,未找到合适的Queue
。
生产者发送消息到达MQ后,处理消息的进程发生异常。
MQ导致消息丢失:
消息到达MQ,保存到队列后,尚未消费就突然宕机。
消费者处理消息时:
消息接收后尚未处理突然宕机。
消息接收后处理过程中抛出异常。
综上,我们要解决消息丢失问题,保证消息的可靠性,需要从3个方面入手:
确保生产者一定把消息发送到MQ。
确保MQ不会将消息弄丢。
确保消费者一定要处理消息。
生产者的可靠性
生产者重连
实现
对于"生产者发送消息时连接MQ失败",即生产者发送消息时,出现了网络故障。SpringAMQP提供了消息发送时的重试机制,当RabbitTemplate
与MQ连接超时后,进行多次重试。
修改生产者
模块的application.yml
文件,添加如下内容:
1 2 3 4 5 6 7 8 9 10 spring: rabbitmq: connection-timeout: 1s template: retry: enabled: true initial-interval: 1000ms multiplier: 1 max-attempts: 3
我们可以试一下,会打印如下内容,显示正在重试。
1 2 3 2024-07-01 22:04:49.286 INFO 1134 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [10.211.55.17:5672] 2024-07-01 22:04:51.295 INFO 1134 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [10.211.55.17:5672] 2024-07-01 22:04:53.300 INFO 1134 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [10.211.55.17:5672]
注意事项
当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。
但是,SpringAMQP提供的重试机制是阻塞式 的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的。
如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长和重试次数,或者考虑使用异步线程来执行发送消息的代码。
生产者确认
什么是生产者确认
一般情况下,只要生产者与MQ之间的网路连接顺畅,基本不会出现发送消息丢失的情况,因此大多数情况下我们无需考虑这种问题。
但是,在少数情况下,会出现消息发送到MQ之后丢失的现象,例如:
MQ内部处理消息的进程发生了异常
生产者发送消息到达MQ后未找到Exchange
生产者发送消息到达MQ的Exchange
后,未找到合适的Queue
,因此无法路由
针对上述情况,RabbitMQ提供了生产者消息确认机制,包括Publisher Confirm
和Publisher Return
两种。在开启确认机制的情况下,当生产者发送消息给MQ后,MQ会根据消息处理的情况返回不同的回执 。
当消息投递到MQ,但是路由失败时,通过Publisher Return 返回异常信息,同时返回ack的确认信息,代表投递成功。
临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功。
持久消息投递到了MQ,并且入队完成持久化,返回ACK ,告知投递成功。
其它情况都会返回NACK,告知投递失败。
例如,磁盘满了,返回NACK。
其中ack
和nack
属于Publisher Confirm 机制,ack
是投递成功,nack
是投递失败。return
则属于Publisher Return 机制。
默认两种机制都是关闭状态,需要通过配置文件来开启。
开启生产者确认
在application.yaml
中添加配置:
1 2 3 4 spring: rabbitmq: publisher-confirm-type: correlated publisher-returns: true
这里publisher-confirm-type
有三种模式可选:
none
:关闭confirm机制。
simple
:同步阻塞等待MQ的回执。
correlated
:MQ异步回调返回回执。
一般我们推荐使用correlated
,回调机制。
定义ReturnCallback
每个RabbitTemplate
只能配置一个ReturnCallback
。
我们在配置类中进行设置,配置方法方法有很多种。
第一种方法
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 package com.kakawanyifan.rabbitmq.config;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.*;import org.springframework.amqp.rabbit.connection.ConnectionFactory;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configuration @Slf 4jpublic class Config { public static final String EXCHANGE_NAME = "boot_topic_exchange" ; public static final String QUEUE_NAME = "boot_queue_exchange" ; @Bean public RabbitTemplate initRabbitTemplate (ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback(){ @Override public void returnedMessage (ReturnedMessage returnedMessage) { log.error("触发return callback," ); log.info("exchange: {}" , returnedMessage.getExchange()); log.info("routingKey: {}" , returnedMessage.getRoutingKey()); log.info("message: {}" , returnedMessage.getMessage()); log.info("replyCode: {}" , returnedMessage.getReplyCode()); log.info("replyText: {}" , returnedMessage.getReplyText()); } }); rabbitTemplate.setMandatory(true ); return rabbitTemplate; } @Bean ("bootExchange" ) public Exchange BootExchange () { return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true ).build(); } @Bean ("bootQueue" ) public Queue BootQueue () { return QueueBuilder.durable(QUEUE_NAME).build(); } @Bean public Binding bindQueueExchange (@Qualifier("bootQueue" ) Queue queue,@Qualifier ("bootExchange" ) Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("boot.#" ).noargs(); } }
运行结果:
1 2 3 4 5 6 2024-07-02 10:26:03.216 ERROR 7021 --- [nectionFactory1] com.kakawanyifan.rabbitmq.config.Config : 触发return callback, 2024-07-02 10:26:03.216 INFO 7021 --- [nectionFactory1] com.kakawanyifan.rabbitmq.config.Config : exchange: boot_topic_exchange 2024-07-02 10:26:03.218 INFO 7021 --- [nectionFactory1] com.kakawanyifan.rabbitmq.config.Config : routingKey: nonono.haha 2024-07-02 10:26:03.218 INFO 7021 --- [nectionFactory1] com.kakawanyifan.rabbitmq.config.Config : message: (Body:'[serialized object]' MessageProperties [headers={}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]) 2024-07-02 10:26:03.218 INFO 7021 --- [nectionFactory1] com.kakawanyifan.rabbitmq.config.Config : replyCode: 312 2024-07-02 10:26:03.218 INFO 7021 --- [nectionFactory1] com.kakawanyifan.rabbitmq.config.Config : replyText: NO_ROUTE
第二种方法
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 package com.kakawanyifan.rabbitmq.config;import lombok.AllArgsConstructor;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.*;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import javax.annotation.PostConstruct;@Configuration @AllArgsConstructor @Slf 4jpublic class Config { public static final String EXCHANGE_NAME = "boot_topic_exchange" ; public static final String QUEUE_NAME = "boot_queue_exchange" ; private final RabbitTemplate rabbitTemplate; @PostConstruct public void init () { rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() { @Override public void returnedMessage (ReturnedMessage returnedMessage) { log.error("触发return callback," ); log.info("exchange: {}" , returnedMessage.getExchange()); log.info("routingKey: {}" , returnedMessage.getRoutingKey()); log.info("message: {}" , returnedMessage.getMessage()); log.info("replyCode: {}" , returnedMessage.getReplyCode()); log.info("replyText: {}" , returnedMessage.getReplyText()); } }); } @Bean ("bootExchange" ) public Exchange BootExchange () { return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true ).build(); } @Bean ("bootQueue" ) public Queue BootQueue () { return QueueBuilder.durable(QUEUE_NAME).build(); } @Bean public Binding bindQueueExchange (@Qualifier("bootQueue" ) Queue queue,@Qualifier ("bootExchange" ) Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("boot.#" ).noargs(); } }
运行结果:
1 2 3 4 5 6 2024-07-02 10:24:44.920 ERROR 6970 --- [nectionFactory1] com.kakawanyifan.rabbitmq.config.Config : 触发return callback, 2024-07-02 10:24:44.920 INFO 6970 --- [nectionFactory1] com.kakawanyifan.rabbitmq.config.Config : exchange: boot_topic_exchange 2024-07-02 10:24:44.921 INFO 6970 --- [nectionFactory1] com.kakawanyifan.rabbitmq.config.Config : routingKey: nonono.haha 2024-07-02 10:24:44.922 INFO 6970 --- [nectionFactory1] com.kakawanyifan.rabbitmq.config.Config : message: (Body:'{"name":"姓名","age":21}' MessageProperties [headers={__ContentTypeId__=java.lang.Object, __KeyTypeId__=java.lang.Object, __TypeId__=java.util.HashMap}, messageId=991c9107-88a0-46da-a20b-c0086acaa1ed, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]) 2024-07-02 10:24:44.922 INFO 6970 --- [nectionFactory1] com.kakawanyifan.rabbitmq.config.Config : replyCode: 312 2024-07-02 10:24:44.922 INFO 6970 --- [nectionFactory1] com.kakawanyifan.rabbitmq.config.Config : replyText: NO_ROUTE
第三种方法
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 package com.kakawanyifan.rabbitmq.config;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.*;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.BeansException;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.context.ApplicationContext;import org.springframework.context.ApplicationContextAware;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configuration @Slf 4jpublic class Config implements ApplicationContextAware { public static final String EXCHANGE_NAME = "boot_topic_exchange" ; public static final String QUEUE_NAME = "boot_queue_exchange" ; @Override public void setApplicationContext (ApplicationContext applicationContext) throws BeansException { RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class ) ; rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback(){ @Override public void returnedMessage (ReturnedMessage returnedMessage) { log.error("触发return callback," ); log.info("exchange: {}" , returnedMessage.getExchange()); log.info("routingKey: {}" , returnedMessage.getRoutingKey()); log.info("message: {}" , returnedMessage.getMessage()); log.info("replyCode: {}" , returnedMessage.getReplyCode()); log.info("replyText: {}" , returnedMessage.getReplyText()); } }); } @Bean ("bootExchange" ) public Exchange BootExchange () { return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true ).build(); } @Bean ("bootQueue" ) public Queue BootQueue () { return QueueBuilder.durable(QUEUE_NAME).build(); } @Bean public Binding bindQueueExchange (@Qualifier("bootQueue" ) Queue queue,@Qualifier ("bootExchange" ) Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("boot.#" ).noargs(); } }
运行结果:
1 2 3 4 5 6 2024-07-02 10:31:18.863 ERROR 7097 --- [nectionFactory1] com.kakawanyifan.rabbitmq.config.Config : 触发return callback, 2024-07-02 10:31:18.864 INFO 7097 --- [nectionFactory1] com.kakawanyifan.rabbitmq.config.Config : exchange: boot_topic_exchange 2024-07-02 10:31:18.866 INFO 7097 --- [nectionFactory1] com.kakawanyifan.rabbitmq.config.Config : routingKey: nonono.haha 2024-07-02 10:31:18.866 INFO 7097 --- [nectionFactory1] com.kakawanyifan.rabbitmq.config.Config : message: (Body:'{"name":"姓名","age":21}' MessageProperties [headers={__ContentTypeId__=java.lang.Object, __KeyTypeId__=java.lang.Object, __TypeId__=java.util.HashMap}, messageId=5a7814ec-c684-4208-b032-c077ed6e33e2, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]) 2024-07-02 10:31:18.866 INFO 7097 --- [nectionFactory1] com.kakawanyifan.rabbitmq.config.Config : replyCode: 312 2024-07-02 10:31:18.866 INFO 7097 --- [nectionFactory1] com.kakawanyifan.rabbitmq.config.Config : replyText: NO_ROUTE
解释说明:实现ApplicationContextAware
,Spring容器的一个通告;实现这个接口,就需要实现setApplicationContext
方法,传入ApplicationContext
对象。然后我们在容器中getBean,然后设置ReturnsCallback
。
定义ConfirmCallback
由于每个消息发送时的处理逻辑不一定相同,因此ConfirmCallback
需要在每次发消息时定义。
具体来说,是在调用RabbitTemplate
中的convertAndSend
方法时,多传递一个参数CorrelationData
的对象。
CorrelationData中包含两个核心的东西:
id
:消息的唯一标示,MQ对不同的消息的回执以此做判断,避免混淆。
SettableListenableFuture
:回执结果的Future对象。
将来MQ的回执就会通过这个Future对象来返回,我们提前给CorrelationData
中的Future
添加回调函数来处理消息回执。
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 package com.kakawanyifan;import com.kakawanyifan.rabbitmq.config.Config;import lombok.extern.slf4j.Slf4j;import org.junit.Test;import org.junit.runner.RunWith;import org.springframework.amqp.rabbit.connection.CorrelationData;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.test.context.junit4.SpringRunner;import org.springframework.util.concurrent.ListenableFutureCallback;import java.util.HashMap;import java.util.Map;@Slf 4j@SpringBootTest @RunWith (SpringRunner.class ) public class Producers { @Autowired private RabbitTemplate rabbitTemplate; @Test public void testSend () { CorrelationData cd = new CorrelationData(); cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() { @Override public void onFailure (Throwable ex) { log.error("send message fail" , ex); } @Override public void onSuccess (CorrelationData.Confirm result) { if (result.isAck()){ log.info("发送消息成功,收到 ack!" ); }else { log.error("发送消息失败,收到 nack, reason : {}" , result.getReason()); } } }); Map<String,Object> msg = new HashMap<>(); msg.put("name" , "姓名" ); msg.put("age" , 21 ); rabbitTemplate.convertAndSend(Config.EXCHANGE_NAME,"boot.haha" ,msg,cd); } }
运行结果:
1 2024-07-02 10:41:04.848 INFO 7330 --- [.211.55.17:5672] com.kakawanyifan.Producers : 发送消息成功,收到 ack!
解释说明:
如果传递的RoutingKey
是错误的,路由失败后,除了触发了return callback
,还会收到ack
。
修改为正确的RoutingKey
以后,不会触发return callback
了,只收到ack
。
如果连交换机都是错误的,则只会收到nack
。
另外,在上述代码中:
onFailure
,是指Spring内部在处理的时候失败了,这个失败和MQ没有关系,一般不会被触发。
onSuccess
,是指MQ的回调成功了。
注意事项
开启生产者确认比较消耗MQ性能,一般不建议开启。而且:路由失败,一般是因为RoutingKey错误导致,往往是编程导致;交换机名称错误,同样是编程错误导致。
只有MQ内部故障,这种需要处理,但概率往往较低。
因此只有对消息可靠性要求非常高的业务才需要开启,而且仅仅需要开启ConfirmCallback处理nack就可以了。
MQ的可靠性
消息到达MQ以后,如果MQ不能及时保存,也会导致消息丢失,所以MQ的可靠性也非常重要。
数据持久化
持久化方法
为了提升性能,默认情况下MQ的数据都是在内存存储的临时数据,重启后就会消失。为了保证数据的可靠性,配置数据持久化,包括:
交换机持久化
如果交换机不是持久的,重启MQ之后,交换机就没了。
队列持久化
如果队列不是持久的,重启MQ之后,队列也没了。
消息持久化
对于"交换机持久化"和"队列持久化",在Spring中创建交换机和队列的时候,Spring默认把交换机和队列创建成了持久的。
对于消息的持久化,设置方法如下:
1 2 3 4 5 Message message = MessageBuilder .withBody("hello" .getBytes(StandardCharsets.UTF_8)) .setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT) .build(); rabbitTemplate.convertAndSend(Config.EXCHANGE_NAME, "simple.queue" , message);
注意事项
在开启持久化机制以后,如果同时还开启了生产者确认,那么MQ会在消息持久化以后才发送ACK回执,进一步确保消息的可靠性。
出于性能考虑,为了减少IO次数,发送到MQ的消息并不是逐条持久化到数据库的,而是每隔一段时间批量持久化,一般间隔在100毫秒左右,这就会导致ACK有一定的延迟。
因此建议生产者确认全部采用异步方式。
LazyQueue
什么是LazyQueue
在默认情况下,RabbitMQ会将接收到的信息保存在内存中以降低消息收发的延迟。但在某些特殊情况下,这会导致消息积压,例如:消费者宕机或出现网络故障;消息发送量激增,超过了消费者处理速度;消费者处理业务发生阻塞。
一旦出现消息堆积问题,RabbitMQ的内存占用就会越来越高,直到触发内存预警上限。此时RabbitMQ会将内存消息刷到磁盘上,这个行为被称为PageOut
,PageOut
会耗费一段时间,并且会阻塞队列进程。在这个过程中RabbitMQ不会再处理新的消息,生产者的所有请求都会被阻塞。
为了解决这个问题,从RabbitMQ的3.6版本开始,就增加了Lazy Queues的模式,也就是惰性队列。惰性队列的特征如下:
接收到消息后直接存入磁盘而非内存,内存中只保留最近的消息,默认2048条。
消费者要消费消息时才会从磁盘中读取并加载到内存(也就是懒加载)。
支持数百万条的消息存储。
在3.12版本之后,LazyQueue已经成为所有队列的默认格式。
设置方法
我们可以通过QueueBuilder
的lazy()
函数配置Lazy模式:
1 2 3 4 5 6 7 @Bean public Queue lazyQueue () { return QueueBuilder .durable("lazy.queue" ) .lazy() .build(); }
也可以基于注解来声明队列并设置为Lazy模式:
1 2 3 4 5 6 7 8 @RabbitListener (queuesToDeclare = @Queue ( name = "lazy.queue" , durable = "true" , arguments = @Argument (name = "x-queue-mode" , value = "lazy" ) )) public void listenLazyQueue (String msg) { log.info("接收到 lazy.queue的消息:{}" , msg); }
对于已经存在的队列,可以通过控制台设置policy的方式,配置为Lazy模式。
解释说明:
Lazy
:策略名称,可以自定义
"^lazy-queue$"
:用正则表达式匹配队列的名字
消费者的可靠性
当RabbitMQ向消费者投递消息以后,需要知道消费者的处理状态如何。因为消息投递给消费者并不代表就一定被正确消费了,可能出现的故障有很多,例如:消息投递的过程中出现了网络故障;消费者接收到消息后突然宕机;消费者接收到消息后,因处理不当导致异常等。
一旦发生上述情况,消息也会丢失。
因此,RabbitMQ必须知道消费者的处理状态,一旦消息处理失败才能重新投递消息。
那么,RabbitMQ如何得知消费者的处理状态呢?
消费者确认机制
什么是消费者确认机制
为了确认消费者是否成功处理消息,RabbitMQ提供了消费者确认机制Consumer Acknowledgement 。即,当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态。回执有三种可选值:
ack
:成功处理消息,RabbitMQ从队列中删除该消息。
nack
:消息处理失败,RabbitMQ需要再次投递消息。
reject
:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息。
一般reject方式用的较少,除非是消息格式有问题,那就是开发问题了。
因此大多数情况下我们需要将消息处理的代码通过try catch
机制捕获,消息处理成功时返回ack
,处理失败时返回nack
。
设置方法
由于消息回执的处理代码比较统一,因此SpringAMQP帮我们实现了消息确认。并允许我们通过配置文件设置ACK处理方式,有三种模式:
none
:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用。
manual
:手动模式。需要自己在业务代码中调用api,发送ack
或reject
,存在业务入侵,但更灵活。
auto
:自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack
,当业务出现异常时,根据异常判断返回不同结果:
如果是业务异常 ,会自动返回nack
;
如果是消息处理或校验异常 ,自动返回reject
;
通过下面的配置可以修改SpringAMQP的ACK处理方式:
1 2 3 4 5 spring: rabbitmq: listener: simple: acknowledge-mode: none
案例
我们将acknowledge-mode
设置为none
,并模拟一个消息处理的异常,抛出MessageConversionException
类型的异常。
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 package com.kakawanyifan.rabbitmq;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.amqp.support.converter.MessageConversionException;import org.springframework.stereotype.Component;@Component @Slf 4jpublic class Listener { @RabbitListener (queues = "boot_queue_exchange" ) public void onMessage (Message message) { log.info("spring 消费者接收到消息:【" + message + "】" ); if (true ) { throw new MessageConversionException("故意的" ); } log.info("消息处理完成" ); } }
测试可以发现:当消息处理发生异常时,消息依然被RabbitMQ删除了。
如果把acknowledge-mode
设置为auto
,由于抛出的是消息转换异常 ,因此Spring会自动返回reject
,所以消息依然会被删除。
我们将异常改为RuntimeException类型:
1 2 3 4 5 6 7 8 9 @RabbitListener (queues = "boot_queue_exchange" )public void onMessage (Message message) { log.info("spring 消费者接收到消息:【" + message + "】" ); if (true ) { throw new RuntimeException("故意的" ); } log.info("消息处理完成" ); }
由于抛出的是业务异常,所以Spring返回ack
,最终消息恢复至Ready
状态,并且没有被RabbitMQ删除。消息处理失败后,会回到RabbitMQ,并重新投递到消费者。
失败重试机制
什么是失败重试
在上文,当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者。如果消费者再次执行依然出错,消息会再次requeue到队列,再次投递,直到消息处理成功为止。
极端情况就是消费者一直无法执行成功,那么消息requeue就会无限循环,导致mq的消息处理飙升,带来不必要的压力。
为了应对上述情况Spring又提供了消费者失败重试机制:在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。
设置方法
修改application.yml
文件,添加内容:
1 2 3 4 5 6 7 8 9 10 spring: rabbitmq: listener: simple: retry: enabled: true initial-interval: 1000ms multiplier: 1 max-attempts: 3 stateless: true
我们可以看到,消费者在失败后消息没有重新回到MQ无限重新投递,而是在本地重试了3次;本地重试3次以后,抛出了AmqpRejectAndDontRequeueException
异常。查看RabbitMQ控制台,发现消息被删除了,说明最后SpringAMQP返回的是reject
。
即:
开启本地重试时,消息处理过程中抛出异常,不会requeue到队列,而是在消费者本地重试。
重试达到最大次数后,Spring会返回reject,消息会被丢弃。
失败处理策略
在之前的测试中,本地测试达到最大重试次数后,消息会被丢弃。
这在某些对于消息可靠性要求较高的业务场景下,显然不太合适了。
因此Spring允许我们自定义重试次数耗尽后的消息处理策略,这个策略是由MessageRecovery
接口来定义的,它有3个不同实现:
RejectAndDontRequeueRecoverer
:重试耗尽后,直接reject
,丢弃消息。默认就是这种方式。
ImmediateRequeueMessageRecoverer
:重试耗尽后,返回nack
,消息重新入队。
RepublishMessageRecoverer
:重试耗尽后,将失败消息投递到指定的交换机。
比较优雅的一种处理方案是RepublishMessageRecoverer
,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。
定义处理失败消息的交换机和队列:
1 2 3 4 5 6 7 8 9 10 11 12 @Bean public DirectExchange errorMessageExchange () { return new DirectExchange("error.direct" ); } @Bean public Queue errorQueue () { return new Queue("error.queue" , true ); } @Bean public Binding errorBinding (Queue errorQueue, DirectExchange errorMessageExchange) { return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error" ); }
定义一个RepublishMessageRecoverer,关联队列和交换机:
1 2 3 4 @Bean public MessageRecoverer republishMessageRecoverer (RabbitTemplate rabbitTemplate) { return new RepublishMessageRecoverer(rabbitTemplate, "error.direct" , "error" ); }
完整代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 package com.kakawanyifan.rabbitmq.config;import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.DirectExchange;import org.springframework.amqp.core.Queue;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.amqp.rabbit.retry.MessageRecoverer;import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configuration @ConditionalOnProperty (name = "spring.rabbitmq.listener.simple.retry.enabled" , havingValue = "true" )public class ErrorMessageConfig { @Bean public DirectExchange errorMessageExchange () { return new DirectExchange("error.direct" ); } @Bean public Queue errorQueue () { return new Queue("error.queue" , true ); } @Bean public Binding errorBinding (Queue errorQueue, DirectExchange errorMessageExchange) { return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error" ); } @Bean public MessageRecoverer republishMessageRecoverer (RabbitTemplate rabbitTemplate) { return new RepublishMessageRecoverer(rabbitTemplate, "error.direct" , "error" ); } }
业务幂等性
什么是幂等性
何为幂等性?
在程序开发中,则是指同一个业务,执行一次或多次对业务状态的影响是一致的。
有些业务本来就是幂等的,例如:查询业务,例如根据id查询商品;删除业务,例如根据id删除商品。
有些业务不具有幂等性,例如:用户下单业务,需要扣减库存;用户退款业务,需要恢复余额。
然而在实际业务场景中,由于意外经常会出现业务被重复执行的情况,例如,页面卡顿时频繁刷新导致表单重复提交,那么怎么办?
进入表单的时候,生成唯一标识。
提交表单时,携带标识。
后端收到表单后,检查该标识是否已经提交过。
即令牌机制。
保证消息处理的幂等性的方法中,有一种方法就和这类似。
保证消息处理的幂等性,方法有:
唯一消息ID
业务状态判断
唯一消息ID
思路
思路:
每一条消息都生成一个唯一的id,与消息一起投递给消费者。
消费者接收到消息后处理自己的业务,业务处理成功后将消息ID保存到数据库。
如果下次又收到相同消息,去数据库查询判断是否存在,存在则为重复消息放弃处理。
我们该如何给消息添加唯一ID呢?
实现方法一
SpringAMQP的MessageConverter自带了MessageID的功能,我们只要开启这个功能即可。
以Jackson的消息转换器为例:
1 2 3 4 5 6 7 8 @Bean public MessageConverter messageConverter () { Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter(); jackson2JsonMessageConverter.setCreateMessageIds(true ); return jackson2JsonMessageConverter; }
实现方法二
上述方法使用的是UUID,如果我们想自定义ID,可以用.setMessageId("XXX")
方法。示例代码:
1 2 3 4 5 Message message = MessageBuilder .withBody("hello" .getBytes(StandardCharsets.UTF_8)) .setMessageId("XXX" ) .build(); rabbitTemplate.convertAndSend(Config.EXCHANGE_NAME, "simple.queue" , message);
实现方法三
示例代码:
1 2 3 4 5 6 7 8 rabbitTemplate.convertAndSend("delay.direct1" , "delay" , message, new MessagePostProcessor() { @Override public Message postProcessMessage (Message message) throws AmqpException { message.getMessageProperties().setMessageId("XXX" ); return message; } });
业务判断
业务判断就是基于业务本身的逻辑或状态来判断是否是重复的请求或消息,不同的业务场景判断的思路也不一样。
例如我们当前案例中,处理消息的业务逻辑是把订单状态从未支付修改为已支付。因此我们就可以在执行业务时判断订单状态是否是未支付,如果不是则证明订单已经被处理过,无需重复处理。
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 @Override public void markOrderPaySuccess (Long orderId) { Order old = getById(orderId); if (old == null || old.getStatus() != 1 ) { return ; } Order order = new Order(); order.setId(orderId); order.setStatus(2 ); order.setPayTime(LocalDateTime.now()); updateById(order); }
上述代码逻辑上符合了幂等判断的需求,但是由于判断和更新是两步动作,因此在极小概率下可能存在线程安全问题,可以参考这个逻辑
1 UPDATE `order` SET status = ? , pay_time = ? WHERE id = ? AND status = 1
解释说明:在where条件中除了判断id以外,还加上了status必须为1的条件。如果条件不符(说明订单已支付),则SQL匹配不到数据,不会执行。
兜底方案
如何保证支付服务与交易服务之间的订单状态一致性?
首先,支付服务会正在用户支付成功以后利用MQ消息通知交易服务,完成订单状态同步。
其次,为了保证MQ消息的可靠性,我们采用了生产者确认机制、消费者确认、消费者失败重试等策略,确保消息投递和处理的可靠性。同时也开启了MQ的持久化,避免因服务宕机导致消息丢失。
最后,我们还在交易服务更新订单状态时做了业务幂等判断,避免因消息重复消费导致订单状态异常。
虽然我们利用各种机制尽可能增加了消息的可靠性,但也不好说能保证消息100%的可靠。万一真的MQ通知失败该怎么办呢?
既然MQ通知不一定发送到交易服务,那么交易服务就必须自己主动去查询 支付状态。这样即便支付服务的MQ通知失败,我们依然能通过主动查询来保证订单状态的一致。
流程如下:
图中黄色线圈起来的部分就是MQ通知失败后的兜底处理方案,由交易服务自己主动去查询支付状态。
例如,我们利用定时任务 定期查询,例如每隔20秒就查询一次,并判断支付状态。如果发现订单已经支付,则立刻更新订单状态为已支付即可。
关于定时任务,可以参考《基于Java的后端开发入门:19.Quartz和APScheduler》 。
延迟消息
应用场景
在电商的支付业务中,对于一些库存有限的商品,为了更好的用户体验,通常都会在用户下单时立刻扣减商品库存。例如电影院购票、高铁购票,下单后就会锁定座位资源,其他人无法重复购买。但是这样就存在一个问题,假如用户下单后一直不付款,就会一直占有库存资源,导致其他客户无法正常交易,最终导致商户利益受损。
因此,电商中通常的做法就是:对于超过一定时间未支付的订单,应该立刻取消订单并释放占用的库存。
像这种在一段时间以后才执行的任务,我们称之为延迟任务 ,而要实现延迟任务,最简单的方案就是利用MQ的延迟消息了。
在RabbitMQ中实现延迟消息有两种方案:
死信交换机+TTL
延迟消息插件
死信交换机和延迟消息
死信
当一个队列中的消息满足下列情况之一时,被称为死信(dead letter):
消费者使用basic.reject
或basic.nack
声明消费失败,并且消息的requeue
参数设置为false。
要投递的队列消息满了,无法投递。
消息是一个过期消息,超时无人消费。
死信交换机
如果一个队列中的消息已经成为死信,并且这个队列通过dead-letter-exchange
属性指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机就称为死信交换机 (Dead Letter Exchange),而此时加入有另一个队列与死信交换机绑定,则最终死信就会被投递到另一个队列中。
即,死信交换机的作用:
收集因处理失败而被拒绝的消息。
收集因队列满了而被拒绝的消息。
收集因TTL(有效期)到期的消息。
延迟消息
前面两种作用场景可以看做是把死信交换机当做一种消息处理的最终兜底方案,与消费者重试时讲的RepublishMessageRecoverer
作用类似。
而最后一种场景,如图,有一组绑定的交换机(ttl.fanout
)和队列(ttl.queue
)。但是ttl.queue
没有消费者监听,而是设定了死信交换机hmall.direct
,而队列direct.queue1
则与死信交换机绑定,RoutingKey
是blue
:
假如我们现在发送一条消息到ttl.fanout
,RoutingKey
为blue
,并设置消息的有效期 为5000毫秒;消息被投递到ttl.queue
之后,由于没有消费者,因此消息无人消费;5秒之后,消息的有效期到期,成为死信;死信被再次投递到死信交换机hmall.direct
,并沿用之前的RoutingKey
,也就是blue
;由于direct.queue1
与hmall.direct
绑定的key
是blue
,因此最终消息被成功路由到direct.queue1
,如果此时有消费者与direct.queue1
绑定,也就能成功消费消息了。但这时也就是5秒钟以后了。
也就是说,publisher发送了一条消息,但最终consumer在5秒后才收到消息。我们成功实现了延迟消息 。
注意事项
RabbitMQ的消息过期是基于追溯方式来实现的,也就是说当一个消息的TTL到期以后不一定会被移除或投递到死信交换机,而是在消息恰好处于队首时才会被处理。当队列中消息堆积很多的时候,过期消息可能不会被按时处理,因此我们设置的TTL时间不一定准确。
DelayExchange插件
基于死信队列虽然可以实现延迟消息,但是太麻烦了。因此RabbitMQ社区提供了一个延迟消息插件来实现相同的效果。
官方文档说明:https://www.rabbitmq.com/blog/2015/04/16/scheduling-messages-with-rabbitmq
安装
通过"rabbitmq_delayed_message_exchange"的Github页面,下载对应版本的插件。
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
并将其上传至服务器,复制到rabbitmq的plugins目录下,在本文是/usr/lib/rabbitmq/lib/rabbitmq_server-3.9.16/plugins
。
安装插件:
1 rabbitmq-plugins enable rabbitmq_delayed_message_exchange
示例代码:
1 rabbitmq-plugins enable rabbitmq_delayed_message_exchange
运行结果:
1 2 3 4 5 6 7 8 9 10 11 12 Enabling plugins on node rabbit@centos-linux: rabbitmq_delayed_message_exchange The following plugins have been configured: rabbitmq_delayed_message_exchange rabbitmq_management rabbitmq_management_agent rabbitmq_web_dispatch Applying plugin configuration to rabbit@centos-linux... The following plugins have been enabled: rabbitmq_delayed_message_exchange started 1 plugins.
声明延迟交换机
基于@Bean的方式
基于@Bean
的方式:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 package com.kakawanyifan.rabbitmq.config;import org.springframework.amqp.core.*;import org.springframework.context.annotation.Bean;public class DelayExchangeConfig { @Bean public DirectExchange delayExchange () { return ExchangeBuilder .directExchange("delay.direct" ) .delayed() .durable(true ) .build(); } @Bean public Queue delayedQueue () { return new Queue("delay.queue" ); } @Bean public Binding delayQueueBinding () { return BindingBuilder.bind(delayedQueue()).to(delayExchange()).with("delay" ); } }
解释说明:delayedQueue()
和delayExchange()
方法,以及被加载了@Bean
,所以实际上不会重复创建对象。
基于注解方式
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 package com.kakawanyifan.rabbitmq;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.Exchange;import org.springframework.amqp.rabbit.annotation.Queue;import org.springframework.amqp.rabbit.annotation.QueueBinding;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;@Component @Slf 4jpublic class Listener { @RabbitListener (bindings = @QueueBinding ( value = @Queue (name = "delay.queue" , durable = "true" ), exchange = @Exchange (name = "delay.direct" , delayed = "true" ), key = "delay" )) public void onMessage (Message message) { log.info("spring 消费者接收到消息:【" + message + "】" ); } }
发送延迟消息
发送消息时,必须通过x-delay属性设定延迟时间。示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 package com.kakawanyifan;import lombok.extern.slf4j.Slf4j;import org.junit.Test;import org.junit.runner.RunWith;import org.springframework.amqp.AmqpException;import org.springframework.amqp.core.Message;import org.springframework.amqp.core.MessagePostProcessor;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.test.context.junit4.SpringRunner;@Slf 4j@SpringBootTest @RunWith (SpringRunner.class ) public class Producers { @Autowired private RabbitTemplate rabbitTemplate; @Test void testPublisherDelayMessage () { String message = "hello, delayed message" ; rabbitTemplate.convertAndSend("delay.direct1" , "delay" , message, new MessagePostProcessor() { @Override public Message postProcessMessage (Message message) throws AmqpException { message.getMessageProperties().setMessageId(); return message; } }); } }
注意事项
延迟消息插件内部会维护一个本地数据库表,同时使用Erlang的Timers功能实现计时。
如果消息的延迟时间设置较长,可能会导致堆积的延迟消息非常多,会带来较大的CPU开销,同时延迟消息的时间会存在误差。
因此,不建议设置延迟时间过长的延迟消息。
集群
搭建集群
准备节点
准备三个节点。在每个节点上都安装Erlang和RabbitMQ,并修改/etc/hostname
的内容为对应的节点名。
节点
IP
node01
10.211.55.18
node02
10.211.55.19
node03
10.211.55.20
集群节点彼此发现
node01设置
设置IP地址到主机名称的映射
修改文件/etc/hosts
,添如下内容:
1 2 3 10.211.55.18 node01 10.211.55.19 node02 10.211.55.20 node03
开通网络端口
除了5672
和15672
,还需要开启4369
和25672
端口
示例代码:
1 2 3 firewall-cmd --zone=public --add-port=4369/tcp --permanent firewall-cmd --zone=public --add-port=25672/tcp --permanent firewall-cmd --reload
同步erlang.cookie
三台集群中的erlang.cookie
,需要一致,通过命令scp
进行同步。
示例代码:
1 2 scp /var/lib/rabbitmq/.erlang.cookie node02:/var/lib/rabbitmq/ scp /var/lib/rabbitmq/.erlang.cookie node03:/var/lib/rabbitmq/
有些资料建议我们复制文件内容,再通过vim
修改其他节点的,这绝不是一个好办法,容易错误的复制到换行。
我们可以通过md5sum
校验。示例代码:
1 md5sum /var/lib/rabbitmq/.erlang.cookie
重置节点应用
1 2 3 rabbitmqctl stop_app rabbitmqctl reset rabbitmqctl start_app
node02和node03设置
在node02和node03中,也需要设置IP地址到主机名称的映射,开启4369
和25672
端口,操作与node01一致,不赘述。
因为erlang.cookie
文件的内容被修改了,我们需要先重启rabbitmq-server,示例代码:
1 systemctl restart rabbitmq-server
然后,重置节点应用并加入集群,该部分相比node01,多了一个步骤rabbitmqctl join_cluster rabbit@node01
。
1 2 3 4 rabbitmqctl stop_app rabbitmqctl reset rabbitmqctl join_cluster rabbit@node01 rabbitmqctl start_app
检查集群状态
我们可以通过命令rabbitmqctl cluster_status
检查集群的状态。
负载均衡
address
直接在配置文件中配置多个地址,就可以实现负载均衡。
1 2 3 4 5 6 spring: rabbitmq: addresses: 10.211 .55 .18 :5672, 10.211 .55 .19 :5672, 10.211 .55 .20 :5672 username: admin password: 123456 virtual-host: /
HAProxy
但是,有些资料会提到HAProxy
,我认为这绝不是一个好的建议,多一层组件,多一层风险。
Management UI
其实访问任何一个RabbitMQ实例的管理界面都是对集群操作,所以配置负载均衡通过统一入口访问,只是使用起来更方便一些。
安装HAProxy
1 2 3 4 yum install -y haproxy haproxy -v systemctl start haproxy systemctl enable haproxy
配置文件HAProxy
修改文件/etc/haproxy/haproxy.cfg
,在文件末尾增加如下内容:
1 2 3 4 5 6 7 8 9 10 11 12 frontend rabbitmq_ui_frontend bind 10.211.55.18:20000 mode http default_backend rabbitmq_ui_backend backend rabbitmq_ui_backend mode http balance roundrobin option httpchk GET / server rabbitmq_ui1 10.211.55.18:15672 check server rabbitmq_ui2 10.211.55.19:15672 check server rabbitmq_ui3 10.211.55.20:15672 check
设置SELinux策略,允许HAProxy拥有权限连接任意端口:
1 setsebool -P haproxy_connect_any=1
解释说明:* SELinux是Linux系统中的安全模块,它可以限制进程的权限以提高系统的安全性。在某些情况下,SELinux可能会阻止HAProxy绑定指定的端口,这就需要通过设置域(domain)的安全策略来解决此问题。* 通过执行`setsebool -P haproxy_connect_any=1`命令,我们已经为HAProxy设置了一个布尔值,允许HAProxy连接到任意端口。这样,HAProxy就可以成功绑定指定的socket,并正常工作。
重启HAProxy:
1 systemctl restart haproxy
核心功能
修改文件/etc/haproxy/haproxy.cfg
,在文件末尾增加如下内容:
1 2 3 4 5 6 7 8 9 10 11 frontend rabbitmq_frontend bind 10.211.55.18:10000 mode tcp default_backend rabbitmq_backend backend rabbitmq_backend mode tcp balance roundrobin server rabbitmq1 10.211.55.18:5672 check server rabbitmq2 10.211.55.19:5672 check server rabbitmq3 10.211.55.20:5672 check
重启HAProxy服务:
1 systemctl restart haproxy
之后,我们可以在代码中,通过10.211.55.18:10000
,发送消息和监听消息。
CentOS7
因为CentOS7从2024年6月30日开始,不再维护,安装haproxy
可能会报错,可以尝试先执行如下的命令解决。
1 2 sed -i 's/mirrorlist/#mirrorlist/g' /etc/yum.repos.d/CentOS-* sed -i 's|#baseurl=http://mirror.centos.org|baseurl=http://vault.centos.org|g' /etc/yum.repos.d/CentOS-*
仲裁队列
使用仲裁队列
创建仲裁队列,Type队列类型选择Quorum
,Node节点选择的是主节点。
我们会看到这个队列有leader节点和member节点,这是和其他队列
也可以通过Java代码创建仲裁队列,quorum()
方法。
1 2 3 4 @Bean ("bootQueue" )public Queue BootQueue () { return QueueBuilder.durable(QUEUE_NAME).quorum().build(); }
其他的操作,包括"创建交换机"、“绑定交换机”、以及发送消息和监听消息,没有区别,不赘述。
注意事项
仲裁队列(Quorum Queues)提供队列复制的能力,保障数据的高可用和安全性。使用仲裁队列可以在RabbitMQ节点间进行队列数据的复制,在一个节点宕机时,队列依旧可以正常运行。
仲裁队列适用于队列长时间存在,对队列容错和数据安全要求高,对延迟和队列特性要求相对低的场景。在可能出现消息大量堆积的场景,不推荐使用仲裁队列,因为仲裁队列的写入放大会造成成倍的磁盘占用。
仲裁队列的消息会优先保存在内存中,使用仲裁队列时,建议定义队列最大长度和最大内存占用,在消息堆积超过阈值时从内存转移到磁盘,以免造成内存高水位。
关于RabbitMQ,有三点没讨论:
镜像队列,这个队列相比仲裁队列,存在性能问题和同步问题,已经被淘汰了。 Stream流式队列,这个队列不够成熟。 Federation,这个主要位于不同机房的RabbitMQ集群进行同步,很难看到实际的应用场景。