RabbitMQ延迟队列的两种实现方式

前言

RabbitMQ是没有延迟队列, 但是我们可以通过TTL和死信队列间接来实现.

  1. Message指定TTL后放入队列中.
  2. 等超时后, Message放入死信队列.
  3. 死信队列将Message转发到目标队列.

很麻烦.
幸运的是, RabbitMQ官方提供了一个rabbitmq-delayed-message-exchange延迟消息插件.
本文基于Spring Boot AMQP来操作.

使用官方延迟插件 rabbitmq-delayed-message-exchange

要求版本 >= 3.5.8.
GitHub地址: https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
下载地址: https://www.rabbitmq.com/community-plugins.html

我这里用的是3.6.x的版本.

1
2
3
4
5
6
7
8
9
10
11
12
# 1. 下载 plugin
cd /opt/
wget https://dl.bintray.com/rabbitmq/community-plugins/3.6.x/rabbitmq_delayed_message_exchange/rabbitmq_delayed_message_exchange-20171215-3.6.x.zip
unzip rabbitmq_delayed_message_exchange-20171215-3.6.x.zip

# 2. 移动到 plugins 文件夹内, 不同操作系统 plugins 位置不同
cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.10/plugins/
cp /opt/rabbitmq_delayed_message_exchange-20171215-3.6.x.ez ./

# 3. 启动延时插件
rabbitmq-plugins list | grep delayed
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

然后再声明一个延迟交换机Exchange.

1
2
3
4
5
6
7
8
9
10
11
@Configuration
@EnableRabbit
public class RabbitMQConfig {
public static final String DELAY_EXCHANGE_NAME = "ahao_delayed_exchange";
@Bean(DELAY_EXCHANGE_NAME)
public Exchange delayExchange() {
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
return new CustomExchange(DELAY_EXCHANGE_NAME, "x-delayed-message", true, false, args);
}
}

然后我们需要将Queue队列绑定到交换机上

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Configuration
@EnableRabbit
public class RabbitMQConfig {
public static final String QUEUE_NAME = "ahao_queue";
@Bean
public Queue queue() {
return new Queue(QUEUE_NAME);
}

@Bean
public Binding binding(Queue queue, CustomExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(QUEUE_NAME).noargs();
}
}

绑定后, 就可以直接发送消息了.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Service
public class RabbitService {
public static final String DELAY_EXCHANGE_NAME = "ahao_delayed_exchange";

@Autowired
private RabbitTemplate rabbitTemplate;

public void doSendDelay(String queueName, Object data, long delayMilliSeconds) throws IllegalArgumentException {
if(delayMilliSeconds > 0xffffffffL) {
throw new IllegalArgumentException("超时过长, 只支持 < 4294967296 的延时值");
}

CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME, queueName, data, message -> {
MessageProperties messageProperties = message.getMessageProperties();
messageProperties.getHeaders().put("x-delay", delayMilliSeconds);
return message;
}, correlationId);
}
}

坑点1 延时最长为 2^32-1 毫秒

根据官方文档来看, 本插件的延时时长最长为2^32-1毫秒, 也就是0xffffffff毫秒.
换算一下, 大约是49天.
如果超过2^32-1毫秒, 那么延时值就会溢出, 也就是会立即消费.

Issue#122也有提到.
这应该是Erlang本身的限制.

In Erlang a timer can be set up to (2^32)-1 milliseconds in the future

坑点2 队列需要和延时 Exchange 绑定

之前以为指定了x-delayed-typedirect, 就可以不用绑定Queue到这个延时Exchange交换机上.
结果发的消息接收不到, 还是需要绑定一下.

使用原生死信队列实现延时队列

原生方法就是利用死信队列.

  1. Message指定TTL后放入队列中.
  2. 等超时后, Message放入死信队列.
  3. 死信队列将Message转发到目标队列.

我们先设计下消息流转流程图
消息流转流程图

  1. 用户发送带着RoutingKeybiz_queue1的一条消息到延时交换机delay_exchange上(注意, 这个延时交换机就是一个普通交换机).
  2. 延时交换机delay_exchange将消息fanout到队列delay_queue, 这个队列配置了一堆死信参数.
  3. 等待消息在delay_queue超时, 然后将消息转发到该队列的死信交换机biz_exchange上.
  4. 因为delay_queue没有指定x-dead-letter-routing-key, 所以使用的还是原来的biz_queue1. 路由到biz_queue1队列上.
  5. 延时消费成功.

设计完毕开始编码实战. 我们需要初始化交换机Exchange和队列Queue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
@Configuration
@EnableRabbit
public class RabbitMQConfig {
@Bean
public Exchange delayExchange() {
return new FanoutExchange("delay_exchange", true, false, null);
}
@Bean
public Queue delayQueue() {
Map<String, Object> args = new HashMap<>(2);
args.put("x-dead-letter-exchange", bizExchange().getName()); // 声明死信交换机
// args.put("x-dead-letter-routing-key", ""); // 声明死信路由键
args.put("x-message-ttl", 10000); // 所有消息的默认超时时间
return new Queue("delay_queue", true, false, false, args);
}
@Bean
public Binding delayBinding() {
return BindingBuilder.bind(delayQueue()).to(delayExchange()).with(delayQueue().getName()).noargs();
}

@Bean
public Exchange bizExchange() {
return new DirectExchange("biz_exchange", true, false, null);
}
@Bean
public Queue bizQueue1() {
return new Queue("biz_queue1", true, false, false, null);
}
@Bean
public Binding bizBinding1() {
return BindingBuilder.bind(bizQueue1()).to(bizExchange()).with(bizQueue1().getName()).noargs();
}
@Bean
public Queue bizQueue2() {
return new Queue("biz_queue2", true, false, false, null);
}
@Bean
public Binding bizBinding2() {
return BindingBuilder.bind(bizQueue2()).to(bizExchange()).with(bizQueue2().getName()).noargs();
}
}

然后写一个单元测试, 我用的Junit5

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
@Service
public class DirectConsumer {
public static final String QUEUE_NAME = "biz_queue1";
public static Object value;
@RabbitListener(queuesToDeclare = @Queue(QUEUE_NAME))
@RabbitHandler
public void directQueue(String msg) throws Exception {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println("消息接收时间:"+sdf.format(new Date()));
System.out.println("接收到的消息:"+msg);
Thread.sleep(1000);
value = msg;
}
}

@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE)
@ContextConfiguration(classes = {RabbitMQConfig.class, RabbitAutoConfiguration.class, SpringContextHolder.class, DirectConsumer.class})
public class DirectProducerTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private DirectConsumer consumer;
@Test
public void send() throws Exception {
Assert.assertNotNull(rabbitTemplate);
Assert.assertNotNull(consumer);

SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println("消息发送时间:" + sdf.format(new Date()));
String msg = "send()";

rabbitTemplate.convertAndSend("delay_exchange", DirectConsumer.QUEUE_NAME, msg, message -> {
MessageProperties messageProperties = message.getMessageProperties();
messageProperties.setExpiration("3000");
return message;
});

Assert.assertNull(DirectConsumer.value);
Thread.sleep(5000);
Assert.assertEquals(msg, DirectConsumer.value);
}
}

我们可以给队列设置x-message-ttl, 也可以给每条消息设置expiration, RabbitMQ会取两者最小值作为消息过期时间.

用死信队列来实现延迟队列, 只要套多几个死信队列, 就可以绕过官方延时插件的只能延时2^32-1毫秒的bug.
但是和官方延时插件一样, 还是得每个队列都绑定到延时交换机上.

并且! 推荐给队列设置x-message-ttl, 而不是给消息设置expiration.

坑点 同一队列的延时时长不一样导致消息阻塞

我们先看下面这个单元测试, 比起上面那个单元测试, 就是连续发送了两条消息.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE)
@ContextConfiguration(classes = {RabbitMQConfig.class, RabbitAutoConfiguration.class, SpringContextHolder.class, DirectConsumer.class})
public class DirectProducerTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private DirectConsumer consumer;
@Test
public void sendFailure() throws Exception {
Assert.assertNotNull(rabbitTemplate);
Assert.assertNotNull(consumer);

SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println("消息发送时间:" + sdf.format(new Date()));

String msg1 = "sendFailure(1)";
rabbitTemplate.convertAndSend("delay_exchange", DirectConsumer.QUEUE_NAME, msg1, message -> {
MessageProperties messageProperties = message.getMessageProperties();
messageProperties.setExpiration("1000000");
return message;
});
String msg2 = "sendFailure(2)";
rabbitTemplate.convertAndSend("delay_exchange", DirectConsumer.QUEUE_NAME, msg2, message -> {
MessageProperties messageProperties = message.getMessageProperties();
messageProperties.setExpiration("3000");
return message;
});

Assert.assertNull(DirectConsumer.value);
Thread.sleep(5000);
Assert.assertNull(msg, DirectConsumer.value);
}
}

执行后可以发现, 5000毫秒后, 消费者仍然不能接受到sendFailure(2)这条消息.
因为消息队列是先进先出的, 当第一条消息没有被消费, 后面的消息也会阻塞不能消费.

所以推荐还是使用给队列设置x-message-ttl的形式来设置延时时长. 当然, 官方延时插件就没这个问题了.

总结

使用官方插件

  • 优点:
    1. 使用简单
    2. 不会出现因为前一条消息没有消费, 导致后面的消息阻塞的情况
  • 缺点:
    1. 延时时长不能超过2^32-1毫秒, 大约49天.

使用原生死信队列

  • 优点:
    1. 使用死信队列套死信队列, 可以突破2^32-1毫秒的官方插件限制.
  • 缺点:
    1. 实现复杂.
    2. 如果给每条消息设置expiration, 则前一条消息会阻塞后一条消息.

然后我写了个工具类RabbitMQHelper可以拿来用下.

参考资料