企业🤖AI Agent构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
1.RabbitMQ之延时队列 ``` 在消息生产者处 use PhpAmqpLib\Wire\AMQPTable; (1)通过队列属性x-message-ttl设置,投递到该队列中的所有消息都有相同的过期时间 第一种写法: $tale = new AMQPTable([ 'x-dead-letter-exchange' => $delayExName, 'x-message-ttl' => $ttl, //消息存活时间 ,单位毫秒 'x-dead-letter-routing-key' => $queueName ]); 第二种写法: $tale = new AMQPTable(); $tale->set('x-dead-letter-exchange', 'delay_exchange'); $tale->set('x-dead-letter-routing-key','delay_exchange'); $tale->set('x-message-ttl',10000); //10秒 放在队列声明第7个参数 $channel->queue_declare('cache_queue',false,true,false,false,false,$tale); (2)通过消息属性expiration设置消息本身的有效期 $msg = new AMQPMessage('Hello World'.'3000',array( 'expiration' => intval(18000), 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT )); (3)注意事项 * 如果两种同时使用,会以两者之间较小的那个数值为准,单位都为毫秒 * 通过队列设置的,一旦消息过期,就会从队列中抹去,因为过期的消息肯定在队列头部,RabbitMQ只需要定期处理头部过期消息即可。 * 而单独设置有效期的,如果要删除则需要遍历整个队列,所以采取消费时判定是否过期处理删除 (4)队列有效期 $tale = new \PhpAmqpLib\\Wire\AMQPTable(); $tale->set('x-expires', 60000 ); 通过channel.queueDeclare方法中到x-expires参数是可以控制队列在指定时间未被使用过后删除,未被使用包括以下三点: * 没有任何消费者 * 未被重新声明过期时间 * 未调用过Basic.Get命令 ``` 2.工作队列 ``` (1)同时运行两个`worker.php`脚本。它们都会从队列中获取消息 # shell 1 php worker.php # => [*] Waiting for messages. To exit press CTRL+C # => [x] Received 'First message.' # => [x] Received 'Third message...' # => [x] Received 'Fifth message.....' # shell 2 php worker.php # => [*] Waiting for messages. To exit press CTRL+C # => [x] Received 'Second message..' # => [x] Received 'Fourth message....' ``` (2)在默认情况下,RabbitMQ 将按顺序将队列中的消息分发给消费者。平均每个消费者获得相同数量的消息。这种奋发消息的方式叫做循环 (3)公平调度 消息消费者代码 $channel->basic_qos(null, 1, null); 我们可以通过设置 basic_qos 第二个参数 prefetch_count = 1。这一项告诉RabbitMQ不要一次给一个消费者发送多个消息。或者换一种说法,在确认前一个消息之前,不要向消费者发送新的消息。相反,新的消息将发送到一个处于空闲的消费者又或者只有consumer已经处理并确认了上一条message时queue才分派新的message给它 3.消息确认机制 (ack) 消息消费者代码 在默认情况下,消息确认机制是关闭的。现在是时候开启消息确认机制,将basic_consumer的第四个参数设置为false(true表示不开启消息确认),并且工作进程处理完消息后发送确认消息。 ``` # 处理消息回调函数 $callback = function($msg){ echo " [x] Received ", $msg->body, "\n"; sleep(substr_count($msg->body, '.')); echo " [x] Done", "\n"; $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); }; # 开启消息确认 $channel->basic_consume('task_queue', '', false, false, false, false, $callback); ``` 4.消息持久化 (1)队列声明为持久化:要将 queue_declare 的第三个参数durable设置为 true : $channel->queue_declare($queue, false, true, false, false); RabbitMQ不允许使用不同的参数重新定义一个已经存在的队列,并且会返回一个错误给任何尝试做此事的程序 (2)消息持久化: $msg = new AMQPMessage($data, array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT) ); 或者'delivery_mode' =>2 (3)交换机持久化:消息生产者代码 $channel->exchange_declare($exhcange_name,$type,$passive,$durable,$auto_delete); 设置第四个参数$durable为true,即开启交换机持久化 交换机的持久化其实就是相当于将交换机的属性在服务器内部保存,当MQ的服务器发生意外或关闭之后,重启RabbitMQ时不需要重新手动或执行代码去建立交换机,交换机会自动建立,相当于一直存在。 5.消息主题(topics) 强烈注意:队列可以绑定多个路由键到交换机上 发送到 topic exchange 的消息不能任意命名一个 routing key - 它必须是由一个.划分单词列表。这些单词可以是任意的,但它们通常指定与消息相关联的一些功能。这里有几个有效的 routing key: "stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit" , routing key 中可以包含多个单词,最多可以达到255个字节。 binding key也必须是相同的形式。topic exchange背后的逻辑类似于 direct exchange - 使用特定routing key1 发送的消息将被传递到与binding key绑定的所有队列。但是,binding key 有两个重要的特殊情况: *(星号) 可以代替一个单词 #(哈希) 可以匹配0个或多个单词 我们将发送所有描述动物的消息。消息将使用由三个单词(两个点)组成的routing key发送。其中第一个单词描述 速度,第二个描述颜色,第三个描述种类:”..”。 我们接着创建三个绑定:Q1 binding key “*.orange.*”, Q2 binding key “*.*.rabbit” “lazy.#”。 这三个绑定可以解释为: Q1 对所有橙色的动物感兴趣。 Q2 想要获取有关兔子的一切消息,以及所有惰性动物的一切。 一条 routing key 为 “quick.orange.rabbit” 的消息将传递上面的到两个对列。routing key 为 “lazy.orange.elephant” 的消息也将传递上面的到两个对列。另外 “lazy.pink.rabbit” 消息将只会被传递到Q2一次, 即使它匹配了两个 binding key。”quick.brown.fox” 不匹配任何 binding key, 所以它将被丢弃。 如果我们不遵守以上的规则发送 routing key 为一个或者四个单词的消息会发生什么? 比如,”orange” 或者 “quick.orange.male.rabbit”。那么我们将丢失这些消息,因为它们不匹配任何 binding key。 另一方面,”lazy.orange.male.rabbit” 即使它有四个单词,但它能匹配最后一个绑定,并且将被传递到Q2中。