当前位置: 首页 > 科技观察

供应链时效领域的高级接口性能之路

时间:2023-03-13 04:21:04 科技观察

1.前言经过近一年的供应链时效领域的发展,已经形成了一套理论和两个强大的工具(估计模型和路由系统)时效性估计条款。以现货为例,通过不断升级技术方案,预估模型准确率接近90%,满足向用户公开的条件。但是在访问前台场景的过程中,前台对我们提出了界面性能的要求。以连接业务明细浮层场景为例,接口调用环节经过业务明细、竞价、交易,给我们的供应链只用了15ms。要在15ms内完成所有的业务逻辑处理,是一个不小的挑战。2.初始状态——春风得意,马蹄姬抛开业务场景,谈接口性能就是耍流氓。时效预估接口依赖的数据源很多:模型基础数据、模型自底向上数据、仓库数据、SPU类目数据、卖家信息数据等,如何快速批量获取到内存中进行逻辑运算是性能的关键改进。最先接入时效性表达的是现货业务。单现货SKU时效查询初始接口调用链接如下:根据trace分析,接口性能的瓶颈在于数据查询,而非逻辑处理。数据查询后的逻辑处理只消耗了0.6%的时间。数据查询分为外部查询和内部查询。外部查询由3个RPC调用组成(占时间的27%),内部查询由11个DB查询组成(占时间的73%)。为什么会有这么多内部查询?因为估计模型是分段的,每个分段根据不同的影响因素有不同的来回策略,所以不能聚合成一个query。单个SKU的时效查询已经达到76.5ms。以商户详情浮层页30个现货SKU的预估时效查询,一个请求需要76.5*30=2295ms,无法接受,性能提升迫在眉睫。3.优化第一轮-昨夜西风枯绿树3.1内部查询优化由于内部查询所需的预估模型数据都是离线清洗,每日同步,所以对实时性要求不高。有多种方案可供选择:序号方案描述优缺点结论1离线处理完成后,更新现有的MySQL方案,不会产生开发成本。查询性能一般达不到要求,所以不要使用。3、经过离线处理,刷入本地内存后查询性能非常好。数据量有限制。该模型的数据量约为15G。由于模型数据量增加不多,而日常同步更多的是覆盖,使用32G实例完全可以满足要求。3.2外部查询优化一一分析三个RPC查询接口,找到优化方案:序号查询描述外域优化方案原因1城市名到codeTMS本地缓存由于城市名和code的映射关系数据只有20K左右,可以在应用启动时被请求一次后放入本地缓存。另外,城市名称和代码的变化频率很低,jetcache的@CacheRefresh每8小时自动刷新一次,完全可以满足要求。而卖家信息是低频变化的数据,可以使用T+1同步到Redis3获取商品类目Redis缓存同样的商品类目数据也是低频变化的数据,使用T+1同步到Redis3。3优化后,优化后的效果非常好很明显,单个SKU时效查询的RT从76.5ms降低到了27ms。同时减少了对外域的直接依赖,一定程度上提高了稳定性。27ms还是达不到要求。目前的瓶颈是查询Redis(96%的时间都花在了上面),能否进一步优化?4.优化第二轮——腰带越宽不后悔通过上面的分析,我们可以看出,当前的耗时集中在一个接一个的RedisI/O操作上。如果将一组Redis命令组装起来,一次性传输到Redis并返回结果,可以大大减少耗时。4.1流水线原理Redis客户端执行一条命令分为以下四个过程:1)发送命令2)排队命令3)执行命令4)返回结果其中1-4称为RoundTrip时间(RTT,往返时间)。该管道通过一次向Redis服务器发送多个Redis命令,大大降低了RTT。4.2优化及效果虽然Redis提供了mget、mset等批处理接口,但是Redis不支持hget批处理操作,不支持mget和hget混批查询,只能使用pipeline。另外,我们的场景是多键读场景,允许一定比例(低概率事件)的读失败,其中一条管道读取失败(管道是非原子的),不影响时效性估计,因为有来回策略,所以很适合。由于Redis查询之间的相互依赖性,上一次查询的结果需要作为下一次查询的输入参数,所以不能将所有的Redis查询合并到一个Redis管道中。虽然最后还是有3个RedisI/O,但是7ms的RT满足要求。4.3代码//管道查询类publicclassRedisBasePipelineRegister{//存储找到的数据privateThreadLocal>context=ThreadLocal.withInitial(HashMap::new);//查询publicvoidfetch(finalRedisConsumersredisConsumers){if(redisConsumers.isNotEmpty()){Listret=redisClient.executePipelined((RedisCallback)connection->{connection.openPipeline();redisConsumers.get().forEach(t->t.accept(connection));返回null;});addValueToContext(ret,redisConsumers.getKeyList());}}/***把pipeline查到的数据存储到threadlocal中*注意redis读到的数据可能是空的,如果是空的,会填充一个nullobj,可以防止你发现那里后面用的时候线程本地没有数据,查redis*/privatevoidaddValueToContext(Listval,Listkeys){Mapt=context.get();IntStream.range(0,keys.size()).forEach(i->t.put(keys.get(i),val.get(i)==null?NULL_OBJ:val.get(i)));}publicObjectget(Stringkey){返回上下文。get().get(键);}}//redisqueryclasspublicclassRedisClient{//如果没有找到threadlocal,重新检查redispublicObjectget(Stringkey){Objectvalue=Optional.ofNullable(redisBatchPipelineRegister.get(key)).orElseGet(()->redisTemplate.opsForValue().get(key));返回值;}}即使pipeline部分失效,Redis单指令查询也可以作为底线5.优化第三轮-中离讯达千百度5.1背景由于时效预估精度满足代销、品牌直销等业务场景要求航运、保税业务,越来越多的业务类型需要接入时效表达接口。当初交易为了快速上线,内部根据出价类型连续调整了数次时效预估接口,导致RT的压力越来越大。为了领域内聚,时域在与交易开发商讨后,提供了不同竞价类型的聚合接口,同时保证聚合接口的RT性能。从此,进入并发区。5.2ForkJoinPoolvsThreadPoolExecutorJava7提供了ForkJoinPool,支持将一个任务拆分成多个“小任务”进行并行计算,然后将多个“小任务”的结果合并为一个总的计算结果。ForkJoinPool的工作窃取意味着每个线程都会维护一个队列来存储需要执行的任务。当线程自身队列中的任务全部执行完毕后,会从其他线程获取未执行的任务,帮助自己执行,充分利用多核CPU的优势下图为ForkJoinPool的实现:而并行流程Java8使用共享线程池(默认也是ForkJoinPool线程池),性能不可控,所以不考虑。优势区ForkJoinPoolForkJoinPool的实际分析结论可以使用有限数据的线程完成大量的父子关系任务。由于工作窃取机制,它在多任务和任务分配不均匀方面具有优势。1.没有亲子关系任务。2.不同出价类型的获取时限RT相似,不存在任务分配不均的情况。不使用ThreadPoolExecutorThreadPoolExecutor不会像ForkJoinPool那样创建大量的子任务,也不会进行大量的GC,所以在单线程甚至任务分发的情况下都有优势。使用选定的ThreadPoolExecutor后,需要考虑如何设计参数。根据实际情况分析,事务请求时效的峰值QPS在1000左右,我们一般将3~5个线程任务拆分成一个请求,不考虑机器数量,每秒任务数:taskNum=3000~5000。单个任务耗时taskCost=0.01s。上游允许最大响应时间为responseTime=0.015s。1)核心线程数=每秒任务数*单个任务耗时corePoolSize=taskNum*taskCost=(3000~5000)*0.01=30~50,取402)任务队列容量=核心线程数/单个任务的耗时*容忍度最大响应时间queueCapacity=corePoolSize/taskCost*responseTime=40/0.01*0.015=603)最大线程数=(每秒最大任务数-任务队列容量)*每个线程花费的时间taskmaxPoolSize=(5000-60)*0.01≈50当然以上计算都是理论值。实际上,在达到最大线程数之前,CPU负载可能已满。ThreadPoolExecutor的参数最终还是要根据压测数据来确定。5.3优化和压测经过优化和压测,聚合接口的平均RT从22.8ms(串口)降低到8.52ms(并口),99条线路13.22ms,满足要求。压力测试单机300QPS(比预估的峰值QPS高出2倍左右),接口性能和线程池运行状况都令人满意。最终优化后的应用内调用链接示意图如下:5.4代码//ParallelTimeEstimateClasspublicclassConcurrentEstimateCaller{//自定义线程池privateExecutorexecutor;//时间估计策略工厂privateEstimateStrategyFactoryestimateStrategyFactory;//存储异步返回KEY为竞价类型,VALUE为对应的老化结果privateConcurrentHashMap>>futures=newConcurrentHashMap<>();//提交并行任务publicConcurrentEstimateCallersubmit(PromiseEstimateAggreRequestrequest)for(Stringscene:request.getMap().keySet()){futures.put(scene,CompletableFuture.supplyAsync(()->{EstimateStrategyestimateStrategy=estimateStrategyFactory.getStrategy(场景);如果(estimateStrategy!=null){Resulttmp=estimateStrategy.promiseEstimateBatch(EstimateAggreConvertor.INSTANCE.convertBatchRequest(request,scene));if(R结果.SUCCESS_CODE.equals(tmp.getCode())){返回tmp.getData().getEstimateRes();}}返回空值;},执行者));}}//等待并获取指定时间内的所有子任务返回ResultpublicMap>join(longtimeout,TimeUnitunit)throwsException{//等待所有子任务完成CompletableFuture.allOf(futures.values().toArray(newCompletableFuture[]{})).get(timeout,unit);Map>res=newHashMap<>();对于(Map.Entry>>entry:futures.entrySet()){if(entry.getValue().get()!=null){res.put(entry.getKey(),entry.getValue().get());}}返回资源;}}六、总结界面性能提升之路随着业务的变化和技术的升级而永无止境。分享一些搭建过程中的小技巧:如果Redis和服务器机器不在同一个区域,会增加几毫秒的跨区域传输时间,所以对RT比较敏感。场景,如果机器和Redis区不一样,可以请运维帮忙重建机器。阻塞队列可以使用SynchronousQueue来提高响应时间,但只有在有足够多的消费者(线程池中的消费者)并且总是有消费者准备好获取交付的工作时才适合使用。后续建设的一些思路:随着业务和流量的增长,如何在不重启机器的情况下自动调整线程池的参数,可以参考美团开源的DynamicTp项目对线程池的动态管理,同时增加监控、报警等功能。