何时需要削峰
当上游调用下游服务速率高于下游服务接口QPS时,那么如果不对调用速率进行控制,那么会发生很多失败请求
通过消息队列的削峰方法有两种
控制消费者消费速率和生产者投放延时消息,本质都是控制消费速度
通过消费者参数控制消费速度
先分析那些参数对控制消费速度有作用
1.PullInterval: 设置消费端,拉取mq消息的间隔时间。
注意:该时间算起时间是rocketMq消费者从broker消息后算起。经过PullInterval再次向broker拉去消息
源码分析:
首先需要了解rocketMq的消息拉去过程
拉去消息的类
PullMessageService
- public class PullMessageService extends ServiceThread {
- private final InternalLogger log = ClientLogger.getLog();
- private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<PullRequest>();
- private final MQClientInstance mQClientFactory;
- private final ScheduledExecutorService scheduledExecutorService = Executors
- .newSingleThreadScheduledExecutor(new ThreadFactory() {
- @Override
- public Thread newThread(Runnable r) {
- return new Thread(r, "PullMessageServiceScheduledThread");
- }
- });
- public PullMessageService(MQClientInstance mQClientFactory) {
- this.mQClientFactory = mQClientFactory;
- }
- public void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
- if (!isStopped()) {
- this.scheduledExecutorService.schedule(new Runnable() {
- @Override
- public void run() {
- PullMessageService.this.executePullRequestImmediately(pullRequest);
- }
- }, timeDelay, TimeUnit.MILLISECONDS);
- } else {
- log.warn("PullMessageServiceScheduledThread has shutdown");
- }
- }
- public void executePullRequestImmediately(final PullRequest pullRequest) {
- try {
- this.pullRequestQueue.put(pullRequest);
- } catch (InterruptedException e) {
- log.error("executePullRequestImmediately pullRequestQueue.put", e);
- }
- }
- public void executeTaskLater(final Runnable r, final long timeDelay) {
- if (!isStopped()) {
- this.scheduledExecutorService.schedule(r, timeDelay, TimeUnit.MILLISECONDS);
- } else {
- log.warn("PullMessageServiceScheduledThread has shutdown");
- }
- }
- public ScheduledExecutorService getScheduledExecutorService() {
- return scheduledExecutorService;
- }
- private void pullMessage(final PullRequest pullRequest) {
- final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
- if (consumer != null) {
- DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
- impl.pullMessage(pullRequest);
- } else {
- log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
- }
- }
- @Override
- public void run() {
- log.info(this.getServiceName() + " service started");
- while (!this.isStopped()) {
- try {
- PullRequest pullRequest = this.pullRequestQueue.take();
- this.pullMessage(pullRequest);
- } catch (InterruptedException ignored) {
- } catch (Exception e) {
- log.error("Pull Message Service Run Method exception", e);
- }
- }
- log.info(this.getServiceName() + " service end");
- }
- @Override
- public void shutdown(boolean interrupt) {
- super.shutdown(interrupt);
- ThreadUtils.shutdownGracefully(this.scheduledExecutorService, 1000, TimeUnit.MILLISECONDS);
- }
- @Override
- public String getServiceName() {
- return PullMessageService.class.getSimpleName();
- }
- }
继承自ServiceThread,这是一个单线程执行的service,不断获取阻塞队列中的pullRequest,进行消息拉取。
executePullRequestLater会延时将pullrequest放入到pullRequestQueue,达到延时拉去的目的。
那么PullInterval参数就是根据这个功能发挥的作用,在消费者拉去消息成功的回调
- PullCallback pullCallback = new PullCallback() {
- @Override
- public void onSuccess(PullResult pullResult) {
- if (pullResult != null) {
- pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
- subscriptionData);
- switch (pullResult.getPullStatus()) {
- case FOUND:
- long prevRequestOffset = pullRequest.getNextOffset();
- pullRequest.setNextOffset(pullResult.getNextBeginOffset());
- long pullRT = System.currentTimeMillis() - beginTimestamp;
- DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
- pullRequest.getMessageQueue().getTopic(), pullRT);
- long firstMsgOffset = Long.MAX_VALUE;
- if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
- DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
- } else {
- firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
- DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
- pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
- boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
- DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
- pullResult.getMsgFoundList(),
- processQueue,
- pullRequest.getMessageQueue(),
- dispatchToConsume);
- if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
- DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
- DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
- } else {
- DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
- }
- }
- if (pullResult.getNextBeginOffset() < prevRequestOffset
- || firstMsgOffset < prevRequestOffset) {
- log.warn(
- "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
- pullResult.getNextBeginOffset(),
- firstMsgOffset,
- prevRequestOffset);
- }
- break;
- case NO_NEW_MSG:
- pullRequest.setNextOffset(pullResult.getNextBeginOffset());
- DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
- DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
- break;
- case NO_MATCHED_MSG:
- pullRequest.setNextOffset(pullResult.getNextBeginOffset());
- DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
- DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
- break;
- case OFFSET_ILLEGAL:
- log.warn("the pull request offset illegal, {} {}",
- pullRequest.toString(), pullResult.toString());
- pullRequest.setNextOffset(pullResult.getNextBeginOffset());
- pullRequest.getProcessQueue().setDropped(true);
- DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {
- @Override
- public void run() {
- try {
- DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
- pullRequest.getNextOffset(), false);
- DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());
- DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());
- log.warn("fix the pull request offset, {}", pullRequest);
- } catch (Throwable e) {
- log.error("executeTaskLater Exception", e);
- }
- }
- }, 10000);
- break;
- default:
- break;
- }
- }
- }
- @Override
- public void onException(Throwable e) {
- if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
- log.warn("execute the pull request exception", e);
- }
- DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
- }
- };
在 case found的情况下,也就是拉取到消息的q情况,在PullInterval>0的情况下,会延时投递到pullRequestQueue中,实现拉取消息的间隔
- if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
- DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
- DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
- } else {
- DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
- }
2.PullBatchSize: 设置每次pull消息的数量,该参数设置是针对逻辑消息队列,并不是每次pull消息拉到的总消息数
消费端分配了两个消费队列来监听。那么PullBatchSize 设置为32,那么该消费端每次pull到 64个消息。
消费端每次pull到消息总数=PullBatchSize*监听队列数
源码分析
消费者拉取消息时
org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage中
会执行
- this.pullAPIWrapper.pullKernelImpl(
- pullRequest.getMessageQueue(),
- subExpression,
- subscriptionData.getExpressionType(),
- subscriptionData.getSubVersion(),
- pullRequest.getNextOffset(),
- this.defaultMQPushConsumer.getPullBatchSize(),
- sysFlag,
- commitOffsetValue,
- BROKER_SUSPEND_MAX_TIME_MILLIS,
- CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
- CommunicationMode.ASYNC,
- pullCallback
- );
其中 this.defaultMQPushConsumer.getPullBatchSize(),就是配置的PullBatchSize,代表的是每次从broker的一个队列上拉取的最大消息数。
3.ThreadMin和ThreadMax: 消费端消费pull到的消息需要的线程数量。
源码分析:
还是在消费者拉取消息成功时
- boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
- DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
- pullResult.getMsgFoundList(),
- processQueue,
- pullRequest.getMessageQueue(),
- dispatchToConsume);
通过consumeMessageService执行
默认情况下是并发消费
org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#submitConsumeRequest
- @Override
- public void submitConsumeRequest(
- final List<MessageExt> msgs,
- final ProcessQueue processQueue,
- final MessageQueue messageQueue,
- final boolean dispatchToConsume) {
- final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
- if (msgs.size() <= consumeBatchSize) {
- ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
- try {
- this.consumeExecutor.submit(consumeRequest);
- } catch (RejectedExecutionException e) {
- this.submitConsumeRequestLater(consumeRequest);
- }
- } else {
- for (int total = 0; total < msgs.size(); ) {
- List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
- for (int i = 0; i < consumeBatchSize; i++, total++) {
- if (total < msgs.size()) {
- msgThis.add(msgs.get(total));
- } else {
- break;
- }
- }
- ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
- try {
- this.consumeExecutor.submit(consumeRequest);
- } catch (RejectedExecutionException e) {
- for (; total < msgs.size(); total++) {
- msgThis.add(msgs.get(total));
- }
- this.submitConsumeRequestLater(consumeRequest);
- }
- }
- }
- }
其中consumeExecutor初始化
- this.consumeExecutor = new ThreadPoolExecutor(
- this.defaultMQPushConsumer.getConsumeThreadMin(),
- this.defaultMQPushConsumer.getConsumeThreadMax(),
- 1000 * 60,
- TimeUnit.MILLISECONDS,
- this.consumeRequestQueue,
- new ThreadFactoryImpl("ConsumeMessageThread_"));
对象线程池最大和核心线程数。对于顺序消费ConsumeMessageOrderlyService也会使用最大和最小线程数这两个参数,只是消费时会锁定队列。
以上三种情况:是针对参数配置,来调整消费速度。
除了这三种情况外还有两种服务部署情况,可以调整消费速度:
4.rocketMq 逻辑消费队列配置数量 有消费端每次pull到消息总数=PullBatchSize*监听队列数
可知rocketMq 逻辑消费队列配置数量即上图中的 queue1 ,queue2,配置数量越多每次pull到的消息总数也就越多。如果下边配置读队列数量:修改tocpic的逻辑队列数量
5.消费端节点部署数量 :
部署数量无论一个节点监听所有队列,还是多个节点按照分配策略分配监听队列数量,理论上每秒pull到的数量都一样的,但是多节点消费端消费线程数量要比单节点消费线程数量多,也就是多节点消费速度大于单节点。
消费延时控流
针对消息订阅者的消费延时流控的基本原理是,每次消费时在客户端增加一个延时来控制消费速度,此时理论上消费并发最快速度为:
单节点部署:
ConsumInterval :延时时间单位毫秒
ConcurrentThreadNumber:消费端线程数量
MaxRate :理论每秒处理数量
MaxRate = 1 / ConsumInterval * ConcurrentThreadNumber
如果消息并发消费线程(ConcurrentThreadNumber)为 20,延时(ConsumInterval)为 100 ms,代入上述公式可得
如果消息并发消费线程(ConcurrentThreadNumber)为 20,延时(ConsumInterval)为 100 ms,代入上述公式可得
200 = 1 / 0.1 * 20
由上可知,理论上可以将并发消费控制在 200 以下
如果是多个节点部署如两个节点,理论消费速度最高为每秒处理400个消息。
如下延时流控代码:
- /**
- * 测试mq 并发 接受
- */
- @Component
- @RocketMQMessageListener(topic = ConstantTopic.WRITING_LIKE_TOPIC,selectorExpression = ConstantTopic.WRITING_LIKE_ADD_TAG, consumerGroup = "writing_like_topic_add_group")
- class ConsumerLikeSave implements RocketMQListener<LikeWritingParams>, RocketMQPushConsumerLifecycleListener{
- @SneakyThrows
- @Override
- public void onMessage(LikeWritingParams params) {
- System.out.println("睡上0.1秒");
- Thread.sleep(100);
- long begin = System.currentTimeMillis();
- System.out.println("mq消费速度"+Thread.currentThread().getName()+" "+DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").format(LocalDateTime.now()));
- //writingLikeService.saveLike2Db(params.getUserId(),params.getWritingId());
- long end = System.currentTimeMillis();
- // System.out.println("消费:: " +Thread.currentThread().getName()+ "毫秒:"+(end - begin));
- }
- @Override
- public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
- defaultMQPushConsumer.setConsumeThreadMin(20); //消费端拉去到消息以后分配线索去消费
- defaultMQPushConsumer.setConsumeThreadMax(50);//最大消费线程,一般情况下,默认队列没有塞满,是不会启用新的线程的
- defaultMQPushConsumer.setPullInterval(0);//消费端多久一次去rocketMq 拉去消息
- defaultMQPushConsumer.setPullBatchSize(32); //消费端每个队列一次拉去多少个消息,若该消费端分赔了N个监控队列,那么消费端每次去rocketMq拉去消息说为N*1
- defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
- defaultMQPushConsumer.setConsumeTimestamp(UtilAll.timeMillisToHumanString3(System.currentTimeMillis()));
- defaultMQPushConsumer.setConsumeMessageBatchMaxSize(2);
- }
- }
注释:如上消费端,单节点每秒处理速度也就是最高200个消息,实际上要小于200,业务代码执行也是需要时间。
但是要注意实际操作中并发流控实际是默认存在的,
spring boot 消费端默认配置
this.consumeThreadMin = 20;
this.consumeThreadMax = 20;
this.pullInterval = 0L;
this.pullBatchSize = 32;
若业务逻辑执行需要20ms,那么单节点处理速度就是:1/0.02*20=1000
这里默认拉去的速度1s内远大于1000
注意: 这里虽然pullInterval 等于0 当时受限于每次拉去64个,处理完也是需要一端时间才能回复ack,才能再次拉取,所以消费速度应该小于1000
所以并发流控要消费速度大于消费延时流控 ,那么消费延时流控才有意义
使用rokcetMq支持的延时消息也可以实现消息的延时消费,通过对delayLevel对应的时间进行配置为我们的需求。为不同的消息设置不同delayLevel,达到延时消费的目的。
总结
rocketMq 肖锋流控两种方式:
并发流控:就是根据业务流控速率要求,来调整topic 消费队列数量(read queue),消费端部署节点,消费端拉去间隔时间,消费端消费线程数量等,来达到要求的速率内
延时消费流控:就是在消费端延时消费消息(sleep),具体延时多少要根据业务要求速率,和消费端线程数量,和节点部署数量来控制
到此这篇关于RocketMq深入分析讲解两种削峰方式的文章就介绍到这了,更多相关RocketMq削峰内容请搜索w3xue以前的文章或继续浏览下面的相关文章希望大家以后多多支持w3xue!