经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 数据库/运维 » RabbitMQ » 查看文章
RabbitMQ保姆级教程最佳实践
来源:cnblogs  作者:佛祖让我来巡山  时间:2023/9/25 16:50:21  对本文有异议

一、消息队列介绍

1、消息队列概念

1、MQ全称为Message Queue,消息队列(MQ)是?种应?程序对应?程序的通信?法。
应?程序通过读写出?队列的消息(针对应?程序的数据)来通信,??需专?连接来
链接它们。
2、消息传递指的是程序之间通过在消息中发送数据进?通信,?不是通过直接调?彼此来
通信,直接调?通常是?于诸如远程过程调?的技术。

2、常?的消息队列产品

1、RabbitMQ 稳定可靠,数据?致,?持多协议,有消息确认,基于erlang语?
2、Kafka ?吞吐,?性能,快速持久化,?消息确认,?消息遗漏,可能会有有重复消息,依赖于zookeeper,成本?.
3、ActiveMQ 不够灵活轻巧,对队列较多情况?持不好.
4、RocketMQ 性能好,?吞吐,?可?性,?持?规模分布式,协议?持单?

?、RabbitMQ

1、RabbitMQ介绍

1、RabbitMQ是?个在AMQP基础上完成的,可复?的企业消息系统。他遵循MozillaPublic License开源协议。
2、AMQP,即Advanced Message Queuing Protocol, ?个提供统?消息服务的应?层标准
     ?级消息队列协议,是应?层协议的?个开放标准,为?向消息的中间件设计。基于此协议
     的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语?等
     条件的限制。Erlang中的实现有 RabbitMQ等。
3、主要特性:
  • 保证可靠性 :使??些机制来保证可靠性,如持久化、传输确认、发布确认
  • 灵活的路由功能
  • 可伸缩性:?持消息集群,多台RabbitMQ服务器可以组成?个集群
  • ?可?性 :RabbitMQ集群中的某个节点出现问题时队列仍然可?
  • ?持多种协议
  • ?持多语?客户端
  • 提供良好的管理界?
  • 提供跟踪机制:如果消息出现异常,可以通过跟踪机制分析异常原因
  • 提供插件机制:可通过插件进?多??扩展

2、RabbitMQ安装和配置

具体参考:https://note.youdao.com/s/MKn2Jr8c

3、RabbitMQ逻辑结构

三、RabbitMQ?户管理

RabbitMQ默认提供了?个guests账号,但是此账号不能?作远程登录,也就是不能在管理系统的登录;我们可以创建?个新的账号并授予响应的管理权限来实现远程登录

1、逻辑结构

?户
虚拟主机
队列

2、?户管理

2.1、命令??户管理

1、在linux中使?命令?创建?户

  1. ## 进?到rabbit_mq的sbin?录
  2. cd /usr/local/rabbitmq_server-3.7.0/sbin
  3. ## 新增?户
  4. ./rabbitmqctl add_user ytao admin123

2、设置?户级别

  1. ## ?户级别:
  2. ## 1.administrator 可以登录控制台、查看所有信息、可以对RabbitMQ进?管理
  3. ## 2.monitoring 监控者 登录控制台、查看所有信息
  4. ## 3.policymaker 策略制定者 登录控制台、指定策略
  5. ## 4.managment 普通管理员 登录控制台
  6. ./rabbitmqctl set_user_tags ytao administrator

2.2、管理系统进??户管理

管理系统登录:访问http://localhost:15672/

四、RabbitMQ?作?式

RabbitMQ提供了多种消息的通信?式—?作模式  https://www.rabbitmq.com/getstarted.html
消息通信是由两个??完成:消息?产者(producer)和 消息消费者(Consumer)

1、简单模式

?个队列只有?个消费者

2、?作模式

多个消费者监听同?个队列

3、订阅模式

?个交换机绑定多个消息队列,每个消息队列有?个消费者监听

4、路由模式

?个交换机绑定多个消息队列,每个消息队列都由??唯?的key,每个消息队列有?个消费者监听

五、RabbitMQ交换机和队列管理

1、创建队列

2、创建交换机

3、交换机绑定队列

六、在普通的Maven应?中使?MQ

1、简单模式

1.1、消息?产者

1、创建Maven项?

2、添加RabbitMQ连接所需要的依赖

  1. <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
  2. <dependency>
  3. <groupId>com.rabbitmq</groupId>
  4. <artifactId>amqp-client</artifactId>
  5. <version>4.10.0</version>
  6. </dependency>
  7. <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->
  8. <dependency>
  9. <groupId>org.slf4j</groupId>
  10. <artifactId>slf4j-log4j12</artifactId>
  11. <version>1.7.25</version>
  12. <scope>test</scope>
  13. </dependency>
  14. <!-- https://mvnrepository.com/artifact/org.apache.commons/commonslang3 -->
  15. <dependency>
  16. <groupId>org.apache.commons</groupId>
  17. <artifactId>commons-lang3</artifactId>
  18. <version>3.9</version>
  19. </dependency>

3、在resources?录下创建log4j.properties

  1. log4j.rootLogger=DEBUG,A1 log4j.logger.com.taotao = DEBUG
  2. log4j.logger.org.mybatis = DEBUG
  3. log4j.appender.A1=org.apache.log4j.ConsoleAppender
  4. log4j.appender.A1.layout=org.apache.log4j.PatternLayout
  5. log4j.appender.A1.layout.ConversionPattern=%-d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c]-[%p] %m%n

4、创建MQ连接工具类

  1. import com.rabbitmq.client.Connection;
  2. import com.rabbitmq.client.ConnectionFactory;
  3. import java.io.IOException;
  4. import java.util.concurrent.TimeoutException;
  5. public class ConnectionUtil {
  6. public static Connection getConnection() throws IOException,
  7. TimeoutException {
  8. //1.创建连接??
  9. ConnectionFactory factory = new ConnectionFactory();
  10. //2.在??对象中设置MQ的连接信息
  11. (ip,port,virtualhost,username,password)
  12. factory.setHost("47.96.11.185");
  13. factory.setPort(5672);
  14. factory.setVirtualHost("host1");
  15. factory.setUsername("ytao");
  16. factory.setPassword("admin123");
  17. //3.通过??对象获取与MQ的链接
  18. Connection connection = factory.newConnection();
  19. return connection;
  20. }
  21. }

5、消息?产者发送消息

  1. import com.qfedu.mq.utils.ConnectionUtil;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. public class SendMsg {
  5. public static void main(String[] args) throws Exception{
  6. String msg = "Hello HuangDaoJun!";
  7. Connection connection = ConnectionUtil.getConnection();
  8. Channel channel = connection.createChannel();
  9. //定义队列(使?Java代码在MQ中新建?个队列)
  10. //参数1:定义的队列名称
  11. //参数2:队列中的数据是否持久化(如果选择了持久化)
  12. //参数3: 是否排外(当前队列是否为当前连接私有)
  13. //参数4:?动删除(当此队列的连接数为0时,此队列会销毁(?论队列中是否还有数据))
  14. //参数5:设置当前队列的参数
  15. //channel.queueDeclare("queue7",false,false,false,null);
  16. //参数1:交换机名称,如果直接发送信息到队列,则交换机名称为""
  17. //参数2:?标队列名称
  18. //参数3:设置当前这条消息的属性(设置过期时间 10)
  19. //参数4:消息的内容
  20. channel.basicPublish("","queue1",null,msg.getBytes());
  21. System.out.println("发送:" + msg);
  22. channel.close();
  23. connection.close();
  24. }
  25. }

1.2、消息消费者

1、创建Maven项?
2、添加依赖
3、log4j.properties
4、ConnetionUtil.java
5、消费者消费消息
  1. import com.qfedu.mq.utils.ConnectionUtil;
  2. import com.rabbitmq.client.*;
  3. import java.io.IOException;
  4. import java.util.concurrent.TimeoutException;
  5. public class ReceiveMsg {
  6. public static void main(String[] args) throws IOException, TimeoutException {
  7. Connection connection = ConnectionUtil.getConnection();
  8. Channel channel = connection.createChannel();
  9. Consumer consumer = new DefaultConsumer(channel){
  10. @Override
  11. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  12. //body就是从队列中获取的数据
  13. String msg = new String(body);
  14. System.out.println("接收:"+msg);
  15. }
  16. };
  17. channel.basicConsume("queue1",true,consumer);
  18. }
  19. }

2、?作模式

?个发送者多个消费者

2.1、发送者

  1. public class SendMsg {
  2. public static void main(String[] args) throws Exception{
  3. System.out.println("请输?消息:");
  4. Scanner scanner = new Scanner(System.in);
  5. String msg = null;
  6. while(!"quit".equals(msg = scanner.nextLine())){
  7. Connection connection = ConnectionUtil.getConnection();
  8. Channel channel = connection.createChannel();
  9. channel.basicPublish("","queue2",null,msg.getBytes());
  10. System.out.println("发送:" + msg);
  11. channel.close();
  12. connection.close();
  13. }
  14. }
  15. }

2.2、消费者1

  1. public class ReceiveMsg {
  2. public static void main(String[] args) throws Exception {
  3. Connection connection = ConnectionUtil.getConnection();
  4. Channel channel = connection.createChannel();
  5. Consumer consumer = new DefaultConsumer(channel){
  6. @Override
  7. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  8. //body就是从队列中获取的数据
  9. String msg = new String(body);
  10. System.out.println("Consumer1接收:"+msg);
  11. if("wait".equals(msg)){
  12. try {
  13. Thread.sleep(10000);
  14. }
  15. catch (InterruptedException e) {
  16. e.printStackTrace();
  17. }
  18. }
  19. }
  20. };
  21. channel.basicConsume("queue2",true,consumer);
  22. }
  23. }

2.3、消费者2

  1. public class ReceiveMsg {
  2. public static void main(String[] args) throws IOException,
  3. TimeoutException {
  4. Connection connection = ConnectionUtil.getConnection();
  5. Channel channel = connection.createChannel();
  6. Consumer consumer = new DefaultConsumer(channel){
  7. @Override
  8. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  9. //body就是从队列中获取的数据
  10. String msg = new String(body);
  11. System.out.println("Consumer2接收:"+msg);
  12. }
  13. };
  14. channel.basicConsume("queue2",true,consumer);
  15. }
  16. }

3、订阅模式

1、发送者 发送消息到交换机

  1. public class SendMsg {
  2. public static void main(String[] args) throws Exception{
  3. System.out.println("请输?消息:");
  4. Scanner scanner = new Scanner(System.in);
  5. String msg = null;
  6. while(!"quit".equals(msg = scanner.nextLine())){
  7. Connection connection = ConnectionUtil.getConnection();
  8. Channel channel = connection.createChannel();
  9. channel.basicPublish("ex1","",null,msg.getBytes());
  10. System.out.println("发送:" + msg);
  11. channel.close();
  12. connection.close();
  13. }
  14. }
  15. }

2、消费者1

  1. public class ReceiveMsg1 {
  2. public static void main(String[] args) throws Exception {
  3. Connection connection = ConnectionUtil.getConnection();
  4. Channel channel = connection.createChannel();
  5. Consumer consumer = new DefaultConsumer(channel){
  6. @Override
  7. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  8. //body就是从队列中获取的数据
  9. String msg = new String(body);
  10. System.out.println("Consumer1接收:"+msg);
  11. if("wait".equals(msg)){
  12. try {
  13. Thread.sleep(10000);
  14. }
  15. catch (InterruptedException e) {
  16. e.printStackTrace();
  17. }
  18. }
  19. }
  20. };
  21. channel.basicConsume("queue3",true,consumer);
  22. }
  23. }

3、消费者2

  1. public class ReceiveMsg2 {
  2. public static void main(String[] args) throws IOException,
  3. TimeoutException {
  4. Connection connection = ConnectionUtil.getConnection();
  5. Channel channel = connection.createChannel();
  6. Consumer consumer = new DefaultConsumer(channel){
  7. @Override
  8. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  9. //body就是从队列中获取的数据
  10. String msg = new String(body);
  11. System.out.println("Consumer2接收:"+msg);
  12. }
  13. }
  14. ;
  15. channel.basicConsume("queue4",true,consumer);
  16. }
  17. }

4、路由模式

1、发送者 发送消息到交换机

  1. public class SendMsg {
  2. public static void main(String[] args) throws Exception{
  3. System.out.println("请输?消息:");
  4. Scanner scanner = new Scanner(System.in);
  5. String msg = null;
  6. while(!"quit".equals(msg = scanner.nextLine())){
  7. Connection connection = ConnectionUtil.getConnection();
  8. Channel channel = connection.createChannel();
  9. if(msg.startsWith("a")){
  10. channel.basicPublish("ex2","a",null,msg.getBytes());
  11. } else if(msg.startsWith("b")){
  12. channel.basicPublish("ex2","b",null,msg.getBytes());
  13. }
  14. System.out.println("发送:" + msg);
  15. channel.close();
  16. connection.close();
  17. }
  18. }
  19. }

2、消费者1

  1. public class ReceiveMsg1 {
  2. public static void main(String[] args) throws Exception {
  3. Connection connection = ConnectionUtil.getConnection();
  4. Channel channel = connection.createChannel();
  5. Consumer consumer = new DefaultConsumer(channel){
  6. @Override
  7. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  8. //body就是从队列中获取的数据
  9. String msg = new String(body);
  10. System.out.println("Consumer1接收:"+msg);
  11. if("wait".equals(msg)){
  12. try {
  13. Thread.sleep(10000);
  14. }
  15. catch (InterruptedException e) {
  16. e.printStackTrace();
  17. }
  18. }
  19. }
  20. };
  21. channel.basicConsume("queue5",true,consumer);
  22. }
  23. }

3、消费者2

  1. public class ReceiveMsg2 {
  2. public static void main(String[] args) throws IOException, TimeoutException {
  3. Connection connection = ConnectionUtil.getConnection();
  4. Channel channel = connection.createChannel();
  5. Consumer consumer = new DefaultConsumer(channel){
  6. @Override
  7. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  8. //body就是从队列中获取的数据
  9. String msg = new String(body);
  10. System.out.println("Consumer2接收:"+msg);
  11. }
  12. };
  13. channel.basicConsume("queue6",true,consumer);
  14. }
  15. }

七、在SpringBoot应?中使?MQ

SpringBoot应?可以完成?动配置及依赖注?——可以通过Spring直接提供与MQ的连接对象

1、消息?产者

1、创建SpringBoot应?,添加依赖

2、配置application.yml

  1. server:
  2. port: 9001
  3. spring:
  4. application:
  5. name: producer
  6. rabbitmq:
  7. host: 47.96.11.185
  8. port: 5672
  9. virtual-host: host1
  10. username: ytao
  11. password: admin123

3、发送消息

  1. @Service
  2. public class TestService {
  3. @Resource
  4. private AmqpTemplate amqpTemplate;
  5. public void sendMsg(String msg){
  6. //1. 发送消息到队列
  7. amqpTemplate.convertAndSend("queue1",msg);
  8. //2. 发送消息到交换机(订阅交换机)
  9. amqpTemplate.convertAndSend("ex1","",msg);
  10. //3. 发送消息到交换机(路由交换机)
  11. amqpTemplate.convertAndSend("ex2","a",msg);
  12. }
  13. }

2、消息消费者

1、创建项?添加依赖
2、配置yml
3、接收消息
  1. @Service
  2. //@RabbitListener(queues = {"queue1","queue2"})
  3. @RabbitListener(queues = "queue1")
  4. public class ReceiveMsgService {
  5. @RabbitHandler
  6. public void receiveMsg(String msg){
  7. System.out.println("接收MSG:"+msg);
  8. }
  9. }

?、使?RabbitMQ传递对象

RabbitMQ是消息队列,发送和接收的都是字符串/字节数组类型的消息

1、使?序列化对象

要求:
传递的对象实现序列化接?
传递的对象的包名、类名、属性名必须?致

 

1、消息提供者

  1. @Service
  2. public class MQService {
  3. @Resource
  4. private AmqpTemplate amqpTemplate;
  5. public void sendGoodsToMq(Goods goods){
  6. //消息队列可以发送 字符串、字节数组、序列化对象
  7. amqpTemplate.convertAndSend("","queue1",goods);
  8. }
  9. }

2、消息消费者

  1. @Component
  2. @RabbitListener(queues = "queue1")
  3. public class ReceiveService {
  4. @RabbitHandler
  5. public void receiveMsg(Goods goods){
  6. System.out.println("Goods---"+goods);
  7. }
  8. }

2、使?序列化字节数组

要求:
  传递的对象实现序列化接?
  传递的对象的包名、类名、属性名必须?致

 

1、消息提供者

  1. @Service
  2. public class MQService {
  3. @Resource
  4. private AmqpTemplate amqpTemplate;
  5. public void sendGoodsToMq(Goods goods){
  6. //消息队列可以发送 字符串、字节数组、序列化对象
  7. byte[] bytes = SerializationUtils.serialize(goods);
  8. amqpTemplate.convertAndSend("","queue1",bytes);
  9. }
  10. }

2、消息消费者

  1. @Component
  2. @RabbitListener(queues = "queue1")
  3. public class ReceiveService {
  4. @RabbitHandler
  5. public void receiveMsg(byte[] bs){
  6. Goods goods = (Goods) SerializationUtils.deserialize(bs);
  7. System.out.println("byte[]---"+goods);
  8. }
  9. }

3、使?JSON字符串传递

要求:对象的属性名?直

 

1、消息提供者

  1. @Service
  2. public class MQService {
  3. @Resource
  4. private AmqpTemplate amqpTemplate;
  5. public void sendGoodsToMq(Goods goods) throws JsonProcessingException {
  6. //消息队列可以发送 字符串、字节数组、序列化对象
  7. ObjectMapper objectMapper = new ObjectMapper();
  8. String msg = objectMapper.writeValueAsString(goods);
  9. amqpTemplate.convertAndSend("","queue1",msg);
  10. }
  11. }

2、消息消费者

  1. @Component
  2. @RabbitListener(queues = "queue1")
  3. public class ReceiveService {
  4. @RabbitHandler
  5. public void receiveMsg(String msg) throws JsonProcessingException {
  6. ObjectMapper objectMapper = new ObjectMapper();
  7. Goods goods = objectMapper.readValue(msg,Goods.class);
  8. System.out.println("String---"+msg);
  9. }
  10. }

九、基于Java的交换机与队列创建

我们使?消息队列,消息队列和交换机可以通过管理系统完成创建,也可以在应?程序中通过Java代码来完成创建

1、普通Maven项?交换机及队列创建

1、使?Java代码新建队列

  1. //1.定义队列 (使?Java代码在MQ中新建?个队列)
  2. //参数1:定义的队列名称
  3. //参数2:队列中的数据是否持久化(如果选择了持久化)
  4. //参数3: 是否排外(当前队列是否为当前连接私有)
  5. //参数4:?动删除(当此队列的连接数为0时,此队列会销毁(?论队列中是否还有数据))
  6. //参数5:设置当前队列的参数
  7. channel.queueDeclare("queue7",false,false,false,null);

2、新建交换机

  1. //定义?个“订阅交换机”
  2. channel.exchangeDeclare("ex3", BuiltinExchangeType.FANOUT);
  3. //定义?个“路由交换机”
  4. channel.exchangeDeclare("ex4", BuiltinExchangeType.DIRECT);

3、绑定队列到交换机

  1. //绑定队列
  2. //参数1:队列名称
  3. //参数2:?标交换机
  4. //参数3:如果绑定订阅交换机参数为"",如果绑定路由交换机则表示设置队列的key
  5. channel.queueBind("queue7","ex4","k1");
  6. channel.queueBind("queue8","ex4","k2");

2、SpringBoot应?中通过配置完成队列的创建

  1. @Configuration
  2. public class RabbitMQConfiguration {
  3. //声明队列
  4. @Bean
  5. public Queue queue9(){
  6. Queue queue9 = new Queue("queue9");
  7. //设置队列属性
  8. return queue9;
  9. }
  10. @Bean
  11. public Queue queue10(){
  12. Queue queue10 = new Queue("queue10");
  13. //设置队列属性
  14. return queue10;
  15. }
  16. //声明订阅模式交换机
  17. @Bean
  18. public FanoutExchange ex5(){
  19. return new FanoutExchange("ex5");
  20. }
  21. //声明路由模式交换机
  22. @Bean
  23. public DirectExchange ex6(){
  24. return new DirectExchange("ex6");
  25. }
  26. //绑定队列
  27. @Bean
  28. public Binding bindingQueue9(Queue queue9, DirectExchange ex6){
  29. return BindingBuilder.bind(queue9).to(ex6).with("k1");
  30. }
  31. @Bean
  32. public Binding bindingQueue10(Queue queue10, DirectExchange ex6){
  33. return BindingBuilder.bind(queue10).to(ex6).with("k2");
  34. }
  35. }

?、消息的可靠性

消息的可靠性:从 ?产者发送消息 —— 消息队列存储消息 —— 消费者消费消息 的整个过程中消息的安全性及可控性。
  • ?产者
  • 消息队列
  • 消费者

1、RabbitMQ事务

RabbitMQ事务指的是基于客户端实现的事务管理,当在消息发送过程中添加了事务,处理效率降低??倍甚?上百倍 
  1. Connection connection = RabbitMQUtil.getConnection(); //connection 表示与 host1的连接
  2. Channel channel = connection.createChannel();
  3. channel.txSelect();//开启事务
  4. try{
  5. channel.basicPublish("ex4", "k1", null, msg.getBytes());
  6. channel.txCommit();//提交事务
  7. }
  8. catch (Exception e){
  9. channel.txRollback();//事务回滚
  10. }
  11. finally{
  12. channel.close();
  13. connection.close();
  14. }

2、RabbitMQ消息确认和return机制

1、消息确认机制:确认消息提供者是否成功发送消息到交换机
2、return机制:确认消息是否成功的从交换机分发到队列

2.1、普通Maven项?的消息确认

1、普通confirm?式

  1. //1.发送消息之前开启消息确认
  2. channel.confirmSelect();
  3. channel.basicPublish("ex1", "a", null, msg.getBytes());
  4. //2.接收消息确认
  5. Boolean b = channel.waitForConfirms();
  6. System.out.println("发送:" +(b?"成功":"失败"));

2、批量confirm?式

  1. //1.发送消息之前开启消息确认
  2. channel.confirmSelect();
  3. //2.批量发送消息
  4. for (int i=0 ; i<10 ; i++){
  5. channel.basicPublish("ex1", "a", null, msg.getBytes());
  6. }
  7. //3.接收批量消息确认:发送的所有消息中,如果有?条是失败的,则所有消息发送直接失败,抛出IO异常
  8. Boolean b = channel.waitForConfirms();

3、异步confirm?式

  1. //发送消息之前开启消息确认
  2. channel.confirmSelect();
  3. //批量发送消息
  4. for (int i=0 ; i<10 ; i++){
  5. channel.basicPublish("ex1", "a", null, msg.getBytes());
  6. }
  7. //假如发送消息需要10s,waitForConfirms会进?阻塞状态
  8. //boolean b = channel.waitForConfirms();
  9. //使?监听器异步confirm
  10. channel.addConfirmListener(new ConfirmListener() {
  11. //参数1: long l 返回消息的表示
  12. //参数2: boolean b 是否为批量confirm
  13. public void handleAck(long l, Boolean b) throws IOException {
  14. System.out.println("~~~~~消息成功发送到交换机");
  15. }
  16. public void handleNack(long l, Boolean b) throws IOException {
  17. System.out.println("~~~~~消息发送到交换机失败");
  18. }
  19. }
  20. );

2.2、普通Maven项?的return机制

1、添加return监听器
2、发送消息是指定第三个参数为true
3、由于监听器监听是异步处理,所以在消息发送之后不能关闭channel
  1. String msg = "Hello HuangDaoJun!";
  2. Connection connection = ConnectionUtil.getConnection();
  3. //相当于JDBC操作的数据库连接
  4. Channel channel = connection.createChannel();
  5. //相当于JDBC操作的statement
  6. //return机制:监控交换机是否将消息分发到队列
  7. channel.addReturnListener(new ReturnListener() {
  8. public void handleReturn(int i, String s, String s1, String s2,AMQP.BasicProperties basicProperties,byte[] bytes) throws IOException {
  9. //如果交换机分发消息到队列失败,则会执?此?法(?来处理交换机分发消息到队列失败的情况)
  10. System.out.println("*****"+i);//标识
  11. System.out.println("*****"+s);//
  12. System.out.println("*****"+s1);//交换机名
  13. System.out.println("*****"+s2);//交换机对应的队列的key
  14. System.out.println("*****"+new String(bytes));//发送的消息
  15. }
  16. }
  17. );
  18. //发送消息
  19. //channel.basicPublish("ex2", "c", null, msg.getBytes());
  20. channel.basicPublish("ex2", "c", true, null, msg.getBytes());

2.3、在SpringBoot应?实现消息确认与return监听

1、配置application.yml,开启消息确认和return监听

  1. spring:
  2. rabbitmq:
  3. publisher-confirm-type: simple ## 开启消息确认模式
  4. publisher-returns: true ##使?return监听机制

2、创建confirm和return监听

2.1、消息确认

  1. @Component
  2. public class MyConfirmListener implements
  3. RabbitTemplate.ConfirmCallback {
  4. @Autowired
  5. private AmqpTemplate amqpTemplate;
  6. @Autowired
  7. private RabbitTemplate rabbitTemplate;
  8. @PostConstruct
  9. public void init(){
  10. rabbitTemplate.setConfirmCallback(this);
  11. }
  12. @Override
  13. public void confirm(CorrelationData correlationData, Boolean b, String s) {
  14. //参数b 表示消息确认结果
  15. //参数s 表示发送的消息
  16. if(b){
  17. System.out.println("消息发送到交换机成功!");
  18. } else{
  19. System.out.println("消息发送到交换机失败!");
  20. amqpTemplate.convertAndSend("ex4","",s);
  21. }
  22. }
  23. }

2.2、return机制

  1. @Component
  2. public class MyReturnListener implements RabbitTemplate.ReturnsCallback
  3. {
  4. @Autowired
  5. private AmqpTemplate amqpTemplate;
  6. @Autowired
  7. private RabbitTemplate rabbitTemplate;
  8. @PostConstruct
  9. public void init(){
  10. rabbitTemplate.setReturnsCallback(this);
  11. }
  12. @Override
  13. public void returnedMessage(ReturnedMessage returnedMessage) {
  14. System.out.println("消息从交换机分发到队列失败");
  15. String exchange = returnedMessage.getExchange();
  16. String routingKey = returnedMessage.getRoutingKey();
  17. String msg = returnedMessage.getMessage().toString();
  18. amqpTemplate.convertAndSend(exchange,routingKey,msg);
  19. }
  20. }

3、RabbitMQ消费者?动应答 

  1. @Component
  2. @RabbitListener(queues="queue01")
  3. public class Consumer1 {
  4. @RabbitHandler
  5. public void process(String msg,Channel channel, Message message) throws IOException {
  6. try {
  7. System.out.println("get msg1 success msg = "+msg);
  8. /**
  9.         * 确认?条消息:<br>
  10.          * channel.basicAck(deliveryTag, false); <br>
  11.          * deliveryTag:该消息的index <br>
  12.         * multiple:是否批量.true:将?次性ack所有?于deliveryTag的消息 <br>
  13.       */
  14. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  15. } catch (Exception e) {
  16. //消费者处理出了问题,需要告诉队列信息消费失败
  17. /**
  18.         * 拒绝确认消息:<br>
  19.         * channel.basicNack(long deliveryTag, boolean multiple, boolean requeue) ; <br>
  20.         * deliveryTag:该消息的index<br>
  21.         * multiple:是否批量.true:将?次性拒绝所有?于deliveryTag的消息。<br>
  22.         * requeue:被拒绝的是否重新?队列 <br>
  23.       */
  24. channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
  25. System.err.println("get msg1 failed msg = "+msg);
  26. }
  27. }
  28. }

4、消息消费的幂等性问题

消息消费的幂等性——多次消费的执?结果时相同的 (避免重复消费)
解决?案:处理成功的消息setnx到redis

??、延迟机制

1、延迟队列

1、延迟队列——消息进?到队列之后,延迟指定的时间才能被消费者消费
2、AMQP协议和RabbitMQ队列本身是不?持延迟队列功能的,但是可以通过TTL(Time To Live)特性模拟延迟队列的功能
3、TTL就是消息的存活时间。RabbitMQ可以分别对队列和消息设置存活时间

1、在创建队列的时候可以设置队列的存活时间,当消息进?到队列并且在存活时间内没有消费者消费,则此消息就会从当前队列被移除;
2、创建消息队列没有设置TTL,但是消息设置了TTL,那么当消息的存活时间结束,也会被移除;
3、当TTL结束之后,我们可以指定将当前队列的消息转存到其他指定的队列

2、使?延迟队列实现订单?付监控

1、实现流程图

 2、创建交换机和队列

??、消息队列作?/使?场景总结

1、解耦

场景说明:?户下单之后,订单系统要通知库存系统

2、异步

场景说明:?户注册成功之后,需要发送注册邮件及注册短信提醒

3、消息通信

场景说明:应?系统之间的通信,例如聊天室

4、流量削峰

场景说明:秒杀业务

5、?志处理

场景说明:系统中?量的?志处理

 

原文链接:https://www.cnblogs.com/sun-10387834/p/17724914.html

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号