当前位置: 首页 > 科技观察

Go实现分布式高可用后台:使用gRPC实现日志微服务

时间:2023-03-18 00:53:56 科技观察

掌握了gRPC的基本原理后,我们就可以使用它来实现日志微服务功能了。在构建高并发系统时,内部服务组件通常使用gRPC来实现高效的数据传输,所以我们将使用json完成的日志服务改为使用gRPC。第一步定义proto文件,修改proglog/api/v1下的log.proto文件:syntax="proto3";packagelog.v1;optiongo_package="api/log_v1";serviceLog{rpcProduce(ProduceRequest)返回(ProduceResponse){}rpcConsume(ConsumeRequest)返回(ConsumeResponse){}rpcConsumeStream(ConsumeRequest)返回(streamConsumeResponse){}rpcProduceStream(streamProduceRequest)returns(streamProduceResponsebyvalue){}}messageRecord;uint64offset=2;}messageProduceRequest{Recordrecord=1;}messageProduceResponse{uint64offset=1;}messageConsumeRequest{uint64offset=1;}messageConsumeResponse{Recordrecord=2;}代码逻辑相同与前面几节我们尝试使用gRPC时的proto文件定义逻辑是一样的。Produce接口用于客户端向服务器端提交一条日志信息。Consume是客户端向服务器提交日志号,然后服务器返回日志信息。终端提交一系列日志编号,然后服务器返回一系列日志信息。ProduceStream是客户端向服务端提交一系列日志信息,添加日志后服务端返回相应的编号。完成上述proto文件的定义后,将其编译成对应的pb.go文件,该文件会放在api/v1/api_log_v1目录下,接下来我们来看server的逻辑设计。在internal/server下新建一个server.go文件。首先,我们添加依赖模块,生成一个gRPC服务器对象,注册我们要实现的接口:commitLoginterface{Append(*api.Record)(uint64,error)Read(uint64)(*api.Record,error)}typeConfigstruct{//实现依赖注入CommitLogcommitLog}var_api.LogServer=(*grpcServer)(nil)//gRPC服务器对象funcNewGRPCServer(config*Config)(*grpc.Server,error){gsrv:=grpc.NewServer()srv,err:=newgrpcServer(config)iferr!=nil{returnnil,err}api.RegisterLogServer(gsrv,srv)returngsrv,nil}typegrpcServerstruct{api.UnimplementedLogServer*Config}funcnewgrpcServer(config*Config)(srv*grpcServer,errerror){srv=&grpcServer{//grpcServer将实现proto文件中定义的接口Config:config,}returnsrv,nil}上面的代码有一点需要注意,那就是它使用了一种常见的设计模式,叫做依赖注入,我们的服务需要用到日志模块提供的功能,但是这里我们只需要知道日志模块提供的接口,也就是Append和Read,不需要关心它的具体实现,所以我们可以实现逻辑解耦。对于我们的服务程序,我们只需要调用方将实现commitLog接口的实例传递给我们即可。至于接口的实现细节,我们不用关心。依赖注入的设计模式可以降低系统设计的复杂度,使其具有灵活性。性增强接下来是四个服务接口的实现,其逻辑和我们前面两节没有区别:func(s*grpcServer)Produce(ctxcontext.Context,req*api.ProduceRequest)(*api.ProduceResponse,error){//接收到客户端的日志添加请求,然后调用日志模块Append接口添加偏移量,err:=s.CommitLog.Append(req.Record)iferr!=nil{returnnil,err}//返回添加后的日志编号return&api.ProduceResponse{Offset:offset},nil}func(s*grpcServer)Consume(ctxcontext.Context,req*api.ConsumeRequest)(*api.ConsumeResponse,error){//接收到客户端发送的日志号,返回日志内容记录,err:=s.CommitLog.Read(req.Offset)iferr!=nil{returnnil,err}return&api.ConsumeResponse{Record:record},nil}func(s*grpcServer)ProduceStream(streamapi.Log_ProduceStreamServer)error{for{//客户端发送一系列日志数据,服务器依次接收throughRecv(),然后添加logreq,err:=stream.Recv()iferr!=nil{returnerr}res,err:=s.Produce(stream.Context(),req)iferr!=nil{returnerr}如果err=stream.Send(res);err!=nil{returnerr}}}func(s*grpcServer)ConsumeStream(req*api.ConsumeRequest,streamapi.Log_ConsumeStreamServer)error{for{//客户端发送一系列日志号,服务端返回一系列oflog对应数字的内容select{case<-stream.Context().Done()://这里输入表示客户端已连接returnnildefault:res,err:=s.Consume(stream.Context(),req)switcherr.(type){casenil:caseapi.ErrorOffsetOutOfRange:continuedefault:returnerr}//将获取到的日志信息发送给客户端iferr=stream.Send(res);err!=nil{returnerr}req.Offset++}}}上面代码的实现逻辑和我们前面描述的完全一样,这里就不多赘述了。最后我们来测试一下上面代码的执行情况,新建一个server_test.go,添加如下内容:packageserverimport("context""io/ioutil""net""testing""github.com/stretchr/testify/require"api"api/v1/api/log_v1""internal/log""google.golang.org/grpc")funcTestServer(t*testing.T){对于场景,fn:=rangemap[string]func(t*testing.T,clientapi.LogClient,config*Config,){"从日志成功生产/消费信息":testProduceConsume,"生产/消费流成功":testProduceConsumeStream,"消费过去logboundaryfails:":testConsumePastBoundary,}{t.Run(scenario,func(t*testing.T){//在运行测试用例之前创建一个服务器对象client,config,teardown:=setupTest(t,nil)deferteardown()//关闭服务器fn(t,client,config)})}}funcsetupTest(t*testing.T,fnfunc(*Config))(clientapi.LogClient,cfg*Config,teardownfunc()){t.Helper()//生成tcp连接,使用0表示使用随机端口l,err:=net.Listen("tcp",":0")require.NoError(t,err)clientOptions:=[]grpc.DialOption{grpc.WithInsecure()}cc,err:=grpc.Dial(l.Addr().String(),clientOptions...)require.NoError(t,err)目录,err:=ioutil.TempDir("","server-test")require.NoError(t,err)clog,err:=log.NewLog(dir,log.Config{})require.NoError(t,err)cfg=&Config{CommitLog:clog,}如果fn!=nil{fn(cfg)}//创建服务器对象server,err:=NewGRPCServer(cfg)require.NoError(t,err)gofunc(){//启动服务器server.Serve(l)}()//创建客户端对象client=api.NewLogClient(cc)返回客户端,cfg,func(){server.Stop()cc.Close()l.Close()clog.Remove()}}functestProduceConsume(t*testing.T,clientapi.LogClient,config*Config){ctx:=context.Background()want:=&api.Record{Value:[]byte("helloworld"),}//客户端提交一条日志,获取日志号然后用它来请求日志内容,检查服务器返回的日志内容是否与提交的produce一致,err:=client.Produce(ctx,&api.ProduceRequest{Record:want,})require.NoError(t,err)消耗,err:=client.Consume(ctx,&api.ConsumeRequest{Offset:produce.Offset,})require.NoError(t,err)require.Equal(t,want.Value,consume.Record.Value)require.Equal(t,want.Offset,consume.Record.Offset)}functestConsumePastBoundary(t*testing.T,clientapi.LogClient,config*Config){ctx:=context.Background()produce,err:=client.Produce(ctx,&api.ProduceRequest{Record:&api.Record{Value:[]byte("helloworld"),},})//使用不存在的日志编号进行请求,服务端应返回相关错误require.NoError(t,err)consume,err:=client.Consume(ctx,&api.ConsumeRequest{Offset:produce.Offset+1,})ifconsume!=nil{t.Fatal("consumenotnil")}got:=grpc.Code(err)want:=grpc.Code(api.ErrorOffsetOutOfRange{}.GRPCStatus().Err())ifgot!=want{t.Fatalf("goterr:%v,want%v",得到,想要)}}functestProduceConsumeStream(t*testing.T,客户端api.LogClient,config*Config){ctx:=context.Background()records:=[]*api.Record{{Value:[]byte("firstmessage"),Offset:0,},{Value:[]byte("secondmessage"),Offset:0,},}//客户端向服务器提交多条日志,获取多个日志编号,然后将获取的编号提交,从而使服务器返回一系列日志数据//然后将服务端返回的日志内容与服务端进行对比{stream,err:=client.ProduceStream(ctx)require.NoError(t,err)foroffset,record:=rangerecords{err=stream.Send(&api.ProduceRequest{Record:record,})require.NoError(t,err)res,err:=stream.Recv()require.NoError(t,err)ifres.Offset!=uint64(offset){t.Fatalf("gotoffset:%d,want:%d",res.Offset,offset,)}}}{stream,err:=client.ConsumeStream(ctx,&api.ConsumeRequest{Offset:0},)require.NoError(t,err)fori,record:=范围记录{res,err:=stream.Recv()require.NoError(t,err)require.Equal(t,res.Record,&api.Record{值:record.Value,偏移量:uint64(i),})}}}测试代码的逻辑可以通过注释理解。在测试用例中,客户端的创建,数据的发送和接收与我们前面描述的没有区别。因此,我们依赖gRPC框架来完成日志服务。下节我们看一下gRPC框架提供的数据安全功能