企业🤖AI Agent构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
大部分时候我们项目的业务逻辑是同步阻塞运行的,但是有些时候会遇到一些耗时较大的操作,比如向十万个用户群发通知邮件,我们的程序不可能等待十万次循环后再执行其他操作,这种时候我们会采用异步操作,由 worker 进程向 task 进程发送任务,task 进程处理完全部任务之后通过onFinish回调函数通知 worker 进程。例如,我们在后台操作向十万个用户群发通知邮件,操作完成后操作的状态显示为发送中,这时我们可以继续其他操作。等邮件群发完毕后,操作的状态自动改为已发送。 接下来我们演示使用 ThinkPHP5 和 Swoole 的 task 模块来演示异步多任务处理。 创建 ThinkPHP5 自定义命令行 1.创建命令行类 - 创建application/console/AsyncTask.php文件 ~~~ <?php namespace app\Console; use think\console\Command; use think\console\Input; use think\console\Output; class AsyncTask extends Command { protected $server; // 命令行配置函数 protected function configure() { // setName 设置命令行名称 // setDescription 设置命令行描述 $this->setName('task:start')->setDescription('Start Task Server!'); } // 设置命令返回信息 protected function execute(Input $input, Output $output) { $this->server = new \swoole_server('0.0.0.0', 9501); // server 运行前配置 $this->server->set([ 'worker_num' => 4, 'daemonize' => false, 'task_worker_num' => 4 # task 进程数 ]); // 注册回调函数 $this->server->on('Start', [$this, 'onStart']); $this->server->on('Connect', [$this, 'onConnect']); $this->server->on('Receive', [$this, 'onReceive']); $this->server->on('Task', [$this, 'onTask']); $this->server->on('Finish', [$this, 'onFinish']); $this->server->on('Close', [$this, 'onClose']); $this->server->start(); } // 主进程启动时回调函数 public function onStart(\swoole_server $server) { echo "Start\n"; } // 建立连接时回调函数 public function onConnect(\swoole_server $server, $fd, $from_id) { echo "Connect\n"; } // 收到信息时回调函数 public function onReceive(\swoole_server $server, $fd, $from_id, $data) { echo "message: {$data} form Client: {$fd} \n"; // 投递异步任务 $task_id = $server->task($data); echo "Dispath AsyncTask: id={$task_id}\n"; // 将受到的客户端消息再返回给客户端 $server->send($fd, "Message form Server: {$data}, task_id: {$task_id}"); } // 异步任务处理函数 public function onTask(\swoole_server $server, $task_id, $from_id, $data) { echo "{$task_id}, Task Completed \n"; //返回任务执行的结果 $server->finish("$data -> OK"); } // 异步任务完成通知 Worker 进程函数 public function onFinish(\swoole_server $server, $task_id, $data) { echo "AsyncTask[{$task_id}] Finish: {$data} \n"; } // 关闭连时回调函数 public function onClose(\swoole_server $server, $fd, $from_id) { echo "Close\n"; } } ~~~ 2.修改配置文件 - 文件所在 application/command.php ~~~ <?php return [ 'app\console\AsyncTask', ]; ~~~ 接下来就可以通过命令行来启动异步多任务处理 $ > `php think task:start` 这个时候我们使用 telnet 进行测试 `telnet 127.0.0.1 9501` 连接上服务器后,输入hello回车,发送消息给 TCP 服务器,将受到一下回执 ![](https://box.kancloud.cn/60b9b89450bfdc005101aaf645694153_706x292.png) 同时服务器端会显示出异步任务操作的具体流程 ![](https://box.kancloud.cn/5d0a78abd6fb4a17102537152520d66b_491x129.png) 当 TCP 服务器启动时会自动输出Start,当客户端连接上服务器的时候自动输出Connect,这时客户端向服务器发送hello,服务器接受到数据之后先显示出message: hello form Client: 1,意思是收到来自客户端id 为1的数据hello,同时把数据提交给异步任务,显示出Dispath AsyncTask: id=0,异步任务处理函数收到任务时执行任务,显示出0, Task Completed,表示任务已经完成,同时使用onFinish函数通知Worker进程已经完成异步任务