ThinkChat🤖让你学习和工作更高效,注册即送10W Token,即刻开启你的AI之旅 广告
依赖 `composer require php-amqplib/php-amqplib` ``` <?php namespace app\common\tool; use Exception; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; /** * RabbitMQ 客户端 */ class Rabbit { private static $_instance = null; # 连接资源 private $connect = null; # 通道资源 private $channel = null; # 连接配置 private $config = []; /** * 构造函数 * * @param string $connectName 连接名称 */ private function __construct($connectName){ # 对象是否已经存储了连接,没有则连接并存储 if(!isset($this->connect)){ $config = $this->config; if(empty($config)){ $config = config('queue.'.$connectName); if(!$config){ throw new Exception('config\queue.php文件中,没有配置'.$connectName, -1); } $this->config = $config; } $this->connect = new AMQPStreamConnection($config['host'], $config['port'], $config['username'], $config['password']); $this->channel = $this->connect->channel(); } } /** * 每一个连接配置生成一个单例 * * @param string $connectName * @return void */ public static function instance($connectName = 'rabbit'){ if(is_null(self::$_instance)){ self::$_instance = new self($connectName); } return self::$_instance; } /** * 仅用来查看编辑器提示mq相关方法的参数意思 * * @return void */ private static function document(){ $conf = [ 'host'=> '192.168.7.236', 'port'=> '15672', 'username'=> 'guest', 'password'=> ':guest~', ]; # 创建连接 $connection = new AMQPStreamConnection($conf['host'], $conf['port'], $conf['username'], $conf['password']); # 创建通道 $channel = $connection->channel(); /***************** 生产者写入队列开始 *******************/ # 声明交换机,声明则需要绑定队列;可以不声明,则默认default交换机。 # 交换机相当于是邮箱,可以声明多个邮箱,队列相当于信件,可以一信多投。 # 因为可能一个队列信息,多个子系统都需要单独处理。有点像广播 $exchangeName = 'default'; $channel->exchange_declare($exchangeName, 'direct', false, true, false); # 必须声明队列,通常一个数据队列定义一个名词,里面的数据结构都是一致,队列模式必须指定持久化,否则服务器重启,队列会丢失 $queueName = 'queueName'; $channel->queue_declare($queueName, false, true, false, false); # 将队列与某个交换机进行绑定,并使用路由关键字.个人理解路由关键字更像是队列别名,不一定准确,默认空字符串则使用队列名替代。 $routingKey = ''; $channel->queue_bind($queueName, $exchangeName, $routingKey); # 必须声明一个消息体,且为字符串类型,具体的格式,可以对数据采用各种需要的encode,投递模式必须指定持久化,否则服务器重启,队列消息会丢失 # 第二个消息状态配置,还包含有 content_type=> text/plain【默认】 correlation_id=> 自定义唯一id【比如, uniqid('rmq', true)】correlation_id 会在消费确认里用到 $msg = new AMQPMessage($msgBody, array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)); # 推送入队列 $channel->basic_publish($msg, $exchangeName, $routingKey); /***************** 生产者写入队列结束 *******************/ /***************** 消费者读取队列开始 *******************/ # 必须声明队列,通常一个数据队列定义一个名词,里面的数据结构都是一致,队列模式必须指定持久化,否则服务器重启,队列会丢失 $channel->queue_declare($queueName, false, true, false, false); # 声明消息的回调处理与哪个队列绑定 $callback = function($msg){ # 处理队列信息的逻辑 $queueData = $msg->body; # 业务逻辑处理 $businessDone = true; # 处理成功要手动确认下消息,告知队列已处理完,可以清理了。如果未声明自动确认删除模式,也没有其他消费者处理,则永远获取同一条。明显这是不符合的 # 所以在未声明自动确认删除模式下,一定要手动确认,最好是有个重试逻辑,重试多少次后,将队列消息推到另外的队列中,当作异常保留,然后在本队列进行确认删除,避免阻塞 if($businessDone==true){ # 如果没有在basic_publish的最后参数为消息指定一个唯一ID,则rabbit会默认生成一个唯一标识delivery_tag $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); } }; # 设置限流, # 参数一,限制消息大小,0代表不限制 # 参数二,限制允许unack的最大消息数, # 参数三,限制对象,true对整个channel限制,false对当前消费者限制 $channel->basic_qos(0, 1, false); # 声明消费队列绑定的回调逻辑 $channel->basic_consume($queueName, '', true, false, false, false, $callback); # 持久检测队列,看是否有数据心跳,如果只是单次获取,直接$channel->wait();即可 while($channel->is_consuming()) { # 执行消费回调,PHP应该是没有回溯执行的,猜测wait方法内部的实现是,通过声明的队列参数获取消息,然后传递给前面声明的回调方法进行执行 $channel->wait(); } /***************** 消费者读取队列开始 *******************/ /***************** 心跳检测开始 *******************/ $connection->checkHeartBeat(); /***************** 心跳检测开始 *******************/ } public function checkHeartBeat(){ try{ $this->connect->checkHeartBeat(); }catch(Exception $e){ $this->connect(); } } /** * 队列设置:有配置读配置,无配置读设定 * * @param [type] $queueName * @param [type] $routingKey * @param [type] $exchangeName * @return void */ protected function queueSet($queueName, $routingKey, $exchangeName){ $config = $this->config; if(isset($config['queue'][$queueName])){ $queueConf = $config['queue'][$queueName]; if(isset($queueConf['exchangeName'])){ $exchangeName = $queueConf['exchangeName']; } if(isset($queueConf['routingKey'])){ $routingKey = $queueConf['routingKey']; } } # 声明交换机属性,持久化 $this->channel->exchange_declare($exchangeName, 'direct', false, true, false); # 声明队列属性,持久化 $this->channel->queue_declare($queueName, false, true, false, false); # 绑定交换机,队列,以及路由键 $this->channel->queue_bind($queueName, $exchangeName, $routingKey); } /** * 生产队列复杂版:相同exchangeName和routingKey绑定的任何队列之一投递任务,会导致所有的绑定的队列都被投递 * * @param string $msgBody 队列消息,我们要传输的数据 * @param string $queueName 队列名称 * @param string $routingKey 路由关键字,不指定该参数,会往所有的相同的exchangeName里的所有队列都投递队列消息。适用于一条数据走多个任务 * @param string $exchangeName 交换机 【传任务所在的平台,理解为命名空间也可】 * @return void */ public function produce($msgBody='', $queueName='default', $routingKey='', $exchangeName='default'){ // $this->queueSet($queueName, $routingKey, $exchangeName); # 声明一个消息体属性,持久化 $msg = new AMQPMessage($msgBody, array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)); # 推送入队列 $this->channel->basic_publish($msg); # 记得在调用层,显示调用【 $this->close() 】关闭连接,因为可能涉及循环推送队列,反复连接和关闭浪费资源,所以这里不主动关闭 # $this->close(); return true; } /** * 简化版,仅使用默认的交换机投递队列 * * @param string $msgBody * @param string $queueName * @return void */ public function easyProduce($msgBody='', $queueName='default'){ # 声明队列属性,持久化 $this->channel->queue_declare($queueName, false, true, false, false); # 定义一个消息,格式化 $msg = new AMQPMessage($msgBody, array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)); # 这里测试publish的返回是null 后面需要考虑加确认机制 目前是稳定可以推送 $pulishResult = $this->channel->basic_publish($msg, '', $queueName); # 记得在调用层,显示调用【 $this->close() 】关闭连接,因为可能涉及循环推送队列,反复连接和关闭浪费资源,所以这里不主动关闭 # $this->close(); return true; } /** * 取出指定队列的指定条数消息 * * @param string $queueName 队列名词 * @param integer $msgCount 消息条数 * @param bool $close 获取数据后是否关闭连接 * @return void */ public function consumeData($queueName='default', $msgCount=5, $close=false, $routingKey='', $exchangeName='default'){ # 队列设置 $this->queueSet($queueName, $routingKey, $exchangeName); $data = []; $callback = function($msg) use (&$data) { $rawData = $msg->body; $data[] = [ 'delivery_tag'=> $msg->delivery_info['delivery_tag'], 'rawData'=> $rawData ]; # 手动确认删除 // $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); }; # 限流 $this->channel->basic_qos(0, $msgCount, false); # 定义消费队列,并指定回调处理 $this->channel->basic_consume($queueName, '', true, false, false, false, $callback); # 触发回调 for($i=0; $i<$msgCount; $i++){ $this->channel->wait(); } if($close){ $this->close(); } return $data; } /** * 手动确认删除指定的消息 * * @param string $delivery_tag 队列消息的唯一标识 * @return void */ public function ack($delivery_tag){ $this->channel->basic_ack($delivery_tag); } /** * 消费队列 * * @param [function] $callback 函数,只有一个参数$msg,就是消息体.也可以写匿名函数,函数体内是消费者实现。如果消费任务完成,需要在函数体内调用: * $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); 用以通知MQ已完成消费,则队列消息会被删除 * * @param string $queueName 队列名称 * @param string $connectName 连接名称 * @param integer $unackLimit 限流:最多允许unack的消息数量,达到上限则队列不再继续获取消息处理,默认0不限制 * @param bool $a_global 限流:是对整个channel影响,还是只影响当前消费者 * @return void */ public function consume($callback, $queueName='default', $unackLimit=0, $a_global=false, $routingKey='', $exchangeName='default'){ # 队列设置 $this->queueSet($queueName, $routingKey, $exchangeName); # 限流 if($unackLimit > 0){ $this->channel->basic_qos(0, $unackLimit, $a_global); } //在接收消息的时候调用$callback函数 $this->channel->basic_consume($queueName, '', true, false, false, false, $callback); while($this->channel->is_consuming()) { $this->channel->wait(); } } /** * 简化版消费队列,使用默认的交换机 * * @param [function] $callback 函数,只有一个参数$msg,就是消息体.也可以写匿名函数,函数体内是消费者实现。如果消费任务完成,需要在函数体内调用: * $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); 用以通知MQ已完成消费,则队列消息会被删除 * * @param string $queueName 队列名称 * @param string $connectName 连接名称 * @param integer $unackLimit 限流:最多允许unack的消息数量,达到上限则队列不再继续获取消息处理,默认0不限制 * @param bool $a_global 限流:是对整个channel影响,还是只影响当前消费者 * @return void */ public function easyConsume($callback, $queueName='default', $unackLimit=0, $a_global=false){ # 声明队列属性,持久化 $this->channel->queue_declare($queueName, false, true, false, false); # 限流 if($unackLimit > 0){ $this->channel->basic_qos(0, $unackLimit, $a_global); } //在接收消息的时候调用$callback函数 $this->channel->basic_consume($queueName, '', true, false, false, false, $callback); while($this->channel->is_consuming()) { $this->channel->wait(); } } /** * 消费队列 * * @param [function] $callback 函数,只有一个参数$msg,就是消息体.也可以写匿名函数,函数体内是消费者实现。如果消费任务完成,需要在函数体内调用: * $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); 用以通知MQ已完成消费,则队列消息会被删除 * * @param string $queueName 队列名称 * @return void */ public function consumeOne($callback, $queueName='default', $routingKey='', $exchangeName='default'){ $this->queueSet($queueName, $routingKey, $exchangeName); //在接收消息的时候调用$callback函数 $this->channel->basic_consume($queueName, '', true, false, false, false, $callback); $this->channel->wait(); } /** * 关闭通道 * * @return void */ private function closeChannel(){ $this->channel->close(); $this->channel = null; } /** * 关闭连接 * * @return void */ private function closeConnect(){ $this->connect->close(); $this->connect = null; } /** * 关闭所有的连接和通道 * * @return void */ public function close(){ $this->closeChannel(); $this->closeConnect(); } /** * 连接rabbit * * @return void */ public function connect(){ if(empty($this->config)){ throw new Exception("配置信息丟失", -1); return false; } $config = $this->config; if(is_null($this->connect)){ $this->connect = new AMQPStreamConnection($config['host'], $config['port'], $config['username'], $config['password']); } if(is_null($this->channel)){ $this->channel = $this->connect->channel(); } return $this; } /* # produce demo $queueName = 'test'; $msg = json_encode(['queueName'=> $queueName]); $rabbit = \App\Http\Controllers\Common\RabbitMQ::instance('rabbit'); $result = $rabbit->easyProduce($msg, $queueName); $rabbit->close(); */ /* # consumer demo $rabbit = \app\common\tool\Rabbit::instance('rabbit'); $rabbit->consumeDemo($queueName); $rabbit->consume(function($msg){ $info = " [x] Received ". $msg->body. "\n"; echo $info; $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); }, $queueName); $rabbit->close(); */ } ```