> think-queue是ThinkPHP官方提供的一个消息队列服务,是专门支持队列服务的扩展包。 > think-queue消息队列适用于大并发或返回结果时间比较长且需要批量操作的第三方接口,可用于短信发送、邮件发送、APP推送。 > think-queue消息队列可进行发布、获取、执行、删除、重发、失败处理、延迟执行、超时控制等操作。 ## **执行流程图:** ![](https://img.kancloud.cn/0e/ca/0eca5e498a919e21aa1f1db353534f58_1200x533.png) ## **安装think-queue扩展** 在命令行切换到你的WEB根目录并执行下面的命令: ~~~ // 安装think-queue扩展 composer require topthink/think-queue ~~~ 安装后在`app/config`下将产生一个queue.php配置文件: ~~~ return [ //驱动类型,可选择 sync(默认):同步执行,database:数据库驱动,redis:Redis驱动 'default' => 'redis', 'connections' => [ 'sync' => [ 'type' => 'sync', ], 'database' => [ 'type' => 'database', 'queue' => 'default', 'table' => 'jobs', ], 'redis' => [ 'type' => 'redis', 'queue' => 'default', 'host' => '127.0.0.1', 'port' => 6379, 'password' => '', 'select' => 0, 'timeout' => 0, 'persistent' => false, ], ], 'failed' => [ 'type' => 'none', 'table' => 'failed_jobs', ], ]; ~~~ **一、优化上一章中的SmsTask.php 的逻辑代码** ~~~ <?php declare (strict_types = 1); namespace app\listener; use think\facade\Queue; class SmsTask { /** * 事件监听处理 * * @return mixed */ public function handle($event) { var_dump($event->data);//event的data数据即server->task()传入的数据 /*创建新消息并推送到消息队列*/ // 当前任务由哪个类负责处理 $job_handler_classname = "app\job\Dismiss"; // 当前队列归属的队列名称 $job_queue_name = "dismiss_job_queue"; // 当前任务所需的业务数据 $job_data = ["ts"=>time(), "bizid"=>uniqid(), "params"=>$event->data]; // 将任务推送到消息队列等待对应的消费者去执行 $is_pushed = Queue::push($job_handler_classname, $job_data, $job_queue_name); // 可以调用 finish 方法通知其他事件类,通知当前异步任务已经完成了(非必须调用) // 参数 $event 是 Swoole\Server\Task 类的一个对象 可以调用 finish 方法触发 task 任务的 onFinish 事件 $event->finish($event->data); return; } } ~~~ **二、创建消费者类 Dismiss.php 的逻辑代码** ~~~ <?php // +---------------------------------------------------------------------- // | najing [ 通用后台管理系统 ] // +---------------------------------------------------------------------- // | Copyright (c) 2020 http://www.najingquan.com All rights reserved. // +---------------------------------------------------------------------- // | Author: 救火队队长 // +---------------------------------------------------------------------- namespace app\job; use think\facade\Log; use think\queue\Job; /** * 消费者类 * 用于处理队列中的任务 */ class Dismiss { /** * fire是消息队列默认调用的方法 * @param Job $job 当前的任务对象 * @param array|mixed $data 发布任务时自定义的数据 */ public function fire(Job $job, $data) { //有效消息到达消费者时可能已经不再需要执行了 if(!$this->checkJob($data)){ $job->delete(); return; } //执行业务处理 if($this->doJob($data)){ $job->delete();//任务执行成功后删除 Log::record("job has been down and deleted"); }else{ //检查任务重试次数 if($job->attempts() > 3){ Log::record("job has been retried more that 3 times"); $job->delete(); } } } /** * 消息在到达消费者时可能已经不需要执行了 * @param array|mixed $data 发布任务时自定义的数据 * @return boolean 任务执行的结果 */ private function checkJob($data) { $ts = $data["ts"]; $bizid = $data["bizid"]; $params = $data["params"]; return true; } /** * 根据消息中的数据进行实际的业务处理 */ private function doJob($data) { // 实际业务流程处理 var_dump($data); Log::record(json_encode($data)); return true; } } ~~~ **三、启动一个工作进程来处理消息队列** ~~~ php think queue:work --queue dismiss_job_queue ~~~ **四、浏览器访问注册方法** 发送短信的任务被异步推送到了队列里面,然后由消费队列正常消费: ![](https://img.kancloud.cn/f8/69/f8690b9be36a553bcf68d6e9b589b6ce_680x157.png) ![](https://img.kancloud.cn/bc/42/bc42f55cbd59ddd92d8b021c7399c855_572x270.png)