流篇

Redis(Streams)是一种非常强大的数据结构,可用于管理高速数据流(如消息队列)。凭借开箱即用的分区、复制和持久性,它能以亚毫秒级的延迟每秒捕获和处理数百万个数据点。Redis Streams 基于高效的 radix-tree 实现(一种算法,每个节点的唯一子节点都与父节点合并),这使得范围查询和查找查询速度极快。它通过异步调用连接生产者和消费者,并支持消费者群组。

redis-stream

场景

实时日志处理

Streams可以用于实时日志记录和处理,例如将应用程序的日志消息发送到流中,并通过消费者来实时监控和分析日志数据。

  • 生产者
// 生产者发布消息到流中...
// * 代表自动生成消息ID
// 127.0.0.1:6379> XADD logger_stream * message xxxxxxxxx
// "1711379361313-0"
$redis->xAdd('logger_stream', '*', ['message' => 'xxxxxxxxx']);

  • 消息者
// 取消息时当ID为`$`时代表取最新的消息
$lastId = '0';
// 127.0.0.1:6379> xread count 1 block 0 Streams logger_stream 0

// 阻塞读取消息
while(($messages = $redis->xRead(['logger_stream' => $lastId], 1, 0)) {
    
    $lastId = array_key_first($messages['logger_stream']);
    
    $message = $messages['logger_stream'][$lastId];
    // todo
}

消息队列

Streams可以作为高性能的消息队列使用,支持多个发布者和多个消费者,可用于解耦应用程序的不同模块,实现异步消息处理和任务分发。

事件驱动架构

Streams可以用于构建事件驱动架构,例如通过发布事件到流中,然后让订阅者消费这些事件来实现松耦合的系统架构和异步通信。


$redis->xAdd('events_stream', '*', ['event' => 'order', 'id' => 100]);

// 读取最新的一条消息
$messages = $redis->xRead(['events_stream' => '$'], 1);

实时数据分析

Streams可以用于实时数据分析和处理,例如将传感器数据或业务指标数据发送到流中,然后使用消费者来实时处理和分析数据,支持实时监控和实时反馈。在实时分析中可能消费的速率比生产慢,这时候就需要多消费者了。可以使用RedisGroup来管理多消费者的问题。

// XGROUP CREATE mystream mygroup $ MKSTREAM
// 在mystream流上创建一个组
$redis->xGroup('CREATE', 'mystream', 'mygroup', '0', true); 

// 生产消息
$redis->xAdd('mystream', '*', ['event' => 'order', 'id' => 100]);


// 消费者一

// XREADGROUP GROUP mygroup consumer_one COUNT 1 STREAMS mystream 0
 
// '>'含义每次从未消费列表中取N条 
// 0 始终从当前消费者中的第1条开始取N条
// 1711419899831-0 取出当前消费者中的此ID后面的N条 
$messages = $redis->xReadGroup('mygroup', 'consumer_one', ['mystream' => '>'], 1);  

// 消息处理完,你可能还需要消息确认

foreach ($messages['mystream'] as $id => $data) {
     // todo
        
    // XACK mystream mygroup $id
    // XACK 后会把消息从消息列表中移除, 之后消息就不会被取到
    $redis->xACK('mystream', 'mygroup', [$id]);   
}

// 消费者二

// XREADGROUP GROUP mygroup consumer_one COUNT 1 STREAMS mystream 0
 
// '>'含义每次从未消费列表中取N条 
// 0 始终从当前消费者中的第1条开始取N条
// 1711419899831-0 取出当前消费者中的此ID后面的N条 
$messages = $redis->xReadGroup('mygroup', 'consumer_two', ['mystream' => '>'], 1);  

消息订阅和通知

Streams可以用于消息订阅和通知系统,例如通过发布者将通知消息发送到流中,然后让订阅者消费这些消息来实现实时通知和实时更新。

排行榜和热门内容

Streams可以用于构建排行榜和热门内容系统,例如通过发布者将用户行为数据(如点赞、评论、分享等)发送到流中,然后使用消费者来实时统计和更新排行榜数据。

事件溯源和日志复制

Streams可以用于事件溯源和日志复制,例如通过发布者将系统的操作日志或事件记录发送到流中,然后使用消费者来复制和分发这些日志数据。

实时监控和警报

Streams可以用于实时监控和警报系统,例如通过发布者将系统监控数据(如性能指标、异常事件等)发送到流中,然后使用消费者来实时监控和触发警报。

协作

如果你有更多的场景使用用例,可以通过github提交pr请求。有问题可以开issue编辑此页面