rocketmq 源码版本: 4.3.1
Broker
消息中转角色,负责接收消息,消息存储,转发消息,一般也称为 Message Server。
JMS 规范中,称为 Provider。
源码解析
Broker 启动
BrokerStartup
1 | public static void main(String[] args) { |
BrokerController
1 | public boolean initialize() throws CloneNotSupportedException { |
NettyRemotingServer
1 |
|
NettyRemotingServer.NettyServerHandler
1 | class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> { |
NettyRemotingAbstract#processMessageReceived(…)
1 | public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception { |
NettyRemotingAbstract#processRequestCommand(…)
1 | public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) { |
处理发送消息请求
SendMessageProcessor
1 |
|
DefaultMessageStore#putMessage(msg)
1 | public PutMessageResult putMessage(MessageExtBrokerInner msg) { |
处理拉取消息请求
PullMessageProcessor#processRequest(…)
1 | private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend) throws RemotingCommandException { |
DefaultMessageStore#getMessage(…)
1 | // 根据 消息分组(group) + 主题(Topic) + 队列编号(queueId) + 队列位置(offset) + 订阅信息(subscriptionData) 获取 指定条数(maxMsgNums) 的 消息(Message) |
参考
本文参考了互联网上大家的分享,就不一一列举,在此一并谢过。
也希望本文,能对大家有所帮助,若有错误,还请谅解、指正。