流篇
Redis
流(Streams)
是一种非常强大的数据结构,可用于管理高速数据流(如消息队列)。凭借开箱即用的分区、复制和持久性,它能以亚毫秒级的延迟每秒捕获和处理数百万个数据点。Redis Streams 基于高效的 radix-tree 实现(一种算法,每个节点的唯一子节点都与父节点合并),这使得范围查询和查找查询速度极快。它通过异步调用连接生产者和消费者,并支持消费者群组。
场景
实时日志处理
Streams
可以用于实时日志记录和处理,例如将应用程序的日志消息发送到流中,并通过消费者来实时监控和分析日志数据。
- 生产者
// 生产者发布消息到流中...
// * 代表自动生成消息ID
// 127.0.0.1:6379> XADD logger_stream * message xxxxxxxxx
// "1711379361313-0"
$redis->xAdd('logger_stream', '*', ['message' => 'xxxxxxxxx']);
- 消息者
// 取消息时当ID为`$`时代表取最新的消息
$lastId = '0';
// 127.0.0.1:6379> xread count 1 block 0 Streams logger_stream 0
// 阻塞读取消息
while(($messages = $redis->xRead(['logger_stream' => $lastId], 1, 0)) {
$lastId = array_key_first($messages['logger_stream']);
$message = $messages['logger_stream'][$lastId];
// todo
}
消息队列
Streams
可以作为高性能的消息队列使用,支持多个发布者和多个消费者,可用于解耦应用程序的不同模块,实现异步消息处理和任务分发。
事件驱动架构
Streams
可以用于构建事件驱动架构,例如通过发布事件到流中,然后让订阅者消费这些事件来实现松耦合的系统架构和异步通信。
$redis->xAdd('events_stream', '*', ['event' => 'order', 'id' => 100]);
// 读取最新的一条消息
$messages = $redis->xRead(['events_stream' => '$'], 1);
实时数据分析
Streams
可以用于实时数据分析和处理,例如将传感器数据或业务指标数据发送到流中,然后使用消费者来实时处理和分析数据,支持实时监控和实时反馈。在实时分析中可能消费的速率比生产慢,这时候就需要多消费者了。可以使用Redis
Group来管理多消费者的问题。
// XGROUP CREATE mystream mygroup $ MKSTREAM
// 在mystream流上创建一个组
$redis->xGroup('CREATE', 'mystream', 'mygroup', '0', true);
// 生产消息
$redis->xAdd('mystream', '*', ['event' => 'order', 'id' => 100]);
// 消费者一
// XREADGROUP GROUP mygroup consumer_one COUNT 1 STREAMS mystream 0
// '>'含义每次从未消费列表中取N条
// 0 始终从当前消费者中的第1条开始取N条
// 1711419899831-0 取出当前消费者中的此ID后面的N条
$messages = $redis->xReadGroup('mygroup', 'consumer_one', ['mystream' => '>'], 1);
// 消息处理完,你可能还需要消息确认
foreach ($messages['mystream'] as $id => $data) {
// todo
// XACK mystream mygroup $id
// XACK 后会把消息从消息列表中移除, 之后消息就不会被取到
$redis->xACK('mystream', 'mygroup', [$id]);
}
// 消费者二
// XREADGROUP GROUP mygroup consumer_one COUNT 1 STREAMS mystream 0
// '>'含义每次从未消费列表中取N条
// 0 始终从当前消费者中的第1条开始取N条
// 1711419899831-0 取出当前消费者中的此ID后面的N条
$messages = $redis->xReadGroup('mygroup', 'consumer_two', ['mystream' => '>'], 1);
消息订阅和通知
Streams
可以用于消息订阅和通知系统,例如通过发布者将通知消息发送到流中,然后让订阅者消费这些消息来实现实时通知和实时更新。
排行榜和热门内容
Streams
可以用于构建排行榜和热门内容系统,例如通过发布者将用户行为数据(如点赞、评论、分享等)发送到流中,然后使用消费者来实时统计和更新排行榜数据。
事件溯源和日志复制
Streams
可以用于事件溯源和日志复制,例如通过发布者将系统的操作日志或事件记录发送到流中,然后使用消费者来复制和分发这些日志数据。
实时监控和警报
Streams
可以用于实时监控和警报系统,例如通过发布者将系统监控数据(如性能指标、异常事件等)发送到流中,然后使用消费者来实时监控和触发警报。
协作
如果你有更多的场景使用用例,可以通过github
提交pr
请求。有问题可以开issue
。
编辑此页面