当前位置: 首页 > 科技观察

如何调用只支持batch_call的服务?

时间:2023-03-18 01:04:27 科技观察

先来说说标题是什么意思吧。为了更好地理解我在说什么,让我们举个例子。假设你正在做一个类似于哔哩哔哩的系统,里面有各种视频。用户每天都会在其中上传各种视频。按理说,每个视频都要审核一下,看看有没有颜色,但用人眼是不可能一个一个看的。毕竟唐哥说这东西他看多了,太阳永远是绿色的,所以才会有专门训练的算法服务来做检测。但是你不能只上传整个视频的每一帧进行审核,所以在每个视频中,根据时长和视频类型随机选择几张图片进行审核。比如视频标签很美,算法很喜欢。然后再画几张。标签是经过编程的,狗甚至都不会看,所以它们就少画了几张。发送这些提取的图片进行审查。为了实现这个功能,我们将视频作为审核的维度,每个视频会有数量不定的N张图片。下游服务是使用GPU检测图片的算法服务。现在问题来了。下游服务的算法开发告诉你,这些下游服务不支持高并发,但是在请求参数里给你加了一个数组,你可以批量传一个比较大的。图像阵列,这样可以提高图像处理量。所以,我们的场景就变成了。上游服务的入参是一个视频及其N张图片,输出参数是视频是否通过审核。下游服务的入参为N张图片,出参为视频是否通过审核。batch_callupstreamanddownstream现在我们要使用上游服务来访问下游服务。该怎么办?看起来很容易搞定,一梭子不就完了吗?当有视频进来的时候,将视频的十多张图片作为一批来调用。当有几个视频进来的时候,就开几个这样的并发。这样做的结果就是,当并发量比较大的时候,你会发现性能很差,性能很不稳定。比如下面这个监控图,一会儿是3qps,一会儿是15qps。处理后的图片只支持20qps左右。狗看到后摇摇头。图1-直接调用时qps很低,怎么办?为什么下游需要批量调用?本着先问是不是,再问为什么的精神,我们先来看看为什么下游需求如此独特。为什么也是处理多张图片,下游不支持并发而是批量调用?这个设定有点奇怪?其实也不奇怪,在算法服务中甚至很常见,举个例子就可以理解。它还在处理多张图片。为简单起见,我将假设三张图片。如果是单cpu处理。不管是并发还是批处理,由于cpu内部的计算单元是有限的,可以简单理解为这三张图是串行计算的。cpu处理图片的时候,我计算第一张图片能不能通过,第二张图片能不能通过。两者之间没有逻辑联系,所以两张图并行计算也是情理之中。但是我的CPU计算单元有限,做不到。但。如果我打破计算单元有限的条件,给CPU增加很多计算单元,弱化一些对计算没有用的部件,比如缓存,控制单元。然后我们有足够的计算能力来并行计算这些图像。图片的并行处理是的,如果CPU这么集成,其实就变成了GPU。GPU和CPU的区别上面的解释只是为了方便理解。实际上,GPU会以更细的粒度进行并发计算,比如可以细化到图片中的像素级别。这就是为什么如果我们运行一些3D游戏,就需要用到显卡,因为它可以快速并行的计算出画面中各个地方的光影,远近的效果,然后渲染画面。回到为什么要做成批量调用的问题。实际上,在一次算法服务调用中,在数据真正进入GPU之前,CPU其实是用来做一些预处理的。所以,我们可以简单的把调用的时间理解为做了下面的事情。GPU处理图片时的流程服务由CPU逻辑和GPU处理逻辑组成。调用到服务后,会有一些前置逻辑,需要CPU来完成,然后再用GPU进行并行计算。结果返回后,还有一些PostCPU的处理逻辑。中间的GPU部分,不管是计算1张还是100张,只要算力支持,都是并行计算,时间差不多。如果将这多张图片拆解并发调用算法服务,就会有N组CPU+GPU的消耗,中间的并行计算其实并没有得到充分利用。并且会有更多的前后CPU逻辑部分。算法服务一般都是python服务。一些主流的web框架几乎都是以多进程而不是多线程的方式处理外部请求。这个有可能。导致额外的进程间切换开销。当并发请求过多,无法处理请求时,后面的请求需要等待前面的处理完才能处理,后面的请求会显得很费时间。这也是上图1中接口延迟(latency)像过山车一样上升的原因。还是上面图一的截图,一张图用了两次,哈哈,按道理降低并发,增加每次调用的图片数,就可以解决这个问题了。这就是建议批量调用的原因。但是问题又来了。对于每个调用,上游服务都会输入一段视频和几张图片。调用下游服务时,batch的数量只能是这些图片的数量。我们怎样才能增加批次的数量?这里的调用需要分为同步调用和异步调用。同步调用和异步调用的区别同步调用是指上游发起请求后,阻塞等待,下游处理逻辑,返回结果给上游。普通的形式就像我们平时的http调用一样。同步调用和异步调用是指上游发起请求后立即返回,下游收到消息后慢慢处理,处理完后再以一定的形式通知上游。一种常见的形式是使用消息队列或mq。消息发送到mq后,下游消费mq消息,触发处理逻辑,然后将处理结果发送给mq,上游消费mq的结果。异步调用异步调用的形式访问异步调用的实现回到我们文章开头提到的例子,当上游服务收到一个请求(一个视频及其对应的图片)时,上游服务作为生产者和发送数据Write到mq并请求返回。然后新建一个C服务,负责批量消费mq中的消息。这时服务C可以根据下游服务的性能来控制自己的消费速度,比如一次消费10条数据(视频),每条数据下挂10张图片,那么我的图片数量batch为10*10=100,原来的10个请求变成了1个请求。这对下游是相当友好的。下游返回结果后,服务C将结果写入mq的另一个topic,上游消费,从而结束整个调用过程。当然,上面的方案,如果把mq换成数据库,也是可以的。这时候服务C可以不断的定期轮询数据库表,看看有哪些请求没有处理完,将没有处理完的请求批处理掉,再对下游进行批处理调用。不管是mq还是数据库,它们的作用无非是充当一个中转,暂时存储数据,让服务C根据下游的消费能力去消费这些数据。这样无论以后增加多少新的服务,都可以在原有的基础上进行扩展。如果是MQ,就加topic,如果是数据库,就加数据表,每一个新的服务都可以根据自己的消费能力进行调整。消费速度。事实上,mq连接了多个不同性能的服务。其实异步调用最适合这种上下游服务处理性能不一致的场景。而且,涉及到的服务性能差距越大,服务数量越多,这种方案的优势就越明显。同步调用接入虽然异步调用在这种场景下优势明显,但也有一个劣势,就是需要最上游的调用者接受异步的方式来消费结果。事实上,涉及算法的服务调用链是比较耗时的,使用异步接口是非常合理的。但又合情合理。一些最上游的可能不听你的,但是他们不能接受异步调用。这就需要使用同步调用的方案,但是如何改造同步接口更适合这种调用场景也是本文的重点。如果限流直接向下游算法服务发送请求,下游根本承受不了,所以首先要做的是在上游调用下游的地方加一个限速(ratelimit)。这样的组件一般不需要自己写,几乎任何语言都会有现成的。比如在golang中可以使用golang.org/x/time/rate这个库,其实就是一个令牌桶算法实现的限流器。不知道令牌桶是什么没关系,不影响理解。限流逻辑当然,这个限制是当前服务调用下游的qps,也就是所谓的单节点限流。如果有多个服务,网上有很多现成的分布式限流框架。但是,俗话说,适可而止。限流只能保证下游算法服务不会被压垮,不能增加单次批量调用的图片数量。有什么办法可以解决这个问题吗?参考Nagle算法。在我们熟悉的TCP协议中,有一种算法叫做Nagle算法。设计它的目的是为了避免一次传输的数据太少,增加数据包的有效数据负载。当我们要发送一些数据包时,数据包会被放入一个缓冲区中,并没有立即发送,那什么时候发送呢?数据包在两种情况下被发送:当缓冲区的数据包长度达到一定大小(MSS)时。或者等待超时(通常为200毫秒)。超时之前,来了那么多数据包,但是MSS的长度不够。现在超时结束了,不用等了,马上发送。这个想法非常值得我们借鉴。我们可以自己在代码层实现一个wave,实现起来非常简单。1、我们定义一个带锁的全局队列(链表)。2、当上游服务输入一个视频及其对应的N张图片时,将这N张图片数据和一个用于存储返回结果的结构体加锁,放入全局队列中。然后无限循环读取这个结构,直到有结果。这有点像阻塞和等待。3、同时在服务启动时,有一个线程A专门负责收集这个全局队列的图片数据。线程A负责发起调用下游服务的请求,但仅限于以下两种情况。当采集到的图片数量达到xx时,距离上次发起请求已经xx毫秒(超时)。4.下游调用完成后,然后根据一开始传入的数据,将调用结果拆解返回给刚才说的用来存放结果的结构体。5、第2步的死循环是因为存放返回结果的结构体是有值的,所以可以跳出死循环,继续执行后面的逻辑。batch_call同步调用改造这个好比公交车站。一个公交车站不可能为每个顾客都派一辆公交车。当然,希望公交车上的顾客越多越好。每次上游有请求,将请求中的图片,即乘客,塞进公交车,公交车要么准时发车(向下游服务发起请求),要么公交车客满,不用等,直接发车。这样可以保证每次发车时,车上都有足够的顾客,发车次数尽可能少。大体思路同上,如果用go实现,会更简单。例如,可以将步骤1中锁定的全局队列更改为具有缓冲区长度的通道。步骤2中的“用于存储结果的结构”也可以换成另一个无缓冲的通道。执行res:=<-ch可以达到阻塞等待的效果。而核心的类Nagle代码大致如下。当然,不看也没关系,反正你已经知道这个意思了。funcCallAPI()error{size:=100//这个数组用来采集视频中的图片,每个IVideoInfo有N张图片videoInfos:=make([]IVideoInfo,0,size)//设置一个200ms的定时器Tick:=time.NewTicker(200*time.Microsecond)defertick.Stop()//无限循环for{select{//由于定时器的原因,每200ms,将执行此行case<-tick.C:iflen(videoInfos)>0{//200ms超时,请求下游limitStartFunc(videoInfos,true)//请求结束后清空之前采集的数据,重新开始采集。videoInfos=make([]IVideoInfo,0,size)}//AddChan就是所谓的globalqueuecasevideoInfo,ok:=<-AddChan:if!ok{//当通道关闭时,如果还有数据不发起请求,只请求一波下游服务limitStartFunc(videoInfos,false)videoInfos=make([]IVideoInfo,0,size)returnnil}else{videoInfos=append(videoInfos,videoInfo)如果videoInfos中的图片满足xxnumber{limitStartFunc(videoInfos,false)videoInfos=make([]IVideoInfo,0,size)//重置定时器tick.Reset(200*time.Microsecond)}}}}returnnil}通过这个操作,每次请求从上游将发送的视频中的图片进行采集,当图片堆积到一定数量时统一请求,这样大大增加了每次批量调用的图片数量,同时也减少了对下游的调用次数服务。真是一石二鸟。优化的效果也很明显。upstream服务支持的qps从不稳定的3q~15q变为稳定的90q。下游接口耗时也稳定了很多,从原来的过山车15s到稳定的500ms。处理图片的速度也从20qps提升到了350qps。至此,已经大大超出了业务需求的预期(40qps),已经足够了,再多一个qps就是浪费。好了,我们下班吧。总结为了充分利用GPU的并行计算能力,很多算法服务都希望上游通过增加batchsize的方式来进行接口调用,同时降低并发度。对于上下游性能差距明显的服务,建议结合mq使用异步调用将服务串联起来。如果一定要同步调用,建议模仿Nagle算法,保存一批数据,然后发起请求。这样可以同时增加批处理和降低并发。真是一石二鸟,亲测有效。最后讲了这么多提高性能的方法。既然需求来了,资源够,时间不够,就直接同步叫班车吧。性能不够?下游加机器,gpu卡,买!那么下个季度,我们会提到一个技术优化,性能提升xx%,cpu和gpu降低xx%。你闻到了吗?这就是KPI的味道。