继承vendor\topthink\think-queue\src\queue\Connector.php;
然后实现了相应的抽象方法;
```
<?php
namespace think\queue\connector;
use Exception;
use think\helper\Str;
use think\queue\Connector;
use think\queue\job\Redis as RedisJob;
class Redis extends Connector
{
/** @var \Redis */
protected $redis;
protected $options = [
'expire' => 60,
'default' => 'default', //默认的队列名称
'host' => '127.0.0.1',
'port' => 6379,
'password' => '',
'select' => 0,
'timeout' => 0,
'persistent' => false
];
public function __construct($options)
{
if (!extension_loaded('redis')) {
throw new Exception('redis扩展未安装');
}
if (!empty($options)) {//用application\extra\queue.php里面的配置覆盖默认配置
$this->options = array_merge($this->options, $options);
}
//判断配置中是否使用redis链接复用,使用长链接可以提高速度
$func = $this->options['persistent'] ? 'pconnect' : 'connect';
$this->redis = new \Redis;
$this->redis->$func($this->options['host'], $this->options['port'], $this->options['timeout']);
if ('' != $this->options['password']) {
$this->redis->auth($this->options['password']);
}
if (0 != $this->options['select']) {
$this->redis->select($this->options['select']);
}
}
//插入任务 ,立刻执行
public function push($job, $data = '', $queue = null)
{
return $this->pushRaw($this->createPayload($job, $data), $queue);
}
//插入任务,延迟$delay秒执行
public function later($delay, $job, $data = '', $queue = null)
{
$payload = $this->createPayload($job, $data);
//Zadd 命令用于将一个或多个成员元素及其分数值加入到有序集当中
$this->redis->zAdd($this->getQueue($queue) . ':delayed', time() + $delay, $payload);
}
public function pop($queue = null)
{
$original = $queue ?: $this->options['default'];
$queue = $this->getQueue($queue);
$this->migrateExpiredJobs($queue . ':delayed', $queue, false);
if (!is_null($this->options['expire'])) {
$this->migrateExpiredJobs($queue . ':reserved', $queue);
}
$job = $this->redis->lPop($queue);
if ($job !== false) {
$this->redis->zAdd($queue . ':reserved', time() + $this->options['expire'], $job);
return new RedisJob($this, $job, $original);
}
}
/**
* 重新发布任务
*
* @param string $queue
* @param string $payload
* @param int $delay
* @param int $attempts
* @return void
*/
public function release($queue, $payload, $delay, $attempts)
{
$payload = $this->setMeta($payload, 'attempts', $attempts);
$this->redis->zAdd($this->getQueue($queue) . ':delayed', time() + $delay, $payload);
}
//正式插入任务
public function pushRaw($payload, $queue = null)
{
//Rpush 命令用于将一个或多个值插入到列表的尾部(最右边)
$this->redis->rPush($this->getQueue($queue), $payload);
return json_decode($payload, true)['id'];
}
//创建任务容器
protected function createPayload($job, $data = '', $queue = null)
{
//调用父类的创建任务容器函数,然后附带上一个随机id
$payload = $this->setMeta(
parent::createPayload($job, $data), 'id', $this->getRandomId()
);
return $this->setMeta($payload, 'attempts', 1);
}
/**
* 删除任务
*
* @param string $queue
* @param string $job
* @return void
*/
public function deleteReserved($queue, $job)
{
$this->redis->zRem($this->getQueue($queue) . ':reserved', $job);
}
/**
* 移动延迟任务
*
* @param string $from
* @param string $to
* @param bool $attempt
*/
public function migrateExpiredJobs($from, $to, $attempt = true)
{
//$this->redis->watch($from);
$jobs = $this->getExpiredJobs(
$from, $time = time()
);
if (count($jobs) > 0) {
$this->transaction(function () use ($from, $to, $time, $jobs, $attempt) {
$this->removeExpiredJobs($from, $time);
for ($i = 0; $i < count($jobs); $i += 100) {
$values = array_slice($jobs, $i, 100);
$this->pushExpiredJobsOntoNewQueue($to, $values, $attempt);
}
});
}
//$this->redis->unwatch();
}
/**
* redis事务
* @param \Closure $closure
*/
protected function transaction(\Closure $closure)
{
$this->redis->multi();
try {
call_user_func($closure);
if (!$this->redis->exec()) {
$this->redis->discard();
}
} catch (Exception $e) {
$this->redis->discard();
}
}
/**
* 获取所有到期任务
*
* @param string $from
* @param int $time
* @return array
*/
protected function getExpiredJobs($from, $time)
{
//Zrangebyscore 返回有序集合中指定分数区间的成员列表。有序集成员按分数值递增(从小到大)次序排列。
//-inf:负无限,这里的意思是 获取 from里面排序 小于 $time的全部数据,一把这个$time就是当前时间戳,也就是小于当前时间的,过期的
return $this->redis->zRangeByScore($from, '-inf', $time);
}
/**
* 删除过期任务
*
* @param string $from
* @param int $time
* @return void
*/
protected function removeExpiredJobs($from, $time)
{
//Zremrangebyscore 命令用于移除有序集中,指定分数(score)区间内的所有成员
//移除$time小于当前时间的,已经过期的数据
$this->redis->zRemRangeByScore($from, '-inf', $time);
}
/**
* 重新发布到期任务
*
* @param string $to
* @param array $jobs
* @param boolean $attempt
*/
protected function pushExpiredJobsOntoNewQueue($to, $jobs, $attempt = true)
{
if ($attempt) {
foreach ($jobs as &$job) {
$attempts = json_decode($job, true)['attempts'];
$job = $this->setMeta($job, 'attempts', $attempts + 1);
}
}
$this->redis->rPush($to, ...$jobs);
// call_user_func_array([$this->redis, 'rPush'], array_merge([$to], $jobs));
}
/**
* 随机id
*
* @return string
*/
protected function getRandomId()
{
return Str::random(32);
}
/**
* 获取队列名
*
* @param string|null $queue
* @return string
*/
protected function getQueue($queue)
{
//组装出完整的 目录 路径,例:queues:shopro , queues:default
return 'queues:' . ($queue ?: $this->options['default']);
}
}
```
- FA的JS调用机制说明
- FA的JS之Fast.api逐个详解
- FA页面渲染时后端传递数据给前端的方式
- FA的ajax查询数据的前后台流程
- FA特有的函数解释
- FA的鉴权Auth类
- extend\fast\Auth.php详解
- application\admin\library\Auth.php详解
- application\common\library\Auth.php详解
- FA的Token机制
- FA管理员(后台)的权限机制
- FA用户(前台和API)的权限机制
- FA在前台模板文件中进行鉴权
- FA的登录页面
- TP类Hook:钩子机制
- TP类Lang:多语言机制
- TP类Config:参数配置机制
- TP类Request:请求类
- TP的模型关联详解
- think-queue队列组件
- Queue.php
- \queue\Connector.php
- \queue\connector\Redis.php
- \queue\Job.php
- queue\job\Redis.php
- PHP规则:正则表达式
- PHP规则:闭包与匿名函数
- 项目架构说明
- 代码架构
- TP数据库where条件的各种写法
