场景一个线程从某处接收消息(数据),可以是另一台主机也可以是消息队列,然后转到另一个线程池执行处理消息的具体逻辑,而消息的处理速度小于接收消息的速度。这种场景很常见,试想一下,你会如何设计和实现呢?直观的想法显然是利用JUC的线程框架来快速写代码。消息接收者:publicclassReceiver{privatestaticvolatilebooleaninited=false;privatestaticvolatilebooleanshutdown=false;privatestaticvolatileintcnt=0;privateMessageHandlermessageHandler;publicvoidstart(){Executors.newSingleThreadExecutor().execute(newRunnable(){@Overridepublicvoidrun(){while(!shutdown()){init;recv();}}});}/***模拟消息接收*/publicvoidrecv(){Messagemsg=newMessage("Msg"+System.currentTimeMillis());System.out.println(String.format("接收到消息(%d):%s",++cnt,msg));messageHandler.handle(msg);}publicvoidinit(){if(!inited){messageHandler=newMessageHandler();inited=true;}}publicstaticvoidmain(String[]args){newReceiver().start();}}消息处理:publicclassMessageHandler{privatestaticfinalintTHREAD_POOL_SIZE=4;privateExecutorServiceservice=Executors.newFixedThreadPool(THREAD_POOL_SIZE);publicvoidhandle(Messagemsg){try{service.execute(newRunnable(){@Overridepublicvoidrun(){parseMsg(msg);}});}catch(Throwablee){System.out.println("消息处理异常"+e);}}/***消息处理过程耗时*/publicvoidparseMsg(Messagemessage){while(true){try{System.out.println("Parsemessage:"+message);Thread.sleep(5000);System.out.println("=============================");}catch(InterruptedExceptione){e.printStackTrace();}}}}效果:该方案造成的现象是收到的消息会很快堆积起来。我们从消息队列(或者其他地方)取出大量的消息,但是处理线程的速度跟不上,所以造成的问题就是大量的Tasks会堆积在底层维护的一个阻塞队列中线程池,会消耗大量的存储空间,影响系统的性能分析:在执行()任务时,如果有空闲的工作线程,则投入运行,否则见设置的最大线程数,如果线程数没有达到限制,则创建一个新的线程并接收一个新的任务,否则任务会被缓冲到一个阻塞队列中。问题是这个队列。默认大小是无限的,这样只会堆积大量的任务,必然会消耗堆空间。publicstaticExecutorServicenewFixedThreadPool(intnThreads){returnnewThreadPoolExecutor(nThreads,nThreads,0L,TimeUnit.MILLISECONDS,newLinkedBlockingQueue());}publicLinkedBlockingQueue(){this(Integer.MAX_VALUE);//capacity}countinglimit面对上面的问题,想到了要限制接收消息的速度,自然而然会想到各种线程同步原语,不过这里最简单的就是使用Volatile计数器。消息接收者:publicclassReceiver{privatestaticvolatilebooleaninited=false;privatestaticvolatilebooleanshutdown=false;privatestaticvolatileintcnt=0;privateMessageHandlermessageHandler;publicvoidstart(){Executors.newSingleThreadExecutor().execute(newRunnable(){@Overridepublicvoidrun(){while(!shutdown()){init;recv();}}});}/***模拟消息接收*/publicvoidrecv(){Messagemsg=newMessage("Msg"+System.currentTimeMillis());System.out.println(String.format("接收到消息(%d):%s",++cnt,msg));messageHandler.handle(msg);}publicvoidinit(){if(!inited){messageHandler=newMessageHandler();inited=true;}}publicstaticvoidmain(String[]args){newReceiver().start();}}消息处理:publicclassMessageHandler{privatestaticfinalintTHREAD_POOL_SIZE=1;privateExecutorServiceservice=Executors.newFixedThreadPool(THREAD_POOL_SIZE);publicvoidhandle(Messagemsg){try{service.execute(newRunnable(){@Overridepublicvoidrun(){parseMsg(msg);}});}catch(Throwablee){System.out.println("消息处理异常"+e);}}/***消息处理过程耗时*/publicvoidparseMsg(Messagemessage){try{Thread.sleep(10000);System.out.println("Parsemessage:"+message);}catch(InterruptedExceptione){e.printStackTrace();}finally{Receiver.limit--;}}}作用:通过控制消息数量来阻塞消息的接收进程,不会导致任务堆积,系统的内存消耗会减少比较平坦,消息数量的限制和下面任务队列的大小限制基本一致。使用同步队列SynchronousQueueSynchronousQueue称为队列,但它不缓冲任务对象。它只是对象传输的控制点。如果有空闲线程或者没有达到最大线程限制,就会交给工作线程执行,否则会被拒绝。我们需要自己实现相应的拒绝策略RejectedExecutionHandler。默认是抛出异常RejectedExecutionException。消息的接收者同上。消息处理:publicclassMessageHandler{privatestaticfinalintTHREAD_POOL_SIZE=4;ThreadPoolExecutorservice=newThreadPoolExecutor(THREAD_POOL_SIZE,THREAD_POOL_SIZE,0L,TimeUnit.MILLISECONDS,newSynchronousQueue(),newRejectedExecutionHandler(){@OverridepublicvoidrejectedExecution(Runnabler,ThreadPoolExecutorexecutor){System.out.println("自定义拒绝策略");try{executor.getQueue().put(r);System.out.println("重新将任务放回队列");}catch(InterruptedExceptione){e.printStackTrace();}}});publicvoidhandle(Messagemsg){try{System.out.println(service.getTaskCount());System.out.println(service.getQueue().size());System.out.println(service.getCompletedTaskCount());service.execute(newRunnable(){@Overridepublicvoidrun(){parseMsg(msg);}});}catch(Throwablee){System.out.println("消息处理异常"+e);}}/***消息处理过程耗时*/publicvoidparseMsg(Messagemessage){while(true){try{System.out.println("线程名称:"+Thread.currentThread().getName());System.out.println("解析消息:"+message);Thread.sleep(1000);}catch(InterruptedExceptione){e.printStackTrace();}}}}作用:我们可以控制接收消息的速度,但是需要实现某种blockinginrejectedExecution但是,如果在rejection发生时选择将task放回队列,问题是Task会饿死,使用一个大小受限的阻塞队列,使用LinkedBlockingQueue作为线程框架的底层任务缓冲区,并设置一个sizelimit,思路和上面的方案一样,有一个阻塞点,但是通过***jvmmonitor,可以看到这里的CPU消耗少了,内存占用减少了,并且波动较小(具体原因有待探讨)消息接收同上。消息处理:publicclassMessageHandler{privatestaticfinalintTHREAD_POOL_SIZE=4;privatestaticfinalintBLOCK_QUEUE_CAP=500;ThreadPoolExecutorservice=newThreadPoolExecutor(THREAD_POOL_SIZE,THREAD_POOL_SIZE,0L,TimeUnit.MILLISECONDS,newLinkedBlockingQueue(BLOCK_QUEUE_CAP),newSimpleThreadFactory(),newRejectedExecutionHandler(){@OverridepublicvoidrejectedExecution(Runnabler,ThreadPoolExecutorexecutor){System.out.println("自定义拒绝策略");try{executor.getQueue().put(r);System.out.println("重新将任务放回队列");}catch(InterruptedExceptione){e.printStackTrace();}}});publicvoidhandle(Messagemsg){try{service.execute(newRunnable(){@Overridepublicvoidrun(){parseMsg(msg);}});}catch(Throwablee){System.out.println("消息处理异常"+e);}}/***消息处理过程耗时*/publicvoidparseMsg(Messagemessage){try{Thread.sleep(5000);System.out.println("线程名称:"+Thread.currentThread().getName());System.out.println("解析消息:"+message);}catch(InterruptedExceptione){e.printStackTrace();}}staticclassSimpleThreadFactoryimplementsThreadFactory{@OverridepublicThreadnewThread(Runnabler){Threadthread=newThread(r);thread.setName("Thread-"+System.currentTimeMillis());returnthread;}??}}容易出错,特别是当你不熟悉该方法时