avatar


RabbitMQ-2.进阶

消息丢失的可能

消息从生产者到消费者的每一步都可能丢失消息。

  1. 发送消息时:
    1. 生产者发送消息时连接MQ失败。
    2. 生产者发送消息到达MQ后未找到Exchange
    3. 生产者发送消息到达MQ的Exchange后,未找到合适的Queue
    4. 生产者发送消息到达MQ后,处理消息的进程发生异常。
  2. MQ导致消息丢失:
    1. 消息到达MQ,保存到队列后,尚未消费就突然宕机。
  3. 消费者处理消息时:
    1. 消息接收后尚未处理突然宕机。
    2. 消息接收后处理过程中抛出异常。

综上,我们要解决消息丢失问题,保证消息的可靠性,需要从3个方面入手:

  1. 确保生产者一定把消息发送到MQ。
  2. 确保MQ不会将消息弄丢。
  3. 确保消费者一定要处理消息。

生产者的可靠性

生产者重连

实现

对于"生产者发送消息时连接MQ失败",即生产者发送消息时,出现了网络故障。SpringAMQP提供了消息发送时的重试机制,当RabbitTemplate与MQ连接超时后,进行多次重试。

修改生产者模块的application.yml文件,添加如下内容:

1
2
3
4
5
6
7
8
9
10
# 配置RabbitMQ的基本信息
spring:
rabbitmq:
connection-timeout: 1s # 设置MQ的连接超时时间
template:
retry:
enabled: true # 开启超时重试机制
initial-interval: 1000ms # 失败后的初始等待时间
multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = 'initial-interval' * multiplier
max-attempts: 3 # 最大重试次数

我们可以试一下,会打印如下内容,显示正在重试。

1
2
3
2024-07-01 22:04:49.286  INFO 1134 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [10.211.55.17:5672]
2024-07-01 22:04:51.295 INFO 1134 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [10.211.55.17:5672]
2024-07-01 22:04:53.300 INFO 1134 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [10.211.55.17:5672]

注意事项

  • 当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。
  • 但是,SpringAMQP提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的。
  • 如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长和重试次数,或者考虑使用异步线程来执行发送消息的代码。

生产者确认

什么是生产者确认

一般情况下,只要生产者与MQ之间的网路连接顺畅,基本不会出现发送消息丢失的情况,因此大多数情况下我们无需考虑这种问题。

但是,在少数情况下,会出现消息发送到MQ之后丢失的现象,例如:

  1. MQ内部处理消息的进程发生了异常
  2. 生产者发送消息到达MQ后未找到Exchange
  3. 生产者发送消息到达MQ的Exchange后,未找到合适的Queue,因此无法路由

针对上述情况,RabbitMQ提供了生产者消息确认机制,包括Publisher ConfirmPublisher Return两种。在开启确认机制的情况下,当生产者发送消息给MQ后,MQ会根据消息处理的情况返回不同的回执

生产者确认机制

  1. 当消息投递到MQ,但是路由失败时,通过Publisher Return返回异常信息,同时返回ack的确认信息,代表投递成功。
  2. 临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功。
  3. 持久消息投递到了MQ,并且入队完成持久化,返回ACK ,告知投递成功。
  4. 其它情况都会返回NACK,告知投递失败。
    例如,磁盘满了,返回NACK。

其中acknack属于Publisher Confirm机制,ack是投递成功,nack是投递失败。return则属于Publisher Return机制。

默认两种机制都是关闭状态,需要通过配置文件来开启。

开启生产者确认

application.yaml中添加配置:

1
2
3
4
spring:
rabbitmq:
publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型
publisher-returns: true # 开启publisher return机制

这里publisher-confirm-type有三种模式可选:

  1. none:关闭confirm机制。
  2. simple:同步阻塞等待MQ的回执。
  3. correlated:MQ异步回调返回回执。

一般我们推荐使用correlated,回调机制。

定义ReturnCallback

每个RabbitTemplate只能配置一个ReturnCallback

我们在配置类中进行设置,配置方法方法有很多种。

第一种方法

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
package com.kakawanyifan.rabbitmq.config;


import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@Slf4j
public class Config {

public static final String EXCHANGE_NAME = "boot_topic_exchange";
public static final String QUEUE_NAME = "boot_queue_exchange";

@Bean
public RabbitTemplate initRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback(){
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
log.error("触发return callback,");
log.info("exchange: {}", returnedMessage.getExchange());
log.info("routingKey: {}", returnedMessage.getRoutingKey());
log.info("message: {}", returnedMessage.getMessage());
log.info("replyCode: {}", returnedMessage.getReplyCode());
log.info("replyText: {}", returnedMessage.getReplyText());
}
});
rabbitTemplate.setMandatory(true);
return rabbitTemplate;
}

// 交换机
@Bean("bootExchange")
public Exchange BootExchange(){
return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
}

// 队列
@Bean("bootQueue")
public Queue BootQueue(){
return QueueBuilder.durable(QUEUE_NAME).build();
}

// 队列和交换机的绑定
/*
1、队列
2、交换机
3、RoutingKey
*/
@Bean
public Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue,@Qualifier("bootExchange") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();
}
}

运行结果:

1
2
3
4
5
6
2024-07-02 10:26:03.216 ERROR 7021 --- [nectionFactory1] com.kakawanyifan.rabbitmq.config.Config  : 触发return callback,
2024-07-02 10:26:03.216 INFO 7021 --- [nectionFactory1] com.kakawanyifan.rabbitmq.config.Config : exchange: boot_topic_exchange
2024-07-02 10:26:03.218 INFO 7021 --- [nectionFactory1] com.kakawanyifan.rabbitmq.config.Config : routingKey: nonono.haha
2024-07-02 10:26:03.218 INFO 7021 --- [nectionFactory1] com.kakawanyifan.rabbitmq.config.Config : message: (Body:'[serialized object]' MessageProperties [headers={}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])
2024-07-02 10:26:03.218 INFO 7021 --- [nectionFactory1] com.kakawanyifan.rabbitmq.config.Config : replyCode: 312
2024-07-02 10:26:03.218 INFO 7021 --- [nectionFactory1] com.kakawanyifan.rabbitmq.config.Config : replyText: NO_ROUTE

第二种方法

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package com.kakawanyifan.rabbitmq.config;

import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;

@Configuration
@AllArgsConstructor
@Slf4j
public class Config {

public static final String EXCHANGE_NAME = "boot_topic_exchange";
public static final String QUEUE_NAME = "boot_queue_exchange";

private final RabbitTemplate rabbitTemplate;

@PostConstruct
public void init(){
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
log.error("触发return callback,");
log.info("exchange: {}", returnedMessage.getExchange());
log.info("routingKey: {}", returnedMessage.getRoutingKey());
log.info("message: {}", returnedMessage.getMessage());
log.info("replyCode: {}", returnedMessage.getReplyCode());
log.info("replyText: {}", returnedMessage.getReplyText());
}
});
}

// 交换机
@Bean("bootExchange")
public Exchange BootExchange(){
return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
}

// 队列
@Bean("bootQueue")
public Queue BootQueue(){
return QueueBuilder.durable(QUEUE_NAME).build();
}

// 队列和交换机的绑定
/*
1、队列
2、交换机
3、RoutingKey
*/
@Bean
public Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue,@Qualifier("bootExchange") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();
}
}

运行结果:

1
2
3
4
5
6
2024-07-02 10:24:44.920 ERROR 6970 --- [nectionFactory1] com.kakawanyifan.rabbitmq.config.Config  : 触发return callback,
2024-07-02 10:24:44.920 INFO 6970 --- [nectionFactory1] com.kakawanyifan.rabbitmq.config.Config : exchange: boot_topic_exchange
2024-07-02 10:24:44.921 INFO 6970 --- [nectionFactory1] com.kakawanyifan.rabbitmq.config.Config : routingKey: nonono.haha
2024-07-02 10:24:44.922 INFO 6970 --- [nectionFactory1] com.kakawanyifan.rabbitmq.config.Config : message: (Body:'{"name":"姓名","age":21}' MessageProperties [headers={__ContentTypeId__=java.lang.Object, __KeyTypeId__=java.lang.Object, __TypeId__=java.util.HashMap}, messageId=991c9107-88a0-46da-a20b-c0086acaa1ed, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])
2024-07-02 10:24:44.922 INFO 6970 --- [nectionFactory1] com.kakawanyifan.rabbitmq.config.Config : replyCode: 312
2024-07-02 10:24:44.922 INFO 6970 --- [nectionFactory1] com.kakawanyifan.rabbitmq.config.Config : replyText: NO_ROUTE

第三种方法

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
package com.kakawanyifan.rabbitmq.config;


import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@Slf4j
public class Config implements ApplicationContextAware {

public static final String EXCHANGE_NAME = "boot_topic_exchange";
public static final String QUEUE_NAME = "boot_queue_exchange";

@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
// 获取RabbitTemplate
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
// 设置
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback(){
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
log.error("触发return callback,");
log.info("exchange: {}", returnedMessage.getExchange());
log.info("routingKey: {}", returnedMessage.getRoutingKey());
log.info("message: {}", returnedMessage.getMessage());
log.info("replyCode: {}", returnedMessage.getReplyCode());
log.info("replyText: {}", returnedMessage.getReplyText());
}
});
}

// 交换机
@Bean("bootExchange")
public Exchange BootExchange(){
return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
}

// 队列
@Bean("bootQueue")
public Queue BootQueue(){
return QueueBuilder.durable(QUEUE_NAME).build();
}

// 队列和交换机的绑定
/*
1、队列
2、交换机
3、RoutingKey
*/
@Bean
public Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue,@Qualifier("bootExchange") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();
}

}

运行结果:

1
2
3
4
5
6
2024-07-02 10:31:18.863 ERROR 7097 --- [nectionFactory1] com.kakawanyifan.rabbitmq.config.Config  : 触发return callback,
2024-07-02 10:31:18.864 INFO 7097 --- [nectionFactory1] com.kakawanyifan.rabbitmq.config.Config : exchange: boot_topic_exchange
2024-07-02 10:31:18.866 INFO 7097 --- [nectionFactory1] com.kakawanyifan.rabbitmq.config.Config : routingKey: nonono.haha
2024-07-02 10:31:18.866 INFO 7097 --- [nectionFactory1] com.kakawanyifan.rabbitmq.config.Config : message: (Body:'{"name":"姓名","age":21}' MessageProperties [headers={__ContentTypeId__=java.lang.Object, __KeyTypeId__=java.lang.Object, __TypeId__=java.util.HashMap}, messageId=5a7814ec-c684-4208-b032-c077ed6e33e2, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])
2024-07-02 10:31:18.866 INFO 7097 --- [nectionFactory1] com.kakawanyifan.rabbitmq.config.Config : replyCode: 312
2024-07-02 10:31:18.866 INFO 7097 --- [nectionFactory1] com.kakawanyifan.rabbitmq.config.Config : replyText: NO_ROUTE

解释说明:实现ApplicationContextAware,Spring容器的一个通告;实现这个接口,就需要实现setApplicationContext方法,传入ApplicationContext对象。然后我们在容器中getBean,然后设置ReturnsCallback

定义ConfirmCallback

由于每个消息发送时的处理逻辑不一定相同,因此ConfirmCallback需要在每次发消息时定义。

具体来说,是在调用RabbitTemplate中的convertAndSend方法时,多传递一个参数CorrelationData的对象。

CorrelationData中包含两个核心的东西:

  1. id:消息的唯一标示,MQ对不同的消息的回执以此做判断,避免混淆。
  2. SettableListenableFuture:回执结果的Future对象。

将来MQ的回执就会通过这个Future对象来返回,我们提前给CorrelationData中的Future添加回调函数来处理消息回执。

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package com.kakawanyifan;

import com.kakawanyifan.rabbitmq.config.Config;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.util.concurrent.ListenableFutureCallback;

import java.util.HashMap;
import java.util.Map;

@Slf4j
@SpringBootTest
@RunWith(SpringRunner.class)
public class Producers {

// 注入RabbitMQTemplate
@Autowired
private RabbitTemplate rabbitTemplate;

@Test
public void testSend(){

// 1 创建CorrelationData
CorrelationData cd = new CorrelationData();
// 2 给Future添加ConfirmCallback
cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
@Override
public void onFailure(Throwable ex) {
// 2.1 Future发生异常时的处理逻辑,基本不会触发
log.error("send message fail", ex);
}
@Override
public void onSuccess(CorrelationData.Confirm result) {
// 2.2 Future接收到回执的处理逻辑,参数中的result就是回执内容
// result.isAck(),boolean类型,true代表ack回执,false 代表 nack回执
if(result.isAck()){
log.info("发送消息成功,收到 ack!");
}else{
// result.getReason(),String类型,返回nack时的异常描述
log.error("发送消息失败,收到 nack, reason : {}", result.getReason());
}
}
});

// 准备消息
Map<String,Object> msg = new HashMap<>();
msg.put("name", "姓名");
msg.put("age", 21);
// 发送消息
rabbitTemplate.convertAndSend(Config.EXCHANGE_NAME,"boot.haha",msg,cd);
}
}

运行结果:

1
2024-07-02 10:41:04.848  INFO 7330 --- [.211.55.17:5672] com.kakawanyifan.Producers               : 发送消息成功,收到 ack!

解释说明:

  • 如果传递的RoutingKey是错误的,路由失败后,除了触发了return callback,还会收到ack
  • 修改为正确的RoutingKey以后,不会触发return callback了,只收到ack
  • 如果连交换机都是错误的,则只会收到nack
  • 另外,在上述代码中:
    • onFailure,是指Spring内部在处理的时候失败了,这个失败和MQ没有关系,一般不会被触发。
    • onSuccess,是指MQ的回调成功了。

注意事项

开启生产者确认比较消耗MQ性能,一般不建议开启。而且:路由失败,一般是因为RoutingKey错误导致,往往是编程导致;交换机名称错误,同样是编程错误导致。
只有MQ内部故障,这种需要处理,但概率往往较低。
因此只有对消息可靠性要求非常高的业务才需要开启,而且仅仅需要开启ConfirmCallback处理nack就可以了。

MQ的可靠性

消息到达MQ以后,如果MQ不能及时保存,也会导致消息丢失,所以MQ的可靠性也非常重要。

数据持久化

持久化方法

为了提升性能,默认情况下MQ的数据都是在内存存储的临时数据,重启后就会消失。为了保证数据的可靠性,配置数据持久化,包括:

  1. 交换机持久化
    如果交换机不是持久的,重启MQ之后,交换机就没了。
  2. 队列持久化
    如果队列不是持久的,重启MQ之后,队列也没了。
  3. 消息持久化

对于"交换机持久化"和"队列持久化",在Spring中创建交换机和队列的时候,Spring默认把交换机和队列创建成了持久的。

对于消息的持久化,设置方法如下:

1
2
3
4
5
Message message = MessageBuilder
.withBody("hello".getBytes(StandardCharsets.UTF_8))
.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT)
.build();
rabbitTemplate.convertAndSend(Config.EXCHANGE_NAME, "simple.queue", message);

注意事项

在开启持久化机制以后,如果同时还开启了生产者确认,那么MQ会在消息持久化以后才发送ACK回执,进一步确保消息的可靠性。
出于性能考虑,为了减少IO次数,发送到MQ的消息并不是逐条持久化到数据库的,而是每隔一段时间批量持久化,一般间隔在100毫秒左右,这就会导致ACK有一定的延迟。
因此建议生产者确认全部采用异步方式。

LazyQueue

什么是LazyQueue

在默认情况下,RabbitMQ会将接收到的信息保存在内存中以降低消息收发的延迟。但在某些特殊情况下,这会导致消息积压,例如:消费者宕机或出现网络故障;消息发送量激增,超过了消费者处理速度;消费者处理业务发生阻塞。

一旦出现消息堆积问题,RabbitMQ的内存占用就会越来越高,直到触发内存预警上限。此时RabbitMQ会将内存消息刷到磁盘上,这个行为被称为PageOutPageOut会耗费一段时间,并且会阻塞队列进程。在这个过程中RabbitMQ不会再处理新的消息,生产者的所有请求都会被阻塞。

为了解决这个问题,从RabbitMQ的3.6版本开始,就增加了Lazy Queues的模式,也就是惰性队列。惰性队列的特征如下:

  1. 接收到消息后直接存入磁盘而非内存,内存中只保留最近的消息,默认2048条。
  2. 消费者要消费消息时才会从磁盘中读取并加载到内存(也就是懒加载)。
  3. 支持数百万条的消息存储。

在3.12版本之后,LazyQueue已经成为所有队列的默认格式。

设置方法

我们可以通过QueueBuilderlazy()函数配置Lazy模式:

1
2
3
4
5
6
7
@Bean
public Queue lazyQueue(){
return QueueBuilder
.durable("lazy.queue")
.lazy() // 开启Lazy模式
.build();
}

也可以基于注解来声明队列并设置为Lazy模式:

1
2
3
4
5
6
7
8
@RabbitListener(queuesToDeclare = @Queue(
name = "lazy.queue",
durable = "true",
arguments = @Argument(name = "x-queue-mode", value = "lazy")
))
public void listenLazyQueue(String msg){
log.info("接收到 lazy.queue的消息:{}", msg);
}

对于已经存在的队列,可以通过控制台设置policy的方式,配置为Lazy模式。

Lazy

解释说明:

  • Lazy :策略名称,可以自定义
  • "^lazy-queue$" :用正则表达式匹配队列的名字

消费者的可靠性

当RabbitMQ向消费者投递消息以后,需要知道消费者的处理状态如何。因为消息投递给消费者并不代表就一定被正确消费了,可能出现的故障有很多,例如:消息投递的过程中出现了网络故障;消费者接收到消息后突然宕机;消费者接收到消息后,因处理不当导致异常等。

一旦发生上述情况,消息也会丢失。

因此,RabbitMQ必须知道消费者的处理状态,一旦消息处理失败才能重新投递消息。

那么,RabbitMQ如何得知消费者的处理状态呢?

消费者确认机制

什么是消费者确认机制

为了确认消费者是否成功处理消息,RabbitMQ提供了消费者确认机制Consumer Acknowledgement。即,当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态。回执有三种可选值:

  • ack:成功处理消息,RabbitMQ从队列中删除该消息。
  • nack:消息处理失败,RabbitMQ需要再次投递消息。
  • reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息。

一般reject方式用的较少,除非是消息格式有问题,那就是开发问题了。
因此大多数情况下我们需要将消息处理的代码通过try catch机制捕获,消息处理成功时返回ack,处理失败时返回nack

设置方法

由于消息回执的处理代码比较统一,因此SpringAMQP帮我们实现了消息确认。并允许我们通过配置文件设置ACK处理方式,有三种模式:

  1. none:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用。
  2. manual:手动模式。需要自己在业务代码中调用api,发送ackreject,存在业务入侵,但更灵活。
  3. auto:自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack,当业务出现异常时,根据异常判断返回不同结果:
    1. 如果是业务异常,会自动返回nack
    2. 如果是消息处理或校验异常,自动返回reject;

通过下面的配置可以修改SpringAMQP的ACK处理方式:

1
2
3
4
5
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: none # 不做处理

案例

我们将acknowledge-mode设置为none,并模拟一个消息处理的异常,抛出MessageConversionException类型的异常。

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package com.kakawanyifan.rabbitmq;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class Listener {

@RabbitListener(queues = "boot_queue_exchange")
public void onMessage(Message message){
log.info("spring 消费者接收到消息:【" + message + "】");
if (true) {
throw new MessageConversionException("故意的");
}
log.info("消息处理完成");
}
}

测试可以发现:当消息处理发生异常时,消息依然被RabbitMQ删除了。

如果把acknowledge-mode设置为auto,由于抛出的是消息转换异常,因此Spring会自动返回reject,所以消息依然会被删除。

我们将异常改为RuntimeException类型:

1
2
3
4
5
6
7
8
9
@RabbitListener(queues = "boot_queue_exchange")
public void onMessage(Message message){
log.info("spring 消费者接收到消息:【" + message + "】");
if (true) {
// throw new MessageConversionException("故意的");
throw new RuntimeException("故意的");
}
log.info("消息处理完成");
}

由于抛出的是业务异常,所以Spring返回ack,最终消息恢复至Ready状态,并且没有被RabbitMQ删除。消息处理失败后,会回到RabbitMQ,并重新投递到消费者。

失败重试机制

什么是失败重试

在上文,当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者。如果消费者再次执行依然出错,消息会再次requeue到队列,再次投递,直到消息处理成功为止。
极端情况就是消费者一直无法执行成功,那么消息requeue就会无限循环,导致mq的消息处理飙升,带来不必要的压力。

为了应对上述情况Spring又提供了消费者失败重试机制:在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。

设置方法

修改application.yml文件,添加内容:

1
2
3
4
5
6
7
8
9
10
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true # 开启消费者失败重试
initial-interval: 1000ms # 初识的失败等待时长为1秒
multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
max-attempts: 3 # 最大重试次数
stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

我们可以看到,消费者在失败后消息没有重新回到MQ无限重新投递,而是在本地重试了3次;本地重试3次以后,抛出了AmqpRejectAndDontRequeueException异常。查看RabbitMQ控制台,发现消息被删除了,说明最后SpringAMQP返回的是reject

即:

  1. 开启本地重试时,消息处理过程中抛出异常,不会requeue到队列,而是在消费者本地重试。
  2. 重试达到最大次数后,Spring会返回reject,消息会被丢弃。

失败处理策略

在之前的测试中,本地测试达到最大重试次数后,消息会被丢弃。
这在某些对于消息可靠性要求较高的业务场景下,显然不太合适了。
因此Spring允许我们自定义重试次数耗尽后的消息处理策略,这个策略是由MessageRecovery接口来定义的,它有3个不同实现:

  1. RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式。
  2. ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队。
  3. RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机。

比较优雅的一种处理方案是RepublishMessageRecoverer,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。

定义处理失败消息的交换机和队列:

1
2
3
4
5
6
7
8
9
10
11
12
@Bean
public DirectExchange errorMessageExchange(){
return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
return new Queue("error.queue", true);
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}

定义一个RepublishMessageRecoverer,关联队列和交换机:

1
2
3
4
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}

完整代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package com.kakawanyifan.rabbitmq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true")
public class ErrorMessageConfig {
@Bean
public DirectExchange errorMessageExchange(){
return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
return new Queue("error.queue", true);
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}

@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
}

业务幂等性

什么是幂等性

何为幂等性?
在程序开发中,则是指同一个业务,执行一次或多次对业务状态的影响是一致的。

有些业务本来就是幂等的,例如:查询业务,例如根据id查询商品;删除业务,例如根据id删除商品。
有些业务不具有幂等性,例如:用户下单业务,需要扣减库存;用户退款业务,需要恢复余额。

然而在实际业务场景中,由于意外经常会出现业务被重复执行的情况,例如,页面卡顿时频繁刷新导致表单重复提交,那么怎么办?

  1. 进入表单的时候,生成唯一标识。
  2. 提交表单时,携带标识。
  3. 后端收到表单后,检查该标识是否已经提交过。

即令牌机制。

保证消息处理的幂等性的方法中,有一种方法就和这类似。

保证消息处理的幂等性,方法有:

  1. 唯一消息ID
  2. 业务状态判断

唯一消息ID

思路

思路:

  1. 每一条消息都生成一个唯一的id,与消息一起投递给消费者。
  2. 消费者接收到消息后处理自己的业务,业务处理成功后将消息ID保存到数据库。
  3. 如果下次又收到相同消息,去数据库查询判断是否存在,存在则为重复消息放弃处理。

我们该如何给消息添加唯一ID呢?

实现方法一

SpringAMQP的MessageConverter自带了MessageID的功能,我们只要开启这个功能即可。
以Jackson的消息转换器为例:

1
2
3
4
5
6
7
8
@Bean
public MessageConverter messageConverter(){
// 1.定义消息转换器
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
jackson2JsonMessageConverter.setCreateMessageIds(true);
return jackson2JsonMessageConverter;
}

实现方法二

上述方法使用的是UUID,如果我们想自定义ID,可以用.setMessageId("XXX")方法。示例代码:

1
2
3
4
5
Message message = MessageBuilder
.withBody("hello".getBytes(StandardCharsets.UTF_8))
.setMessageId("XXX")
.build();
rabbitTemplate.convertAndSend(Config.EXCHANGE_NAME, "simple.queue", message);

实现方法三

示例代码:

1
2
3
4
5
6
7
8
rabbitTemplate.convertAndSend("delay.direct1", "delay", message, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// 添加延迟消息属性
message.getMessageProperties().setMessageId("XXX");
return message;
}
});

业务判断

业务判断就是基于业务本身的逻辑或状态来判断是否是重复的请求或消息,不同的业务场景判断的思路也不一样。
例如我们当前案例中,处理消息的业务逻辑是把订单状态从未支付修改为已支付。因此我们就可以在执行业务时判断订单状态是否是未支付,如果不是则证明订单已经被处理过,无需重复处理。

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Override
public void markOrderPaySuccess(Long orderId) {
// 1.查询订单
Order old = getById(orderId);
// 2.判断订单状态
if (old == null || old.getStatus() != 1) {
// 订单不存在或者订单状态不是1,放弃处理
return;
}
// 3.尝试更新订单
Order order = new Order();
order.setId(orderId);
order.setStatus(2);
order.setPayTime(LocalDateTime.now());
updateById(order);
}

上述代码逻辑上符合了幂等判断的需求,但是由于判断和更新是两步动作,因此在极小概率下可能存在线程安全问题,可以参考这个逻辑

1
UPDATE `order` SET status = ? , pay_time = ? WHERE id = ? AND status = 1

解释说明:在where条件中除了判断id以外,还加上了status必须为1的条件。如果条件不符(说明订单已支付),则SQL匹配不到数据,不会执行。

兜底方案

如何保证支付服务与交易服务之间的订单状态一致性?

  • 首先,支付服务会正在用户支付成功以后利用MQ消息通知交易服务,完成订单状态同步。
  • 其次,为了保证MQ消息的可靠性,我们采用了生产者确认机制、消费者确认、消费者失败重试等策略,确保消息投递和处理的可靠性。同时也开启了MQ的持久化,避免因服务宕机导致消息丢失。
  • 最后,我们还在交易服务更新订单状态时做了业务幂等判断,避免因消息重复消费导致订单状态异常。

虽然我们利用各种机制尽可能增加了消息的可靠性,但也不好说能保证消息100%的可靠。万一真的MQ通知失败该怎么办呢?

既然MQ通知不一定发送到交易服务,那么交易服务就必须自己主动去查询支付状态。这样即便支付服务的MQ通知失败,我们依然能通过主动查询来保证订单状态的一致。

流程如下:

兜底方案

图中黄色线圈起来的部分就是MQ通知失败后的兜底处理方案,由交易服务自己主动去查询支付状态。

例如,我们利用定时任务定期查询,例如每隔20秒就查询一次,并判断支付状态。如果发现订单已经支付,则立刻更新订单状态为已支付即可。

关于定时任务,可以参考《基于Java的后端开发入门:19.Quartz和APScheduler》

延迟消息

应用场景

在电商的支付业务中,对于一些库存有限的商品,为了更好的用户体验,通常都会在用户下单时立刻扣减商品库存。例如电影院购票、高铁购票,下单后就会锁定座位资源,其他人无法重复购买。但是这样就存在一个问题,假如用户下单后一直不付款,就会一直占有库存资源,导致其他客户无法正常交易,最终导致商户利益受损。
因此,电商中通常的做法就是:对于超过一定时间未支付的订单,应该立刻取消订单并释放占用的库存。

像这种在一段时间以后才执行的任务,我们称之为延迟任务,而要实现延迟任务,最简单的方案就是利用MQ的延迟消息了。

在RabbitMQ中实现延迟消息有两种方案:

  1. 死信交换机+TTL
  2. 延迟消息插件

死信交换机和延迟消息

死信

当一个队列中的消息满足下列情况之一时,被称为死信(dead letter):

  1. 消费者使用basic.rejectbasic.nack声明消费失败,并且消息的requeue参数设置为false。
  2. 要投递的队列消息满了,无法投递。
  3. 消息是一个过期消息,超时无人消费。

死信交换机

如果一个队列中的消息已经成为死信,并且这个队列通过dead-letter-exchange属性指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机就称为死信交换机(Dead Letter Exchange),而此时加入有另一个队列与死信交换机绑定,则最终死信就会被投递到另一个队列中。

即,死信交换机的作用:

  1. 收集因处理失败而被拒绝的消息。
  2. 收集因队列满了而被拒绝的消息。
  3. 收集因TTL(有效期)到期的消息。

延迟消息

前面两种作用场景可以看做是把死信交换机当做一种消息处理的最终兜底方案,与消费者重试时讲的RepublishMessageRecoverer作用类似。

而最后一种场景,如图,有一组绑定的交换机(ttl.fanout)和队列(ttl.queue)。但是ttl.queue没有消费者监听,而是设定了死信交换机hmall.direct,而队列direct.queue1则与死信交换机绑定,RoutingKeyblue

image.png

假如我们现在发送一条消息到ttl.fanoutRoutingKeyblue,并设置消息的有效期为5000毫秒;消息被投递到ttl.queue之后,由于没有消费者,因此消息无人消费;5秒之后,消息的有效期到期,成为死信;死信被再次投递到死信交换机hmall.direct,并沿用之前的RoutingKey,也就是blue;由于direct.queue1hmall.direct绑定的keyblue,因此最终消息被成功路由到direct.queue1,如果此时有消费者与direct.queue1绑定,也就能成功消费消息了。但这时也就是5秒钟以后了。

也就是说,publisher发送了一条消息,但最终consumer在5秒后才收到消息。我们成功实现了延迟消息

注意事项

RabbitMQ的消息过期是基于追溯方式来实现的,也就是说当一个消息的TTL到期以后不一定会被移除或投递到死信交换机,而是在消息恰好处于队首时才会被处理。当队列中消息堆积很多的时候,过期消息可能不会被按时处理,因此我们设置的TTL时间不一定准确。

DelayExchange插件

基于死信队列虽然可以实现延迟消息,但是太麻烦了。因此RabbitMQ社区提供了一个延迟消息插件来实现相同的效果。
官方文档说明:https://www.rabbitmq.com/blog/2015/04/16/scheduling-messages-with-rabbitmq

安装

通过"rabbitmq_delayed_message_exchange"的Github页面,下载对应版本的插件。
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange

并将其上传至服务器,复制到rabbitmq的plugins目录下,在本文是/usr/lib/rabbitmq/lib/rabbitmq_server-3.9.16/plugins

安装插件:

1
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

示例代码:

1
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
Enabling plugins on node rabbit@centos-linux:
rabbitmq_delayed_message_exchange
The following plugins have been configured:
rabbitmq_delayed_message_exchange
rabbitmq_management
rabbitmq_management_agent
rabbitmq_web_dispatch
Applying plugin configuration to rabbit@centos-linux...
The following plugins have been enabled:
rabbitmq_delayed_message_exchange

started 1 plugins.

声明延迟交换机

基于@Bean的方式

基于@Bean的方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package com.kakawanyifan.rabbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;

public class DelayExchangeConfig {
@Bean
public DirectExchange delayExchange(){
return ExchangeBuilder
.directExchange("delay.direct") // 指定交换机类型和名称
.delayed() // 设置delay的属性为true
.durable(true) // 持久化
.build();
}

@Bean
public Queue delayedQueue(){
return new Queue("delay.queue");
}

@Bean
public Binding delayQueueBinding(){
return BindingBuilder.bind(delayedQueue()).to(delayExchange()).with("delay");
}
}

解释说明:delayedQueue()delayExchange()方法,以及被加载了@Bean,所以实际上不会重复创建对象。

基于注解方式

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package com.kakawanyifan.rabbitmq;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class Listener {

@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "delay.queue", durable = "true"),
exchange = @Exchange(name = "delay.direct", delayed = "true"),
key = "delay"
))
public void onMessage(Message message){
log.info("spring 消费者接收到消息:【" + message + "】");
}
}

发送延迟消息

发送消息时,必须通过x-delay属性设定延迟时间。示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package com.kakawanyifan;

import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@Slf4j
@SpringBootTest
@RunWith(SpringRunner.class)
public class Producers {

// 注入RabbitMQTemplate
@Autowired
private RabbitTemplate rabbitTemplate;

@Test
void testPublisherDelayMessage() {
// 1.创建消息
String message = "hello, delayed message";
// 2.发送消息,利用消息后置处理器添加消息头
rabbitTemplate.convertAndSend("delay.direct1", "delay", message, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// 添加延迟消息属性
message.getMessageProperties().setMessageId();
return message;
}
});
}
}

注意事项

延迟消息插件内部会维护一个本地数据库表,同时使用Erlang的Timers功能实现计时。
如果消息的延迟时间设置较长,可能会导致堆积的延迟消息非常多,会带来较大的CPU开销,同时延迟消息的时间会存在误差。
因此,不建议设置延迟时间过长的延迟消息。

集群

搭建集群

准备节点

准备三个节点。在每个节点上都安装Erlang和RabbitMQ,并修改/etc/hostname的内容为对应的节点名。

节点 IP
node01 10.211.55.18
node02 10.211.55.19
node03 10.211.55.20

集群节点彼此发现

node01设置

设置IP地址到主机名称的映射

修改文件/etc/hosts,添如下内容:

1
2
3
10.211.55.18 node01
10.211.55.19 node02
10.211.55.20 node03

开通网络端口

除了567215672,还需要开启436925672端口

示例代码:

1
2
3
firewall-cmd --zone=public --add-port=4369/tcp --permanent
firewall-cmd --zone=public --add-port=25672/tcp --permanent
firewall-cmd --reload

同步erlang.cookie

三台集群中的erlang.cookie,需要一致,通过命令scp进行同步。

示例代码:

1
2
scp /var/lib/rabbitmq/.erlang.cookie node02:/var/lib/rabbitmq/
scp /var/lib/rabbitmq/.erlang.cookie node03:/var/lib/rabbitmq/

有些资料建议我们复制文件内容,再通过vim修改其他节点的,这绝不是一个好办法,容易错误的复制到换行。

我们可以通过md5sum校验。示例代码:

1
md5sum /var/lib/rabbitmq/.erlang.cookie

重置节点应用

1
2
3
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl start_app

node02和node03设置

在node02和node03中,也需要设置IP地址到主机名称的映射,开启436925672端口,操作与node01一致,不赘述。

因为erlang.cookie文件的内容被修改了,我们需要先重启rabbitmq-server,示例代码:

1
systemctl restart rabbitmq-server

然后,重置节点应用并加入集群,该部分相比node01,多了一个步骤rabbitmqctl join_cluster rabbit@node01

1
2
3
4
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@node01
rabbitmqctl start_app

检查集群状态

我们可以通过命令rabbitmqctl cluster_status检查集群的状态。

负载均衡

address

直接在配置文件中配置多个地址,就可以实现负载均衡。

1
2
3
4
5
6
spring:
rabbitmq:
addresses: 10.211.55.18:5672, 10.211.55.19:5672, 10.211.55.20:5672
username: admin
password: 123456
virtual-host: /

HAProxy

但是,有些资料会提到HAProxy,我认为这绝不是一个好的建议,多一层组件,多一层风险。

Management UI

其实访问任何一个RabbitMQ实例的管理界面都是对集群操作,所以配置负载均衡通过统一入口访问,只是使用起来更方便一些。

安装HAProxy

1
2
3
4
yum install -y haproxy
haproxy -v
systemctl start haproxy
systemctl enable haproxy

配置文件HAProxy

修改文件/etc/haproxy/haproxy.cfg,在文件末尾增加如下内容:

1
2
3
4
5
6
7
8
9
10
11
12
frontend rabbitmq_ui_frontend
bind 10.211.55.18:20000
mode http
default_backend rabbitmq_ui_backend

backend rabbitmq_ui_backend
mode http
balance roundrobin
option httpchk GET /
server rabbitmq_ui1 10.211.55.18:15672 check
server rabbitmq_ui2 10.211.55.19:15672 check
server rabbitmq_ui3 10.211.55.20:15672 check

设置SELinux策略,允许HAProxy拥有权限连接任意端口:

1
setsebool -P haproxy_connect_any=1
解释说明:* SELinux是Linux系统中的安全模块,它可以限制进程的权限以提高系统的安全性。在某些情况下,SELinux可能会阻止HAProxy绑定指定的端口,这就需要通过设置域(domain)的安全策略来解决此问题。* 通过执行`setsebool -P haproxy_connect_any=1`命令,我们已经为HAProxy设置了一个布尔值,允许HAProxy连接到任意端口。这样,HAProxy就可以成功绑定指定的socket,并正常工作。

重启HAProxy:

1
systemctl restart haproxy

核心功能

修改文件/etc/haproxy/haproxy.cfg,在文件末尾增加如下内容:

1
2
3
4
5
6
7
8
9
10
11
frontend rabbitmq_frontend
bind 10.211.55.18:10000
mode tcp
default_backend rabbitmq_backend

backend rabbitmq_backend
mode tcp
balance roundrobin
server rabbitmq1 10.211.55.18:5672 check
server rabbitmq2 10.211.55.19:5672 check
server rabbitmq3 10.211.55.20:5672 check

重启HAProxy服务:

1
systemctl restart haproxy

之后,我们可以在代码中,通过10.211.55.18:10000,发送消息和监听消息。

CentOS7

因为CentOS7从2024年6月30日开始,不再维护,安装haproxy可能会报错,可以尝试先执行如下的命令解决。

1
2
sed -i 's/mirrorlist/#mirrorlist/g' /etc/yum.repos.d/CentOS-*
sed -i 's|#baseurl=http://mirror.centos.org|baseurl=http://vault.centos.org|g' /etc/yum.repos.d/CentOS-*

仲裁队列

使用仲裁队列

创建仲裁队列,Type队列类型选择Quorum,Node节点选择的是主节点。

创建仲裁队列

我们会看到这个队列有leader节点和member节点,这是和其他队列

leader节点和member节点

也可以通过Java代码创建仲裁队列,quorum()方法。

1
2
3
4
@Bean("bootQueue")
public Queue BootQueue(){
return QueueBuilder.durable(QUEUE_NAME).quorum().build();
}

其他的操作,包括"创建交换机"、“绑定交换机”、以及发送消息和监听消息,没有区别,不赘述。

注意事项

仲裁队列(Quorum Queues)提供队列复制的能力,保障数据的高可用和安全性。使用仲裁队列可以在RabbitMQ节点间进行队列数据的复制,在一个节点宕机时,队列依旧可以正常运行。

仲裁队列适用于队列长时间存在,对队列容错和数据安全要求高,对延迟和队列特性要求相对低的场景。在可能出现消息大量堆积的场景,不推荐使用仲裁队列,因为仲裁队列的写入放大会造成成倍的磁盘占用。

仲裁队列的消息会优先保存在内存中,使用仲裁队列时,建议定义队列最大长度和最大内存占用,在消息堆积超过阈值时从内存转移到磁盘,以免造成内存高水位。



关于RabbitMQ,有三点没讨论:

  1. 镜像队列,这个队列相比仲裁队列,存在性能问题和同步问题,已经被淘汰了。
  2. Stream流式队列,这个队列不够成熟。
  3. Federation,这个主要位于不同机房的RabbitMQ集群进行同步,很难看到实际的应用场景。
文章作者: Kaka Wan Yifan
文章链接: https://kakawanyifan.com/11907
版权声明: 本博客所有文章版权为文章作者所有,未经书面许可,任何机构和个人不得以任何形式转载、摘编或复制。

留言板