0%

RabbitMQ概念入门

前言

RabbitMQ是一个Erlang实现的开源消息中间件, 实现了Advanced Message Queuing Protocol(AMQP)协议.
AMQP协议是一个抽象协议, 定义了一系列的接口, 而RabbitMQ等消息中间件实现了这些接口.

本文使用原生RabbitMQAPI来进行调用, 不使用Spring AMQP.

AMQP核心概念

Server: 又称为Broker, 其实就是RabbitMQ的服务端, 接收客户端的连接, 实现AMQP协议.
Connection: 连接, 应用程序与Broker的连接.
Channel: 网络信道, 生产消费消息都在Channel进行, 一个客户端可以建立多个Channel.

Message: 消息, 应用程序和Broker之间传送的数据, 由Properties属性和Body消息体组成.

Virtual Host: 虚拟地址, 用于逻辑隔离. 一般用于区分dev环境, test环境.
Exchange: 交换机, 接收消息, 根据消息的路由Key和绑定规则, 投递到对应的Queue队列中.
Binding: Exchange交换机和Queue队列之间的绑定规则.
Queue: 队列, 保存消息并转发给消费者.

Broker

Broker服务端直接用docker启动就好了, 使用默认参数.

1
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management

然后访问http://虚拟机IP:15672.
除了单节点的启动, RabbitMQ还支持集群部署.

Connection、Channel

接下来是ConnectionChannel, 和数据库连接类似, 我们可以对比下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class Main {
@Test
public void mq() throws Exception {
// 1. 建立连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");

// 2. 获取连接, 获取 Channel
try (Connection connection = factory.newConnection();
Channel channel1 = connection.createChannel();
Channel channel2 = connection.createChannel();) {
// 3. 使用 Channel 创建 Exchange、Queue, 生产消息
}
}

@Test
public void db() throws Exception {
// 1. 获取连接, 获取 Statement
try (Connection connection = DriverManager.getConnection(url, username, password);
PreparedStatement statement1 = connection.prepareStatement(SQL);
PreparedStatement statement2 = connection.prepareStatement(SQL);) {
// 2. 执行语句
}
}
}

一般一个应用程序共享一个Connection, 每个线程创建自己的Channel.
所有的操作, 包括ExchangeQueue的创建绑定, 消息的生产消费, 都是在Channel上执行的.

Virtual Host、Exchange、Binding、Queue

Virtual Host用来区分dev环境和test环境, 避免两个环境的数据混淆在一起.

从图中可以看到, 生产者只管投递到Exchange上, 消费者也只管从Queue消费消息, 其中它们是怎么路由的, 生产者和消费者不需要关心.
ExchangeQueue之间的路由规则, 就是根据Message消息提供的RoutingKey, 以及ExchangeQueue之间的BindingKey, 两者匹配上, 路由成功就投递到Queue队列上.

Demo 例子

概念讲完, 这里提供简单的一个单元测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
public class NativeTest {
public static final String EXCHANGE_NAME = "ahao-exchange";
public static final String ROUTING_KEY = "ahao-routing-key";
public static final String QUEUE_NAME = "ahao-queue";

@Test
public void producer() throws Exception {
// 1. 建立连接工厂
ConnectionFactory factory = this.initFactory();

// 2. 获取连接, 获取 Channel
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel();) {

// 3. 创建 Exchange 和 Queue 并绑定
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY, null);

// 4. 发送消息
String msg = "现在时间"+ LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd hh:mm:ss"));
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes(StandardCharsets.UTF_8));
}
}

@Test
public void consumer() throws Exception {
// 1. 建立连接工厂
ConnectionFactory factory = this.initFactory();

// 2. 获取连接, 获取 Channel
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel();) {

// 3. 创建 Exchange 和 Queue 并绑定
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY, null);

// 4. 消费消息
CountDownLatch latch = new CountDownLatch(1);
channel.basicQos(64);
channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接受到:" + new String(body));
latch.countDown();
}
});

// 5. 等待消息消费后, 再关闭资源
latch.await(10, TimeUnit.SECONDS);
}
}

public ConnectionFactory initFactory() {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("MQ地址");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
return factory;
}
}
很高兴得到您的赞赏