经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 数据库/运维 » RabbitMQ » 查看文章
如何利用rabbitMq的死信队列实现延时消息
来源:jb51  时间:2023/1/20 8:39:31  对本文有异议

前言

使用mq自带的死信去实现延时消息要注意一个坑点,就是mq只会检测队首的消息的过期时间,假设先放入队列10s过期消息,再放入2s过期。

mq会检测头部10s是否过期,10s不过期的情况下,2s就算过去也不会跑到死信.

mq基本的消息模型

mq死信队列的消息模型

简单的说就是先弄一个正常队列,然后不要设置消费者,接着给这个正常队列绑定一个死信队列,这个死信队列设置方式和正常队列没啥区别。

然后监听这个死信队列的消费.

一般死信队列由三大核心组件组成:死信交换机+死信路由+TTL(消息过期时间)

通常死信队列由“面向生产者的基本交换机+基本路由”绑定而成,所以生产者首先是将消息发送至“基本交换机+基本路由”所绑定而成的消息模型中,即间接性地进入到死信队列中,当过了TTL,消息将“挂掉”,从而进入下一个中转站,即“面下那个消费者的死信交换机+死信路由”所绑定而成的消息模型中.

maven依赖

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-amqp</artifactId>
  4. </dependency>
  5. <dependency>
  6. <groupId>org.springframework.boot</groupId>
  7. <artifactId>spring-boot-starter-web</artifactId>
  8. </dependency>

配置普通队列和死信队列

yml文件

  1. spring:
  2. application:
  3. name: ttl-queue
  4. rabbitmq:
  5. host: 110.40.181.73
  6. port: 35672
  7. username: root
  8. password: 10086
  9. virtual-host: /fchan
  10. connection-timeout: 15000
  11. # 发送确认
  12. #publisher-confirms: true
  13. # 路由失败回调
  14. publisher-returns: true
  15. template:
  16. # 必须设置成true 消息路由失败通知监听者,而不是将消息丢弃
  17. mandatory: true
  18. listener:
  19. simple:
  20. # 每次从RabbitMQ获取的消息数量
  21. prefetch: 1
  22. default-requeue-rejected: false
  23. # 每个队列启动的消费者数量
  24. concurrency: 1
  25. # 每个队列最大的消费者数量
  26. max-concurrency: 1
  27. # 签收模式为手动签收-那么需要在代码中手动ACK
  28. acknowledge-mode: manual
  1. package com.fchan.mq.mqDelay.dlx;
  2.  
  3. import org.springframework.amqp.core.*;
  4. import org.springframework.beans.factory.annotation.Qualifier;
  5. import org.springframework.context.annotation.Bean;
  6. import org.springframework.context.annotation.Configuration;
  7.  
  8. @Configuration
  9. public class MyRabConfig {
  10.  
  11. //定义普通任务队列(其实就是一个普通队列,只是没有消费者)
  12. @Bean("delayQue")
  13. public Queue delayQue(){
  14. //普通队列绑定死信交换机及路由key
  15.  
  16. //delayQueue延时队列
  17. return QueueBuilder.durable("delayQueue")
  18. //指定这个延时队列下的死信交换机
  19. //如果消息过时则会被投递到当前对应的my-dlx-exchange
  20. .withArgument("x-dead-letter-exchange", "my-dlx-exchange")
  21. //死信交换机绑定的路由key
  22. //如果消息过时,my-dlx-exchange会根据routing-key-delay投递消息到对应的队列
  23. //mq发送消息的时候一般指定的是exchange交换机+这个routingKey来找到队列,这个应该是出于有时候需要让mq的交换机将一个消息发送到多个队列所采用的方式
  24. .withArgument("x-dead-letter-routing-key", "routing-key-delay")
  25. .build();
  26. }
  27.  
  28. //定义普通队列的交换机
  29. @Bean("delayExchange")
  30. public Exchange delayExchange(){
  31. return ExchangeBuilder.directExchange("delayExchange").build();
  32. }
  33.  
  34. //绑定普通队列的交换机
  35. @Bean("delayBinding")
  36. public Binding delayBinding(@Qualifier("delayExchange") Exchange exchange, @Qualifier("delayQue") Queue queue){
  37. return BindingBuilder.bind(queue).to(exchange).with("delayBindingRoutingKey").noargs();
  38. }
  39.  
  40.  
  41.  
  42. //定义死信队列
  43. @Bean("dlxQueue")
  44. public Queue dlxQueue(){
  45. return QueueBuilder.durable("my-dlx-queue").build();
  46. }
  47.  
  48.  
  49. //定义死信交换机,这里的死信交换机要和上面普通队列所绑定的交换机名称一致
  50. @Bean("dlxExchange")
  51. public Exchange dlxExchange(){
  52. return ExchangeBuilder.directExchange("my-dlx-exchange").build();
  53. }
  54.  
  55.  
  56. //绑定交换机与死信队列
  57. @Bean("dlxBinding")
  58. public Binding dlxBinding(@Qualifier("dlxExchange") Exchange exchange, @Qualifier("dlxQueue") Queue queue){
  59. //这儿这个routingKey要和上面正常队列中所绑定的死信routingKey一致
  60. return BindingBuilder.bind(queue).to(exchange).with("routing-key-delay").noargs();
  61. }
  62.  
  63. }

死信队列消费者

  1. package com.fchan.mq.mqDelay;
  2.  
  3. import com.rabbitmq.client.Channel;
  4. import org.slf4j.Logger;
  5. import org.slf4j.LoggerFactory;
  6. import org.springframework.amqp.core.Message;
  7. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  8. import org.springframework.stereotype.Component;
  9.  
  10. import java.util.Map;
  11.  
  12. @Component
  13. public class MyRabbitConsume {
  14. Logger log = LoggerFactory.getLogger(MyRabbitConsume.class);
  15.  
  16. @RabbitListener(queues = {"my-dlx-queue"})
  17. public void receiveDeadMessage(Map<String,Object> map, Message message, Channel channel) throws Exception {
  18. log.info("收到死信信息:{}",map);
  19. log.info("然后进行一系列逻辑处理 Thanks?(?ω?)?");
  20. //手动确认消息
  21. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  22. }
  23.  
  24. }

发送消息测试

  1. /**
  2. * 发送消息到延时队列 delayQueue
  3. * 消息5秒后会被投递到死信队列
  4. * @return
  5. */
  6. @GetMapping("sendMsgToDelay")
  7. public String sendMsgToDelay(String msg){
  8. Map<String,Object> map = new HashMap<>();
  9. map.put("msg", msg);
  10. rabbitTemplate.convertAndSend("delayExchange","delayBindingRoutingKey", map, new MessagePostProcessor() {
  11. @Override
  12. public Message postProcessMessage(Message message) throws AmqpException {
  13. //设置消息过期时间,5秒过期
  14. message.getMessageProperties().setExpiration("5000");
  15. return message;
  16. }
  17. });
  18. return msg;
  19. }

测试成功

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持w3xue。

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号