经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 数据库/运维 » RabbitMQ » 查看文章
RabbitMQ 工作模式介绍
来源:cnblogs  作者:TaylorSWMM  时间:2023/5/30 9:43:20  对本文有异议

RabbitMQ 工作模式介绍

1.Hello World

RabbitMQ 是一个消息代理:它接受并转发消息。您可以将其视为邮局:当您将要邮寄的邮件放入邮箱时,您可以确定信使最终会将邮件交付给您的收件人。在这个类比中,RabbitMQ是一个邮政信箱,一个邮局和一个信件载体。

RabbitMQ 和邮局之间的主要区别在于它不处理纸张,而是接受、存储和转发二进制数据 - 消息

以下是RabbitMq的专业术语

  • 消费与接受的含义相似。使用者是一个主要等待接收消息的程序

  • 生产只意味着发送。发送消息的程序是生产者

  • 队列是 RabbitMQ 中邮箱的名称。尽管消息流经 RabbitMQ 和您的应用程序,但它们只能存储在队列中。队列仅受主机内存和磁盘限制的约束,它本质上是一个大型消息缓冲区。许多生产者可以发送到一个队列的消息,许多使用者可以尝试从一个队列接收数据。这就是我们表示队列的方式

请注意,生产者、使用者和代理不必驻留在同一个主机上;事实上,在大多数应用程序中,它们不会。应用程序也可以是生产者和使用者

“hello world”
(使用 php-amqplib 客户端)
在本教程的这一部分中,我们将用 PHP 编写两个使用 RabbitMQ 进行通信的程序。本教程使用需要 PHP 7.x 或 8.x 的客户端库。

第一个程序将是发送单个消息的生产者,第二个程序将是接收消息并将其打印出来的消费者。我们将掩盖php-amqplib API中的一些细节,专注于这个非常简单的事情,只是为了开始。这是一个消息传递的“Hello World”。

在下图中,“P”是我们的生产者,“C”是我们的消费者。中间的框是一个队列 - RabbitMQ 代表消费者保留的消息缓冲区。

Sending

  1. <?php
  2. include "../../vendor/autoload.php";
  3. use PhpAmqpLib\Connection\AMQPStreamConnection;
  4. use PhpAmqpLib\Message\AMQPMessage;
  5. // 创建服务器连接
  6. $connection = new AMQPStreamConnection(
  7. '127.0.0.1',
  8. 5672,
  9. 'guest',
  10. 'guest',
  11. '/');
  12. // 创建一个通道,这是大多数用于完成工作的 API 所在的位置
  13. $channel = $connection->channel();
  14. // 要发送,我们必须声明一个队列供我们发送到;然后我们可以向队列发布一条消息:
  15. $channel->queue_declare('hello', false, false, false, false);
  16. // 创建消息
  17. $msg = new AMQPMessage('Hello World!');
  18. // 发送消息
  19. $channel->basic_publish($msg, '', 'hello');
  20. echo " [x] Sent 'Hello World!'\n";
  21. $channel->close();
  22. $connection->close();

Recive

我们的接收器侦听来自 RabbitMQ 的消息,因此与发布单个消息的发布者不同,我们将保持接收器运行以侦听消息并打印出来

  1. <?php
  2. include "../../vendor/autoload.php";
  3. use PhpAmqpLib\Connection\AMQPStreamConnection;
  4. // 设置与发布者相同;我们打开一个连接和一个通道,并声明我们要从中使用的队列。
  5. // 请注意,这与将发布发送到的队列匹配
  6. $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
  7. $channel = $connection->channel();
  8. $channel->queue_declare('hello', false, false, false, false);
  9. echo " [*] Waiting for messages. To exit press CTRL+C\n";
  10. // 我们的代码将阻塞,而我们的$channel有回调。
  11. // 每当我们收到消息时,我们的$callback函数都会传递收到的消息。
  12. $callback = function ($msg) {
  13. echo ' [x] Received ', $msg->body, "\n";
  14. };
  15. $channel->basic_consume('hello', '', false, true, false, false, $callback);
  16. while ($channel->is_open()) {
  17. $channel->wait();
  18. }

2. Work Queue

在第一个教程中(hello world),我们编写了程序来发送和接收来自命名队列的消息。在此部分中,我们将创建一个工作队列,用于在多个工作线程之间分配耗时的任务

工作队列(又名:任务队列)背后的主要思想是避免立即执行资源密集型任务并必须等待其完成。相反,我们将任务安排在以后完成。我们将任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当您运行许多工作线程时,任务将在它们之间共享

此概念在 Web 应用程序中特别有用,在这些应用程序中,在短 HTTP 请求窗口期间无法处理复杂任务。

task.php (P)

  1. <?php
  2. // 工作队列 work queue 发送 send
  3. use PhpAmqpLib\Connection\AMQPStreamConnection;
  4. use PhpAmqpLib\Message\AMQPMessage;
  5. include "../../vendor/autoload.php";
  6. // 创建服务器连接
  7. $connection = new AMQPStreamConnection(
  8. '127.0.0.1',
  9. 5672,
  10. 'guest',
  11. 'guest',
  12. '/');
  13. // 创建一个通道,这是大多数用于完成工作的 API 所在的位置
  14. $channel = $connection->channel();
  15. // 要发送,我们必须声明一个队列供我们发送到;然后我们可以向队列发布一条消息:
  16. $channel->queue_declare('task_queue', false, false, false, false);
  17. // 创建消息
  18. $data = implode(' ', array_slice($argv, 1));
  19. if (empty($data)) {
  20. $data = "hello world!";
  21. }
  22. // [
  23. // 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
  24. //]
  25. $msg = new AMQPMessage($data);
  26. // 发送消息
  27. $channel->basic_publish($msg, '', 'task_queue');
  28. echo " [x] Sent '".$data ."'\n";
  29. $channel->close();
  30. $connection->close();

worker.php (C)

  1. <?php
  2. include "../../vendor/autoload.php";
  3. use PhpAmqpLib\Connection\AMQPStreamConnection;
  4. // 设置与发布者相同;我们打开一个连接和一个通道,并声明我们要从中使用的队列。
  5. // 请注意,这与将发布发送到的队列匹配
  6. $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
  7. $channel = $connection->channel();
  8. $channel->queue_declare(
  9. 'task_queue',
  10. false,
  11. false,
  12. false,
  13. false
  14. );
  15. echo " [*] Waiting for messages. To exit press CTRL+C\n";
  16. $callback = function ($msg) {
  17. echo ' [x] Received ', $msg->body, "\n";
  18. sleep(substr_count($msg->body, '.'));
  19. echo " [x] Done\n";
  20. // 手动ack
  21. // $msg->ack();
  22. };
  23. // 公平调度
  24. $channel->basic_qos(null, 1, null);
  25. // 手动ack
  26. // $channel->basic_consume('task_queue', '', false, false, false, false, $callback);
  27. $channel->basic_consume('task_queue', '', false, true, false, false, $callback);
  28. while ($channel->is_open()) {
  29. $channel->wait();
  30. }

执行结果

  1. # shell1
  2. php task.php First message.
  3. php task.php Second message..
  4. php task.php Third message...
  5. php task.php Fourth message....
  6. php task.php Fifth message.....
  7. # shell2
  8. php worker.php
  9. # => [*] Waiting for messages. To exit press CTRL+C
  10. # => [x] Received 'First message.'
  11. # => [x] Received 'Third message...'
  12. # => [x] Received 'Fifth message.....'
  13. # shell3
  14. php worker.php
  15. # => [*] Waiting for messages. To exit press CTRL+C
  16. # => [x] Received 'Second message..'
  17. # => [x] Received 'Fourth message....'

默认情况下,RabbitMQ 将按顺序将每条消息发送给下一个消费者。平均而言,每个消费者都会收到相同数量的消息。这种分发消息的方式称为轮询。对三个或更多工作人员进行尝试。

消息确认(Confirm)

消息确认以前由我们自己关闭。是时候通过将第四个参数设置为 basic_consume false 来打开它们了(true 表示没有确认),并在我们完成任务后从工作人员发送适当的确认

  1. $callback = function ($msg) {
  2. echo ' [x] Received ', $msg->body, "\n";
  3. $msg->ack();
  4. };
  5. $channel->basic_consume('task_queue', '', false, false, false, false, $callback);

使用此代码,可以确保即使在处理消息时使用 CTRL+C 终止工作线程,也不会丢失任何内容。工作线程终止后不久,将重新传递所有未确认的消息

消息持久性(Message Durability)

我们已经学会了如何确保即使消费者死亡,任务也不会丢失。但是如果 RabbitMQ 服务器停止,我们的任务仍然会丢失。

当 RabbitMQ 退出或崩溃时,它会忘记队列和消息,除非您告诉它不要这样做。要确保消息不会丢失,需要做两件事:我们需要将队列和消息都标记为持久

首先,我们需要确保队列在 RabbitMQ 节点重新启动后能够幸存下来

$channel->queue_declare('task_queue', false, true, false, false);

此标志设置为 true 需要同时应用于生产者代码和使用者代码

现在我们需要将我们的消息标记为持久 - 通过设置 delivery_mode = 2 消息属性,AMQPMessage 将其作为属性数组的一部分

$msg = new AMQPMessage( $data, array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT));

公平调度(Fair dispatch)

您可能已经注意到,调度仍然不能完全按照我们想要的方式工作。例如,在有两个工作线程的情况下,当所有奇数消息都很重而偶数消息都很轻时,一个工作人员将一直很忙,而另一个工作人员几乎不会做任何工作。好吧,RabbitMQ 对此一无所知,仍然会均匀地调度消息

发生这种情况是因为 RabbitMQ 只是在消息进入队列时调度消息。它不会查看使用者的未确认消息数。它只是盲目地将每 n 条消息发送给第 n 个消费者

为了解决这个问题,我们可以将basic_qos方法与 prefetch_count = 1 设置一起使用。这告诉 RabbitMQ 不要一次向一个 worker 提供多条消息。或者,换句话说,在工作人员处理并确认前一条消息之前,不要向工作人员发送新消息。相反,它会将其调度给下一个尚未繁忙的工作人员

  1. $channel->basic_qos(null, 1, null);

3. Publish/Subscribe

在上一教程中,我们创建了一个工作队列。工作队列背后的假设是每个任务只传递给一个工作人员。在这一部分中,我们将做一些完全不同的事情 - 我们将向多个消费者传递消息。此模式称为“发布/订阅”

为了说明这种模式,我们将构建一个简单的日志记录系统。它将由两个程序组成 - 第一个将发出日志消息,第二个将接收和打印它们

在我们的日志记录系统中,接收器程序的每个运行副本都将获得消息。这样,我们将能够运行一个接收器并将日志定向到磁盘;同时,我们将能够运行另一个接收器并在屏幕上查看日志

本质上,已发布的日志消息将广播到所有接收方

Exchanges

在本教程的前几部分中,我们向队列发送消息和从队列接收消息。现在是时候在 Rabbit 中引入完整的消息传递模型了

让我们快速回顾一下前面教程中介绍的内容:

  • 生产者(Producer)是发送消息的用户应用程序。
  • 队列是(Queue)存储消息的缓冲区。
  • 使用者(Consumer)是接收消息的用户应用程序

RabbitMQ 中消息传递模型的核心思想是,生产者从不将任何消息直接发送到队列。实际上,很多时候,生产者甚至根本不知道消息是否会传递到任何队列

相反,生产者只能向交易所(exchange)发送消息。交换是一件非常简单的事情。一方面,它接收来自生产者的消息,另一方面将它们推送到队列中。交换必须确切地知道如何处理它收到的消息。是否应将其附加到特定队列?是否应该将其附加到许多队列中?或者应该丢弃它。其规则由交换类型(exchange type)定义。

交换机类型有 direct, topic, headers and fanout, 下面我们创建一个direct类型的交换机

  1. $channel->exchange_declare('logs', 'fanout', false, false, false);

现在,我们可以发布到我们命名的交易所(exchange)

  1. $channel->exchange_declare('logs', 'fanout', false, false, false);
  2. $channel->basic_publish($msg, 'logs');

Temporary queues(临时队列)

您可能还记得以前我们使用具有特定名称的队列(还记得 hello 和 task_queue 吗?能够命名队列对我们来说至关重要 - 我们需要将工人指向同一个队列。如果要在生产者和使用者之间共享队列,为队列命名非常重要

首先,每当我们连接到Rabbit时,我们都需要一个新的空队列。为此,我们可以创建一个具有随机名称的队列,或者更好的是 - 让服务器为我们选择一个随机队列名称

其次,一旦我们断开了消费者的连接,队列应该被自动删除。

在 php-amqplib 客户端中,当我们以空字符串形式提供队列名称时,我们创建一个具有生成名称的非持久队列:

  1. list($queue_name, ,) = $channel->queue_declare("");

当该方法返回时,$queue_name 变量包含由 RabbitMQ 生成的随机队列名称。例如,它可能看起来像amq.gen-JzTY20BRgKO-HjmUJj0wLg。

Bindings(绑定)

交换机和队列之间的关系称为绑定

  1. $channel->queue_bind($queue_name, 'logs');

将一切都整合到一起

发出日志消息的生产者程序看起来与上一教程没有太大区别。最重要的变化是我们现在希望将消息发布到我们的日志交换机

publish.php

  1. <?php
  2. require_once __DIR__ . '/vendor/autoload.php';
  3. use PhpAmqpLib\Connection\AMQPStreamConnection;
  4. use PhpAmqpLib\Message\AMQPMessage;
  5. $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
  6. $channel = $connection->channel();
  7. // 创建log交换机 类型为:fanout
  8. $channel->exchange_declare('logs', 'fanout', false, false, false);
  9. $data = implode(' ', array_slice($argv, 1));
  10. if (empty($data)) {
  11. $data = "info: Hello World!";
  12. }
  13. $msg = new AMQPMessage($data);
  14. // 把消息发布到日志交换机
  15. $channel->basic_publish($msg, 'logs');
  16. echo ' [x] Sent ', $data, "\n";
  17. $channel->close();
  18. $connection->close();

subscribe.php

  1. <?php
  2. require_once __DIR__ . '/vendor/autoload.php';
  3. use PhpAmqpLib\Connection\AMQPStreamConnection;
  4. $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
  5. $channel = $connection->channel();
  6. // 日志交换机类型为:fanout
  7. $channel->exchange_declare('logs', 'fanout', false, false, false);
  8. // 生成临时队列
  9. list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
  10. // 队列绑定交换机
  11. $channel->queue_bind($queue_name, 'logs');
  12. echo " [*] Waiting for logs. To exit press CTRL+C\n";
  13. $callback = function ($msg) {
  14. echo ' [x] ', $msg->body, "\n";
  15. };
  16. $channel->basic_consume($queue_name, '', false, true, false, false, $callback);
  17. while ($channel->is_open()) {
  18. $channel->wait();
  19. }
  20. $channel->close();
  21. $connection->close();

执行结果

  1. # shell1
  2. php publish.php 记录日志信息
  3. # shell2
  4. php subscribe.php
  5. [*] Waiting for logs. To exit press CTRL+C
  6. [x] 记录日志信息
  7. # shell3
  8. php subscribe.php
  9. [*] Waiting for logs. To exit press CTRL+C
  10. [x] 记录日志信息

4. Routing

在前面的教程中,我们构建了一个简单的日志记录系统。我们能够将日志消息广播到许多接收器

在本教程中,我们将为其添加一个功能 - 我们将使仅订阅消息的子集成为可能。例如,我们将能够仅将关键错误消息定向到日志文件(以节省磁盘空间),同时仍然能够在控制台上打印所有日志消息

Bindings

绑定可以采用额外的routing_key参数。为了避免与 $channel::basic_publish 参数混淆,我们将其称为绑定键。这就是我们如何用键创建绑定

  1. $channel->queue_bind($queue_name, 'logs');
  2. $binding_key = 'black';
  3. $channel->queue_bind($queue_name, $exchange_name, $binding_key);

Direct exchange

上一教程中的日志记录系统将所有消息广播给所有使用者。我们希望对其进行扩展,以允许根据消息的严重性筛选消息。例如,我们可能希望将日志消息写入磁盘的脚本仅接收严重错误,而不是在警告或信息日志消息上浪费磁盘空间

fanout 这并没有给我们带来太大的灵活性 - 它只能进行无意识的广播

direct 我们将改用直接交换。直接交换背后的路由算法很简单 - 消息进入绑定键与消息的路由键完全匹配的队列

![](C:\Users\yanwe\Pictures\Saved Pictures\python-four.png)

send.php

  1. <?php
  2. require_once __DIR__ . '/vendor/autoload.php';
  3. use PhpAmqpLib\Connection\AMQPStreamConnection;
  4. use PhpAmqpLib\Message\AMQPMessage;
  5. // 创建连接
  6. $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
  7. $channel = $connection->channel();
  8. // 声明交换机 类型为:direct
  9. $channel->exchange_declare('direct_logs', 'direct', false, false, false);
  10. // 接受控制台参数
  11. $severity = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'info';
  12. $data = implode(' ', array_slice($argv, 2));
  13. if (empty($data)) {
  14. $data = "Hello World!";
  15. }
  16. // 实例化消息
  17. $msg = new AMQPMessage($data);
  18. // 发送消息:exchange、routingkey
  19. $channel->basic_publish($msg, 'direct_logs', $severity);
  20. echo ' [x] Sent ', $severity, ':', $data, "\n";
  21. $channel->close();
  22. $connection->close();

revice.php

  1. <?php
  2. require_once __DIR__ . '/vendor/autoload.php';
  3. use PhpAmqpLib\Connection\AMQPStreamConnection;
  4. $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
  5. $channel = $connection->channel();
  6. // 声明交换机类型为:direct
  7. $channel->exchange_declare('direct_logs', 'direct', false, false, false);
  8. // 声明空队列
  9. list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
  10. $severities = array_slice($argv, 1);
  11. if (empty($severities)) {
  12. file_put_contents('php://stderr', "Usage: $argv[0] [info] [warning] [error]\n");
  13. exit(1);
  14. }
  15. // 绑定多个routingkey
  16. foreach ($severities as $severity) {
  17. $channel->queue_bind($queue_name, 'direct_logs', $severity);
  18. }
  19. echo " [*] Waiting for logs. To exit press CTRL+C\n";
  20. $callback = function ($msg) {
  21. echo ' [x] ', $msg->delivery_info['routing_key'], ':', $msg->body, "\n";
  22. };
  23. $channel->basic_consume($queue_name, '', false, true, false, false, $callback);
  24. while ($channel->is_open()) {
  25. $channel->wait();
  26. }
  27. $channel->close();
  28. $connection->close();

执行结果

  1. # shell1 发送error类型日志
  2. php send.php error "Run. Run. Or it will explode."
  3. # shell2 监听 warning error 类型
  4. php receive.php warning error
  5. # 有消息输出
  6. # shell3 监听info类型日志内容
  7. php receive.php info
  8. # 正常情况 无法收到消息

5. Topics

在前面的教程中,我们改进了日志记录系统。我们没有使用只能进行虚拟广播的fanout交换,而是使用了direct直接交换,并获得了有选择地接收日志的可能性。

尽管使用direct直接交换改进了我们的系统,但它仍然存在局限性 - 它不能基于多个标准进行路由

Topic exchange

发送到主题交换的消息不能具有任意routing_key - 它必须是单词列表,由点分隔。单词可以是任何东西,但通常它们指定与消息相关的一些特征。一些有效的路由密钥示例:“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”。路由密钥中可以有任意数量的单词,最多 255 字节的限制

绑定密钥也必须采用相同的形式。主题交换背后的逻辑类似于直接交换 - 使用特定路由密钥发送的消息将被传递到与匹配绑定密钥绑定的所有队列。但是,绑定键有两种重要的特殊情况

  • * (star) can substitute for exactly one word. (可以替代一个词)
  • # (hash) can substitute for zero or more words. (可以替代零个或者多个单词)

![](C:\Users\yanwe\Pictures\Saved Pictures\python-five.png)

在这个例子中,我们将发送所有描述动物的消息。消息将使用由三个单词(两个点)组成的路由密钥发送。路由键中的第一个词将描述速度,第二个词描述颜色,第三个词描述物种:<speed>.<colour>.<species>

send.php

  1. <?php
  2. require_once __DIR__ . '/vendor/autoload.php';
  3. use PhpAmqpLib\Connection\AMQPStreamConnection;
  4. use PhpAmqpLib\Message\AMQPMessage;
  5. $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
  6. $channel = $connection->channel();
  7. $channel->exchange_declare('topic_logs', 'topic', false, false, false);
  8. $routing_key = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'anonymous.info';
  9. $data = implode(' ', array_slice($argv, 2));
  10. if (empty($data)) {
  11. $data = "Hello World!";
  12. }
  13. $msg = new AMQPMessage($data);
  14. $channel->basic_publish($msg, 'topic_logs', $routing_key);

revice.php

  1. <?php
  2. require_once __DIR__ . '/vendor/autoload.php';
  3. use PhpAmqpLib\Connection\AMQPStreamConnection;
  4. $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
  5. $channel = $connection->channel();
  6. $channel->exchange_declare('topic_logs', 'topic', false, false, false);
  7. list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
  8. $binding_keys = array_slice($argv, 1);
  9. if (empty($binding_keys)) {
  10. file_put_contents('php://stderr', "Usage: $argv[0] [binding_key]\n");
  11. exit(1);
  12. }
  13. foreach ($binding_keys as $binding_key) {
  14. $channel->queue_bind($queue_name, 'topic_logs', $binding_key);
  15. }
  16. echo " [*] Waiting for logs. To exit press CTRL+C\n";
  17. $callback = function ($msg) {
  18. echo ' [x] ', $msg->delivery_info['routing_key'], ':', $msg->body, "\n";
  19. };
  20. $channel->basic_consume($queue_name, '', false, true, false, false, $callback);
  21. while ($channel->is_open()) {
  22. $channel->wait();
  23. }
  24. $channel->close();
  25. $connection->close();

执行结果

  1. # shell1
  2. # shell2
  3. php revice.php "#"
  4. # shell3
  5. php revice.php "kern.*"
  6. # shell4
  7. php revice.php "*.critical"
  8. # shell5
  9. php revice.php "kern.*" "*.critical"

6. Publisher Confirms

发布者确认是 RabbitMQ 扩展,以实现可靠的发布。当在通道上启用发布者确认时,代理会异步确认客户端发布的消息,这意味着它们已在服务器端处理

概述

在本教程中,我们将使用发布者确认来确保已发布的消息已安全到达代理。我们将介绍使用发布商确认的几种策略,并解释它们的优缺点

开启发布者确认

  1. $channel = $connection->channel();
  2. $channel->confirm_select();

Strategy #1: Publishing Messages Individually(策略#1:单独发布消息)

让我们从使用确认发布的最简单方法开始,即发布消息并同步等待其确认

  1. while (thereAreMessagesToPublish()) {
  2. $data = "Hello World!";
  3. $msg = new AMQPMessage($data);
  4. $channel->basic_publish($msg, 'exchange');
  5. // uses a 5 second timeout
  6. $channel->wait_for_pending_acks(5.000);
  7. }

在前面的示例中,我们像往常一样发布一条消息,并使用 $channel::wait_for_pending_acks(int|float) 方法等待其确认。确认消息后,该方法将立即返回。如果消息未在超时内确认,或者它是裸的(意味着代理由于某种原因无法处理它),该方法将引发异常。异常的处理通常包括记录错误消息和/或重试发送消息

不同的客户端库有不同的方法来同步处理发布者确认,因此请务必仔细阅读您正在使用的客户端的文档

这种技术非常简单,但也有一个主要缺点:它大大减慢了发布速度,因为消息的确认会阻止所有后续消息的发布。此方法不会提供每秒超过几百条已发布消息的吞吐量。尽管如此,这对于某些应用程序来说已经足够了

Strategy #2: Publishing Messages in Batches(策略#2:批量发布消息)

为了改进前面的示例,我们可以发布一批消息并等待整个批次得到确认。以下示例使用一批 100

  1. $batch_size = 100;
  2. $outstanding_message_count = 0;
  3. while (thereAreMessagesToPublish()) {
  4. $data = ...;
  5. $msg = new AMQPMessage($data);
  6. $channel->basic_publish($msg, 'exchange');
  7. $outstanding_message_count++;
  8. if ($outstanding_message_count === $batch_size) {
  9. $channel->wait_for_pending_acks(5.000);
  10. $outstanding_message_count = 0;
  11. }
  12. }
  13. if ($outstanding_message_count > 0) {
  14. $channel->wait_for_pending_acks(5.000);
  15. }

等待一批消息被确认比等待单个消息的确认大大提高了吞吐量(使用远程 RabbitMQ 节点最多 20-30 倍)。一个缺点是,在发生故障时,我们不知道到底出了什么问题,因此我们可能不得不在内存中保留整个批次以记录有意义的内容或重新发布消息。并且此解决方案仍然是同步的,因此它会阻止消息的发布

Strategy #3: Handling Publisher Confirms Asynchronously(策略 #3:处理发布服务器异步确认)

代理异步确认已发布的消息,只需在客户端注册回调即可收到这些确认的通知

  1. $channel = $connection->channel();
  2. $channel->confirm_select();
  3. $channel->set_ack_handler(
  4. function (AMQPMessage $message){
  5. // code when message is confirmed
  6. }
  7. );
  8. $channel->set_nack_handler(
  9. function (AMQPMessage $message){
  10. // code when message is nack-ed
  11. }
  12. );

有 2 个回调:一个用于已确认的消息,一个用于裸消息(代理可以认为丢失的消息)。每个回调都有 AMQPMessage $message 参数和返回的消息,因此您无需处理序列号(传递标记)即可了解此回调属于哪条消息

send.php

  1. $channel->confirm_select();
  2. $channel->set_ack_handler(function (AMQPMessage $message) {
  3. file_put_contents("./librabbitmq.log", var_export([
  4. 'type' => 'ack', 'message' => $message->body
  5. ], true) . PHP_EOL , FILE_APPEND);
  6. });
  7. $channel->set_nack_handler(function (AMQPMessage $message) {
  8. file_put_contents("./librabbitmq.log", var_export([
  9. 'type' => 'nack', 'message' => $message->body
  10. ], true) . PHP_EOL , FILE_APPEND);
  11. });
  12. $channel->wait_for_pending_acks();

原文链接:https://www.cnblogs.com/yanweifeng/p/17442347.html

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号