# Beanstalkd任务队列 > 高性能离不开异步,异步离不开队列。 ## Beanstalkd是什么 Beanstalkd 是一个轻量级消息中间件,它最大特点是将自己定位为基于管道 (tube) 和任务 (job) 的工作队列。 Beanstalkd 支持任务优先级 (priority), 延时 (delay), 超时重发 (time-to-run) 和预留 (buried), 能够很好的支持分布式的后台任务和定时任务处理。它的内部采用libevent,服务器-客户端之间采用类似Memcached的轻量级通讯协议,因此性能很高(enque: 9000 jobs/second, worker: 5200 jobs/second)。 尽管是内存队列, Beanstalkd 提供了 binlog 机制, 当重启 beanstalkd 时,当前任务状态能够从纪录的本地 binlog 中恢复。Beanstalkd支持过有9.5 million用户的Facebook Causes应用。后来开源,现在有PostRank大规模部署和使用,每天处理百万级任务。Beanstalkd是典型的类Memcached设计,协议和使用方式都是同样的风格,所以使用过Memcached的用户会觉得Beanstalkd似曾相识。 Beanstalkd支持的语言有很多,可以参考这里:https://github.com/kr/beanstalkd/wiki/client-libraries ## Beanstalkd设计的核心概念 ### job 一个需要异步处理的任务,是Beanstalkd中的基本单元,需要放在一个tube中。 ### tube 一个有名的任务队列,用来存储统一类型的job,是producer和consumer操作的对象。 ### producer Job的生产者,通过put命令来将一个job放到一个tube中。 ### consumer Job的消费者,通过`reserve/release/bury/delete`命令来获取job或改变job的状态。 当producer直接put一个job时,job就处于READY状态,等待consumer来处理,如果选择延迟put,job就先到DELAYED状态,等待时间过后才迁移到READY状态。 consumer获取了当前READY的job后,该job的状态就迁移到RESERVED,这样其他的consumer就不能再操作该job。 当consumer完成该job后,可以选择delete, release或者bury操作: * delete操作,job从系统消亡,之后不能再获取; * release操作,可以重新把该job状态迁移回READY(也可以延迟该状态迁移操作),使其他的consumer可以继续获取和执行该job; * bury操作,可以把该job休眠,等到需要的时候,再将休眠的job kick回READY状态,也可以delete buride状态的job。 ![](https://box.kancloud.cn/2ee937085397575f7a28d2be61ca7e93_600x254.png) #### 任务优先级 (priority): 任务 (job) 可以有 0~2^32 个优先级, 0 代表最高优先级。 beanstalkd 采用最大最小堆 (Min-max heap) 处理任务优先级排序, 任何时刻调用 reserve 命令的消费者总是能拿到当前优先级最高的任务, 时间复杂度为 O(logn). #### 延时任务 (delay): 有两种方式可以延时执行任务 (job): 生产者发布任务时指定延时;或者当任务处理完毕后, 消费者再次将任务放入队列延时执行 (RELEASE with <delay>)。这种机制可以实现分布式的 Java.util.Timer,这种分布式定时任务的优势是:如果某个消费者节点故障,任务超时重发 (time-to-run) 能够保证任务转移到另外的节点执行。 #### 任务超时重发 (time-to-run): Beanstalkd 把任务返回给消费者以后:消费者必须在预设的 TTR (time-to-run) 时间内发送 delete / release/ bury 改变任务状态;否则 Beanstalkd 会认为消息处理失败,然后把任务交给另外的消费者节点执行。如果消费者预计在 TTR (time-to-run) 时间内无法完成任务, 也可以发送 touch 命令, 它的作用是让 Beanstalkd 从系统时间重新计算 TTR (time-to-run). #### 任务预留 (buried): 如果任务因为某些原因无法执行, 消费者可以把任务置为 buried 状态让 Beanstalkd 保留这些任务。管理员可以通过 peek buried 命令查询被保留的任务,并且进行人工干预。简单的, kick <n> 能够一次性把 n 条被保留的任务踢回队列。 ### Beanstalkd 协议 Beanstalkd 采用类 memcached 协议, 客户端通过文本命令与服务器交互。 这些命令可以简单的分成三组: 1. 生产类 - use <tube> / put <priority> <delay> <ttr> [bytes]: 2. 生产者用 use 选择一个管道 (tube), 然后用 put 命令向管道发布任务 (job). 3. 消费类 - watch <tubes> / reserve / delete <id> / release <id> <priority> <delay> / bury <id> / touch <id> 消费者用 watch 选择多个管道 (tube), 然后用 reserve 命令获取待执行的任务,这个命令是阻塞的。客户端直到有任务可执行才返回。当任务处理完毕后, 消费者可以彻底删除任务 (DELETE), 释放任务让别人处理 (RELEASE), 或者保留 (BURY) 任务。 维护类 - peek job / peek delayed / peek ready / peek buried / kick <n> 用于维护管道内的任务状态, 在不改变任务状态的条件下获取任务。可以用消费类命令改变这些任务的状态。 被保留 (buried) 的任务可以用 kick 命令 "踢" 回队列。 > 协议文档: https://raw.github.com/kr/beanstalkd/master/doc/protocol.txt   正是有这些有趣的操作和状态,才可以基于此做出很多意思的应用,比如要实现一个循环队列,就可以将RESERVED状态的job休眠掉,等没有READY状态的job时再将BURIED状态的job一次性kick回READY状态。 ## Beanstalkd有什么不足?   Beanstalkd 没有提供主备同步 + 故障切换机制, 在应用中有成为单点的风险。实际应用中,可以用数据库为任务 (job) 提供持久化存储。 和 Memcached 类似, Beanstalkd 依赖 libevent 的单线程事件分发机制, 不能有效利用多核 cpu 的性能。这一点可以通过单机部署多个实例克服。 ![](https://box.kancloud.cn/8e4eb7ee2aac45fbb31490c131b5571b_544x327.png)   一个Beanstalkd尚无提供删除一个tube的操作,只能将tube的job依次删除,并让Beanstalkd来自动删除空tube。还有就是Beanstalkd不支持客户端认证机制(开发者将应用场景定位在局域网)。   Beanstalk速度非常快,协议简单,占用内存空间少,而且支持持久化。唯一的不足是挂了之后恢复慢,3G日志数据恢复了十多分钟。如何安装Beanstalkd? ## 安装   使用下面的命令进行安装,同时查看版本: sudo apt-get install beanstalkd beanstalkd -v beanstalkd 1.9   Beanstalkd可以使用以下命令停止和启动: root@ubuntu-vagrant:/usr/local/nginx/conf# service beanstalkd stop * Stopping in-memory queueing server beanstalkd [ OK ] root@ubuntu-vagrant:/usr/local/nginx/conf# service beanstalkd start * Starting in-memory queueing server beanstalkd root@ubuntu-vagrant:/usr/local/nginx/conf# service beanstalkd start 通过apt-get安装后的配置文件目录在/etc/default/beanstalkd,里面描述了Beanstalkd监听的地址和端口: root@ubuntu-vagrant:/usr/local/nginx/conf# cat /etc/default/beanstalkd ## Defaults for the beanstalkd init script, /etc/init.d/beanstalkd on ## Debian systems. BEANSTALKD_LISTEN_ADDR=127.0.0.1 BEANSTALKD_LISTEN_PORT=11300 # You can use BEANSTALKD_EXTRA to pass additional options. See beanstalkd(1) # for a list of the available options. Uncomment the following line for # persistent job storage. # BEANSTALKD_EXTRA="-b /var/lib/beanstalkd" ## 使用composer组件来管理它 [GitHub - davidpersson/beanstalk: Minimalistic PHP client for beanstalkd.](https://github.com/davidpersson/beanstalk)我们可以使用这个组件来帮助操作beanstalkd。 ### 安装composer依赖组件 此处我用composer没有拉下来,换了种方式直接git clone。 git clone https://github.com/davidpersson/beanstalk.git ### 代码实现 <?php use Beanstalk\Client; // // A sample producer. // $beanstalk = new Client(); // For connection options see the // class documentation. $beanstalk->connect(); $beanstalk->useTube('flux'); // Begin to use tube `'flux'`. $beanstalk->put( 23, // Give the job a priority of 23. 0, // Do not wait to put job into the ready queue. 60, // Give the job 1 minute to run. '/path/to/cat-image.png' // The job's body. ); $beanstalk->disconnect(); // // A sample consumer. // $beanstalk = new Client(); $beanstalk->connect(); $beanstalk->watch('flux'); while (true) { $job = $beanstalk->reserve(); // Block until job is available. // Now $job is an array which contains its ID and body: // ['id' => 123, 'body' => '/path/to/cat-image.png'] // Processing of the job... $result = touch($job['body']); if ($result) { $beanstalk->delete($job['id']); } else { $beanstalk->bury($job['id']); } } // When exiting i.e. on critical error conditions // you may also want to disconnect the consumer. // $beanstalk->disconnect(); ## 控制台 为了可以可视化的在网页上查看消息队列的运行情况,我推荐使用控制台。 [ptrofimov/beanstalk_console - Packagist](https://packagist.org/packages/ptrofimov/beanstalk_console)我们使用这个项目作为消息队列的控制台,直观的查看队列任务的执行。 安装console项目: composer create-project ptrofimov/beanstalk_console 然后给他配置虚拟站点,就可以进入控制台查看beanstalkd状态了。 ![](https://box.kancloud.cn/e4128681a0bc929742def2074a661940_1996x802.png) 此处因为使用lnmp一键脚本搭建环境,踩了几个坑。有时间和大家分享下。