当前位置: 首页 > 科技观察

使用Go每分钟处理数百万请求

时间:2023-03-15 10:21:22 科技观察

这篇文章在medium上很受欢迎。作者用实际案例分析,讲的很好。我们经常听说使用Go的goroutine和channel很容易实现高并发。把所有的代码都跑在goroutine中是否可以实现高并发程序?很明显不是。本文将一步步教你如何编写一个简单的、高并发的Go程序。Text我在几家不同的公司从事反垃圾邮件、反病毒和反恶意软件工作超过15年,现在我知道这些系统最终会变得越来越复杂,因为我们必须处理大量数据天。目前,我是smsjunk.com的首席执行官和KnowBe4的首席架构师,这两家公司都在网络安全行业。有趣的是,在过去10年里,我作为软件工程师参与的大部分Web后端开发都是使用RubyonRails完成的。不要误会我的意思,我喜欢RubyonRails,我相信它是一个了不起的生态系统,但过了一段时间,你开始以Ruby的方式思考和设计系统,忘记了如何高效地利用多线程、并行、实现速度快,内存消耗小,简化了软件架构。多年来,我一直是C/C++、Delphi和C#开发人员,现在我才开始意识到让工具适合您的工作是多么复杂。我对网络上那些语言和框架大战不太感兴趣,比如哪种语言更好,哪种框架更快。我一直认为效率、生产力和代码可维护性主要取决于构建解决方案的难易程度。问题在我们的匿名监控和分析系统上工作时,我们的目标是能够处理来自数百万端点的大量POST请求。Web处理程序将收到一个JSON文档,其中可能包含需要写入AmazonS3的多个有效负载的集合,以便我们的map-reduce系统稍后可以对该数据进行操作。传统上,我们会考虑创建工作层架构,利用技术堆栈,例如:SidekiqResqueDelayedJobElasticbeanstalkWorkerTierRabbitMQ...并构建2个不同的集群,一个用于Web前端,一个用于工作人员,因此我们可以将机器扩展到处理传入的请求。从一开始,我们的团队就知道我们可以在Go中做到这一点,因为在讨论阶段我们看到这可能是一个非常高流量的系统。我已经使用Go大约2年了,我们用Go开发了一些系统,但没有一个有这么大的流量。我们首先创建几个结构来定义我们通过POST调用接收的Web请求并将它们上传到S3存储。typePayloadCollectionstruct{WindowsVersionstring`json:"version"`Tokenstring`json:"token"`Payloads[]Payload`json:"data"`}typePayloadstruct{//[redacted]}func(p*Payload)UploadToS3()error{//thestorageFoldermethoddensuresthatattherareenonamecollisionin//casewegetsametimestampinthekeynamestorage_path:=fmt.Sprintf("%v/%v",p.storageFolder,time.Now().UnixNano())bucket:=S3Bucketb:=new(bytes.Buffer)encodeErr:=json。NewEncoder(b).Encode(payload)ifencodeErr!=nil{returnencodeErr}//我们投递到S3bucket的所有东西都应该标记为'private'varacl=s3.PrivatevarcontentType="application/octet-stream"returnbucket.PutReader(storage_path,b,int64(b.Len()),contentType,acl,s3.Options{})}Naive的做法——硬核使用Goroutine最初,我们实现了一个非常简单粗暴的POSThandler实现,将每个请求直接放入一个要运行的新goroutine:funcpayloadHandler(whttp.ResponseWriter,r*http.Request){ifr.Method!="POST"{w.WriteHeader(http.StatusMethodNotAllowed)return}//Readthebodyintoastringforjsondecodingvarcontent=&PayloadCollection{}err:=json.NewDecoder(io.LimitReader(r.Body,MaxLength)).Decode(&content)iferr!=nil{w.Header().Set("Content-Type","应用程序/json;charset=UTF-8")w.WriteHeader(http.StatusBadRequest)return}//通过每个payload和queueitems单独发布到S3for_,payload:=rangecontent.Payloads{gopayload.UploadToS3()//<-----DON'TDOTHIS}w.WriteHeader(http.StatusOK)}对于一般的并发,这个其实是可行的,但是很快证明不能应用到高并发的场景。当我们将第一个版本部署到生产环境时,我们可能会有更多的请求。当时,我们开始看到并非如此的数量级,我们低估了并发量。上述方法有几个问题。无法控制工作goroutines的数量。此外,由于我们每分钟有100万个POST请求,系统崩溃非常快。我们再次需要找到另一种方法。从一开始我们就开始讨论如何使请求处理程序的生命周期尽可能短,并在后台生成处理。当然,这是必须在RubyonRails中完成的事情,否则,无论您使用puma、unicorn还是passenger,所有可用的webworker都会阻塞。那么我们就需要使用通用的解决方案来完成这个工作,比如Resque、Sidekiq、SQS等。当然还有其他的工具,方法有很多种。所以我们的第二个改进是创建一个缓冲通道,我们可以在其中将一些作业请求放入队列并将它们上传到S3,因为我们可以控制队列的最大长度并且有足够的RAM来对内存作业中的处理进行排队,所以我们以为我们可以缓冲通道队列中的作业。varQueuechanPayloadfuncinit(){Queue=make(chanPayload,MAX_QUEUE)}funcpayloadHandler(whttp.ResponseWriter,r*http.Request){...//通过每个payload和queueitems单独发布到S3for_,payload:=rangecontent.Payloads{Queue<-payload}...},为了从缓冲通道中取出任务并处理它们,我们使用这种方式:funcStartProcessor(){for{select{casejob:=<-Queue:job.payload.UploadToS3()//<--STILLNOTGOOD}}}老实说,我不知道我们在想什么,那一定是一个艰难的夜晚。这种做法并没有给我们带来任何改进,我们用缓冲队列替换了有缺陷的并发,它只是延迟了问题。我们的同步处理器一次只将一个有效负载上传到S3,并且由于传入请求的速率远远大于单个处理器上传到S3的能力,我们的缓冲通道很快就达到了极限,队列被阻塞,无法继续前进.添加任务。我们只是绕过了这个问题,最终导致我们的系统完全崩溃。在我们部署了这个有问题的版本之后,我们的延迟继续增加。更好的解决方案我们决定在Go通道上使用一种通用模式来创建一个2层(双)通道系统,一个用于处理排队的作业,另一个用于控制有多少工人在JobQueue上同时工作。这个想法是将上传到S3的并行速度提高到可持续的速度,而不会导致机器崩溃或触发S3的连接错误。所以我们选择创建Job/Worker模式。对于熟悉Java、C#等的人来说,将其视为Golang使用通道实现WorkerThread-Pool的方式。var(MaxWorker=os.Getenv("MAX_WORKERS")MaxQueue=os.Getenv("MAX_QUEUE"))//JobrepresentsthejobtoberuntypeJobstruct{PayloadPayload}//Abufferedchannelthatwecansendworkrequestson.varJobQueuechanJob//WorkerrepresentstheworkerthatexecutesthejobtypeWorkerstruct{WorkerPoolchanchanJobJobChannelchanJobquitchanbool}funcNewWorker(workerPoolchanchanJob)Worker{returnWorker{WorkerPool:workerPool,JobChannel:make(chanJob),quit:make(chanbool)}}//Startmethodstartstherunloopfortheworker,listeningforaquittchannelin//caseweneedtostopitfunc(wWorker)Start(){gofunc(){for{//将当前worker注册到workerqueue.w.WorkerPool<-w.JobChannelselect{casejob:=<-w.JobChannel://wehavereceivedaworkrequest.iferr:=job.Payload.UploadToS3();err!=nil{log.Errorf("ErroruploadingtoS3:%s",err.Error())}case<-w.quit://wehavereceivedasignaltostopreturn}}}()}//Stopsignalstheworkertostoplisteningforworkrequests.func(wWorker)Stop(){gofunc(){w.quit<-true}()}我们修改了我们的Web请求处理程序以创建一个带有有效负载的Job结构并将其发送到JobQueueChannel以供工作人员处理funcpayloadHandler(whttp.ResponseWriter,r*http.Request){ifr.Method!="POST"{w.WriteHeader(http.StatusMethodNotAllowed)return}//Readthebodyintoastringforjsondecodingvarcontent=&PayloadCollection{}err:=json.NewDecoder(io.LimitReader(r.Body,MaxLength)).Decode(&content)iferr!=nil{w.Header().Set("Content-Type","application/json;charset=UTF-8")w.WriteHeader(http.StatusBadRequest)return}//通过每个payload和queueitems分别发布到S3for_,payload:=rangecontent.Payloads{//let'createajobwiththepayloadwork:=Job{Payload:payload}//Pushtheworkontothequeue.JobQueue<-work}w.WriteHeader(http.StatusOK)}在我们的网络服务器初始化期间,我们创建一个Dispatcher并调用Run()来创建工作池并开始监控出现的作业在作业队列中。dispatcher:=NewDispatcher(MaxWorker)dispatcher.Run()下面是我们调度器实现的代码:func(d*Dispatcher)Run(){//startingnnumberofworkersfori:=0;i