Redis怎么实现队列的阻塞、延时、发布和订阅

本篇文章给大家带来了关于Redis的相关知识,其中主要介绍了关于怎么实现队列的阻塞、延时、发布和订阅的相关问题,下面一起来看一下,希望对大家有帮助。 Redis不仅可作为缓存服务器…

本篇文章给大家带来了关于Redis的相关知识,其中主要介绍了关于怎么实现队列的阻塞、延时、发布和订阅的相关问题,下面一起来看一下,希望对大家有帮助。

Redis怎么实现队列的阻塞、延时、发布和订阅

Redis不仅可作为缓存服务器,还可以用作消息队列。它的列表类型天生支持用作消息队列。如下图所示:

Redis怎么实现队列的阻塞、延时、发布和订阅

由于Redis的列表是使用双向链表实现的,保存了头节点和尾节点,所以在列表的头部和尾部两边插入或获取元素都是非常快的,时间复杂度为O(1)。

普通队列

可以直接使用Redis的list数据类型实现消息队列,只需简单的两个指令lpush和rpop或者rpush和lpop。

lpush+rpop:左进右出的队列

rpush+lpop:左出右进的队列

下面使用redis的命令来模拟普通队列。

使用lpush命令生产消息:

>lpushqueue:single1"1">lpushqueue:single2"2">lpushqueue:single3"3"

使用rpop命令消费消息:

>rpopqueue:single"1">rpopqueue:single"2">rpopqueue:single"3"

下面使用Java代码来实现普通队列。

生产者SingleProducer

packagecom.morris.redis.demo.queue.single;importredis.clients.jedis.Jedis;/***生产者*/publicclassSingleProducer{publicstaticfinalStringSINGLE_QUEUE_NAME="queue:single";publicstaticvoidmain(String[]args){Jedisjedis=newJedis();for(inti=0;i<100;i++){jedis.lpush(SINGLE_QUEUE_NAME,"hello"+i);}jedis.close();}}

消费者SingleConsumer:

packagecom.morris.redis.demo.queue.single;importredis.clients.jedis.Jedis;importjava.util.Objects;importjava.util.concurrent.TimeUnit;/***消费者*/publicclassSingleConsumer{publicstaticvoidmain(String[]args)throwsInterruptedException{Jedisjedis=newJedis();while(true){Stringmessage=jedis.rpop(SingleProducer.SINGLE_QUEUE_NAME);if(Objects.nonNull(message)){System.out.println(message);}else{TimeUnit.MILLISECONDS.sleep(500);}}}}

上面的代码已经基本实现了普通队列的生产与消费,但是上述的例子中消息的消费者存在两个问题:

消费者需要不停的调用rpop方法查看redis的list中是否有待处理的数据(消息)。每调用一次都会发起一次连接,有可能list中没有数据,造成大量的空轮询,导致造成不必要的浪费。也许你可以使用Thread.sleep()等方法让消费者线程隔一段时间再消费,如果睡眠时间过长,这样不能处理一些时效性要求高的消息,睡眠时间过短,也会在连接上造成比较大的开销。

如果生产者速度大于消费者消费速度,消息队列长度会一直增大,时间久了会占用大量内存空间。

阻塞队列

消费者可以使用brpop指令从redis的list中获取数据,这个指令只有在有元素时才返回,没有则会阻塞直到超时返回null,于是消费端就不需要休眠后获取数据了,这样就相当于实现了一个阻塞队列,

使用redis的brpop命令来模拟阻塞队列。

>brpopqueue:single30

可以看到命令行阻塞在了brpop这里了,30s后没数据就返回。

Java代码实现如下:

生产者与普通队列的生产者一致。

消费者BlockConsumer:

packagecom.morris.redis.demo.queue.block;importredis.clients.jedis.Jedis;importjava.util.List;/***消费者*/publicclassBlockConsumer{publicstaticvoidmain(String[]args){Jedisjedis=newJedis();while(true){//超时时间为1sList<String>messageList=jedis.brpop(1,BlockProducer.BLOCK_QUEUE_NAME);if(null!=messageList&&!messageList.isEmpty()){System.out.println(messageList);}}}}

缺点:无法实现一次生产多次消费。

发布订阅模式

Redis除了对消息队列提供支持外,还提供了一组命令用于支持发布/订阅模式。利用Redis的pub/sub模式可以实现一次生产多次消费的队列。

发布:PUBLISH指令可用于发布一条消息,格式:

PUBLISHchannelmessage

返回值表示订阅了该消息的数量。

订阅:SUBSCRIBE指令用于接收一条消息,格式:

SUBSCRIBEchannel

使用SUBSCRIBE指令后进入了订阅模式,但是不会接收到订阅之前publish发送的消息,这是因为只有在消息发出去前订阅才会接收到。在这个模式下其他指令,只能看到回复。

回复分为三种类型:

如果为subscribe,第二个值表示订阅的频道,第三个值表示是已订阅的频道的数量

如果为message(消息),第二个值为产生该消息的频道,第三个值为消息

如果为unsubscribe,第二个值表示取消订阅的频道,第三个值表示当前客户端的订阅数量。

下面使用redis的命令来模拟发布订阅模式。

生产者:

127.0.0.1:6379>publishqueuehello(integer)1127.0.0.1:6379>publishqueuehi(integer)1

消费者:

127.0.0.1:6379>subscribequeueReadingmessages…(pressCtrl-Ctoquit)1)"subscribe"2)"queue"3)(integer)11)"message"2)"queue"3)"hello"1)"message"2)"queue"3)"hi"

Java代码实现如下:

生产者PubsubProducer:

packagecom.morris.redis.demo.queue.pubsub;importredis.clients.jedis.Jedis;/***生产者*/publicclassPubsubProducer{publicstaticfinalStringPUBSUB_QUEUE_NAME="queue:pubsub";publicstaticvoidmain(String[]args){Jedisjedis=newJedis();for(inti=0;i<100;i++){jedis.publish(PUBSUB_QUEUE_NAME,"hello"+i);}jedis.close();}}

消费者PubsubConsumer:

packagecom.morris.redis.demo.queue.pubsub;importredis.clients.jedis.Jedis;importredis.clients.jedis.JedisPubSub;/***消费者*/publicclassPubsubConsumer{publicstaticvoidmain(String[]args)throwsInterruptedException{Jedisjedis=newJedis();JedisPubSubjedisPubSub=newJedisPubSub(){@OverridepublicvoidonMessage(Stringchannel,Stringmessage){System.out.println("receivemessage:"+message);if(message.indexOf("99")>-1){this.unsubscribe();}}@OverridepublicvoidonSubscribe(Stringchannel,intsubscribedChannels){System.out.println("subscribechannel:"+channel);}@OverridepublicvoidonUnsubscribe(Stringchannel,intsubscribedChannels){System.out.println("unsubscribechannel"+channel);}};jedis.subscribe(jedisPubSub,PubsubProducer.PUBSUB_QUEUE_NAME);}}

消费者可以启动多个,每个消费者都能收到所有的消息。

可以使用指令UNSUBSCRIBE退订,如果不加参数,则会退订所有由SUBSCRIBE指令订阅的频道。

Redis还支持基于通配符的消息订阅,使用指令PSUBSCRIBE (pattern subscribe),例如:

psubscribechannel.*

用PSUBSCRIBE指令订阅的频道也要使用指令PUNSUBSCRIBE指令退订,该指令无法退订SUBSCRIBE订阅的频道,同理UNSUBSCRIBE也不能退订PSUBSCRIBE指令订阅的频道。

同时PUNSUBSCRIBE指令通配符不会展开。例如:PUNSUBSCRIBE \*不会匹配到channel.\*,所以要取消订阅channel.\*就要这样写PUBSUBSCRIBE channel.\*。

Redis的pub/sub也有其缺点,那就是如果消费者下线,生产者的消息会丢失。

延时队列和优先级队列

Redis中有个数据类型叫Zset,其本质就是在数据类型Set的基础上加了个排序的功能而已,除了保存原始的数据value之外,还提供另一个属性score,这一属性在添加修改元素时候可以进行指定,每次指定后,Zset会自动重新按新的score值进行排序。

如果score字段设置为消息的优先级,优先级最高的消息排在第一位,这样就能实现一个优先级队列。

如果score字段代表的是消息想要执行时间的时间戳,将它插入Zset集合中,便会按照时间戳大小进行排序,也就是对执行时间先后进行排序,集合中最先要执行的消息就会排在第一位,这样的话,只需要起一个死循环线程不断获取集合中的第一个元素,如果当前时间戳大于等于该元素的score就将它取出来进行消费删除,就可以达到延时执行的目的,注意不需要遍历整个Zset集合,以免造成性能浪费。

下面使用redis的zset来模拟延时队列。

生产者:

127.0.0.1:6379>zaddqueue:delay1order12order23order3(integer)0

消费者:

127.0.0.1:6379>zrangequeue:delay00withscores1)"order1"2)"1"127.0.0.1:6379>zremqueue:delayorder1(integer)1

Java代码如下:

生产者DelayProducer:

packagecom.morris.redis.demo.queue.delay;importredis.clients.jedis.Jedis;importjava.util.Date;importjava.util.Random;/***生产者*/publicclassDelayProducer{publicstaticfinalStringDELAY_QUEUE_NAME="queue:delay";publicstaticvoidmain(String[]args){Jedisjedis=newJedis();longnow=newDate().getTime();Randomrandom=newRandom();for(inti=0;i<10;i++){intsecond=random.nextInt(30);//随机订单失效时间jedis.zadd(DELAY_QUEUE_NAME,now+second*1000,"order"+i);}jedis.close();}}

消费者:

packagecom.morris.redis.demo.queue.delay;importredis.clients.jedis.Jedis;importredis.clients.jedis.Tuple;importjava.util.Date;importjava.util.List;importjava.util.Set;importjava.util.concurrent.TimeUnit;/***消费者*/publicclassDelayConsumer{publicstaticvoidmain(String[]args)throwsInterruptedException{Jedisjedis=newJedis();while(true){longnow=newDate().getTime();Set<Tuple>tupleSet=jedis.zrangeWithScores(DelayProducer.DELAY_QUEUE_NAME,0,0);if(tupleSet.isEmpty()){TimeUnit.MILLISECONDS.sleep(500);}else{for(Tupletuple:tupleSet){Doublescore=tuple.getScore();longtime=score.longValue();if(time<now){jedis.zrem(DelayProducer.DELAY_QUEUE_NAME,tuple.getElement());System.out.println("order["+tuple.getElement()+"]istimeoutat"+time);}else{TimeUnit.MILLISECONDS.sleep(500);}break;}}}}}

应用场景

延时队列可用于订单超时失效的场景

二级缓存(local+redis)中,当有缓存需要更新时,可以使用发布订阅模式通知其他服务器使得本地缓存失效。

产品猿社区致力收录更多优质的商业产品,给服务商以及软件采购客户提供更多优质的软件产品,帮助开发者变现来实现多方共赢;

日常运营的过程中我们难免会遇到各种版权纠纷等问题,如果您在社区内发现有您的产品未经您授权而被用户提供下载或使用,您可按照我们投诉流程处理,点我投诉

本文来自用户发布投稿,不代表产品猿立场 ;若对此文有疑问或内容有严重错误,可联系平台客服反馈;

部分产品是用户投稿,可能本文没有提供官方下下载地址或教程,若您看到的内容没有下载入口,您可以在我们产品园商城搜索看开发者是否有发布商品;若您是开发者,也诚邀您入驻商城平台发布的产品,地址:点我进入

如若转载,请注明出处:https://www.chanpinyuan.cn/28047.html;
(0)
上一篇 2022年11月29日 下午4:17
下一篇 2022年11月29日

相关推荐

发表回复

登录后才能评论
分享本页
返回顶部