当前位置: 首页 > 科技观察

构建一个即时通讯应用(四):消息

时间:2023-03-18 21:53:46 科技观察

本文是系列文章的第四篇。第1部分:模式第2部分:OAuth第3部分:对话在本文中,我们将编写端点代码以创建消息并列出它们,以及一个端点以更新参与者上次阅读消息的时间。首先在main()函数中添加这些路由。router.HandleFunc("POST","/api/conversations/:conversationID/messages",requireJSON(guard(createMessage)))router.HandleFunc("GET","/api/conversations/:conversationID/messages",guard(getMessages))router.HandleFunc("POST","/api/conversations/:conversationID/read_messages",guard(readMessages))消息进入对话,因此端点包含对话ID。创建消息此端点使用仅包含消息内容的JSON正文处理对/api/conversations/{conversationID}/messages的POST请求,并返回新创建的消息。它有两个副作用:更新对话last_message_id和更新参与者messages_read_at。funccreateMessage(whttp.ResponseWriter,r*http.Request){varinputstruct{Contentstring`json:"content"`}deferr.Body.Close()iferr:=json.NewDecoder(r.Body)。解码(&输入);错误!=nil{http.Error(w,err.Error(),http.StatusBadRequest)return}errs:=make(map[string]string)input.Content=removeSpaces(input.Content)ifinput.Content==""{errs["content"]="Messagecontentrequired"}elseiflen([]rune(input.Content))>480{errs["content"]="Messagetoolong.480max"}iflen(errs)!=0{respond(w,Errors{errs},http.StatusUnprocessableEntity)return}ctx:=r.Context()authUserID:=ctx.Value(keyAuthUserID).(string)conversationID:=way.Param(ctx,"conversationID")tx,err:=db.BeginTx(ctx,nil)iferr!=nil{respondError(w,fmt.Errorf("无法开始tx:%v",err))return}defertx.Rollback()isParticipant,err:=queryParticipantExistance(ctx,tx,authUserID,conversationID)iferr!=nil{respondError(w,fmt.Errorf("无法查询参与者是否存在:%v",err))return}if!isParticipant{http.Error(w,"找不到对话",http.StatusNotFound)return}varmessageMessageiferr:=tx.QueryRowContext(ctx,`插入消息(content,user_id,conversation_id)VALUES($1,$2,$3)RETURNINGid,created_at`,input.Content,authUserID,conversationID).Scan(&message.ID,&message.CreatedAt,);err!=nil{respondError(w,fmt.Errorf("无法插入消息:%v",err))return}if_,err:=tx.ExecContext(ctx,`UPDATEconversationsSETlast_message_id=$1WHEREid=$2`,message.ID,conversationID);err!=nil{respondError(w,fmt.Errorf("无法更新对话最后一条消息ID:%v",err))return}iferr=tx.Commit();err!=nil{respondError(w,fmt.Errorf("couldnotcommittxtocreateamessage:%v",err))return}gofunc(){iferr=updateMessagesReadAt(nil,authUserID,conversationID);err!=nil{log.Printf("无法更新读取的消息:%v\n",err)}}()message.Content=input.Contentmessage.UserID=authUserIDmessage。ConversationID=conversationID//TODO:notifyaboutnewmessage.message.Mine=truerespond(w,message,http.StatusCreated)}首先,它将请求文本解码为包含消息内容的结构然后,它验证内容不为空且少于480个字符。varrxSpaces=regexp.MustCompile("\\s+")funcremoveSpaces(sstring)string{ifs==""{returns}lines:=make([]string,0)for_,line:=范围字符串.Split(s,"\n"){line=rxSpaces.ReplaceAllLiteralString(line,"")line=strings.TrimSpace(line)ifline!=""{lines=append(lines,line)}}返回字符串。Join(lines,"\n")}这是一个去除空格的函数。它遍历每一行,删除两个以上的连续空格,并返回非空行。验证后,它启动一个SQL事务。首先,它查询对话中的参与者是否存在。funcqueryParticipantExistance(ctxcontext.Context,tx*sql.Tx,userID,conversationIDstring)(bool,error){ifctx==nil{ctx=context.Background()}var存在booliferr:=tx.QueryRowContext(ctx,`SELECTEXISTS(SELECT1FROMparticipantsWHEREuser_id=$1ANDconversation_id=$2)`,userID,conversationID).Scan(&exists);err!=nil{returnfalse,err}returnexists,nil}我把它提取到一个函数中,因为它可以在以后重用。如果用户不是对话参与者,我们将返回404NOTFound错误。然后,它插入消息并更新对话last_message_id。从这一点开始,last_message_id不能为NULL,因为我们不允许删除消息。接下来提交事务并更新goroutine中的参与者messages_read_at。funcupdateMessagesReadAt(ctxcontext.Context,userID,conversationIDstring)错误{ifctx==nil{ctx=context.Background()}if_,err:=db.ExecContext(ctx,`UPDATEparticipantsSETmessages_read_at=now()WHEREuser_id=$1ANDconversation_id=$2`,userID,conversationID);err!=nil{returnerr}returnnil}在回复这个新消息之前,我们必须通知。这是我们将在下一篇文章中写的现场部分,所以我在那里留了个便条。获取消息此端点处理对/api/conversations/{conversationID}/messages的GET请求。它使用包含会话中所有消息的JSON数组进行响应。它还具有更新actor的messages_read_at的副作用。funcgetMessages(whttp.ResponseWriter,r*http.Request){ctx:=r.Context()authUserID:=ctx.Value(keyAuthUserID).(string)conversationID:=way.Param(ctx,"conversationID")tx,err:=db.BeginTx(ctx,&sql.TxOptions{ReadOnly:true})iferr!=nil{respondError(w,fmt.Errorf("无法开始tx:%v",err))return}defertx.Rollback()isParticipant,err:=queryParticipantExistance(ctx,tx,authUserID,conversationID)iferr!=nil{respondError(w,fmt.Errorf("无法查询参与者存在性:%v",err))return}if!isParticipant{http.Error(w,"Conversationnotfound",http.StatusNotFound)return}rows,err:=tx.QueryContext(ctx,`SELECTid,content,created_at,user_id=$1作为mineFROMmessagesWHEREmessages.conversation_id=$2ORDERBYmessages.created_atDESC`,authUserID,conversationID)iferr!=nil{respondError(w,fmt.Errorf("couldnotquerymessages:%v",err))return}deferrows.Close()消息:=make([]Message,0)forrows.Next(){varmessageMessageiferr=rows.Scan(&message.ID,&message.Content,&message.CreatedAt,&message.Mine,);err!=nil{respondError(w,fmt.Errorf("无法扫描消息:%v",err))return}messages=append(messages,message)}iferr=rows.Err();err!=nil{respondError(w,fmt.Errorf("无法遍历消息:%v",err))return}iferr=tx.Commit();err!=nil{respondError(w,fmt.Errorf("couldnotcommittxtogetmessages:%v",err))return}gofunc(){iferr=updateMessagesReadAt(nil,authUserID,conversationID);err!=nil{log.Printf("couldnotupdatemessagesreadat:%v\n",err)}}()respond(w,messages,http.StatusOK)}首先,它在read-中启动一个SQL事务only模式,检查参与者是否存在,并查询所有消息。在每条消息中,我们使用当前经过身份验证的用户ID来了解用户是否拥有该消息(我的)。然后它提交事务,更新goroutine中的参与者messages_read_at并用消息响应。读取消息此端点处理对/api/conversations/{conversationID}/read_messages的POST请求。没有请求或响应主体。在前端,每??当有新消息到达实时流时,我们都会发出此请求。funcreadMessages(whttp.ResponseWriter,r*http.Request){ctx:=r.Context()authUserID:=ctx.Value(keyAuthUserID).(string)conversationID:=way.Param(ctx,"conversationID")if错误:=updateMessagesReadAt(ctx,authUserID,conversationID);err!=nil{respondError(w,fmt.Errorf("couldnotupdatemessagesreadat:%v",err))return}w.WriteHeader(http.StatusNoContent)}它使用与更新参与者messages_read_at相同的功能。停在这里。实时消息是留在后台的所有内容。请等待下一篇文章。源代码