4.整体流程从上面的部分可以看出,Source和Sink都依赖于Channel,所以应该先启动Channel,启动时再启动Source或Sink。.Flume有两种启动方式:使用EmbeddedAgent嵌入到Java应用程序中或者使用Application单独启动一个进程。这里我们着重于应用分析。首先进入org.apache.flume.node.Application的main方法启动:Java代码//1、设置默认值启动参数,参数是否必须Optionsoptions=newOptions();Optionoption=newOption("n",“name”,true,“thenameofthisagent”);option.setRequired(true);options.addOption(option);option=newOption(“f”,“conf-file”,true,“specifyaconfigfile(requiredif-zmissing)”);option.setRequired(false);options.addOption(option);//2、然后解析命令行参数CommandLineParserparser=newGnuParser();CommandLinecommandLine=parser.parse(options,args);StringagentName=commandLine.getOptionValue('n');booleanreload=!commandLine.hasOption("no-reload-conf");if(commandLine.hasOption('z')||commandLine.hasOption("zkConnString")){isZkConfigured=true;}if(isZkConfigured){//3、如果是通过ZooKeeper配置的,使用ZooKeeper参数启动,这里忽略,我们用配置文件说明}else{/4,打开配置文件,如果不存在,则会很快失败figurationfiledoesnotexist:"+path);}Listcomponents=Lists.newArrayList();if(reload){//5.如果需要定期重新加载配置文件,请按以下方式进行//5.1.使用提供的那个byGuavahereEventBusEventBusEventBus=newEventBus(agentName+"-event-bus");//5.2、读取配置文件,使用定时轮训策略,拉取PollingPropertiesFileConfigurationProviderconfigurationProvider=newPollingPropertiesFileConfigurationProviderconfigurationProvider=newPollingPropertiesFileConfigurationProvider(agentName,configurationFile,eventBus),30);components.add(configurationProvider);application=newApplication(components);//5.3,向Application注册组件//5.4,向事件总线注册这个应用,EventBus会自动注册eventBus.register(application)声明的方法inApplicationusing@Subscribe;}else{//5.配置文件不支持正则reloadPropertiesFileConfigurationProviderconfigurationProvider=newPropertiesFileConfigurationProvider(agentName,configurationFile);application=newApplication();//6.2.直接使用配置文件初始化Flume组件application.handleConfigurationEvent(configurationProvider.getConfiguration());}}//7。启动Flume应用application.start();//8.注册虚拟机关闭挂钩。当虚拟机关闭时,调用Application的stop方法终止finalApplicationappReference=application;Runtime.getRuntime().addShutdownHook(newThread("agent-shutdown-hook"){@Overridepublicvoidrun(){appReference.stop();}});上述过程只摘录了部分核心代码,如ZK的实现直接忽略,Flume启动的大致流程如下:1.读取命令行参数;2.读取配置文件;3、根据是否需要reload,使用不同的策略初始化Flume;如果需要reload,使用Guava的Eventbus实现,Application的handleConfigurationEvent是一个事件订阅者,PollingPropertiesFileConfigurationProvider是一个事件发布者,它会定时检查文件是否有变化,如果有变化,会重新读取配置文件,发布配置文件事件变化,handleConfigurationEvent会收到配置变化重新初始化;4.启动Application,注册虚拟机关闭钩子。handleConfigurationEvent方法比较简单。首先调用stopAllComponents停止所有组件,然后调用startAllComponents使用配置文件初始化所有组件:startAllComponents(conf);}MaterializedConfiguration存储了Flume在运行时需要的组件:Source、Channel、Sink、SourceRunner、SinkRunner等,通过ConfigurationProvider初始化获取。例如,PollingPropertiesFileConfigurationProvider会读取配置文件,然后初始化组件。startAllComponents的实现大致如下:Java代码//1.首先启动Channelsupervisor.supervise(Channels,newSupervisorPolicy.AlwaysRestartPolicy(),LifecycleState.START);//2.确保所有通道都已启动(Channelch:materializedConfiguration.getChannels().values()){while(ch.getLifecycleState()!=LifecycleState.START&&!supervisor.isComponentInErrorState(ch)){try{Thread.sleep(500);}catch(InterruptedExceptione){Throwables.propagate(e);}}}//3。启动SinkRunnersupervisor.supervise(SinkRunners,newSupervisorPolicy.AlwaysRestartPolicy(),LifecycleState.START);//4.StartSourceRunnersupervisor.supervise(SourceRunner,newSupervisorPolicy.AlwaysRestartPolicy(),LifecycleState.START);//5、初始化监控服务this.loadMonitoring();从下面的代码可以看出,首先要准备好Channel,因为Source和Sink都会对它进行操作。如果Channel的初始化失败,则整个流程失败;然后启动SinkRunner并准备第一个Goodconsumer;然后启动SourceRunner开始收集日志。这里我们发现有两个独立的组件LifecycleSupervisor和MonitorService,一个是组件guardsentinel,一个是监控服务。守护哨兵守护着这些组件。如果出现问题,默认策略是自动重启这些组件。stopAllComponents的实现大致如下:Java代码//1、先停止SourceRunnersupervisor.unsupervise(SourceRunners);//2、再停止SinkRunnersupervisor.unsupervise(SinkRunners);//3、再停止Channelsupervisor.unsupervise(Channels);//4、***停止MonitorServicemonitorServer.stop();这里可以看出停止的顺序是Source、Sink、Channel,即先停止生产,再停止消费,最后停止管道。Application中start方法的代码实现如下:Java代码publicsynchronizedvoidstart(){for(LifecycleAwarecomponent:components){supervisor.supervise(component,newSupervisorPolicy.AlwaysRestartPolicy(),LifecycleState.START);}}循环注册的组件通过Application,然后由守卫哨兵守卫它。默认策略是在出现问题时自动重启组件。假设我们支持reload配置文件,那么之前Application启动的时候注册了PollingPropertiesFileConfigurationProvider组件,也就是该组件会被guardsentinel看守。如果出现问题,默认策略会自动重启。应用程序关闭执行以下操作:Java代码publicsynchronizedvoidstop(){supervisor.stop();if(monitorServer!=null){monitorServer.stop();}}关闭监护人哨兵和监控服务。至此,基本的Application分析就结束了,关于守护哨兵是如何实现的,我们还有很多疑问。总体流程可以概括如下:1.首先初始化命令行配置;2.然后读取配置文件;3.根据是否需要reload初始化配置文件中的组件;如果需要重新加载,Guava事件总线将用于发布和订阅更改;4.然后创建Application,创建guardiansentinel,先停止所有组件,再启动所有组件;启动序列:Channel、SinkRunner、SourceRunner,并将这些组件注册到guardiansentinel,初始化监控服务;停止序列:SourceRunner、SinkRunner、Channel;5、如果配置文件需要定时重新加载,需要将Polling***ConfigurationProvider注册到守护哨兵;6.***注册虚拟机关闭钩子,停止守护哨兵和监控服务。round-robin训练实现的SourceRunner和SinkRunner会创建一个线程来工作,其工作方式之前已经介绍过。接下来我们看守护者sentinel的实现。首先创建LifecycleSupervisor:Java代码//1、用于存放组件supervisedProcesses=newHashMap();//2、用于存放被监控的组件monitorFutures=newHashMap>();//3。创建监控服务线程池monitorService=newScheduledThreadPoolExecutor(10,newThreadFactoryBuilder().setNameFormat("lifecycleSupervisor-"+Thread.currentThread().getId()+"-%d").build());monitorService.setMaximumPoolSize(20);monitorService.setKeepAliveTime(30,TimeUnit.SECONDS);//4.定期清理取消的组件。purger=newPurger();//4.1.默认不清理。needToPurge=假;会执行如下操作:Java代码publicsynchronizedvoidstart(){monitorService.scheduleWithFixedDelay(purger,2,2,TimeUnit.HOURS);lifecycleState=LifecycleState.START;}先每两小时执行一次cleanup组件,然后将状态改为开始。LifecycleSupervisor停止时,直接停止监控服务,然后更新守护组件状态为STOP:Java代码//1、先停止守护监控服务if(monitorService!=null){monitorService.shutdown();try{monitorService.awaitTermination(10,TimeUnit.SECONDS);}catch(InterruptedExceptione){logger.error("Interruptedwhilewaitingformonitorservicetostop");}}//2.更新所有守护组件的状态为STOP,调用组件的stop方法停止for(finalEntryentry:supervisedProcesses.entrySet()){if(entry.getKey().getLifecycleState().equals(LifecycleState.START)){entry.getValue().status.desiredState=LifecycleState.STOP;entry.getKey()。停止();}}//3。更新此组件的状态if(lifecycleState.equals(LifecycleState.START)){lifecycleState=LifecycleState.STOP;}//4.SupervisedProcesses.clear();监控期货。清除();接下来就是调用superviser来保护组件了:Java代码if(this.monitorService.isShutdown()||this.monitorService.isTerminated()||this.monitorService.isTerminating()){//1,如果如果sentinel已经停止,会抛出异常,不会接收到组件进行监护}//2.初始化监护人组件Supervisoreeprocess=newSupervisoree();process.status=newStatus();//2.1、默认策略是重启process.policy=policy;//2.2。初始化组件的默认状态。大多数组件默认为STARTprocess.status.desiredState=desiredState;process.status.error=false;//3。组件monitor,用于定时获取组件的最新状态,或者重启组件;//4。定时执行组件监听获取组件的最新状态,或者重启组件。ScheduledFuture>future=monitorService.scheduleWithFixedDelay(monitorRunnable,0,3,TimeUnit.SECONDS);monitorFutures.put(lifecycleAware,future);}如果不需要守卫,需要调用unsupervise:Java代码publicsynchronizedvoidunsupervise(LifecycleAwarelifecycleAware)){synchronized(lifecycleAware){Supervisoreesupervisoree=supervisedProcesses.get(lifecycleAware);//1.1、设置被守护组件的状态被丢弃supervisoree.status.discard=true;//1.2、设置想要的最大生命周期状态组件停止this.setDesiredState(lifecycleAware,LifecycleState.STOP);//1.3、停止组件lifecycleAware.stop();}//2、从守护者中移除supervisedPro组件cesses.remove(lifecycleAware);//3.取消定时监控组件服务monitorFutures.get(lifecycleAware).cancel(false);//3.1.通知Purger需要清理,Purger会周期性的移除被取消的组件needToPurge=true;monitorFutures.remove(lifecycleAware);}接下来我们看一下MonitorRunnable的实现,它负责组件状态迁移或者组件失效recovery:Java代码publicsynchronizedvoidunsupervise(LifecycleAwarelifecycleAware){synchronized(lifecycleAware){Supervisoreesupervisoree=supervisedProcesses.get(lifecycleAware);//1.1、设置守护组件状态为废弃supervisoree.status.discard=true;//1.2、设置组件期望的最大生命周期状态STOPthis.setDesiredState(lifecycleAware,LifecycleState.STOP);//1.3、停止组件lifecycleAware.stop();}//2、移除supervisedProcesses.remove(lifecycleAware);//3、取消定时监控组件服务monitorFutures.get(lifecycleAware).cancel(false);//3.1.通知Purger它需要清理。Purger会周期性的移除取消组件needToPurge=true;monitorFutures.remove(lifecycleAware);}接下来我们看一下MonitorRunnable的实现,它负责组件状态迁移或者组件故障恢复:Java代码publicvoidrun(){longnow=System.currentTimeMillis();try{if(supervisoree.status.firstSeen==null){supervisoree.status.firstSeenen=now;//1、记录最后一次状态检查时间}supervisoree.status.lastSeen=now;//2、记录最后一次状态检查时间报错,直接返回if(supervisoree.status.discard||supervisoree.status.error){return;}//4.更新最后看到的状态supervisoree.status.lastSeenState=lifecycleAware.getLifecycleState();//5.如果组件的状态与守护组件看到的状态不一致,则以守护组件的状态为准,然后初始化if(!lifecycleAware.getLifecycleState().equals(supervisoree.status.desiredState)){switch(supervisoree.status.desiredState){caseSTART://6.如果处于启动状态,则启动组件try{lifecycleAware.start();}catch(Throwablee){if(einstanceofError){supervisoree.status.desiredState=LifecycleState。停止;尝试{lifecycleAware.stop();}catch(Throwablee1){supervisoree.status.error=true;if(e1instanceofError){throw(Error)e1;}}}supervisoree.status.failures++;}break;caseSTOP://7。如果是stopped状态,则停止组件catch(Throwablet){}}}上面的代码进行了简化,整体逻辑就是定时收集组件的状态,如果守护组件和如果组件的状态不一致,可能需要启动或停止。daemonmonitor可用于确保组件在失败时自动启动。默认策略是总是在失败后重启,另一种策略是只启动一次。【本文为专栏作者张凯涛原创文章,作者微信公众号:凯涛博客,id:kaitao-1234567】