经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 程序设计 » ASP.net » 查看文章
.NetCore 消息队列的使用
来源:cnblogs  作者:无天SoraX  时间:2021/5/24 11:00:00  对本文有异议

1 消息队列的优点

消息队列本质是生产者——消费者模式。也有很多使用方式。那么它有什么优点呢?
以日常生活中邮寄信件这个行为举例,
当只有1个寄信人,1个邮递员的时候。寄信人想要寄信,到指定地点(邮局),直接将信件交给邮递员即可。

当有50个寄信人的时候,1个邮递员的时候。这50个寄信人就要依次排队等待邮递员处理信件。
可以增加邮递员的数量,但是依然会有忙闲不均的问题存在。

我们现在增加一个邮筒(也就是数据缓冲区)

在这个例子中,寄信人就是生产者,邮递员是消费者。而邮筒就是一个消息队列。这个邮筒解决了以下问题:

1.1 解除耦合

实现了时间上解耦,也实现了对象间解耦。
之前邮递员隶属于A邮局,寄信人想要寄信,到指定地点,直接将信件交给邮递员即可。如果因为实际需求,以后由B邮局的快递员负责寄信业务。那么寄信人就要去另一个地点寄信。
这就是由于耦合产生的问题。
现在不管信件是由A邮局还是其他邮局负责,寄信人只管将信件投递进邮筒就行了。解除了寄信人和邮递员的耦合性。

1.2 实现异步处理

之前寄信将信件直接交给邮递员,可能要等待邮递员要确认很多信息(比如寄件人信息)之后,长辄几分钟,才能结束本次寄信的行为。
而现在将信件直接投递到邮箱里,只要不到1S,就能结束寄信的行为。

1.3 支持并发操作

解决同步处理的阻塞问题。
之前所有寄信人需要排队等待上一个人寄信完毕,才能开始寄信。
现在所有寄信人都把信件投递进邮筒即可。

1.4 实现流量削峰

可以根据邮递员方的处理能力,调节邮筒的容量。超过这个容量后,邮筒就放不下(拒绝)信件了。
即能根据下游的处理能力自由调节流量,实现削峰。

2 安装erlang和RabbitMQ

2.1 安装erlang

由于RabbitMQ是基于erlang开发的,需要先安装erlang。
确认自己要安装的RabbitMQ依赖的erlang的最低版本。

erlang:https://www.erlang.org/downloads
安装后添加环境变量。
在系统变量中添加:
变量名:ERLANG_HOME
变量值:C:\Program Files\erl-24.0(安装ERLANG的文件夹)
然后在用户变量的PATH中添加:%ERLANG_HOME%\bin
添加完环境变量之后可能需要重启。
然后打开CMD,运行erl,出现版本号为成功。

2.2 安装RabbitMQ

RabbitMQ:https://github.com/rabbitmq/rabbitmq-server/releases/
安装成功后会自动创建RabbitMQ服务并且启动
可以在任务管理器中确认:

2.3 安装RabbitMQ的Web管理插件

在命令行中CD到安装目录下,执行
rabbitmq-plugins.bat enable rabbitmq_management

成功后进入浏览器,输入:http://localhost:15672
初始账号和密码:guest/guest

3 理解消息队列中的基本概念

消息队列中有Exchange、Connection、Channel、Queue等概念

3.1 Exchange(交换机)

是生产者和消息队列的一个中介,负责将生产者的消息分发给消息队列。如果使用简单模式(只有一个生产者,一个消费者,一对一)时,不配置Exchange,实际上使用的是默认的Exchange。

3.2 Connection(连接)

是连接到MQ的TCP连接。为了方便理解,可以将Connection想象成一个光纤电缆。

3.3 Channel(通道)

一个Connection中存在多个Channel。可以把Channel理解为光纤电缆中的光纤。

3.4 Queue(消息队列)

一个Channel中可以存在多个Queue。

3.5 其他

因为建立和销毁 TCP 连接是非常昂贵的开销,所以一般维持Connection。在Connection之上,操作channel。
Channel的其中一个作用就是,屏蔽Connection的TCP层面的细节,方便开发,同时达到TCP连接复用的效果。

4 尝试消息队列的简单模式(一对一)

特点:一个生产者对应一个消费者。最简单的模式。
场景:一对一私聊。
新建一个解决方案,包含两个控制台程序,分别是生产者和消费者。
右键解决方案,设置多项目启动。

4.1 生产者代码

  1. /// <summary>
  2. /// 生产者
  3. /// </summary>
  4. internal class Program
  5. {
  6. private static void Main(string[] args)
  7. {
  8. //创建连接工厂
  9. ConnectionFactory factory = new ConnectionFactory
  10. {
  11. UserName = "guest",//用户名
  12. Password = "guest",//密码
  13. HostName = "localhost"//rabbitmq ip
  14. };
  15. //创建RabbitMQ的TCP长连接(可以比喻成一个光纤电缆)
  16. //因为建立和销毁 TCP 连接是非常昂贵的开销,所以一般维持连接(TCP连接复用)。在连接之上,建立和销毁channel。
  17. var connection = factory.CreateConnection();
  18. //创建通道(可以比喻成光纤电缆中的"一根"光纤)
  19. var channel = connection.CreateModel();
  20. /*声明一个队列:实现通道与队列的绑定
  21. * 5个参数:
  22. * queue:被绑定的消息队列名,当该消息队列不存在时,将新建该消息队列
  23. * durable:是否使用持久化
  24. * exclusive:该通道是否独占该队列
  25. * autoDelete:消费完成时是否删除队列, 该删除操作在消费者彻底断开连接之后进行。
  26. * args:其他配置参数
  27. */
  28. channel.QueueDeclare("hello", false, false, false, null);
  29. Console.WriteLine("\nRabbitMQ连接成功,生产者已启动,请输入消息,输入exit退出!");
  30. string input;
  31. do
  32. {
  33. input = Console.ReadLine();
  34. var sendBytes = Encoding.UTF8.GetBytes(input);
  35. //发布消息
  36. channel.BasicPublish("", "hello", null, sendBytes);
  37. }
  38. while (input.Trim().ToLower() != "exit");
  39. channel.Close();
  40. connection.Close();
  41. }
  42. }

4.2 消费者代码

  1. /// <summary>
  2. /// 消费者
  3. /// </summary>
  4. internal class Program
  5. {
  6. private static void Main(string[] args)
  7. {
  8. //创建连接工厂
  9. ConnectionFactory factory = new ConnectionFactory
  10. {
  11. UserName = "guest",//用户名
  12. Password = "guest",//密码
  13. HostName = "localhost"//rabbitmq ip
  14. };
  15. //创建连接
  16. var connection = factory.CreateConnection();
  17. //创建通道
  18. var channel = connection.CreateModel();
  19. //事件基本消费者
  20. EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
  21. //接收到消息事件
  22. consumer.Received += (ch, ea) =>
  23. {
  24. string message = Encoding.Default.GetString(ea.Body.ToArray());
  25. Console.WriteLine($"收到消息: {message}");
  26. //确认该消息已被消费
  27. channel.BasicAck(ea.DeliveryTag, false);
  28. };
  29. //启动消费者 设置为手动应答消息
  30. channel.BasicConsume("hello", false, consumer);
  31. Console.WriteLine("消费者已启动");
  32. Console.ReadKey();
  33. channel.Dispose();
  34. connection.Close();
  35. }
  36. }

4.3 测试

两个项目一起启动之后
在生产者对应的控制台输入文字后,添加到消息队列中,由消费者进行消费,显示在消费者控制台上。

参考文档:https://zhuanlan.zhihu.com/p/143521328

5 尝试消息队列的WORK模式

特点:争夺消息,能者多劳。每个消费者获得的消息具有唯一性。
场景:抢红包。抢单。

5.1 生产者的代码

为了代码逻辑清晰,将各种模式的代码从Main函数中提出来单独封装成函数。Main函数中使用Switch来方便之后的测试。

  1. /// <summary>
  2. /// 生产者
  3. /// </summary>
  4. internal static class Program
  5. {
  6. private static void Main(string[] args)
  7. {
  8. //选择的模式类型
  9. string ModeNumber = "2";
  10. switch (ModeNumber)
  11. {
  12. case "1":
  13. SignalMode();
  14. break;
  15. case "2":
  16. WorkMode();
  17. break;
  18. default:
  19. break;
  20. }
  21. }
  22. /// <summary>
  23. /// 简单模式
  24. /// </summary>
  25. private static void SignalMode()
  26. {
  27. //创建连接工厂
  28. ConnectionFactory factory = new ConnectionFactory
  29. {
  30. UserName = "guest",//用户名
  31. Password = "guest",//密码
  32. HostName = "localhost"//rabbitmq ip
  33. };
  34. //创建RabbitMQ的TCP长连接(可以比喻成一个光纤电缆)
  35. //因为建立和销毁 TCP 连接是非常昂贵的开销,所以一般维持连接(TCP连接复用)。在连接之上,建立和销毁channel。
  36. var connection = factory.CreateConnection();
  37. //创建通道(可以比喻成光纤电缆中的"一根"光纤)
  38. var channel = connection.CreateModel();
  39. /*声明一个队列:实现通道与队列的绑定
  40. * 5个参数:
  41. * queue:被绑定的消息队列名,当该消息队列不存在时,将新建该消息队列
  42. * durable:是否使用持久化
  43. * exclusive:该通道是否独占该队列
  44. * autoDelete:消费完成时是否删除队列, 该删除操作在消费者彻底断开连接之后进行。
  45. * args:其他配置参数
  46. */
  47. channel.QueueDeclare("队列A", false, false, false, null);
  48. Console.WriteLine("\nRabbitMQ连接成功,生产者已启动,请输入消息,输入exit退出!");
  49. string input;
  50. do
  51. {
  52. input = Console.ReadLine();
  53. var sendBytes = Encoding.UTF8.GetBytes(input);
  54. //发布消息
  55. channel.BasicPublish("", "hello", null, sendBytes);
  56. }
  57. while (input.Trim().ToLower() != "exit");
  58. channel.Close();
  59. connection.Close();
  60. }
  61. /// <summary>
  62. /// Work模式
  63. /// </summary>
  64. private static void WorkMode()
  65. {
  66. //创建连接工厂
  67. ConnectionFactory factory = new ConnectionFactory
  68. {
  69. UserName = "guest",//用户名
  70. Password = "guest",//密码
  71. HostName = "localhost"//rabbitmq ip
  72. };
  73. var connection = factory.CreateConnection();
  74. var channel = connection.CreateModel();
  75. channel.QueueDeclare("队列A", false, false, false, null);
  76. for (int i = 0; i < 50; i++)
  77. {
  78. String message = "" + i;
  79. var sendBytes = Encoding.UTF8.GetBytes(message);
  80. channel.BasicPublish("", "队列A", null, sendBytes);
  81. Console.WriteLine(" [x] Sent '" + message + "'");
  82. Thread.Sleep(i * 10);
  83. }
  84. channel.Close();
  85. connection.Close();
  86. }
  87. }

5.2 消费者的代码

为了模拟多个消费者争夺消息,将之前的消费者项目重命名为"RabbitMQ_Consumer01",并新建项目"RabbitMQ_Consumer02"。在work模式中,消费者01和消费者02的代码是相同的。
并将生产者、消费者01、消费者02同时设为启动项(由于消费者代码相同,只贴一个)。

  1. /// <summary>
  2. /// 消费者01
  3. /// </summary>
  4. internal static class Program
  5. {
  6. private static void Main(string[] args)
  7. {
  8. //选择的模式类型
  9. string ModeNumber = "2";
  10. switch (ModeNumber)
  11. {
  12. case "1":
  13. SignalMode();
  14. break;
  15. case "2":
  16. WorkMode();
  17. break;
  18. default:
  19. break;
  20. }
  21. }
  22. /// <summary>
  23. /// 简单模式
  24. /// </summary>
  25. private static void SignalMode()
  26. {
  27. //创建连接工厂
  28. ConnectionFactory factory = new ConnectionFactory
  29. {
  30. UserName = "guest",//用户名
  31. Password = "guest",//密码
  32. HostName = "localhost"//rabbitmq ip
  33. };
  34. //创建连接
  35. var connection = factory.CreateConnection();
  36. //创建通道
  37. var channel = connection.CreateModel();
  38. //事件基本消费者
  39. EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
  40. //接收到消息事件
  41. consumer.Received += (model, ea) =>
  42. {
  43. string message = Encoding.Default.GetString(ea.Body.ToArray());
  44. Console.WriteLine($@"收到消息: {message}");
  45. //确认该消息已被消费
  46. channel.BasicAck(ea.DeliveryTag, false);
  47. };
  48. //启动消费者 设置为手动应答消息
  49. channel.BasicConsume("队列A", false, consumer);
  50. Console.WriteLine($@"消费者已启动");
  51. Console.ReadKey();
  52. channel.Dispose();
  53. connection.Close();
  54. }
  55. /// <summary>
  56. /// Work模式
  57. /// </summary>
  58. private static void WorkMode()
  59. {
  60. ConnectionFactory factory = new ConnectionFactory
  61. {
  62. UserName = "guest",//用户名
  63. Password = "guest",//密码
  64. HostName = "localhost"//rabbitmq ip
  65. };
  66. var connection = factory.CreateConnection();
  67. var channel = connection.CreateModel();
  68. channel.QueueDeclare("队列A", false, false, false, null);
  69. /** 设置限流机制
  70. * param1: prefetchSize,消息本身的大小 如果设置为0 那么表示对消息本身的大小不限制
  71. * param2: prefetchCount,告诉rabbitmq,不要一次性给消费者推送大于N个消息(一旦有N个消息没有Ack,此消费者不再获取消息,直到有消息Ack为止)
  72. * param3:global,是否将上面的设置应用于整个通道,false表示只应用于当前消费者
  73. */
  74. channel.BasicQos(0, 1, false);
  75. // 定义队列的消费者
  76. EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
  77. //启动消费者
  78. //false为手动应答,true为自动应答
  79. channel.BasicConsume("队列A", false, consumer);
  80. consumer.Received += (model, ea) =>
  81. {
  82. string message = Encoding.Default.GetString(ea.Body.ToArray());
  83. Console.WriteLine($@"收到消息: {message}");
  84. //确认该消息已被消费,手动返回完成
  85. channel.BasicAck(ea.DeliveryTag, false);
  86. };
  87. Console.WriteLine($@"消费者01已启动");
  88. Console.ReadKey();
  89. channel.Dispose();
  90. connection.Close();
  91. }
  92. }

5.3 测试处理能力相同的情况

启动项目后,可以看到消费者向队列推送消息1-50。而消费者01和消费者02对队列中的消息进行争抢,并且获取的消息具有唯一性。
可以看到由于消费者01、02的处理能力相同,争抢消息的数量也是平均的。

5.3 测试处理能力不相同的情况

那如何体现“能者多劳”这个特点呢。
在消费者01获取消息后,通过Thread.Sleep(1000);模拟消费者01的对消息的处理速度比较慢

可以看到,由于消费者01的处理速度慢,争抢到的消息也比较少

6 消费者端的限流配置

6.1 配置限流参数

消费者中的这句代码就是在配置限流参数:

channel.BasicQos(0, 1, false);

param1: prefetchSize,消息本身的大小 如果设置为0 那么表示对消息本身的大小不限制
param2: prefetchCount,告诉rabbitmq,不要一次性给消费者推送大于N个消息(简单点说就是:一旦有N个消息没有Ack,此消费者不再获取消息,直到有消息Ack为止)
param3: global,是否将上面的设置应用于整个通道,false表示只应用于当前消费者

6.2 测试限流功能

我们将消费者02的限流数量设置为1,同时注释掉手动ACK的语句。

这样消费者02获取消息后,由于不会进行ACK操作,会导致消费者02的阻塞。我们设置的限流数量是1,所以消费者02由始至终只会获得一条消息。

7 RabbitMQ的确认机制(ACK)

RabbitMQ的确认方式分为自动ACK和手动ACK。
队列接收到ACK后,才会对对应的消息进行删除操作。

7.1 自动ACK和手动ACK的区别

自动ACK:消费者获取到消息后,会自动进行ACK操作。
手动ACK:可以自定义调用ACK操作的位置。

选择自动ACK,如果消费者处理时出现问题,或者中途退出没有处理。但队列已经接收到自动ACK把消息删除了,可能导致对消息处理出错。
选择手动ACK,可以将ACK的时机放在消费者正确将消息处理完毕之后。如果消费者中途退出,消息会由另一个消费者获取到进行操作。

原文链接:http://www.cnblogs.com/soraxtube/p/14789116.html

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号