今天给大家带来的开源项目是Godis:一个用Go语言实现的Redis服务器。支持:5种数据结构(string、list、hash、set、sortedset)自动过期(TTL)发布订阅、地理位置、持久化等功能你可能不需要自己实现Redis服务,但你是否厌倦了每天写?我想通过业务代码的增删改查来提高自己的编程能力。我尝试从头开始写一个项目,打开IDE发现无法启动。手工造轮子一定是提高编程能力的好方法。下面就带大家用Go(Godis)从零开始写一个Redis服务器,从中你将学到:如何用Go语言编写一个TCP服务器设计并实现一个安全可靠的通信协议(redis协议)如何使用Go语言来开发高并发编程设计和实现分布式集群和分布式事务熟悉链表和哈希表、跳表、时间轮等常用数据结构,不用担心内容太难!!示例代码虽然是Go语言,但即使你不懂Go语言,也不影响你了解Redis的原理、底层协议以及高性能的秘密。而且,为了照顾读者,作者对技术的讲解进行了优化。示例代码在原工程的基础上进行了精简,逐行添加了注释。如果你是进阶玩家,请直接访问项目阅读源码:https://github.com/HDT3213/godis让我们一起拨开Redis的迷雾。1、写一个TCP服务器众所周知,Redis是C/S模型,使用TCP协议进行通信。接下来,开始实现TCP服务器。Golang作为服务端广泛使用的编程语言,提供了非常简洁的TCP接口,实现起来非常方便。示例代码:funcListenAndServe(addressstring){//绑定监听地址listener,err:=net.Listen("tcp",address)iferr!=nil{log.Fatal(fmt.Sprintf("listenerr:%v",err))}deferlistener.Close()log.Println(fmt.Sprintf("bind:%s,startlistening...",address))for{//Accept会阻塞直到新的连接建立或listen中断返回conn,err:=listener.Accept()iferr!=nil{//一般是由于监听器被关闭无法继续监听导致的错误log.Fatal(fmt.Sprintf("accepterr:%v",err))}//开启一个新的goroutine来处理连接goHandle(conn)}}funcHandle(connnet.Conn){reader:=bufio.NewReader(conn)for{//ReadString会阻塞直到遇到分隔符'\n'//遇到到达定界符后,ReadString会返回自上次遇到定界符后接收到的所有数据//如果遇到定界符前发生异常,ReadString会返回接收到的数据和错误信息ReadString('\n')iferr!=nil{//通常遇到的错误是连接中断或关闭,使用io.EOF表示iferr==io.EOF{log.Println("connectionclose")}else{log.Println(err)}return}b:=[]byte(msg)//将接收到的信息发送给客户端conn.Write(b)}}funcmain(){ListenAndServe(":8000")}到此为止只用了40行代码就完成了服务器!启动上面的TCP服务后,在终端输入telnet127.0.0.18000连接到你刚才写的服务器,它会返回你发给你的信息(所以请不要骂):这个TCP服务器非常简单。主协程调用accept函数监听端口,接受新的连接后,开启一个Goroutine处理。这种简单的阻塞IO模型有点类似于早期的Tomcat/Apache服务器。阻塞IO模型就是用一个线程来处理一个连接。当没有接收到新的数据时,监听线程被阻塞,直到数据准备好时唤醒该线程进行处理。因为阻塞IO模型需要开启大量的线程,进行频繁的上下文切换,效率很低。Redis使用的epoll技术(IO多路复用)使用一个线程处理大量的连接,大大提高了吞吐量。那么我们的TCP服务器会不会比Redis慢很多呢?当然不是。Golang利用Goroutine调度开销远小于线程调度开销的优势,封装了一个goroutine-per-connection风格的极简接口,net/tcp库封装了epoll,看起来像阻塞IO,避免了需要复杂的异步代码原生epoll接口,同时享受epoll的高性能。在笔者的电脑上,Redis每秒可以响应10.6k的PING命令,而Godis(完整代码)的吞吐量为9.2kqps,相差不大。如果想了解更多Golang高性能的秘密,可以搜索gonetpoller或者go语言网络轮询器关键词。另外,一个合格的TCP服务器在关闭时不应该停止,而是需要完成接收到Request的响应,释放TCP连接等必要的清理工作。我们一般称这个功能为gracefulshutdown或者优雅关机。Gracefulshutdown步骤:首先关闭监听器停止接受新的连接,然后将所有存活的连接一一遍历。优雅关机的代码有很多,这里就不全部贴出来了。2、对Redis协议的透视解决了通信之后,接下来就是搞清楚Redis的协议了。其实就是一个类似于JSON和ProtocolBuffers的序列化协议。可以看到底层其实是一些基础知识。从Redis2.0开始,通信被统一成RESP协议(??REdisSerializationProtocol)。该协议易于实现,不仅可以被程序高效解析,而且可以被人阅读和调试。RESP是一种在TCP协议之上运行的二进制安全文本协议。RESP以行为单位,客户端和服务端发送的所有命令或数据都使用\r\n(CRLF)作为换行符。二进制安全是指允许协议中的任意字符而不会导致失败。比如C语言中的字符串以\0结尾,不允许字符串中间有\0,而Go语言中的字符串允许\0。我们说Go语言中的字符串是二进制安全的,但是C语言中的字符串不是二进制安全的。的。RESP的二进制安全性允许我们在键或值中包含特殊字符,如\r或\n。二进制安全在使用Redis存储二进制数据如protobuf、msgpack等时尤为重要。RESP定义了5种格式:简单字符串(SimpleString):服务器用于返回简单的结果,例如“OK”非二进制安全,anddoesnotallowlinebreaks错误信息(Error):服务器用于返回简单的错误信息,例如“ERRInvalidSynatx”不是二进制安全的,不允许换行整数:llen和llen等命令的返回值scard,64-bitsignedintegerstring(BulkString):二进制安全字符串,如get等命令返回值数组(Array,也称为MultiBulkStrings):BulkString数组,客户端发送的命令格式和lrangeRESP等命令的响应格式由第一个字符表示:简单字符串:以“+”开头,如:“+OK\r\n”错误:以“-”开头,如:“-ERRInvalidSynatx\r\n"整数:以":"开头,如:":1\r\n"字符串:以$开头起始数组:以*开头让我们通过一些实际例子来理解协议。2.1StringBulkString有两行,第一行是$+textlength,第二行是实际内容。例如:$3\r\nSET\r\n字符串(BulkString)是二进制安全的,也就是说BulkString内部可以包含“\r\n”字符(隐藏行尾的CRLF):$4a\r\nb2.2空$-1表示无。例如,当使用get命令查询一个不存在的key时,响应为$-1。2.3数组数组(Array)格式的第一行是“*”+数组长度,后面是对应的字符串个数(BulkString)。例如["foo","bar"]message(传输时的内容):*2$3foo$3bar客户端也使用Array格式向服务端发送指令。命令本身会作为第一个参数,比如SETkeyvalue命令的RESP消息:*3$3SET$3key$5value会打印出换行符:*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n2.4分析准备了解常用RESP报文的内容后,就可以开始分析了。但需要注意的是,RESP是一个二进制安全协议,它允许文本中包含\r\n字符。例如Redis可以正确接收并执行SET"a\r\nb"helloithub命令。该命令的正确信息如下:*3$3SET$4a\r\nb$11hellogithub当ReadBytes读到第五行时"a\r\nb\r\n"会被误认为是两行:*3$3SET$4a//wrongbranchb//wrongbranch$11hellogithub所以当第四行$4被读取时,不是继续使用ReadBytes('\n')读取下一行,而是使用io.ReadFull(reader,msg)方法读取指定长度的内容。msg=make([]byte,4+2)//文本长度4+换行符长度2_,err=io.ReadFull(reader,msg)2.5写RESP协议解析器解决上面内容包含"\r\n"问题,我们可以开始编写Redis协议解析器了!typePayloadstruct{Dataredis.ReplyErrerror}//ParseStream通过io.Reader读取数据并通过channel返回结果给调用者//流处理接口适合client/server使用funcParseStream(readerio.Reader)<-chan*Payload{ch:=make(chan*Payload)goparse0(reader,ch)return}因为parser代码较多,这里简单介绍一下核心流程。funcparse0(readerio.Reader,chchan<-*Payload){//初始化读取状态readingMultiLine:=falseexpectedArgsCount:=0varargs[][]bytevarbulkLenint64for{//我们上面提到RESP是以行为为单位的//因为行划分对于简单的字符串和二进制安全的BulkString,我们需要封装一个readLine函数来兼容line,err=readLine(reader,bulkLen)iferr!=nil{//handleerrorreturn}//接下来我们读取linejustreadParsing//我们简单的把Reply分为两类://单行:StatusReply,IntReply,ErrorReply//多行:BulkReply,MultiBulkReplyif!readingMultiLine{ifisMulitBulkHeader(line){//我们收到了MulitBulkReply的第一行//得到MulitBulkReply中BulkString的个数expectedArgsCount=parseMulitBulkHeader(line)//等待后续的MulitBulkReply行readingMultiLine=true}elseifisBulkHeader(line){//我们已经收到了第一行BulkReply//获取第二行BulkReply的长度,通过bulkLen告诉读取linefunctionthelengthofthenextlineofBulkStringbulkLen=parseBulkHeader()//本次ReplyExpectedArgsCount=1中共有1个BulkString//等待BulkReply的后续行readingMultiLine=true}else{//处理StatusReply,IntReply,ErrorReply等单行Replyreply:=parseSingleLineReply(line)//通过chemitReply(ch)返回结果}}else{//进入这个分支表示我们在等待MulitBulkReply的后续行或者BulkReply//后续行有两种MulitBulkReply,BulkHeader或者BulkStringifisBulkHeader(line){bulkLen=parseBulkHeader()}else{//我们正在读取一个BulkString,它可能是MulitBulkReply或BulkReplyargs=append(args,line)}iflen(args)==expectedArgsCount{//我们读取了All后续行//通过chemitReply(ch)返回结果//重置状态,准备解析下一个ReplyreadingMultiLine=falseexpectedArgsCount=0args=nilbulkLen=0}}}}3.实现内存数据库至此我们完成了数据接收和解析剩下的就是我们应该把数据存储在哪里?抛开持久化部分不谈,作为一个基于内存的KV数据库Redis,所有的数据都需要在内存中存储一??个哈希表,而这个哈希表就是我们今天最后一个需要写的组件,不同于单-线程化的Redis。我们实现的Redis(godis)是并行工作的,所以要考虑各种并发安全问题。常见的并发安全哈希表设计有几种:sync.map:Golang官方提供的并发哈希表,适用于读多写少的场景。但是,提升m.dirty后,m.read会被复制到新的m.dirty中。在数据量很大的情况下,复制操作会阻塞所有的协程,存在很大的隐患。juc.ConcurrentHashMap:Java的并发哈希表是使用分段锁实现的。扩容时,访问哈希表的线程会协助rehash操作,rehash结束前所有的读写操作都会被阻塞。由于缓存数据库的键值对数量庞大,读写操作的响应时间高,使用juc的策略并不合适。memcachedhashtable:当后台线程进行rehash操作时,主线程会判断要访问的hash槽是否已经被rehash过,从而决定是操作old_hashtable还是new_hashtable。这种设计称为渐进式重新散列。它的优点是rehash操作基本不会阻塞主线程的读写,是最理想的方案。但是progressiverehash的实现非常复杂,所以godis采用了Golang社区广泛使用的分段锁策略(不是上面三种),就是将key分散到固定数量的shards中,避免整体rehash操作。分片是受锁保护的地图。当分片进行rehash时,会阻塞分片内的读写,但不会影响其他分片。代码如下:typeConcurrentDictstruct{table[]*Shardcountint32}typeShardstruct{mmap[string]interface{}mutexsync.RWMutex}func(dict*ConcurrentDict)spread(hashCodeuint32)uint32{tableSize:=uint32(len(dict.table))return(tableSize-1)&uint32(hashCode)}func(dict*ConcurrentDict)getShard(indexuint32)*Shard{returndict.table[index]}func(dict*ConcurrentDict)Get(keystring)(valinterface{},existsbool){hashCode:=fnv32(key)index:=dict.spread(hashCode)shard:=dict.getShard(index)shard.mutex.RLock()defershard.mutex.RUnlock()val,exists=shard.m[key]返回}func(dict*ConcurrentDict)Put(keystring,valinterface{})(resultint){ifdict==nil{panic("dictisnil")}hashCode:=fnv32(key)index:=dict.spread(hashCode)shard:=dict.getShard(index)shard.mutex.Lock()defershard.mutex.Unlock()if_,ok:=shard.m[key];ok{shard.m[key]=valreturn0}else{shard.m[key]=valdict.addCount()return1}}ConcurrentDict可以保证单键操作的并发安全,但是还是不能满足并发的要求安全。例如:incr命令需要完成:read->add->write分三步,读写两步操作MSETNX命令的操作不是原子的。当且仅当所有给定的键都不存在时,所有给定的键都设置值。我们需要保证“检查多个key是否存在”和“写入多个key”这两个操作的原子性,所以我们需要实现db.Locker来锁定一个key或者一组key,直到我们完成所有操作后才释放db.储物柜。最直接的想法是使用map[string]*sync.RWMutex。加锁过程分为两步:初始化互斥锁->加锁和解锁过程也分为两步:解锁->释放互斥锁然后有一个无法解决的并发问题:timecoroutineAcoroutineB1locker["a"]。Unlock()2locker["a"]=&sync.RWMutex{}3delete(locker["a"])4locker["a"].Lock()因为协程B在t3释放了锁,协程A试图在t4锁定将失败。如果协程B在解锁时不执行delete(locker["a"]),可以避免异常,但是会造成严重的内存泄漏。我们注意到哈希槽的数量远少于键的数量,反之,多个键可以共享一个哈希槽。因此,我们不再直接锁定key,而是锁定key所在的hash槽,以保证安全。另一方面,hash槽数量少,即使不释放也不会消耗太多内存。typeLocksstruct{table[]*sync.RWMutex}funcMake(tableSizeint)*Locks{table:=make([]*sync.RWMutex,tableSize)fori:=0;i
