使用rabbitmq时,保证消息的可靠传递[3]消息传递成功后,在数据库中记录消息状态为成功。如果传递不成功,则消息仍处于传递的初始状态。这时启动定时任务获取数据库中的信息。消息状态为数据正在投递,再次投递;另外,为了防止rabbitmq故障时信息无法投递,对重复投递的次数进行了限制,比如3次,每次重复投递都要在数据库中更新重复投递次数.elastic-job配置es-job是依赖zookeeper的分布式事务,所以需要一个zookeeper服务,es-job需要两个依赖2.1.4com.dangdangelastic-job-lite-core${elastic-job.version}com.dangdangelastic-job-lite-spring${elastic-job.version}首先需要配置zookeeper注册中心配置importcom.当当网.ddframe.job.reg.zookeeper.ZookeeperConfiguration;导入com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;导入org.springframework.beans.factory.annotation.Value;导入org.springframework。boot.autoconfigure.condition.ConditionalOnExpression;导入org.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;@Configuration@ConditionalOnExpression("'${zookeeper.address}'.length()>0")publicclassRegisterCenterConfig{//这里记得调用init初始化方法,init会启动与zk的连接@Bean(initMethod="init")publicZookeeperRegistryCenterregistryCenter(//最重要的配置是address-"ip:port//andnamespace@Value("${zookeeper.address}")finalStringserverLists,@Value("${zookeeper.namespace}")finalString命名空间,@Value("${zookeeper.connectionTimeout}")finalint连接超时,@Value("${zookeeper.sessionTimeout}")finalintsessionTimeout,@Value("${zookeeper.maxRetries}")finalintmaxRetries){ZookeeperConfigurationzookeeperConfiguration=newZookeeperConfiguration(serverLists,namespace);zookeeperConfiguration.setConnectionTimeoutMilliseconds(connectionTimeout);zookeeperConfiguration.setSessionTimeoutMilliseconds(sessionTimeout);zookeeperConfiguration.setMaxRetries(maxRetries);返回新的ZookeeperRegistryCenter(zookeeperConfiguration);}}需要一个SimpleJobConfig来配置简单的任务;首先需要一个任务调度bean@Bean(initMethod="init")publicJobSchedulersimpleJobScheduler(finalSimpleJobsimpleJob,@Value("${simpleJob.cron}")finalStringcron,@Value("${simpleJob.shardingTotalCount}")finalintshardingTotalCount,@Value("${simpleJob.shardingItemParameters}")finalStringshardingItemParameters,@Value("${simpleJob.jobParameter}")finalStringjobParameter,@Value("${simpleJob.failover}")final布尔故障转移,@Value("${simpleJob.monitorExecution}")finalbooleanmonitorExecution,@Value("${simpleJob.monitorPort}")finalintmonitorPort,@Value("${simpleJob.maxTimeDiffSeconds}")finalintmaxTimeDiffSeconds,@Value("${simpleJob.jobShardingStrategyClass}")finalStringjobShardingStrategyClass){returnnewSpringJobScheduler(simpleJob,registryCenter,getLitJobconfiguration(simpleJob.getClass(),cron,shardingTotalCount,failover,jobParameter,shardingItemParameters,jobShardingStrategyClass,monitorExecution,monitorPort,maxTimeDiffSeconds),jobEventConfiguration,newSimpleJobListener());}看下SpringJobScheduler的参数,按照参数再依次Createobject纳斯));this.elasticJob=elasticJob;}4-1。ElasticJobelasticJob实现了SimpleJob接口,在execute方法中执行任务@ComponentpublicclassMySimpleJobimplementsSimpleJob{//ShardingContextshardingContext任务信息@Overridepublicvoidexecute(ShardingContextshardingContext){System.err.println("---------启动任务MySimpleJob--------");}}4-2CoordinatorRegistryCenterregCenter是第二步实现的zookeeperRegister4-3LiteJobConfigurationjobConfig任务相关的属性和任务类会配置在这里对象,这些属性值由properties文件配置,或者从注解中获取,这里是属性配置文件ers,StringjobShardingStrategyClass,booleanmonitorExecution,intmonitorPort,intmaxTimeDiffSeconds){JobCoreConfigurationjobCoreConfiguration=JobCoreConfiguration.newBuilder(jobClass.getName(),cron,shardingTotalCount).misfire(真).failover(故障转移).jobParametershardingItemParameters).build();SimpleJobConfigurationsimpleJobConfiguration=newSimpleJobConfiguration(jobCoreConfiguration,jobClass.getCanonicalName());/*1.创建LiteJobConfiguration需要传入JobTypeConfiguration故而创建SimpleJobConfiguration;2.创建SimpleJobConfiguration需要传人JobCoreConfigurationcoreClass.getCanonicalName()jobClassb,String.创建JobCoreConfiguration并调用publicstaticBuildernewBuilder(finalStringjobName,finalStringcron,finalintshardingTotalCount){returnnewBuilder(jobName,cron,shardingTotalCount);}*/LiteJobConfigurationliteJobConfiguration=LiteJobConfiguration.newBuilder(simpleJobConfiguration).jobShardingStrategyClass(jobShardingStrategyClass).monitorExecution(monitorExecution).monitorPort(monitorPort).maxTimeDiffSeconds(maxTimeDiffSeconds).overwrite(true).build();返回liteJobConfiguration;}4-3。继续看SpringJobScheduler的参数,JobEventConfigurationjobEventConfig用于事务跟踪annotation.Autowired;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importjavax.sql.DataSource;@ConfigurationpublicclassJobEventCon图{@AutowiredDataSource数据源;@BeanpublicJobEventConfigurationjobEventConfiguration(){返回新的JobEventRdbConfiguration(dataSource);}}4-4。事件监听器importcom.alibaba.fastjson.JSON;importcom.dangdang.ddframe.job.executor.ShardingContexts;importcom.dangdang.ddframe.job.lite.api.listener.ElasticJobListener;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;publicclassSimpleJobListener实现ElasticJobListener{privatestaticfinalLoggerlog=LoggerFactory.getLogger(SimpleJobListener.class);@OverridepublicvoidbeforeJobExecuted(ShardingContextsshardingContexts){log.info("----------------beforeJobExecuted:{}",JSON.toJSONString(shardingContexts));}@OverridepublicvoidafterJobExecuted(ShardingContextsshardingContexts){log.info("----------------afterJobExecuted:{}",JSON.toJSONString(shardingContexts));}}至此,事件调度器就创建完成,触发init方方法,开始执行任务最后粘贴配置文件zookeeper.address=192.168.205.13:2181zookeeper.namespace=elastic-jobzookeeper.connectionTimeout=10000zookeeper.sessionTimeout=10000zookeeper.maxRetries=3#这里是simplejob配置simpleJob.cron=0/5****?simpleJob.shardingTotalCount=2simpleJob.shardingItemParameters=0=北京,1=shanghaisimpleJob.jobParameter=source1=public,source2=privatesimpleJob.failover=truesimpleJob.monitorExecution=truesimpleJob.monitorPort=8889simpleJob.maxTimeDiffSeconds=-1simpleJob。jobShardingStrategyClass=com.dangdang.ddframe.job.lite.api.strategy.impl.AverageAllocationJobShardingStrategy#这里是dataflowJob.cron=0/10****?dataflowJob.shardingTotalCount=2dataflowJob.shardingItemParameters=0=北京,1=上海