MQ 概述

MQ 简介

MQ (Message Queue)是消息队列的意思,通常指提供消息队列服务的中间件。简单说就是接收消息、储存消息、转发消息的东西。

MQ 用途

MQ 的应用场景有许多,大抵上分为 3 类

流量削峰填谷

将系统超载的请求暂存到 MQ 中,然后在系统负荷内慢慢消费。一方面防止系统被超量请求压垮,以方便提高资源利用率。

业务异步解耦

一般是 A 服务做完某些事情然后发出通知,其他服务可以订阅这些消息处理自己的逻辑。一方面A 不关心订阅者的处理逻辑,也不用同步等待其他服务处理完成,另一方面新增减少订阅者都不需要改动 A 服务的代码。

海量数据收集

一般用在大数据行业,例如物联网行业内,传感器每时每刻都在产生数据,一个是数据量大,一个是写入频率高。传统的数据库不适合直接处理这样的数据。而MQ则非常适合做这种数据储存的临时介质,这方面主要 Kafka 的天下。

RocketMQ

为什么选择 RocketMQ?

MQ 功能基本都大同小异,市面上可选择的也不是很多,我选择 RocketMQ主要是考虑以下几点:

  • 吞吐量高,天然支持集群部署
  • 开发语言是Java,中文文档,便于学习
  • 阿里巴巴出品,通过双11级的高并发考验,消息支持 0 丢失配置。

主要概念

RocketMQ主要分 4 部分,Producer,Consumer,Name Server,Broker Server

Producer

也就是生产者,负责发送消息。主要是从 NameServer 里拉取目标 Broker 信息,然后将消息发送给 Broker,消息支持同步发送、异步发送、顺序发送、单向发送

Consumer

也就是消费者,负责拉取并消费消息,从消费者的角度提供了 2 种获取消息的方式:拉取式消费(Pull) 和推动式消费(Push)。同时提供2 种消费模式:集群消费和广播消费。

Name Server

名服务器,类似微服务中的注册中心,持有 Broker 的一些信息供生产者和消费者路由选择Broker。

Broker Server

代理服务器,负责储存消息和转发消息。内部也储存消息的元数据,包括消费者组、消费进度便宜、主题、和队列消息等

消息(Message)

实际消息的载体,包含实际消息以及一些元数据 例如,Topic、Tag等

主题(Topic)

一类消息的集合,是 RocketMQ 订阅的基本单位

队列(Queue)

Topic内部进一步的划分,实际上也就是 consumerqueue,主题一般包含一个或多个队列,并且还区分读写逻辑上的区分,消息索引就储存在队列里,实际消费的消息就是从这里取出的,集群模式同一时刻下,一个队列只能被同一个消费者组中的一个消费者消费。

工作原理

  1. 启动 Name Server,Name Server 监听指定端口等待 Broker Server、Producer、Consumer 连接
  2. 启动 Broker Server,Broker Server连接 Name Server,在Name Server那维护自己的 Topic、Queue 以及负载等信息
  3. 启动 Producer,连接 Name Server,创建或者获取 Topic,并从 Name Server 中选出 Topic 对应的 Broker Server地址以及 Queue 信息,然后像 Boker Server 发送消息。
  4. Broker Server 收到消息后会计算消息在 consumerqueue 中的偏移、tag hash等其他数据组装成一个消息,然后写入到 commitlog 文件中,同时将索引写入相应的 consumerqueue 的文件中。
  5. 启动 Consumer,连接 Name Server,通过 topic以及一些消息过滤方式,从Name Server 拿到真实的 Broker Server 地址以及 Queue 信息。Queue 也就是 consumerqueue,内部包含 Topic 消息在 commitlog 文件里的偏移,根据消费进度(集群消费模式下消费进度由 broker 提供,广播模式下由 consumer 自己提供)拉取下一条消息进行消费(实际上是批量拉取在缓存后再单个或批量消费),消费完成后返回 ACK,成功或其他状态。

特性

  • 订阅发布
    • 本质上是拉取
  • 顺序消费
    • 同一个队列保证有序
  • 消息过滤
    • tag 过滤
    • 消息属性过滤,类似SQL语言
  • 消息可靠性
    • 单点故障下异步刷盘会丢失少量消息,同步刷盘不丢失消息
    • 磁盘损坏下,通过磁盘Raid10 或主从来保证消息可靠性(异步刷盘仍然会丢失少量消息)
    • 同步复制或刷盘下可靠性极高,不会丢失消息,但性能低,实际使用需要权衡
  • 至少一次
    • RocketMQ 保证消息至少投递一次,但消息可能重复投递,某些特殊场景下还会出现大量重复消息
    • 消息处理需要支持幂等
  • 回溯消费
    • 消费完的消息并不是直接删除,因此RocketMQ支持配置消费某个时间点后的消息,时间精确到毫秒
  • 事务消息
    • 发送事务消息,执行本地事务,成功后提交事务消息,失败则回滚事务消息,事务消息提交后才对消费者可见,若事务消息迟迟没提交或者回滚则会自动回查
  • 定时消息
    • 支出发送配置指定延迟级别下的延迟消息(默认为“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”,18个level),消息延迟指定时间后才会对消费者可见。
  • 消息重试
    • 消息消费失败下,希望重试消息,每次重试会延迟一定时间(默认重试级别时间配置参考定时消息的延迟级别,去掉前2个级别)
  • 消息重投
    • 同步发送:向其他 broker 重新发送消息
    • 异步发送:仅在同一个 broker 下不断尝试
    • 单向消息 Oneway:只管发,不重试
  • 流量控制
    • 生产者流控,拒绝 send,并且不会重试,通常是 broker 消息写入能力达到瓶颈了
      • commitlog 被锁超过 osPageCacheBusyTimeOutMills 时,默认 1000ms
      • 如果开启transientStorePoolEnable == true,且broker为异步刷盘的主机,且transientStorePool中资源不足,拒绝当前send请求,返回流控
      • broker每隔10ms检查send请求队列头部请求的等待时间,如果超过waitTimeMillsInSendQueue,默认200ms,拒绝当前send请求,返回流控。
    • 消费者流控,会降低消息拉取频率,通常是消费过程耗时太长
      • 消费者本地缓存消息数超过pullThresholdForQueue时,默认1000。
      • 消费者本地缓存消息大小超过pullThresholdSizeForQueue时,默认100MB。
      • 消费者本地缓存消息跨度超过consumeConcurrentlyMaxSpan时,默认2000。
  • 死信队列
    • 消息不断重试,达到最大重试次数后仍然失败则会进去私信队列,通常意味着业务逻辑或者消息本身存在问题,这时候需要开发人员介入排查失败原因。

性能问题

  • 低配服务器下也能有上万的吞吐量
  • RocketMQ 储存介质是文件,依赖操作系统提供的mmap零拷贝技术,将对文件操作转换为对内存的操作,极大的提高文件读写效率。
  • 储存文件以及索引文件写入几乎都是顺序写入,降低了寻址时间。
  • 消费消息时经过索引后会产生大量随机读取 commitlog 文件的操作,使用固态能显著提高读取效率。

集群搭建方案

多主多从(普通集群),多 master broker 保证负载,多 slave broker 提供稳定性。
Dledger(高可用集群),Raft选主,主节点负责维护 commitlog 一致,但性能有所牺牲