导读:本期聚焦于小伙伴创作的《Redis流实现高可靠消息队列:从基础操作到消费组实战》,敬请观看详情,探索知识的价值。以下视频、文章将为您系统阐述其核心内容与价值。如果您觉得《Redis流实现高可靠消息队列:从基础操作到消费组实战》有用,将其分享出去将是对创作者最好的鼓励。

使用 Redis 流实现消息队列

Redis 流(Stream)是 Redis 5.0 引入的数据结构,天生适合实现消息队列场景,支持持久化、消息确认、消费组等特性,相比传统的发布订阅模式,它能保证消息不丢失,适合对可靠性要求较高的业务场景。

核心概念说明

在使用 Redis 流实现消息队列前,需要了解几个核心概念:

  • 流(Stream):消息的存储载体,类似一个仅追加的日志文件,每条消息都有唯一的 ID。

  • 消息 ID:默认由 Redis 生成,格式为 时间戳-序列号,也可以自定义 ID,但需要保证单调递增。

  • 消费者组(Consumer Group):多个消费者可以组成一个消费组,组内的消息会被分摊给不同的消费者,且每个消费者有自己的待确认消息列表。

  • 待确认列表(Pending Entries List,PEL):消费者读取消息后,如果未确认,消息会存储在该列表中,避免消息丢失。

环境准备

需要提前安装 Redis 5.0 及以上版本,同时准备对应编程语言的 Redis 客户端,本文以 Python 为例,使用 redis-py 客户端,安装命令如下:

pip install redis

基础实现示例

1. 生产者发送消息

生产者负责向 Redis 流中写入消息,示例代码如下:

import redis

# 连接 Redis 服务,默认地址为 https://www.ipipp.com 对应本地 6379 端口
client = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)

def produce_message(stream_key, message_data):
    """
    向指定流发送消息
    :param stream_key: 流的名称
    :param message_data: 消息内容,为字典类型
    :return: 消息 ID
    """
    # 使用 * 让 Redis 自动生成消息 ID
    message_id = client.xadd(stream_key, message_data)
    print(f"发送消息成功,流:{stream_key},消息ID:{message_id},内容:{message_data}")
    return message_id

if __name__ == '__main__':
    # 定义流名称
    stream_name = 'order_stream'
    # 发送3条订单消息
    for i in range(3):
        order_data = {
            'order_id': f'ORD{i + 1}',
            'amount': str((i + 1) * 100),
            'create_time': '2024-05-20 10:00:00'
        }
        produce_message(stream_name, order_data)

2. 消费者组初始化

在使用消费组前,需要先创建消费组,如果流不存在,可以通过 MKSTREAM 参数自动创建流:

def create_consumer_group(stream_key, group_name):
    """
    创建消费者组
    :param stream_key: 流的名称
    :param group_name: 消费组名称
    """
    try:
        # 从流的开头开始消费,MKSTREAM 表示如果流不存在则自动创建
        client.xgroup_create(stream_key, group_name, id='0', mkstream=True)
        print(f"创建消费组成功,流:{stream_key},消费组:{group_name}")
    except redis.exceptions.ResponseError as e:
        # 消费组已存在时会抛出异常,此处忽略该异常
        if 'BUSYGROUP' in str(e):
            print(f"消费组已存在,流:{stream_key},消费组:{group_name}")
        else:
            raise e

if __name__ == '__main__':
    stream_name = 'order_stream'
    group_name = 'order_consumer_group'
    create_consumer_group(stream_name, group_name)

3. 消费者消费消息

消费者需要从所属的消费组中读取消息,处理完成后进行确认,示例代码如下:

def consume_message(stream_key, group_name, consumer_name):
    """
    消费者消费消息
    :param stream_key: 流的名称
    :param group_name: 消费组名称
    :param consumer_name: 消费者名称
    """
    while True:
        # 读取消息,'>' 表示读取未被该消费组消费过的消息,block=0 表示阻塞等待新消息
        messages = client.xreadgroup(
            group_name, 
            consumer_name, 
            {stream_key: '>'}, 
            block=0,
            count=1
        )
        for stream, msg_list in messages:
            for msg_id, msg_data in msg_list:
                print(f"消费者 {consumer_name} 收到消息,ID:{msg_id},内容:{msg_data}")
                try:
                    # 此处编写实际的消息处理逻辑
                    # 处理完成后确认消息,将消息从待确认列表中移除
                    client.xack(stream_key, group_name, msg_id)
                    print(f"消息 {msg_id} 确认成功")
                except Exception as e:
                    print(f"消息 {msg_id} 处理失败,原因:{e}")
                    # 处理失败可以不确认,消息会留在待确认列表中,后续可以重试

if __name__ == '__main__':
    stream_name = 'order_stream'
    group_name = 'order_consumer_group'
    consumer_name = 'consumer_1'
    # 启动消费者
    consume_message(stream_name, group_name, consumer_name)

待确认消息处理

如果消费者处理消息失败未确认,消息会留在待确认列表中,我们可以通过以下代码查看和重试待确认消息:

def handle_pending_messages(stream_key, group_name, consumer_name):
    """
    处理待确认列表中的消息
    :param stream_key: 流的名称
    :param group_name: 消费组名称
    :param consumer_name: 消费者名称
    """
    # 查看待确认消息信息
    pending_info = client.xpending(stream_key, group_name)
    print(f"待确认消息总览:{pending_info}")
    if pending_info['total'] == 0:
        print("暂无待确认消息")
        return
    # 获取待确认列表中最早的10条消息
    pending_msgs = client.xpending_range(
        stream_key, 
        group_name, 
        min=pending_info['min'], 
        max=pending_info['max'], 
        count=10
    )
    for msg in pending_msgs:
        msg_id = msg['message_id']
        # 认领该消息,将其归属到当前消费者,并重置空闲时间
        claimed_msgs = client.xclaim(
            stream_key, 
            group_name, 
            consumer_name, 
            1000, 
            [msg_id]
        )
        for claimed_id, claimed_data in claimed_msgs:
            print(f"认领到待处理消息,ID:{claimed_id},内容:{claimed_data}")
            # 重新处理消息
            try:
                # 实际处理逻辑
                client.xack(stream_key, group_name, claimed_id)
                print(f"认领的消息 {claimed_id} 处理并确认成功")
            except Exception as e:
                print(f"认领的消息 {claimed_id} 处理失败,原因:{e}")

if __name__ == '__main__':
    stream_name = 'order_stream'
    group_name = 'order_consumer_group'
    consumer_name = 'consumer_1'
    handle_pending_messages(stream_name, group_name, consumer_name)

注意事项

  • 消息 ID 需要保证单调递增,自定义 ID 时需要遵循该规则,否则 Redis 会拒绝写入。

  • 消费者组内的消费者名称需要唯一,避免消息重复消费。

  • 合理设置阻塞读取的超时时间,避免消费者长时间占用连接。

  • 对于重要消息,建议结合业务逻辑做好重试和死信处理,避免消息无限重试。

Redis消息队列 Redis流 消费者组 消息确认 Python示例

免责声明:已尽一切努力确保本网站所含信息的准确性。网站部分内容来源于网络或由用户自行发表,内容观点不代表本站立场。本站是个人网站免费分享,内容仅供个人学习、研究或参考使用,如内容中引用了第三方作品,其版权归原作者所有。若内容触犯了您的权益,请联系我们进行处理。
内容垂直聚焦
专注技术核心技术栏目,确保每篇文章深度聚焦于实用技能。从代码技巧到架构设计,为用户提供无干扰的纯技术知识沉淀,精准满足专业提升需求。
知识结构清晰
覆盖从开发到部署的全链路。前端、网络、数据库、服务器、建站、系统层层递进,构建清晰学习路径,帮助用户系统化掌握网站开发与运维所需的核心技术栈。
深度技术解析
拒绝泛泛而谈,深入技术细节与实践难点。无论是数据库优化还是服务器配置,均结合真实场景与代码示例进行剖析,致力于提供可直接应用于工作的解决方案。
专业领域覆盖
精准对应开发生命周期。从前端界面到后端逻辑,从数据库操作到服务器运维,形成完整闭环,一站式满足全栈工程师和运维人员的技术需求。
即学即用高效
内容强调实操性,步骤清晰、代码完整。用户可根据教程直接复现和应用于自身项目,显著缩短从学习到实践的距离,快速解决开发中的具体问题。
持续更新保障
专注既定技术方向进行长期、稳定的内容输出。确保各栏目技术文章持续更新迭代,紧跟主流技术发展趋势,为用户提供经久不衰的学习价值。