当前位置: 首页 > 后端技术 > Node.js

基于nest和redis的消息(email、SMS、websocket、app)推送系统的实现

时间:2023-04-03 10:25:47 Node.js

系统功能概述针对各子系统普遍需要的消息推送功能需求(订单流程中的sms和email消息推送),为了避免冗余,代码将push功能抽离到一个独立的平台中,各个子系统通过通用的RESTful接口或消息(kafka)生成任务。推送功能可以是来自推送方法的任何类型的电子邮件、短信、WebSocket或应用程序。任务可以立即执行,也可以在固定时间执行。为了满足多次提醒的功能,在延迟任务的基础上延长了周期。任务。循环任务是一种特殊的延时任务。任务执行完成后,根据任务类型标记动态重新注入任务,直至所有任务执行完毕。(详见系统代码)Redis任务队列定时任务的技术难点|任务队列定时任务有多种实现方式。节点计时模块node-schedule和nativeAPI中的timer适用于单任务的执行,不适合多任务并行(不适合!==不可能)。*消息队列的引入主要用于各种服务的解耦;*就系统本身而言,推送任务可能存在消息量大、并发高的问题,引入消息队列可以起到降峰的作用*Message中有一个特别重要的点推:异步。在大多数情况下,服务的调用者可能对消息推送结果的回调没有很高的要求。消息生产者通过特定通道发起消费请求后,可以继续(只适用于不依赖执行反馈的场景【Serial不适合】)系统使用节点生态中比较好的模块bull。流程图如下。系统调用者发起任务请求,系统首先对请求的合法性进行校验(权限校验见下篇),然后根据不同类型压入任务执行栈。bull中的任务大致有六种状态,waiting,delay,activation,completion,failure。任务堆栈中的任务当状态被标记为活动时,任务进入执行阶段,直到完成并转到下一个任务。*bull:github:https://github.com/OptimalBits/bull*bull:底层使用的Redis发布订阅特性权限验证1.身份验证是为了防止系统被恶意调用,恶意注入任务,应用入口和网关在层次上做身份认证是非常有必要的。本文只讲解应用入口的鉴权,网关的鉴权会在后面的博客中进行讲解。系统也采用了主流的JWTtoken机制。服务生成一个令牌,然后客户端必须为每个请求携带令牌。对于单体微服务的设计,单体系统的身份认证完全可以迁移到网关层面去实现。2.应用认证应用认证是单体系统中的主要功能。系统使用节点中负责安全的模块cryptocrypto。简单使用见代码块key)=>{returncrypto.publicEncrypt(key,Buffer.from(data));}//解密constdecrypt=(data,key)=>{returncrypto.privateDecrypt(key,encrypted);}consttest='测试加密信息'//encryptconstcrypted=encrypt(test,keys.pubKey);//解密constdecrypted=decrypt(crypted,keys.privKey);keys.privKeykeys.pubKey是对应的私钥和公钥[crypto](https://nodejs.org/api/crypto.html)系统编码模块的具体实现就不详细解释了,只说大概方法将进行说明。redis通用方法封装##redis封装import{HttpService,Inject,Injectable}from'@nestjs/common';从'nestjs-redis'导入{RedisService};@Injectable()exportclassRedisCacheService{[https://github.com/OptimalBits/bull](https://github.com/OptimalBits/bull)公共客户端;构造函数(@Inject(RedisService)私有redisService:RedisService){这个.getClient().then(r=>{});}asyncgetClient(){this.client=awaitthis.redisService.getClient();}//设置值的方法asyncset(key:string,value:any,seconds?:number){value=JSON.stringify(value);如果(!this.client){awaitthis.getClient();}if(!seconds){awaitthis.client.set(key,value);}else{awaitthis.client.set(key,value,'EX',seconds);}}//获取值的方法asyncget(key:string){if(!this.client){awaitthis.getClient();}const数据:any=awaitthis.client.get(key);如果(!数据){返回;}返回JSON.parse(数据);}}bulltaskinjection//TaskService.tsexportclassTaskService{constructor(//将创建的任务队列注入服务@InjectQueue('email')privatereadonlyemailNoticeQueue:Queue,@InjectQueue('message')privatereadonlymessageNoticeQueue:Queue,){}//延迟Value:延迟时间publicasyncaddTask(){awaitthis.emailNoticeQueue.add('emitEmail',{id:taskResult.insertedId,config:config.name,},{delay:delayValue});}}任务周期处理函数(email.processor.ts)大牛周期处理函数可以参考https://github.com/OptimalBit...@Injectable()@Processor('email')exportclassNoticeEmailProcessor{constructor(@InjectQueue('email')私有只读emailNoticeQueue:队列,私有只读redisCacheService:RedisCacheService,私有只读taskLogService:TaskLogService,私有只读taskEmitEmailService:TaskEmitEmailService,@Inject(TaskService)私有只读taskService:TaskService,){}@Process('emitEmail'Job){}/***下一步执行任务()*/protectedasyncnextLoopTak(task:TaskEntity,isSuccessFlag:boolean,status:number){}}日期处理使用moment.jshttps://github.com/妈妈ent/moment系统部署系统采用docker部署,系统默认启动在10001端口。getinstallredis-server&&\npmconfigsetregistryhttps://registry.npm.taobao.org&&\npminstall&&\npminstallpm2-gCMD["sh","-c","redis-server&&pm2-runtimestartecosystem.config.js"]2、pm2启动脚本##ecosystem.config.jsmodule.exports=[{script:'dist/main.js',name:'simpleNoticeCenterApi',exec_mode:'cluster',instances:2}]结语系统目前处于不断迭代开发中,可能存在不同程度的问题。对于普通的消息推送还是可以完成的。之后会进一步提升系统的安全性,其他功能会迭代详细讲解。会在后续的博客中进行讲解,包括前后端。原文地址:http://blog.canyuegongzi.xyz/资源github消息推送系统