通过学习如何在处理并发时发现陷阱来避免未来的陷阱。在复杂的分布式系统中进行任务处理时,通常需要进行并发操作。在Mode.net,我们每天都使用实时、快速和灵活的软件。没有高并发的系统,就不可能构建一个毫秒级动态路由数据包的全球专用网络。这种动态路由是基于网络状态的,虽然在这个过程中需要考虑很多因素,但我们的重点是链路指标。在我们的环境中,链路指标可以是与网络链路的状态和当前属性相关的任何事物,例如链路延迟。并发探测链路监控我们的动态路由算法H.A.L.O.(逐跳自适应链路状态最佳路由)部分依赖于链路度量来计算路由表。这些指标由位于每个PoP(存在点)上的单独组件收集。PoP是代表我们网络中单个路由实体的机器,通过链路连接并分布在我们网络拓扑中的不同位置。一个组件用网络数据包探测周围的机器,周围的机器用数据包回复前者。链路延迟可以从接收到的探测数据包中获得。由于每个PoP都有多个邻居,因此这一探测任务本质上是并发的:我们需要实时测量每个相邻连接点的延迟。我们不能连续处理;为了计算这个指标,必须尽快处理每个探测。延迟计算图序列号和重置:重新排列场景我们的探测组件相互发送和接收数据包,并依赖序列号进行数据包处理。这是为了避免处理重复或无序的数据包。我们的第一个实现依赖于特殊序列号0来重置序列号。此编号仅在组件初始化时使用。主要问题是我们考虑到递增的序列号总是从0开始。组件重启后,包的顺序可能会重新排列,包的序列号很容易被重置前的值替换。这意味着后续数据包将被忽略,直到重置之前使用的序列值排队。UDP握手和有限状态机这里的问题是重启前后组件的序列号是否相同。有几种方法可以解决这个问题,经过讨论,我们选择实现一个状态定义明确的三次握手协议。此握手过程在初始化时通过链路建立会话。这可确保节点通过同一会话进行通信并使用适当的序列号。为了正确地实现这个过程,我们必须定义一个具有不同状态和转换的有限状态机。这样我们就可以在握手期间妥善管理所有极端情况。有限状态机图会话ID由握手的初始化过程生成。一个完整的交换顺序如下:发送方发送一个SYN(ID)包。接收方存储收到的ID并发送SYN-ACK(ID)。发送方收到SYN-ACK(ID)并发送ACK(ID)。它还发送一个以序列号0开头的数据包。接收方检查最后收到的ID并在ID匹配时接受ACK(ID)。它还开始接受序列号为0的数据包。处理状态超时基本上,在每个状态中您需要处理最多三种类型的事件:链接事件、数据包事件和超时事件。这些事件可以并发发生,因此您必须正确处理并发性。链接事件包括网络连接变化或网络断开,相应地发起链接会话或断开已建立的会话。数据包事件是控制数据包(SYN/SYN-ACK/ACK)或只是探测响应。在当前会话状态的预定超时期限到期后,将触发超时事件。这里的主要问题是如何处理并发超时到期和其他事件。这里很容易陷入死锁和资源竞争的陷阱。第一种方法本项目使用的语言是Golang。它确实提供了原生的同步机制,例如内置的通道和锁,并且可以使用轻量级线程进行并发处理。地鼠先开派对,你可以设计两个结构体分别代表我们的会话和超时处理程序。typeSessionstruct{StateSessionStateIdSessionIdRemoteIpstring}typeTimeoutHandlerstruct{callbackfunc(Session)sessionSessiondurationinttimer*timer.Timer}Session标识连接会话,包含会话ID、相邻连接点的IP和当前会话状态字段。TimeoutHandler包含回调函数、相应的会话、持续时间和指向调度计时器的指针。连接点附近的每个会话都包含一个已调度TimeoutHandler的全局映射。SessionTimeoutmap[Session]*TimeoutHandler下面方法注册和取消超时://调度超时回调函数。func(timeout*TimeoutHandler)Register(){timeout.timer=time.AfterFunc(time.Duration(timeout.duration)*time.Second,func(){timeout.callback(timeout.session)})}func(timeout*TimeoutHandler)Cancel(){iftimeout.timer==nil{return}timeout.timer.Stop()}您可以使用类似下面的方法来创建和存储超时:funcCreateTimeoutHandler(callbackfunc(Session),sessionSession,durationint)*TimeoutHandler{ifsessionTimeout[session]==nil{sessionTimeout[session]:=new(TimeoutHandler)}timeout=sessionTimeout[session]timeout.session=sessiontimeout.callback=callbacktimeout.duration=duration返回超时}创建超时处理程序后,回调函数将在设置的持续时间(秒)过去后执行。但是,某些事件会导致您重新安排超时处理程序(在SYN状态下每3秒一次)。为此,您可以让回调函数重新安排超时:funcsynCallback(sessionSession){sendSynPacket(session)//重新安排相同的回调。newTimeout:=NewTimeoutHandler(synCallback,session,SYN_TIMEOUT_DURATION)newTimeout.Register()sessionTimeout[state]=newTimeout}这次回调在新的超时处理程序中重新安排自身并更新全局映射sessionTimeout。数据竞争和引用您的解决方案已有。可以通过检查定时器超时后是否执行超时回调来进行简单测试。为此,注册超时,休眠duration秒,然后检查回调的处理是否已执行。执行此测试后,最好取消预定的超时(因为它会被重新安排),以免对下一次测试产生副作用。令人惊讶的是,这个简单的测试发现了这个解决方案中的一个问题。使用取消方法取消超时未正确处理。以下事件序列导致数据资源争用:您有一个已安排的超时处理程序。线程1:你收到了一个控制包,现在你想取消注册超时,切换到下一个会话状态(比如发送一个SYN后收到一个SYN-ACK)你调用timeout.Cancel(),这个函数调用timer。停止()。(注意Golang定时器的停止并不会终止过期的定时器。)线程2:取消调用前定时器已过期,回调即将执行。执行回调以安排新的超时并更新全局地图。线程1:切换到新的会话状态并注册新的超时,更新全局地图。两个线程同时更新超时映射。最终的结果是你无法取消注册超时,然后你也失去了对线程2重新安排的超时的引用。这导致处理程序继续执行和重新安排一段时间,出现意外行为。锁不能解决问题使用锁也不能完全解决问题。如果您在处理所有事件和执行回调之前锁定,它仍然不会阻止运行过期的回调:func(timeout*TimeoutHandler)Register(){timeout.timer=time.AfterFunc(time.Duration(timeout.duration)*time._Second_,func(){stateLock.Lock()deferstateLock.Unlock()timeout.callback(timeout.session)})}现在不同的是,全局地图的更新是同步的,但这仍然不阻止您在timeout.Cancel()之后调用回调的执行-当调度计时器已过期但尚未获取锁时会发生这种情况。您仍然会丢失对已注册超时的引用。使用取消通道您可以使用取消通道而不是依赖golang函数timer.Stop(),它不能阻止过期计时器的执行。这是一个稍微不同的方法。现在您不能再通过回调递归地重新安排时间;相反,您可以注册一个无限循环,该循环在收到取消信号或超时事件时终止。newRegister()生成一个新的go线程,该线程在超时后执行您的回调,并在前一个超时执行后安排新的超时。返回一个取消通道给调用者以控制循环的终止。func(timeout*TimeoutHandler)Register()chanstruct{}{cancelChan:=make(chanstruct{})gofunc(){select{case_=<-cancelChan:returncase_=<-time.AfterFunc(time.持续时间(timeout.duration)*time.Second):func(){stateLock.Lock()deferstateLock.Unlock()timeout.callback(timeout.session)}()}}()returncancelChan}func(timeout*TimeoutHandler)Cancel(){iftimeout.cancelChan==nil{return}timeout.cancelChan<-struct{}{}}此方法为您注册的所有超时提供取消通道。取消调用向通道发送一个空结构并触发取消操作。但是,这并没有解决之前的问题;在您通过通道取消之前和超时线程获取锁之前,可能会达到超时。这里的解决办法是,拿到锁后,在超时时间内检查取消通道。case_=<-time.AfterFunc(time.Duration(timeout.duration)*time.Second):func(){stateLock.Lock()deferstateLock.Unlock()select{case_=<-handler.cancelChan:返回default:timeout.callback(timeout.session)}}()}最后,这确保在获取锁后执行回调,而不会触发取消。注意死锁这个解决方案似乎有效;但是仍然存在一个隐患:死锁。请阅读上面的代码并尝试自己找到它。考虑并发调用下面描述的所有函数。这里的问题出在取消通道本身。我们正在创建无缓冲通道,也就是说,我们正在发送阻塞调用。当您在超时处理程序中调用取消函数时,在取消处理程序之前无法继续处理。当您对同一个取消通道进行多次调用时,就会出现问题,其中取消请求只处理一次。当多个事件同时取消同一个超时处理程序时,例如连接断开或控制数据包事件,很容易发生这种情况。这会导致死锁,可能会导致应用程序崩溃。有人在听吗?(经TrevorForrey许可。)这里的解决方案是创建缓冲区大小至少为1的通道,以便向通道发送数据不会阻塞,并且还明确地使发送成为非阻塞的,避免并发调用。这确保了取消操作只发送一次并且不会阻塞后续的取消调用。func(timeout*TimeoutHandler)Cancel(){iftimeout.cancelChan==nil{return}select{casetimeout.cancelChan<-struct{}{}:default://不能在频道上发送,已经有人请求了取消。}}总结您在实践中学到的有关执行并发操作时发生的常见错误的知识。由于它们的不确定性,即使进行大量测试也不容易发现这些问题。以下是我们在初始实施中遇到的三个主要问题:异步更新共享数据这似乎是一个显而易见的问题,但如果并发更新发生在不同的地方,则很难发现。结果是数据竞争,其中一些更新在对同一数据的多次更新中丢失,因为一个更新覆盖了另一个。在我们的例子中,我们同时更新同一个共享映射中的调度超时引用。(有趣的是,如果Go在同一映射对象上检测到并发读写,Go将抛出一个致命错误——尝试运行Go的数据竞争检测器)。这最终将导致丢失超时引用,并且无法取消给定的超时。永远不要忘记在必要时使用锁。不要忘记同步gophers的工作缺少条件检查。当您不能完全依赖锁的排他性时,需要进行条件检查。我们遇到的场景略有不同,但核心思想和条件变量是一样的。假设一个生产者和多个消费者使用共享队列的经典场景,生产者可以向队列添加一个元素并唤醒所有消费者。这个唤醒调用意味着队列中的数据是可访问的,并且由于队列是共享的,消费者必须使用锁来同步访问。每个消费者都可以获得锁;但是,您仍然需要检查队列中是否有元素。因为在拿到锁的那一刻并不知道队列的状态,所以还是要检查条件。在我们的例子中,超时处理程序在计时器到期时收到一个“唤醒”调用,但它仍然需要检查它是否已收到取消信号,然后再继续回调。如果您要唤醒多个地鼠,则可能需要进行条件检查。当线程卡住无限期地等待唤醒信号,但该信号永远不会到达时,就会发生死锁。死锁会使整个程序停止运行,从而彻底杀死您的应用程序。在我们的例子中,这是由于向非缓冲和阻塞通道发送多个请求而发生的。这意味着向一个通道发送数据只有在从该通道接收到数据后才能返回。我们的超时线程循环快速接收来自取消通道的信号;然而,在接收到第一个信号后,它会跳出循环并且不再从通道中读取数据。其他电话会一直卡住。为避免这种情况,您需要仔细检查您的代码,小心处理阻塞调用,并确保不会发生线程饥饿。我们案例中的解决方案是使取消调用成为非阻塞调用——我们不需要阻塞调用。
