当前位置: 首页 > 科技观察

SpringBoot:Event实现了发布-订阅模型

时间:2023-03-12 02:55:15 科技观察

如图所示,在支付业务中,用户支付成功后,后续的业务流程很多,但是对用户是透明的,所以为了提高界面的响应速度,提高用户体验,后续操作将异步执行。异步执行方式异步执行主体@ServicepublicclassOrderService{publicvoidorderSuccess(){//下单完成后,可以开启异步任务,统一下单打包order=newOrder();order.setOrderNo(String.valueOf(System.currentTimeMillis()));MaporderSuccessServiceMap=SpringContextUtil.getBeansOfType(OrderSuccessService.class);orderSuccessServiceMap.values().forEach(服务->{service.orderSuccess(order);});}}异步执行接口publicinterfaceOrderSuccessService{/***订单支付成功*@paramorder*/publicCompletableFutureorderSuccess(Orderorder);}@Slf4j@ServicepublicclassMerchantNoticeServiceImplimplementsOrderSuccessService{@Override@Async("taskExecutor")publicCompletableFutureorderSuccess(Orderorder){log.info("{}MerchantNotification:{}",Thread.currentThread(),order);//返回异步调用的结果returnCompletableFuture.completedFuture(true);}}@Slf4j@Service公共课MerchantNoticeServiceImpl实现OrderSuccessService{@Override@Async("taskExecutor")publicCompletableFutureorderSuccess(Orderorder){log.info("{}MerchantNotification:{}",Thread.currentThread(),order);//返回异步调用的结果returnCompletableFuture.completedFuture(true);}}@Slf4j@ServicepublicclassMerchantNoticeServiceImplimplementsOrderSuccessService{@Override@Async("taskExecutor")publicCompletableFutureorderSuccess(Orderorder){log.info("{}商家通知:{}",Thread.currentThread(),命令);//返回异步调用的结果returnCompletableFuture.completedFuture(true);}}自定义线程池,线程池隔离,开启异步任务执行@Configuration//配置类@EnableAsync//@Async注解可以生效新的ThreadPoolTask??Executor();//创建线程池时初始化的线程数executor.setCorePoolSize(5);//线程池最大线程数只有在缓冲队列满后才会申请超过核心线程数线程executor.setMaxPoolSize(10);//用于缓冲执行任务的队列executor.setQueueCapacity(200);//当线程超过核心线程时,空闲时间到达后销毁executor.setKeepAliveSeconds(60);//可以用来定位线程池executor.setThreadNamePrefix("taskExecutor-orderSuccess-");//这里采用了CallerRunsPolicy策略。当线程池没有处理能力时,策略会直接在execute方法调用线程中运行被拒绝的任务;//如果执行器关闭,任务将被丢弃executor.setRejectedExecutionHandler(newThreadPoolExecutor.CallerRunsPolicy());//当线程池关闭后,等待所有任务完成,然后继续销毁其他Bean,//这样这些异步任务的销毁会先于数据库连接池对象的销毁,在它之前强制销毁被销毁以确保应用程序可以在最后关闭而不是阻塞。executor.setAwaitTerminationSeconds(60);回归执行人;}}SpringEvent实现发布/订阅模式的自定义事件:通过继承ApplicationEvent,重写构造函数,实现事件扩展。publicclassOrderApplicationEventextendsApplicationEvent{publicOrderApplicationEvent(OrderDataorderData){super(orderData);}}定义事件的消息体@DatapublicclassOrderData{/***ordernumber*/privateStringorderNo;}事件监听器@Slf4j@ServicepublicclassMerchantNoticeListener{@Async("asyncEventTaskExecutor")@EventListenerpublicCompletableFutureorderSuccess(OrderApplicationEventevent){log.info("{}MerchantNotification:{}",Thread.currentThread(),event);//返回异步调用结果returnCompletableFuture.completedFuture(true);}}@Slf4j@ServicepublicclassUserNoticeListenerimplementsApplicationListener{@Override@Async("asyncEventTaskExecutor")publicvoidonApplicationEvent(OrderApplicationEventevent){log.}info(":{}",Thread.currentThread(),event);}}@Slf4j@ServicepublicclassUserNoticeListenerimplementsApplicationListener{@Override@Asasync("asyncEventTaskExecutor")publicvoidonApplicationEvent(OrderApplicationEventevent){log.info("{}UserNotification:{}",Thread.currentThread(),event);}}@Slf4j@ServicepublicclassUserNoticeListenerimplementsApplicationListener{@Override@Async("asyncEventTaskExecutor")publicvoidonApplicationEvent(OrderApplicationEventevent){log.info("{}用户通知:{}",Thread.currentThread(),事件);}}自定义线程池@Configuration@Slf4j@EnableAsync//@Async注解可以生效ThreadPoolTask??Executor()(CoreolSecutor();执行者);executor.setMaxPoolSize(50);executor.setQueueCapacity(30);executor.setRejectedExecutionHandler(newThreadPoolExecutor.CallerRunsPolicy());executor.setThreadNamePrefix("asyncEventTaskExecutor--orderSuccess-");executor.initialize();returnexecutor;}@OverridepublicExecutorgetAsyncExecutor(){returnexecutor();}/***异常处理*@return@OverridepublicAsyncUncaughtExceptionHandlergetAsyncUncaughtExceptionHandler(){return(ex,method,params)->log.error(String.format([async]task{}error:",method),ex);}}事件发布@Service@Slf4jpublicclassOrderEventService{privatefinalApplicationEventPublisherapplicationEventPublisher;publicOrderEventService(ApplicationEventPublisherapplicationEventPublisher){this.applicationEventPublisher=applicationEventPublisher;}publicvoidsuccess(){OrderDataorderData=newOrderData();orderData.setOrderNo(String.valueOf(System.currentTimeMillis()));//消息OrderApplicationEventorderApplicationEvent=newOrderApplicationEvent(orderData);//发布事件applicationEventPublisher.publishEvent(orderApplicationEvent);}}写在最后:不管是否基于springboot发布订阅模型,最终开启线程执行任务,使用第三方MQ消息组件。问题在于重新启动服务器或由于未知原因而崩溃。恢复机制应该自己处理。推荐用在一些边缘服务中,比如日志记录等,对这些要求不是很高。