# Streams

Streams are a new data type introduced in Redis 5.0. They make for a message queue that looks far more reliable than Pub/Sub. Is Pub/Sub really that unreliable? Very: as soon as the network drops or a client's buffer grows too large, the data is discarded. The stream design borrows Kafka's consumer group model and models a log data structure in a more abstract way.

A Redis stream is primarily an append-only data structure. At least conceptually, it is an abstract data type represented in memory, but it implements more powerful operations to overcome the limits of a plain log file.

If you know message queues, you can treat streams as an MQ. If you also know Kafka, you can even treat streams as Kafka.

The feature is somewhat similar to Redis's older Pub/Sub, with some fundamental differences:

- Streams let multiple clients (consumers) wait for data (on Linux, open several terminals and run XREAD in each to simulate this), and every client receives exactly the same data.
- Pub/Sub is fire-and-forget and stores nothing. With streams, every message is appended to the stream and kept indefinitely unless the user explicitly deletes it (XDEL).
- Consumer Groups give streams a level of control that Pub/Sub cannot provide.

The stream data structure itself is very simple, yet streams are still the most complex Redis type so far because of the extra features built on top of it. A set of blocking operations lets consumers wait for new data that producers add to the stream. There is also the concept of Consumer Groups, first introduced by Kafka. Redis has a similar implementation with the same goal as Kafka's Consumer Groups: letting a group of clients cooperate in consuming the same stream of messages.

Streams suit business scenarios that can tolerate losing data, because Redis itself does not guarantee absolute durability, even with AOF set to `always`.

Redis Stream: typical use cases as a message queue
https://segmentfault.com/a/1190000016777728

```php
// xAck($stream, $group, $arr_messages_ids) - acknowledge one or more pending messages
$redis->xAck('mystream_key', 'group1', ['1530063064286-0', '1530063064286-1']);

// xAdd($str_key, $str_id, $arr_message) - append a message to the stream; returns the ID of the added message
// Pass "*" as $str_id to let Redis generate the stream ID automatically.
$obj_redis->xAdd('mystream_key', "*", ['field' => 'value']); // 1530063064286-0

// xClaim($str_key, $str_group, $str_consumer, $min_idle_time, $arr_messages_ids, [$arr_options])
// - take ownership of pending messages; returns an array of message IDs with their data
$ids = ['1530113681011-0', '1530113681011-1', '1530113681011-2'];
// Without options
$obj_redis->xClaim(
    'mystream', 'group1', 'myconsumer1', 0, $ids
);
// With options
$obj_redis->xClaim(
    'mystream', 'group1', 'myconsumer2', 0, $ids,
    [
        'IDLE' => time() * 1000,
        'RETRYCOUNT' => 5,
        'FORCE',
        'JUSTID'
    ]
);

// xDel($stream_key, $arr_messages_ids) - delete messages from the stream
$redis->xDel('mystream', ['1530115304877-0', '1530115305731-0']);

// xGroup - create, destroy, or manage consumer groups
// Create
$redis->xGroup('CREATE', $str_key, $str_group, $str_msg_id, [$boo_mkstream]);
// Destroy
$redis->xGroup('DESTROY', $str_key, $str_group);
// Manage
$obj_redis->xGroup('HELP');
$obj_redis->xGroup('SETID', $str_key, $str_group, $str_msg_id);
$obj_redis->xGroup('DELCONSUMER', $str_key, $str_group, $str_consumer_name); // remove a consumer

// xInfo('CONSUMERS/GROUPS/STREAM/HELP', $str_stream, $str_group) - get information about a stream
// (the first argument is one of CONSUMERS, GROUPS, STREAM, or HELP)

// xLen($stream_key) - get the total number of entries in the stream

// xPending($str_stream, $str_group [, $str_start, $str_end, $i_count, $str_consumer]) - inspect the pending messages in a stream
$redis->xPending('mystream', 'mygroup', '-', '+', 1, 'consumer-1');

// xRange($str_stream, $str_start, $str_end [, $i_count]) - query a range of messages in the stream
$redis->xRange('mystream', '-', '+', 2);

// xRead($arr_streams [, $i_count, $i_block]) - read messages from one or more streams
$redis->xRead(['stream1' => '1535222584555-0', 'stream2' => '1535222584555-0']);
// Receive only new messages ($ = last ID) and wait indefinitely for one new message
$redis->xRead(['stream1' => '$'], 1, 0);

// xReadGroup($str_group, $str_consumer, $arr_streams [, $i_count, $i_block]) - read stream messages as a consumer in a group
// Similar to xRead, but it reads on behalf of a specific consumer group.
// Returns the messages delivered to this consumer group, if any.
$redis->xReadGroup('mygroup', 'consumer2', ['s1' => 0, 's2' => 0], 1, 1000);

// xRevRange($str_stream, $str_end, $str_start [, $i_count]) - query one or more messages from end back to start (reverse order)
$redis->xRevRange('mystream', '+', '-');

// xTrim($str_stream, $i_max_len [, $boo_approximate]) - trim the stream to the given number of entries
$obj_redis->xTrim('mystream', 100, true);
```
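The `xReadGroup` and `xAck` calls listed above are usually combined into a read, handle, acknowledge loop. The following is a minimal sketch of one pass of that loop, not a definitive implementation. It assumes the phpredis extension and a consumer group that already exists (created with `xGroup('CREATE', ...)`). The function name `consumeOnce` and the `$handler` callback are hypothetical names chosen for illustration.

```php
<?php

/**
 * Read up to $count new messages for one consumer, pass each to $handler,
 * and acknowledge the ones the handler reports as done.
 *
 * $redis is assumed to be a connected phpredis \Redis instance; it is typed
 * as `object` only so that a test double can be passed in.
 *
 * @return string[] IDs of the messages that were handled and acknowledged
 */
function consumeOnce(
    object $redis,
    string $stream,
    string $group,
    string $consumer,
    callable $handler,
    int $count = 10,
    int $blockMs = 1000
): array {
    // The special ID '>' asks for messages never delivered to this group before.
    $reply = $redis->xReadGroup($group, $consumer, [$stream => '>'], $count, $blockMs);

    // Nothing arrived before the timeout, or the call failed.
    if (!is_array($reply) || empty($reply[$stream])) {
        return [];
    }

    $acked = [];
    foreach ($reply[$stream] as $id => $fields) {
        // Acknowledge only when the handler returns true; anything else
        // stays in the group's pending list (visible through xPending).
        if ($handler((string) $id, $fields) === true) {
            $redis->xAck($stream, $group, [(string) $id]);
            $acked[] = (string) $id;
        }
    }

    return $acked;
}
```

With a connected client, a call might look like `consumeOnce($redis, 'mystream', 'group1', 'consumer1', $handler)`. Messages whose handler did not return `true` remain pending, so another consumer can later take them over with `xClaim`.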