RabbitMQ消息的可靠性投递

前言

消息中间件MQ主要的工作流程如下

  1. 生产端Producer发送消息给消息中间件MQ
  2. 消息中间件MQ接收到消息, 告诉生产者Producer消息已经收到
  3. 消息中间件MQ将消息路由Routing后, 投递给消费者Consumer
  4. 消费者告诉MQ消息是否消费成功

但是, 现实世界是不可能如此理想的. 如果在某个阶段发生了异常.

比如消息中间件MQ接收到消息后, 所在的服务器重启了, 没能告诉生产者Producer消息接收成功, 也没能将消息投递给消费者Consumer, 这时候就应该让生产者Producer重新投递.
比如发生了重放攻击, 生产者Producer连续发送了两条相同的消息, 那么消费者Consumer应该判断此消息是否消费过, 不应该重复消费.

业界有两种方案保证消息的可靠性投递

  1. 消息落库, 对消息状态进行打标
  2. 消息延迟投递, 做二次确认, 回调检查

两种方案都需要做消息的落库处理, 这里提供一个简单的消息表

1
2
3
4
5
6
7
8
9
create table rabbitmq (
`id` bigint(20) unsigned not null auto_increment comment '消息ID',
`msg` varchar(4096) not null comment '消息',
`status` tinyint unsigned not null comment '状态, 0:未投递, 1:已投递, 2:已确认, 3:已消费, 4: 失败',
`retry_count` tinyint unsigned not null default 0 comment '重试次数',
`create_time` datetime not null comment '创建时间',
`update_time` datetime not null comment '更新时间',
primary key (`id`)
) engine = InnoDB default charset = utf8mb4 comment='消息表';

方案一: 消息落库, 对状态打标

以电商下单为场景

主要的步骤如图

  1. 下单请求Request发送到订单系统Order Center
  2. 将订单数据落库DB, 将消息落库DB, 状态为1 已投递
  3. 发送消息给消息中间件MQ
  4. 消息中间件MQ告诉订单系统Order Center消息接收到了
  5. 订单系统Order Center接收到消息中间件MQ的确认信息confirm, 更新DB中的消息状态为2 已确认
  6. 消息中间件MQ将消息投递给结算系统Settlement-Center
  7. 结算系统Settlement-Center收到消息, 更新DB中的消息状态为3 已消费
  8. 分布式定时任务扫描那些超过一天还没消费的消息, 重新投递

这算是一个比较完整的解决方案, 不管哪个步骤出问题, 只要状态没有流转到3 已消费, 那么分布式定时任务就会抓取消息, 重新投递, 超过特定次数就不再自动投递.
这里就会带来几个问题.

  1. 订单落库和消息落库, 是否要加事务?
  2. 定时任务必须要是分布式的.

保证生产者和消息中间件的之间的可靠性投递

订单落库和消息落库, 我们可以看成是两次insert写库操作, 如果订单库和消息库在同一个DB实例下, 直接开启本地事务保存即可.
但是如果订单库和消息库在不同的DB实例下, 我们要使用分布式事务吗? 业务是否能接受分布式事务带来的性能损耗吗?

如果体量小的项目, 用分布式事务是可以的, 但如果碰到了海量请求的情况下. 我们也可以通过快速失败的方式, 来保证两个消息落库成功.
但是如果遇到更大的流量请求, 就要通过方案二来解决了.

1
2
3
4
5
6
7
8
9
10
11
public String order() {
// 1. 生成订单号
try {
// 1. 订单落库
// 2. 消息落库
// 3. 发送MQ
} catch (Exception e) {
// 4. 根据订单号做回滚操作 或者 根据定时任务扫描失败的记录
return "下单失败";
}
}

如果订单落库失败, 或着消息落库失败, 那么就直接返回客户端下单失败的提醒. 后续定时任务轮询, 根据业务做回滚操作.
如果订单落库成功, 消息落库成功, 但是发送消息失败, 此时消息库应该有一条消息记录, 状态是1:已投递, 后续定时任务轮询重新投递.
如果订单落库成功, 消息落库成功, 消息发送成功, 但是**MQ想要告诉生产者发送成功**的这个动作失败了, 消息记录的状态是1:已投递, 后续定时任务轮询重新投递.
如果上述的操作都成功了, 那么消息的状态变更为2:已确认, 生产者和消费者之间的可靠性投递就算保证了.

那么怎么判断消息发送成功呢?
RabbitMQ有两套机制保证, Confirm机制和Return机制.

Confirm 确认消息

Confirm消息确认机制

  1. 生产者发送消息给消息中间件
  2. 消息中间件应答, 发送Confirm消息给生产者, 告诉生产者发送成功与否.

RabbitMQ通过调用channel.confirmSelect()开启Confirm消息确认机制.

普通confirm

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Test
public void simpleConfirm(Channel channel) throws Exception {
// 1. 开启确认模式
int size = 10;
channel.confirmSelect();
// 2. 发送消息
for (int i = 0; i < size; i++) {
String msg = "deliveryTag: " + deliveryTag + ", 现在时间" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd hh:mm:ss"));
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes(StandardCharsets.UTF_8));
// 3. 同步 confirm
if (!channel.waitForConfirms()) {
System.out.println("消息[" + msg + "]发送失败");
}
}
// 4. 批量同步 confirm
// if (!channel.waitForConfirms()) {
// System.out.println("消息[" + msg + "]发送失败");
// }
}

Channel提供了一个waitForConfirms方法. 会阻塞等待服务器端confirm.

Wait until all messages published since the last call have been either ack’d or nack’d by the broker

waitForConfirms还支持批量confirm, 放在for循环外面即可.
当超时或返回false时, 需要自己编写业务代码重发消息.

异步confirm

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
@Test
public void asyncConfirm(Channel channel) throws Exception {
// 1. 开启确认模式
int size = 10;
CountDownLatch latch = new CountDownLatch(size);
TreeSet<Long> confirmSet = new TreeSet<>(); // 记录未确认的 deliveryTag

channel.confirmSelect();
// 添加异步监听器
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
try {
if(multiple) {
System.out.println("消息唯一标识:" + deliveryTag + " 之前的消息都 ACK");
confirmSet.headSet(deliveryTag-1).clear();
} else {
System.out.println("消息唯一标识:" + deliveryTag + " 消息 ACK");
confirmSet.remove(deliveryTag);
}
} finally {
latch.countDown();
}
}

@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
try {
if(multiple) {
System.out.println("消息唯一标识:" + deliveryTag + " 之前的消息都 NACK");
confirmSet.headSet(deliveryTag-1).clear();
} else {
System.out.println("消息唯一标识:" + deliveryTag + " 消息 NACK");
confirmSet.remove(deliveryTag);
}

if(confirmSet.contains(deliveryTag)) {
// 从数据库捞记录重发消息
}
} finally {
latch.countDown();
}
}
});

// 2. 发送消息
for (int i = 0; i < size; i++) {
long deliveryTag = channel.getNextPublishSeqNo();
String msg = "deliveryTag: "+deliveryTag+", 现在时间" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd hh:mm:ss"));
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes(StandardCharsets.UTF_8));
confirmSet.add(deliveryTag);
Thread.sleep(1000); // 延迟, 保证 multiple 为 false
}

latch.await();
Assertions.assertTrue(confirmSet.isEmpty());
}

当生产者接受到confirm通知时, 会回调监听器的两个方法, handleAckhandleNack.

  1. handleAck表示成功处理的消息
  2. handleNack表示Broker丢失了消息. 但是, 即使返回Nack, 也有可能已经将消息投递给了消费者.

handleAckhandleNack有两个参数, 消息标识deliveryTag和批量标识multiple.

  1. deliveryTag用于表示一个Channel下的消息的唯一Id, 可以通过channel.getNextPublishSeqNo()来获取接下来要发送的消息的deliveryTag.
  2. multiple用于表示当前的ACKNACK是否批量操作, 如果是批量操作, 那么小于deliveryTag的消息都要处理.

Spring Boot 实现 confirm

首先要在配置文件里面开启confirm模式

1
2
3
spring:
rabbitmq:
publisher-confirm-type: correlated # confirm模式

然后设置ConfirmCallback, 多线程跑一下.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Test
public void confirm() throws Exception {
String msg = "sendString()";

int size = 10;
CountDownLatch latch = new CountDownLatch(size);
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
System.out.println("接收: " + correlationData + ", ack:[" + ack + "], cause:[" + cause + "]");
latch.countDown();
});
for (int i = 0; i < size; i++) {
new Thread(() -> {
// 不能在这里设置 ConfirmCallback
CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
System.out.println("发送:" + msg + ", correlationId: " + correlationId);
rabbitTemplate.convertAndSend(DirectConsumer.QUEUE_NAME, (Object) msg, correlationId);
}).start();
}

boolean success = latch.await(10, TimeUnit.SECONDS);
Assertions.assertTrue(success);
}

有一点需要注意下, 一个RabbitTemplate只能有一个ConfirmCallback. 这里看下源码.

1
2
3
4
5
// org.springframework.amqp.rabbit.core.RabbitTemplate#setConfirmCallback
public void setConfirmCallback(ConfirmCallback confirmCallback) {
Assert.state(this.confirmCallback == null || this.confirmCallback == confirmCallback, "Only one ConfirmCallback is supported by each RabbitTemplate");
this.confirmCallback = confirmCallback;
}

如果业务要求需要对不同的生产者执行不同的ConfirmCallback逻辑, 那么再应该new一个RabbitTemplate.

Return 不可达消息

Return不可达消息

https://www.rabbitmq.com/confirms.html
For unroutable messages, the broker will issue a confirm once the exchange verifies a message won’t route to any queue (returns an empty list of queues).
If the message is also published as mandatory, the basic.return is sent to the client before basic.ack.
The same is true for negative acknowledgements (basic.nack).

当生产者生产一条消息, 投递到MQ消息中间件中, 但是MQ消息中间件判定投递失败unroutable, 比如没有对应的路由Key, 就会触发return机制.
在生产消息时, 如果指定Mandatorytrue, 则监听器会接收到路由不可达的消息, 交由ReturnListener进行后续处理. 如果为false, 那么MQ消息中间件会自动删除这条消息.

原生 API 方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Test
public void _return() throws Exception {
// 1. 开启 return
CountDownLatch latch = new CountDownLatch(1);
boolean mandatory = true; // true监听不可达消息, false则自动删除消息

channel.addReturnListener((replyCode, replyText, exchange, routingKey, properties, body) -> {
System.out.println("replyCode:[" + replyCode + "], replyText:[" + replyText + "], exchange:[" + exchange + "], routingKey:[" + routingKey + "], BasicProperties:[" + properties + "], body:[" + new String(body, StandardCharsets.UTF_8) + "]");
latch.countDown();
});

// 2. 发送消息
long deliveryTag = channel.getNextPublishSeqNo();
String msg = "deliveryTag: " + deliveryTag + ", 现在时间" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd hh:mm:ss"));
channel.basicPublish(EXCHANGE_NAME, "NULL_KEY", mandatory, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes(StandardCharsets.UTF_8));
// channel.basicPublish("NULL_EXCHANGE", ROUTING_KEY, mandatory, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes(StandardCharsets.UTF_8));

boolean success = latch.await(10, TimeUnit.SECONDS);
Assertions.assertTrue(success);
}

实际上, 如果传递一个未声明的Exchange过去, 不会触发return. 而传递不可路由的RoutingKey, 则可以触发return.

Spring Boot 实现 return

首先要在配置文件里面开启return模式

1
2
3
4
5
spring:
rabbitmq:
publisher-returns: true # return机制
template:
mandatory: true # 与 return 机制结合配置次属性

然后设置ReturnCallback, 多线程跑一下.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Test
public void _return() throws Exception {
String msg = "sendString()";

int size = 10;
CountDownLatch latch = new CountDownLatch(size);
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
System.out.println("接收: " + message + ", replyCode:[" + replyCode + "], replyText:[" + replyText + "], exchange:[" + exchange + "], routingKey:[" + routingKey + "]");
latch.countDown();
});
for (int i = 0; i < size; i++) {
new Thread(() -> {
// 不能在这里设置 ReturnCallback
CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
System.out.println("发送:" + msg + ", correlationId: " + correlationId);
rabbitTemplate.convertAndSend("UNKNOWN", (Object) msg, correlationId);
}).start();
}

boolean success = latch.await(10, TimeUnit.SECONDS);
Assertions.assertTrue(success);
}

有一点需要注意下, 和Callback一样, 一个RabbitTemplate只能有一个ReturnCallback. 这里看下源码.

1
2
3
4
5
// org.springframework.amqp.rabbit.core.RabbitTemplate#setReturnCallback
public void setReturnCallback(ReturnCallback returnCallback) {
Assert.state(this.returnCallback == null || this.returnCallback == returnCallback, "Only one ReturnCallback is supported by each RabbitTemplate");
this.returnCallback = returnCallback;
}

如果业务要求需要对不同的生产者执行不同的ReturnCallback逻辑, 那么再应该new一个RabbitTemplate.

我们需要判断消息发送成功吗?

如果我们只管投递消息, 不管是否投递成功, 会有什么情况?
投递成功当然是最好的, 投递失败的话, 订单状态会一直处于1:已投递, 而不会变成2:已确认.
这样的话, 定时任务扫描到这条处于1:已投递的消息, 就会重新发送.

看起来似乎用不用confirmreturn都可以保证消息投递成功了.
我能想象到使用confirmreturn的业务场景有两种情况

  1. 投递失败时, 尽快重新投递, 不等定时任务扫描.
  2. 监控投递流程, 单位时间内投递失败消息超过一定数量就报警.

保证消息中间件的持久化存储, 避免消息丢失

经过上面的操作, 消息已经存储到消息中间件了. 为了保证消息不丢失, 消息中间件应该将消息做持久化处理.
否则当消息中间件宕机的时候, 在内存中的消息就会丢失.

RabbitMQ可持久化的东西有三样, ExchangeQueue、消息.
持久化也很简单, 只要调用API传参即可.

原生 API 方式

1
2
3
4
5
6
7
8
9
10
11
12
13
public void create(Channel channel) {
// Exchange.DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments);
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true, false, false, null);

// Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException;
channel.queueDeclare(QUEUE_NAME, true, false, false, null);

// public BasicProperties(String contentType, String contentEncoding, Map<String,Object> headers, Integer deliveryMode,
// Integer priority, String correlationId, String replyTo, String expiration, String messageId,
// Date timestamp, String type, String userId, String appId, String clusterId)
// new BasicProperties("text/plain", null, null, 2, 0, null, null, null, null, null, null, null, null, null)
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes(StandardCharsets.UTF_8));
}

ExchangeQueue的持久化只要在声明的时候, 指定durabletrue即可.
而消息的持久化, 需要在发送消息的时候, 指定BasicProperties属性, 将deliveryMode设置为2持久化.

Spring Boot 实现

Spring Boot可以通过使用Bean声明ExchangeQueue来实现持久化.

1
2
3
4
5
6
7
8
@Configuration
public class MyConfig {
boolean durable = true; // 持久化
@Bean
public Exchange exchange() { return new TopicExchange(EXCHANGE_NAME, durable, false, null); }
@Bean
public Queue queue() { return new Queue(QUEUE_NAME, durable, false, false, null); }
}

也可以使用注解的形式声明ExchangeQueue实现持久化.

1
2
3
4
5
6
7
8
9
public class Consumer {
@RabbitListener(bindings = @QueueBinding(key = "#",
exchange = @Exchange(type = ExchangeTypes.TOPIC, name = EXCHANGE_NAME, durable = "true"),
value = @Queue(name = QUEUE_NAME, durable = "true")))
public void consumer(@Payload String msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag,
@Headers Map<String, Object> header) {
// 消费消息
}
}

消息的持久化也是在发消息时通过指定MessageProperties实现

1
2
3
4
5
6
7
public void send(String exchange, String routingKey, String msg) {
MessageProperties properties = new MessageProperties();
properties.setDeliveryMode(MessageDeliveryMode.PERSISTENT); // 指定持久化消息

Message message = MessageBuilder.withBody(msg.getBytes(StandardCharsets.UTF_8)).andProperties(properties).build();
rabbitTemplate.send(exchange, routingKey, message);
}

保证消息中间件的消费可靠性

RabbitMQ要保证消息消费的可靠性, 就必须开启手动ACK.
默认情况下, 消费者是开启自动ACK的. 也就是说一条消息过来, 不管有没有执行成功, 都会告诉RabbitMQ消费成功. 如果出现异常, 也告诉RabbitMQ消费成功. 这明显是不合理的.

原生 API 手动 ACK

可以看到, 自动ACK的配置, 是在创建消费者的时候就声明的.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public void consumer(Channel channel) {
boolean autoAck = false; // 声明自动 ACK
channel.basicConsume(QUEUE_NAME, autoAck, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
System.out.println("接受到:" + new String(body));
// channel.basicAck(envelope.getDeliveryTag(), false);
} catch (Exception e) {
e.printStackTrace();
// channel.basicNack(envelope.getDeliveryTag(), false, false);
// channel.basicReject(envelope.getDeliveryTag(), false);
}
}
});
}

如果设置成手动ACK, 那么就要手动调用几个方法来通知MQ消息的消费情况.

  1. channel.basicAck(long deliveryTag, boolean multiple), 表示正确处理这条消息.
  2. channel.basicNack(long deliveryTag, boolean multiple, boolean requeue), 表示消息处理失败, 决定是否重回队列.
  3. channel.basicReject(long deliveryTag, boolean requeue), 表示消息处理失败, 决定是否重回队列.

basicNackbasicReject有点像, 看了下文档, 区别就是能否支持multiple.

Spring Boot 手动 ACK

首先在配置文件里设置

1
2
3
4
5
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual # 手动ack

然后在消费者那里手动调用原生API.

1
2
3
4
5
6
7
8
9
10
11
12
13
@RabbitListener(queuesToDeclare = @Queue(queueName))
public class Consumer {
@RabbitHandler
public void directQueue(@Payload String msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
try {
System.out.println("接收到的消息:" + msg);
channel.basicAck(tag, false);
} catch (Exception e) {
e.printStackTrace();
channel.basicNack(tag, false, false);
}
}
}

定时任务处理极端情况下的消息处理异常

保证了消息发送、中间件存储、消息消费的可靠性.
但还是防止不了极端情况, 这些无法预料的情况, 必须有个兜底策略来处理.

比如用分布式定时任务扫描那些超过一天还没有流转到3 已消费的消息, 重新投递.

至于为什么要用分布式的定时任务, 是避免同一条消息, 在同一时刻重复投递.
从源头避免并发消息的产生.
当然, 消息的幂等性处理要由消费者来保证.

方案二: 消息延迟投递, 做二次确认, 回调检查

方案一有一个问题, 消息发送之前, 需要落库两次, 业务库和消息库.
在海量高并发下, 每次磁盘IO的耗时, 都是不可忽视的. 在这种情况下, 要保证高性能, 将一致性降级为最终一致性.

和方案一相比, 方案二多了一个回调系统, 少了一步落库的操作.
要注意的是, 这里没有使用RabbitMQConfirmReturn机制.

  1. 下单请求Request发送到订单系统Order Center
  2. 将订单数据落库DB, 发送一条业务消息, 发送一条延时消息(用于检查第一条消息是否处理成功)
  3. 结算系统Settlement-Center收到消息, 处理成功后, 发送一条确认消息
  4. 回调系统消费这条确认消息, 这个时候再进行消息落库
  5. 过了几分钟, 回调系统消费了订单系统发送的延迟消息, 去消息库里检查是否存在这条信息.
  6. 如果没有, 就调用订单系统的接口, 重投消息.
  7. 分布式定时任务扫描那些超过一天还没消费的消息, 重新投递.

以上都是流程正常的情况.
那万一中间哪一步出现异常, 要怎么保证消息再次重发呢? 关键点就在于这个用于检查的延迟消息.

这个用于检查的延迟消息, 消息体带上订单号等业务信息, 会在一段时间, 比如十分钟后投递到MQ消息中间件, 然后回调系统消费这条消息, 去检查消息库.
如果看到这条订单消息没有被处理, 那么就调用订单系统的接口, 让订单系统重新投递这条消息.

这里有个大问题, 如果这条延迟消息丢失了怎么办?
只能通过定时任务来补偿了.

总结