当前位置: 首页 > 后端技术 > Java

[xxl-job]分布式任务调度平台使用总结+关键源码解读

时间:2023-04-01 18:27:57 Java

1.简介XXL-JOB是一个分布式任务调度平台。其核心设计目标是快速开发、易学易学、轻量级、易扩展。1.1使用XXL-JOB的背景在微服务架构中,由于需要满足高可用,每个微服务可能会部署多个实例,此时微服务中的定时任务会出现重复执行的问题。通过XXL-JOB可以实现定时任务的分布式调度,避免同一个执行者的任务重复执行的问题。2、系统架构调度中心:“调度中心”采用中心化设计,自研调度组件,支持集群部署,可保证调度中心HA执行器:分布式执行任务,“执行器”支持集群部署,保证任务执行HA数据库:数据库使用Mysql。如果调度中心有多个实例,需要连接同一个Mysql数据库。1任务调度下面是一个任务调度流程:预读下一次触发时间trigger_next_time<[当前时间+5秒]任务遍历任务,比较当前时间和任务的下一次触发时间如果当前时间>task的nexttriggertime+5秒表示任务调度到期。使用任务调度过期策略判断是需要立即调度还是忽略。否则,如果当前时间>任务的下一次触发时间,则立即调度并计算任务的下一次触发时间。如果新计算出的下一次触发时间小于当前时间+5秒,则直接将任务放入ringData;否则,如果当前时间<任务下一次触发时间,将任务放入ringData,更新xxl_job_info表中任务的调度信息(如下调度时间)try{//获取数据库连接,并设置autoCommit为false,不自动提交conn=XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();connAutoCommit=conn.getAutoCommit();conn.setAutoCommit(false);//用于update加锁,防止调度中心的其他实例同时执行后面的调度工作preparedStatement=conn.prepareStatement("select*fromxxl_job_lockwherelock_name='schedule_lock'forupdate");preparedStatement.execute();//事务开始//1.预读下次触发时间trigger_next_time小于[当前时间+5秒]的任务longnowTime=System.currentTimeMillis();列表scheduleList=XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao()。scheduleJobQuery(nowTime+PRE_READ_MS,preReadCount);if(scheduleList!=null&&scheduleList.size()>0){//2.将任务放入ringData,ringData是一个ConcurrentHashMap,key是seconds,value是Listfor(XxlJobInfojobInfo:scheduleList){//如果当前时间大于下一次触发时间+5秒,说明任务调度已经过期if(nowTime>jobInfo.getTriggerNextTime()+PRE_READ_MS){logger.warn(">>>>>>>>>>>>xxl-job,schedulemisfire,jobId="+jobInfo.getId());//判断任务的调度过期策略MisfireStrategyEnummisfireStrategyEnum=MisfireStrategyEnum.match(jobInfo.getMisfireStrategy(),MisfireStrategyEnum.DO_NOTHING);如果(MisfireStrategyEnum.DO_NOTHING);FIRE_ONCE_NOW==misfireStrategyEnum){//调度策略是立即调度一个JobTriggerPoolHelper.trigger(jobInfo.getId(),TriggerTypeEnum.MISFIRE,-1,null,null,null);logger.debug(">>>>>>>>>>>>xxl-job,schedulepushtrigger:jobId="+jobInfo.getId());}//刷新下一次调度时间refreshNextValidTime(jobInfo,newDate());}elseif(nowTime>jobInfo.getTriggerNextTime()){//如果当前时间大于下一次触发时间//ScheduleJobTriggerPoolHelper.trigger(jobInfo.getId(),TriggerTypeEnum.CRON,-1,null,null,null);logger.debug(">>>>>>>>>>>xxl-job,schedulepushtrigger:jobId="+jobInfo.getId());//刷新下一次调度时间refreshNextValidTime(jobInfo,newDate());//如果新计算的下一次调度时间小于当前时间+5秒if(jobInfo.getTriggerStatus()==1&&nowTime+PRE_READ_MS>jobInfo.getTriggerNextTime()){//获取本次调度的秒数调度时间intringSecond=(int)((jobInfo.getTriggerNextTime()/1000)%60);//将任务放入ringDatapushTimeRing(ringSecond,jobInfo.getId());//刷新下一次调度时间refreshNextValidTime(jobInfo,newDate(jobInfo.getTriggerNextTime()));}}else{//当前时间小于等于下一次任务触发时间//获取调度时间的秒数intringSecond=(int)((jobInfo.getTriggerNextTime()/1000)%60);//将任务放入ringDatapushTimeRing(ringSecond,jobInfo.getId());//刷新一个调度时间refreshNextValidTime(jobInfo,newDate(jobInfo.getTriggerNextTime()));}}//3.更新xxl_job_info表中任务的调度信息(下一次调度时间)}}else{preReadSuc=false;}//事务结束}3.3.2触发器调度下面是一个触发器调度的过程:每次取当前秒和上一秒的数据,如果当前秒为10,则获取10和9对应的任务seconds,将它们添加到ringItemData中遍历ringItemData,并触发调度将ringItemDatatry一一清除{//每次获取当前秒和上一秒的数据,如果当前秒为10,则取10个对应的任务9秒ListringItemData=newArrayList<>();intnowSecond=Calendar.getInstance().get(Calendar.SECOND);之前检查体重秤;对于(inti=0;i<2;i++){List<整数>tmpData=ringData.remove((nowSecond+60-i)%60);如果(tmpData!=null){ringItemData.addAll(tmpData);}}logger.debug(">>>>>>>>>>>>xxl-job,time-ringbeat:"+nowSecond+"="+Arrays.asList(ringItemData));if(ringItemData.size()>0){//遍历需要调度的任务(intjobId:ringItemData){//触发调度JobTriggerPoolHelper.trigger(jobId,TriggerTypeEnum.CRON,-1,null,null,无效的);}ringItemData.clear();}}catch(Exceptione){if(!ringThreadToStop){logger.error(">>>>>>>>>>>xxl-job,JobScheduleHelper#ringThreaderror:{}",e);}}4.Executor4.1主要功能向调度中心注册心跳,接收调度中心发出的调度请求,执行任务的业务逻辑将任务的执行结果发送给调度中心4.2启动流程4.3关键源码4.3.1任务执行下面是执行器执行一个任务的过程:调度中心发起调度请求,执行器收到调度请求,判断任务的运行方式(BEAN,GLUE_GROOVY,GLUE_SCRIPT,由于生产环境常用BEAN模式,这里只介绍BEAN模式,其他两种模式类似)如果是BEAN模式,判断任务是否绑定到JobThread线程如果是bound,判断任务的JobHandler,如果有变化,跳到第5步,如果没有变化,进一步判断阻塞策略。return,丢弃本次调度请求COVER_EARLY:Overridescheduling,跳到第5步如果没有绑定,下一步就是向jobThreadRepository注册一个新的JobThread,并启动、中断和移除旧的JobThread,并将任务放入JobThread的triggerQueue中threadQueue排队JobThread线程不断从自己的triggerQueue队列中获取数据执行publicReturnTrun(TriggerParamtriggerParam){//获取任务对应的jobThread线程JobThreadjobThread=XxlJobExecutor.loadJobThread(triggerParam.getJobId());IJobHandlerjobHandler=jobThread!=null?jobThread.getHandler():null;StringremoveOldReason=null;//获取运行模式GlueTypeEnumglueTypeEnum=GlueTypeEnum.match(triggerParam.getGlueType());if(GlueTypeEnum.BEAN==glueTypeEnum){//获取JobHandler(bean实例+@XxlJob注解方法)IJobHandlernewJobHandler=XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());//判断JobHandler是否发生改变if(jobThread!=null&&jobHandler!=newJobHandler){//改变处理程序,需要杀死旧线程作业线程=空;作业处理程序=空;}//判断JobHandler是否为空if(jobHandler==null){jobHandler=newJobHandler;if(jobHandler==null){returnnewReturnT(ReturnT.FAIL_CODE,"作业处理程序["+triggerParam.getExecutorHandler()+"]未找到。");}}}elseif(GlueTypeEnum.GLUE_GROOVY==glueTypeEnum){//...}elseif(glueTypeEnum!=null&&glueTypeEnum.isScript()){//...}else{returnnewReturnT(ReturnT.FAIL_CODE,"glueType["+triggerParam.getGlueType()+"]无效。");}//如果任务对应的JobThread线程不为空??,则判断任务的阻塞策略if(ExecutorBlockStrategyEnum.DISCARD_LATER==blockStrategy){//如果任务正在运行,丢弃这个调度if(jobThread.isRunningOrHasQueue()){returnnewReturnT(ReturnT.FAIL_CODE,"blockstrategyeffect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());}}elseif(ExecutorBlockStrategyEnum.COVER_EARLY==blockStrategy){//覆盖调度,设置对应的JobThread线程为nullif(jobThread.isRunningOrHasQueue()){removeOldReason="blockstrategyeffect:"+ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();jobThread=null;}}else{//会被调度到队列中}}//判断JobThread是否为空,如果为空,注册一个新的JobThreadif(jobThread==null){jobThread=XxlJobExecutor.registJobThread(triggerParam.getJobId(),jobHandler,removeOldReason);}//将调度信息推送到JobThread的调度队列中返回推送结果;}5.相关表表名说明xxl_job_groupexecutor表,维护executor信息xxl_job_info任务表,包括任务调度配置、执行器JobHandler、失败重试次数、下次调度时间等信息xxl_job_lock作为排他锁使用xxl_job_log任务Log表,包括执行者地址、调度日志、调度结果、执行日志、执行结果等信息xxl_job_log_report任务报告,以天计算