Redis不仅可作为缓存服务器,还可以用作消息队列。它的列表类型天生支持用作消息队列。如下图所示:

由于Redis的列表是使用双向链表实现的,保存了头节点和尾节点,所以在列表的头部和尾部两边插入或获取元素都是非常快的,时间复杂度为O(1)。

普通队列

可以直接使用Redis的list数据类型实现消息队列,只需简单的两个指令lpush和rpop或者rpush和lpop。

  • lpush rpop:左进右出的队列
  • rpush lpop:左出右进的队列

下面使用redis的命令来模拟普通队列。
使用lpush命令生产消息:

>lpush queue:single 1
"1"
>lpush queue:single 2
"2"
>lpush queue:single 3
"3"

使用rpop命令消费消息:

>rpop queue:single
"1"
>rpop queue:single
"2"
>rpop queue:single
"3"

下面使用Java代码来实现普通队列。

生产者SingleProducer

package com.morris.redis.demo.queue.single;

import redis.clients.jedis.Jedis;

/**
 * 生产者
 */
public class SingleProducer {

    public static final String SINGLE_QUEUE_NAME = "queue:single";

    public static void main(String[] args) {
        Jedis jedis = new Jedis();
        for (int i = 0; i < 100; i  ) {
            jedis.lpush(SINGLE_QUEUE_NAME, "hello "   i);
        }
        jedis.close();
    }
}

消费者SingleConsumer:

package com.morris.redis.demo.queue.single;

import redis.clients.jedis.Jedis;

import java.util.Objects;
import java.util.concurrent.TimeUnit;

/**
 * 消费者
 */
public class SingleConsumer {

    public static void main(String[] args) throws InterruptedException {
        Jedis jedis = new Jedis();
        while (true) {
            String message = jedis.rpop(SingleProducer.SINGLE_QUEUE_NAME);
            if(Objects.nonNull(message)) {
                System.out.println(message);
            } else {
                TimeUnit.MILLISECONDS.sleep(500);
            }
        }
    }
}

上面的代码已经基本实现了普通队列的生产与消费,但是上述的例子中消息的消费者存在两个问题:

  • 消费者需要不停的调用rpop方法查看redis的list中是否有待处理的数据(消息)。每调用一次都会发起一次连接,有可能list中没有数据,造成大量的空轮询,导致造成不必要的浪费。也许你可以使用Thread.sleep()等方法让消费者线程隔一段时间再消费,如果睡眠时间过长,这样不能处理一些时效性要求高的消息,睡眠时间过短,也会在连接上造成比较大的开销。
  • 如果生产者速度大于消费者消费速度,消息队列长度会一直增大,时间久了会占用大量内存空间。

阻塞队列

消费者可以使用brpop指令从redis的list中获取数据,这个指令只有在有元素时才返回,没有则会阻塞直到超时返回null,于是消费端就不需要休眠后获取数据了,这样就相当于实现了一个阻塞队列,

使用redis的brpop命令来模拟阻塞队列。

>brpop queue:single 30

可以看到命令行阻塞在了brpop这里了,30s后没数据就返回。

Java代码实现如下:

生产者与普通队列的生产者一致。

消费者BlockConsumer:

package com.morris.redis.demo.queue.block;

import redis.clients.jedis.Jedis;

import java.util.List;

/**
 * 消费者
 */
public class BlockConsumer {

    public static void main(String[] args) {
        Jedis jedis = new Jedis();
        while (true) {
            // 超时时间为1s
            List<String> messageList = jedis.brpop(1, BlockProducer.BLOCK_QUEUE_NAME);
            if (null != messageList && !messageList.isEmpty()) {
                System.out.println(messageList);
            }
        }
    }
}

缺点:无法实现一次生产多次消费。

发布订阅模式

Redis除了对消息队列提供支持外,还提供了一组命令用于支持发布/订阅模式。利用Redis的pub/sub模式可以实现一次生产多次消费的队列。

发布:PUBLISH指令可用于发布一条消息,格式:

PUBLISH channel message

返回值表示订阅了该消息的数量。

订阅:SUBSCRIBE指令用于接收一条消息,格式:

SUBSCRIBE channel

使用SUBSCRIBE指令后进入了订阅模式,但是不会接收到订阅之前publish发送的消息,这是因为只有在消息发出去前订阅才会接收到。在这个模式下其他指令,只能看到回复。

回复分为三种类型:

  • 如果为subscribe,第二个值表示订阅的频道,第三个值表示是已订阅的频道的数量
  • 如果为message(消息),第二个值为产生该消息的频道,第三个值为消息
  • 如果为unsubscribe,第二个值表示取消订阅的频道,第三个值表示当前客户端的订阅数量。

下面使用redis的命令来模拟发布订阅模式。

生产者:

127.0.0.1:6379> publish queue hello
(integer) 1
127.0.0.1:6379> publish queue hi
(integer) 1

消费者:

127.0.0.1:6379> subscribe queue
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "queue"
3) (integer) 1
1) "message"
2) "queue"
3) "hello"
1) "message"
2) "queue"
3) "hi"

Java代码实现如下:

生产者PubsubProducer:

​
package com.morris.redis.demo.queue.pubsub;

import redis.clients.jedis.Jedis;

/**
 * 生产者
 */
public class PubsubProducer {

    public static final String PUBSUB_QUEUE_NAME = "queue:pubsub";

    public static void main(String[] args) {
        Jedis jedis = new Jedis();
        for (int i = 0; i < 100; i  ) {
            jedis.publish(PUBSUB_QUEUE_NAME, "hello "   i);
        }
        jedis.close();
    }
}

​

消费者PubsubConsumer:

package com.morris.redis.demo.queue.pubsub;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;

/**
 * 消费者
 */
public class PubsubConsumer {

    public static void main(String[] args) throws InterruptedException {
        Jedis jedis = new Jedis();

        JedisPubSub jedisPubSub = new JedisPubSub() {

            @Override
            public void onMessage(String channel, String message) {
                System.out.println("receive message: "   message);
                if(message.indexOf("99") > -1) {
                    this.unsubscribe();
                }
            }

            @Override
            public void onSubscribe(String channel, int subscribedChannels) {
                System.out.println("subscribe channel: "   channel);
            }

            @Override
            public void onUnsubscribe(String channel, int subscribedChannels) {
                System.out.println("unsubscribe channel "   channel);
            }
        };

        jedis.subscribe(jedisPubSub, PubsubProducer.PUBSUB_QUEUE_NAME);
    }
}

消费者可以启动多个,每个消费者都能收到所有的消息。

可以使用指令UNSUBSCRIBE退订,如果不加参数,则会退订所有由SUBSCRIBE指令订阅的频道。

Redis还支持基于通配符的消息订阅,使用指令PSUBSCRIBE (pattern subscribe),例如:

psubscribe channel.*

用PSUBSCRIBE指令订阅的频道也要使用指令PUNSUBSCRIBE指令退订,该指令无法退订SUBSCRIBE订阅的频道,同理UNSUBSCRIBE也不能退订PSUBSCRIBE指令订阅的频道。

同时PUNSUBSCRIBE指令通配符不会展开。例如:PUNSUBSCRIBE \*不会匹配到channel.\*,所以要取消订阅channel.\*就要这样写PUBSUBSCRIBE channel.\*。

Redis的pub/sub也有其缺点,那就是如果消费者下线,生产者的消息会丢失。

延时队列和优先级队列

Redis中有个数据类型叫Zset,其本质就是在数据类型Set的基础上加了个排序的功能而已,除了保存原始的数据value之外,还提供另一个属性score,这一属性在添加修改元素时候可以进行指定,每次指定后,Zset会自动重新按新的score值进行排序。

如果score字段设置为消息的优先级,优先级最高的消息排在第一位,这样就能实现一个优先级队列。

如果score字段代表的是消息想要执行时间的时间戳,将它插入Zset集合中,便会按照时间戳大小进行排序,也就是对执行时间先后进行排序,集合中最先要执行的消息就会排在第一位,这样的话,只需要起一个死循环线程不断获取集合中的第一个元素,如果当前时间戳大于等于该元素的score就将它取出来进行消费删除,就可以达到延时执行的目的,注意不需要遍历整个Zset集合,以免造成性能浪费。

下面使用redis的zset来模拟延时队列。

生产者:

127.0.0.1:6379> zadd queue:delay 1 order1 2 order2 3 order3
(integer) 0

消费者:

127.0.0.1:6379> zrange queue:delay 0 0 withscores
1) "order1"
2) "1"
127.0.0.1:6379> zrem queue:delay order1
(integer) 1

Java代码如下:

生产者DelayProducer:

package com.morris.redis.demo.queue.delay;

import redis.clients.jedis.Jedis;

import java.util.Date;
import java.util.Random;

/**
 * 生产者
 */
public class DelayProducer {

    public static final String DELAY_QUEUE_NAME = "queue:delay";

    public static void main(String[] args) {
        Jedis jedis = new Jedis();
        long now = new Date().getTime();
        Random random = new Random();
        for (int i = 0; i < 10; i  ) {
            int second = random.nextInt(30); // 随机订单失效时间
            jedis.zadd(DELAY_QUEUE_NAME, now   second * 1000, "order" i);
        }
        jedis.close();
    }
}

消费者:

package com.morris.redis.demo.queue.delay;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.Tuple;

import java.util.Date;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;

/**
 * 消费者
 */
public class DelayConsumer {

    public static void main(String[] args) throws InterruptedException {
        Jedis jedis = new Jedis();
        while (true) {
            long now = new Date().getTime();
            Set<Tuple> tupleSet = jedis.zrangeWithScores(DelayProducer.DELAY_QUEUE_NAME, 0, 0);
            if(tupleSet.isEmpty()) {
                TimeUnit.MILLISECONDS.sleep(500);
            } else {
                for (Tuple tuple : tupleSet) {
                    Double score = tuple.getScore();
                    long time = score.longValue();
                    if(time < now) {
                        jedis.zrem(DelayProducer.DELAY_QUEUE_NAME, tuple.getElement());
                        System.out.println("order["   tuple.getElement()  "] is timeout at "   time);
                    } else {
                        TimeUnit.MILLISECONDS.sleep(500);
                    }
                    break;
                }
            }
        }
    }
}

应用场景

延时队列可用于订单超时失效的场景
二级缓存(local redis)中,当有缓存需要更新时,可以使用发布订阅模式通知其他服务器使得本地缓存失效。

到此这篇关于redis实现队列的阻塞、延时、发布和订阅的文章就介绍到这了,更多相关redis 队列阻塞、延时、发布和订阅内容请搜索Devmax以前的文章或继续浏览下面的相关文章希望大家以后多多支持Devmax!

redis实现队列的阻塞、延时、发布和订阅的更多相关文章

  1. 布隆过滤器(bloom filter)及php和redis实现布隆过滤器的方法

    这篇文章主要介绍了布隆过滤器(bloom filter)介绍以及php和redis实现布隆过滤器实现方法,非常不错,具有一定的参考借鉴价值,需要的朋友可以参考下

  2. springboot+redis实现简单的热搜功能

    这篇文章主要介绍了springboot+redis实现一个简单的热搜功能,通过代码介绍了过滤不雅文字的过滤器,代码简单易懂,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下

  3. PHP队列原理及基于队列的写文件案例

    这篇文章主要介绍了PHP队列原理及基于队列的写文件案例,较为详细的分析了队列的原理、定义方法并结合实例形式给出了php基于队列的写文件操作技巧,需要的朋友可以参考下

  4. Java利用redis zset实现延时任务详解

    zset作为redis的有序集合数据结构存在,排序的依据就是score。本文就将利用zset score这个排序的这个特性,来实现延时任务,感兴趣的可以了解一下

  5. SpringBoot+Redis+Lua分布式限流的实现

    本文主要介绍了SpringBoot+Redis+Lua分布式限流的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

  6. PHP基于数组实现的堆栈和队列功能示例

    这篇文章主要介绍了PHP基于数组实现的堆栈和队列功能,结合实例形式分析了php基于数组的array_push()、array_pop()、array_shift()等函数实现堆栈与队列的入栈、出栈以及队列的添加、删除等相关操作技巧,需要的朋友可以参考下

  7. Yii2框架redis基本应用示例

    这篇文章主要介绍了Yii2框架redis基本应用,结合实例形式分析了Yii2 redis扩展包的安装、配置及基本数据操作相关技巧,需要的朋友可以参考下

  8. redis+php实现微博(二)发布与关注功能详解

    这篇文章主要介绍了redis+php实现微博发布与关注功能,结合实例形式分析了php结合redis实现微博的发布及关注相关操作技巧,需要的朋友可以参考下

  9. python中的queue队列类型及函数用法

    这篇文章主要介绍了python中的queue队列类型及函数用法,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教

  10. 详解PHP队列的实现

    这篇文章主要介绍了PHP队列的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

随机推荐

  1. 基于EJB技术的商务预订系统的开发

    用EJB结构开发的应用程序是可伸缩的、事务型的、多用户安全的。总的来说,EJB是一个组件事务监控的标准服务器端的组件模型。基于EJB技术的系统结构模型EJB结构是一个服务端组件结构,是一个层次性结构,其结构模型如图1所示。图2:商务预订系统的构架EntityBean是为了现实世界的对象建造的模型,这些对象通常是数据库的一些持久记录。

  2. Java利用POI实现导入导出Excel表格

    这篇文章主要为大家详细介绍了Java利用POI实现导入导出Excel表格,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

  3. Mybatis分页插件PageHelper手写实现示例

    这篇文章主要为大家介绍了Mybatis分页插件PageHelper手写实现示例,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

  4. (jsp/html)网页上嵌入播放器(常用播放器代码整理)

    网页上嵌入播放器,只要在HTML上添加以上代码就OK了,下面整理了一些常用的播放器代码,总有一款适合你,感兴趣的朋友可以参考下哈,希望对你有所帮助

  5. Java 阻塞队列BlockingQueue详解

    本文详细介绍了BlockingQueue家庭中的所有成员,包括他们各自的功能以及常见使用场景,通过实例代码介绍了Java 阻塞队列BlockingQueue的相关知识,需要的朋友可以参考下

  6. Java异常Exception详细讲解

    异常就是不正常,比如当我们身体出现了异常我们会根据身体情况选择喝开水、吃药、看病、等 异常处理方法。 java异常处理机制是我们java语言使用异常处理机制为程序提供了错误处理的能力,程序出现的错误,程序可以安全的退出,以保证程序正常的运行等

  7. Java Bean 作用域及它的几种类型介绍

    这篇文章主要介绍了Java Bean作用域及它的几种类型介绍,Spring框架作为一个管理Bean的IoC容器,那么Bean自然是Spring中的重要资源了,那Bean的作用域又是什么,接下来我们一起进入文章详细学习吧

  8. 面试突击之跨域问题的解决方案详解

    跨域问题本质是浏览器的一种保护机制,它的初衷是为了保证用户的安全,防止恶意网站窃取数据。那怎么解决这个问题呢?接下来我们一起来看

  9. Mybatis-Plus接口BaseMapper与Services使用详解

    这篇文章主要为大家介绍了Mybatis-Plus接口BaseMapper与Services使用详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

  10. mybatis-plus雪花算法增强idworker的实现

    今天聊聊在mybatis-plus中引入分布式ID生成框架idworker,进一步增强实现生成分布式唯一ID,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

返回
顶部