GoReplay简介随着应用程序的复杂性增长,测试它所需的工作量也呈指数增长。GoReplay为我们提供了重用现有流量进行测试的简单想法。GoReplay是一款使用golang开发的简单行车记录插件,支持多种方式的过滤、限流放大、重写等特性。GoReplay可以完全不侵入代码,不需要更改您的生产基础设施,并且与语言无关。它不是代理,而是直接监听网卡上的流量。GoReplay的工作原理:监听服务器抓取流量发送给重放服务器或者保存到文件,或者保存到kafka。然后重播服务器会将流量传输到配置的地址。使用流程需求:接到算法端的需求,需要记录真实的生产环境流量,随时回放到任意环境。由于算法端部分场景使用非Java语言编写,现有的流量记录平台暂时无法支持,需要使用新的记录组件来支持压测需求,所以选择了goreplay。GoReplay支持将录制的数据存储到本地文件中,然后在播放时从文件中读取。考虑到每次录制和回放的存储和传送文件的复杂性,我们期望使用更方便的方式来管理数据。GoReplay也原生支持录音数据存储到Kafka中,但是在使用的时候发现有比较大的局限性;使用Kafka存储数据时,必须同时进行流量记录和流量回放,其架构图如下流程1-4不能拆分,只能同时进行。这样会使行车记录和回放功能很弱。我们需要随时回放记录的数据,也支持一个记录数据的多次回放。既然已经把流量数据存储到kafka中了,我们就可以考虑改造GoReplay来支持我们的需求了。修改后的行车记录回放架构图:图中1-2阶段和3-5阶段相互独立,即行车记录过程和回放过程可以拆解。我们只需要记录Kafka在记录开始和结束的offset,就可以知道记录任务包含了哪些数据。我们可以很方便地将每条记录的数据组织成一个记录任务,然后在需要的时候进行流量回放。kafkaoffset改造整合支持改造简要流程:源码中InputKafkaConfig的定义typeInputKafkaConfigstruct{producersarama.AsyncProducerconsumersarama.ConsumerHoststring`json:"input-kafka-host"`Topicstring`json:"input-kafka-topic"`UseJSONbool`json:"input-kafka-json-format"`}定义修改后的InputKafkaConfigtypeInputKafkaConfigstruct{producersarama.AsyncProducerconsumersarama.ConsumerHoststring`json:"input-kafka-host"`主题字符串`json:"input-kafka-topic"`UseJSONbool`json:"input-kafka-json-format"`StartOffsetint64`json:"input-kafka-offset"`EndOffsetint64`json:"input-在kafka-end-offset"`}的源码中,从kafka读取的数据段:可以看到,它选择的offset是Newestforindex,partition:=rangepartitions{consumer,err:=con.ConsumePartition(config.Topic,partition,sarama.OffsetNewest)gofunc(consumersarama.PartitionConsumer){deferconsumer.Close()formessage:=范围消费者。Messages(){i.messages<-message}}(consumer)}从kafka读取数据修改段:forindex,partition:=rangepartitions{consumer,err:=con.ConsumePartition(config.Topic,partition,config.StartOffset)offsetEnd:=config.EndOffset-1gofunc(consumersarama.PartitionConsumer){deferconsumer.Close()formessage:=rangeconsumer.Messages(){//比较消息的偏移量,当它超过这个批次时当数据达到最大值时,关闭通道ifoffsetFlag&&message.Offset>offsetEnd{i.quit<-struct{}{}break}i.messages<-message}}(consumer)}此时,才开始播放任务,指定kafkaoffset范围即可达到预期效果。融入压测平台只需在页面填写选择操作,即可生成启动命令,替代冗长的命令编写StringBuilderbuilder=newStringBuilder("nohup/opt/apps/gor/gor");//拼接参数组合命令builder.append("--input-kafka-host").append("'").append(kafkaServer).append("'");builder.append("--input-kafka-topic").append("'").append(kafkaTopic).append("'");builder.append("--input-kafka-start-offset").append(record.getStartOffset());建造者。append("--input-kafka-end-offset").append(record.getEndOffset());builder.append("--output-http").append(replayDTO.getTargetAddress());builder.append("--exit-after").append(replayDTO.getMonitorTimes()).append("s");if(StringUtils.isNotBlank(replayDTO.getExtParam())){builder.append("").append(replayDTO.getExtParam());}builder.append(">/opt/apps/gor/replay.log2>&1&");StringcompleteParam=builder.toString();压测平台通过Java代理暴露的接口控制GoReplay进程的启停StringsourceAddress=replayDTO.getSourceAddress();String[]split=sourceAddress.split(COMMA);for(Stringip:split){Stringuri=String.format(HttpTrafficRecordServiceImpl.BASE_URL+"/gor/start",ip,HttpTrafficRecordServiceImpl.AGENT_PORT);//重新创建对象GoreplayRequestrequest=newGoreplayRequest();request.setConfig(replayDTO.getCompleteParam());请求.setType(0);尝试{restTemplate.postForObject(uri,request,String.class);}catch(RestClientExceptione){LogUtil.error("启动或失败,请检查!",e);MSException.throwException("启动失败,请检查!",e);}}
