1. Stream 是什么
Redis Stream 是 Redis 提供的一种消息队列数据结构,用于保存和传递一系列消息。
它的核心特点是:
消息有唯一 ID。
消息会持久化保存在 Redis 中,不会像 Pub/Sub 一样发送后立刻丢失。
支持消费者组。
支持消息确认机制。
支持查看未确认消息并重新处理。
适合订单、异步任务、日志、通知等可靠消息场景。
可以把 Stream 理解为一个“消息流”:
生产者 → Stream 消息队列 → 消费者例如秒杀业务中:
用户下单请求 ↓ Lua 脚本校验库存和重复下单 ↓ 将订单消息写入 stream.orders ↓ 消费者异步读取订单消息 ↓ 写入 MySQL 数据库2. Stream 中的消息结构
Stream 中的一条消息由两部分组成:
消息 ID + 多个 field-value 数据例如:
1720000000000-0 voucherId = 10 userId = 1001 orderId = 20001其中:
1720000000000-0:消息 ID。voucherId:优惠券 ID。userId:用户 ID。orderId:订单 ID。
消息 ID 的格式通常是:
毫秒时间戳-序号例如:
1720000000000-0 1720000000000-1同一毫秒内写入多条消息时,后面的序号会递增。
3. 添加消息:XADD
生产者使用XADD向 Stream 中写入消息。
XADD stream.orders * voucherId 10 userId 1001 orderId 20001含义:
stream.orders :Stream 名称 * :让 Redis 自动生成消息 ID voucherId 10 :字段和值 userId 1001 orderId 20001如果stream.orders不存在,Redis 会自动创建它。
执行后会返回一条消息 ID,例如:
1720000000000-0Java 中通过 Lua 脚本写入消息时,通常类似:
redis.call( 'xadd', 'stream.orders', '*', 'voucherId', voucherId, 'userId', userId, 'orderId', orderId )注意:XADD后面必须传入合法的消息 ID。
错误示例:
redis.call('xadd', 'stream.orders', 'voucherId', voucherId)这里 Redis 会把voucherId当成消息 ID,因此报错:
ERR Invalid stream ID specified正确写法必须加上:
'*'即:
redis.call('xadd', 'stream.orders', '*', ...)4. 不使用消费者组读取消息:XREAD
最基础的读取方式是XREAD。
XREAD COUNT 1 STREAMS stream.orders 0含义:
COUNT 1 :最多读取 1 条消息 STREAMS stream.orders:读取哪个 Stream 0 :从最早的消息开始读也可以读取最新消息:
XREAD BLOCK 2000 COUNT 1 STREAMS stream.orders $含义:
BLOCK 2000 :没有消息时阻塞等待 2 秒 $ :只读取命令执行之后新产生的消息但是XREAD有一个问题:
多个消费者可能读取到同一条消息。因此它不适合多个消费者协作处理订单这类任务。
这时需要使用消费者组。
5. 消费者组是什么
消费者组用于实现“多个消费者共同处理同一批消息”。
结构可以理解为:
Stream ├── 消费者组 g1 │ ├── 消费者 c1 │ ├── 消费者 c2 │ └── 消费者 c3 │ └── 消费者组 g2 ├── 消费者 c4 └── 消费者 c5关键规则:
不同消费者组之间:广播
同一条消息可以被不同组分别消费。
例如:
stream.orders ↓ g1:订单服务消费 g2:日志服务消费那么同一条订单消息:
g1 可以收到 g2 也可以收到这相当于广播。
同一个消费者组内:竞争消费
同一组中的多个消费者会竞争消息。
例如:
g1 ├── c1 ├── c2 └── c3某一条消息只会分配给其中一个消费者:
消息 A → c1 消息 B → c2 消息 C → c3不会出现同一条消息同时被c1、c2、c3正常消费的情况。
所以:
想实现广播:创建多个消费者组。 想实现负载均衡:在同一个消费者组中创建多个消费者。6. 创建消费者组:XGROUP CREATE
创建消费者组命令:
XGROUP CREATE stream.orders g1 0 MKSTREAM含义:
stream.orders :Stream 名称 g1 :消费者组名称 0 :从最早的消息开始读取 MKSTREAM :如果 Stream 不存在,则自动创建其中:
0表示消费者组从 Stream 的第一条消息开始消费。
如果写成:
$表示消费者组只消费创建之后的新消息,不处理历史消息。
例如:
XGROUP CREATE stream.orders g1 $ MKSTREAM含义是:
忽略当前已有消息, 只消费以后新增的消息。如果消费者组已经存在,再执行创建命令会报错:
BUSYGROUP Consumer Group name already exists这不是 Stream 出错,而是表示:
g1 这个消费者组已经创建过了。通常不需要重复创建。
7. 使用消费者组读取消息:XREADGROUP
消费者组读取消息使用:
XREADGROUP GROUP g1 c1 COUNT 1 STREAMS stream.orders >含义:
GROUP g1 c1 :g1 是消费者组,c1 是消费者名称 COUNT 1 :最多读取 1 条消息 STREAMS stream.orders > :读取从未分配给消费者的新消息其中最重要的是:
>它表示:
读取消费者组中还没有被任何消费者领取过的新消息。例如:
XREADGROUP GROUP g1 c1 COUNT 1 STREAMS stream.orders >消费者c1获取一条新消息后,这条消息会被记录为:
已投递给 c1,但尚未确认。此时它会进入 Pending List,也就是待确认消息列表。
8. Pending List:未确认消息列表
消费者读取消息后,消息不会马上彻底完成。
Redis 会先将消息记录到消费者组的 Pending List 中。
流程如下:
消费者读取消息 ↓ 消息进入 Pending List ↓ 消费者执行业务逻辑 ↓ 业务成功后发送 XACK ↓ 消息从 Pending List 移除例如订单业务:
读取订单消息 ↓ 创建订单 ↓ 扣减数据库库存 ↓ 订单写入成功 ↓ XACK 确认消息如果消费者读取后宕机:
消息没有 XACK那么消息仍然保留在 Pending List 中,不会丢失。
这就是 Stream 比 Pub/Sub 更可靠的原因之一。
9. 确认消息:XACK
消费者处理成功后,需要手动确认消息。
命令:
XACK stream.orders g1 1720000000000-0含义:
stream.orders :Stream 名称 g1 :消费者组 1720000000000-0 :消息 ID确认后:
消息会从 g1 的 Pending List 中移除。注意:
XACK 不会删除 Stream 中的原始消息。它只是表示:
这个消费者组已经成功处理过这条消息。所以:
消息仍然可以被其他消费者组消费。10. 为什么必须手动确认
因为 Redis 不知道你的业务是否真正执行成功。
例如消费者读取到订单消息后,可能发生:
1. 数据库写入失败 2. 数据库事务回滚 3. 服务宕机 4. 网络异常 5. 消费者代码报错如果 Redis 自动确认消息,就可能出现:
Redis 认为消息已处理, 但订单实际上没有写入数据库。这会导致消息丢失。
因此正确流程应该是:
先完成业务逻辑 ↓ 确认数据库事务成功 ↓ 再执行 XACK即:
业务成功后确认消息, 业务失败不要确认消息。11. 读取 Pending List 中的消息
当消费者宕机或处理失败后,消息会留在 Pending List 中。
可以使用:
XREADGROUP GROUP g1 c1 COUNT 1 STREAMS stream.orders 0注意这里最后一个参数不是>,而是:
0含义:
读取当前消费者 c1 自己尚未确认的消息。区别如下:
> :读取从未被分配过的新消息 0 :读取当前消费者未确认的历史消息在实际项目中,常见流程是:
启动消费者 ↓ 先读取 Pending List 中未确认消息 ↓ 处理完成后 XACK ↓ 再循环读取新消息这样可以避免服务重启后遗留消息一直无法处理。
12. XPENDING:查看未确认消息
查看消费者组中是否有未确认消息:
XPENDING stream.orders g1可以看到:
未确认消息数量 最早未确认消息 ID 最新未确认消息 ID 每个消费者持有的未确认消息数量查看具体 Pending 消息:
XPENDING stream.orders g1 - + 10含义:
- + :查询全部范围 10 :最多查看 10 条13. Stream 消费流程总结
完整流程如下:
生产者执行 XADD ↓ 消息写入 Stream ↓ 消费者组读取消息 XREADGROUP ↓ 消息进入 Pending List ↓ 消费者执行业务逻辑 ↓ 业务成功:XACK 业务失败:不 XACK ↓ 未确认消息后续重新处理可以写成:
XADD → XREADGROUP → 业务处理 → XACK14. Stream 与 List、Pub/Sub 的区别
List 消息队列
List 常用命令:
LPUSH queue message BRPOP queue 0特点:
优点: - 简单 - 支持阻塞读取 - 可以实现基础任务队列 缺点: - 没有消费者组 - 没有消息确认机制 - 消费失败后不方便恢复 - 多消费者场景功能较弱Pub/Sub
Pub/Sub 常用命令:
SUBSCRIBE channel PUBLISH channel message特点:
优点: - 实时广播 - 多个订阅者都可以收到消息 - 使用简单 缺点: - 不保存历史消息 - 订阅者离线期间会丢消息 - 没有确认机制 - 不适合订单等可靠业务Stream
特点:
优点: - 消息持久化 - 支持消费者组 - 组内竞争消费 - 组间广播消费 - 支持 ACK 确认 - 支持 Pending List - 支持故障恢复适合:
秒杀订单 异步下单 支付通知 日志收集 任务队列 事件驱动业务15. 秒杀订单中的 Stream 设计
秒杀业务通常分为两部分。
第一部分:Redis + Lua 脚本
Lua 脚本负责快速校验:
库存是否充足 用户是否重复下单 扣减 Redis 库存 记录用户已购买 写入 Stream 消息Lua 脚本成功后,将订单消息写入:
stream.orders例如:
redis.call( 'xadd', 'stream.orders', '*', 'voucherId', voucherId, 'id', userId, 'orderId', orderId )这里的字段名id虽然可以使用,但更推荐写成:
'userId'因为它表达更清晰:
redis.call( 'xadd', 'stream.orders', '*', 'voucherId', voucherId, 'userId', userId, 'orderId', orderId )第二部分:消费者异步创建订单
消费者不断读取消息:
stream.orders然后执行:
创建 VoucherOrder 写入数据库 扣减数据库库存 检查是否重复下单成功后执行:
XACK如果数据库事务失败,例如:
userId 为 null 数据库字段不允许为空 唯一索引冲突 save() 失败 事务回滚那么:
不能执行 XACK。否则 Redis 会认为消息已成功消费,但数据库订单并没有创建成功。
16. 最重要的几个命令
# 添加消息 XADD stream.orders * voucherId 10 userId 1001 orderId 20001 # 创建消费者组 XGROUP CREATE stream.orders g1 0 MKSTREAM # 读取新消息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS stream.orders > # 确认消息 XACK stream.orders g1 1720000000000-0 # 读取当前消费者未确认的消息 XREADGROUP GROUP g1 c1 COUNT 1 STREAMS stream.orders 0 # 查看 Pending List 概况 XPENDING stream.orders g1 # 查看具体 Pending 消息 XPENDING stream.orders g1 - + 1017. 核心结论
Redis Stream 的重点可以记成下面几句话:
Stream 是可靠消息队列。 XADD 用于写入消息。 消费者组用于管理多个消费者。 不同消费者组之间是广播关系。 同一消费者组内部是竞争消费关系。 消费者读取消息后,消息会进入 Pending List。 业务处理成功后必须 XACK。 没有 XACK 的消息可以在后续重新处理。 > 用于读取新消息。 0 用于读取当前消费者未确认的消息。