rocketmq 源码版本: 4.3.1
Consumer
Consumer Group
Consumer 的 groupName 用于把多个 Consumer 组织到一起,提高并发处理能力、可靠性,groupName需要和消息模式 (MessageModel) 配合使用。
rocketmq 支持两种消息模式 (MessageModel): CLUSTERING、BROADCASTING。
- 在 CLUSTERING 模式下,同一个 consumerGroup (groupName 相同) 里的每个 Consumer 实例 (进程),只消费所订阅消息的一部分内容,同一个 Consumer Group 里所有的 Consumer 实例消费的内容合起来才是所订阅 Topic 内容的整体,从而达到负载均衡的目的。
- 在 BROADCASTING 模式下,同一个 Consumer Group 里的每个 Consumer 都能消费到所订阅 Topic 的全部消息,也就是一个消息会被多次分发,被多个 Consumer 消费,也即 BROADCASTING 模式下,Consumer Group 无意义。
PUSH, PULL, LONG POLLING
PUSH
PUSH 方式是 server 端接收到消息后,主动把消息推送给 client 端,实时性高。
但对于消息队列服务来说,用 PUSH 方式主动推送有些弊端:
- 加大了 server 端的工作量,进而影响了 server 的性能;
- client 的处理能力各不相同,client 的状态不受 server 控制,如果 client 不能及时处理 server 推送过来的消息,会造成各种潜在问题。
PULL
PULL 方式是 client 端循环地从 server 端拉取消息,主动权在 client 手里,自己拉取到一定量消息后,处理妥当了再接着取。
PULL 方式的问题是循环拉取消息的间隔不好设定:
- PULL 时间间隔太短,就处在一个 “忙等” 的状态,浪费资源;
- PULL 时间间隔太长, server 端有消息到来时, 就没法被及时处理。
LONG POLLING
rocketmq 通过 “长轮询” 方式达到 PUSH 效果。
“长轮询” 方式通过 client 端和 server 端的配合,达到既拥有 PULL 的优点,又能保证实时性(当然,还是有一定的实时性损失)。
“长轮询” 的核心是, Broker 端 HOLD 住客户端请求一小段时间,在这个时间内有新消息到达,就利用现有的连接立刻返回消息给 Consumer。
“长轮询” 的主动权还是掌握在 Consumer 手中,Broker 即使有大量消息积压,也不会主动推送给 Consumer。
“长轮询” 方式的局限性,是在 HOLD 住 Consumer 请求的时候需要占用资源,它适用于消息队列这种客户端连接数可控的场景中。
下文主要针对 “PUSH 消费普通消息”
示例: PUSH 消费普通消息
1 | public class PushConsumer { |
PushConsumer 组件一览
先看一张 PushConsumer
包含的组件以及组件之间的交互图:
RebalanceService
:均衡消息队列服务,负责分配当前Consumer
可消费的消息队列(MessageQueue
)。当有新的Consumer
的加入或移除,都会重新分配消息队列。PullMessageService
:拉取消息服务,不断从Broker
拉取消息,并提交消费任务到ConsumeMessageService
。ConsumeMessageService
:消费消息服务,不断消费消息,并处理消费结果。RemoteBrokerOffsetStore
:Consumer
消费进度管理,负责从Broker
获取消费进度,同步消费进度到Broker
。ProcessQueue
:消息处理队列。MQClientInstance
:封装对Namesrv
,Broker
的 API 调用,提供给Producer
、Consumer
使用。
源码解析
PushConsumer 启动
DefaultMQPushConsumer
1 | public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer { |
- 门面类 facade
- 包含一个 DefaultMQPushConsumerImpl 实例, 实际处理逻辑
- 继承了 ClientConfig 并扩充了 PushConsumer 特定对配置项, 方便使用时设置各项参数
DefaultMQPushConsumerImpl#subscribe(topic, subExpression)
1 | public void subscribe(String topic, String subExpression) throws MQClientException { |
FilterAPI#buildSubscriptionData()
1 | public static SubscriptionData buildSubscriptionData(final String consumerGroup, String topic, String subString) throws Exception { |
- topic
- 订阅的多个 tag, 使用 || 分隔
- “*” 表示订阅 topic 下的所有 tag
DefaultMQPushConsumerImpl#start()
1 | public synchronized void start() throws MQClientException { |
MQClientInstance#start()
1 | public void start() throws MQClientException { |
队列负载均衡
什么是队列负载均衡?
rocketmq 一个 topic 有多个 Queue, 1个 Consumer Group 有多个 Consumer 实例,
采用 CLUSTERING 集群模式时, 多个 Consumer 分摊到 topic 的不同 Queue。
rocketmq 采用 client 侧负载均衡 (而不是 broker 侧负载均衡),
即 consumer 拉取消息时, 对消费队列进行负载均衡。
RebalanceService
1 | public class RebalanceService extends ServiceThread { |
- 均衡消息队列服务,负责分配当前
Consumer
可消费的消息队列(MessageQueue
)。 - 调用
MQClientInstance#doRebalance(...)
分配消息队列 - 有三种情况情况下触发:
- 等待超时,每 20s 调用一次
- 启动时,调用
rebalanceService#wakeup(...)
触发 Broker
通知Consumer
加入 或 移除时,Consumer
响应通知,调用rebalanceService#wakeup(...)
触发
MQClientInstance#doRebalance()
1 | public void doRebalance() { |
- 一个应用进程可包含多个 consumer集合, 遍历当前进程的 consumerTable (Consumer集合),执行消息队列分配
- consumerTable 元素来源于 DefaultMQPushConsumerImpl#start()
- mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this)
- 目前 rocketmq 只有两种 MQConsumerInner: DefaultMQPullConsumerImpl, DefaultMQPushConsumerImpl
DefaultMQPushConsumerImpl#doRebalance()
1 |
|
- 经由 DefaultMQPushConsumerImpl 控制一下 (!this.pause)
- DefaultMQPullConsumerImpl#doRebalance() 没有顺序消息
- this.rebalanceImpl.doRebalance(false)
RebalanceImpl#doRebalance(isOrder)
1 | public void doRebalance(final boolean isOrder) { |
#truncateMessageQueueNotMyTopic(...)
:移除未订阅的消息队列。- 当调用
DefaultMQPushConsumer#unsubscribe(topic)
时,只移除订阅主题集合(subscriptionInner
),对应消息队列移除在该方法
RebalanceImpl#rebalanceByTopic(topic, isOrder)
1 | private void rebalanceByTopic(final String topic, final boolean isOrder) { |
- this.dispatchPullRequest(pullRequestList);
- 发起消息拉取请求, 该调用是 PushConsumer 不断拉取消息的起点
AllocateMessageQueueAveragely
1 | /** |
- 举个例子:
某 topic 有 4 个消息队列。
Consumer 2 可以整除* | Consumer 3 不可整除* | Consumer 5 无法都分配* | |
---|---|---|---|
消息队列[0] | Consumer[0] | Consumer[0] | Consumer[0] |
消息队列[1] | Consumer[0] | Consumer[0] | Consumer[1] |
消息队列[2] | Consumer[1] | Consumer[1] | Consumer[2] |
消息队列[3] | Consumer[1] | Consumer[2] | Consumer[3] |
- AllocateMessageQueueStrategy
- AllocateMachineRoomNearby
- AllocateMessageQueueAveragely
- AllocateMessageQueueAveragelyByCircle
- AllocateMessageQueueByConfig
- AllocateMessageQueueByMachineRoom
- AllocateMessageQueueConsistentHash、
拉取消息
RebalancePushImpl#dispatchPullRequest()
1 |
|
DefaultMQPushConsumerImpl#executePullRequestImmediately(pullRequest)
1 | public void executePullRequestImmediately(final PullRequest pullRequest) { |
PullMessageService
1 | public class PullMessageService extends ServiceThread { |
DefaultMQPushConsumerImpl#pullMessage(pullRequest)
1 | public void pullMessage(final PullRequest pullRequest) { |
RebalancePushImpl#computePullFromWhere(mq)
1 |
|
- 计算消息队列开始消费位置
PushConsumer
读取消费进度有三种选项:CONSUME_FROM_LAST_OFFSET
:一个新的消费集群第一次启动从队列的最后位置开始消费。后续再启动接着上次消费的进度开始消费。CONSUME_FROM_FIRST_OFFSET
:一个新的消费集群第一次启动从队列的最前位置开始消费。后续再启动接着上次消费的进度开始消费。CONSUME_FROM_TIMESTAMP
:一个新的消费集群第一次启动从指定时间点开始消费。后续再启动接着上次消费的进度开始消费。
PullAPIWrapper#pullKernelImpl(…)
1 | public PullResult pullKernelImpl( |
MQClientAPIImpl#pullMessage(…)
1 | public PullResult pullMessage( |
RemotingCommand#encode()
1 | public ByteBuffer encode() { |
消费消息
消费消息时序图
ConsumeMessageConcurrentlyService#submitConsumeRequest(…)
1 |
|
ConsumeMessageConcurrentlyService.ConsumeRequest
1 | class ConsumeRequest implements Runnable { |
消费进度存储
RemoteBrokerOffsetStore
:Consumer
集群模式 下,使用远程Broker
消费进度。LocalFileOffsetStore
:Consumer
广播模式下,使用本地文件
消费进度。
消费进度持久化
- 拉取消息、分配消息队列等等操作,会进行消费进度持久化
- 定时任务
RemoteBrokerOffsetStore
MQClientInstance#startScheduledTask
1 | private void startScheduledTask() { |
- org.apache.rocketmq.client.ClientConfig
- private int persistConsumerOffsetInterval = 1000 * 5;
DefaultMQPushConsumerImpl#persistConsumerOffset
1 |
|
参考
本文参考了互联网上大家的分享,就不一一列举,在此一并谢过。
也希望本文,能对大家有所帮助,若有错误,还请谅解、指正。