简介
今天在INFOQ上看到了去哪儿开源的QMQ的介绍去哪儿网消息队列设计与实现 。看了下挺有趣的,所以去Github上看了看他们的架构。其实QMQ在16年还是17年就已经开源了,那时候参加Qcon的时候听去哪儿网的工程师分享过。他们初期使用的是AMQ(ActiveMQ),后来说AMQ好像会出现丢消息等等问题,就自研MQ了。
闲话不多说,还是来看看QMQ的介绍吧。
1. 消费者扩容带来的痛点
来源:QMQ设计背景
QMQ是2012年开始开发的,在2012年的时候的消息中间件其实并不成熟,比如Kafka刚开源了1年,RocketMQ那时候还是MetaQ,只是Kafka的Java实现而已,Pulsar那时候也正在开发中,ActiveMQ和RabbitMQ一个是过于复杂,另一个开发语言是ERLANG,找不到掌握的人。这个其实也是大部分公司在那个阶段的痛点,要不就是下决心用RabbitMQ然后到处找问题的解决办法,要不就是花更多精力运维ActiveMQ并研究掌握各种配置方式。文章里说出现消息丢失和进程hang住的现象,根据笔者这两年的使用经历,一般都是配置方面的问题导致的。
Kafka和RocketMQ有什么问题呢?其实就是partition和consumer这个机制带来的问题,partition和consumer对应,导致了消费者无法动态横向扩容。这个问题最痛的地方是,如果在项目上线的初期没有设计好足够的partition时,就很容易导致consumer的处理速度跟不上但又无法通过扩容来解决问题,等未消费的消息达到了存储的上限或者超过了保留周期,就直接被Kafka删除了。这也是Kafka机制带来的消息丢失的场景。
用QMQ设计背景里的几张图可以很清晰地说明问题:
- partition和Consumer对应机制
- 无法横向扩容消费者
- 消费者和partition数量不同导致负载不均衡
- partition徒增的消息高峰无法通过横向扩容解决
2. 基本架构模型
QMQ的基本架构可以分为几个部分
- meta server提供集群管理和集群发现的作用
- server 提供实时消息服务
- delay server 提供延时/定时消息服务,延时消息先在delay server排队,时间到之后再发送给server
- producer 消息生产者
- consumer 消息消费者
图中各个组件的交互步骤
- delay server 向meta server注册
- 实时server 向meta server注册
- producer在发送消息前需要询问meta server获取server list
- meta server返回server list给producer(根据producer请求的消息类型返回不同的server list)
- producer发送延时/定时消息
- 延时时间已到,delay server将消息投递给实时server
- producer发送实时消息
- consumer需要拉取消息,在拉取之前向meta server获取server list(只会获取实时server的list)
- meta server返回server list给consumer
- consumer向实时server发起pull请求
- 实时server将消息返回给consumer
基本可以理解成这样的模式:
meta server进行消息的负责集群管理和集群发现,而meta server所使用的数据一致性方式是mysql数据库,这个倒是一个很新颖的模式。由于数据源相同,可以解决分布式meta节点之间元数据不一致的问题,但是为啥不用redis?
Producer和Consumer的交互与rocketMQ很像,是和meta server之间保持长连接(?),而后往对应的delay server或者server发消息,delay server专门用于延迟消息,只有生产者可以与其交互,到达延时的时间以后,delay server将消息发送给实时server,而后消费可以拿到。算是通过中间层来解决了延时消息的问题。
3. 消息存储模型
对于这种直连式的消息引擎,消费者消费位置的存储方式有很多种,比如存放在zookeeper上,存放在topic里(Kafka)等等……总之就是需要在服务端存放一份,这样做的用途是可以在异常导致Consumer重连的时候仍然可以从上次消费的位置开始消费。QMQ的存放方式是在实时server端。
- message log :所有subject的消息进入该log,消息的主存储
- consume log :consume log存储的是message log的索引信息
- pull log :每个consumer拉取消息的时候会产生pull log,pull log记录的是拉取的消息在consume log中的sequence
估计是参考了rocketMQ的模型,消息保存在主log里(类比rocketMQ的commit log),然后使用consumer log的模式来记录每个消息的索引,实现顺序写随机读的模式(类比rocketMQ的consumer queue)。然后增加了一个pull log,来记录每个消费者的消费顺序。这样的坏处是增加了磁盘的消耗,好处是可以回放消费者的消费顺序。
其实如果只是为了消费者重连时候断点继续消费,只需要保存一个offset就行了吧。但是通过这样的模式,可以实现消费者的动态横向扩展,而不是将partition和consumer绑定了。
当然我觉得这个机制肯定会带来性能的损耗,性能上肯定比不上Kafka和rocketMQ。
4. 延时消息的实现
RocketMQ是使用固定时间间隔的模式,QMQ实现的是任意时间点的延迟,最长可以延迟消息投递到2年。这样也就有了delay-server的存在意义,因为延迟两年的消息要保存在server上其实会有很多不确定因素。比如占用了实时消息的空间等等。
QMQ的延时/定时消息使用的是两层HashWheelTimer来实现的。第一层位于磁盘上,每个小时为一个刻度,每个刻度会生成一个日志文件,因为QMQ支持两年内的延迟消息,则最多会生成 2 * 366 * 24 = 17568 个文件。第二层在内存中,当消息的投递时间即将到来的时候,会将这个小时的消息索引从磁盘文件加载到内存中的HashWheelTimer上。
- message log 和实时消息里的message log类似,收到消息后append到该log
- schedule log 按照投递时间组织,每个小时一个。该log是回放message log后根据延时时间放置对应的log上,这是上面描述的两层HashWheelTimer的第一层,位于磁盘上
- dispatch log 延时/定时消息投递后写入,主要用于在应用重启后能够确定哪些消息已经投递
我理解delay-server里每个小时读取一个schedule log文件,然后将其放在内存的HashWheelTimer里,将这一个小时里的消息进行延迟投递到实时server上,如果produer发送了一个小时内的延迟消息,则直接进入内存的HashWheelTimer,否则就存入message log,通过异步回放message log的模式生成schedule log。当然因为我还没读源码,后续希望可以进一步看看这个具体的机制。
5. 高可用
QMQ的高可用也是类似RocketMQ的主备模式:生产和消费在master上,当消息发送给master后,slave会从master同步消息,只有消息同步到slave后master才会返回成功的响应给producer,这就保证了master和slave上都有一致的消息。当master和slave之间的延迟增大时,会标记该group为readonly状态,这个时候将不再接收消息,只提供消息消费服务。下图为消息发送和主从同步示意图。
然后也有RocketMQ的臭毛病:目前当master离线后,不提供自动切换功能,需要人工启动master。当slave离线后,该group不再提供接收消息服务,只提供消息消费服务。当master出现故障,导致消息丢失时,可以将其切换为slave,原来的slave切换为master,slave将从master同步数据,同步完成后提供服务。
也就是说异常的时候需要告警,可以尝试自动重启主机,不行再由人工来实现主机的重启。
6. 监控和Trace
QMQ提供prometheus的export,其实这个也是目前常见的监控模式了,在页面监控里写了指标,但是没写推荐阈值,也是需要生产环境验证以后才能定。不友好。。
Trace功能是在生产者和消费者的生产者和消费动作上进行了埋点,基本和Kafka的interceptor的模式类似,可以定义生产成功与否的日志记录或者往数据库,redis,其他消息中间件上进行日志的记录。实现追踪的能力。
总结
QMQ基本的消息模型和目前最主流的Kafka和rocketMQ类似,基本可以理解为rocketMQ的一些机制补足,比如delay server的延时消息服务器,pull log实现的消费者横向扩展能力等等很亮眼。但是前者可以通过delay server自研外挂在rocketMQ上实现,后者在实际的环境中也不是足够痛的点。这就导致了QMQ和RocketMQ相比,吸引力不够大。生态上来说比不上Kafka,机制上又比不上Pulsar的计算存储分离的cloud-native模式。
而且看了下客户端用了lambda,应该是要求jdk1.8以上……不像rocketMQ仍然支持jdk 1.6.
emmm…很尴尬…..
不过delay server和pull log倒是给了我一些灵感,看看能不能合并到Kafka上。。
整理下可以后续借鉴的点:
- pull log 实现消费者横向扩展,可以做个consumer proxy来实现Kafka的消费者横向扩展能力。
- delay sever HashWheelTimer好像很有趣,准备研究下。
- mysql或者redis作为元数据的存放地,然后提供接口供生产者消费者连接,前期我设计了个消息中间件前置也是这么搞的。
- 作为消息平台,可以为各个接入应用提供trace的能力,通过提供一个公用的producer interceptor和consumer interceptor实现生产和消费的日志异步批量往一个地方发。可以和公司内部的日志中心联动一下。
- 还是应该大力推动一下prometheus的使用。下周把JMX exporter放到ActiveMQ服务器上用prometheus实现性能监控先。