Rocket、kafka、rabbitmq消息队列之间架构的对比
一个消息队列需要解决哪些问题
阿里中间件团队的这篇十分钟入门RocketMq文章中指出了一个消息中间件需要解决哪些问题,然后展示了RocketMq是如何工作的。
- 发布/订阅:基本功能
- 消息优先级:消息的优先投递功能,RocketMq及一些持久化的消息队列要么不支持消息优先级,要么只支持分级别的消息优先级。rocketmq属于后者,kafka属于前者
- 消息有序性:消息的顺序投递,RocketMq支持严格的消息有序性,而kafka因为一个Topic被物理分成多分区所以仅支持总的有序性。
- 消息过滤:有在broker端和在consumer端做过滤两种选择, 在broker端做过滤的缺点是broker压力过大,而broker又是整个消息队列当中最为重要的一环。但是如果在consumer中做过滤又会让consumer接收许多无用数据(个人认为这里不是什么问题)
- 消息持久化:数据库、kv存储系统、文件甚至内存镜像都是可以吃持久化的对象。
- 消息可靠性:直接看引用文章
- 低时延:kafka采用短轮询push,RocketMq采用长轮询pull
- 消息至少被投递一次:
- 幂等性:
- 消息拒绝策略:broker内存中满了之后对后来的消息会采取拒绝策略
- 消息堆积的能力:大量消息堆积是否会对性能造成影响也是mq需要保证的。
- 事务:分布式事务
- 定时消息:同消息优先级一样,只支持某几种级别的定时消息。
- 消息重试:失败之后重试
1. 架构
RocketMq

RocketMq工作流程
- 启动NameServer,NameServer起来后监听端口,等待Broker、Producer、Consumer连上来,相当于一个路由控制中心。
- Broker启动,跟所有的NameServer保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有Topic信息。注册成功后,NameServer集群中就有Topic跟Broker的映射关系。
- 收发消息前,先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic。
- Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic存在哪些Broker上,轮询从队列列表中选择一个队列,然后与队列所在的Broker建立长连接从而向Broker发消息。
- Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,开始消费消息。
Kafka架构

Kafka工作流程

kafka为每个主题维护了分布式的分区(partition)日志文件,每个partition在kafka存储层面是append log。任何发布到此partition的消息都会被追加到log文件的尾部,在分区中的每条消息都会按照时间顺序分配到一个单调递增的顺序编号,也就是我们的offset,offset是一个long型的数字,我们通过这个offset可以确定一条在该partition下的唯一消息。在partition下面是保证了有序性,但是在topic下面没有保证有序性。如果要保证全局的有序性,只需要建立一个Partition分区即可。
partition按照key划分
1.如果没有Key值则进行轮询发送。
2.如果有Key值,对Key值进行Hash,然后对分区数量取余,保证了同一个Key值会被路由到同一个分区,如果想队列的强顺序一致性,可以让所有的消息都设置为同一个Key。
2. 消息存储
Rocketmq消息存储

消息存储是RocketMQ中最为复杂和最为重要的一部分
消息存储整体架构
- CommitLog:消息主体以及元数据的存储主体,存储Producer端写入的消息主体内容,消息内容不是定长的。单个文件大小默认1G ,文件名长度为20位,左边补零,剩余为起始偏移量,比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1073741824;当第一个文件写满了,第二个文件为00000000001073741824,起始偏移量为1073741824,以此类推。消息主要是顺序写入日志文件,当文件满了,写入下一个文件

ConsumeQueue:消息消费队列,引入的目的主要是提高消息消费的性能,由于RocketMQ是基于主题topic的订阅模式,消息消费是针对主题进行的,如果要遍历commitlog文件,根据topic检索消息是非常低效的。Consumer即可根据ConsumeQueue来查找待消费的消息。其中,ConsumeQueue(逻辑消费队列)作为消费消息的索引,保存了指定Topic下的队列消息在CommitLog中的起始物理偏移量offset,消息大小size和消息Tag的HashCode值。consumequeue文件可以看成是基于topic的commitlog索引文件,故consumequeue文件夹的组织方式如下:topic/queue/file三层组织结构,具体存储路径为:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}。同样consumequeue文件采取定长设计,每一个条目共20个字节,分别为8字节的commitlog物理偏移量、4字节的消息长度、8字节tag hashcode,单个文件由30W个条目组成,可以像数组一样随机访问每一个条目,每个ConsumeQueue文件大小约5.72M(思考:1.为什么根据commitlog文件检索消息是非常低效的?2.为什么要引入consumeQueue?)
IndexFile:IndexFile(索引文件)提供了一种可以通过key或时间区间来查询消息的方法。Index文件的存储位置是:$HOME \store\index${fileName},文件名fileName是以创建时的时间戳命名的,固定的单个IndexFile文件大小约为400M,一个IndexFile可以保存 2000W个索引,IndexFile的底层存储设计为在文件系统中实现HashMap结构,故rocketmq的索引文件其底层实现为hash索引。
在上面的RocketMQ的消息存储整体架构图中可以看出,RocketMQ采用的是混合型的存储结构,即为Broker单个实例下所有的队列共用一个日志数据文件(即为CommitLog)来存储。RocketMQ的混合型存储结构(多个Topic的消息实体内容都存储于一个CommitLog中)针对Producer和Consumer分别采用了数据和索引部分相分离的存储结构,Producer发送消息至Broker端,然后Broker端使用同步或者异步的方式对消息刷盘持久化,保存至CommitLog中。只要消息被刷盘持久化至磁盘文件CommitLog中,那么Producer发送的消息就不会丢失。正因为如此,Consumer也就肯定有机会去消费这条消息。当无法拉取到消息后,可以等下一次消息拉取,同时服务端也支持长轮询模式,如果一个消息拉取请求未拉取到消息,Broker允许等待30s的时间,只要这段时间内有新消息到达,将直接返回给消费端。这里,RocketMQ的具体做法是,使用Broker端的后台服务线程—ReputMessageService不停地分发请求并异步构建ConsumeQueue(逻辑消费队列)和IndexFile(索引文件)数据。
关键词:多个Topic共用一个CommitLog、数据索引分离、长轮询请求
Rocketmq还采用了页缓存和内存映射(Mmap)提高预读速度,即时消息堆积也有不错的读写性能。另外消息落盘还支持同步刷盘和异步刷盘,需要根据需求自主选择使用哪种刷盘策略,选择不同的刷盘策略会对消息的可靠性造成产生不同的效果。
Kafka消息存储
kafka的每个topic物理上被划分为多个partition,而每个partition目录中包含多个按顺序生成的segment文件,在每个segment文件中记录了数据和索引信息。所以同Rocketmq多个topic被记录到同一个CommitLog不同,每个topic都会被分开记录。
每个topic下partition存储:在Kafka文件存储中,同一个topic下有多个不同partition,每个partition为一个目录,partiton命名规则为topic名称+有序序号,第一个partiton序号从0开始,序号最大值为partitions数量减1
partition中文件存储方式:每个partion(目录)相当于一个巨型文件被平均分配到多个大小相等segment(段)数据文件中。但每个段segment file消息数量不一定相等,这种特性方便old segment file快速被删除,segment文件的生命周期由服务端配置,这样做的好处就是能快速删除无用文件,有效提高磁盘利用率。
partiton中segment文件存储结构:由2大部分组成,分别为index file和data file,此2个文件一一对应,成对出现,后缀”.index”和“.log”分别表示为segment索引文件、数据文件。partion全局的第一个segment从0开始,后续每个segment文件名为上一个segment文件最后一条消息的offset值。数值最大为64位long大小,19位数字字符长度,没有数字用0填充


上述图3中索引文件存储大量元数据,数据文件存储大量消息,索引文件中元数据指向对应数据文件中message的物理偏移地址。 其中以索引文件中元数据3,497为例,依次在数据文件中表示第3个message(在全局partiton表示第368772个message)、以及该消息的物理偏移地址为497。
在partition中如何通过offset查找message
例如读取offset=368776的message,需要通过下面2个步骤查找。
- 第一步查找segment file,其中00000000000000000000.index表示最开始的文件,起始偏移量(offset)为0.第二个文件00000000000000368769.index的消息量起始偏移量为368770 = 368769 + 1.同样,第三个文件00000000000000737337.index的起始偏移量为737338=737337 + 1,其他后续文件依次类推,以起始偏移量命名并排序这些文件,只要根据offset二分查找文件列表,就可以快速定位到具体文件。 当offset=368776时定位到00000000000000368769.index|log
- 第二步通过segment file查找message 通过第一步定位到segment file,当offset=368776时,依次定位到00000000000000368769.index的元数据物理位置和00000000000000368769.log的物理偏移地址,然后再通过00000000000000368769.log顺序查找直到offset=368776为止。