微信公众号:LinuGo,欢迎关注Golang标准库搭建的http服务器会为每个请求创建一个协程来处理,虽然每个协程占用的栈空间很小,但是如果有几百万个请求(当然这种可能性有点极端),服务端只能乖乖的为每个请求创建一个协程。这时候go进程中就会有大量的goroutines。不说占用服务器资源,还会增加gc的压力。这时候想在机制上加一个限制,设置一个协程池来限制最大可以处理请求的协程数。看看这部分标准库的源码实现func(srv*Server)Serve(lnet.Listener)error{.....ctx:=context.WithValue(baseCtx,ServerContextKey,srv)for{//等待连接建立,如果没有请求,则阻塞rw,err:=l.Accept()iferr!=nil{......}connCtx:=ctxifcc:=srv.ConnContext;cc!=nil{......}......//启用协程处理请求,这里主要需要修改的是goc.serve(connCtx)}}主要是需要对协程的创建加上限制条件,如果小于协程池指定的个数则允许创建,否则等待协程池有空闲空间后再创建。大体的思路就是大体上使用生产者-消费者模型。使用两个带缓冲区的通道来实现协程的并发控制。一个sigChannel通过缓冲空间限制协程的最大数量,另一个jobChannel用于传输请求的数据(包括请求函数和参数)。这个jobChannel是为了是否不需要Buffering。流程(1)首先,当请求到来时,将标志位数据写入sigChannel。如果此时有空闲位置,则不会阻塞在这里;(2)然后将要执行的函数和参数写入jobChannel;(3)worker,在后台监听jobChannel函数(该函数需要不断读取pipeline数据),取出pipeline中的数据;(4)worker创建一个goroutine来执行request函数;(5)request函数执行完成后,goroutine再取出sigChannel管道中的flag数据,释放空间;注意:如果一开始sigChannel写数据写不出来,说明pool满了,需要阻塞等待。这样就实现了使用sigChannel控制并发量的功能。代码实现下面用代码实现这个思路1.首先在net/http包中保存一份代码,防止被破坏。直接在目录下创建一个git仓库,先commit一次源码,然后创建一个分支自己玩玩。在net/http下为协程池函数创建一个文件夹,并创建一个go文件。2.首先定义两个通道,一个用于存储信号,一个用于存储函数和参数,这里结合http处理typeInfostruct{//函数名,对应httpParamFuncfunc(ctxcontext.Context)//函数的参数,对应c.serve()的connCtx参数Paramcontext.Context}typeTaskstruct{//用来传递函数和参数的管道,对应jobChanneltaskLetchanInfo//的用于传输信号量的pipelinetaskCmpchanint64}typePoolstruct{//两条pipeline对应的结构体task*Task//协程池的容量taskNumint64}3.创建协程池对象,即初始化两个管道funcNewPool(nint64)*Pool{taskc:=make(chanInfo,n)workc:=make(chanint64,n)return&Pool{tasks:&Task{taskLet:taskc,taskCmp:workc,},taskNum:n,}}4.创建一个put函数,用来往两个channel里面塞数据,即producerfunc(p*Pool)Put(aInfo){//往sigChannel里放数据,如果是阻塞的,说明有nofreetimep.tasks.taskCmp<-1//在jobChannel中放入数据p.tasks.taskLet<-a}5.创建一个runfunction来监控pipeline并获取数据,即consumerfunc(p*Pool)Run(){//不断监控jobChannel管道,只要有数据监控,就说明已经有空闲空间了。//需要创建一个goroutine来执行传递过来的函数和参数for{select{caselet:=<-p.tasks.taskLet:gop.Work(let)}}}func(p*Pool)Work(fInfo){//执行传入的函数f.ParamFunc(f.Param)//执行函数后,取出sigChannel中的flag<-p.tasks.taskCmp}6.修改源码,添加代码需要修改的去server.gofunc(srv*Server)Serve(lnet.Listener)error{.....//初始化一个连接池po:=currencyctl.NewPool(srv.CorrencyNum)//start这个池是异步的,否则会阻塞gopo.Run()ctx:=context.WithValue(baseCtx,ServerContextKey,srv)for{//等待建立连接,如果没有请求,会阻塞rw,err:=l.Accept()iferr!=nil{...}connCtx:=ctxifcc:=srv.ConnContext;cc!=nil{...}......//goc.serve(connCtx)//改造进协程池po.Put(currencyctl.Info{ParamFunc:c.serve,Param:connCtx})}}我把并发数的参数放到server结构体中,通过http.ListenAndServe()方法传递给server,下次赋值测试阶段,接下来跑一个测试用例:测试代码很简单,如下:packagemainimport("fmt""net/http"_"net/http/pprof""time")funcmain(){gofunc(){//使用pprof跟踪http.ListenAndServe(":6060",nil,10)}()http.HandleFunc("/",func(writerhttp.ResponseWriter,request*http.Request){fmt.Println("收到请求...")time.Sleep(time.Second*1)writer.Write([]byte("hellohttp"))})http.ListenAndServe(":8000",nil,100)//限制最大并发为100}启动Project,做压力测试(这里我使用的是go-stress-testing工具):当并发请求量为1000时,查看pprof工具,查看系统协程数,控制在100左右。设置协程池coroutineamountto200,使用1000个并发请求,看到协程数控制在200个,经验证,协程池在net/http标准库上的应用基本成功,只是接口简单已经测试了,没有复杂的业务验证,可能会有很多未知的问题,所以,我乖乖的gitcheckout,切到原来的分支。
