Redis实现简单队列
队列是一种常见的数据结构,遵循先进先出(FIFO)的原则,在很多业务场景中都有广泛应用,比如异步任务处理、消息通知、流量削峰等。Redis作为高性能的内存数据库,提供了多种数据结构来支持队列的实现,其中最常用的是列表(List)结构,此外也可以结合有序集合(Sorted Set)实现带优先级的队列。本文将介绍基于Redis实现简单队列的两种常见方式,并给出对应的代码示例。
一、基于List结构的简单队列
Redis的List结构是一个双向链表,支持从两端插入和弹出元素,非常适合实现基础队列。我们可以使用LPUSH命令从列表左侧插入元素,使用RPOP命令从列表右侧弹出元素,这样就实现了先进先出的队列效果。如果希望避免队列为空时频繁轮询,还可以使用BRPOP阻塞式弹出命令,在没有元素时会阻塞连接,直到有新元素加入或者超时。
1.1 基本操作命令
LPUSH key value1 [value2 ...]:将一个或多个值插入到列表头部(左侧)
RPUSH key value1 [value2 ...]:将一个或多个值插入到列表尾部(右侧)
RPOP key:移除并获取列表尾部的元素
BRPOP key [key ...] timeout:阻塞式移除并获取列表尾部的元素,若列表没有元素则阻塞等待,timeout为超时时间(单位秒,0表示永久阻塞)
LLEN key:获取列表长度
1.2 Java实现示例
以下示例使用Jedis客户端操作Redis实现简单队列的生产和消费逻辑:
import redis.clients.jedis.Jedis;
import java.util.concurrent.TimeUnit;
public class RedisListQueue {
// Redis连接配置,实际使用时替换为自己的Redis地址和端口
private static final String REDIS_HOST = "www.ipipp.com";
private static final int REDIS_PORT = 6379;
private static final String QUEUE_KEY = "simple_queue";
// 生产者:向队列中添加消息
public static void producer(Jedis jedis, String message) {
// 从列表左侧插入元素,实现队列的入队操作
Long result = jedis.lpush(QUEUE_KEY, message);
System.out.println("生产者插入消息:" + message + ",队列当前长度:" + result);
}
// 消费者:从队列中获取消息(非阻塞方式)
public static void consumer(Jedis jedis) {
// 从列表右侧弹出元素,实现队列的出队操作
String message = jedis.rpop(QUEUE_KEY);
if (message != null) {
System.out.println("消费者获取消息:" + message);
} else {
System.out.println("队列为空,暂无消息");
}
}
// 消费者:阻塞式获取消息
public static void blockingConsumer(Jedis jedis, int timeout) {
// BRPOP阻塞获取,返回结果为key和value的数组,timeout为0表示永久阻塞
java.util.List<String> result = jedis.brpop(timeout, QUEUE_KEY);
if (result != null && result.size() == 2) {
System.out.println("阻塞消费者获取消息:" + result.get(1));
}
}
public static void main(String[] args) throws InterruptedException {
Jedis jedis = new Jedis(REDIS_HOST, REDIS_PORT);
try {
// 先清空队列,避免历史数据影响测试
jedis.del(QUEUE_KEY);
// 生产者发送3条消息
producer(jedis, "任务1:用户注册发送欢迎邮件");
producer(jedis, "任务2:订单支付成功发送通知");
producer(jedis, "任务3:生成用户月度报表");
// 非阻塞消费者消费消息
System.out.println("--- 非阻塞消费测试 ---");
for (int i = 0; i < 4; i++) {
consumer(jedis);
TimeUnit.SECONDS.sleep(1);
}
// 再发送1条消息,测试阻塞消费
producer(jedis, "任务4:超时订单自动取消");
System.out.println("--- 阻塞消费测试 ---");
// 启动一个线程执行阻塞消费
new Thread(() -> {
blockingConsumer(jedis, 0);
}).start();
// 主线程等待2秒,确保阻塞消费线程获取到消息
TimeUnit.SECONDS.sleep(2);
} catch (Exception e) {
e.printStackTrace();
} finally {
jedis.close();
}
}
}二、基于Sorted Set的优先级队列
如果需要实现带优先级的队列,即优先级高的消息先被消费,可以使用Redis的有序集合(Sorted Set)结构。有序集合中的每个关联一个分数(score),Redis会根据分数对元素进行排序,分数越小的元素优先级越高(或者根据业务需求定义分数规则)。我们可以使用ZADD命令插入带分数的元素,使用ZRANGEBYSCORE命令获取分数最小的元素,再结合ZREM命令删除已消费的元素,实现带优先级的队列。
2 基本操作命令
ZADD key score member:向有序集合中添加一个或多个成员,或者更新已存在成员的分数
ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT offset count]:返回有序集合中指定分数区间内的成员,按分数从小到大排序
ZREM key member:移除有序集合中的一个或多个成员
ZCARD key:获取有序集合的成员数量
2.2 Java实现示例
以下示例实现带优先级的队列,分数越小优先级越高:
import redis.clients.jedis.Jedis;
import java.util.Set;
public class RedisPriorityQueue {
private static final String REDIS_HOST = "www.ipipp.com";
private static final int REDIS_PORT = 6379;
private static final String PRIORITY_QUEUE_KEY = "priority_queue";
// 生产者:发送带优先级的消息,score越小优先级越高
public static void priorityProducer(Jedis jedis, String message, double priority) {
// 插入元素,关联的score为优先级
jedis.zadd(PRIORITY_QUEUE_KEY, priority, message);
System.out.println("生产者插入优先级消息:" + message + ",优先级分数:" + priority);
}
// 消费者:获取并消费最高优先级的消息(分数最小)
public static void priorityConsumer(Jedis jedis) {
// 获取分数最小的前1个元素,不返回分数
Set<String> messages = jedis.zrangeByScore(PRIORITY_QUEUE_KEY, 0, Double.MAX_VALUE, 0, 1);
if (messages != null && !messages.isEmpty()) {
String message = messages.iterator().next();
// 消费成功后从有序集合中删除该元素
Long removeCount = jedis.zrem(PRIORITY_QUEUE_KEY, message);
if (removeCount > 0) {
System.out.println("优先级消费者获取消息:" + message);
}
} else {
System.out.println("优先级队列为空,暂无消息");
}
}
public static void main(String[] args) throws InterruptedException {
Jedis jedis = new Jedis(REDIS_HOST, REDIS_PORT);
try {
// 清空队列
jedis.del(PRIORITY_QUEUE_KEY);
// 发送不同优先级的消息,分数越小优先级越高
priorityProducer(jedis, "低优先级任务:生成周报", 3.0);
priorityProducer(jedis, "高优先级任务:系统故障告警", 1.0);
priorityProducer(jedis, "中优先级任务:用户反馈处理", 2.0);
// 消费消息,会先消费高优先级的任务
System.out.println("--- 优先级消费测试 ---");
for (int i = 0; i < 4; i++) {
priorityConsumer(jedis);
TimeUnit.SECONDS.sleep(1);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
jedis.close();
}
}
}三、注意事项
1. 基于List的队列在消费时如果使用
RPOP非阻塞命令,队列为空时会返回null,业务层需要自己处理空队列的逻辑,避免无效轮询;使用BRPOP则可以减少无效请求,但需要注意连接阻塞时的超时处理。2. 基于Sorted Set的优先级队列在消费时需要先查询再删除元素,这两个操作不是原子的,如果多个消费者同时消费可能会出现重复消费的问题,实际生产环境中可以结合Redis的事务或者Lua脚本保证操作的原子性。
3. Redis的队列是内存存储,如果服务重启可能会导致未消费的消息丢失,如果需要持久化,需要开启Redis的AOF或者RDB持久化机制,不过仍需要注意极端情况下的数据丢失风险。
4. 以上示例仅为简单演示,实际生产环境中需要根据业务需求完善异常处理、连接池管理、消息重试等逻辑。