当前位置: 首页 > 后端技术 > Java

基于Spring-AOP的自定义分片工具

时间:2023-04-01 17:06:37 Java

作者:陈昌浩1背景随着数据量的增加,发现系统与其他系统交互时,批处理接口会超时。发现原来在实现batch接口的时候,没有进行分片处理,当数据过大或者超过其他系统阈值时,就会报错。由于与其他系统的交互比较多,每个接口都被切分优化,改动量比较大,所以才考虑AOP来解决这个问题。2Spring-AOPAOP(AspectOrientProgramming),直译就是面向切面的编程。AOP是一种编程思想,是对面向对象编程(OOP)的补充。面向对象编程将程序抽象为各个层次的对象,而面向方面编程将程序抽象为各个方面。Spring中的AOP是通过动态代理实现的。SpringAOP不能拦截对对象字段的修改,也不支持构造函数连接点,我们不能在bean创建上应用通知。3.功能实现自定义片段处理分为三个部分:自定义注解(MethodPartAndRetryer)、重试器(RetryUtil)、切面实现(RetryAspectAop)。3.1MethodPartAndRetryer源码@Target(ElementType.METHOD)@Retention(RetentionPolicy.RUNTIME)@Documentedpublic@interfaceMethodPartAndRetryer{/***失败重试次数*@return*/inttimes()default3;/***失败间隔执行时间300毫秒*@return*/longwaitTime()default300L;/***fragmentsize*@return*/intparts()default200;}@interface表示这个类是注解。@Target是这个注解的范围publicenumElementType{/**类、接口(包括注解类型)或枚举声明*/TYPE,/**字段声明(包括枚举常量)*/FIELD,/**方法声明*/METHOD,/**形式参数声明*/PARAMETER,/**构造函数声明*/CONSTRUCTOR,/**局部变量声明*/LOCAL_VARIABLE,/**注解类型声明*/ANNOTATION_TYPE,/**程序包声明*/PACKAGE,/**类型参数声明*/TYPE_PARAMETER,/**类型使用*/TYPE_USE}@Retention注解生命周期publicenumRetentionPolicy{/**编译处理后不存入类*/SOURCE,/**注解会被由编译器记录在类文件中,但不需要在运行时由VM保留。这是默认值*/CLASS,/**编译器保存在类中,可以被虚拟机读取*/RUNTIME}times():接口调用失败时的重试次数。waitTime():当接口调用失败后,需要多长时间再次调用。intparts():分片时,每个分片的大小。3.2RetryUtil源码publicclassRetryUtil{publicRetryergetDefaultRetryer(inttimes,longwaitTime){Retryerretryer=RetryerBuilder.newBuilder().retryIfException().retryIfRuntimeException().retryIfExceptionOfType(Exception.class).withWaitStrategy(WaitStrategies.fixedWait(waitTime,TimeUnit.MILLISECONDS)).withStopStrategy(StopStrategies.stopAfterAttempt(times)).build();returnretryer;}}说明RetryerBuilder:用于配置和创建Retryer的构建器。retryIfException:抛出运行时异常或检查异常时会重试,抛出错误时不会重试。retryIfRuntimeException:只有在抛出运行时异常时才会重试,checkedexception和error都不会重试。retryIfExceptionOfType:允许我们仅在发生特定异常时重试。withWaitStrategy:等待策略,每次请求间隔。withStopStrategy:停止策略,重试多少次后停止。3.3RetryAspectAop源码:publicclassRetryAspectAop{publicObjectaround(finalProceedingJoinPointpoint)throwsThrowable{Objectresult=null;finalObject[]args=point.getArgs();booleanisHandler1=isHandler(args);if(isHandler1){StringclassName=point.getSignature().getDeclaringTypeName();StringmethodName=point.getSignature().getName();ObjectfirstArg=args[0];ListparamList=(List)firstArg;//获取方法信息Methodmethod=getCurrentMethod(point);//获取注解信息MethodPartAndRetryerretryable=AnnotationUtils.getAnnotation(method,MethodPartAndRetryer.class);//重试机制Retryerretryer=newRetryUtil().getDefaultRetryer(retryable.times(),retryable.waitTime());//分片List>requestList=Lists.partition(paramList,retryable.parts());for(ListpartList:requestList){args[0]=partList;ObjecttempResult=retryer.call(newCallable(){@OverridepublicObjectcall()throwsException{try{returnpoint.proceed(args);}catch(Throwablethrowable){log.error(String.format("Fragmentretryerror,class%s-method%s",className,methodName),throwable);thrownewRuntimeException("Shardretryerror");}}});if(null!=tempResult){if(tempResultinstanceofBoolean){if(!((Boolean)tempResult)){log.error(String.format("Segment执行报错返回类型不能转bolean,class%s-method%s",className,methodName));thrownewRuntimeException("Shard执行报错!");}result=tempResult;}elseif(tempResultinstanceofList){if(result==null){result=Lists.newArrayList();}((List)result).addAll((List)tempResult);}else{log.error(String.format("不支持片段执行返回的类型,类%s-方法%s",className,methodName));thrownewRuntimeException("不支持返回类型");}}else{log.error(String.format("片段执行返回的结果为空,类%s-方法%s",className,methodName));thrownewRuntimeException("调用结果为空");}}}else{result=point.继续(参数);}返回结果;}私有布尔值isHandler(Object[]args){booleanisHandler=false;if(null!=args&&args.length>0){ObjectfirstArg=args[0];//如果第一个参数是list且个数大于1if(firstArg!=null&&firstArginstanceofList&&((List)firstArg).size()>1){isHandler=true;}}returnisHandler;}privateMethodgetCurrentMethod(ProceedingJoinPointpoint){try{Signaturesig=point.getSignature();MethodSignaturemsig=(MethodSignature)sig;Objecttarget=point.getTarget();returntarget.getClass().getMethod(msig.getName(),msig.getParameterTypes());}catch(NoSuchMethodExceptione){抛出新的RuntimeException(e);}}}解释:getCurrentMethod:获取方法信息,即批量调用待分片的接口。.around:具体的分片逻辑。获取要分片的方法的参数。判断是否做分片处理。获取方法。获取重试次数、重试间隔、分片大小。生成重试器。根据设置的分片大小,进行分片处理。调用批处理接口并对结果进行处理。4函数使用4.1配置文件4.2代码示例@MethodPartAndRetryer(parts=100)publicBooleanwriteBackOfGoodsSN(ListlistSerial,ObCheckWorkerworkerData)在需要分片的batch接口方法上加上MethodPartAndRetryer注解即可,重试次数oftimes,retryinterval和fragmentsize可以在注解时设置,也可以使用默认值。5总结通过自定义分片工具,可以快速对旧代码进行分片,并增加了重试机制,提高了程序的易用性,提高了重构旧代码的效率。