企业🤖AI Agent构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
# 自定义进程 自定义进程的使用非常灵活,只需要继承 GoProcess 类,并在配置文件中注册即可。 下面用一个 redis 队列的 demo 进行说明。 ~~~ <?php namespace app\Process; use ESD\Core\Message\Message; use ESD\Core\Server\Process\Process; use ESD\Coroutine\Co; use ESD\Go\GoProcess; use ESD\Plugins\Redis\RedisConfig; use ESD\Plugins\Redis\RedisOneConfig; /** * Created by PhpStorm. * User: anythink * Date: 2019/6/11 * Time: 2:08 PM */ class QueueTask extends GoProcess { use GetLogger; /** * @var RedisConfig */ private $_configClass; /** * @var RedisOneConfig */ private $config; /** * @var \Redis */ protected $redis; public function loadConfig($default = 'default') { $this->_configClass = DIGet(RedisConfig::class); $this->config = $this->_configClass->getRedisConfigs()[$default]; } public function onProcessStart() { $this->loadConfig(); $this->redis = new \Redis(); while(true){ $this->redis->connect($this->config->getHost(), $this->config->getPort()); if($this->config->getPassword() != ''){ $this->redis->auth($this->config->getPassword()); } try{ while($val = $this->redis->brPop(['test'],0)){ goWithContext(function () use($val){ $this->process($val); }); } }catch (\RedisException $e){ $this->info('RedisException ' . $e->getMessage() .'#'. $e->getCode()); } //连接超时每隔一秒进行一次重试 Co::sleep(1); } } public function process($val){ Co::sleep(2); $this->info('process val' , $val); } public function onPipeMessage(Message $message, Process $fromProcess) { $res = $message->getData(); $this->debug('QueueTask onPipeMessage' . $res); } } ~~~ >[danger] 此类实例化了一个新的 redis 连接访问阻塞的 redis 队列,请勿直接使用 redis 连接池,否则会长时间占用连接,导致 worker 进程的 redis 可用连接变少。 在 process 方法中实现自身的业务逻辑。 由于process 方法使用了协程调度,所以在 process 方法内执行外部API请求注意以下2点: 1. api请求的 client 是否支持协程,比如使用 saber-plugin,或guzzehttp-saber。不支持协程的 client 将会转为同步模式,大幅度降低队列处理的QPS,此时需要多开几个自定义 process。 2. 使用协程需要注意 api 服务端的流控规则,请参考对应服务端的限流规则合理调整并发数。 # 注册自定义进程 在配置文件中添加如下配置: ~~~ esd: process: queue-0: name: queue-0 class_name: app\Process\QueueTask group_name: TaskGroup queue-1: name: queue-1 class_name: app\Process\QueueTask group_name: TaskGroup queue-2: name: queue-2 class_name: app\Process\QueueTask group_name: TaskGroup ~~~