简介EventBus是一种自动事件处理器,也叫事件总线,基于观察者设计模式,可以自动处理某些任务,比如添加审批日志。提供了发布订阅模型,可以方便的在EventBus上注册订阅者。发布者可以简单地将事件传递给EventBus,EventBus会自动将事件传递给关联的订阅者,只能用于线程间通信。与线程池的区别事件总线可以有多个订阅者,而线程池只会有一个执行逻辑。事件总线的底层是基于线程池的优点。根据记忆,如果断电,事件将丢失。只能使用一个进程。代码太分散,不方便调试。//事件总线的定义publicclassEventBusCenter{privatestaticEventBuseventBus=newEventBus();privateEventBusCenter(){}publicstaticEventBusgetInstance(){returneventBus;}publicstaticvoidregister(Objectobj){eventBus.register(obj);}publicstaticvoidunregister(Objectobj){eventBus.unregister(obj);(Objectobj){eventBus.post(obj);}}//ConsumerpublicclassDataObserver{/***只有用@Subscribe注解的方法才会注册到EventBus*并且该方法有且只能有1个参数*/@Subscribepublicvoidfunc(Stringmsg){System.out.println("字符串消息:"+msg);}}//ProviderpublicclassTest{publicstaticvoidmain(String[]args)throwsInterruptedException{DataObserverobserver=newDataObserver1();EventBusCenter.register(观察者);//只有注册了String类型参数的方法才会被调用EventBusCenter.post("poststringmethod");EventBusCenter.post(123);}}以上代码分别是事件总线、Consumers和Providers的定义,代码逻辑很简单,但是在实际开发中,代码会解耦实际开发中,类图如下entity*/voidasyncPost(Objectevent);/***发布同步事件**@paramevent事件实体*/voidsyncPost(Objectevent);/***添加消费者**@paramobj消费者对象,默认以class为key*/voidaddConsumer(Objectobj);/***移除消费者**@paramobj消费者对象,默认以类为键*/voidremoveConsumer(Objectobj);/***scanconsumer**@parampackageNamescanpackage*/voidscanConsumer(StringpackageName);}以上是顶层事件接口/***GuavaEventBus和Spring之间的桥梁*/公共抽象类AbstractSpringEventBus实现了IEventBus,ApplicationContextAware{私有ApplicationContext上下文;@OverridepublicvoidsetApplicationContext(ApplicationContextapplicationContext)throwsBeansException{this.context=applicationContext;这个.scanConsumer(null);}@OverridepublicvoidscanConsumer(StringpackageName){context.getBeansOfType(IEventConsumer.class).forEach((k,v)->{this.addConsumer(v);});}}上面的代码很有意思,首先抽象类实现了接口,实现ApplicationContextAware接口并不需要实现所有的接口方法。这是Bean初始化的第一步。该Bean还没有完全初始化,即Spring启动时会调用setApplicationContext,其中初始化应用上下文,调用scanConsumer方法。扫描消费者的Bean,注册到事件总线上,实现IEventBus接口中的scanConsumer,为第一步提供服务。当然,抽象类还是太抽象了,实际使用中可能需要分为异步事件和同步事件。而且AbstractSpringEventBus还没有实现addConsumer方法,所以需要在不同的业务中定义不同的实现,如下私有最终事件总线syncEventBus;publicInvoiceEventBus(){//异步事件配置线程池asyncEventBus=newAsyncEventBus(newThreadPoolExecutor(4,Integer.MAX_VALUE,60L,TimeUnit.SECONDS,newSynchronousQueue<>(),newThreadFactoryBuilder().setNameFormat("invoiceAsyncEventBus-thread-%d").build()),newEventBusSubscriberExceptionHandler());//同步事件配置syncEventBus=newEventBus("SyncEventBus");}@OverridepublicvoidasyncPost(Objectevent){asyncEventBus.post(event);}@OverridepublicvoidsyncPost(Objectevent){syncEventBus.post(event);}@OverridepublicvoidaddConsumer(Objectobj){asyncEventBus.register(obj);syncEventBus.register(obj);}@OverridepublicvoidremoveConsumer(Objectobj){asyncEventBus.unregister(obj);syncEventBus.unregister(obj);代码已经完成了所有的消费者注册工作,所以扩展只需要实现IEventConsumer,它就会被注册,监听事件publicinterfaceIEventConsumer
