前言
RabbitMQ
是一个Erlang
实现的开源消息中间件, 实现了Advanced Message Queuing Protocol(AMQP)
协议.
AMQP
协议是一个抽象协议, 定义了一系列的接口, 而RabbitMQ
等消息中间件实现了这些接口.
本文使用原生RabbitMQ
的API
来进行调用, 不使用Spring AMQP
.
AMQP核心概念
Server
: 又称为Broker
, 其实就是RabbitMQ
的服务端, 接收客户端的连接, 实现AMQP
协议.
Connection
: 连接, 应用程序与Broker
的连接.
Channel
: 网络信道, 生产消费消息都在Channel
进行, 一个客户端可以建立多个Channel
.
Message
: 消息, 应用程序和Broker
之间传送的数据, 由Properties
属性和Body
消息体组成.
Virtual Host
: 虚拟地址, 用于逻辑隔离. 一般用于区分dev
环境, test
环境.
Exchange
: 交换机, 接收消息, 根据消息的路由Key
和绑定规则, 投递到对应的Queue
队列中.
Binding
: Exchange
交换机和Queue
队列之间的绑定规则.
Queue
: 队列, 保存消息并转发给消费者.
Broker
Broker
服务端直接用docker
启动就好了, 使用默认参数.
1
| docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management
|
然后访问http://虚拟机IP:15672
.
除了单节点的启动, RabbitMQ
还支持集群部署.
Connection、Channel
接下来是Connection
和Channel
, 和数据库连接类似, 我们可以对比下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| public class Main { @Test public void mq() throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setVirtualHost("/"); factory.setUsername("guest"); factory.setPassword("guest");
try (Connection connection = factory.newConnection(); Channel channel1 = connection.createChannel(); Channel channel2 = connection.createChannel();) { } }
@Test public void db() throws Exception { try (Connection connection = DriverManager.getConnection(url, username, password); PreparedStatement statement1 = connection.prepareStatement(SQL); PreparedStatement statement2 = connection.prepareStatement(SQL);) { } } }
|
一般一个应用程序共享一个Connection
, 每个线程创建自己的Channel
.
所有的操作, 包括Exchange
、Queue
的创建绑定, 消息的生产消费, 都是在Channel
上执行的.
Virtual Host、Exchange、Binding、Queue
Virtual Host
用来区分dev
环境和test
环境, 避免两个环境的数据混淆在一起.
从图中可以看到, 生产者只管投递到Exchange
上, 消费者也只管从Queue
消费消息, 其中它们是怎么路由的, 生产者和消费者不需要关心.
而Exchange
和Queue
之间的路由规则, 就是根据Message
消息提供的RoutingKey
, 以及Exchange
和Queue
之间的BindingKey
, 两者匹配上, 路由成功就投递到Queue
队列上.
Demo 例子
概念讲完, 这里提供简单的一个单元测试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
| public class NativeTest { public static final String EXCHANGE_NAME = "ahao-exchange"; public static final String ROUTING_KEY = "ahao-routing-key"; public static final String QUEUE_NAME = "ahao-queue";
@Test public void producer() throws Exception { ConnectionFactory factory = this.initFactory();
try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel();) {
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); channel.queueDeclare(QUEUE_NAME, true, false, false, null); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY, null);
String msg = "现在时间"+ LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd hh:mm:ss")); channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes(StandardCharsets.UTF_8)); } }
@Test public void consumer() throws Exception { ConnectionFactory factory = this.initFactory();
try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel();) {
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); channel.queueDeclare(QUEUE_NAME, true, false, false, null); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY, null);
CountDownLatch latch = new CountDownLatch(1); channel.basicQos(64); channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("接受到:" + new String(body)); latch.countDown(); } });
latch.await(10, TimeUnit.SECONDS); } }
public ConnectionFactory initFactory() { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("MQ地址"); factory.setPort(5672); factory.setVirtualHost("/"); factory.setUsername("guest"); factory.setPassword("guest"); return factory; } }
|