多应用+插件架构,代码干净,二开方便,首家独创一键云编译技术,文档视频完善,免费商用码云13.8K 广告
使用场景。有个高并发的某某活动,因为并发量很高,Mysql的写入的性能不能满足吞吐(其他类似于mongo的库除外),此时,可用借助RabbitMQ的工作队列来处理,将直接写入mysql的数据,以消息的形式先发送到RabbitMq的工作队列中,消费端从队列中消费数据入库,这样:1)客户端不需要考虑mysql的瓶颈,队列做为一个缓冲区。2)可用加入多个消费者,加速消费工作队列,避免造成工作队列消息积压。 我们再hello world的发送、接收的代码中,我做了几点调整。 指定exchange,并显式申明它的类型。 将exchange持久化,这样那怕Rabbit重启,exchange也不会消失。 将queue持久化,这样那怕Rabbit重启,queue也不会消失(包括queue中的消息) 在exchange publish消息时,指定 route key,同时在queue中绑定rout key,这样exchange在转发消息时,能够将消息转发到与route key匹配的的队列中。 ~~~ send.php <?php $config = [ 'host' => '127.0.0.1', 'vhost' => '/', 'port' => 5672, 'login' => 'guest', 'password' => 'guest' ]; $conn = new \AMQPConnection($config); $conn->connect(); $channel = new \AMQPChannel($conn); $channel->qos(0,0); $exchange = new \AMQPExchange($channel); $exchange->setName('exchange.activity'); $exchange->setType(AMQP_EX_TYPE_DIRECT); $exchange->setFlags(AMQP_DURABLE); $exchange->declareExchange(); $queue = new \AMQPQueue($channel); $queue->setName('queue.activity'); $queue->declareQueue(); $queue->bind('exchange.activity','route.activity'); for($i=0;$i<6;$i++){ $result = $exchange->publish("message[$i]",'route.activity'); var_dump($result); } receive.php <?php $config = [ 'host' => '127.0.0.1', 'vhost' => '/', 'port' => 5672, 'login' => 'guest', 'password' => 'guest' ]; $conn = new \AMQPConnection($config); $conn->connect(); $channel = new \AMQPChannel($conn); $channel->qos(0,0); $queue = new \AMQPQueue($channel); $queue->setName("queue.activity"); $queue->setFlags(AMQP_PASSIVE); $queue->declareQueue(); $queue->bind('exchange.activity','route.activity'); $queue->consume('processMessage',AMQP_AUTOACK); function processMessage($envelope, $queue) { global $i; echo "Message $i: " . $envelope->getBody() . "\n"; $i++; } ~~~