当前位置: 首页 > 科技观察

Go:使用Sync为什么使用Atomic?

时间:2023-03-13 06:21:11 科技观察

Go是一种擅长并发的语言,启动一个新的goroutine就像输入“go”一样简单。当您发现自己正在构建越来越复杂的系统时,正确保护对共享资源的访问以防止竞争条件就变得极其重要。这些资源可能包括可以动态更新的配置(例如功能标志)、内部状态(例如断路器状态)等。01什么是竞争条件?这可能是大多数读者的基础知识,但由于本文的其余部分取决于对竞争条件的理解,因此有必要进行简短的复习。竞争条件是程序的行为取决于其他不可控事件的顺序或时间的情况。在大多数情况下,这种情况是错误的,因为可能会出现不希望的结果。举个具体例子可能更容易理解://race_condition_test.gopackagemainimport("fmt""sort""sync""testing")funcTest_RaceCondition(t*testing.T){vars=make([]int,0)wg:=sync.WaitGroup{}//spawn10goroutinestomodifythesliceinparallelfori:=0;i<10;i++{wg.Add(1)gofunc(iint){deferwg.Done()s=append(s,i)//addanewitemtotheslice}(i)}wg.Wait()sort.Ints(s)//sorttheresponsetohavecomparableresultsfmt.Println(s)}执行一个:$gotest-vrace_condition_test.go===RUNTest_RaceCondition[0123456789]---PASS:Test_RaceCondition(0.00s)看这里一切都很好。这是我们的预期输出。该程序迭代10次,在每次迭代时向切片添加索引。执行2:===RUNTest_RaceCondition[03]---PASS:Test_RaceCondition(0.00s)等等,这里发生了什么?这次我们的响应切片中只有两个元素。这是因为slice的内容在加载和修改之间发生了变化,导致程序覆盖了一些结果。这种特殊的竞争条件是由数据竞争引起的,其中多个goroutine试图同时访问一个特定的共享变量,并且这些goroutine中至少有一个试图修改它。(注意上面的结果不一定相同,每次运行可能不同)如果你使用-race标志来执行测试,go甚至会告诉你有数据竞争并帮你查明:$gotestrace_condition_test。go-race==================警告:DATARACEReadat0x00c000132048bygoroutine9:command-line-arguments.Test_RaceCondition.func1()/home/sfinlay/go/src/benchmarks/race_condition_test.go:20+0xb4command-line-arguments.Test_RaceConditiondwrap1()/home/sfinlay/go/src/benchmarks/race_condition_test.go:21+0x47Previouswriteat0x00c000132048bygoroutine8:command-line-arguments.Test_RaceCondition.func1()/home/sfinlayench/mark/src/bla/race_condition_test.go:20+0x136command-line-arguments.Test_RaceCondition.dwrap.1()/home/sfinlay/go/src/benchmarks/race_condition_test.go:21+0x47Goroutine9(running)createdat:command-line-arguments.Test_RaceCondition()/home/sfinlay/go/src/benchmarks/race_condition_test.go:18+0xc5testing.tRunner()/usr/local/go/src/testing/testing.go:1259+0x22ftesting.(*T).Run·dwrap21()/usr/local/go/src/testing/testing.go:1306+0x47Goroutine8(finished)createdat:command-line-arguments.Test_RaceCondition()/home/sfinlay/go/src/benchmarks/race_condition_test.go:18+0xc5testing.tRunner()/usr/local/go/src/testing/testing.go:1259+0x22ftesting.(*T).Rundwrap21()/usr/local/go/src/testing/testing.go:1306+0x47==================02保护对这些共享资源的访问的并发控制通常涉及常见的内存同步机制,例如通道或互斥锁这是调整竞争条件以使用互斥锁的相同测试用例:funcTest_NoRaceCondition(t*testing.T){vars=make([]int,0)m:=sync.Mutex{}wg:=sync.WaitGroup{}//spawn10goroutinestomodifythesliceinparallelfori:=0;i<10;i++{wg.Add(1)gofunc(iint){m.Lock()deferwg.Done()deferm.Unlock()s=append(s,i)}(i)}wg.Wait()sort.Ints(s)//sorttheresponsetohavecomparableresultsfmt.Println(s)}这次它总是返回所有10个整数,因为它确保每个goroutine仅在没有其他人执行时才读取和写入切片。如果第二个goroutine同时尝试获取锁,它必须等到前一个goroutine完成(即直到它解锁)。然而,对于高吞吐量的系统,性能变得非常重要,因此减少锁争用(即一个进程或线程试图获取另一个进程或线程持有的锁的情况)变得更加重要。最基本的方法之一是使用读写锁(sync.RWMutex)而不是标准的sync.Mutex,但Go在atomic包中也提供了一些原子内存原语。03AtomicGo的原子包提供了用于实现同步算法的低级原子内存原语。这听起来像我们需要的,所以让我们尝试用atomic重写该测试:import"sync/atomic"funcTest_RaceCondition_Atomic(t*testing.T){vars=atomic.Value{}s.Store([]int{})//storeemptysliceasthebasewg:=sync.WaitGroup{}//spawn10goroutinestomodifythesliceinparallelfori:=0;i<10;i++{wg.Add(1)gofunc(iint){deferwg.Done()s1:=s.Load().([]int)s.Store(append(s1,i))//replacetheslicewithanewonecontainingthenewitem}(i)}wg.Wait()s1:=s.Load().([]int)sort.Ints(s1)//sorttheresponsetohavecomparableresultsfmt.Println(s1)}执行结果:===RUNTest_RaceCondition_Atomic[13]---PASS:Test_RaceCondition_Atomic(0.00s)什么?这和我们之前遇到的问题一模一样,那么这个包有什么好处呢?04Read-copy-updateatomic不是万能的,它显然不能替代互斥锁,但是当涉及到可以使用read-copy-update[1]模式管理的共享资源时,它就很棒了。在这种技术中,我们通过引用获取当前值,当我们想要更新它时,我们不修改原始值,而是替换指针(因此没有人访问另一个线程可能访问的相同资源)。使用此模式无法实现前面的示例,因为它应该随着时间的推移扩展现有资源而不是完全替换其内容,但在许多情况下读取-复制-更新是完美的。这是一个基本示例,我们可以在其中获取和存储布尔值(例如,对于功能标志很有用)。在这个例子中,我们正在执行一个比较原子和读写互斥锁的并行基准测试:.value.Load().(bool)}func(b*AtomicValue)Set(valuebool){b.value.Store(value)}funcBenchmarkAtomicValue_Get(b*testing.B){atomB:=AtomicValue{}atomB.value。Store(false)b.RunParallel(func(pb*testing.PB){forpb.Next(){atomB.Get()}})}/************/typeMutexBoolstruct{mutexsync.RWMutexflagbool}func(mb*MutexBool)Get()bool{mb.mutex.RLock()defermb.mutex.RUnlock()returnmb.flag}funcBenchmarkMutexBool_Get(b*testing.B){mb:=MutexBool{flag:true}b.RunParallel(func(pb*testing.PB){forpb.Next(){mb.Get()}})}结果:cpu:Intel(R)Core(TM)i7-8650UCPU@1.90GHzBenchmarkAtomicValue_GetBenchmarkAtomicValue_Get-810000000000.5472ns/opBenchmarkMutexBool_GetBenchmarkMutexBool_Get-82496612748.80ns/op结果一目了然。atomic快了89倍以上。并且可以通过使用更多原始类型来进一步改进:typeAtomicBoolstruct{flagint32}func(b*AtomicBool)Get()bool{returnatomic.LoadInt32(&(b.flag))!=0}func(b*AtomicBool)Set(valuebool){variint32=0ifvalue{i=1}atomic.StoreInt32(&(b.flag),int32(i))}funcBenchmarkAtomicBool_Get(b*testing.B){atomB:=AtomicBool{flag:1}b.RunParallel(func(pb*testing.PB){forpb.Next(){atomB.Get()}})}cpu:Intel(R)Core(TM)i7-8650UCPU@1.90GHzBenchmarkAtomicBool_GetBenchmarkAtomicBool_Get-810000000000.3161ns/op这个版本比mutex多锁定版本的速度提高了154倍以上。Writesalsoshowanoticeabledifference(albeitonalessimpressivescale):funcBenchmarkAtomicBool_Set(b*testing.B){atomB:=AtomicBool{flag:1}b.RunParallel(func(pb*testing.PB){forpb.Next(){atomB.Set(true)}})}/************/funcBenchmarkAtomicValue_Set(b*testing.B){atomB:=AtomicValue{}atomB.value.Store(false)b.RunParallel(func(pb*testing.PB){forpb.Next(){atomB.Set(true)}})}/************/funcBenchmarkMutexBool_Set(b*testing.B){mb:=MutexBool{flag:true}b.RunParallel(func(pb*testing.PB){forpb.Next(){mb.Set(true)}})}Result:cpu:Intel(R)Core(TM)i7-8650UCPU@1.90GHzBenchmarkAtomicBool_SetBenchmarkAtomicBool_Set-86462470516.79ns/opBenchmarkAtomicValue_SetBenchmarkAtomicValue_Set-84765412126.43ns/opBenchmarkMutexBool_SetBenchmarkMutexBool_Set-82012463766.50ns/op在这里我们可以看到atomic在写入时比在读取时慢得多,但仍然比Mutexesaremuchfaster.Interestingly,wecanseethatthedifferencebetweenmutexreadsandwritesisnotverynoticeable(30%slower).Still,atomicstillperformsbetter(2-4timesfasterthanmutexes).05Whyareatomicssofast?Inshort,atomicoperationsarefastbecausetheyrelyonatomicCPUinstructionsinsteadofrelyingonexternallocks.Whenusingamutex,thegoroutineisbrieflypausedorinterruptedeachtimethelockisacquired,andthisblockingaccountsforalargeportionofthetimespentusingthemutex.Atomicoperationscanbeperformedwithoutanyinterruption.06原子总是答案吗?正如我们在前面的例子中已经证明的那样,原子并不能解决所有问题,有些操作只能使用互斥体来解决。考虑以下示例,它演示了我们使用映射作为内存缓存的常见模式:packagemainimport("sync""sync/atomic""testing")//不要使用此实现!typeAtomicCacheMapstruct{valueatomic.Value//map[int]int}func(b*AtomicCacheMap)Get(keyint)int{returnb.value.Load().(map[int]int)[key]}func(b*AtomicCacheMap))Set(key,valueint){oldMap:=b.value.Load().(map[int]int)newMap:=make(map[int]int,len(oldMap)+1)fork,v:=rangeoldMap{newMap[k]=v}newMap[key]=valueb.value.Store(newMap)}funcBenchmarkAtomicCacheMap_Get(b*testing.B){atomM:=AtomicCacheMap{}atomM.value.Store(testMap)b.RunParallel(func(pb*testing.PB){forpb.Next(){atomM.Get(0)}})}funcBenchmarkAtomicCacheMap_Set(b*testing.B){atomM:=AtomicCacheMap{}atomM.value.Store(testMap)vari=0b.RunParallel(func(pb*testing.PB){forpb.Next(){atomM.Set(i,i)i++}})}/************/typeMutexCacheMapstruct{mutexsync.RWMutexvaluemap[int]int}func(mm*MutexCacheMap)Get(keyint)int{mm.mutex.RLock()defermm.mutex.RUnlock()returnmm.value[key]}func(mm*MutexCacheMap)Set(key,valueint){mm.mutex.Lock()defermm.mutex.Unlock()mm.value[key]=value}funcBenchmarkMutexCacheMap_Get(b*testing.B){mb:=MutexCacheMap{value:testMap}b.RunParallel(func(pb*testing.PB){forpb.Next(){mb.Get(0)}})}funcBenchmarkMutexCacheMap_Set(b*testing.B){mb:=MutexCacheMap{value:testMap}vari=0b.RunParallel(func(pb*testing.PB){forpb.Next(){mb.Set(i,i)i++}})}Result:cpu:Intel(R)Core(TM)i7-8650UCPU@1.90GHzBenchmarkAtomicCacheMap_GetBenchmarkAtomicCacheMap_Get-83016645404.194ns/opBenchmarkAtomicCacheMap_SetBenchmarkAtomicCacheMap_Set-88763795889ns/opBenchmarkMutexCacheMap_GetBenchmarkMutexCacheMap_Get-82000095954.63ns/opBenchmarkMutexCacheMap_SetBenchmarkMutexCacheMap_Set-85012434267.2ns/op哎呀,这种表现是痛苦的这意味着,当必须复制大型结构时,atomic的性能非常差。Notonlythat,butthiscodecontainsaracecondition.Liketheslicecaseatthebeginningofthisarticle,theatomiccacheexamplehasaraceconditionwherenewcacheentriesmaybeaddedbetweenthetimethemapiscopiedandwhenthemapisstored,inwhichcasethenewentrieswillbelost.Inthiscase,the-raceflagwillnotdetectanydataraces,sincetherearenoconcurrentaccessestothesamemap.07CaveatsGo'sdocumentation[2]warnsaboutpotentialmisuseoftheatomicpackage:thesefunctionsrequiregreatcaretobeusedcorrectly.除了特殊的低级应用程序,同步最好使用通道或同步包的工具来完成。通过交流共享记忆;不要通过共享内存进行通信。开始使用原子包时,您可能遇到的第一个问题是:panic:sync/atomic:storeofinconsistentlytypedvalueintoValue对于atomic.Store,确保每次调用方法时都存储完全相同的类型很重要。这听起来很简单,但通常并不像听起来那么简单:",e.Code,e.Message)}funcInternalServerError(msgstring)error{returnCustomError{Code:500,Message:msg}}funcmain(){var(err1error=fmt.Errorf("errorhappened")err2error=InternalServerError("另一个错误发生"))errVal:=atomic.Value{}errVal.Store(err1)errVal.Store(err2)//panicshere}两个值都是error类型是不够的,因为他们只是实现了error接口。它们的具体类型还是不一样的,所以atomic不喜欢。08总结竞争条件很糟糕,应该保护对共享资源的访问。互斥量很酷,但由于锁争用而往往很慢。对于某些读取-复制-更新模式有意义的情况(这往往是动态配置之类的事情,例如功能标志、日志级别或映射或结构,例如通过JSON解析等填充一次),尤其是在读取时读取次数大于写入次数。atomic通常不应该用于其他用例(例如,随时间增长的变量,如缓存),并且需要非常小心地使用该功能。可能最重要的方法是将锁保持在最低限度,如果您正在考虑像原子这样的替代方案,请务必在投入生产之前对它们进行广泛的测试和试验。原文链接:https://www.sixt.tech/golangs-atomic参考文献[1]Read-Copy-Update:https://en.wikipedia.org/wiki/Read-copy-update[2]文档:https://pkg.go.dev/sync/atomic本文转载自微信公众号“幽灵”,可通过以下二维码关注。转载本文请联系有鬼公众号。