ThinkSSL🔒 一键申购 5分钟快速签发 30天无理由退款 购买更放心 广告
# 并发及协程池 说这个章节之前,先看一下自定义进程章节的示例。 ## 自定义进程 自定义进程的使用非常灵活,只需要继承 GoProcess 类,并在配置文件中注册即可。 下面用一个 redis 队列的 demo 进行说明。 ~~~ <?php namespace app\Process; use ESD\Core\Message\Message; use ESD\Core\Server\Process\Process; use ESD\Coroutine\Co; use ESD\Go\GoProcess; use ESD\Plugins\Redis\RedisConfig; use ESD\Plugins\Redis\RedisOneConfig; /** * Created by PhpStorm. * User: anythink * Date: 2019/6/11 * Time: 2:08 PM */ class QueueTask extends GoProcess { use GetLogger; /** * @var RedisConfig */ private $_configClass; /** * @var RedisOneConfig */ private $config; /** * @var \Redis */ protected $redis; public function loadConfig($default = 'default') { $this->_configClass = DIGet(RedisConfig::class); $this->config = $this->_configClass->getRedisConfigs()[$default]; } public function onProcessStart() { $this->loadConfig(); $this->redis = new \Redis(); while(true){ $this->redis->connect($this->config->getHost(), $this->config->getPort()); if($this->config->getPassword() != ''){ $this->redis->auth($this->config->getPassword()); } try{ while($val = $this->redis->brPop(['test'],0)){ goWithContext(function () use($val){ $this->process($val); }); } }catch (\RedisException $e){ $this->info('RedisException ' . $e->getMessage() .'#'. $e->getCode()); } //连接超时每隔一秒进行一次重试 Co::sleep(1); } } public function process($val){ Co::sleep(2); $this->info('process val' , $val); } public function onPipeMessage(Message $message, Process $fromProcess) { $res = $message->getData(); $this->debug('QueueTask onPipeMessage' . $res); } } ~~~ 通过观察代码,我们发现消费redis队列的方法 QueueTask::process 是在 goWithContext 的协程内执行的。 >[danger] ❓那么问题来了:消费方法模拟了需要耗时2秒才能处理完毕。那么如果有队列有10条消息,请问10条消息需要多久才能消费完? # 并发 带着上面的问题,接着来说说并发。答案是,20秒吗❓其实仅需2秒。 当我们的执行代码包裹在 goWithContext 里,那么该方法就会变成并发执行。 再简单看一个例子 ~~~ goWithContext(){ Co::sleep(2) }); goWithContext(){ Co::sleep(2) }); ~~~ 以上代码,如果不支持协程的话需要4秒执行完毕,如果使用ESD框架,那么2段代码会并行执行,只需2秒。如果让10条消息在2秒内处理完,那么在传统的框架下,需要开10个worker并行处理。试想下,如果有100条,甚至1000条消息呢,你的服务器开得起这么多worker吗🙂 # 💡上限 上面所提到的并发,实际上是应用了协程的自动切换机制,此处不做过多的扩展,使用这种特性会受到 esd.server.max_coroutine 配置的限制,默认为3000,也就是说最多可以创建3000个协程。 # 💡 协程池 由于框架的worker 也同样受到 esd.server.max_coroutine 配置的限制,所以如果像上面的例子无脑使用,在高并发下就会出现如下问题 ~~~ Warning: go(): exceed max number of coroutine 3000 in /data/vendor/esd/esd-core/src/Core/Common.php on line 109 Warning: go(): exceed max number of coroutine 3000 in /data/vendor/esd/esd-core/src/Core/Common.php on line 109 ~~~ 你的 request 可能无法继续处理请求了。当然可以提高max_coroutine的配置,但是终归不是保险的方案。 那么使用协程池来限制最大并发数,就是一个更优秀的选择。 看一下例子,还是上面的代码,这里只摘出修改的部分 ~~~ public function onProcessStart() { $this->loadConfig(); $this->redis = new \Redis(); $pool = CoPoolFactory::createCoPool('queue_co-' . $this->getProcessId(), 2, 10, 5); $pool->preStartAllCoreThreads(); while (true) { $this->redis->connect($this->config->getHost(), $this->config->getPort()); if ($this->config->getPassword() != '') { $this->redis->auth($this->config->getPassword()); } try { while ($val = $this->redis->brPop(['test'], 0)) { $pool->execute(function() use ($val){ $this->process($val); }); } } catch (\RedisException $e) { $this->info('RedisException ' . $e->getMessage() . '#' . $e->getCode()); } Co::sleep(1); } } ~~~ 通过 CoPoolFactory::createCoPool 创建一个协程池,以下代码创建了最低2个协程,最多10个协程的连接池来控制并发。注意协程池的名称不要重复。 ~~~ $pool = CoPoolFactory::createCoPool('queue_co-' . $this->getProcessId(), 2, 10, 5); $pool->preStartAllCoreThreads(); ~~~ 然后使用 $pool->execute 将需要执行的代码通过匿名函数进行传递。 ~~~ $pool->execute(function() use ($val){ $this->process($val); }); ~~~ 此时超过协程池最大数量的请求就会被阻塞,直到协程空闲。通过此手段,我们就实现了一个可以控制并发数的消费队列。 ## 实际场景 如果队列消费是请求三方接口,就可以根据其限流规则合理规划协程池的数量,保证不会因为并发过大被警告。 # 带有返回的并发执行 在需要执行并发的控制器中使用 new Runnable 创建类,在构造函数中传递一个需要并发执行代码的匿名函数。 使用 CoroutineExecutor::getInstance()->execute() 执行。 使用 $ret_1->getResult() 获取返回的数据。 ~~~ /** * @GetMapping() * @return string */ public function gorun(){ $ret_1 = new Runnable(function (){ Co::sleep(2); return Co::getCid(); },true); $ret_2 = new Runnable(function (){ Co::sleep(2); return Co::getCid(); },true); CoroutineExecutor::getInstance()->execute($ret_1); CoroutineExecutor::getInstance()->execute($ret_2); $data = [ 'ret1' => $ret_1->getResult(), 'ret2' => $ret_2->getResult(), ]; return $this->successResponse($data); } ~~~