MQ初探

MQ思想浅析

消息队列核心当然是消息队列,是一个消息转发器,模型:

队列模型:纯粹一个队列,先进先出,但是这样的队列,虽然多个生产者能往里仍数据,但一个消息只能被一个消费者接受到,用了就没了。

2021-09-06-09-52-09-621185.png

主题模型(发布订阅):可以将一份消息分发给多个消费者,订阅某主题的消费者,可以收到全量消息。与队列模式相比,区别在于一份消息是否可以被重复消费。

2021-09-06-09-52-09-899223.png

上面两种区别主要是单播和广播的区别,当只有一个消费者订阅某主题时,就是队列模型了

场景:

解耦(系统解耦)、异步(提升系统吞吐量)、流量削峰、延迟通知、最终一致性、顺序消息、流式处理….

对于超出系统承载能力的场景,可以用队列进行限流保护。即:流量削峰

MQ简易设计

如果只是设计一个粗糙的MQ,那么两次RPC+消息存储即可(Dubbo+JDK自带的阻塞队列,例如ArrayBlockingQueue)

但是如果考虑到生产的方方面面问题,就没那么简单了,例如:

1:高并发场景下,如何保证收发消息的性能?

2:如何保证消息服务的高可用和高可靠?

3 : 如何保证服务是可以水平任意扩展的?

4: 如何保证消息存储也是水平可扩展的?

5: 各种元数据(比如集群中的各个节点、主题、消费关系等)如何管理,需不需要考虑数据的一致性?

将「一发一存一消费」这个核心流程进一步细化后,比较完整的数据流如下:

2021-09-06-09-52-11-796430.png

上图三大角色主要负责如下:

Broker:MQ的核心部分,负责消息的存储、备份、删除。为生产者和消费者提供RPC接口。(为什么需要Broker? Broker收到消息后不是直接转给消费方,为了保证消息不丢失,先落盘,存储起来,不影响业务)

Producer: 生产者,消息的生产,调用 Broker 提供的 RPC 接口发送消息。

Consumer:消费者,消息的消费,调用Broker提供的RPC接口接受消息,完成消息的消费。

基础功能

消息队列需要支持消息的发送、消息暂存,消息异步消费。

除了基本功能以外,消息队列在某些特殊的场景还需要支持事务,消息重试等功能。

  • 消息的顺序
  • 消息的可靠性保证
  • 消息的持久化
  • 支持不同消息模型
  • 多实例集群
  • 分布式环境下的负载均衡

分片

一个Broker是部署在某一台服务器上面,这个服务的磁盘存储空间是有限制的,不可能无限扩容。所以当消息量很大的时候,如果只是一直往机器的本地磁盘写数据,最终会写不进去的。

在设计的时候还要考虑数据分片的场景,一个Topic的数据可以分成很多份进行存储,分别存储在不同的Broker上,这样当磁盘不够的时候,可以通过增加Broker的节点来扩容。

那么问题来了,客户端写入的时候怎么知道这个Topic有哪些分片的存储信息,怎么知道有哪些Broker是在线的呢?这就要引入另一个设计:注册中心,在RocketMq中叫NameServer。

注册中心

注册中心、或者路由中心,为了只是调控Broker、consume、producer,三者。Broker启动的时候需要将自身的信息告诉NameServer,同时也要保持一个心跳检查,这样NameServer才能知道Broker当前是否处于正常状态。NameServer也要支持水平扩展,这样才能保证高可用性。既然要支持水平扩展,那么必然得无状态才行,但是NameServer本身就会存储一些数据,比如Broker信息。

实现方式:

1 : Broker启动的时候轮流向所有的NameServer进行注册,这样每个NameServer中都有全量的信息,即使某个节点挂了也不影响。RocketMQ就是使用的这种方式。

2: Broker启动的时候只向某一台NameServer进行注册,立马返回,然后NameServer之间再进行相互同步,Eureka就是使用的这种方式。

3: Broker启动的时候只向某一台NameServer进行注册,NameServer会同步向其他的NameServer进行数据的同步操作,等待所有写入成功或者半数写入成功,然后再返回给客户端。Zookeeper就是使用的这种方式。

技术难点与解决方案:

1 通讯

解决三者两两之间的通讯问题,用现有的RPC框架,也可以用Netty作地层通讯,zookeeper作为注册中心,自定义一套新的通信协议(类似 Kafka),也可以基于 AMQP 这种标准化的 MQ 协议来做实现(类似 RabbitMQ)对比直接用 RPC 框架,这种方案的定制化能力和优化空间更大。

2 高可用设计

Broker的高可用,只需保证Broker可水平扩展集群部署,进一步通过服务自动注册与发现、负载均衡、超时重试机制、发送和消费消息时的 ack 机制来保证。

主从:

使用过程中万一这个Broker挂掉了怎么办?这里是不是得考虑下高可用性,所以Broker还需要有主从的设计。

主节点的数据会同步给从节点,主节点出问题后,从节点可以顶上来提供服务,同时从节点也可以提供读的操作,为主节点减轻压力。

存储方案的高可用:参考 Kafka 的分区 + 多副本模式,但是需要考虑分布式场景下数据复制和一致性方案(类似 Zab、Raft等协议),并实现自动故障转移;还可以用主流的 DB、分布式文件系统

3存储设计

主流方案:追加写日志文件(数据部分) + 索引文件的方式(很多主流的开源 MQ 都是这种方式),索引设计上可以考虑稠密索引或者稀疏索引,查找消息可以利用跳转表、二分查找等,还可以通过操作系统的页缓存、零拷贝等技术来提升磁盘文件的读写性能。

4消费关系管理

Broker 是集群部署的,所以消费关系通常维护在公共存储上,可以基于 Zookeeper、Apollo 等配置中心来管理以及进行变更通知。

5高性能设计

业务线程池设计、生产者批量发送、Broker异步刷盘、消费端批量拉取

消息队列特性

1 即使通讯和消息队列

从消息能否被即时接受分析处理的角度,可以把消息投递分为两种:

一种是即时消息通讯,消息从消息生产端发送后立刻达到消费端,

一种是延迟消息通讯,有中间消息存储,即消息从某一端发出后,首先进入一个容器进行临时存储,当达到某种条件后,再由这个容器发送给另一端。延迟消息通讯的容器实现就是消息队列。

产生的问题

队列产生的一些问题 :

消息丢失问题(如何知道消息丢失,如何确保消息不丢失,如何确认哪个环节消息丢失)

消息丢失: 确保消息的生产、存储、消费阶段。

生产消息阶段: 只要能正常收到Broker的消息ack确认响应,就表示发送成功,处理好返回值和异常,整个阶段是不会丢失的

存储阶段:消息中间件保证,比如broker做副本,保证一条消息至少同步两个节点再返回ack

消费阶段:消费端收到消息后,等到业务执行完成后才发送确认消息

即使如此,也不能保证消息不丢失

如何进行消息检测?

消息发送时,生产全局唯一id,或者消息版本号,然后在消费端做对应的消息检验

落地: 在生产端消息发送之前,用拦截器拦截消息,生成消息id(版本号或者分布式全局唯一),注入消息中,然后在消费端收到消息后,再通过拦截器检验消息的版本号的连续性和消费状态,这样实现的好处是消息检测的代码不会侵入到业务代码中,可以通过单独的任务来定位丢失的消息,做进一步的排查。

如何保证消息不重复消费

幂等。也就是任意多次操作产生的影响与一次操作的影响相同。可以写入redis来保证,因为redis的key和value是天然幂等的,或者用数据库的id,基于数据库的唯一键来保证重复数据不会被插入多条。

如何保证消息顺序消费?

RocketMQ在主题上是无序的,它只有在队列层面上是有序的。这里又会有两个概念:普通顺序和严格顺序。

普通顺序是指:消费者通过同一个消息队列接收到的消息是有顺序,不同消息队列接受到的消息可能是无顺序的。

严格顺序是指:消费者接受到的所有消息都是有顺序的,即使是在异常情况下。

严格顺序导致的问题:Broker 集群中只要有一台机器不可用,则整个集群都不可用。推荐普通顺序

Producer 生产消息的时候会进行轮询(取决你的负载均衡策略)来向同一主题的不同消息队列发送消息,如果一个订单,有创建、支付、发货,三个不同消息投放到一个topic的三个不同队列上呢,如何保证顺序?

可以用Hash取模的方法,保证同一个订单在同一个队列中

如何解决分布式事务 ?

RocketMQ 中使用的是 事务消息加上事务反查机制 来解决分布式事务问题的。

如何解决消息堆积问题

为什么一个主题需要维护多个队列

回答: 提高并发能力。的确,每个主题中只存在一个队列也是可行的。你想一下,如果每个主题中只存在一个队列,这个队列中也维护着每个消费者组的消费位置,这样也可以做到 发布订阅模式

但是呢,这样的话,生产者只能相一个队列投递消息又因为需要维护消费位置所以一个队列只能对应一个消费者组中的消费者,这样是不是其他的 Consumer 就没有用武之地了?

一些概念介绍 :

  • Producer group : 一类生产组,多个生产者的组合,一个生产组生产一类消息

  • Consumer group : 一类消费组,消费同类消息

  • topic : 主题,代表一类消息,例如:物流消息,订单消息。主题中可以有多个队列。

一个队列只会被一个消费者消费,如果某个消费者挂掉,分组内其它消费者会接替挂掉的消费者继续消费,所以一般来讲要控制 消费者组中的消费者个数和主题中队列个数相同

消费位移:每个消费者组在每个队列中的消费位置都是不同的,为每个消费者组维护一个消费位移,每次消费者组消费完会返回一个成功的响应,然后队列再把维护的消费位移加一,这样就不会出现刚刚消费过的消息再一次被消费了。

有序消息的缺陷

1:发送顺序消息无法利用集群的故障转移特性,不能更换messageQueue进行重试。

2:发送路由策略导致的热点问题,导致某个messageQueue数据量很大。

RocketMQ

优势:

  • 支持集群,负载均衡,水平扩展
  • 亿极消息堆积
  • 采用零拷贝,顺序写,随机读
  • 底层通讯框架:netty
  • Nameserv替代zk,实现服务寻址,服务协调
  • 消息失败重试,消息可查
  • 集群无单点

RocketMQ 技术架构中有四大角色 :NameServer,Broker,Producer ,Consumer

那么这几个角色是干啥的?

Broker:

  • 主要负责消息存储,投递,查询以,及服务高可用保证。就是消息队列服务器,生产者把消息投递到Broker,消费者从Broker拉取消息消费。定时向NameServer发送心跳包,那么topic和Broker的关系又如何呢,一个topic可以分布在多个Broker上,一个Broker可以配置多个topic,是多对多的关系。如果一个topic消息量太大,可以多配置队列,或者分布在多个Broker上,减轻单个Broker的压力。如果某个 broker 上的队列越多,则该 broker 压力越大。

普通集群:

角色无法切换,当master宕机后slave无法自主切换为master。这一组的Broker就不可用

Dledger集群:

RocketMQ4.5引入,这个模式下,会随机选择一个节点作为master,而当master节点挂掉后,会从slave中自动选择一个节点作为master

Dledger做的事: 1:存储消息到CommitLog 。2:从集群中选举master节点(关键功能,用的是raft算法选举)。3:完成master节点的数据同步至slave。

raft算法(分布式一致性算法)选举:

三个角色: leader、candidate、follower。任期概念:每个任期时间(term)都会选举出一个leader,开始选举的时候每个都是follower,如果没有选举出来,进行下一次选举,每个角色随机睡眠一个时间。集群内会发送一个timeout信号,此时角色转成candidate,没有接受到其他投票请求的时候,自己选自己,把term值递增后发送给各个候选人(这里就是有先发起者优先当选的味道),其他后候选人接受到请求后,比对term值,比自己大的就投票给它,如果比自己小的,就拒绝请求,把票投给自己再发送给其他候选人。选出leader后,选举期间的candidate会转变成follower。leader会和其他follower保持心跳,确认自己的leader地位。也就是说,leader角色的节点是不断变化的。如果一个follower一段时间内没收到leader的心跳,会重新发起一个选举。

选举过程如下:

  • 集群启动后,例如有三个节点,发起投票后这三个都是follower。一轮投票后,这个三个候选者的term都是1,选举不出。
  • 一轮选举没选出来,那么就会随机睡眠,例如A 100ms B 200ms C 300ms,然后A先醒来,给自己投票并递增term,发送给其他候选人,B醒来后,接受到A的请求,发现它的term值比自己的1大,那么会选A,承认A是leader,自己会退化成follower这个时候,A已经获得多数票,C醒来后也是一样,投票给A,自己的term会更新为2。此时选举期间的候选者已转换成leader,follower
  • 在一个任期内,leader会给follower发送心跳,如果leader挂了或者其他follower没有收到心跳,会发起新一轮的选举。

Dledger采用raft协议进行多副本的消息同步

NameServer

  • 角色类似于Zookeeper,但比zk更轻量级,每个NameServer互相独立没有信息交互。

  • 如果一个Broker的topic数量太多,上万级别(数据几十M),可能在传心跳包的时候传输失败,导致NamerServer没有接收到,出现误判Broker已死。

  • 一个注册中心,主要提供: Broker管理路由信息管理 ,Broker会把自己的信息,注册到NameServer上,消费者和生产者,从 NameServer 中获取路由表然后照着路由表的信息和对应的 Broker 进行通信。生产者和消费者定期会向 NameServer 去查询相关的 Broker 的信息。

producer

  • 消息发布者,支持分布式,集群方式部署。
  • 发送有三种方式: 1 同步,消息发送方发送完后需要等待接收方的回应后才进行下一步操作。2 异步,不需要等待接收方回应就发送下一个数据包。3 单向,不管发送成功还是失败,发了后就不管。主要用于日志收集,用在可靠性不高的场景

comsumer

  • 消息消费的角色,支持分布式、集群部署
  • 负责消费消息,一般后台系统异步消费,不同系统设置不同消费组。如果是不同消费组订阅了同一个topic,对topic的一条消息,每个消费组都会获取到这个消息。
  • 支持pull和push,两种消费模式。支持集群消费和广播消费。

消费模式:

pull :

  • 消费者主动获取消息

特点:

  • 需要客户端维护offset,内存,磁盘,数据库
  • 主动消费可控性好,但是间隔时间不好控制,间隔时间短会出先空请求,间隔时间长则消息处理不及时。

push:

那么NameServer是干啥用的呢?

如果Broker直接与生产者,消费者关联,当 Broker 修改的时候必定会牵连着每个生产者和消费者,这样就会产生耦合问题,而 NameServer 注册中心就是用来解决这个问题的。

24d9e936-2057-4e57-add9-868c4314105a.jpeg

1:我们看到这张图,broker做了集群,并且主从部署,由于消息分布在各个 Broker 上,一旦某个 Broker 宕机,则该Broker 上的消息读写都会受到影响。所以 Rocketmq 提供了 master/slave 的结构,salve 定时从 master 同步数据(同步刷盘或者异步刷盘)如果 master 宕机, slave提供消费服务,但是不能写入消息

2:为了保证高可用,NameServer也做了集群部署,注意他是去中心化的,没有中心节点。 单个Broker和所有NameServer保持长连接

每隔30s,Broker会向,所有NameServer发送心跳,心跳包含了自身的 Topic 配置信息

3:生产者发送消息给Broker消息之前,需要先从NameServer获取Broker的路由消息,然后轮询的方法向每个队列生产数据已达到负载均衡,

4:消费者通过NameServer获取Broker路由消息后,向Broker发送pull请求,获取消息数据,Consumer 可以以两种模式启动—— 广播(Broadcast)和集群(Cluster)。广播模式下,一条消息会发送给 同一个消费组中的所有消费者 ,集群模式下消息只会发送给一个消费者。

存储机制

RocketMQ存储架构中的三大角色 : CommitLog,ConsumeQueue、IndexFile

CommitLog: 消息主体以及元数据的存储主体

ConsumerQueue: 消息消费队列,主要为了提高消息消费的性能,作为消费消息的索引。可以看出,consumerQueue文件是基于topic的commitlog索引文件。

IndexFile : 索引文件,可以通过key或时间区间来查询消息的方法

RocketMQ采用的是混合式的存储结构,把一堆消息不分类的放在一起,那么为啥呢 ?

是为了提高写入数据的效率,但这就牺牲了读的效率,一大批消息太乱了,读取要遍历整个大文件,于是便引入了ConsumeQueue,作为每个队列的索引文件来提升读取消息效率,可以直接根据队列的消息序号,计算出索引的全局位置

架构图:

87e3e6f4-e167-4bd3-af1f-5da7c54ad6ec.jpeg