依赖
`composer require php-amqplib/php-amqplib`
```
<?php
namespace app\common\tool;
use Exception;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
/**
* RabbitMQ 客户端
*/
class Rabbit {
private static $_instance = null;
# 连接资源
private $connect = null;
# 通道资源
private $channel = null;
# 连接配置
private $config = [];
/**
* 构造函数
*
* @param string $connectName 连接名称
*/
private function __construct($connectName){
# 对象是否已经存储了连接,没有则连接并存储
if(!isset($this->connect)){
$config = $this->config;
if(empty($config)){
$config = config('queue.'.$connectName);
if(!$config){
throw new Exception('config\queue.php文件中,没有配置'.$connectName, -1);
}
$this->config = $config;
}
$this->connect = new AMQPStreamConnection($config['host'], $config['port'], $config['username'], $config['password']);
$this->channel = $this->connect->channel();
}
}
/**
* 每一个连接配置生成一个单例
*
* @param string $connectName
* @return void
*/
public static function instance($connectName = 'rabbit'){
if(is_null(self::$_instance)){
self::$_instance = new self($connectName);
}
return self::$_instance;
}
/**
* 仅用来查看编辑器提示mq相关方法的参数意思
*
* @return void
*/
private static function document(){
$conf = [
'host'=> '192.168.7.236',
'port'=> '15672',
'username'=> 'guest',
'password'=> ':guest~',
];
# 创建连接
$connection = new AMQPStreamConnection($conf['host'], $conf['port'], $conf['username'], $conf['password']);
# 创建通道
$channel = $connection->channel();
/***************** 生产者写入队列开始 *******************/
# 声明交换机,声明则需要绑定队列;可以不声明,则默认default交换机。
# 交换机相当于是邮箱,可以声明多个邮箱,队列相当于信件,可以一信多投。
# 因为可能一个队列信息,多个子系统都需要单独处理。有点像广播
$exchangeName = 'default';
$channel->exchange_declare($exchangeName, 'direct', false, true, false);
# 必须声明队列,通常一个数据队列定义一个名词,里面的数据结构都是一致,队列模式必须指定持久化,否则服务器重启,队列会丢失
$queueName = 'queueName';
$channel->queue_declare($queueName, false, true, false, false);
# 将队列与某个交换机进行绑定,并使用路由关键字.个人理解路由关键字更像是队列别名,不一定准确,默认空字符串则使用队列名替代。
$routingKey = '';
$channel->queue_bind($queueName, $exchangeName, $routingKey);
# 必须声明一个消息体,且为字符串类型,具体的格式,可以对数据采用各种需要的encode,投递模式必须指定持久化,否则服务器重启,队列消息会丢失
# 第二个消息状态配置,还包含有 content_type=> text/plain【默认】 correlation_id=> 自定义唯一id【比如, uniqid('rmq', true)】correlation_id 会在消费确认里用到
$msg = new AMQPMessage($msgBody, array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT));
# 推送入队列
$channel->basic_publish($msg, $exchangeName, $routingKey);
/***************** 生产者写入队列结束 *******************/
/***************** 消费者读取队列开始 *******************/
# 必须声明队列,通常一个数据队列定义一个名词,里面的数据结构都是一致,队列模式必须指定持久化,否则服务器重启,队列会丢失
$channel->queue_declare($queueName, false, true, false, false);
# 声明消息的回调处理与哪个队列绑定
$callback = function($msg){
# 处理队列信息的逻辑
$queueData = $msg->body;
# 业务逻辑处理
$businessDone = true;
# 处理成功要手动确认下消息,告知队列已处理完,可以清理了。如果未声明自动确认删除模式,也没有其他消费者处理,则永远获取同一条。明显这是不符合的
# 所以在未声明自动确认删除模式下,一定要手动确认,最好是有个重试逻辑,重试多少次后,将队列消息推到另外的队列中,当作异常保留,然后在本队列进行确认删除,避免阻塞
if($businessDone==true){
# 如果没有在basic_publish的最后参数为消息指定一个唯一ID,则rabbit会默认生成一个唯一标识delivery_tag
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
}
};
# 设置限流,
# 参数一,限制消息大小,0代表不限制
# 参数二,限制允许unack的最大消息数,
# 参数三,限制对象,true对整个channel限制,false对当前消费者限制
$channel->basic_qos(0, 1, false);
# 声明消费队列绑定的回调逻辑
$channel->basic_consume($queueName, '', true, false, false, false, $callback);
# 持久检测队列,看是否有数据心跳,如果只是单次获取,直接$channel->wait();即可
while($channel->is_consuming()) {
# 执行消费回调,PHP应该是没有回溯执行的,猜测wait方法内部的实现是,通过声明的队列参数获取消息,然后传递给前面声明的回调方法进行执行
$channel->wait();
}
/***************** 消费者读取队列开始 *******************/
/***************** 心跳检测开始 *******************/
$connection->checkHeartBeat();
/***************** 心跳检测开始 *******************/
}
public function checkHeartBeat(){
try{
$this->connect->checkHeartBeat();
}catch(Exception $e){
$this->connect();
}
}
/**
* 队列设置:有配置读配置,无配置读设定
*
* @param [type] $queueName
* @param [type] $routingKey
* @param [type] $exchangeName
* @return void
*/
protected function queueSet($queueName, $routingKey, $exchangeName){
$config = $this->config;
if(isset($config['queue'][$queueName])){
$queueConf = $config['queue'][$queueName];
if(isset($queueConf['exchangeName'])){
$exchangeName = $queueConf['exchangeName'];
}
if(isset($queueConf['routingKey'])){
$routingKey = $queueConf['routingKey'];
}
}
# 声明交换机属性,持久化
$this->channel->exchange_declare($exchangeName, 'direct', false, true, false);
# 声明队列属性,持久化
$this->channel->queue_declare($queueName, false, true, false, false);
# 绑定交换机,队列,以及路由键
$this->channel->queue_bind($queueName, $exchangeName, $routingKey);
}
/**
* 生产队列复杂版:相同exchangeName和routingKey绑定的任何队列之一投递任务,会导致所有的绑定的队列都被投递
*
* @param string $msgBody 队列消息,我们要传输的数据
* @param string $queueName 队列名称
* @param string $routingKey 路由关键字,不指定该参数,会往所有的相同的exchangeName里的所有队列都投递队列消息。适用于一条数据走多个任务
* @param string $exchangeName 交换机 【传任务所在的平台,理解为命名空间也可】
* @return void
*/
public function produce($msgBody='', $queueName='default', $routingKey='', $exchangeName='default'){
// $this->queueSet($queueName, $routingKey, $exchangeName);
# 声明一个消息体属性,持久化
$msg = new AMQPMessage($msgBody, array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT));
# 推送入队列
$this->channel->basic_publish($msg);
# 记得在调用层,显示调用【 $this->close() 】关闭连接,因为可能涉及循环推送队列,反复连接和关闭浪费资源,所以这里不主动关闭
# $this->close();
return true;
}
/**
* 简化版,仅使用默认的交换机投递队列
*
* @param string $msgBody
* @param string $queueName
* @return void
*/
public function easyProduce($msgBody='', $queueName='default'){
# 声明队列属性,持久化
$this->channel->queue_declare($queueName, false, true, false, false);
# 定义一个消息,格式化
$msg = new AMQPMessage($msgBody, array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT));
# 这里测试publish的返回是null 后面需要考虑加确认机制 目前是稳定可以推送
$pulishResult = $this->channel->basic_publish($msg, '', $queueName);
# 记得在调用层,显示调用【 $this->close() 】关闭连接,因为可能涉及循环推送队列,反复连接和关闭浪费资源,所以这里不主动关闭
# $this->close();
return true;
}
/**
* 取出指定队列的指定条数消息
*
* @param string $queueName 队列名词
* @param integer $msgCount 消息条数
* @param bool $close 获取数据后是否关闭连接
* @return void
*/
public function consumeData($queueName='default', $msgCount=5, $close=false, $routingKey='', $exchangeName='default'){
# 队列设置
$this->queueSet($queueName, $routingKey, $exchangeName);
$data = [];
$callback = function($msg) use (&$data) {
$rawData = $msg->body;
$data[] = [
'delivery_tag'=> $msg->delivery_info['delivery_tag'],
'rawData'=> $rawData
];
# 手动确认删除
// $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};
# 限流
$this->channel->basic_qos(0, $msgCount, false);
# 定义消费队列,并指定回调处理
$this->channel->basic_consume($queueName, '', true, false, false, false, $callback);
# 触发回调
for($i=0; $i<$msgCount; $i++){
$this->channel->wait();
}
if($close){
$this->close();
}
return $data;
}
/**
* 手动确认删除指定的消息
*
* @param string $delivery_tag 队列消息的唯一标识
* @return void
*/
public function ack($delivery_tag){
$this->channel->basic_ack($delivery_tag);
}
/**
* 消费队列
*
* @param [function] $callback 函数,只有一个参数$msg,就是消息体.也可以写匿名函数,函数体内是消费者实现。如果消费任务完成,需要在函数体内调用:
* $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); 用以通知MQ已完成消费,则队列消息会被删除
*
* @param string $queueName 队列名称
* @param string $connectName 连接名称
* @param integer $unackLimit 限流:最多允许unack的消息数量,达到上限则队列不再继续获取消息处理,默认0不限制
* @param bool $a_global 限流:是对整个channel影响,还是只影响当前消费者
* @return void
*/
public function consume($callback, $queueName='default', $unackLimit=0, $a_global=false, $routingKey='', $exchangeName='default'){
# 队列设置
$this->queueSet($queueName, $routingKey, $exchangeName);
# 限流
if($unackLimit > 0){
$this->channel->basic_qos(0, $unackLimit, $a_global);
}
//在接收消息的时候调用$callback函数
$this->channel->basic_consume($queueName, '', true, false, false, false, $callback);
while($this->channel->is_consuming()) {
$this->channel->wait();
}
}
/**
* 简化版消费队列,使用默认的交换机
*
* @param [function] $callback 函数,只有一个参数$msg,就是消息体.也可以写匿名函数,函数体内是消费者实现。如果消费任务完成,需要在函数体内调用:
* $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); 用以通知MQ已完成消费,则队列消息会被删除
*
* @param string $queueName 队列名称
* @param string $connectName 连接名称
* @param integer $unackLimit 限流:最多允许unack的消息数量,达到上限则队列不再继续获取消息处理,默认0不限制
* @param bool $a_global 限流:是对整个channel影响,还是只影响当前消费者
* @return void
*/
public function easyConsume($callback, $queueName='default', $unackLimit=0, $a_global=false){
# 声明队列属性,持久化
$this->channel->queue_declare($queueName, false, true, false, false);
# 限流
if($unackLimit > 0){
$this->channel->basic_qos(0, $unackLimit, $a_global);
}
//在接收消息的时候调用$callback函数
$this->channel->basic_consume($queueName, '', true, false, false, false, $callback);
while($this->channel->is_consuming()) {
$this->channel->wait();
}
}
/**
* 消费队列
*
* @param [function] $callback 函数,只有一个参数$msg,就是消息体.也可以写匿名函数,函数体内是消费者实现。如果消费任务完成,需要在函数体内调用:
* $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); 用以通知MQ已完成消费,则队列消息会被删除
*
* @param string $queueName 队列名称
* @return void
*/
public function consumeOne($callback, $queueName='default', $routingKey='', $exchangeName='default'){
$this->queueSet($queueName, $routingKey, $exchangeName);
//在接收消息的时候调用$callback函数
$this->channel->basic_consume($queueName, '', true, false, false, false, $callback);
$this->channel->wait();
}
/**
* 关闭通道
*
* @return void
*/
private function closeChannel(){
$this->channel->close();
$this->channel = null;
}
/**
* 关闭连接
*
* @return void
*/
private function closeConnect(){
$this->connect->close();
$this->connect = null;
}
/**
* 关闭所有的连接和通道
*
* @return void
*/
public function close(){
$this->closeChannel();
$this->closeConnect();
}
/**
* 连接rabbit
*
* @return void
*/
public function connect(){
if(empty($this->config)){
throw new Exception("配置信息丟失", -1);
return false;
}
$config = $this->config;
if(is_null($this->connect)){
$this->connect = new AMQPStreamConnection($config['host'], $config['port'], $config['username'], $config['password']);
}
if(is_null($this->channel)){
$this->channel = $this->connect->channel();
}
return $this;
}
/*
# produce demo
$queueName = 'test';
$msg = json_encode(['queueName'=> $queueName]);
$rabbit = \App\Http\Controllers\Common\RabbitMQ::instance('rabbit');
$result = $rabbit->easyProduce($msg, $queueName);
$rabbit->close();
*/
/*
# consumer demo
$rabbit = \app\common\tool\Rabbit::instance('rabbit');
$rabbit->consumeDemo($queueName);
$rabbit->consume(function($msg){
$info = " [x] Received ". $msg->body. "\n";
echo $info;
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
}, $queueName);
$rabbit->close();
*/
}
```
- 常见功能
- 第三方授权登录
- 邮件发送
- 简易聊天室
- 获取各国汇率
- PHP获取服务器硬件指标
- 数据上报之
- web开发
- 开发规范
- 前端
- 踩坑
- 将footer固定在底部
- bootstrap
- Metronic
- 用到的jquery插件
- bootstrap-hover-dropdown
- jquery.slimscroll
- jquery.blockui
- bootstrap-switch
- js.cookie
- moment
- bootstrap-daterangepicker
- morris
- raphael
- jquery.waypoints
- jquery.counterup
- select2
- 取值和设置默认值
- vue
- axios
- 浏览器
- 谷歌浏览器
- 谷歌插件
- layui
- layui-表格
- layui-表单
- layui-弹窗
- layui-分页
- 后端
- 操作系统
- linux
- 用户管理
- 文件管理
- 目录管理
- 压缩和解压缩
- 进程查看
- 端口查看
- 开机自启动服务
- 定时任务
- shell脚本
- 杀掉运行超过指定时长指定服务的进程
- 获取服务器使用状态
- bash-shell连接socket
- 自定义快捷命令
- centos-踩坑
- 防火墙
- 软件
- yum
- vim
- screen
- window
- 语言
- PHP
- 配置优化
- 框架
- thinkphp5.1+
- think命令行
- laravel6.+
- 维护模式
- 根据环境读取不同配置
- laravel6.+采坑
- laravel坑位
- 数据库事务
- 任务调度
- 文件权限问题
- 增强框架
- larvel:elastic-search
- 图形验证码
- laravel获取ip
- 函数
- strtotime
- 正则匹配
- 类
- 接口类与抽象类
- 类相关的关键字 - abstract
- 类相关的关键字 - interface
- PHP有关类的调用方式"->"与"::"的区别
- 扩展
- 问题归纳
- json_encode和json_decode
- 字符串的运算
- curl
- 优化php效率
- 数组相加合并与array_merge
- 时区转换
- 不常用特性
- php反射
- 包管理器-composer
- GuzzleHttp
- Python
- Go
- 数据库
- Redis
- 安装
- 本地化-数据备份
- php-redis操作
- Mysql
- mysql-命令集合
- 设置终端可访问
- 数据库设计
- 用户基础信息表
- 踩坑集合
- mysql-2002
- mysql-2054
- 优化策略
- mysql-密码验证插件
- 一些牛逼的sql查询
- topN
- 无限级分类
- Memcache
- MongoDb
- 安装mongo-server
- 安装php-mongodb扩展
- 在laravel中使用mongoDB
- 客户端软件
- Hbase
- Elasticsearch
- elastic-search
- restfulApi操作es
- web服务器
- 1.nginx
- 配置语法规则
- 配置详解
- rewrite规则
- request_filename
- 2.apache
- 功能设计
- 加密解密
- Base64
- 对亚马逊SKU加密
- 兼职项目中的加解密
- 腾讯外包时的加密
- 接口设计
- 接口限流设计
- 分库分表
- 遍历展示文件目录结构
- 时区换算
- 文件切割
- 解析xml字符串
- 项目
- 博客后台管理
- 亚马逊广告API
- 官方指引文档
- 开发人员中心
- 应用商店
- 第三方库
- 申请API邮件记录
- 亚马逊MWS
- 付款报告
- 乱码
- 亚马逊管理库存报告
- 报告
- 商品
- 入库
- 履行
- 出库
- 财务
- 订单
- 异步任务处理
- 集群如何同步代码
- 基本开发流程
- 文档管理
- showdoc
- 运行环境
- 开发环境
- vagrant
- windows上配置安装
- vagrant安装插件缓慢
- 更换ssh默认端口映射
- 设置x-shell密码登录
- 使用市场的box-homestead
- homestead-7: Box 'lc/homestead'
- 常见问题
- 虚拟环境reboot
- 突然无法使用
- phpStudy
- wamp
- 压测性能
- VPN
- vultr
- 凌空图床
- 宝塔
- 自动化部署
- 版本管理软件钩子
- 线上环境-LNMP
- centos7
- nginx
- mysql
- mysql开机自启
- mysql-更换默认端口
- datetime字段类型默认值
- php
- php扩展安装
- redis
- swoole
- gd
- BCMath
- igbinary
- zstd
- 包管理器:composer
- 优化性能
- nodejs
- 更新gcc版本
- 版本控制
- git
- 常用命令
- gitlab
- 版本管理规范
- 使用阿里云创建远程仓库
- git自动化部署
- svn
- 忽略指定文件
- 拉取代码
- 自动化运维
- jekins
- 容器
- 集群
- 架构设计
- 设计原则
- 阅读参考
- 代码规划
- 架构实战
- 服务治理
- 权限控制设计
- 具体设计
- 计划
- 疑问知识点
- 读书笔记
- 高性能Mysql
- TCP-IP详解-卷一:协议
- 思考
- php如何实现并发执行
- 对接调用设计
- 如何在浏览器上实现插件
- 如何设计一个app结合业务告警
- mysql的where查询没有用到索引
- 为啥in查询比循环嵌套sql的查询还要慢
- 使用git来创建属于自己的composer包
- 翻页获取数据的时候又新增了数据
- 安全思路
- 月报
- PHP ?? 和 ?: 的区别
- PHP异步执行
- redis集群的目标是什么
- 大文件数据处理
- 性能瓶颈分析
- 命令行里输出带颜色的字体
- 面试问题合集
- 基础
- 安全
- 算法
- 冒泡排序
- 快速排序
- 二分法查询数组指定成员
- 字符查找匹配
- 令牌桶
- 漏桶
- 计数器
- 代理
- 协议
- http
- 状态码
- tcp
- udp
- Oauth2.0
- 设计模式
- 单例模式
- 适配器模式
- 工厂模式
- 观察者模式
- 流程化
- 地址栏输入网址到返回网页的流程
- 题目收集
- 工具
- rabbitMq
- rabbitMQ用户管理
- 生产者
- 消费者
- 支持TP5.*的think-queue
- 消息丢失
- 消费者报错
- rabbitMQ配置优化
- 磁盘满载导致服务挂掉
- PHP类库
- rabbitMQ踩坑
- navicat
- vscode
- phpstorm
- 激活码
- markdown
- PHP自定义类库
- 工具类
- 领导力
- 任务分配
- 代码组织
- 不要重复
- 避免污染
- 接口定义规范
- 小业务需求
- 获取充值面额组成
- 监控服务器CPU和内存
- shell脚本版本