RocketMQ 学习笔记
MQ 概述
MQ 简介
MQ
(Message Queue)是消息队列的意思,通常指提供消息队列服务的中间件。简单说就是接收消息、储存消息、转发消息的东西。
MQ 用途
MQ 的应用场景有许多,大抵上分为 3 类
流量削峰填谷
将系统超载的请求暂存到 MQ 中,然后在系统负荷内慢慢消费。一方面防止系统被超量请求压垮,以方便提高资源利用率。
业务异步解耦
一般是 A 服务做完某些事情然后发出通知,其他服务可以订阅这些消息处理自己的逻辑。一方面A 不关心订阅者的处理逻辑,也不用同步等待其他服务处理完成,另一方面新增减少订阅者都不需要改动 A 服务的代码。
海量数据收集
一般用在大数据行业,例如物联网行业内,传感器每时每刻都在产生数据,一个是数据量大,一个是写入频率高。传统的数据库不适合直接处理这样的数据。而MQ则非常适合做这种数据储存的临时介质,这方面主要 Kafka 的天下。
RocketMQ
为什么选择 RocketMQ?
MQ 功能基本都大同小异,市面上可选择的也不是很多,我选择 RocketMQ主要是考虑以下几点:
- 吞吐量高,天然支持集群部署
- 开发语言是Java,中文文档,便于学习
- 阿里巴巴出品,通过双11级的高并发考验,消息支持 0 丢失配置。
主要概念
RocketMQ主要分 4 部分,Producer,Consumer,Name Server,Broker Server
Producer
也就是生产者,负责发送消息。主要是从 NameServer 里拉取目标 Broker 信息,然后将消息发送给 Broker,消息支持同步发送、异步发送、顺序发送、单向发送
Consumer
也就是消费者,负责拉取并消费消息,从消费者的角度提供了 2 种获取消息的方式:拉取式消费(Pull) 和推动式消费(Push)。同时提供2 种消费模式:集群消费和广播消费。
Name Server
名服务器,类似微服务中的注册中心,持有 Broker 的一些信息供生产者和消费者路由选择Broker。
Broker Server
代理服务器,负责储存消息和转发消息。内部也储存消息的元数据,包括消费者组、消费进度便宜、主题、和队列消息等
消息(Message)
实际消息的载体,包含实际消息以及一些元数据 例如,Topic、Tag等
主题(Topic)
一类消息的集合,是 RocketMQ 订阅的基本单位
队列(Queue)
Topic内部进一步的划分,实际上也就是 consumerqueue,主题一般包含一个或多个队列,并且还区分读写逻辑上的区分,消息索引就储存在队列里,实际消费的消息就是从这里取出的,集群模式同一时刻下,一个队列只能被同一个消费者组中的一个消费者消费。
工作原理
- 启动 Name Server,Name Server 监听指定端口等待 Broker Server、Producer、Consumer 连接
- 启动 Broker Server,Broker Server连接 Name Server,在Name Server那维护自己的 Topic、Queue 以及负载等信息
- 启动 Producer,连接 Name Server,创建或者获取 Topic,并从 Name Server 中选出 Topic 对应的 Broker Server地址以及 Queue 信息,然后像 Boker Server 发送消息。
- Broker Server 收到消息后会计算消息在 consumerqueue 中的偏移、tag hash等其他数据组装成一个消息,然后写入到 commitlog 文件中,同时将索引写入相应的 consumerqueue 的文件中。
- 启动 Consumer,连接 Name Server,通过 topic以及一些消息过滤方式,从Name Server 拿到真实的 Broker Server 地址以及 Queue 信息。Queue 也就是 consumerqueue,内部包含 Topic 消息在 commitlog 文件里的偏移,根据
消费进度
(集群消费模式下消费进度由 broker 提供,广播模式下由 consumer 自己提供)拉取下一条消息进行消费(实际上是批量拉取在缓存后再单个或批量消费),消费完成后返回 ACK,成功或其他状态。
特性
- 订阅发布
- 本质上是拉取
- 顺序消费
- 同一个队列保证有序
- 消息过滤
- tag 过滤
- 消息属性过滤,类似SQL语言
- 消息可靠性
- 单点故障下异步刷盘会丢失少量消息,同步刷盘不丢失消息
- 磁盘损坏下,通过磁盘Raid10 或主从来保证消息可靠性(异步刷盘仍然会丢失少量消息)
- 同步复制或刷盘下可靠性极高,不会丢失消息,但性能低,实际使用需要权衡
- 至少一次
- RocketMQ 保证消息至少投递一次,但消息可能重复投递,某些特殊场景下还会出现大量重复消息
- 消息处理需要支持幂等
- 回溯消费
- 消费完的消息并不是直接删除,因此RocketMQ支持配置消费某个时间点后的消息,时间精确到毫秒
- 事务消息
- 发送事务消息,执行本地事务,成功后提交事务消息,失败则回滚事务消息,事务消息提交后才对消费者可见,若事务消息迟迟没提交或者回滚则会自动
回查
- 发送事务消息,执行本地事务,成功后提交事务消息,失败则回滚事务消息,事务消息提交后才对消费者可见,若事务消息迟迟没提交或者回滚则会自动
- 定时消息
- 支出发送配置指定延迟级别下的延迟消息(默认为“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”,18个level),消息延迟指定时间后才会对消费者可见。
- 消息重试
- 消息消费失败下,希望重试消息,每次重试会延迟一定时间(默认重试级别时间配置参考定时消息的延迟级别,去掉前2个级别)
- 消息重投
- 同步发送:向其他 broker 重新发送消息
- 异步发送:仅在同一个 broker 下不断尝试
- 单向消息 Oneway:只管发,不重试
- 流量控制
- 生产者流控,拒绝 send,并且
不会重试
,通常是 broker 消息写入能力达到瓶颈了- commitlog 被锁超过 osPageCacheBusyTimeOutMills 时,默认 1000ms
- 如果开启transientStorePoolEnable == true,且broker为异步刷盘的主机,且transientStorePool中资源不足,拒绝当前send请求,返回流控
- broker每隔10ms检查send请求队列头部请求的等待时间,如果超过waitTimeMillsInSendQueue,默认200ms,拒绝当前send请求,返回流控。
- 消费者流控,会降低消息拉取频率,通常是消费过程耗时太长
- 消费者本地缓存消息数超过pullThresholdForQueue时,默认1000。
- 消费者本地缓存消息大小超过pullThresholdSizeForQueue时,默认100MB。
- 消费者本地缓存消息跨度超过consumeConcurrentlyMaxSpan时,默认2000。
- 生产者流控,拒绝 send,并且
- 死信队列
- 消息不断重试,达到最大重试次数后仍然失败则会进去私信队列,通常意味着业务逻辑或者消息本身存在问题,这时候需要开发人员介入排查失败原因。
性能问题
- 低配服务器下也能有上万的吞吐量
- RocketMQ 储存介质是文件,依赖操作系统提供的
mmap零拷贝
技术,将对文件操作转换为对内存的操作,极大的提高文件读写效率。 - 储存文件以及索引文件写入几乎都是顺序写入,降低了寻址时间。
- 消费消息时经过索引后会产生大量随机读取 commitlog 文件的操作,使用固态能显著提高读取效率。
集群搭建方案
多主多从(普通集群),多 master broker 保证负载,多 slave broker 提供稳定性。
Dledger(高可用集群),Raft选主,主节点负责维护 commitlog 一致,但性能有所牺牲