什么是消息队列

Message Queue(MQ),消息队列中间件。很多人都说:MQ 通过将消息的发送和接收分离来实现应用程序的异步和解偶,这个给人的直觉是——MQ 是异步的,用来解耦的,但是这个只是 MQ 的效果而不是目的。

MQ 真正的目的是为了通讯,屏蔽底层复杂的通讯协议,定义了一套应用层的、更加简单的通讯协议。一个分布式系统中两个模块之间通讯要么是 HTTP,要么是自己开发的 TCP,但是这两种协议其实都是原始的协议。

为什么消息中间件不直接使用 HTTP 协议?

HTTP 协议很难实现两端通讯——模块 A 可以调用 B,B 也可以主动调用 A,如果要做到这个两端都要背上 WebServer,而且还不支持长连接(HTTP 2.0 的库根本找不到)。TCP 就更加原始了,粘包、心跳、私有的协议,想一想头皮就发麻。

对于一个消息中间件来说,其主要责任就是负责数据传递,存储,分发,高性能和简洁才是我们所追求的,而 HTTP 请求报文头和响应报文头是比较复杂的,包含了 Cookie,数据的加密解密,窗台吗,响应码等附加的功能,我们并不需要这么复杂的功能。

同时大部分情况下 HTTP 大部分都是短链接,在实际的交互过程中,一个请求到响应都很有可能会中断,中断以后就不会执行持久化,就会造成请求的丢失。这样就不利于消息中间件的业务场景,因为消息中间件可能是一个长期的获取信息的过程,出现问题和故障要对数据或消息执行持久化等,目的是为了保证消息和数据的高可靠和稳健的运行。

MQ 所要做的就是在这些协议之上构建一个简单的“协议”——生产者/消费者模型。MQ 带给我的“协议”不是具体的通讯协议,而是更高层次通讯模型。它定义了两个对象——发送数据的叫生产者;接收数据的叫消费者, 提供一个 SDK 让我们可以定义自己的生产者和消费者实现消息通讯而无视底层通讯协议。

消息队列的使用场景

异步处理

场景说明:用户注册后,需要发注册邮件和注册短信。传统的做法有两种串行和并行的方式。

1、串行方式:将注册信息写入数据库成功后,发送注册邮件,再发送注册短信。以上三个任务全部完成后,返回给客户端。


2、并行方式:将注册信息写入数据库成功后,发送注册邮件的同时,发送注册短信。以上三个任务完成后,返回给客户端。

与串行的差别是,并行的方式可以提高处理的时间。

假设三个业务节点每个使用 50 毫秒钟,不考虑网络等其他开销,则串行方式的时间是 150 毫秒,并行的时间可能是 100 毫秒。


如以上案例描述,传统的方式系统的性能(并发量,吞吐量,响应时间)会有瓶颈。如何解决这个问题呢?

引入消息队列,将不是必须的业务逻辑,异步处理。


按照以上约定,用户的响应时间相当于是注册信息写入数据库的时间,也就是 50 毫秒。注册邮件,发送短信写入消息队列后,直接返回,因此写入消息队列的速度很快,基本可以忽略,因此用户的响应时间可能是 50 毫秒。因此架构改变后,系统的吞吐量提高到每秒 20 QPS。比串行提高了 3 倍,比并行提高了 2 倍。

应用解耦

场景说明:用户下单后,订单系统需要通知库存系统。传统的做法是,订单系统调用库存系统的接口。

传统模式的缺点:

  1. 假如库存系统无法访问,则订单减库存将失败,从而导致订单失败。

  2. 订单系统与库存系统耦合。

如何解决以上问题呢?引入应用消息队列后的方案。

订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功。

库存系统:订阅下单的消息,采用拉/推的方式,获取下单信息,库存系统根据下单信息,进行库存操作。


假如:在下单时库存系统不能正常使用。也不影响正常下单,因为下单后,订单系统写入消息队列就不再关心其他的后续操作了。实现订单系统与库存系统的应用解耦。

流量削峰

流量削峰也是消息队列中的常用场景,一般在秒杀或团抢活动中使用广泛。

应用场景:秒杀活动,一般会因为流量过大,导致流量暴增,应用挂掉。为解决这个问题,一般需要在应用前端加入消息队列,可以控制活动的人数,可以缓解短时间内高流量压垮应用

用户的请求,服务器接收后,首先写入消息队列。假如消息队列长度超过最大数量,则直接抛弃用户请求或跳转到错误页面,秒杀业务根据消息队列中的请求信息,再做后续处理。

AMQP 和 JMS

MQ 是消息通信的模型,并非具体实现。现在实现 MQ 的有两种主流方式:AMQP、JMS。

两者间的区别和联系:

  • JMS 是定义了统一的接口,来对消息操作进行统一;AMQP 是通过规定协议来统一数据交互的格式,如 RabbitMQ。

  • JMS 限定了必须使用 Java 语言;AMQP 只是协议,不规定实现方式,因此是跨语言的,如 RocketMQ。

RabbitMQ 简介

RabbitMQ 是由 erlang 语言开发,基于 AMQP(Advanced Message Queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛。

官方地址:http://www.rabbitmq.com

官方教程:http://www.rabbitmq.com/getstarted.html

RabbitMQ 最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。具体特点包括:

  1. 可靠性(Reliability), RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认。

  2. 灵活的路由(Flexible Routing), 在消息进入队列之前,通过 Exchange 来路由消息的。对于典型的路由功能,RabbitMQ 已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,可以将多个 Exchange 绑定在一起,也通过插件机制实现自己的 Exchange 。

  3. 消息集群(Clustering), 多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker 。

  4. 高可用(Highly Available Queues), 队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。

  5. 多种协议(Multi-protocol), RabbitMQ 支持多种消息队列协议,比如 STOMP、MQTT 等等。

  6. 多语言客户端(Many Clients) ,RabbitMQ 几乎支持所有常用语言,比如 Java、.NET、Ruby 等等。

  7. 管理界面(Management UI), RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker 的许多方面。

  8. 跟踪机制(Tracing) ,如果消息异常,RabbitMQ 提供了消息跟踪机制,使用者可以找出发生了什么。

  9. 插件机制(Plugin System), RabbitMQ 提供了许多插件,来从多方面进行扩展,也可以编写自己的插件。

RabbitMQ 的架构模型

RabbitMQ 整体上是一个生产者与消费者模型,主要负责接收、存储和转发消息。可以把消息传递的过程想象成:当你将一个包裹送到邮局,邮局会暂存并最终将邮件通过邮递员送到收件人的手上,RabbitMQ 就好比由邮局、邮箱和邮递员组成的一个系统。从计算机术语层面来说,RabbitMQ 模型更像是一种交换机模型。


Connection

连接,作为客户端(无论是生产者还是消费者),你如果要与 RabbitMQ 通讯的话,你们之间必须创建一条 TCP 连接,当然同时建立连接后,客户端还必须发送一条“问候语”让彼此知道我们都是符合 AMQP 的语言的,比如你跟别人打招呼一般会说“你好!”,你跟国外的美女一般会说“hello!”一样。

你们确认好“语言”之后,就相当于客户端和 RabbitMQ 通过“认证”了。你们之间可以创建一条 AMQP 的信道(Channel)。

Channel

信道,是生产者/消费者与 RabbitMQ 通信的渠道。信道是建立在 TCP 连接上的虚拟连接,什么意思呢?就是说 rabbitmq 在一条 TCP 上建立成百上千个信道来达到多个线程处理,这个 TCP 被多个线程共享,每个线程对应一个信道,信道在 RabbitMQ 都有唯一的 ID ,保证了信道私有性,对应上唯一的线程使用。

为什么不建立多个 TCP 连接呢?

因为对于操纵系统而言,建立和销毁 TCP 是非常昂贵的,系统为每个线程开辟一个 TCP 是非常消耗性能,每秒成百上千的建立销毁 TCP 会严重消耗系统。所以 rabbitmq 选择建立多个信道(建立在 tcp 的虚拟连接)连接到 rabbit 上。

从技术上讲,这被称之为多路复用,对于执行多个任务的多线程或者异步应用程序来说,它非常有用。

Message

消息,包含有效载荷和标签,有效载荷指要传输的数据,标签描述了有效载荷,并且 rabbitmq 用它来决定谁获得消息,消费者只能拿到有效载荷,并不知道生产者是谁。

Producer

生产者,消息的创建者,发送到 rabbitmq。

Consumer

消费者,消息的消费者,连接到 rabbitmq,订阅到队列上,消费消息,持续订阅和单条订阅。

Broker

代理服务,简单来说就是消息队列服务器实体,默认端口 5672。

Exchange

交换机,用来接收生产者发送的消息,然后将这些消息根据路由键发送到队列,主要有四种,后续会介绍。

Routing key

路由规则,虚拟机用它来确认如何路由一个特定消息,即 Exchange 根据这个关键字进行消息投递。

Binding

Exchange 和 Queue 之间的虚拟连接,它的作用就是把 exchange 和 queue 按照路由规则绑定起来,Binding 中可以包括多个 Routing key。

Queue

消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。

交换机、队列、绑定、路由键之间的关系

队列通过路由键绑定到交换机,生产者将消息发布到交换机,交换机根据绑定的路由键将消息路由到特定队列,然后由订阅这个队列的消费者进行接收。


Virtual Host

虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / ,通过缺省用户和口令 guest 进行访问。


交换机类型

共有四种 direct,fanout,topic,headers,其种 headers(几乎和 direct 一样)不实用,可以忽略。

direct

路由键完全匹配,消息被投递到对应的队列, direct 交换器是默认交换器。ExChange 会将消息发送完全匹配 ROUTING_KEY 的 Queue。

fanout

消息广播到绑定的队列,不管队列绑定了什么路由键,消息经过交换机,每个队列都有一份。

topic

通过使用 **通配符进行处理,使来自不同源头的消息到达同一个队列,通过 . 将路由键分为了几个标识符,*匹配其后面的 1 个标识符,#匹配一个或多个标识符。如:

user.#  # 可以匹配到 user.add  user.add.batch
user.*  # 只能匹配到 user.add ,不能匹配到 user.add.batch

本文文字及图片出自 InfoQ

余下全文(1/3)
分享这篇文章:

请关注我们:

发表评论

邮箱地址不会被公开。 必填项已用*标注