RabbitMQ(零):基础概念
前言
说 RabbitMQ 之前先说两个概念:Message Broker 和 AMQP
Message Broker 是一种消息验证、传输、路由的架构模式,其设计目标主要应用于下面这些场景:
- 消息路由到一个或多个目的地
- 消息转化为其他的表现方式
- 执行消息的聚集、消息的分解,并将结果发送到他们的目的地,然后重新组合相应返回给消息用户
- 调用 Web 服务来检索数据
- 响应事件或错误
- 使用发布 - 订阅模式来提供内容或基于主题的消息路由
AMQP 是 Advanced Message Queuing Protocol 的简称,它是一个面向消息中间件的开放式标准应用层协议。AMQP 定义了这些特性:
- 消息方向
- 消息队列
- 消息路由(包括:点到点和发布-订阅模式)
- 可靠性
- 安全性
本文要介绍的 RabbitMQ 就是以 AMQP 协议实现的一种中间件产品,服务器端用 Erlang 语言编写,它可以支持多种操作系统,多种编程语言,几乎可以覆盖所有主流的企业级技术平台。本文主要重点介绍 RabbitMQ 中的一些基础概念,了解了这些概念,可以为后边的使用 RabbitMQ 打下良好的基础。
ConnectionFactory、Connection、Channel
ConnectionFactory、Connection、Channel 都是 RabbitMQ 对外提供的 API 中最基本的对象。
- Connection 是 RabbitMQ 的 socket 链接,它封装了 socket 协议相关部分逻辑。
- ConnectionFactory 为 Connection 的制造工厂。
- Channel 是我们与 RabbitMQ 打交道的最重要的一个接口,我们大部分的业务操作是在 Channel 这个接口中完成的,包括定义 Queue、定义 Exchange、绑定 Queue 与 Exchange、发布消息等。
Queue
Queue(队列)是 RabbitMQ 的内部对象,用于存储消息,用下图表示。
RabbitMQ 中的消息都只能存储在 Queue 中,生产者(下图中的 P)生产消息并最终投递到 Queue 中,消费者(下图中的 C)可以从 Queue 中获取消息并消费。
多个消费者可以订阅同一个 Queue,这时 Queue 中的消息会被平均分摊给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理。
Message acknowledgment
在实际应用中,可能会发生消费者收到 Queue 中的消息,但没有处理完成就宕机(或出现其他意外)的情况,这种情况下就可能会导致消息丢失。
为了避免这种情况发生,我们可以要求消费者在消费完消息后发送一个回执给 RabbitMQ,RabbitMQ 收到消息回执(Message acknowledgment)后才将该消息从 Queue 中移除;如果 RabbitMQ 没有收到回执并检测到消费者的 RabbitMQ 连接断开,则 RabbitMQ 会将该消息发送给其他消费者(如果存在多个消费者)进行处理。
这里不存在 timeout 概念,一个消费者处理消息时间再长也不会导致该消息被发送给其他消费者,除非它的 RabbitMQ 连接断开。
这里会产生另外一个问题,如果我们在处理完业务逻辑后,忘记发送回执给 RabbitMQ,这是个常见的低级错误,但是后果却是很严重的——Queue 中堆积的消息会越来越多,消费者重启后会重复消费这些消息并重复执行业务逻辑,周而复始。
Spring AMQP 通过默认配置避免了忘记 basicACK 这种情况,所以我们并不需要特别关注。
另外,pub message 是没有 ack 的。
Message durability
如果我们希望即使在 RabbitMQ 服务重启的情况下,也不会丢失消息,我们可以将 Queue 与 Message 都设置为可持久化的(durable),这样可以保证绝大部分情况下我们的 RabbitMQ 消息不会丢失。
但是,请注意,这并不能完全保证就一定丢不了消息,因为 RabbitMQ 不会为每条消息执行fsync(2)
,它可能只是保存到缓存中,并没有真正写入磁盘。如果我们需要对这种小概率事件(比如 RabbitMQ 服务器已经接收到生产者的消息,但还没来得及持久化该消息时 RabbitMQ 服务器就断电了)也要管理起来,那么我们要用到事务。由于这里仅为 RabbitMQ 的简单介绍,所以这里将不讲解 RabbitMQ 相关的事务。
Spring AMQP 通过在 MessageProperties 中设置了合理默认值来定义消息的持久性。参考 common properties
属性 | 默认值 | 描述 |
---|---|---|
durable | true | 当 declareExchange 为 true 时,持久化标志被设置为该值 |
deliveryMode | PERSISTENT | PERSISTENT 或 NON_PERSISTENT 来确定 RabbitMQ 是否持久化消息 |
Prefetch count
默认情况下,如果有多个消费者同时订阅同一个 Queue 中的消息,Queue 中的消息会被平摊给多个消费者。平均而言,每个消费者将获得相同数量的消息。这种分配消息的方式称为循环调度(Round-robin)。这时如果每个消息的处理时间不同,就有可能会导致某些消费者一直在忙,而另外一些消费者很快就处理完手头工作并一直空闲的情况。我们可以通过设置 prefetchCount 来限制 Queue 每次发送给每个消费者的消息数,比如我们设置 prefetchCount=1,则 Queue 每次给每个消费者发送一条消息;消费者处理完这条消息后 Queue 会再给该消费者发送一条消息。
至于如果使用 Spring AMQP 的话,RabbitMQ 的文档说公平调度(Fair dispatch)是 Spring AMQP 的默认配置,但是我实验发现默认的好像是循环调度。另外,官方文档里说
The SimpleMessageListenerContainer defines the value for DEFAULT_PREFETCH_COUNT to be 1. If the DEFAULT_PREFECTH_COUNT were set to 0 the behavior would be round robin messaging as described above.
但是我只在AbstractMessageListenerContainer
中找到了DEFAULT_PREFETCH_COUNT = 250
。不知道是不是因为版本的问题,待确认。
2018-04-13 Update:
经过阅读源码与测试,在我所使用的版本(Spring Boot: 2.0.1.RELEASE)中,DEFAULT_PREFETCH_COUNT
定义在AbstractMessageListenerContainer
类中,且默认值为250
,默认配置为循环调度。具体详情请看工作队列 中的 prefetch 一节
Exchange
之前我们看到生产者将消息投递到 Queue 中,实际上这在 RabbitMQ 中这种事情永远都不会发生。实际的情况是,生产者将消息发送到 Exchange(交换器,下图中的 X),由 Exchange 将消息路由到一个或多个 Queue 中(或者丢弃)。
Exchange 是按照什么逻辑将消息路由到 Queue 的?这个将在 Binding 一节介绍。RabbitMQ 中的 Exchange 有四种类型,不同的类型有着不同的路由策略,这将在 Exchange Types 一节介绍。
Routing key
生产者在将消息发送给 Exchange 的时候,一般会指定一个 routing key,来指定这个消息的路由规则,而这个 routing key 需要与 Exchange Type 及 binding key 联合使用才能最终生效。在 Exchange Type 与 binding key 固定的情况下(在正常使用时一般这些内容都是固定配置好的),我们的生产者就可以在发送消息给 Exchange 时,通过指定 routing key 来决定消息流向哪里。RabbitMQ 为 routing key 设定的长度限制为 255 bytes。
Binding
RabbitMQ 中通过 Binding 将 Exchange 与 Queue 关联起来,这样 RabbitMQ 就知道如何正确地将消息路由到指定的 Queue 了。
Binding key
在绑定(Binding)Exchange 与 Queue 的同时,一般会指定一个 binding key;消费者将消息发送给 Exchange 时,一般会指定一个 routing key;当 binding key 与 routing key 相匹配时,消息将会被路由到对应的 Queue 中。这个将在下边 Exchange Types 一节中列举实际的例子加以说明。在绑定多个 Queue 到同一个 Exchange 的时候,这些 Binding 允许使用相同的 binding key。binding key 并不是在所有情况下都生效,它依赖于 Exchange Type,比如 fanout 类型的 Exchange 就会无视 binding key,而是将消息路由到所有绑定到该 Exchange 的 Queue。
Exchange Types
RabbitMQ 常用的 Exchange Type 有 fanout、direct、topic、headers 这四种(AMQP 规范里还提到两种 Exchange Type,分别为 system 与自定义,这里不予以描述),下面分别进行介绍。
fanout
fanout 类型的 Exchange 路由规则非常简单,它会把所有发送到该 Exchange 的消息路由到所有与它绑定的 Queue 中。
上图中,生产者(P)发送到 Exchange(X)的所有消息都会路由到图中的两个 Queue,并最终被两个消费者(C1 与 C2)消费。
direct
direct 类型的 Exchange 路由规则也很简单,它会把消息路由到那些 binding key 与 routing key 完全匹配的 Queue 中。
以上图的配置为例,我们以 routingKey=”error” 发送消息到 Exchange,则消息会路由到 Queue1(amqp.gen-S9b…,这是由 RabbitMQ 自动生成的 Queue 名称)和 Queue2(amqp.gen-Agl…);如果我们以 routingKey=”info” 或 routingKey=”warning” 来发送消息,则消息只会路由到 Queue2。如果我们以其他 routingKey 发送消息,则消息不会路由到这两个 Queue 中。
topic
前面讲到 direct 类型的 Exchange 路由规则是完全匹配 binding key 与 routing key,但这种严格的匹配方式在很多情况下不能满足实际业务需求。topic 类型的 Exchange 在匹配规则上进行了扩展,它与 direct 类型的 Exchange 相似,也是将消息路由到 binding key 与 routing key 相匹配的 Queue 中,但这里的匹配规则有些不同,它约定:
- routing key 为一个句点号
.
分隔的字符串(我们将被句点号.
分隔开的每一段独立的字符串称为一个单词),如 “stock.usd.nyse”、”nyse.vmw”、”quick.orange.rabbit” - binding key 与 routing key 一样也是句点号
.
分隔的字符串 - binding key 中可以存在两种特殊字符
*
与#
,用于做模糊匹配,其中*
用于匹配一个单词,#
用于匹配多个单词(可以是零个)
以上图中的配置为例:
routingKey=”quick.orange.rabbit” 的消息会同时路由到 Q1 与 Q2
routingKey=”lazy.orange.fox” 的消息会路由到 Q1 与 Q2
routingKey=”lazy.brown.fox” 的消息会路由到 Q2
routingKey=”lazy.pink.rabbit” 的消息会路由到 Q2(只会投递给 Q2 一次,虽然这个 routingKey 与 Q2 的两个 bindingKey 都匹配)
routingKey=”quick.brown.fox”、routingKey=”orange”、routingKey=”quick.orange.male.rabbit” 的消息将会被丢弃,因为它们没有匹配任何 bindingKey
headers
headers 类型的 Exchange 不依赖于 routing key 与 binding key 的匹配规则来路由消息,而是根据发送的消息内容中的 headers 属性进行匹配。在绑定 Queue 与 Exchange 时指定一组键值对;当消息发送到 Exchange 时,RabbitMQ 会取到该消息的 headers(也是一个键值对的形式),对比其中的键值对是否完全匹配 Queue 与 Exchange 绑定时指定的键值对;如果完全匹配则消息会路由到该 Queue,否则不会路由到该 Queue。该类型的 Exchange 没有用到过(不过也应该很有用武之地),所以不做介绍。
RPC
MQ 本身是基于异步的消息处理,前面的示例中所有的生产者(P)将消息发送到 RabbitMQ 后不会知道消费者(C)处理成功或者失败(甚至连有没有消费者来处理这条消息都不知道)。但实际的应用场景中,我们很可能需要一些同步处理,需要同步等待服务端将我的消息处理完成后再进行下一步处理。这相当于 RPC(Remote Procedure Call,远程过程调用)。在 RabbitMQ 中也支持 RPC。
RabbitMQ 中实现 RPC 的机制是:
- 客户端发送请求(消息)时,在消息的属性(
MessageProperties
,在AMQP
协议中定义了 14 中properties
,这些属性会随着消息一起发送)中设置两个值replyTo
(一个Queue
名称,用于告诉服务器处理完成后将通知我的消息发送到这个Queue
中)和correlationId
(此次请求的标识号,服务器处理完成后需要将此属性返还,客户端将根据这个 id 了解哪条请求被成功执行了或执行失败) - 服务器端收到消息并处理
- 服务器端处理完消息后,将生成一条应答消息到
replyTo
指定的Queue
,同时带上correlationId
属性 - 客户端之前已订阅
replyTo
指定的Queue
,从中收到服务器的应答消息后,根据其中的correlationId
属性分析哪条请求被执行了,根据执行结果进行后续业务处理
总结
本文介绍了 RabbitMQ 中个人认为最重要的概念,之后的文章中也会结合具体代码来说明怎么使用。充分利用 RabbitMQ 提供的这些功能就可以处理我们绝大部分的异步业务了。
相关文章: