背景本题简单说一下背景。不明白的可以看前面的文章。不想读也没关系。这是一个通用的解决方案,我会在后面总结。上一篇文章最后提到,在对每一个被移除的地址进行决策时,需要顺序执行,并且每个被移除的地址都必须实时获取集群的地址信息,这样才能使得是否需要覆盖的决定。当被移除的机器很多时,获取地址信息的请求量会非常大,给注册中心带来很大的压力。请求数据源的接口如下(其中cuuid是集群的id)很简单的背景并且能想到一些解决办法。每次决策都需要根据cuuid获取集群,即单独获取实时集群地址信息。既然是实时信息,首先排除了缓存,其次自然会想到,如果能合并请求,是不是就可以解决大量请求呢?问题?难点在于,如果只是改逻辑合并请求,改代码就完事了,不值得写这篇文章。如何改最少的代码实现mergerequest是最难的。解决方案那天遇到了这个问题,晚上辗转反侧想到了这个解决方案。其实我主要参考Gohttp客户端的实现。他们说看源码没用。这不就是有用吗?Read数据源接口定义不变,即上层业务代码完全不需要改动,只需要替换ListClusterEndpoints的实现即可。我们可以使用队列对每个请求进行排队。进入队列后调用者被阻塞,然后启动一些协程从队列中取出一批请求参数,发起批量请求,响应后唤醒被阻塞的调用者。为此,我们实现了一个可以阻塞并被其他协程唤醒的工具:Token)Done(valueinterface{},errerror){t<-token{value:value,err:err}}func(tToken)Wait(timeouttime.Duration)(valueinterface{},errerror){iftimeout<=0{tk:=<-treturntk.value,tk.err}select{casetk:=<-t:returntk.value,tk.errcase<-time.After(timeout):返回nil,ErrTokenTimeout}}其次,定义队列等参数:)ListClusterEndpoints(ctxcontext.Context,cuuidstring)([]ptypes.Endpoint,error){req:=param{cuuid:cuuid,token:NewToken(),}选择{casep.paramCh<-req:default:return无,fmt.Errorf("list集群端点写入通道失败")}value,err:=req.token.Wait(p.readTimeout)iferr!=nil{returnnil,err}eps,ok:=value.([]ptypes.Endpoint)if!ok{returnnil,fmt.Errorf("值不是端点")}returnendpoints,nil}启动几个协程来处理任务:func(p*DataSource)startListClusterEndpointsLoop(){fori:=0;i
