多应用+插件架构,代码干净,二开方便,首家独创一键云编译技术,文档视频完善,免费商用码云13.8K 广告
[TOC]

## 主题模式

> [参考CSDN](https://blog.csdn.net/Super_RD/article/details/70850453)

### 匹配规则

```
* (星号) 代表任意 一个单词
# (井号) 0个或者多个单词
```

> 注意:上面说的是单词不是字符。

![UTOOLS1580704466223.png](http://yanxuan.nosdn.127.net/ff2c5c873b4fbf907b39480d5f6ff33c.png)

> RoutingKey为“black.critical.high”的日志会投递到**queue1和queue2**。

> RoutingKey为“red.critical.high”的日志会只投递到queue2。

> RoutingKey为“white.critical.high”的日志会投递到queue2,并且虽然queue2的两个匹配规则都符合但只会向queue2**投递一份**。

### code 用法发布信息

<details>
<summary>topic.php</summary>

```
/*
 * topic 模式
 * create by superrd
 */
$exchangeName = 'extopic';
$routeKey1 = "black.critical.high";
$routeKey2 = "red.critical.high";
$routeKey3 = "white.critical.high";
$message1 = 'black-critical-high!';
$message2 = 'red-critical-high!';
$message3 = 'white-critical-high!';

$connection = new AMQPConnection(array('host' => '10.99.121.137', 'port' => '5672', 'vhost' => '/', 'login' => 'superrd', 'password' => 'superrd'));
$connection->connect() or die("Cannot connect to the broker!\n");

try {
    $channel = new AMQPChannel($connection);
    $exchange = new AMQPExchange($channel);
    $exchange->setName($exchangeName);
    $exchange->setType(AMQP_EX_TYPE_TOPIC);
    $exchange->setFlags(AMQP_DURABLE);
    $exchange->declareExchange();

    $exchange->publish($message1,$routeKey1);
    var_dump("[x] Sent ".$message1);
    $exchange->publish($message2,$routeKey2);
    var_dump("[x] Sent ".$message2);
    $exchange->publish($message3,$routeKey3);
    var_dump("[x] Sent ".$message3);
} catch (AMQPConnectionException $e) {
    var_dump($e);
    exit();
}
$connection->disconnect();
```

</details>
<br />

监听queue1队列

<details>
<summary>q1.php</summary>

```
/*
 * topic 模式
 * create by superrd
 */
$queueName = 'queue1';
$exchangeName = 'extopic';
$routeKey = "black.critical.high";

$connection = new AMQPConnection(array('host' => '10.99.121.137', 'port' => '5672', 'vhost' => '/', 'login' => 'superrd', 'password' => 'superrd'));
$connection->connect() or die("Cannot connect to the broker!\n");

$channel = new AMQPChannel($connection);
$exchange = new AMQPExchange($channel);
$exchange->setName($exchangeName);
$exchange->setType(AMQP_EX_TYPE_TOPIC);
$exchange->setFlags(AMQP_DURABLE);
$exchange->declareExchange();

$queue = new AMQPQueue($channel);
$queue->setName($queueName);
$queue->setFlags(AMQP_DURABLE);
$queue->declareQueue();
$queue->bind($exchangeName, $routeKey);

//阻塞模式接收消息
echo "Message:\n";
while(True){
    $queue->consume('processMessage');
    //自动ACK应答
    //$queue->consume('processMessage', AMQP_AUTOACK);
}
$conn->disconnect();

/*
 * 消费回调函数
 * 处理消息
 */
function processMessage($envelope, $q) {
    $msg = $envelope->getBody();
    echo $msg."\n"; //处理消息
    $q->ack($envelope->getDeliveryTag()); //手动发送ACK应答
}
```

</details>
<br />

监听queue2队列

<details>
<summary>q2.php</summary>

```
/*
 * topic 模式
 * create by superrd
 */
$queueName = 'queue2';
$exchangeName = 'extopic';
$routeKey1 = "#.high";
$routeKey2 = "white.critical.*";

$connection = new AMQPConnection(array('host' => '10.99.121.137', 'port' => '5672', 'vhost' => '/', 'login' => 'superrd', 'password' => 'superrd'));
$connection->connect() or die("Cannot connect to the broker!\n");

$channel = new AMQPChannel($connection);
$exchange = new AMQPExchange($channel);
$exchange->setName($exchangeName);
$exchange->setType(AMQP_EX_TYPE_TOPIC);
$exchange->setFlags(AMQP_DURABLE);
$exchange->declareExchange();

$queue = new AMQPQueue($channel);
$queue->setName($queueName);
$queue->setFlags(AMQP_DURABLE);
$queue->declareQueue();
$queue->bind($exchangeName, $routeKey1);
$queue->bind($exchangeName, $routeKey2);

//阻塞模式接收消息
echo "Message:\n";
while(True){
    $queue->consume('processMessage');
    //自动ACK应答
    //$queue->consume('processMessage', AMQP_AUTOACK);
}
$conn->disconnect();

/*
 * 消费回调函数
 * 处理消息
 */
function processMessage($envelope, $q) {
    $msg = $envelope->getBody();
    echo $msg."\n"; //处理消息
    $q->ack($envelope->getDeliveryTag()); //手动发送ACK应答
}
```

</details>
<br />

> 注意: 如果 `binding_key` 是 `#`,它会接收所有的 Message,不管 `routing_key` 是什么。