爱吃咖喱棒的打字员DA☆ZE~

0%

消息队列中间件

从命名上看,消息队列可以简单理解为存放消息的队列(Queue),而队列是一种常见的数据结构,在 Java 中的 java.util.Queue 接口下就有大量的队列实现。在一些业务场景中,我们可以使用 JDK 以及第三方包提供的队列实现,但是有些场景我们就不得不引入独立的队列组件,这有点类似 java.util.HashMap 与 Redis 的关系,它们都是以 key/value 的形式将数据存储到内存,但是在很多场景中我们需要使用 Redis 来替换 HashMap。

消息队列更为准确的描述是存放消息的容器,消息的生产者将消息放入容器,消费者可以从容器中取出消息进行消费。

为什么要使用消息队列

使用消息队列可以实现异步、解耦、削峰和流控等,同时分布式系统中也可以使用消息队列来实现最终一致性。

异步

我们可以根据传递方式的不同将消息划分为同步消息和异步消息。同步消息中比较有代表性的就是 RPC 调用,而异步消息中比较有代表性的就是使用消息队列。异步消息一般不怎么关心消息执行的结果,如果希望了解异步消息的执行情况,通常有轮询和通知两种方式。轮询具体是指在消息执行完毕后向其他设备(比如一个阻塞队列等)刷新执行结果,由主调方轮询查看结果;而通知则可以通过注册回调的方式实现。

假如系统中有一个耗时操作,我们正常调用该操作时,需要等待该操作结束后才能得到返回结果并继续向下执行,如果此时我们在系统中引入消息队列,将耗时操作需要的参数发送给消息队列,由队列代为执行,则可以不用等待耗时操作结束就可以直接返回。

削峰/流控

使用消息队列实现的异步处理可以延伸出一个比较常用的功能,那就是削峰和流控。比如一个系统,在不使用消息队列的时候,用户的请求数据经过处理后直接写入到数据库中,在请求量突然增大的情况下,数据库的压力也会骤然增大,使得整个系统的响应速度变慢。在使用了消息队列之后,用户的请求数据会直接发送给消息队列后立即返回,消息队列的消费者进程会从消息队列中获取数据并写入到数据库中。由于消息队列的消息处理能力一般高于数据库,同时消息队列的伸缩性也优于数据库,因此整个系统的响应速度会得到大幅提升。

削峰

解耦

有一个比较常见的业务场景,比如一个模块或系统,我们称为 A。A 需要将产生的数据传递给其他模块或系统,因此我们在 A 中直接调用其他模块或系统的方法来传递数据,而当其中的某个模块或系统不再需要该数据时,我们还需要再去修改 A 的逻辑,去掉向该模块或系统传递数据的方法。在这种场景下,如果业务频繁变更,我们需要不断修改 A 的逻辑,系统的可扩展性较差。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class SystemA {
private SystemB systemB = new SystemB();
private SystemC systemC = new SystemC();

private void doSomething() {
// execute A's logic
}

public void deliverData() {
var data = this.doSomething();
systemB.needToDo(data);
systemC.needToDo(data);
}
}

我们知道如果模块之间不存在直接调用,那么添加或修改模块就不会对其他模块产生较大的影响,这样系统的可扩展性无疑会更好。在这种思想的指导下一般我们会引入一个中间者,作为它们之间沟通和协调的枢纽。这个枢纽在这里就是消息队列,并且结合业务的特点,我们可以使用消息队列的发布/订阅模式,消息的发送者(生产者)发送消息,一个或多个消息接收者(消费者)订阅消息,这样消息发送者和消息接收者之间就避免了直接耦合,从而达到解耦的目的。

解耦

消息队列的本质

RPC 大多都是基于请求/响应模型的,这也包括响应式范式。一般需要点对点通信、强事务保证以及延迟敏感的服务大多使用 RPC 而不是消息队列。从某种意义上讲,消息队列可以看做是一种异步的 RPC,它把一次 RPC 拆分成了两次 RPC 加一次转储,如果需要消费者最终做消费确认则是三次 RPC。在消息队列中,消息发送者将消息投递到服务端(或者叫做 Broker),服务端需要考虑如何承载消息的堆积,然后在合适的时机再将消息转发给消息接收者。

常见的消息队列协议

在消息队列中,消息的发送者需要知道如何构造消息,消息的接收者需要知道如何解析消息,它们需要按照一种统一的格式描述消息,这种统一的格式被称为消息协议。常见的消息队列通信协议包括 STOMP、AMQP、MQTT、OpenWire(ActiveMQ)、HTTP、XMPP 等,也有很多消息中间件使用自定义的通信协议,比如 RocketMQ、Kafka 等。

STOMP

STOMP 的全称为 The Simple (or Streaming) Text Oriented Messaging Protocol,意为简单(或流式)面向文本的消息协议。它是一种基于本文的协议,同时它也是一种基于帧的协议,帧的结构效仿了 HTTP 报文的格式,简单明了,比如:

1
2
3
4
5
COMMAND
header1:value1
header2:value2

Body^@

STOMP 提供了能够协作的报文格式,因此 STOMP 客户端可以与任何 STOMP 消息代理(Broker)进行通信。该协议的优点就是简单且易于实现,几乎所有的编程语言都有 STOMP 的客户端实现,缺点也很明显,它在消息大小和处理速度方面并无优势。比较典型的实现了该协议的消息队列就是 Apache ActiveMQ 和 Apache ActiveMQ Artemis。

很多时候,我们在使用 WebSocket 时通常会配合 STOMP 协议来实现实时通信的功能,也就是使用 STOMP over Websocket。这是因为 WebSocket 只定义了两种类型的消息:文本和二进制,但是它们的内容和格式并没有定义。因此我们需要在客户端与服务端之间定义一种更高级别的消息协议,将它使用在 WebSocket 之上去定义每次发送消息的类别、格式和内容等信息。而 STOMP 可用于任何可靠的双向流网络协议之上,比如 TCP 和 WebSocket,因此在 WebSocket 协议之上使用 STOMP 协议也就顺理成章了。

AMQP

AMQP 协议的全称为 Advanced Message Queuing Protocol,意为高级消息队列协议。它是一种二进制的应用层协议,规范了消息传递方和消息接收方的行为,以使消息在不同的提供商之间实现互操作。与 JMS 在特定的 API 接口层面和实现行为上进行统一不同,AMQP 更关注于各种消息如何以字节流的形式传递,因此任何实现了该协议的应用程序之间都可以进行消息的接收和传递。

AMQP

AMQP 协议比较全面且功能强大,但是实现比较复杂。典型的实现了该协议的消息队列就是 RabbitMQ。

JMS

严格来说将 JMS 归为消息通信协议是不严谨的,但是我们又不得不提到 JMS。JMS 全称 Java Message Service,即 Java 消息服务。它是 Java 平台提供的面向消息中间件的 API,没有定义通信方式和报文格式等细节。这有点类似于 JDBC,由 JCP 组织发布 JDBC 规范,然后具体的数据库厂商来实现这套规范,比如 MySQL JDBC Driver。因此为了便于理解,我们可以将 JMS 看作 JDBC,将不同的消息队列看作是不同的数据库软件,如果某个消息中间件厂商实现了 JMS 规范,我们就可以直接使用 JMS API 来操作它。从这一点来讲,JMS 协议就不像 AMQP 和 MQTT 这种操作级的协议,直接定义了所有的交互细节,这样就可以只通过一个客户端来操作不同的实现了对应协议的消息中间件。

JMS API

JMS 中有两个重要的概念需要解释清楚,一个是 Provider,另一个是 Destination。其中 Provider 是消息的中转者,负责消息的存储和转发,在 ActiveMQ 中又叫做 Broker。比如一个 ActiveMQ 服务就是一个 Broker 或 Provider。而 Destination 则指明了消息发送的目的地以及客户端接收消息的来源,在 JMS 有两种目的地:队列(Queue)和主题(Topic)。

JMS

JMS 支持两种消息传送模型,一种是点对点(P2P)模型,一种是发布/订阅(Pub/Sub)模型。

点对点(PTP)消息模型也可称之为队列模式,特定的一条消息只能被一个消费者消费。生产者将消息发送到指定的 Queue 当中,Broker(中间件)针对消息是否需要持久化进行持久化存储后通知消费者进行处理,消费者处理完毕后发送一个回执(Acknowledge)给 Broker,Broker 认为该消息已被正常消费,于是从持久化存储中删除该条消息。回执的发送逻辑内嵌在 MQ 的 API 中,无需主动调用。消费者通常可以通过两种方式获取新消息:PUSH 和 PULL。PUSH 方式,由 MQ 收到消息后主动调用消费者的新消息通知接口,需要消耗 MQ 宝贵的线程资源,同时消费者只能被动等待消息通知;PULL 方式,由消费者轮询调用 MQ API 去获取新消息,对应于 ActiveMQ 中的 consumer.receive(),不消耗 MQ 线程,消费者更加主动,相对的消费者的处理逻辑会变得稍稍复杂。两种方式的根本区别在于线程消耗问题,由于 MQ 的线程资源相对客户端更加宝贵,PUSH 方式会占用 MQ 过多的线程从而难以适应高并发的消息场景。同时当某一消费者离线一段时间再次上线后,大量积压消息处理会消耗大量 MQ 线程从而拖累其它消费者的消息处理,所以 PULL 方式相对来说更好。Kafka 已经抛弃了 PUSH 模式,全面拥抱 PULL 模式。

PTP

发布/订阅模式(Pub/Sub)也可称之为主题模式,特定的一条消息可以被多个消费者所接收,只要消费者订阅了该主题。消息生产者(发布者)将消息发送到某个称为主题(Topic)的虚拟通道中,Topic 可以被多个消费者订阅,因此该模式类似于广播的方式。发布/订阅模式采用 PUSH 的方式传送消息,Subscriber 只需保持在线即可。Subscriber 分为临时性的和持久性的,当 Sub 离线时,MQ 会为持久性的 Sub 持久化消息,当 Sub 恢复时会重新收到消息。但是既然采用 Pub/Sub 模式就表明允许部分消费者接收不到消息,所以通常会采用临时性的 Subscriber 而不是持久性的。

Pub/Sub

使用消息队列带来的问题

引入消息队列后,系统的可用性在某种程度上会下降,同时系统的复杂度也会提升。因为在引入消息队列之前,我们无需考虑消息队列挂掉的情况,但是在引入以后,我们就需要考虑这种情况,为此可能需要引入主从甚至是分布式消息队列,系统的复杂度无疑增加了很多,同时在引入了消息队列以后,我们还需要保证消息没有被重复消费和消息传递的顺序性,同时还要处理消息可能面临的丢失等问题,如果消费者没有正确消费消息,还有可能造成数据不一致的情况。

如何确保消息不丢失

消息从生产到消费经过了生产、转储和消费三个阶段,每一个阶段都有可能出现消息丢失的情况。

生产阶段可以通过确认机制来保证消息的可靠传递。当生产者使用客户端发送消息后,消息会被转发到 Broker,Broker 在接收到消息后会给客户端返回一个确认消息,只要生产者接收到了这个确认消息,就可以保证消息在生产阶段不会丢失。有些消息队列在长时间没有收到发送消息的确认响应时,会自动进行重试,如果再次失败则会返回异常,因此在编写代码时需要考虑异常的情况。

转储阶段则可以通过开启消息队列持久化,防止当意外发生,消息队列宕机重启后消息丢失。

消费阶段出现消息丢失一般是消息队列开启了 auto ack 选项,消费者在收到消息后还未完成处理,此时消费者服务出现意外宕机,由于开启了自动响应,Broker 会认为消息已经被成功消费,从而将消息从队列中移除,当消费者服务恢复后也就收不到该消息了。处理方式就是将自动响应改为手动响应,在消费者将消息接收并处理完毕后再向 Broker 发送确认消息,Broker 在收到确认消息后才会将消息移除。

确认机制

如何确保顺序消费

顺序消费是指消息从生产到消费都是按照顺序来进行的,也就是消费者处理消息的顺序与生产者投放消息的顺序是一致的。比如一笔订单先后产生了三条消息,分别是订单创建、订单付款和订单完成,消费者只有严格按照顺序消费才不会出错。由于很多分布式消息队列都存在分区的情况,所以顺序消费大体上又可以分为全局顺序消费和分区顺序消费。

全局顺序消费是指不管分区是否存在,消息都会严格按照顺序生产、存储和消费。在有多个分区的情况下,要实现绝对的顺序消费几乎是不可能的,因此通常能够实现严格顺序消费的简单可行的方法就是只有一个分区(或者是不存在分区),然后还要保证生产者和消费者都是单线程的,也就是说生产者、Broker 和消费者这三者是一对一对一的关系。这样做虽然简单有效,但是却带来了很严重的问题,首先就是系统并发性能严重下降,其次我们需要花费更多的精力来处理消费端出现的异常,因为全局顺序消费下,消费端的任何异常都有可能导致整个处理流程阻塞。

实现全局顺序消费还可以使用锁来实现,但是相对较复杂。

分区顺序消费是指消息在同一个分区中是按照顺序生产、存储和消费的。要实现分区顺序消费,首先消息需要具有一个业务 ID,然后我们可以通过算法将相关联的业务消息路由到同一个分区中,比如订单消息就可以根据订单 ID 进行哈希,然后路由到同一个分区中。接下来消费端需要实现顺序消费,最简单的方式就是单线程消费,但是同样会面临全局顺序消费需要面临的问题。

需要说明的是,应用需要严格按照消息顺序进行消费的场景其实很少,大部分的应用都可以容忍短暂的乱序。通常来说消息队列只负责消息入队和出队的顺序性,而不会保证消息消费的顺序性。消息消费的顺序性一般需要消费端自行处理。

如何避免重复消费

一般来说,一个主流的消息队列的设计范式是在不丢消息的前提下,尽量减少重复消息,但是不保证消息的投递顺序。但是我们使用消息队列难免会遇到如网络延迟、服务意外宕机等情况,这都有可能导致消息的重复投递,因此避免重复消费这个工作应该交给消费端来处理,总结来说就是消费端应该保证消息处理的幂等性。

首先为了鉴别重复消息,每个消息都应该有一个全局唯一的 ID,然后我们可以借助一些组件来完成重复的鉴定,组件可以是 Key/Value 数据库比如 Redis,也可以是布隆过滤器,当然也可以通过数据库的唯一键来鉴别。完成了重复消息的鉴别,接下来的处理就比较简单了,可以根据业务具体来定。比如消费一条消息是向数据库中插入一条数据,那么当出现重复消息时,可以只更新数据而不进行插入。

消息异常处理

当生产者将消息正常地投递到消费者手中,如果消费者在消费的过程中发生了异常,一般都需要进行重试操作,即 Broker 会根据重试机制重新投递消息给消费者消费。有时候消费出现异常是因为消费者本身业务缺陷或故障造成的,这时就会导致 Broker 不断进行重试,这是不必要的,因此重试一般都会有一个阈值,当重试次数超过该阈值时,消息可能会被投递到死信队列中。

当消费者无法处理接收到的消息时,可以将这个消息重新投递到另外一个队列(也就是死信队列)中,等待后续的处理(比如人工干预)。在 RabbitMQ 中,当消息被拒绝(basic.rejectbasic.nack)并且 requeue=false,消息 TTL 过期,队列达到最大长度(队列满)时,消息都会被投递到死信队列中。

参考

消息队列设计精要

新手也能看懂,消息队列其实很简单

JMS 介绍:我对 JMS 的理解和认识