rocketmq 源码版本: 4.3.1
消息存储
数据存储
示例
1 | store/ |
CommitLog
CommitLog、MappedFileQueue、MappedFile 类
MappedFile
:00000000000000000000、00000000001073741824、00000000002147483648等文件。- 每个
MappedFile
统一文件大小。在CommitLog
里默认为 1GB。 - 文件命名方式:fileName[n] = fileName[n - 1] + mappedFileSize。
- 每个
MappedFileQueue
:MappedFile
所在的文件夹,对MappedFile
进行封装成文件队列,对上层提供可无限使用的文件容量。CommitLog
:针对MappedFileQueue
的封装使用。
CommitLog、MappedFileQueue、MappedFile 的关系CommitLog
: MappedFileQueue
: MappedFile
= 1 : 1 : N
1 | public class CommitLog { |
存储结构
CommitLog
目前存储在 MappedFile
有两种内容类型:
- MESSAGE :消息。
- BLANK :文件不足以存储消息时, 填充空白占位。
CommitLog
存储在 MappedFile
的结构:
MESSAGE[1] | MESSAGE[2] | … | MESSAGE[n - 1] | MESSAGE[n] | BLANK |
---|---|---|---|---|---|
MESSAGE
在 CommitLog
存储结构:
第几位 | 字段 | 说明 | 数据类型 | 字节数 |
---|---|---|---|---|
1 | MsgLen | 消息总长度 | Int | 4 |
2 | MagicCode | MESSAGE_MAGIC_CODE | Int | 4 |
3 | BodyCRC | 消息内容CRC | Int | 4 |
4 | QueueId | 消息队列编号 | Int | 4 |
5 | Flag | flag | Int | 4 |
6 | QueueOffset | 消息队列位置 | Long | 8 |
7 | PhysicalOffset | 物理位置。在 CommitLog 的顺序存储位置。 |
Long | 8 |
8 | SysFlag | MessageSysFlag | Int | 4 |
9 | BornTimestamp | 生成消息时间戳 | Long | 8 |
10 | BornHost | 生效消息的地址+端口 | Long | 8 |
11 | StoreTimestamp | 存储消息时间戳 | Long | 8 |
12 | StoreHost | 存储消息的地址+端口 | Long | 8 |
13 | ReconsumeTimes | 重新消费消息次数 | Int | 4 |
14 | PreparedTransationOffset | Long | 8 | |
15 | BodyLength + Body | 内容长度 + 内容 | Int + Bytes | 4 + bodyLength |
16 | TopicLength + Topic | Topic长度 + Topic | Byte + Bytes | 1 + topicLength |
17 | PropertiesLength + Properties | 拓展字段长度 + 拓展字段 | Short + Bytes | 2 + PropertiesLength |
BLANK
在 CommitLog
存储结构:
第几位 | 字段 | 说明 | 数据类型 | 字节数 |
---|---|---|---|---|
1 | maxBlank | 空白长度 | Int | 4 |
2 | MagicCode | BLANK_MAGIC_CODE | Int | 4 |
CommitLog#putMessage(…)
1 |
|
1 | public AppendMessageResult appendMessage(final MessageExtBrokerInner msg, final AppendMessageCallback cb) { |
CommitLog.DefaultAppendMessageCallback#doAppend(…)
1 | public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank, |
CommitLog#getMessage(…)1
2
3
4
5
6
7
8
9public SelectMappedBufferResult getMessage(final long offset, final int size) {
int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog();
MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, offset == 0);
if (mappedFile != null) {
int pos = (int) (offset % mappedFileSize);
return mappedFile.selectMappedBuffer(pos, size);
}
return null;
}
ConsumeQueue
ConsumeQueue、MappedFileQueue、MappedFile 类
ConsumeQueue
: MappedFileQueue
: MappedFile
= 1 : 1 : N
ConsumeQueue
、MappedFileQueue
、MappedFile
的定义如下:
MappedFile
:00000000000000000000等文件。MappedFileQueue
:MappedFile
所在的文件夹,对MappedFile
进行封装成文件队列,对上层提供可无限使用的文件容量。- 每个
MappedFile
统一文件大小。在ConsumeQueue
里默认为 6000000B。 - 文件命名方式:fileName[n] = fileName[n - 1] + mappedFileSize。
- 每个
ConsumeQueue
:针对MappedFileQueue
的封装使用。Store : ConsumeQueue = ConcurrentHashMap<String/* topic */, ConcurrentHashMap<Integer/* queueId */, ConsumeQueue>>
。
存储结构
ConsumeQueue
存储在 MappedFile
的内容必须大小是 20B( ConsumeQueue.CQ_STORE_UNIT_SIZE
),有两种内容类型:
MESSAGE_POSITION_INFO
:消息位置信息。BLANK
: 文件前置空白占位。当历史Message
被删除时,需要用BLANK
占位被删除的消息。
MESSAGE_POSITION_INFO
在 ConsumeQueue
存储结构:
第几位 | 字段 | 说明 | 数据类型 | 字节数 |
---|---|---|---|---|
1 | offset | 消息 CommitLog 存储位置 |
Long | 8 |
2 | size | 消息长度 | Int | 4 |
3 | tagsCode | 消息tagsCode | Long | 8 |
BLANK
在 ConsumeQueue
存储结构:
第几位 | 字段 | 说明 | 数据类型 | 字节数 |
---|---|---|---|---|
1 | 0 | Long | 8 | |
2 | Integer.MAX_VALUE | Int | 4 | |
3 | 0 | Long | 8 |
源码解析
主要有两个组件:
ReputMessageService
:write ConsumeQueue。FlushConsumeQueueService
:flush ConsumeQueue。
ReputMessageService
参考
本文参考了互联网上大家的分享,就不一一列举,在此一并谢过。
也希望本文,能对大家有所帮助,若有错误,还请谅解、指正。