队列是一种数据结构中的线性表,从一端插入数据,然后从另一端删除数据。本文的目的不是解释各种队列算法,而是描述在应用层使用队列可以解决哪些场景。在我开发的系统中,不是所有的业务都必须实时处理,不是所有的请求都必须实时反馈给用户,不是所有的请求/处理都必须100%成功处理,不知道靠谁对“我”的处理结果,不关心其他系统如何处理后续业务,不需要强一致性,只需要保证最终一致性,要保证数据处理的顺序;这时候你应该考虑使用队列来解决这些问题。在实际开发中,我们经常使用队列来进行异步处理、系统解耦、数据同步、流量调峰、缓冲、限流等应用场景中的异步处理:使用队列的主要原因之一就是为了进行异步处理,比如如用户注册成功后发送注册成功邮件/新用户积分/优惠券等,缓存过期返回旧数据,然后异步更新缓存。异步写入日志等;通过异步处理,可以提高主流程的响应速度,非主流程/非重要业务可以异步集中处理,这样任务也可以进行聚合和批量处理;因此,消息队列/任务队列可用于异步处理。系统解耦:比如用户成功支付完成订单后,需要通知生产配送系统、发票系统、库存系统、推荐系统、搜索系统、风控系统等进行业务处理;不清楚未来需要增加/支持哪些业务,而且这些业务流程不需要实时处理或强一致性,只需要最终一致性,所以可以通过消息队列/任务队列来实现系统解耦。数据同步:比如想把Mysql的变化数据同步到Redis,或者同步Mysql的数据到Mongodb,或者机房数据同步,或者主从数据同步等,可以考虑使用databus,运河,此时还有水獭。使用数据总线队列进行数据同步的好处是可以保证数据修改的顺序。流量调峰:系统瓶颈一般在数据库上,比如扣库存、下单等;这时候可以考虑使用队列,将变更请求暂存到队列中,通过缓存+队列暂存的方式切数据库流量高峰;在秒杀系统中,订单服务会成为系统的瓶颈。这个时候会使用queue进行排队限流,保护订单服务。通过队列暂存或队列限流进行调峰。比如减少库存,可以考虑这样的设计:直接在Redis中reduce,然后记录减少的日志(FIFO队列),通过Worker同步到DB。实际队列的应用场景还是很多的。本文列举了笔者遇到过的很多场景。缓冲区队列通常是Log4j的日志缓冲区。我们在使用log4j记录日志的时候,可以配置一个bytebuffer。当字节缓冲区满时,会立即同步到磁盘(flush操作)。Log4j是使用BufferedWriter实现的;这种模式不是异步写入,当缓冲区满时主线程仍然会阻塞。如果需要异步模式,可以使用AsyncAppender,然后使用bufferSize来控制日志事件缓冲区的大小。通过缓冲队列可以实现:批处理、异步处理。任务队列使用任务队列将一些不需要与主线程同步执行的任务丢到任务队列中进行异步处理;作者使用最多的是线程池任务队列(默认LinkedBlockingQueue)和Disruptor任务队列(RingBuffer)。比如刷数据的时候,直接把任务丢进队列异步处理,处理成功后再异步通知用户;比如删除SKU操作,当用户请求时直接分解任务丢入队列,异步处理,处理成功后异步通知用户即可;还有查询聚合,将多个可以并行处理的任务丢到队列中,等待最慢的任务返回。如果您使用的是内存任务队列,请记住,系统重启等问题可能导致数据丢失。通过任务队列,可以实现:异步处理、任务分解/聚合处理。注:JDK7提供了ExecutorServiceForkJoinPool的新实现,提供了Work-stealing机制,更好的提高并发效率。使用Executors.newFixedThreadPool时,它不设置队列大小(默认Integer.MAX_VALUE)。如果大量任务缓存在LinkedBlockingQueue中等待线程执行,会出现GC慢等问题,导致系统响应变慢甚至OOM。因此,在使用线程池时,指定队列大小,并设置合理的RejectedExecutionHandler;记录请求源的参数,方便定位问题根源。MessageQueue笔者所在公司使用的是自研的JMQ;开源的包括ActiveMQ、Kafka和Redis。使用消息队列存储业务数据,其他系统可以根据需要进行订阅。常见的模式有:点对点(一条消息只有一个消费者)、发布订阅(一条消息可以有多个消费者);最常用的是发布-订阅模式。比如用户注册成功,修改商品数据,修改订单状态等,所有的修改都应该发送到消息队列中,以便其他系统可以根据需要订阅消息,然后根据自己的业务逻辑进行开发自己的需要。在增加新功能时,消息消费者只需要订阅消息,然后开发相应的业务逻辑即可。消息生产者不关心你如何使用消息以及你做了什么业务处理。同步调用,任何新功能都需要用户系统请求。如果其中一项服务失败,则整个服务将变得不可用。消息队列,用户系统只需要发布用户注册成功的消息,相关系统订阅消息,然后执行相关的业务逻辑。相关服务出现问题不会影响注册的主要流程。通过消息队列,可以实现:异步处理和系统解耦。请求队列请求队列是指在web环境中对用户请求进行排队,从而进行一些特殊的控制:流量控制、请求分类、请求隔离;比如根据功能将请求分到不同的队列中,这样不同队列出现问题后不会互相影响;请求也可以分级,优先处理一些重要的请求(功能应该在物理上进行一定程度的分离);服务器处理能力有限,在接近服务器瓶颈时需要考虑限流。最简单的限流当丢弃未处理的请求时,可以使用队列进行流控。数据总线队列一般消息队列中的消息都是业务维度的,比如业务key或者业务状态,比如哪个SKU发生了变化,有的订阅者需要再次查询获取最新修改的数据(比如缓存同步);现有的消息队列方式的缺点是难以只推送修改的部分并保证数据的顺序。这种场景更适合使用数据总线队列来实现。如果数据库数据修改后需要同步到缓存,或者一个机房的数据需要同步到另一个机房,那只是数据维度的同步。这时候就要用到canal、otter、databus等数据总线队列;使用数据总线队列的好处是可以保证数据的顺序。混合队列在《构建需求响应式亿级商品详情页》中介绍过这种方法的队列,使用混合队列解决实际问题。这里MQ使用京东自研的JMQ,消息可靠持久化存储;应用会根据不同的维度向JMQ发布消息;下游应用收到消息后,会放入Redis中,使用RedisList来存储这些任务;应用消费处理Redis消息后,根据不同维度聚合产品消息,再次发送出去。使用Redis队列的主要原因是为了提高消息的堆积能力和并发处理能力。另外,在使用Redis构建消息队列时,需要考虑网络抖动导致消息丢失的问题,因为Redis没有回滚事务,也没有确认机制。我们使用如下方法来防止消息丢失:try{id=queueRedis.opsForList().rightPopAndLeftPush(queueName,processingQueueName);}catch(Exceptione){//发生网络异常,需要返回处理中的id到等待队列中Stringmsg=queueName+"to"+processingQueueName+"rpoplpusherror";LOG.error(msg,e);//告警码}失败我们会重试3次,重试失败后放入失败队列,并且失败队列是具有反重功能的(来自本地队列和失败队列),使用RedisLua脚本实现:staticEventQueueScriptADD_TO_FAIL_QUEUE_REDIS_SCRIPT=newEventQueueScript("redis.call('lrem',KEYS[1],1,ARGV[1])redis.call('lrem',KEYS[2],1,ARGV[1])returnredis.call('lpush',KEYS[2],ARGV[1])");Redis作者AntirezDisque开发的内存分布式消息队列是未来内存消息队列更好的选择。其他优先级队列:在实际开发中,有些任务肯定是紧急的,此时应该优先处理紧急任务;所以请考虑对队列进行分类。复制队列:在进行一些系统改造或者增加新功能时,如果没有足够的信心保证业务逻辑是正确的,可以考虑存储一个队列的副本(比如1小时,1天),这样,当出现业务问题时,可以回放这些消息。镜像队列:每个队列不限制订阅个数,必须有一个限制;当达到限制时,请考虑使用镜像队列来解决这个问题。队列并发数:不同的队列实现,队列服务器上的并发连接数不同;一定不能增加队列的并发连接数,消费能力会增加;也不会因为增加而增加消费服务器的并发消费能力。需要根据实际情况设置合理的并发连接数。推送或拉取:消息体内容不是越完整越好,需要根据具体业务设计消息体;例如,有的系统依赖于产品变化消息(只有一个SKU),有的系统依赖于产品状态消息(SKU,status),有的系统依赖于产品属性变化消息(SKU,changedattributes)等,如果所有系统消费产品变更消息,然后这些系统会调用产品查询服务拉取最新的产品信息,然后进行处理。因此,需要根据实际情况决定是采用push方式(推送系统需要的所有信息)还是pull方式(只推送ID,然后再校验)。消息合并:如果消息写入量很大,可以考虑合并写入消息。可以“写入本地磁盘队列”-->“同步本地磁盘队列到消息中间件”;同步时,可以根据需要制定同步策略,比如每秒同步1次。
