当前位置: 首页 > 科技观察

如何用Go搭建一个高负载的WebSocket服务器

时间:2023-03-18 00:04:22 科技观察

大家好!我叫SergeyKamardin,是Mail.Ru的一名工程师。简介要从我们故事的上下文开始,应该有几点说明我们需要此服务器的原因。Mail.Ru有许多有状态的系统。用户电子邮件存储就是其中之一。有几种方法可以跟踪系统中的状态变化和系统事件。这主要是通过定期系统轮询或关于其状态变化的系统通知。两种方式各有利弊。但在邮件方面,用户收到新邮件的速度越快越好。邮件轮询每秒涉及大约50,000个HTTP查询,其中60%返回304状态,这意味着没有邮箱更改。因此,为了减轻服务器的负载,加快向用户发送邮件的速度,决定写一个发布-订阅服务器,一方面接收状态变化的通知,另一方面将接收此类通知的订阅。之前现在第一个方案显示了它过去的样子。浏览器定期轮询API并查询存储(电子邮件服务)的更改。第二个场景描述了新架构。浏览器与通知API建立WebSocket连接,通知API是Bus服务器的客户端。收到新电子邮件时,Storage会向Bus(1)发送通知,Bus(1)会将其发送给订阅者。API确定连接以发送收到的通知并将其发送到用户的浏览器(3)。那么今天我们就来聊一聊API或者WebSocket服务器。我们的服务器将拥有大约300万个在线连接。实现让我们看看服务器的某些部分如何在没有任何优化的情况下使用Go函数实现。在继续net/http之前,让我们谈谈我们如何发送和接收数据。位于WebSocket协议之上的数据(例如JSON对象)在下文中将被称为数据包。我们开始实现包含通过WebSocket连接发送和接收这些数据包的Channel结构。通道结构//Packetrepresentsapplicationleveldata.typePacketstruct{...}//Channelwrapsuserconnection.typeChannelstruct{connnet.Conn//WebSocketconnection.sendchanPacket//Outgoingpacketsqueue.}funcNewChannel(connnet.Conn)*Channel{c:=&Channel{conn:conn,send:make(chanPacket,N),}goc.reader()goc.writer()returnc}请注意,有用于读取器和写入器的goroutine。每个goroutine都需要自己的内存栈,其初始大小可能为2到8KB,具体取决于操作系统和Go版本。在300万个在线连接中,我们将需要24GB内存(堆栈需要4KB)来维护所有连接。这还不包括为Channel结构分配的内存、传出数据包ch.send和其他内部字段消耗的内存。I/Ogoroutines下面看一下“reader”的实现:func(c*Channel)reader(){//wemakeabufferedreadtoreducereadsyscalls.buf:=bufio.NewReader(c.conn)for{pkt,_:=readPacket(buf)c.handle(pkt)}}在这里,我们使用bufio.Reader来减少read()系统调用的次数,并读取尽可能多的buf缓冲区大小。在***循环中,我们期望新数据到达。请记住:新数据预计会到达。我们稍后会回来。我们将把传入数据包的解析和处理作为对我们将要讨论的优化不重要的事情。但是,buf现在值得我们注意:默认情况下,它是4KB,这意味着我们还需要12GB的内存。"writer"也有类似的情况:func(c*Channel)writer(){//Wemakebufferedwritoreducewritesyscalls.buf:=bufio.NewWriter(c.conn)forpkt:=rangec.send{_:=writePacket(buf,pkt??)buf.Flush()}}我们遍历c.send并将它们写入缓冲区。细心的读者已经猜到,我们的300万个连接也会消耗12GB的内存。我们已经有了一个简单的HTTPChannel实现,现在我们需要一个WebSocket连接来使用它。注意:如果您不知道WebSocket是如何工作的。客户端通过称为升级的特殊HTTP机制切换到WebSocket协议。成功处理升级请求后,服务器和客户端使用TCP连接交换二进制WebSocket帧。这里是对连接中帧结构的描述。import("net/http""some/websocket")http.HandleFunc("/v1/ws",func(whttp.ResponseWriter,r*http.Request){conn,_:=websocket.Upgrade(r,w)ch:=NewChannel(conn)//...})请注意,http.ResponseWriter为bufio.Reader和bufio.Writer(使用4KB缓冲区)分配内存,用于*http.Request初始化和进一步的响应写入。无论使用何种WebSocket库,在成功响应升级请求后,服务器都会在responseWriter.Hijack()调用后接收I/O缓冲区以及TCP连接。提示:在某些情况下,go:linkname可用于通过调用net/http.putBufio{Reader,Writer}将缓冲区返回到net/http中的sync.Pool。因此,我们需要另外24GB的内存来维护300万个链接。因此,即使我们的程序什么都不做,它仍然需要72G的内存。优化让我们回顾一下我们在介绍部分讨论的内容,并记住用户连接的行为。切换到WebSocket后,客户端发送包含相关事件的数据包,即订阅事件。然后(忽略ping/pong等技术信息),客户端可能不会在整个连接生命周期内发送任何其他信息。连接生命周期可以从几秒到几天不等。所以大多数时候,我们的Channel.reader()和Channel.writer()都在等待接收或发送数据。每个都有一个4KB的I/O缓冲区。现在显然可以做得更好,不是吗?netpoll你记得在bufio.Reader.Read()里面,Channel.reader()实现了conn.read()在没有新数据的时候是锁死的。如果连接中有数据,Go运行时会“唤醒”我们的goroutine并允许它读取下一个数据包。之后,goroutine再次锁定,等待新数据。让我们看看Go运行时如何理解goroutine必须被“唤醒”。如果我们查看conn.Read()实现,我们会看到其中调用了net.netFD.Read()://net/fd_unix.gofunc(fd*netFD)Read(p[]byte)(nint,errerror){//...for{n,err=syscall.Read(fd.sysfd,p)iferr!=nil{n=0iferr==syscall.EAGAIN{iferr=fd.pd.waitRead();err==nil{continue}}}//...break}//...}Go在非阻塞模式下使用套接字。EAGAIN表示socket中没有数据,从空socket读取时不会加锁,OS将控制权交还给我们。我们从连接文件描述符中看到一个read()系统调用。如果读取返回EAGAIN错误,运行时将调用pollDesc.waitRead()://net/fd_poll_runtime.gofunc(pd*pollDesc)waitRead()error{returnpd.wait('r')}func(pd*pollDesc)wait(modeint)error{res:=runtime_pollWait(pd.runtimeCtx,mode)//...}如果深入挖掘,我们会发现netpoll在Linux中是使用epoll实现的,在BSD中是使用kqueue实现的。为什么不使用相同的连接方法?我们可以分配一个读缓冲区,只在真正需要的时候才使用goroutine:当套接字中确实有数据要读的时候。在github.com/golang/go导出netpoll函数有问题。摆脱goroutines假设我们有Go的netpoll实现。现在我们可以避免使用内部缓冲区启动Channel.reader()goroutine,并订阅连接上的事件以获取可读数据:ch:=NewChannel(conn)//Makeconntobeobservedbynetpollinstance.poller.Start(conn,netpoll.EventRead,func(){//Wespawngoroutineheretopreventpollerwaitloop//tobecomelockedduringreceivingpacketfromch.goReceive(ch)})//Receivereadsapacketfromconnandhandlesitsomehow.func(ch*Channel)Receive(){buf:=bufio.NewReader(ch.conn)pkt:=readPacket(buf)c.hand(pkt)}使用Channel.writer()更容易,因为只有当我们想要发送数据包时,我们才能运行goroutine并分配缓冲区:func(ch*Channel)Send(pPacket){ifc.noWriterYet(){goch.writer()}ch.send<-p}请注意,我们不处理操作系统在write()系统调用上返回EAGAIN的情况。对于这种情况,我们倾向于像Goruntime一样处理。如果需要,可以用相同的方式处理它。从ch.send(一个或多个)读取输出数据包后,writer将完成其操作并释放goroutine堆栈和发送缓冲区。***!通过去除两个连续运行的goroutine中的堆栈和I/O缓冲区,我们节省了48GB。资源控制大量的连接不仅与高内存消耗有关。在开发服务器时,我们会遇到重复的竞争条件和死锁,通常是在所谓的自动DDoS中,当应用程序客户端不加区别地尝试连接到服务器时,就会发生这种情况,从而破坏服务器。例如,如果由于某种原因我们突然无法处理ping/pong消息,但是空闲连接的处理程序关闭了这样的连接(假设连接断开所以没有提供数据),客户端继续尝试连接而不是等待事件。如果锁定或过载的服务器停止接受新连接,并且负载平衡器(例如nginx)将请求一路传递到下一个服务器实例,压力将是巨大的。此外,无论服务器负载如何,如果所有客户端出于任何原因突然想要发送数据包(可能是由于错误),之前保存的48GB将再次使用,因为我们实际上将恢复到goroutine的初始状态并且缓冲区是按连接分配的。GoroutinePools我们可以使用goroutinepools来限制同时处理的数据包数量。下面是一个简单的goroutinepool的实现:(taskfunc())error{select{casep.work<-task:casep.sem<-struct{}{}:gop.worker(task)}}func(p*Pool)worker(taskfunc()){deferfunc(){<-p.sem}for{task()task=<-p.work}}现在我们的netpoll代码如下:pool:=gopool.New(128)poller.Start(conn,netpoll.EventRead,func(){//Wewillblockpollerwaitloopwhen//allpoolworkersarebusy.pool.Schedule(func(){Receive(ch)})})现在我们可以使用池中空闲的goroutines读取数据包。同样,我们将更改Send():pool:=gopool.New(128)func(ch*Channel)Send(pPacket){ifc.noWriterYet(){pool.Schedule(ch.writer)}ch.send<-p}而不是去ch.writer(),我们想写一个可重用的goroutine。因此,对于一个包含N个goroutine的池,我们可以保证当N个请求被并发处理并到达N+1时,我们不会分配N+1个缓冲区用于读取。goroutine池还允许我们限制新连接的Accept()和Upgrade(),避免在大多数情况下被DDoS淹没。零拷贝升级让我们稍微偏离了WebSocket协议。如前所述,客户端使用HTTP升级请求切换到WebSocket协议。协议看起来像:GET/wsHTTP/1.1Host:mail.ruConnection:UpgradeSec-Websocket-Key:A3xNe7sEB9HixkmBhVrYaA==Sec-Websocket-Version:13Upgrade:websocketHTTP/1.1101SwitchingProtocolsConnection:UpgradeSec-Websocket-Accept+:ksu0QnadeRWG+P0mkThat在我们的例子中,我们需要HTTP请求和标头来切换到WebSocket协议。这个知识点和http.Request的内部实现表明我们可以做优化。我们将在处理HTTP请求时放弃不必要的内存分配和复制,并放弃标准的net/http服务器。例如,http.Request包含一个同名类型header的字段,它通过从连接复制数据到值字符串来无条件地填充所有请求标头。想象一下,这个字段中可以保留多少额外的数据,例如一个大的Cookie标头。但是怎么办?WebSocket实现不幸的是,在我们的服务器优化时存在的所有库都允许我们升级标准的net/http服务器。此外,并非所有库都可以使用上述所有读写优化。为了使这些优化生效,我们必须使用相当低级的API来处理WebSocket。为了重用缓冲区,我们需要协议函数看起来像这样:funcReadFrame(io.Reader)(Frame,error)funcWriteFrame(io.Writer,Frame)errorconnection取数据包如下(数据包写法类似)://getReadBuf,putReadBufareintendedto//reuse*bufio.Reader(withsync.Poolforexample).funcgetReadBuf(io.Reader)*bufio.ReaderfuncputReadBuf(*bufio.Reader)//当可以从conn.funcreadPacket(connio.Reader)error{buf:=getReadBuf()deferputReadBuf(buf)buf.Reset(conn)frame,_:=ReadFrame(buf)parsePacket(frame.Payload)//...}简而言之,是时候创建我们自己的库了。github.com/gobwas/ws为了避免将协议操作逻辑强加给用户,我们编写了WS库。所有读取和写入方法都接受标准的io.Reader和io.Writer接口,并且可以使用也可以不使用缓冲区或任何其他I/O包装器。除了标准net/http的升级请求外,ws还支持零拷贝升级,处理升级请求并切换到WebSocket,无需分配内存或复制。ws.Upgrade()接受io.ReadWriter(net.Conn实现了这个接口)。换句话说,我们可以使用标准的net.Listen()并立即将接收到的连接从ln.Accept()传递给ws.Upgrade()。该库可以复制任何请求数据以供将来在应用程序中使用(例如用于验证会话的cookie)。以下是升级请求处理的基准:带有net.Listen()的标准net/http服务器加上零拷贝升级:BenchmarkUpgradeHTTP5156ns/op8576B/op9allocs/opBenchmarkUpgradeTCP973ns/op0B/op0allocs/op切换到ws和零拷贝升级又节省了24GB内存-这是在net/http处理程序处理请求时为I/O缓冲区分配的空间。小结下面结合代码给大家说说我们做的优化。读取内部缓冲区的Goroutines非常昂贵。解决方案:netpoll(epoll,kqueue);重用缓冲区。写入内部缓冲区的goroutine非常昂贵。解决方案:必要时启动goroutines;重用缓冲区。DDOS,netpoll将不起作用。解决方案:复用有限数量的goroutine。net/http不是处理升级到WebSocket的最快方法。解决方案:在连接上使用零拷贝升级。这是服务器代码的样子:import("net""github.com/gobwas/ws")ln,_:=net.Listen("tcp",":8080")for{//Trytoacceptincomingconnectioninsidefreepoolworker.//如果没有1毫秒的空闲工作人员,请不要接受任何事情并稍后再试。//这将帮助我们防止许多自我ddos??或超出资源限制情况。err:=pool.ScheduleTimeout(time.Millisecond,func(){conn:=ln.Accept()_=ws.Upgrade(conn)//WrapWebSocketconnectionwithourChannelstoourendsstruct.//Thiswillhapps/packshelps'.ch:=NewChannel(conn)//等待来自connection.poller.Start(conn,netpoll.EventRead,func(){//Donotcrosstheresourcelimits.pool.Schedule(func(){//Readandlehandleincomingpacket(s).ch.Recevie()})})})iferr!=nil{time.Sleep(time.Millisecond)}}结论过早的优化是万恶之源。DonaldKnuth当然,上述优化是有道理的,但并非在所有情况下都如此。例如,如果可用资源(内存、CPU)和在线连接数之间的比率相当高(服务器空闲),那么优化可能没有意义。但是,您可以从需要改进的地方和需要改进的地方受益匪浅。