当前位置: 首页 > 科技观察

Golang处理数百万请求-分钟

时间:2023-03-20 15:25:43 科技观察

我在不同的公司从事反爬虫、反病毒、反恶意软件程序已经15年了,我知道这些系统最终会因为数据量大而改变每天都需要处理和处理。这很复杂。目前我是smsjunk.com的首席执行官和KnowBe4的首席架构师,两家公司都活跃在网络安全行业。搞笑的是,在过去10年的软件工程师生涯中,我参与的后端开发项目几乎都是用RubyonRails完成的。但是不要误会我的意思,我喜欢RubyonRails,我认为它是一个优秀的开发环境,但是当你用ruby??的想法设计和开发系统一段时间后,你往往会忘记你也可以利用多线程、并行化、高速执行和更小的内存开销来开发系统。我做了很多年的c/c++、Delphi和c#开发人员,后来才慢慢意识到使用正确的工具让系统变得更简单明了才是正确的做法。编程界关于编程语言和框架的争论从未停止过,我也不想参与其中。我相信很大一部分代码的效率、生产力和可维护性取决于你设计的架构是否足够简单。待解决的问题当我们开发一个匿名遥测和数据分析系统时,其中一个需求就是能够处理和应对数以百万计的POST请求。网络请求处理器会收到一个POSToverJSON,里面会包含很多需求。写入AmazonS3的数据集合,以便我们的map-reduce系统可以稍后处理数据。一般情况下,我们会考虑搭建一个worker分层结构,使用一些中间件,比如:SidekiqResqueDelayedJobElasticbeanstalkWorkerTierRabbitMQ等。然后搭建两个不同的集群,一个是给webclient的,另一个是给worker的,然后我们可以将工作人员扩展到我们处理业务所需的数量。但是从一开始,我们的团队就意识到所有这些都可以用Go来实现,因为在讨论中我们认为这将是一个非常高流量的系统。我已经使用Go进行开发两年了。我曾经用它开发过一些系统,但是负载规模远远小于这次的需求。我们首先定义一些结构来指定我们的POST接收到的请求体,并定义一个方法来上传到S3存储桶UploadToS3typePayloadCollectionstruct{WindowsVersionstring`json:"version"`Tokenstring`json:"token"`Payloads[]Payload`json:“数据”`}typePayloadstruct{//[redacted]}func(p*Payload)UploadToS3()error{//thestorageFoldermethoddensuresthattherareenonamecollisionin//casewegetsametimestampinthekeynamestorage_path:=fmt.Sprintf("%v/%v",p.storageFolder,time.Now().UnixNano())bucket:=S3Bucketb:=new(bytes.Buffer)encodeErr:=json.NewEncoder(b).Encode(payload)ifencodeErr!=nil{returnencodeErr}//我们发布到S3bucket的所有东西应标记为“私有”varacl=s3.PrivatevarcontentType="application/octet-stream"returnbucket.PutReader(storage_path,b,int64(b.Len()),contentType,acl,s3.Options{})}天真的使用Goruntinues一开始我们很天真地实现了一个POSThook方法如下,简单的把每个requestbody的uploadaction放到Gorutinues中让他们去执行in并行:funcpayloadHandler(whttp.ResponseWriter,r*http.Request){ifr.Method!="POST"{w.WriteHeader(http.StatusMethodNotAllowed)return}//Readthebodyintoastringforjsondecodingvarcontent=&PayloadCollection{}err:=json.NewDecoder(io.LimitReader(r.Body,MaxLength)).Decode(&content)iferr!=nil{w.Header().Set("内容-Type","application/json;charset=UTF-8")w.WriteHeader(http.StatusBadRequest)return}//通过每个payload和queueitems分别发布到S3for_,payload:=rangecontent.Payloads{gopayload.UploadToS3()//<-----DON'TDOTHIS}w.WriteHeader(http.StatusOK)}这个方法在中等规模的负载情况下对于大多数人来说是没问题的,但是在处理更大规模的请求量时,我们很快就会不知所措当我们部署这个版本的代码到生产环境,我们预计会有大量的请求进来,但实际数量不可能达到百万量级。我们完全低估了这个系统要处理的流量。但无论如何,上述方法是有缺陷的。因为它没有办法让我们控制启动的Goruntimes的数量。所以当我们的系统每分钟面临数百万个POST请求时,它很快就崩溃了。为了再次战斗,我们需要另辟蹊径。一开始我们在讨论如何让我们的请求处理程序生命周期尽可能短,并上传到S3以在后台或异步运行。当然,您必须在RubyonRails中执行此操作,否则您将阻塞所有其他网络请求处理程序。无论您使用的是puma、unicorn还是passerby(请不要参与JRuby讨论)。然后我们想到了使用消息队列的一种比较常见的方法来实现我们的目的,比如Resque,Sidekiq,SQS等,而工具也是数不胜数,因为实现这个功能的方法太多了。所以在第二次迭代的时候,我们需要创建一个缓冲队列,我们??将任务放入队列中,一个一个的上传到S3,但是因为我们要达到控制这个队列最大容量的目的,而我们有足够的RAM让我们可以在内存中存储请求体,所以我们认为我们可以直接使用Go提供的通道,然后将我们的请求直接入队到通道中进行处理。varQueuechanPayloadfuncinit(){Queue=make(chanPayload,MAX_QUEUE)}funcpayloadHandler(whttp.ResponseWriter,r*http.Request){...//通过每个payload和queueitems单独发布到S3for_,payload:=rangecontent.Payloads{Queue<-payload}...}从通道获取任务并执行它们的上传操作知道这是在做什么。一定是因为太晚了,我们已经喝了太多红牛了。这种变化并没有以任何方式改善我们的困境,我们将并发任务放入队列中执行,似乎只是解决了问题。但是我们的异步程序一次只会向S3上传一个请求体,但是此时我们请求的数量远远大于我们向S3上传的数量。可以想象,我们的缓冲队列很快就会达到它的极限并被填满,然后它会阻塞其他网络请求的入队。就好像我们只是回避了问题,让我们系统的崩溃时间进入了倒计时。在我们的缺陷版本发布后,整个系统的延迟率每分钟都在持续增加。更好的解决方案我们决定使用协作的方式来改进我们的Go通道,通过构建一个具有2个的通道处理系统,一个用于将请求主体入队,另一个用于控制JobQueue中worker的并发运行小时数。这个思路的核心是将数据以相对稳定的频率并行上传到S3。这样我们的服务器就不会宕机,也不会因为连接太多导致很多S3连接错误。所以我们开始研究Job/Worker模型。这对于熟悉Java和c#开发的人来说并不陌生。你可以理解为Go使用channels实现工作线程池的方式。var(MaxWorker=os.Getenv("MAX_WORKERS")MaxQueue=os.Getenv("MAX_QUEUE"))//JobrepresentsthejobtoberuntypeJobstruct{PayloadPayload}//Abufferedchannelthatwecansendworkrequestson.varJobQueuechanJob//WorkerrepresentstheworkerthatexecutesthejobtypeWorkerstruct{WorkerPoolchanchanJobJobChannelchanJobquitchanbool}funcNewWorker(workerPoolchanchanJob)Worker{returnWorker{WorkerPool:workerPool,JobChannel:make(chanJob),quit:make(chanbool)}}//Startmethodstartstherunloopfortheworker,listeningforaquittchannelin//caseweneedtostopitfunc(wWorker)Start(){gofunc(){for{//将当前worker注册到workerqueue.w.WorkerPool<-w.JobChannelselect{casejob:=<-w.JobChannel://wehavereceivedaworkrequest.iferr:=job.Payload.UploadToS3();err!=nil{log.Errorf("ErroruploadingtoS3:%s",err.Error())}case<-w.quit://wehavereceivedasignaltostopreturn}}}()}//Stopsignalstheworkertostoplisteningforworkrequests.func(wWorker)Stop(){gofunc(){w.quit<-true}()}接下来修改我们网络请求的钩子函数,它负责创建Job结构体实例,放入JobQueue通道等待worker获取执行funcpayloadHandler(whttp.ResponseWriter,r*http.Request){ifr.Method!="POST"{w.WriteHeader(http.StatusMethodNotAllowed)return}//Readthebodyintoastringforjsondecodingvarcontent=&PayloadCollection{}err:=json.NewDecoder(io.LimitReader(r.Body,MaxLength)).Decode(&content)iferr!=nil{w.Header().Set("Content-Type","application/json;charset=UTF-8")w.WriteHeader(http.StatusBadRequest)return}//GothrougheachpayloadandqueueitemsindividuallytobepostedtoS3for_,payload:=rangecontent.Payloads{//let'screateajobwiththepayloadwork:=Job{Payload:payload}//Pushtheworkontothequeue.JobQueue<-work}w.WriteHeader(http.StatusOK)}在我们的网络服务初始化创建一个Dispatcher并调用Run()创建一个线程池,里面有一定数量的worker来接收和处理来自JobQueue的Jobdispatcher:=NewDispatcher(MaxWorker)dispatcher.Run()下面是我们的DispatchertypeDispatcherstruct的实现{//ApoolofworkerschannelsthatareregisteredwiththedispatcherWorkerPoolchanchanJob}funcNewDispatcher(maxWorkersint)*Dispatcher{pool:=make(chanchanJob,maxWorkers)return&Dispatcher{WorkerPool:pool}}func(d*Dispatcher)Run(){//startingnnumberofworkersfori:=0;i