经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 数据库/运维 » RocketMQ » 查看文章
RocketMQ - 消费者Rebalance机制
来源:cnblogs  作者:VipSoft  时间:2023/2/28 8:51:05  对本文有异议

客户端是通过Rebalance服务做到高可靠的。当发生Broker掉线、消费者实例掉线、Topic 扩容等各种突发情况时,消费者组中的消费者实例是怎么重平衡,以支持全部队列的正常消费的呢?

RebalancePullImpl 和 RebalancePushImpl 两个重平衡实现类,分别被 DefaultMQPullConsumer 和DefaultMQPushConsumer 使用。下面讲一下 Rebalancelmpl 的核心属性和方法
image

核心属性

  1. public abstract class RebalanceImpl {
  2. protected static final InternalLogger log = ClientLogger.getLog();
  3. //记录MessageQueue和ProcessQueue的关系。MessageQueue可以简单地理解为ConsumeQueue的客户端实现;ProcessQueue是保存Pull消息的本地容器
  4. protected final ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = new ConcurrentHashMap<MessageQueue, ProcessQueue>(64);
  5. //Topic 路由信息 。保存 Topic 和 MessageQueue的关系。
  6. protected final ConcurrentMap<String/* topic */, Set<MessageQueue>> topicSubscribeInfoTable = new ConcurrentHashMap<String, Set<MessageQueue>>();
  7. //真正的订阅关系,保存当前消费者组订阅了哪些Topic的哪些Tag
  8. protected final ConcurrentMap<String /* topic */, SubscriptionData> subscriptionInner = new ConcurrentHashMap<String, SubscriptionData>();
  9. protected String consumerGroup;
  10. protected MessageModel messageModel;
  11. //消费分配策略的实现
  12. protected AllocateMessageQueueStrategy allocateMessageQueueStrategy;
  13. //client实例对象
  14. protected MQClientInstance mQClientFactory;
  15. }

核心方法

  1. public abstract class RebalanceImpl {
  2. //为MessageQueue加锁
  3. public boolean lock(final MessageQueue mq) {}
  4. //执行Rebalance操作
  5. public void doRebalance(final boolean isOrder) {}
  6. //通知Message发生变化,这个方法在Push和Pull两个类中被重写
  7. public abstract void messageQueueChanged(final String topic, final Set<MessageQueue> mqAll,
  8. final Set<MessageQueue> mqDivided);
  9. //去掉不再需要的 MessageQueue
  10. public abstract boolean removeUnnecessaryMessageQueue(final MessageQueue mq, final ProcessQueue pq);
  11. //执行消息拉取请求
  12. public abstract void dispatchPullRequest(final List<PullRequest> pullRequestList);
  13. //在Rebalance中更新processQueue
  14. private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
  15. final boolean isOrder)
  16. }

Rebalancelmpl 、 RebalancePushImpl 、 RebalancePullImpl 是Rebalance的核心实现,主要逻辑都在Rebalancelmpl中,因为Pull消费者和Push消费者对Rebalance的需求不同,在各自的实现中重写了部分方法,以满足自身需求

如果有一个消费者实例下线了,Broker和其他消费者是怎么做Rebalance的呢
image

  1. @Override
  2. public void run() {
  3. log.info(this.getServiceName() + " service started");
  4. while (!this.isStopped()) {
  5. this.waitForRunning(waitInterval);
  6. this.mqClientFactory.doRebalance();
  7. }
  8. log.info(this.getServiceName() + " service end");
  9. }

目前队列分配策略有以下5种实现方法

  • AllocateMessageQueueAveragely:平均分配,也是默认使用的策略(强烈推荐)。
  • AllocateMessageQueueAveragelyByCircle:环形分配策略。
  • AllocateMessageQueueByConfig:手动配置。
  • AllocateMessageQueueConsistentHash:一致性Hash分配。
  • AllocateMessageQueueByMachineRoom:机房分配策略

原文链接:https://www.cnblogs.com/vipsoft/p/17150065.html

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号