抱歉,您的浏览器无法访问本站
本页面需要浏览器支持(启用)JavaScript
了解详情 >

RocketMQ消息存储

RocketMQ存储概要设计

​ 目前的MQ中间件从存储模型来,分为需要持久化和不需要持久化的两种模型,现在大多数的是支持持久化存储的,比如ActiveMQ、RabbitMQ、Kafka、RocketMQ。ZeroMQ却不需要支持持久化存储而业务系统也大多需要MQ有持久存储的能力,这样可以大大增加系统的高可用性。

​ 从存储方式和效率来看,文件系统高于KV存储,KV存储又高于关系型数据库,直接操作文件系统肯定是最快的,但如果从可靠性的角度出发直接操作文件系统是最低的,而关系型数据库的可靠性是最高的。

​ RocketMQ主要存储的文件包括Commitlog文件、ConsumeQueue文件、IndexFile。RocketMQ将所有主题的消息存储在同一文件,确保消息发送时顺序写文件,尽最大的能力确保消息发送的高性能与高吞吐量。

​ 但由于一般的消息中间件是基于消息主题的订阅机制,这样便给按照消息主题检索消息带来了极大的不便。为了提高消息消费的效率,RocketMQ引入了ConsumeQueue消息队列文件,每个消息主题包含多个消息消费队列,每个消息队列有一个消息文件IndexFile索引文件,其主要设计理念就是为了加速消息的检索性能,可以根据消息的属性快速从Commitlog文件中检索消息,整体如下:

  • CommitLog :消息存储文件,所有消息主题的消息都存储在 CommitLog 文件中

  • ConsumeQueue :消息消费队列,消息到达 CommitLog 文件后,将异步转发到消息消费队列,供消息消费者消费

  • IndexFile :消息索引文件,主要存储消息 Key 与 Offset 的对应关系

消息存储结构

​ 消息主体以及元数据的存储主体,存储Producer端写入的消息主体内容,消息内容不是定长的。RocketMQ 采取一些机制,尽量向 CommitLog 中顺序写,但是随机读。单个文件大小默认1G ,可通过在 broker 置文件中设置 mapedFileSizeCommitLog 属性来改变默认大小。

CommitLog

​ CommitLog 以物理文件的方式存放,每台 Broker 上的 CommitLog 被本机器所有 ConsumeQueue 共享,文件地址:$ {user.home} \store$ { commitlog} \ $ { fileName}。

​ 文件名长度为20位,左边补零,剩余为起始偏移量,比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1073741824;当第一个文件写满了,第二个文件为00000000001073741824,起始偏移量为1073741824,以此类推。消息主要是顺序写入日志文件,当文件满了,写入下一个文件。

​ Commitlog 文件存储的逻辑视图如下,每条消息的前面 4 个字节存储该条消息的总长度。但是一个消息的存储长度是不固定的。

文件的消息单元存储详细信息

编号 字段简称 字段大小(字节) 字段含义
1 msgSize 4 代表这个消息的大小
2 MAGICCODE 4 MAGICCODE = daa320a7
3 BODY CRC 4 消息体BODY CRC 当broker重启recover时会校验
4 queueId 4
5 flag 4
6 QUEUEOFFSET 8 这个值是个自增值不是真正的consume queue的偏移量,可以代表这个consumeQueue队列或者tranStateTable队列中消息的个数,若是非事务消息或者commit事务消息,可以通过这个值查找到consumeQueue中数据,QUEUEOFFSET * 20才是偏移地址;若是PREPARED或者Rollback事务,则可以通过该值从tranStateTable中查找数据
7 PHYSICALOFFSET 8 代表消息在commitLog中的物理起始地址偏移量
8 SYSFLAG 4 指明消息是事物事物状态等消息特征,二进制为四个字节从右往左数:当4个字节均为0(值为0)时表示非事务消息;当第1个字节为1(值为1)时表示表示消息是压缩的(Compressed);当第2个字节为1(值为2)表示多消息(MultiTags);当第3个字节为1(值为4)时表示prepared消息;当第4个字节为1(值为8)时表示commit消息;当第3/4个字节均为1时(值为12)时表示rollback消息;当第3/4个字节均为0时表示非事务消息
9 BORNTIMESTAMP 8 消息产生端(producer)的时间戳
10 BORNHOST 8 消息产生端(producer)地址(address:port)
11 STORETIMESTAMP 8 消息在broker存储时间
12 STOREHOSTADDRESS 8 消息存储到broker的地址(address:port)
13 RECONSUMETIMES 8 消息被某个订阅组重新消费了几次(订阅组之间独立计数),因为重试消息发送到了topic名字为%retry%groupName的队列queueId=0的队列中去了,成功消费一次记录为0;
14 PreparedTransaction Offset 8 表示是prepared状态的事物消息
15 messagebodyLength 4 消息体大小值
16 messagebody bodyLength 消息体内容
17 topicLength 1 topic名称内容大小
18 topic topicLength topic的内容值
19 propertiesLength 2 属性值大小
20 properties propertiesLength propertiesLength大小的属性数据
ConsumeQueue

​ RocketMQ基于主题订阅模式实现消息的消费,消费者关心的是主题下的所有消息。但是由于不同的主题的消息不连续的存储在commitlog文件中,如果只是检索该消息文件可想而知会有多慢,为了提高效率,对应的主题的队列建立了索引文件,为了加快消息的检索和节省磁盘空间,每一个consumequeue条目存储了消息的关键信息commitog文件中的偏移量、消息长度、tag的hashcode值。

​ ConsumeQueue 是消息的逻辑队列,类似数据库的索引文件,存储的是指向物理存储的地址。每个 Topic 下的每个 Message Queue 都有一个对应的 ConsumeQueue 文件, 文件地址在$ {$storeRoot} \consumequeue$ {topicName} $ { queueld} $ {fileName}。

​ 单个consumequeue文件中默认包含30万个条目,每个条目20个字节,所以每个文件的大小是固定的20w x 20字节,单个consumequeue文件可认为是一个数组,下标即为逻辑偏移量,消息的消费进度存储的偏移量即逻辑偏移量。

​ ConsumeQueue中并不需要存储消息的内容,而存储的是消息在CommitLog中的offset。也就是说,ConsumeQueue其实是CommitLog的一个索引文件。

​ ConsumeQueue是定长的结构,每1条记录固定的20个字节。很显然,Consumer消费消息的时候,要读2次:先读ConsumeQueue得到offset,再读CommitLog得到消息内容。

​ ConsumeQueue 即为 Commitlog 文件的索引文件,其构建机制是 当消息到达 Commitlog 文件后由专门的线程产生消息转发任务,从而构建消息消费队列文件(ConsumeQueue )与下文提到的索引文件。

设计的优势

存储机制这样设计有以下几个好处:

  • CommitLog 顺序写 ,可以大大提高写入效率。(实际上,磁盘有时候会比你想象的快很多,有时候也比你想象的慢很多,关键在如何使用,使用得当,磁盘的速度完全可以匹配上网络的数据传输速度。目前的高性能磁盘,顺序写速度可以达到 600MB/s ,超过了一般网卡的传输速度,这是磁盘比想象的快的地方 但是磁盘随机写的速度只有大概 lOOKB/s,和顺序写的性能相差 6000 倍!)。

  • 虽然是随机读,但是利用操作系统的 pagecache 机制,可以批量地从磁盘读取,作为 cache 存到内存中,加速后续的读取速度。

  • 为了保证完全的顺序写,需要 ConsumeQueue 这个中间结构 ,因为 ConsumeQueue 里只存偏移量信息,所以尺寸是有限的,在实际情况中,大 部分的 ConsumeQueue 能够被全部读入内存,所以这个中间结构的操作速度很快,可以认为是内存读取的速度。此外为了保证 CommitLog 和 ConsumeQueue 的一致性, CommitLog 里存储了 Consume Queues 、Message Key、Tag 等所有信息,即使 ConsumeQueue 丢失,也可以通过 commitLog 完全恢复出来。

IndexFile

​ index 存的是索引文件,用于为生成的索引文件提供访问服务,这个文件用来加快消息查询的速度,通过消息Key值查询消息真正的实体内容。消息消费队列 RocketMQ 专门为消息订阅构建的索引文件 ,提高根据主题与消息检索 消息的速度 ,使用 Hash 索引机制,具体是 Hash 槽与 Hash 冲突的链表结构。

​ 在实际的物理存储上,文件名则是以创建时的时间戳命名的,固定的单个IndexFile文件大小约为400M,一个IndexFile可以保存 2000W个索引

IndexFile结构分析

IndexHead 数据

​ beginTimestamp:该索引文件包含消息的最小存储时间 endTimestamp:该索引文件包含消息的最大存储时间 beginPhyoffset:该索引文件中包含消息的最小物理偏移量(commitlog 文件偏移量) endPhyoffset:该索引文件中包含消息的最大物理偏移量(commitlog 文件偏移量) hashSlotCount:hashslot个数,并不是 hash 槽使用的个数,在这里意义不大, indexCount:已使用的 Index 条目个数

Hash 槽

​ 一个 IndexFile 默认包含 500W 个 Hash 槽,每个 Hash 槽存储的是落在该 Hash 槽的 hashcode 最新的 Index 的索引

Index 条目列表

  • hashcode:key 的 hashcode

  • phyoffset:消息对应的物理偏移量

  • timedif:该消息存储时间与第一条消息的时间戳的差值,小于 0 表示该消息无效

  • preIndexNo:该条目的前一条记录的 Index 索引,hash 冲突时,根据该值构建链表结构

IndexFile条目存储

​ RocketMQ将消息索引键与消息的偏移量映射关系写入IndexFile中,其核心的实现方法是public boolean putKey(final String key, final long phyOffset, final long storeTimestamp);参数含义分别是消息的索引、消息的物理偏移量、消息的存储时间。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
//判断当前的条目数是否大于最大的允许的条目数
if (this.indexHeader.getIndexCount() < this.indexNum) {
//获取KEY的hash值(正整数)
int keyHash = indexKeyHashMethod(key);
//计算hash槽的下标
int slotPos = keyHash % this.hashSlotNum;
//获取hash槽的物理地址
int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;
FileLock fileLock = null;
try {
// fileLock = this.fileChannel.lock(absSlotPos, hashSlotSize,
// false);
//获取hash槽中存储的数据
int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
//判断值是否小于等于0或者 大于当前索引文件的最大条目
if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {
slotValue = invalidIndex;
}
//计算当前消息存储时间与第一条消息时间戳的时间差
long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();
//秒
timeDiff = timeDiff / 1000;
if (this.indexHeader.getBeginTimestamp() <= 0) {
timeDiff = 0;
} else if (timeDiff > Integer.MAX_VALUE) {
timeDiff = Integer.MAX_VALUE;
} else if (timeDiff < 0) {
timeDiff = 0;
}
//计算条目的物理地址 = 索引头部大小(40字节) + hash槽的大小(4字节)*槽的数量(500w) + 当前索引最大条目的个数*每index的大小(20字节)
int absIndexPos =
IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
+ this.indexHeader.getIndexCount() * indexSize;
//依次存入 key的hash值(4字节)+消息的物理偏移量(8字节)+消息存储时间戳和index文件的时间戳差(4字节)+当前hash槽的值(4字节)
this.mappedByteBuffer.putInt(absIndexPos, keyHash);
this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);
//存储当前index中包含的条目数量存入hash槽中,覆盖原先hash槽的值
this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());

if (this.indexHeader.getIndexCount() <= 1) {
this.indexHeader.setBeginPhyOffset(phyOffset);
this.indexHeader.setBeginTimestamp(storeTimestamp);
}
//更新文件索引的头信息,hash槽的总数、index条目的总数、最后消息的物理偏移量、最后消息的存储时间
this.indexHeader.incHashSlotCount();
this.indexHeader.incIndexCount();
this.indexHeader.setEndPhyOffset(phyOffset);
this.indexHeader.setEndTimestamp(storeTimestamp);

return true;
} catch (Exception e) {
log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e);
} finally {
if (fileLock != null) {
try {
fileLock.release();
} catch (IOException e) {
log.error("Failed to release the lock", e);
}
}
}
} else {
log.warn("Over index file capacity: index count = " + this.indexHeader.getIndexCount()
+ "; index max num = " + this.indexNum);
}
return false;
}

以上详细了分析了IndexFile条目存储的业务逻辑

通过KEY查找消息

​ DefaultMessageStore类中的public QueryMessageResult queryMessage(String topic, String key, int maxNum, long begin, long end) 中其核心方法是QueryOffsetResult queryOffsetResult = this.indexService.queryOffset(topic, key, maxNum, begin, lastQueryMsgTime);获取消息的物理存储地址,通过偏移量去commitLog中获取消息集。

​ public QueryOffsetResult queryOffset(String topic, String key, int maxNum, long begin, long end)核心方法又是IndexFile类中的public void selectPhyOffset(final List phyOffsets, final String key, final int maxNum, final long begin, final long end, boolean lock)方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
public void selectPhyOffset(final List<Long> phyOffsets, final String key, final int maxNum,
final long begin, final long end, boolean lock) {
if (this.mappedFile.hold()) {
//获取key的hash信息
int keyHash = indexKeyHashMethod(key);
//获取hash槽的下标
int slotPos = keyHash % this.hashSlotNum;
//获取hash槽的物理地址
int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;
FileLock fileLock = null;
try {
if (lock) {
// fileLock = this.fileChannel.lock(absSlotPos,
// hashSlotSize, true);
}
//获取hash槽的值
int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
// if (fileLock != null) {
// fileLock.release();
// fileLock = null;
// }
//判断值是否小于等于0或者 大于当前索引文件的最大条目
if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()
|| this.indexHeader.getIndexCount() <= 1) {
} else {
for (int nextIndexToRead = slotValue; ; ) {
if (phyOffsets.size() >= maxNum) {
break;
}
//计算条目的物理地址 = 索引头部大小(40字节) + hash槽的大小(4字节)*槽的数量(500w) + 当前索引最大条目的个数*每index的大小(20字节)
int absIndexPos =
IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
+ nextIndexToRead * indexSize;
//获取key的hash值
int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos);
//获取消息的物理偏移量
long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos + 4);
//获取当前消息的存储时间戳与index文件的时间戳差值
long timeDiff = (long) this.mappedByteBuffer.getInt(absIndexPos + 4 + 8);
//获取前一个条目的信息(链表结构)
int prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4);
if (timeDiff < 0) {
break;
}
timeDiff *= 1000L;
long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff;
//判断该消息是否在查询的区间
boolean timeMatched = (timeRead >= begin) && (timeRead <= end);
//判断key的hash值是否相等并且在查询的时间区间内
if (keyHash == keyHashRead && timeMatched) {
//加入到物理偏移量的List中
phyOffsets.add(phyOffsetRead);
}
if (prevIndexRead <= invalidIndex
|| prevIndexRead > this.indexHeader.getIndexCount()
|| prevIndexRead == nextIndexToRead || timeRead < begin) {
break;
}
//继续前一个条目信息获取进行匹配
nextIndexToRead = prevIndexRead;
}
}
} catch (Exception e) {
log.error("selectPhyOffset exception ", e);
} finally {
if (fileLock != null) {
try {
fileLock.release();
} catch (IOException e) {
log.error("Failed to release the lock", e);
}
}
this.mappedFile.release();
}
}
}
  1. 根据查询的 key 的 hashcode%slotNum 得到具体的槽的位置( slotNum 是一个索引文件里面包含的最大槽的数目,例如图中所示 slotNum=5000000)。
  2. 根据 slotValue( slot 位置对应的值)查找到索引项列表的最后一项(倒序排列, slotValue 总是指向最新的一个 索引项)。
  3. 遍历索引项列表返回查询时间范围内的结果集(默认一次最大返回的 32 条记彔)
  4. Hash 冲突;寻找 key 的 slot 位置时相当于执行了两次散列函数,一次 key 的 hash,一次 key 的 hash 值取模,因此返里存在两次冲突的情况;第一种, key 的 hash 不同但模数相同,此时查询的时候会在比较一次key 的hash 值(每个索引项保存了 key 的 hash 值),过滤掉 hash 值不相等的项。第二种, hash 值相等但 key 不等,出于性能的考虑冲突的检测放到客户端处理( key 的原始值是存储在消息文件中的,避免对数据文件的解析),客户端比较一次消息体的 key 是否相同
checkpoint

​ checkpoint文件的作用是记录commitlog、consumequeue、index文件的刷盘时间点,文件固定长度4k,其中只用了该文件的前24个字节。查看其存储格式

  • physicMsgTimestamp:commitlog文件刷盘时间点

  • logicsMsgTimestamp:消息的消费队列文件刷盘时间点

  • indexMsgTimestamp:索引文件刷盘时间点

Config

config 文件夹中 存储着 Topic 和 Consumer 等相关信息。主题和消费者群组相关的信息就存在在此。

  • topics.json : topic 配置属性

  • subscriptionGroup.json :消息消费组配置信息。

  • delayOffset.json :延时消息队列拉取进度。

  • consumerOffset.json :集群消费模式消息消进度。consumerFilter.json :主题消息过滤信息。

其他

​ abort :如果存在 abort 文件说明 Broker 非正常闭,该文件默认启动时创建,正常退出之前删除

内存映射

​ 内存映射文件,是由一个文件到一块内存的映射。文件的数据就是这块区域内存中对应的数据,读写文件中的数据,直接对这块区域的地址操作就 可以,减少了内存复制的环节。所以说,内存映射文件比起文件 I/O 操作,效率要高,而且文件越大,体现出来的差距越大。

​ RocketMQ 通过使用内存映射文件来提高 IO 访问性能,无论是 CommitLog,ConsumeQueue 还是 IndexFile ,单个文件都被设计为固定长度,如果 一个文件写满以后再创建一个新文件,文件名就为该文件第一条消息对应的全局物理偏移量。

PageCache与Mmap内存映射

​ 这里有必要先稍微简单地介绍下page cache的概念。系统的所有文件I/O请求,操作系统都是通过page cache机制实现的。对于操作系统来说,磁盘文件都是由一系列的数据块顺序组成,数据块的大小由操作系统本身而决定,x86的linux中一个标准页面大小是4KB。
​ 操作系统内核在处理文件I/O请求时,首先到page cache中查找(page cache中的每一个数据块都设置了文件以及偏移量地址信息),如果未命中,则启动磁盘I/O,将磁盘文件中的数据块加载到page cache中的一个空闲块,然后再copy到用户缓冲区中。
​ page cache本身也会对数据文件进行预读取,对于每个文件的第一个读请求操作,系统在读入所请求页面的同时会读入紧随其后的少数几个页面。因此,想要提高page cache的命中率(尽量让访问的页在物理内存中),从硬件的角度来说肯定是物理内存越大越好。从操作系统层面来说,访问page cache时,即使只访问1k的消息,系统也会提前预读取更多的数据,在下次读取消息时, 就很可能可以命中内存。
​ 在RocketMQ中,ConsumeQueue逻辑消费队列存储的数据较少,并且是顺序读取,在page cache机制的预读取作用下,Consume Queue的读性能会比较高近乎内存,即使在有消息堆积情况下也不会影响性能。而对于 CommitLog消息存储的日志数据文件来说,读取消息内容时候会产生较多的随机访问读取,严重影响性能。如果选择合适的系统IO调度算法,比如设置调度算法为“Noop”(此时块存储采用SSD的话),随机读的性能也会有所提升。
​ 另外,RocketMQ主要通过MappedByteBuffer对文件进行读写操作。其中,利用了NIO中的FileChannel模型直接将磁盘上的物理文件直接映射到用户态的内存地址中(这种Mmap的方式减少了传统IO将磁盘文件数据在操作系统内核地址空间的缓冲区和用户应用程序地址空间的缓冲区之间来回进行拷贝的性能开销),将对文件的操作转化为直接对内存地址进行操作,从而极大地提高了文件的读写效率(这里需要注意的是,采用MappedByteBuffer这种内存映射的方式有几个限制,其中之一是一次只能映射1.5~2G 的文件至用户态的虚拟内存,这也是为何RocketMQ默认设置单个CommitLog日志数据文件为1G的原因了)。

Mmap内存映射技术的特点

​ Mmap内存映射和普通标准IO操作的本质区别在于它并不需要将文件中的数据先拷贝至OS的内核IO缓冲区,而是可以直接将用户进程私有地址空间中的一块区域与文件对象建立映射关系,这样程序就好像可以直接从内存中完成对文件读/写操作一样。只有当缺页中断发生时,直接将文件从磁盘拷贝至用户态的进程空间内,只进行了一次数据拷贝。对于容量较大的文件来说(文件大小一般需要限制在1.5~2G以下),采用Mmap的方式其读/写的效率和性能都非常高。

JDK NIO的MappedByteBuffer简要分析

​ 从JDK的源码来看,MappedByteBuffer继承自ByteBuffer,其内部维护了一个逻辑地址变量—address。在建立映射关系时,MappedByteBuffer利用了JDK NIO的FileChannel类提供的map()方法把文件对象映射到虚拟内存。仔细看源码中map()方法的实现,可以发现最终其通过调用native方法map0()完成文件对象的映射工作,同时使用Util.newMappedByteBuffer()方法初始化MappedByteBuffer实例,但最终返回的是DirectByteBuffer的实例。在Java程序中使用MappedByteBuffer的get()方法来获取内存数据是最终通过DirectByteBuffer.get()方法实现(底层通过unsafe.getByte()方法,以“地址 + 偏移量”的方式获取指定映射至内存中的数据)。

使用Mmap的限制

Mmap映射的内存空间释放的问题

​ 由于映射的内存空间本身就不属于JVM的堆内存区(Java Heap),因此其不受JVM GC的控制,卸载这部分内存空间需要通过系统调用 unmap()方法来实现。然而unmap()方法是FileChannelImpl类里实现的私有方法,无法直接显示调用。RocketMQ中的做法是,通过Java反射的方式调用“sun.misc”包下的Cleaner类的clean()方法来释放映射占用的内存空间;

MappedByteBuffer内存映射大小限制

​ 因为其占用的是虚拟内存(非JVM的堆内存),大小不受JVM的-Xmx参数限制,但其大小也受到OS虚拟内存大小的限制。一般来说,一次只能映射1.5~2G 的文件至用户态的虚拟内存空间,这也是为何RocketMQ默认设置单个CommitLog日志数据文件为1G的原因了;

使用MappedByteBuffe的其他问题

​ 会存在内存占用率较高和文件关闭不确定性的问题;

OS的PageCache机制

​ PageCache是OS对文件的缓存,用于加速对文件的读写。一般来说,程序对文件进行顺序读写的速度几乎接近于内存的读写访问,这里的主要原因就是在于OS使用PageCache机制对读写访问操作进行了性能优化,将一部分的内存用作PageCache。

对于数据文件的读取

​ 如果一次读取文件时出现未命中PageCache的情况,OS从物理磁盘上访问读取文件的同时,会顺序对其他相邻块的数据文件进行预读取(ps:顺序读入紧随其后的少数几个页面)。这样,只要下次访问的文件已经被加载至PageCache时,读取操作的速度基本等于访问内存。

对于数据文件的写入

​ OS会先写入至Cache内,随后通过异步的方式由pdflush内核线程将Cache内的数据刷盘至物理磁盘上。

​ 对于文件的顺序读写操作来说,读和写的区域都在OS的PageCache内,此时读写性能接近于内存。RocketMQ的大致做法是,将数据文件映射到OS的虚拟内存中(通过JDK NIO的MappedByteBuffer),写消息的时候首先写入PageCache,并通过异步刷盘的方式将消息批量的做持久化(同时也支持同步刷盘);订阅消费消息时(对CommitLog操作是随机读取),由于PageCache的局部性热点原理且整体情况下还是从旧到新的有序读,因此大部分情况下消息还是可以直接从Page Cache中读取,不会产生太多的缺页(Page Fault)中断而从磁盘读取。

​ PageCache机制也不是完全无缺点的,当遇到OS进行脏页回写,内存回收,内存swap等情况时,就会引起较大的消息读写延迟。
对于这些情况,RocketMQ采用了多种优化技术,比如内存预分配,文件预热,mlock系统调用等,来保证在最大可能地发挥PageCache机制优点的同时,尽可能地减少其缺点带来的消息读写延迟。

文件刷盘机制

​ RocketMQ 存储与读写是基于 JDK NIO 的内存映射机制,具体使用 MappedByteBuffer(基于 MappedByteBuffer 操作大文件的方式,其读写性能极高)RocketMQ 的消息是存储到磁盘上的,这样既能保证断电后恢复,又可以让存储的消息 超出内存的限制 RocketMQ 为了提高性能,会尽可能地保证 磁盘的顺序写 消息在通过 Producer 写人 RocketMQ 的时候,有两种写磁盘方式:

同步刷盘方式

​ 如上图所示,只有在消息真正持久化至磁盘后,RocketMQ的Broker端才会真正地返回给Producer端一个成功的ACK响应。同步刷盘对MQ消息可靠性来说是一种不错的保障,但是性能上会有较大影响,一般适用于金融业务应用领域。

​ RocketMQ同步刷盘的大致做法是,基于生产者消费者模型,主线程创建刷盘请求实例—GroupCommitRequest并在放入刷盘写队列后唤醒同步刷盘线程—GroupCommitService,来执行刷盘动作(其中用了CAS变量和CountDownLatch来保证线程间的同步)。这里,RocketMQ源码中用读写双缓存队列(requestsWrite/requestsRead)来实现读写分离,其带来的好处在于内部消费生成的同步刷盘请求可以不用加锁,提高并发度。

​ 在返回写成功状态时,消息已经被写人磁盘 具体流程是,消息写入内存的 PAGECACHE 后,立刻通知刷盘线程刷盘,然后等待刷盘完成,刷盘线程 执行完成后唤醒等待的线程,返回消息写成功的状态

​ 消息存储时首先将消息追加到内存,再根据配值的刷盘 略在不同时间进行刷写磁盘 如果是同步刷盘,消息追加到内存后,将同步调用 MappedByteB uffer force ()方法;如果是异步刷盘,在消息追加到内存后立刻返回给消息发送端 RocketMQ 使用 个单独的线程按照某个设定的频 执行刷盘操作。

​ 通过在 broker 配置文件中配置 flushDiskType 来设定刷盘方式,可选值为 ASYNC_FLUSH (异步刷盘), SYNC_FLUSH 同步刷盘) 默认为异步。

异步刷盘

​ 能够充分利用OS的PageCache的优势,只要消息写入PageCache即可将成功的ACK返回给Producer端。消息刷盘采用后台异步线程提交的方式进行,降低了读写延迟,提高了MQ的性能和吞吐量。异步和同步刷盘的区别在于,异步刷盘时,主线程并不会阻塞,在将刷盘线程wakeup后,就会继续执行。

​ 在返回写成功状态时 ,消息可能只是被写入了内存的 PAGECACHE ,写操作的返回快,吞吐量大 ;当内存里的消息积累到一定程度时,统一触发写 磁盘动作,快速写入。

总结

​ 实际应用中要结合业务场景,合理设置刷盘方式,尤其是同步刷盘的方式,由于频繁的触发磁盘写动作,会明显降低性能。通常情况下,应该把 Rocket 置成异步刷盘方式。

过期文件删除

​ 由于RocketMQ操作CommitLog、ConsumeQueue文件是基于文件内存映射机制,并且在启动的时候会将所有的文件加载,为了避免内存与磁盘的浪费、能够让磁盘能够循环利用、避免因为磁盘不足导致消息无法写入等引入了文件过期删除机制。

​ 删除过程分别执行清理消息存储文件( Commitlog )与消息消费 队列文件( ConsumeQueue 文件),消息消费队列文件与消息存储文件 ( Commitlog )共用一套过期文件机制。

消费完后的消息去哪里了

​ 消息的存储是一直存在于CommitLog中的。而由于CommitLog是以文件为单位(而非消息)存在的,CommitLog的设计是只允许顺序写的,且每个消息大小不定长,所以这决定了消息文件几乎不可能按照消息为单位删除(否则性能会极具下降,逻辑也非常复杂)。所以消息被消费了,消息所占据的物理空间并不会立刻被回收。

​ 但消息既然一直没有删除,那RocketMQ怎么知道应该投递过的消息就不再投递?——答案是客户端自身维护——客户端拉取完消息之后,在响应体中,broker会返回下一次应该拉取的位置,PushConsumer通过这一个位置,更新自己下一次的pull请求。这样就保证了正常情况下,消息只会被投递一次。

删除策略

​ RocketMQ 清除过期文件的方法是 :如果非当前写文件在一定时间间隔内没有再次被更新,则认为是过期文件,可以被删除, RocketMQ 不会关注 这个文件上的消息是否全部被消费。默认每个文件的过期时间为 42 小时(不同版本的默认值不同,这里以 4.4.0 为例) ,通过在 Broker 配置文件中 设置 fileReservedTime 来改变过期时间,单位为小时。

​ 触发文件清除操作的是一个定时任务,而且只有定时任务,文件过期删除定时任务的周期由该删除决定,默认每 10s 执行一次。

过期判断

fileReservedTime

文件删除主要是由这个配置属性

文件保留时间。也就是从最后一次更新时间到现在,如果超过了该时间,则认为是过期文件,可以删除。

deletePhysicFilesInterval

​ 删除物理文件的时间间隔(默认是 100MS),在一次定时任务触发时,可能会有多个物理文件超过过期时间可被删除, 因此删除一个文件后需要间隔 deletePhysicFilesInterval 这个时间再删除另外一个文件,由于删除文件是一个非常耗费 IO 的操作,会引起消息插入消 费的延迟(相比于正常情况下),所以不建议直接删除所有过期文件。

destroyMapedFileIntervalForcibly

​ 在删除文件时,如果该文件还被线程引用,此时会阻止此次删除操作,同时将该文件标记不可用并且纪录当前时间戳 destroyMapedFileIntervalForcibly 这个表示文件在第一次删除拒绝后,文件保存的最大时间,在此时间内一直会被拒绝删除,当超过这个时间时,会将引用每次减少 1000,直到引用 小于等于 0 为止,即可删除该文件。

删除条件

  1. 消息文件过期(默认72小时),RocketMQ 通过 deleteWhen 设置一天的固定时间执行一次(默认是凌晨4点),删除过期文件。

  2. 消息文件过期(默认72小时),且磁盘空间达到了水位线(默认75%),删除过期文件。

  3. 磁盘已经达到必须释放的上限(85%水位线)的时候,则开始批量清理文件(无论是否过期),直到空间充足。

注:若磁盘空间达到危险水位线(默认90%),出于保护自身的目的,broker会拒绝写入服务。

这样设计带来的好处

消息的物理文件一直存在,消费逻辑只是听客户端的决定而搜索出对应消息进行,这样做,有以下几个好处:

  1. 一个消息很可能需要被N个消费组(设计上很可能就是系统)消费,但消息只需要存储一份,消费进度单独记录即可。这给强大的消息堆积能力提供了很好的支持——一个消息无需复制N份,就可服务N个消费组。
  2. 由于消费从哪里消费的决定权一直都是客户端决定,所以只要消息还在,就可以消费到,这使得RocketMQ可以支持其他传统消息中间件不支持的回溯消费。即我可以通过设置消费进度回溯,就可以让我的消费组重新像放快照一样消费历史消息;或者我需要另一个系统也复制历史的数据,只需要另起一个消费组从头消费即可(前提是消息文件还存在)。
  3. 消息索引服务。只要消息还存在就能被搜索出来。所以可以依靠消息的索引搜索出消息的各种原信息,方便事后排查问题。

注:在消息清理的时候,由于消息文件默认是1GB,所以在清理的时候其实是在删除一个大文件操作,这对于IO的压力是非常大的,这时候如果有消息写入,写入的耗时会明显变高。这个现象可以在凌晨4点(默认删时间时点)后的附近观察得到。

RocketMQ官方建议Linux下文件系统改为Ext4,对于文件删除操作相比Ext3有非常明显的提升。

跳过历史消息的处理

​ 由于消息本身是没有过期的概念,只有文件才有过期的概念。那么对于很多业务场景——一个消息如果太老,是无需要被消费的,是不合适的。

​ 这种需要跳过历史消息的场景,在RocketMQ要怎么实现呢?

但对于已存在的消费组,RocketMQ没有内置的跳过历史消息的实现,但有以下手段可以解决:

  1. 自身的消费代码按照日期过滤,太老的消息直接过滤。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
    for(MessageExt msg: msgs){
    if(System.currentTimeMillis()-msg.getBornTimestamp()>60*1000) {//一分钟之前的认为过期
    continue;//过期消息跳过
    }

    //do consume here

    }
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
  2. 自身的消费代码代码判断消息的offset和MAX_OFFSET相差很远,认为是积压了很多,直接return CONSUME_SUCCESS过滤。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(//
    List<MessageExt> msgs, //
    ConsumeConcurrentlyContext context) {
    long offset = msgs.get(0).getQueueOffset();
    String maxOffset = msgs.get(0).getProperty(MessageConst.PROPERTY_MAX_OFFSET);
    long diff = Long. parseLong(maxOffset) - offset;
    if (diff > 100000) { //消息堆积了10W情况的特殊处理
    return ConsumeConcurrentlyStatus. CONSUME_SUCCESS;
    }
    //do consume here
    return ConsumeConcurrentlyStatus. CONSUME_SUCCESS;
    }
  3. 消费者启动前,先调整该消费组的消费进度,再开始消费。可以人工使用控制台命令resetOffsetByTime把消费进度调整到后面,再启动消费。

  4. 原理同3,但使用代码来控制。代码中调用内部的运维接口,具体代码实例祥见ResetOffsetByTimeCommand.java.

评论