经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 数据库/运维 » RocketMQ » 查看文章
RocketMq深入分析讲解两种削峰方式
来源:jb51  时间:2023/1/20 8:39:34  对本文有异议

何时需要削峰

当上游调用下游服务速率高于下游服务接口QPS时,那么如果不对调用速率进行控制,那么会发生很多失败请求

通过消息队列的削峰方法有两种

控制消费者消费速率和生产者投放延时消息,本质都是控制消费速度

通过消费者参数控制消费速度

先分析那些参数对控制消费速度有作用

1.PullInterval: 设置消费端,拉取mq消息的间隔时间。

注意:该时间算起时间是rocketMq消费者从broker消息后算起。经过PullInterval再次向broker拉去消息

源码分析:

首先需要了解rocketMq的消息拉去过程

拉去消息的类

PullMessageService

  1. public class PullMessageService extends ServiceThread {
  2. private final InternalLogger log = ClientLogger.getLog();
  3. private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<PullRequest>();
  4. private final MQClientInstance mQClientFactory;
  5. private final ScheduledExecutorService scheduledExecutorService = Executors
  6. .newSingleThreadScheduledExecutor(new ThreadFactory() {
  7. @Override
  8. public Thread newThread(Runnable r) {
  9. return new Thread(r, "PullMessageServiceScheduledThread");
  10. }
  11. });
  12. public PullMessageService(MQClientInstance mQClientFactory) {
  13. this.mQClientFactory = mQClientFactory;
  14. }
  15. public void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
  16. if (!isStopped()) {
  17. this.scheduledExecutorService.schedule(new Runnable() {
  18. @Override
  19. public void run() {
  20. PullMessageService.this.executePullRequestImmediately(pullRequest);
  21. }
  22. }, timeDelay, TimeUnit.MILLISECONDS);
  23. } else {
  24. log.warn("PullMessageServiceScheduledThread has shutdown");
  25. }
  26. }
  27. public void executePullRequestImmediately(final PullRequest pullRequest) {
  28. try {
  29. this.pullRequestQueue.put(pullRequest);
  30. } catch (InterruptedException e) {
  31. log.error("executePullRequestImmediately pullRequestQueue.put", e);
  32. }
  33. }
  34. public void executeTaskLater(final Runnable r, final long timeDelay) {
  35. if (!isStopped()) {
  36. this.scheduledExecutorService.schedule(r, timeDelay, TimeUnit.MILLISECONDS);
  37. } else {
  38. log.warn("PullMessageServiceScheduledThread has shutdown");
  39. }
  40. }
  41. public ScheduledExecutorService getScheduledExecutorService() {
  42. return scheduledExecutorService;
  43. }
  44. private void pullMessage(final PullRequest pullRequest) {
  45. final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
  46. if (consumer != null) {
  47. DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
  48. impl.pullMessage(pullRequest);
  49. } else {
  50. log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
  51. }
  52. }
  53. @Override
  54. public void run() {
  55. log.info(this.getServiceName() + " service started");
  56. while (!this.isStopped()) {
  57. try {
  58. PullRequest pullRequest = this.pullRequestQueue.take();
  59. this.pullMessage(pullRequest);
  60. } catch (InterruptedException ignored) {
  61. } catch (Exception e) {
  62. log.error("Pull Message Service Run Method exception", e);
  63. }
  64. }
  65. log.info(this.getServiceName() + " service end");
  66. }
  67. @Override
  68. public void shutdown(boolean interrupt) {
  69. super.shutdown(interrupt);
  70. ThreadUtils.shutdownGracefully(this.scheduledExecutorService, 1000, TimeUnit.MILLISECONDS);
  71. }
  72. @Override
  73. public String getServiceName() {
  74. return PullMessageService.class.getSimpleName();
  75. }
  76. }

继承自ServiceThread,这是一个单线程执行的service,不断获取阻塞队列中的pullRequest,进行消息拉取。

executePullRequestLater会延时将pullrequest放入到pullRequestQueue,达到延时拉去的目的。

那么PullInterval参数就是根据这个功能发挥的作用,在消费者拉去消息成功的回调

  1. PullCallback pullCallback = new PullCallback() {
  2. @Override
  3. public void onSuccess(PullResult pullResult) {
  4. if (pullResult != null) {
  5. pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
  6. subscriptionData);
  7. switch (pullResult.getPullStatus()) {
  8. case FOUND:
  9. long prevRequestOffset = pullRequest.getNextOffset();
  10. pullRequest.setNextOffset(pullResult.getNextBeginOffset());
  11. long pullRT = System.currentTimeMillis() - beginTimestamp;
  12. DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
  13. pullRequest.getMessageQueue().getTopic(), pullRT);
  14. long firstMsgOffset = Long.MAX_VALUE;
  15. if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
  16. DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
  17. } else {
  18. firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
  19. DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
  20. pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
  21. boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
  22. DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
  23. pullResult.getMsgFoundList(),
  24. processQueue,
  25. pullRequest.getMessageQueue(),
  26. dispatchToConsume);
  27. if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
  28. DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
  29. DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
  30. } else {
  31. DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
  32. }
  33. }
  34. if (pullResult.getNextBeginOffset() < prevRequestOffset
  35. || firstMsgOffset < prevRequestOffset) {
  36. log.warn(
  37. "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
  38. pullResult.getNextBeginOffset(),
  39. firstMsgOffset,
  40. prevRequestOffset);
  41. }
  42. break;
  43. case NO_NEW_MSG:
  44. pullRequest.setNextOffset(pullResult.getNextBeginOffset());
  45. DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
  46. DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
  47. break;
  48. case NO_MATCHED_MSG:
  49. pullRequest.setNextOffset(pullResult.getNextBeginOffset());
  50. DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
  51. DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
  52. break;
  53. case OFFSET_ILLEGAL:
  54. log.warn("the pull request offset illegal, {} {}",
  55. pullRequest.toString(), pullResult.toString());
  56. pullRequest.setNextOffset(pullResult.getNextBeginOffset());
  57. pullRequest.getProcessQueue().setDropped(true);
  58. DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {
  59. @Override
  60. public void run() {
  61. try {
  62. DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
  63. pullRequest.getNextOffset(), false);
  64. DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());
  65. DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());
  66. log.warn("fix the pull request offset, {}", pullRequest);
  67. } catch (Throwable e) {
  68. log.error("executeTaskLater Exception", e);
  69. }
  70. }
  71. }, 10000);
  72. break;
  73. default:
  74. break;
  75. }
  76. }
  77. }
  78. @Override
  79. public void onException(Throwable e) {
  80. if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
  81. log.warn("execute the pull request exception", e);
  82. }
  83. DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
  84. }
  85. };

在 case found的情况下,也就是拉取到消息的q情况,在PullInterval>0的情况下,会延时投递到pullRequestQueue中,实现拉取消息的间隔

  1. if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
  2. DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
  3. DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
  4. } else {
  5. DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
  6. }

2.PullBatchSize: 设置每次pull消息的数量,该参数设置是针对逻辑消息队列,并不是每次pull消息拉到的总消息数

消费端分配了两个消费队列来监听。那么PullBatchSize 设置为32,那么该消费端每次pull到 64个消息。

消费端每次pull到消息总数=PullBatchSize*监听队列数

源码分析

消费者拉取消息时

org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage中

会执行

  1. this.pullAPIWrapper.pullKernelImpl(
  2. pullRequest.getMessageQueue(),
  3. subExpression,
  4. subscriptionData.getExpressionType(),
  5. subscriptionData.getSubVersion(),
  6. pullRequest.getNextOffset(),
  7. this.defaultMQPushConsumer.getPullBatchSize(),
  8. sysFlag,
  9. commitOffsetValue,
  10. BROKER_SUSPEND_MAX_TIME_MILLIS,
  11. CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
  12. CommunicationMode.ASYNC,
  13. pullCallback
  14. );

其中 this.defaultMQPushConsumer.getPullBatchSize(),就是配置的PullBatchSize,代表的是每次从broker的一个队列上拉取的最大消息数。

3.ThreadMin和ThreadMax: 消费端消费pull到的消息需要的线程数量。

源码分析:

还是在消费者拉取消息成功时

  1. boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
  2. DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
  3. pullResult.getMsgFoundList(),
  4. processQueue,
  5. pullRequest.getMessageQueue(),
  6. dispatchToConsume);

通过consumeMessageService执行

默认情况下是并发消费

org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#submitConsumeRequest

  1. @Override
  2. public void submitConsumeRequest(
  3. final List<MessageExt> msgs,
  4. final ProcessQueue processQueue,
  5. final MessageQueue messageQueue,
  6. final boolean dispatchToConsume) {
  7. final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
  8. if (msgs.size() <= consumeBatchSize) {
  9. ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
  10. try {
  11. this.consumeExecutor.submit(consumeRequest);
  12. } catch (RejectedExecutionException e) {
  13. this.submitConsumeRequestLater(consumeRequest);
  14. }
  15. } else {
  16. for (int total = 0; total < msgs.size(); ) {
  17. List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
  18. for (int i = 0; i < consumeBatchSize; i++, total++) {
  19. if (total < msgs.size()) {
  20. msgThis.add(msgs.get(total));
  21. } else {
  22. break;
  23. }
  24. }
  25. ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
  26. try {
  27. this.consumeExecutor.submit(consumeRequest);
  28. } catch (RejectedExecutionException e) {
  29. for (; total < msgs.size(); total++) {
  30. msgThis.add(msgs.get(total));
  31. }
  32. this.submitConsumeRequestLater(consumeRequest);
  33. }
  34. }
  35. }
  36. }

其中consumeExecutor初始化

  1. this.consumeExecutor = new ThreadPoolExecutor(
  2. this.defaultMQPushConsumer.getConsumeThreadMin(),
  3. this.defaultMQPushConsumer.getConsumeThreadMax(),
  4. 1000 * 60,
  5. TimeUnit.MILLISECONDS,
  6. this.consumeRequestQueue,
  7. new ThreadFactoryImpl("ConsumeMessageThread_"));

对象线程池最大和核心线程数。对于顺序消费ConsumeMessageOrderlyService也会使用最大和最小线程数这两个参数,只是消费时会锁定队列。

以上三种情况:是针对参数配置,来调整消费速度。

除了这三种情况外还有两种服务部署情况,可以调整消费速度:

4.rocketMq 逻辑消费队列配置数量 有消费端每次pull到消息总数=PullBatchSize*监听队列数

可知rocketMq 逻辑消费队列配置数量即上图中的 queue1 ,queue2,配置数量越多每次pull到的消息总数也就越多。如果下边配置读队列数量:修改tocpic的逻辑队列数量

5.消费端节点部署数量 :

部署数量无论一个节点监听所有队列,还是多个节点按照分配策略分配监听队列数量,理论上每秒pull到的数量都一样的,但是多节点消费端消费线程数量要比单节点消费线程数量多,也就是多节点消费速度大于单节点。

消费延时控流

针对消息订阅者的消费延时流控的基本原理是,每次消费时在客户端增加一个延时来控制消费速度,此时理论上消费并发最快速度为:

单节点部署:

ConsumInterval :延时时间单位毫秒

ConcurrentThreadNumber:消费端线程数量

MaxRate :理论每秒处理数量

MaxRate = 1 / ConsumInterval * ConcurrentThreadNumber

如果消息并发消费线程(ConcurrentThreadNumber)为 20,延时(ConsumInterval)为 100 ms,代入上述公式可得

如果消息并发消费线程(ConcurrentThreadNumber)为 20,延时(ConsumInterval)为 100 ms,代入上述公式可得

200 = 1 / 0.1 * 20

由上可知,理论上可以将并发消费控制在 200 以下

如果是多个节点部署如两个节点,理论消费速度最高为每秒处理400个消息。

如下延时流控代码:

  1. /**
  2. * 测试mq 并发 接受
  3. */
  4. @Component
  5. @RocketMQMessageListener(topic = ConstantTopic.WRITING_LIKE_TOPIC,selectorExpression = ConstantTopic.WRITING_LIKE_ADD_TAG, consumerGroup = "writing_like_topic_add_group")
  6. class ConsumerLikeSave implements RocketMQListener<LikeWritingParams>, RocketMQPushConsumerLifecycleListener{
  7. @SneakyThrows
  8. @Override
  9. public void onMessage(LikeWritingParams params) {
  10. System.out.println("睡上0.1秒");
  11. Thread.sleep(100);
  12. long begin = System.currentTimeMillis();
  13. System.out.println("mq消费速度"+Thread.currentThread().getName()+" "+DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").format(LocalDateTime.now()));
  14. //writingLikeService.saveLike2Db(params.getUserId(),params.getWritingId());
  15. long end = System.currentTimeMillis();
  16. // System.out.println("消费:: " +Thread.currentThread().getName()+ "毫秒:"+(end - begin));
  17. }
  18. @Override
  19. public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
  20. defaultMQPushConsumer.setConsumeThreadMin(20); //消费端拉去到消息以后分配线索去消费
  21. defaultMQPushConsumer.setConsumeThreadMax(50);//最大消费线程,一般情况下,默认队列没有塞满,是不会启用新的线程的
  22. defaultMQPushConsumer.setPullInterval(0);//消费端多久一次去rocketMq 拉去消息
  23. defaultMQPushConsumer.setPullBatchSize(32); //消费端每个队列一次拉去多少个消息,若该消费端分赔了N个监控队列,那么消费端每次去rocketMq拉去消息说为N*1
  24. defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
  25. defaultMQPushConsumer.setConsumeTimestamp(UtilAll.timeMillisToHumanString3(System.currentTimeMillis()));
  26. defaultMQPushConsumer.setConsumeMessageBatchMaxSize(2);
  27. }
  28. }

注释:如上消费端,单节点每秒处理速度也就是最高200个消息,实际上要小于200,业务代码执行也是需要时间。

但是要注意实际操作中并发流控实际是默认存在的,

spring boot 消费端默认配置

this.consumeThreadMin = 20;

this.consumeThreadMax = 20;

this.pullInterval = 0L;

this.pullBatchSize = 32;

若业务逻辑执行需要20ms,那么单节点处理速度就是:1/0.02*20=1000

这里默认拉去的速度1s内远大于1000

注意: 这里虽然pullInterval 等于0 当时受限于每次拉去64个,处理完也是需要一端时间才能回复ack,才能再次拉取,所以消费速度应该小于1000

所以并发流控要消费速度大于消费延时流控 ,那么消费延时流控才有意义

使用rokcetMq支持的延时消息也可以实现消息的延时消费,通过对delayLevel对应的时间进行配置为我们的需求。为不同的消息设置不同delayLevel,达到延时消费的目的。

总结

rocketMq 肖锋流控两种方式:

并发流控:就是根据业务流控速率要求,来调整topic 消费队列数量(read queue),消费端部署节点,消费端拉去间隔时间,消费端消费线程数量等,来达到要求的速率内

延时消费流控:就是在消费端延时消费消息(sleep),具体延时多少要根据业务要求速率,和消费端线程数量,和节点部署数量来控制

到此这篇关于RocketMq深入分析讲解两种削峰方式的文章就介绍到这了,更多相关RocketMq削峰内容请搜索w3xue以前的文章或继续浏览下面的相关文章希望大家以后多多支持w3xue!

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号