防止并发破坏下游服务的几种方法本文转载请联系网管谢bi公众号。前言上一篇我用hibernation做并发控制,把下游服务搞坏了。发出去之后,得到了很多网友的回复。有人问自己平时用的方案行不行,有人建议借鉴TCP的拥塞控制策略动态调整发起的并发数,还有人问我为什么要控制下游能不能抗。今天总结一下调用下游服务时并发控制的几种方案。因为我们的文章是一篇科普文章,主要目的是总结如何享受并发带来的效率提升,同时做好并发控制,让整个系统的上下游更加稳定.加哪个服务,万一出事谁负责商量。并发控制方案前面我们提到过,使用sleep进行并发控制的最大缺点是没有考虑下游服务的感受。每次开启固定数量的goroutine执行任务后,调用者休眠1s再返回,而不是等待下游服务的反馈。开始下一批要执行的任务。funcbadConcurrency(){batchSize:=500for{data,_:=queryDataWithSizeN(batchSize)iflen(data)==0{break}for_,item:=rangedata{gofunc(iint){doSomething(i)}(item)}时间.Sleep(time.Second*1)}}另外,上游存在请求分配不均的问题。在睡眠期间,根本没有请求。休眠结束后,无论下游是否执行完毕,都会立即发起一批新的请求。因此,我们应该从等待下游反馈和尽可能均匀地分配请求的角度来实现并发控制。当然,在实际项目中,应该将这两个方面结合起来。请访问以下链接查看本文的可执行示例代码:https://github.com/kevinyan815/gocookbook/blob/master/codes/prevent_over_concurrency/main.go使用限流器,我们可以通过限流器执行一个当前限制,如果达到限制,它将被阻塞,直到可以再次发起请求。一听到blockinguntilblabla,有同学立马兴奋起来,想用通道实现一个限流器。“止咳于此”其实完全没有必要。Golang限流器官方的time/rate包的Wait方法正好可以为我们提供这个功能。funcuseRateLimit(){limiter:=rate.NewLimiter(rate.Every(1*time.Second),500)batchSize:=500for{data,_:=queryDataWithSizeN(batchSize)iflen(data)==0{fmt.Println("endofalldata")break}for_,item:=rangedata{//阻塞直到令牌桶有足够的Tokenerr:=limiter.Wait(context.Background())iferr!=nil{fmt.Println("Error:",err)return}gofunc(iint){doSomething(i)}(item)}}}//模拟调用下游服务funcdoSomething(iint){time.Sleep(2*time.Second)fmt.Println("End:",i)}//模拟查询N条数据funcqueryDataWithSizeN(sizeint)(dataList[]int,errerror){rand.Seed(time.Now().Unix())dataList=rand.Perm(size)return}time/rate包提供的限流器使用令牌桶算法。使用Wait方法时,当桶中没有足够的令牌时,调用者会阻塞,直到可以获取到令牌。当然也可以通过Wait方法接受的Context参数来设置等待超时时间。时间。限流器向桶中放入令牌的速率是恒定的,这比简单地使用time.Sleep请求更均匀。时间/速率限流器的详细使用方法请查看我之前的文章:Golang官方限流器使用详解。使用限流器之后,只是让我们的并发请求分布的更均匀而已。最好我们在收到下游的反馈后才能开始下一次并发。使用WaitGroup,我们可以等待上一批并发请求执行完毕,然后再开始下一批任务。估计大部分同学听到这里都会马上想到加入WaitGroup。Point)等待一组正在执行任务的workergoroutines完成。如果正在执行任务的workergoroutines还没有全部完成,等待的goroutines会阻塞在checkpoint,直到所有的wokergoroutines都完成后才能继续执行。funcuseWaitGroup(){batchSize:=500for{data,_:=queryDataWithSizeN(batchSize)iflen(data)==0{fmt.Println("Endofalldata")break}varwgsync.WaitGroupfor_,item:=rangedata{wg.Add(1)gofunc(iint){doSomething(i)wg.Done()}(item)}wg.Wait()fmt.Println("Nextbunchofdata")}}这里调用程序会等待所有的任务执行完才开始为下一批请求检查下一批数据,等待时间取决于这批请求中最新的响应返回需要多长时间。如果使用Semaphore,如果不想等一个batch完成再开始下一个batch,也可以采用完成后再填充下一个batch的策略。这比使用WaitGroup进行并发控制要好。如果下游资源充足,整个任务的处理时间就会减少。快点。该策略需要使用信号量(Semaphore)进行并发控制。Go语言通过扩展库golang.org/x/sync/semaphore提供信号量并发原语。关于信号量的使用方法和实现原理可以看我之前的文章:并发编程-信号量的使用方法和实现原理上面的程序改成使用semaphore信号量。){varconcurrentNumint64=10varweightint64=1varbatchSizeint=50s:=semaphore.NewWeighted(concurrentNum)for{data,_:=queryDataWithSizeN(batchSize)iflen(data)==0{fmt.Println("Endofalldata")break}for_,item:=rangedata{s.Acquire(context.Background(),weight)gofunc(iint){doSomething(i)s.Release(weight)}(item)}}}使用producer-consumer模式,很多读者反应不错加一个线程池还好,因为大家的公司可能都有线程池的实现在用,直接拿来用就行了,我就不在这里胡闹去实现线程池了。在我看来,其实我们需要实现一个生产者和消费者的模型,让线程池帮我们限制只有固定数量的消费者线程去调用下游的服务,生产者拿出来存储数据。通道只能充当两者之间的中介。funcuseChannel(){batchSize:=50dataChan:=make(chanint)varwgsync.WaitGroupwg.Add(batchSize+1)//producergofunc(){for{data,_:=queryDataWithSizeN(batchSize)iflen(data)==0{break}for_,item:=rangedata{dataChan<-item}}close(dataChan)wg.Done()}()//消费者gofunc(){fori:=0;i<50;i++{gofunc(){for{select{casev,ok:=<-dataChan:if!ok{wg.Done()return}doSomething(v)}}}()}}()wg.Wait()}在这个代码实现中,如果使用ErrorGroupinsteadofWaitGroup可以进一步简化,但我留给读者去探索。关于ErrorGroup的用法总结,推荐阅读文章:觉得WaitGroup不好用?尝试错误组!小结通过文中总结的一些方法,我们也可以看出,在并发编程的场景下,除了要关注发起的并发线程数,更重要的是还需要关注底层的反馈异步调用的服务。不是简单的增加并发数就能解决问题的。理解为什么我们要在并发编程中关注下层服务的反馈是非常重要的。否则,我们列出的解决方案实际上可以在goroutines中启动goroutines,不管是否执行完成都直接返回。
