rocketmq 存储文件详解

概览

rocketmq-store

rocketmq 默认存储根目录是 $HOME/store,可以在修改配置文件中进行更改

1
storePathRootDir=$HOME/store

rocketmq 默认存储根目录结构,如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# tree store
store
├── abort
├── checkpoint
├── commitlog
│   └── 00000000000000000000
├── config
│   ├── consumerOffset.json
│   ├── consumerOffset.json.bak
│   ├── delayOffset.json
│   ├── delayOffset.json.bak
│   ├── subscriptionGroup.json
│   ├── subscriptionGroup.json.bak
│   ├── topics.json
│   └── topics.json.bak
├── consumequeue
│   ├── artisanDetailBrowseMqTopic
│   │   ├── 0
│   │   │   └── 00000000000000000000
│   │   ├── 1
│   │   │   └── 00000000000000000000
│   │   ├── 2
│   │   │   └── 00000000000000000000
│   │   └── 3
│   │   └── 00000000000000000000
│   ├── productBrowsingTopic
│   │   ├── 0
│   │   │   └── 00000000000000000000
│   │   ├── 1
│   │   │   └── 00000000000000000000
│   │   ├── 2
│   │   │   └── 00000000000000000000
│   │   └── 3
│   │   └── 00000000000000000000
...
│   └── userLevelTopic
│   ├── 0
│   │   └── 00000000000000000000
│   ├── 1
│   │   └── 00000000000000000000
│   └── 2
│   └── 00000000000000000000
└── index
└── 20180418154913712

rocketmq 存盘的数据有 6 类,其中有 3 类小文件:

  • abort
  • checkpoint
  • config

    • topics.json 存储每个 topic 的读写队列数、权限、是否顺序等信息

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      {
      "dataVersion":{
      "counter":285,
      "timestatmp":1557378378161
      },
      "topicConfigTable":{
      "updateReserveMQTopic":{
      "order":false,
      "perm":6,
      "readQueueNums":4,
      "topicFilterType":"SINGLE_TAG",
      "topicName":"updateReserveMQTopic",
      "topicSysFlag":0,
      "writeQueueNums":4
      },
      ...
      }
      }
    • consumerOffset.json 存储每个 consumer 在每个 topic consumequeue 队列的消费进度

      1
      2
      3
      4
      5
      6
      7
      {
      "offsetTable":{
      "artisanDetailBrowseMqTopic@artisanUserRelationMqConsumerGroup":{0:150,2:104,1:120,3:89
      },
      ...
      }
      }
    • delayOffset.json文 存储对于延迟 topic 每个consumequeue 队列的消费进度

    • subscriptionGroup.json 存储每个 consumer 的订阅信息

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      {
      "dataVersion":{
      "counter":36,
      "timestatmp":1557368823539
      },
      "subscriptionGroupTable":{
      "orderCenter":{
      "brokerId":0,
      "consumeBroadcastEnable":true,
      "consumeEnable":true,
      "consumeFromMinEnable":true,
      "groupName":"orderCenter",
      "retryMaxTimes":16,
      "retryQueueNums":1,
      "whichBrokerWhenConsumeSlowly":1
      },
      ...
      }
      }

还有 3 类大文件:

  • commitlog: 存储 rocketmq 所有消息的元数据信息
  • consumequeue: 消息队列,存储消息在 commitlog 的位置信息
  • index: 索引,可根据 key 查询到对应的消息

commitlog

commitlog + consumequeue

commitlog-consumequeue

rocketmq 数据存储的特点:

  • 所有消息的元数据都单独存储到 commitlog 里,完全顺序写,随机读
  • 对最终用户展现的消息队列 consumequeue,实际只存储消息在 commitlog 的位置信息,串行方式刷盘

这样做的好处:

  • 消息队列轻量化,单个队列数据量非常少
  • 对磁盘的访问串行化,避免磁盘竟争,不会因为队列增加导致IOWAIT增高

缺点是:

  • 写虽然完全是顺序写,但是读却变成了完全的随机读
  • 读一条消息,要先读 consumequeue,再读 commitlog,增加了开销
  • 要保证 commitlog 与 consumequeue 完全的一致,增加了编程的复杂度

如何克服以上缺点呢?

  • 随机读,尽可能让读命中 PAGECACHE,减少 IO 读操作,所以内存越大越好
  • 由于 consumequeue 存储数据量极少,而且是顺序读,在PAGECACHE预读作用下,consumequeue的读
    性能几乎与内存一致,可认为 consumequeue 完全不会阻碍读性能
  • commitlog 中存储了所有的元信息,包含消息体,只要有 commitlog 在,consumequeue 即使数据丢失,仍然可以恢复出来

commitlog 文件特点

  • commitlog 每个文件的大小固定不变,默认是 1GB = 102410241024B = 1073741824 Bytes
  • commitlog 文件的文件名,长度为20位,为起始偏移量,左边补零
    • 00000000000000000000 代表了第一个文件,起始偏移量为0,文件大小为1G=1073741824;当这个文件满了,第二个文件名为00000000001073741824,起始偏移量为1073741824,以此类推,第三个文件名字为00000000002147483648,起始偏移量为2147483648
  • 消息存储时,顺序写入文件,当文件满了,写入下一个文件

存储结构

commitlog 文件中,有两种单元类型:

  1. MESSAGE: 消息单元
  2. BLANK: 空白占位单元,文件不足以存储消息时, 填充空白占位
MESSAGE[1] MESSAGE[2] MESSAGE[n - 1] MESSAGE[n] BLANK

MESSAGE 消息单元存储结构

第几位 字段 说明 数据类型 字节数
1 MsgLen 消息总长度 Int 4
2 MagicCode MESSAGE_MAGIC_CODE Int 4
3 BodyCRC 消息内容CRC Int 4
4 QueueId 消息队列编号 Int 4
5 Flag flag Int 4
6 QueueOffset 消息队列位置 Long 8
7 PhysicalOffset 物理位置: commitlog 文件中的顺序存储位置 Long 8
8 SysFlag MessageSysFlag Int 4
9 BornTimestamp 生成消息时间戳 Long 8
10 BornHost 生成消息的 ip + 端口 Long 8
11 StoreTimestamp 存储消息时间戳 Long 8
12 StoreHost 存储消息的 ip + 端口 Long 8
13 ReconsumeTimes 重新消费消息次数 Int 4
14 PreparedTransationOffset Long 8
15 BodyLength + Body 内容长度 + 内容 Int + Bytes 4 + bodyLength
16 TopicLength + Topic Topic长度 + Topic Byte + Bytes 1 + topicLength
17 PropertiesLength + Properties 拓展字段长度 + 拓展字段 Short + Bytes 2 + PropertiesLength

BLANK 空白占位单元存储结构

第几位 字段 说明 数据类型 字节数
1 maxBlank 空白长度 Int 4
2 MagicCode BLANK_MAGIC_CODE Int 4

其中,MESSAGE_MAGIC_CODE、BLANK_MAGIC_CODE 是在 org.apache.rocketmq.store.CommitLog 中定义的

1
2
3
4
// Message's MAGIC CODE daa320a7
public final static int MESSAGE_MAGIC_CODE = -626843481;
// End of file empty MAGIC CODE cbd43194
protected final static int BLANK_MAGIC_CODE = -875286124;

MessageSysFlag 是在 org.apache.rocketmq.common.sysflag.MessageSysFlag 中定义的,值为 1 表示 commitlog 实际存储的该条消息 body 是 zip 压缩过,需要 unzip 解压才能得到原始的消息 (默认 body >= 4KB 就会进行 zip 压缩)

1
2
3
4
5
6
public final static int COMPRESSED_FLAG = 0x1;
public final static int MULTI_TAGS_FLAG = 0x1 << 1;
public final static int TRANSACTION_NOT_TYPE = 0;
public final static int TRANSACTION_PREPARED_TYPE = 0x1 << 2;
public final static int TRANSACTION_COMMIT_TYPE = 0x2 << 2;
public final static int TRANSACTION_ROLLBACK_TYPE = 0x3 << 2;

consumequeue

consumequeue 文件特点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
store
├── consumequeue
│   ├── artisanDetailBrowseMqTopic
│   │   ├── 0
│   │   │   └── 00000000000000000000
│   │   ├── 1
│   │   │   └── 00000000000000000000
│   │   ├── 2
│   │   │   └── 00000000000000000000
│   │   └── 3
│   │   └── 00000000000000000000
│   ├── productBrowsingTopic
│   │   ├── 0
│   │   │   └── 00000000000000000000
│   │   ├── 1
│   │   │   └── 00000000000000000000
│   │   ├── 2
│   │   │   └── 00000000000000000000
│   │   └── 3
│   │   └── 00000000000000000000
...
│   └── userLevelTopic
│   ├── 0
│   │   └── 00000000000000000000
│   ├── 1
│   │   └── 00000000000000000000
│   └── 2
│   └── 00000000000000000000
  • consumequeue 文件,按照 topic、queue_id 分目录存储
  • consumequeue 每个文件的大小固定不变,大小为 600W Bytes
    • 每个文件由 30W 个数据单元组成,每个数据单元 20 Bytes
  • cosumequeue 文件名,名字长度为20位,为起始偏移量,左边补零
    • 00000000000000000000 代表了第一个文件,起始偏移量为0,文件大小为600W,当第一个文件满之后创建的第二个文件的名字为00000000000006000000,起始偏移量为6000000,以此类推,第三个文件名字为00000000000012000000,起始偏移量为12000000
  • 消息存储时,顺序写入文件,当文件满了,写入下一个文件

存储结构

consumequeue 文件中,有两种单元类型:

  1. MESSAGE_POSITION_INFO :消息位置信息
  2. BLANK : 空白占位 (当历史 Message 被删除时,需要用 BLANK 占位被删除的消息)

consumequeue-unit

MESSAGE_POSITION_INFO 存储结构

第几位 字段 说明 数据类型 字节数
1 offset 消息在 commitlog 中的存储位置 Long 8
2 size 消息长度 Int 4
3 tagsCode 消息tagsCode Long 8

注: tagsCode 字段是 tags 的 hashCode,采用 java.lang.String#hashCode,也即 s[0]31^(n-1) + s[1]31^(n-2) + … + s[n-1]

BLANK 存储结构

第几位 字段 说明 数据类型 字节数
1 0 Long 8
2 Integer.MAX_VALUE Int 4
3 0 Long 8

index

index 文件特点

index
index-detail

  • index 每个文件的大小固定不变,大小为 40 + 500W4 + 2000W20 = 420000040 Bytes
  • index 文件名,是创建时的时间戳,如 20190425105434433
  • index 的逻辑结构,与 HashMap 实现类似
beginTimestamp endTimestamp beginPhyOffset endPhyOffset hashSlotCount indexCount
8 Bytes 8 Bytes 8 Bytes 8 Bytes 4 Bytes 4 Bytes

Index Header 各字段含义:
beginTimestamp:第一个索引消息存储时间戳;
endTimestamp:最后一个索引消息存储时间戳;
beginPhyOffset:第一个索引消息在 commitlog 的偏移量;
endPhyOffset:最后一个索引消息在 commitlog 的偏移量;
hashSlotCount:该索引文件目前的hash slot的个数;
indexCount:该索引文件目前的消息索引个数;

Slot Table

  1. 根据 key 定位 Slot Table 的位置及在 index 文件中的偏移量 absSlotPos
1
2
3
// hashSlotNum = 500W, INDEX_HEADER_SIZE = 40, hashSlotSize = 4
int slotPos = keyHash % hashSlotNum;
int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;
  1. slot 位置存储的值 (slotValue) 是该槽位索引链表的最新索引当时的 index count 值
    • 索引当时的 index count 值,相当于序号,因为索引是固定长度的,有了这个序号,就可以计算得到该索引的 absIndexPos
    • 链表倒序排列,slotValue 总是指向最新的一个索引项,每次该槽位新增索引项时,slotValue 更新为这个新增的索引项

Index Linked List

keyHash phyOffset timeDiff slotValue
4 Bytes 8 Bytes 4 Bytes 4 Bytes

Index Linked List 各字段含义:
keyHash: key 的 hash 值;
phyOffset: commitLog 真实的物理偏移值;
timeDiff:时间偏移值,消息的存储时间与 Index Header 中 beginTimestamp 的时间差 (目的是为了节省空间);
slotValue:指向上一个索引项,从而形成一个链表结构;

Index Linked List的位置(absIndexPos)的计算

1
2
// hashSlotNum = 500W, INDEX_HEADER_SIZE = 40, hashSlotSize = 4, indexSize = 20
int absIndexPos = IndexHeader.INDEX_HEADER_SIZE + hashSlotNum * hashSlotSize + indexCount * indexSize;

添加索引 putKey

org.apache.rocketmq.store.index.IndexFile#putKey

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
// 1. 判断该索引文件的索引数是否小于最大的索引数,如果>=最大索引数,IndexService就会尝试新建一个索引文件
if (this.indexHeader.getIndexCount() < this.indexNum) {
// 2. 计算该message key的hash值
int keyHash = indexKeyHashMethod(key);
// 3. 根据message key的hash值散列到某个hash slot里
int slotPos = keyHash % this.hashSlotNum;
// 4. 计算得到该hash slot的实际文件位置Position
int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;

try {
// 5. 根据该hash slot的实际文件位置absSlotPos得到slot里的值
// 这里有两种情况:
// 1). slot=0, 当前message的key是该hash值第一个消息索引
// 2). slot>0, 该key hash值上一个消息索引的位置
int slotValue = this.mappedByteBuffer.getInt(absSlotPos);

// 6. 数据校验及修正
if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {
slotValue = invalidIndex;
}

// 7. 计算时间偏移值
long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();
timeDiff = timeDiff / 1000;

if (this.indexHeader.getBeginTimestamp() <= 0) {
timeDiff = 0;
} else if (timeDiff > Integer.MAX_VALUE) {
timeDiff = Integer.MAX_VALUE;
} else if (timeDiff < 0) {
timeDiff = 0;
}

// 8. 计算当前消息索引具体的存储位置(Append模式)
int absIndexPos =
IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
+ this.indexHeader.getIndexCount() * indexSize;
// 9. 存储该消息索引
this.mappedByteBuffer.putInt(absIndexPos, keyHash);
this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);

// 10. 在该key hash slot处存入当前消息索引的位置
// 下次通过该key进行搜索时,会找到该key hash slot -> slot value -> curIndex ->
// if(curIndex.prevIndex>0) pre index (一直循环 直至该curIndex.prevIndex==0就停止)
this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());

if (this.indexHeader.getIndexCount() <= 1) {
this.indexHeader.setBeginPhyOffset(phyOffset);
this.indexHeader.setBeginTimestamp(storeTimestamp);
}

this.indexHeader.incHashSlotCount();
this.indexHeader.incIndexCount();
this.indexHeader.setEndPhyOffset(phyOffset);
this.indexHeader.setEndTimestamp(storeTimestamp);

return true;
} catch (Exception e) {
log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e);
}
} else {
log.warn("Over index file capacity: index count = " + this.indexHeader.getIndexCount()
+ "; index max num = " + this.indexNum);
}

return false;
}

索引查询 selectPhyOffset

org.apache.rocketmq.store.index.IndexFile#selectPhyOffset

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
public void selectPhyOffset(final List<Long> phyOffsets, final String key, final int maxNum,
final long begin, final long end, boolean lock) {
if (this.mappedFile.hold()) {
// 1. 计算该key的hash
int keyHash = indexKeyHashMethod(key);
// 2. 计算该hash value 对应的hash slot位置
int slotPos = keyHash % this.hashSlotNum;
// 3. 计算该hash value 对应的hash slot物理文件位置
int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;

FileLock fileLock = null;
try {
// 4. 取出该hash slot 的值
int slotValue = this.mappedByteBuffer.getInt(absSlotPos);

// 5. 该slot value <= 0 就代表没有该key对应的消息索引,直接结束搜索
// 该slot value > maxIndexCount 就代表该key对应的消息索引超过最大限制,数据有误,直接结束搜索
if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()
|| this.indexHeader.getIndexCount() <= 1) {
} else {
// 6. 从当前slot value 开始搜索
for (int nextIndexToRead = slotValue; ; ) {
if (phyOffsets.size() >= maxNum) {
break;
}

// 7. 找到当前slot value(也就是index count)物理文件位置
int absIndexPos =
IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
+ nextIndexToRead * indexSize;

// 8. 读取消息索引数据
int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos);
long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos + 4);

long timeDiff = (long) this.mappedByteBuffer.getInt(absIndexPos + 4 + 8);
// 9. 获取该消息索引的上一个消息索引index(可以看成链表的prev 指向上一个链节点的引用)
int prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4);

if (timeDiff < 0) {
break;
}

timeDiff *= 1000L;

long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff;
boolean timeMatched = (timeRead >= begin) && (timeRead <= end);
// 10. 数据校验: 比对 hash值和落盘时间
if (keyHash == keyHashRead && timeMatched) {
phyOffsets.add(phyOffsetRead);
}

// 当prevIndex <= 0 或prevIndex > maxIndexCount 或prevIndexRead == nextIndexToRead 或 timeRead < begin 停止搜索
if (prevIndexRead <= invalidIndex
|| prevIndexRead > this.indexHeader.getIndexCount()
|| prevIndexRead == nextIndexToRead || timeRead < begin) {
break;
}

nextIndexToRead = prevIndexRead;
}
}
} catch (Exception e) {
log.error("selectPhyOffset exception ", e);
} finally {

this.mappedFile.release();
}
}
}

Hash 冲突

寻找 key 的 slot 位置时,相当于执行了两次散列函数,一次是计算 key 的 hash,一次 key 的 hash 值取模,
因此这里存在两次冲突的情况:

  • 第一种,key 的 hash 值不同但模数相同,此时查询的时候会在比较一次 key 的 hash 值(每个索引项保存了 key 的 hash 值),过滤掉 hash 值不相等的项
  • 第二种,hash 值相等但 key 不等, 出于性能的考虑,冲突的检测放到客户端处理(key 的原始值是存储在消息文件中的,避免对数据文件的解析), 客户端比较一次消息体的 key 是否相同

附 rocketmq_parser.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
#!/usr/bin/python
# -*- coding: UTF-8 -*-

import os
import struct# https://docs.python.org/2/library/struct.html
import sys
import traceback
import zlib# https://docs.python.org/2/library/zlib.html

# import chardet

class TopicStats:

def __init__(self, topic, count, sum, max, min, max_body, min_body, max_message):
self.topic = topic
self.count = count
self.sum = sum
self.max = max
self.min = min
self.max_body = max_body
self.min_body = min_body
self.max_message = max_message

def __repr__(self):
return "%s: count=%d, sum=%d, avg=%d, max=%d, min=%d" \
% (self.topic, self.count, self.sum, self.sum/self.count, self.max, self.min)


def parse_commitlog(store_base_dir):
commitlog_dir = store_base_dir + '/commitlog'
files = os.listdir(commitlog_dir)
print(files)

for file_name in files:
commitlog_path = commitlog_dir + "/" + file_name
topics = parse_commitlog_0(commitlog_path)

if len(topics) == 0:
print("NO message!")
else:
max = 0
mt = None
for t in topics.values():
print(t)
if t.max > max:
max = t.max
mt = t

print("\n=====\nMAX OF ALL TOPICS:\n%s" % mt)
print("\n=====\nMAX CONTENT:\n%s" % bytes2string(mt.max_body))
# print("\n=====\nMAX MESSAGE:\n%s" % mt.max_message)


def parse_commitlog_0(commitlog_path):
count = 0
topics = {}
with open(commitlog_path, mode='rb') as f:
while True:
success, temp_tup = parse_message(f)
if not success:
break

count += 1
if count % 100000 == 0:
print(count)

topic, body, message = temp_tup
body_length = len(body)
if topic not in topics:
topics[topic] = TopicStats(topic, 1, body_length, body_length, body_length, body, body, message)
else:
topics[topic].count += 1
topics[topic].sum += body_length
if body_length > topics[topic].max:
topics[topic].max = body_length
topics[topic].max_body = body
topics[topic].max_message = message
if body_length < topics[topic].min:
topics[topic].min = body_length
topics[topic].min_body = body

print("count=%d" % count)
return topics


# org.apache.rocketmq.store.CommitLog
MESSAGE_MAGIC_CODE = -626843481
BLANK_MAGIC_CODE = -875286124

# + 4 //TOTALSIZE
# + 4 //MAGICCODE
# + 4 //BODYCRC
# + 4 //QUEUEID
# + 4 //FLAG
# + 8 //QUEUEOFFSET
# + 8 //PHYSICALOFFSET
# + 4 //SYSFLAG
# + 8 //BORNTIMESTAMP
# + 8 //BORNHOST
# + 8 //STORETIMESTAMP
# + 8 //STOREHOSTADDRESS
# + 4 //RECONSUMETIMES
# + 8 //Prepared Transaction Offset
# + 4 + body_length //BODY
# + 1 + topic_length //TOPIC
# + 2 + properties_length //propertiesLength

# org.apache.rocketmq.common.sysflag.MessageSysFlag
COMPRESSED_FLAG = 1

def parse_message(f):

msg_length, = struct.unpack('>I', f.read(4))
if msg_length == 0:
print("current position=%d, msg_length=%d, OVER!" % (f.tell(), msg_length))
return False, None

(magic_code, ) = struct.unpack('>i', f.read(4))
if magic_code == 0 or magic_code != MESSAGE_MAGIC_CODE:
print("current position=%d, msg_length=%d, magic_code=%d, OVER!" % (f.tell(), msg_length, magic_code))
return False, None

# f.seek(4 * 5 + 8 * 7, 1)
body_crc, queue_id, flag, queue_offset, physical_offset, sys_flag, \
born_timestamp, bh_ip1, bh_ip2, bh_ip3, bh_ip4, bh_port, \
store_timestamp, sh_ip1, sh_ip2, sh_ip3, sh_ip4, sh_port, \
reconsume_times, prepared_transation_offset \
= struct.unpack('>IIiQQiQBBBBIQBBBBIIQ', f.read(4 * 5 + 8 * 7))
born_host = "%d.%d.%d.%d:%d" % (bh_ip1, bh_ip2, bh_ip3, bh_ip4, bh_port)
store_host = "%d.%d.%d.%d:%d" % (sh_ip1, sh_ip2, sh_ip3, sh_ip4, sh_port)

# body
body_length, = struct.unpack('>I', f.read(4))
body = f.read(body_length)
if sys_flag == COMPRESSED_FLAG:
body = zlib.decompress(body)

# topic
topic_length, = struct.unpack('>B', f.read(1))
topic = f.read(topic_length).decode('ascii')

(properties_length, ) = struct.unpack('>H', f.read(2))
# f.seek(properties_length, 1)
properties = f.read(properties_length).decode('utf8')

calc_length = 4 * 7 + 8 * 7 + (4 + body_length) + (1 + topic_length) + (2 + properties_length)
message = "calc_length=%d, msg_length=%d, magic_cod=%d, body_crc=%d, queue_id=%d, flag=%d, queue_offset=%d, "\
"physical_offset=%d, sys_flag=%d, born_timestamp=%d, born_host=%s, store_timestamp=%d, store_host=%s, "\
"reconsume_times=%d, prepared_transation_offset=%d, body_length=%d, body=%s, topic_length=%d, topic=%s, "\
"properties_length=%d, properties=%s" \
% (calc_length, msg_length, magic_code, body_crc, queue_id, flag, queue_offset, physical_offset, sys_flag,
born_timestamp, born_host, store_timestamp, store_host, reconsume_times, prepared_transation_offset,
body_length, bytes2string(body), topic_length, topic, properties_length, properties)

return True, (topic, body, message)


def parse_consumequeue(store_base_dir, topic, queue_id):
topic_consume_queue_dir = '%s/consumequeue/%s/%s' % (store_base_dir, topic, queue_id)
commitlog_dir = store_base_dir + '/commitlog'

for file_name in os.listdir(topic_consume_queue_dir):
topic_consume_queue_file = topic_consume_queue_dir + '/' + file_name
print('===== topic_consume_queue_file=%s' % topic_consume_queue_file)
with open(topic_consume_queue_file, mode='rb') as f:
while True:
try:
success, temp_tup = parse_consumequeue_0(f)
if not success:
print('commitlog_offset=0, parse OVER!')
break

commitlog_offset, = temp_tup
message = query_by_commitlog_offset(commitlog_offset, commitlog_dir)
print(message)

except Exception as e:
print(e, traceback.format_exc())
break


def parse_consumequeue_0(f):
commitlog_offset, message_size = struct.unpack('>QI', f.read(12))
if commitlog_offset == 0:
return False, None

message_tag_hashcode = struct.unpack('>Q', f.read(8))[0]
print('commitlog_offset=%d, message_size=%d, message_tag_hashcode=%d'
% (commitlog_offset, message_size, message_tag_hashcode))

return True, (commitlog_offset)


def query_by_commitlog_offset(commitlog_offset, commitlog_dir):
files = os.listdir(commitlog_dir)
# print(files)

commitlog_file_name = None
begin_offset = -1
for file_name in files:
file_begin_offset = int(file_name)
if commitlog_offset >= file_begin_offset and begin_offset < file_begin_offset:
begin_offset = file_begin_offset
commitlog_file_name = file_name

# 消息已从 commitlog 中删除
if not commitlog_file_name:
return None

commitlog_path = commitlog_dir + '/' + commitlog_file_name
with open(commitlog_path, mode='rb') as f:
f.seek(commitlog_offset - begin_offset, 0)
success, tup = parse_message(f)# tup: topic, body, message
if success:
return tup[2]


def bytes2string(body):
try:
# <type 'str'>
# print(type(body))
# {'confidence': 0.99, 'encoding': 'utf-8'}
# print(chardet.detect(body))
return body.decode('utf8')
except UnicodeDecodeError as e:
print(e)
return repr(e)


# 同 java.lang.String#hashCode
# s[0]*31^(n-1) + s[1]*31^(n-2) + ... + s[n-1]
def hashcode(string):
h = 0
for c in string:
# print(ord(c), chr(ord(c)))
h = 31 * h + ord(c)
return h


# nohup python rocketmq_parser.py commitlog ~/store > commitlog.result &
# python rocketmq_parser.py commitlog /home/rocketmq/store
# python rocketmq_parser.py consumequeue ~/store ***topic 0 > commitlog.result
if __name__ == '__main__':
kind = sys.argv[1]
store_base_dir = sys.argv[2]

if kind == 'consumequeue':
topic = sys.argv[3]
queue_id = sys.argv[4]
parse_consumequeue(store_base_dir, topic, queue_id)
elif kind == 'commitlog':
parse_commitlog(store_base_dir)

参考

本文参考了互联网上大家的分享,就不一一列举,在此一并谢过。
也希望本文,能对大家有所帮助,若有错误,还请谅解、指正。