使用 List (列表) 实现队列
List 是一个字符串元素的有序集合,按照元素插入的顺序排序,它非常适合用作简单的队列。

核心命令
-
入队
-
LPUSH key value [value ...]: 将一个或多个值插入到列表的头部(左端),在队列中,这通常代表“高优先级任务”或“紧急任务”。- 时间复杂度: O(1) 或 O(N),N 是要插入的元素数量。
- 示例:
LPUSH myqueue "task1"(将 "task1" 放入队列最前面)
-
RPUSH key value [value ...]: 将一个或多个值插入到列表的尾部(右端),这是标准的队列“入队”操作。- 时间复杂度: O(1) 或 O(N)
- 示例:
RPUSH myqueue "task2"(将 "task2" 放入队列最后面)
-
-
出队
(图片来源网络,侵删)-
LPOP key: 移除并返回列表的头部(左端)元素,这是标准的队列“出队”操作(先进先出)。- 时间复杂度: O(1)
- 示例:
LPOP myqueue(返回并移除 "task1")
-
RPOP key: 移除并返回列表的尾部(右端)元素。- 时间复杂度: O(1)
- 示例:
RPOP myqueue(返回并移除 "task2")
-
-
查看队列
LRANGE key start stop: 获取列表中指定范围内的元素,这是查看队列内容的最佳方式。- 时间复杂度: O(S+N),S 是偏移量 start,N 是要获取的元素数量。
- 示例:
LRANGE myqueue 0 -1: 获取队列中的所有元素。LRANGE myqueue 0 4: 获取队列中前5个元素。LRANGE myqueue -5 -1: 获取队列中最后5个元素。
-
其他常用命令
(图片来源网络,侵删)LLEN key: 获取列表的长度。LINDEX key index: 通过索引获取列表中的元素。LTRIM key start stop: 裁剪列表,只保留指定范围内的元素,可以用来限制队列的最大长度,防止内存无限增长。
使用 Streams 实现队列
Redis Streams 是 Redis 5.0 引入的一种新的数据结构,它功能非常强大,专为消息流和日志设计,是实现高级队列的理想选择。
核心概念
- Stream: 一个消息的有序集合,每个消息都有一个唯一的 ID。
- Producer (生产者): 向 Stream 中添加消息的客户端。
- Consumer Group (消费者组): 将多个消费者组织成一个组,共同消费一个 Stream,消息只会被组内的一个消费者处理,实现了负载均衡。
- Consumer (消费者): 消费者组中的成员,负责处理消息。
- Message ID: 每条消息的唯一标识符,格式为
timestamp-sequence。
核心命令
-
生产者操作
XADD key [ID] field value [field value ...]: 向 Stream 中添加一条新消息。- 时间复杂度: O(1) (平均情况)
- ID:
- 自动生成一个唯一的 ID(推荐)。
0-0: 表示 Stream 的开头,会返回一个比现有 ID 更大的 ID。1548330222535-0: 指定一个具体的 ID。
- 示例:
XADD mystream * name "Alice" age "30"(自动生成 ID 并添加消息)
-
消费者操作
-
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]: 读取一个或多个 Stream 中的消息,这是最基础的消费命令。- 时间复杂度: O(N),N 是返回的消息数量。
- 参数:
COUNT count: 限制返回的消息数量。BLOCK milliseconds: 如果没有新消息,阻塞客户端指定毫秒数,0 表示无限阻塞。STREAMS key ... ID ...: 指定要读取的 Stream 和对应的起始 ID。
- 示例:
XREAD COUNT 1 STREAMS mystream $: 从mystream读取一条新消息( 表示“从未读过的消息开始”)。XREAD BLOCK 5000 STREAMS mystream $: 阻塞最多5秒,等待mystream的新消息。
-
XREADGROUP GROUP groupname consumername [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]: 从消费者组中读取消息。- 这是 Streams 队列的核心命令,它将消息“派发”给特定的消费者。
- ID:
>: 表示“为这个消费者读取所有尚未分配给它的消息”。0-0: 表示“从 Stream 的开头读取”。- 其他具体 ID。
- 示例:
XREADGROUP GROUP mygroup consumer1 COUNT 1 STREAMS mystream >
-
-
消费者组管理
XGROUP CREATE key groupname id [MKSTREAM]: 创建一个消费者组。MKSTREAM: Stream 不存在,则自动创建。- 示例:
XGROUP CREATE mystream mygroup $(创建一个组,从“下一个”消息开始消费)
XGROUP SETID key groupname id: 设置消费者组的最后读取 ID。XGROUP DESTROY key groupname: 删除消费者组。XGROUP DELCONSUMER key groupname consumername: 从消费者组中删除一个消费者。
-
消息确认
XACK key groupname id [id ...]: 确认一条或多条消息已被成功处理,这是非常重要的一步,可以防止消息丢失。- 示例:
XACK mystream mygroup 1548330222535-0
-
Pending 消息查看
XPENDING key groupname [min-id max-id count consumername]: 查看消费者组中已被分配但尚未确认的消息(即“挂起”的消息)。这对于监控和重新处理失败的任务非常有用。
- 示例:
XPENDING mystream mygroup
两种队列方式的对比与选择
| 特性 | List (列表) | Streams |
|---|---|---|
| 数据结构 | 简单的列表 | 消息流,支持持久化 |
| 消费者模型 | 单消费者或多消费者(竞争关系) | 消费者组,支持多个消费者并行消费,负载均衡 |
| 消息可靠性 | 消息被取出后即删除,无确认机制,如果消费者崩溃,消息会丢失。 | 消息确认机制,消息被分配后处于“Pending”状态,直到被消费者确认,崩溃后可重新投递。 |
| 消息历史 | 无法轻易获取已消费的消息 | 完整保存消息历史,可以按 ID 重新读取任意消息 |
| 阻塞操作 | BLPOP / BRPOP |
XREAD with BLOCK |
| 复杂功能 | 无 | 支持消费者组、消息ID、消息计数等高级功能 |
| 适用场景 | - 简单的任务队列 - 生产者-消费者模式,且对消息丢失不敏感 - 高性能、低延迟的简单场景 |
- 可靠的消息队列系统 - 需要消息持久化和不丢失的场景 - 需要多个消费者并行处理的场景 - 日志聚合、事件溯源等复杂场景 |
如何选择?
- 选择 List:如果你的需求非常简单,只是一个简单的 FIFO 队列,并且可以容忍偶尔的消息丢失(一些非核心的后台任务处理),List 是最简单、最高效的选择。
- 选择 Streams:如果你的应用需要一个健壮的、可靠的、可扩展的消息队列,不能丢失消息,并且需要多个消费者并行处理任务,Streams 是不二之选,它提供了现代消息队列系统所需的所有核心特性。
最佳实践与注意事项
-
处理消费者失败:
- List: 没有内置机制,通常的做法是“消费-处理”模式,如果处理失败,消息已经从队列中移除,只能通过其他方式(如记录日志、重试队列)来恢复。
- Streams: 使用
XPENDING命令监控长时间处于 Pending 状态的消息,这些消息可能对应着已经崩溃的消费者,然后可以使用XREADGROUP重新读取这些消息(使用相同的消费者名或新的消费者名)进行处理。
-
避免队列阻塞:
- 在使用
BLPOP/BRPOP或XREAD BLOCK时,一定要设置合理的超时时间,避免消费者因网络问题或处理缓慢而导致整个生产者系统阻塞。
- 在使用
-
防止内存溢出:
- 对于 List,可以使用
LTRIM命令定期清理队列,只保留最近的任务。 - 对于 Streams,可以设置
MAXLEN选项来限制 Stream 的长度,XADD mystream MAXLEN 1000 * ...,只保留最新的1000条消息。
- 对于 List,可以使用
-
选择合适的命令:
- 如果只是简单的先进先出队列,坚持使用
RPUSH+LPOP(或LPUSH+RPOP),这是最经典和高效的模式。 - 如果需要更强大的功能,直接拥抱 Streams。
- 如果只是简单的先进先出队列,坚持使用
