大家好,我是Tom。最近有很多朋友给我留言。你能总结一下异步编程吗?今天就和大家简单聊聊这个话题。早期的系统都是同步的,容易理解,我们来看一个例子。同步编程当用户创建一个电商交易订单时,要经历的业务逻辑流程还是很长的,每一步都需要一定的时间,所以整体RT会比较长。于是,聪明人开始思考,能不能把一些非核心业务从主流程中分离出来,于是就有了异步编程的雏形。异步编程是一种允许程序并发运行的方法。它允许多个事件同时发生。当程序调用长时间运行的方法时,不会阻塞当前的执行流程,程序可以继续运行。核心思想:利用多线程优化性能,将串行操作变为并行操作。采用异步模式设计的程序可以显着减少线程等待,从而大大提高系统的整体性能,显着降低高吞吐场景下的延迟。接下来说一下异步的编程实现方法。1、ThreadThread直接继承Thread类是创建异步线程最简单的方法。首先,创建Thread子类,普通类或者匿名内部类;然后创建子类的实例;最后通过start()方法启动线程。publicclassAsyncThreadextendsThread{@Overridepublicvoidrun(){System.out.println("当前线程名:"+this.getName()+",执行线程名:"+Thread.currentThread().getName()+“-你好”);}}publicstaticvoidmain(String[]args){//模拟业务流程//......//创建一个异步线程AsyncThreadasyncThread=newAsyncThread();//启动异步线程asyncThread.start();}当然如果每次都创建一个Thread线程,频繁的创建和销毁会浪费系统资源。我们可以使用线程池。@Bean(name="executorService")publicExecutorServicedownloadExecutorService(){returnnewThreadPoolExecutor(20,40,60,TimeUnit.SECONDS,newArrayBlockingQueue<>(2000),newThreadFactoryBuilder().setNameFormat("defaultExecutorService-%d").build(),(r,executor)->log.error("defaultExecutorpoolisfull!"));}将业务逻辑封装成Runnable或Callable,交给线程池执行。2、上述Future方法虽然实现了多线程并行处理,但是有些业务不仅需要执行流程,还需要获取执行结果。从1.5版本开始,Java提供了Callable和Future,可以在任务执行完成后获取任务执行结果。当然还提供了其他功能,比如:取消任务,检查任务是否完成等。Future类位于java.util.concurrent包下,接口定义:publicinterfaceFuture{booleancancel(booleanmayInterruptIfRunning);布尔isCancelled();布尔isDone();Vget()抛出InterruptedException、ExecutionException;Vget(longtimeout,TimeUnitunit)throwsInterruptedException,ExecutionException,TimeoutException;}方法说明:cancel():取消任务,如果任务取消成功,返回true,如果任务取消失败,返回false。isCancelled():表示任务是否取消成功,如果在任务正常完成前取消成功则返回true。isDone():表示任务是否已经完成,如果完成则返回true。get():获取执行结果。该方法会阻塞等待,直到任务执行完毕才返回。get(longtimeout,TimeUnitunit):用于获取执行结果。如果在指定时间内没有得到结果,则直接返回null。代码示例:publicclassCallableAndFuture{publicstaticExecutorServiceexecutorService=newThreadPoolExecutor(4,40,0L,TimeUnit.MILLISECONDS,newLinkedBlockingQueue(1024),newThreadFactoryBuilder().setNameFormat("demo-pool-%d").build(),newThreadPoolExecutor.AbortPolicy());staticclassMyCallableimplementsCallable{@OverridepublicStringcall()throwsException{return"异步处理,Callable返回结果";}}publicstaticvoidmain(String[]args){Futurefuture=executorService.submit(newMyCallable());尝试{System.out.println(future.get());}catch(Exceptione){//nodo}finally{executorService.shutdown();}}}Future表示可能尚未完成的异步任务的结果。通过get方法获取执行结果,会阻塞直到任务返回结果。3、FutureTaskFutureTask实现了RunnableFuture接口,RunnableFuture接口继承了Runnable接口和Future接口,所以FutureTask对象可以作为任务提交给ThreadPoolExecutor执行,也可以直接由Thread执行;并且因为实现了Future接口,所以也可以用来获取任务的执行结果。FutureTask构造函数:publicFutureTask(Callablecallable)publicFutureTask(Runnablerunnable,Vresult)FutureTask常用于封装Callable和Runnable,可以作为任务提交到线程池中执行。除了作为一个独立的类之外,它还提供了一些功能性的功能供我们创建自定义任务类。FutureTask线程安全由CAS保证。ExecutorServiceexecutor=Executors.newCachedThreadPool();//FutureTask包装callbale任务,然后交给线程池执行FutureTaskfutureTask=newFutureTask<>(()->{System.out.println("子线程开始计算:");Integersum=0;for(inti=1;i<=100;i++)sum+=i;returnsum;});//线程池执行任务,运行结果在futureTask对象中();}executor.shutdown();Callable与Future的区别:Callable是用来产生结果的,Future是用来获取结果的。如果多个任务的多个自由串行或并行组合涉及到多个线程间的同步阻塞获取结果,Future代码实现会比较繁琐,需要我们手动处理每个交点,容易出错。4.异步框架CompletableFutureFuture类使用get()方法阻塞等待获取异步执行的运行结果,性能比较差。在JDK1.8中,Java提供了CompletableFuture类,它是基于异步函数式编程的。相对于阻塞等待结果,CompletableFuture可以通过回调的方式处理计算结果,实现了异步非阻塞,性能更好。优点:当异步任务结束时,会自动回调一个对象的方法。当一个异步任务失败时,它会自动回调一个对象的方法。主线程设置回调后,就不再关心异步任务的执行了。泡茶实例:(内容摘自:极客时间的《Java 并发编程实战》)。//任务1:洗水壶->烧水CompletableFuturef1=CompletableFuture.runAsync(()->{System.out.println("T1:Washthekettle...");sleep(1,TimeUnit.SECONDS);System.out.println("T1:Boilwater...");sleep(15,TimeUnit.SECONDS);});//Task2:洗茶壶->洗杯子->拿teaCompletableFuturef2=CompletableFuture.supplyAsync(()->{System.out.println("T2:洗茶壶...");sleep(1,TimeUnit.SECONDS);System.out.println("T2:洗茶杯...");sleep(2,TimeUnit.SECONDS);System.out.println("T2:喝茶...");sleep(1,TimeUnit.SECONDS);return"龙井";});//任务3:任务1和任务2完成后执行:maketeaCompletableFuturef3=f1.thenCombine(f2,(__,tf)->{System.out.println("T1:gettea:"+tf);System.out.println("T1:Maketea...");return"Servetea:"+tf;});//等待任务3执行结果System.out.println(f3.join());}CompletableFuture提及提供了非常丰富的API,大约有50种方法来处理串行、并行、组合和错误处理五、SpringBoot注解@Async除了硬编码的异步编程方法,SpringBoot框架还提供注解解决方案,使用方法体是边界,方法体内部的代码逻辑都是异步执行的。首先,使用@EnableAsync注释启用异步。@SpringBootApplication@EnableAsync公共类StartApplication{publicstaticvoidmain(String[]args){SpringApplication.run(StartApplication.class,args);}}自定义线程池:@Configuration@Slf4jpublicclassThreadPoolConfiguration{@Bean(name="defaultThreadPoolExecutor",destroyMethod="shutdown")publicThreadPoolExecutorsystemCheckPoolExecutorService(){returnnewThreadPoolExecutor(3,10,60,TimeUnit.SECONDS,newLinkedBlockingQueue(10000),newThreadFactoryBuilder().setNameFormat("default-executor-%d").build(),(r,executor)->log.error("系统池已满!"));}}在异步处理方法上添加注解@Async,在调用execute方法时,通过自定义线程池defaultThreadPoolExecutor异步执行execute方法。@ServicepublicclassAsyncServiceImplimplementsAsyncService{@Async("defaultThreadPoolExecutor")publicBooleanexecute(Integernum){System.out.println("Thread:"+Thread.currentThread().getName()+",task:"+num);返回真;}}用@Async注解标记的方法称为异步方法。在springboot应用中使用@Async非常简单:调用异步方法类或者启动注解@EnableAsync的类。需要异步调用的方法加上@Async。使用的@Async注解方法的类对象应该是Spring容器管理的bean对象。6.SpringApplicationEvent事件机制在一些大型项目中经常使用。Spring专门提供了一套事件机制接口来满足架构原理的解耦。ApplicationContext通过ApplicationEvent类和ApplicationListener接口处理事件。如果将实现ApplicationListener接口的bean注入到上下文中,则每次使用ApplicationContext发布ApplicationEvent时都会通知该bean。本质上,这是标准的观察者设计模式。ApplicationEvent是Spring提供的所有Event类的基类。首先自定义业务事件子类,继承自ApplicationEvent,通过泛型注入业务模型参数类。相当于MQ的消息体。publicclassOrderEventextendsAbstractGenericEvent{publicOrderEvent(OrderModelsource){super(source);}}然后,编写事件监听器。ApplicationListener接口是Spring提供的事件订阅者必须实现的接口。我们需要定义一个子类来继承ApplicationListener。相当于MQ的消费端。@ComponentpublicclassOrderEventListenerimplementsApplicationListener{@OverridepublicvoidonApplicationEvent(OrderEventevent){System.out.println("[OrderEventListener]监听器处理!"+JSON.toJSONString(event.getSource()));}}最后,发布一个事件,告诉所有与事件相关的监听器。相当于MQ的生产端。OrderModelorderModel=newOrderModel();orderModel.setOrderId((long)i);orderModel.setBuyerName("Tom-"+i);orderModel.setSellerName("judy-"+i);orderModel.setAmount(100L);//发布Spring事件通知SpringUtils.getApplicationContext().publishEvent(newOrderEvent(orderModel));加餐:【消费】线程:http-nio-8090-exec-1,消费事件{"amount":100.0,"buyerName":"Tom-1","orderId":1,"sellerName":"judy-1"}[生产者]线程:http-nio-8090-exec-1,发布事件1[消费者]线程:http-nio-8090-exec-1,消费事件{"amount":100.0,"buyerName":"Tom-2","orderId":2,"sellerName":"judy-2"}[生产端]线程:http-nio-8090-exec-1,发布事件2[消费者]线程:http-nio-8090-exec-1,消费事件{"amount":100.0,"buyerName":"Tom-3","orderId":3,"sellerName":"judy-3"}[生产结束]线程:http-nio-8090-exec-1,上面的releaseevent3是运行一个demo的结果,我们发现无论是生产端还是消费端,都使用了同一个线程http-nio-8090-exec-1,并且Spring框架的事件机制默认是同步阻塞的。只是在代码规范上进行了解耦,具有更好的可扩展性,但底层仍然采用同步调用的方式。那么问题来了,如果要实现异步调用,怎么处理呢?我们需要手动创建一个SimpleApplicationEventMulticaster,并设置TaskExecutor。此时所有的消费事件都是异步线程执行的。@ComponentpublicclassSpringConfiguration{@BeanpublicSimpleApplicationEventMulticasterapplicationEventMulticaster(@Qualifier("defaultThreadPoolExecutor")ThreadPoolExecutordefaultThreadPoolExecutor){SimpleApplicationEventMulticastersimpleApplicationEventMulticaster=newSimpleApplicationEventMulticaster();simpleApplicationEventMulticaster.setTaskExecutor(defaultThreadPoolExecutor);返回simpleApplicationEventMulticaster;}}我们看下面修改后的运行结果:[productionend]thread:http-nio-8090-exec-1,releaseevent1[productionend]thread:http-nio-8090-exec-1,releaseevent2[生产端]线程:http-nio-8090-exec-1,发布事件3[消费者]线程:default-executor-1,消费事件{"amount":100.0,"buyerName":"Tom-2","orderId":2,"sellerName":"judy-2"}[消费者]线程:default-executor-2,消费事件{"amount":100.0,"buyerName":"Tom-1","orderId":1,"sellerName":"judy-1"}[consumer]线程:default-executor-0,消费事件{"amount":100.0,"buyerName":"Tom-3","orderId":3,"sellerName":"judy-3"}SimpleApplicationEventMulticaster这个Bean是怎么被我们自己实例化的以及系统默认的加载顺序呢?会吗冲突?查看Spring源码,处理逻辑在AbstractApplicationContext#initApplicationEventMulticaster方法中,通过beanFactory查找是否有自定义bean,如果没有,容器会自己注入一个新的SimpleApplicationEventMulticaster对象到容器中代码地址:https://github.com/aalansehaiyang/wx-project.7、消息队列异步架构是互联网系统中典型的架构模式,对应于同步架构。消息队列天生就是这种异步架构,具有超高吞吐量和超低延迟。消息队列异步架构的主要角色包括消息生产者、消息队列和消息消费者。消息生产者是主要的应用程序,生产者将调用请求封装成消息发送到消息队列中。消息队列的职责是缓存消息,等待消费者消费。按消费方式分为点对点方式和发布-订阅方式两种。消息消费者用于从消息队列中拉取消息并消费,完成业务逻辑处理。当然,市面上的消息队列框架有很多,比如RabbitMQ、Kafka、RocketMQ、ActiveMQ、Pulsar等。不同消息队列的功能特性会略有不同,但整体结构大同小异,这里不再展开。我们只需要记住一个关键点。借助消息队列中间件,可以高效地实现异步编程。