当前位置: 首页 > 科技观察

远程写入prometheus存储

时间:2023-03-23 10:52:57 科技观察

介绍Prometheus一般采用pull的方式获取数据,但是在某些情况下不方便配置exporter,所以希望可以通过push的方式上传索引数据。1、可以使用pushgateway方式推送到pushgateway,然后prometheus通过pushgateway拉取数据。2、新版本增加了一个参数:--enable-feature=remote-write-receiver,可以让remote直接通过接口/api/v1/write向prometheus写入数据。Pushgateway在高并发的情况下还是很耗资源的,尤其是开启了一致性检查,高并发写的时候特别慢。第二种方法少了一层转发,速度应该会快一些。该接口可以通过prometheus的http接口/api/v1/write提交数据。该接口的数据格式有要求:使用POST提交需要protobuf编码,依赖github.com/gogo/protobuf/proto可以使用snappy压缩,依赖github.com/golang/snappy步骤:收集指标名称、时间戳、值和标签将数据转换成prometheus需要的数据格式用proto对数据进行编码,用snappy压缩通过httpClientpackagepromeimport("bufio""bytes""context""io""io/ioutil""提交数据net/http""net/url""正则表达式""时间""github.com/gogo/protobuf/proto""github.com/golang/snappy""github.com/opentracing-contrib/go-stdlib/nethttp“opentracing”github.com/opentracing/opentracing-go”“github.com/pkg/errors”“github.com/prometheus/common/model”“github.com/prometheus/prometheus/pkg/labels”“github.com/prometheus/prometheus/prompb")typeRecoverableErrorstruct{error}typeHttpClientstruct{url*url.URLClient*http.Clienttimeouttime.Duration}varMetricNameRE=regexp.MustCompile(`^[a-zA-Z_:][a-zA-Z0-9_:]*$`)typeMetricPointstruct{Metricstring`json:"metric"`//度量名称TagsMapmap[string]string`json:"tags"`//数据标签Timeint64`json:"time"`//时间戳,以秒为单位[]byte)error{httpReq,err:=http.NewRequest("POST",c.url.String(),bytes.NewReader(req))iferr!=nil{returnerr}httpReq.Header.Add("内容-编码","snappy")httpReq.Header.Set("Content-Type","application/x-protobuf")httpReq.Header.Set("User-Agent","opcai")httpReq.Header.Set("X-Prometheus-Remote-Write-Version","0.1.0")ctx,cancel:=context.WithTimeout(context.Background(),c.timeout)defercancel()httpReq=httpReq.WithContext(ctx)ifparentSpan:=opentracing.SpanFromContext(ctx);parentSpan!=nil{varht*nethttp.TracerhttpReq,ht=nethttp.TraceRequest(parentSpan.Tracer(),httpReq,nethttp.OperationName("RemoteStore"),nethttp.ClientTrace(false),)deferht.Finish()}httpResp,err:=c.Client.Do(httpReq)iferr!=nil{//ErrorsfromClient.Doarefrom(forexample)networkerrors,soare//recoverable.returnRecoverableError{err}}deferfunc(){io.Copy(ioutil.Discard,httpResp.Body)httpResp.Body.Close()}()ifhttpResp.StatusCode/100!=2{scanner:=bufio.NewScanner(io.LimitReader(httpResp.Body,512))line:=""ifscanner.Scan(){line=scanner.Text()}err=errors.Errorf("serverreturnedHTTPstatus%s:%s",httpResp.Status,行)}ifhttpResp.StatusCode/100==5{returnRecoverableError{err}}returnerr}funcbuildWriteRequest(samples[]*prompb.TimeSeries)([]byte,error){req:=&prompb.WriteRequest{Timeseries:samples,}data,err:=proto.Marshal(req)iferr!=nil{returnnil,err}compressed:=snappy.Encode(nil,data)returncompressed,nil}typesamplestruct{labelslabels.Labelstint64vfloat64}const(LABEL_NAME="__name__")funcconvertOne(item*MetricPoint)(*prompb.TimeSeries,e错误){pt:=prompb.TimeSeries{}pt.Samples=[]prompb.Sample{{}}s:=sample{}s.t=item.Times.v=item.Value//nameif!MetricNameRE.MatchString(item.Metric){return&pt,errors.New("invalidmetricsname")}nameLs:=labels.Label{Name:LABEL_NAME,Value:item.Metric,}s.labels=append(s.labels,nameLs)fork,v:=rangeitem.TagsMap{ifmodel.LabelNameRE.MatchString(k){ls:=labels.Label{Name:k,Value:v,}s.labels=append(s.labels,ls)}}pt.Labels=labelsToLabelsProto(s.labels,pt.Labels)//时间赋值问题,使用每秒时间sMs:=time.Unix(s.t,0).UnixNano()/1e6pt.Samples[0].Timestamp=tsMspt.Samples[0].Value=s.vreturn&pt,nil}funclabelsToLabelsProto(labelslabels.Labels,buf[]*prompb.Label)[]*prompb.Label{result:=buf[:0]ifcap(buf)

最新推荐
猜你喜欢