当前位置: 首页 > 后端技术 > Node.js

关于并发和锁的经验分享

时间:2023-04-03 12:04:37 Node.js

并发场景秒杀秒杀系统一般可以指多个用户对同一个资源发起请求,正确响应的次数小于用户请求的次数。这时候最稳妥的办法就是使用悲观锁,数据级锁,比如oracle的sql:selectforupdate。但是悲观锁的缺点在高并发场景下也很明显,就是允许并发度低,容易造成504。就像安检一样,一次只能一个人通过,而且效率和经验都很低。所以应该使用乐观锁,或者使用redis的原子性来限制并发量,然后使用mq进行任务分发。正常流程:用户下单->redis并发锁库存,减少库存->通过mq生产订单任务->mq消费者消费任务,生成订单,更新库存等一系列操作。乐观锁:redis原生提供了乐观锁watch,watch是基于链接的,而主流nodejs中的redis模块是基于pipeline的,没有连接池,所以watch对于纯基于业务的nodejs没用,只要正确使用redis的原子性就足够了。但是,当系统变大时,就会涉及到集群问题。在多系统(multi-link)设计中,手表尤为重要。我个人认为手表可以提供一个“二次操作”的空间。对于秒杀系统,库存更新和秒杀业务可以同时存在。例如:卖家在秒杀期间补库存,因业务问题锁定库存等。此时watch可以提供优先级,即管理员锁定库存(清仓)时同时发送请求由于多个买家发起限时抢购,可以保证管理员的请求被正确传递,买家更新库存。请求失败。constredis=Redis.createClient();letlock=asyncfunction(key){lettransactionStatus=false;等待redis.watchAsync(键);让stock=awaitredis.getAsync(key);if(+stock<1){//库存不足}letreply=awaitredis.multi().decr(key).execAsync();if(!reply){//事务失败时,reply为null,进行错误处理}elseif(reply[0]<0){//事务成功时返回数组,multi可以理解为Promise.all相当//Robust处理超卖的情况,这时候要补redis库存,避免以后补库存出现负库存错误,执行和事务失败一样的操作redis.incr(key);}else{//transactionStatus=true当交易成功且库存正确减少时;}returntransactionStatus;};letproduceOrder=function(orderId,userId){letpayload=JSON.stringify({orderId,userId});让生产者=新生产者();returnproducer.produce(TOPIC.ORDER,payload);};letbuy=asyncfunction(orderId,userId){letkey=`lock:order:${orderId}:${userId}`;letlockResult=awaitlock(key);if(lockResult){awaitproduceOrder(orderId,userId);}返回Promise.resolve();};关闭注意followconcurrency可以代表同一用户对同一资源的重复请求。比如网络不好的时候,用户点击关注某人时,响应太慢,同时发送多个请求。这相当于过滤掉无用的请求。client需要相应的处理,但是健壮的后台也要考虑到这种情况。范式设计的数据库可以通过使用redis的set、hash、bitmap等数据结构设置唯一的联合索引来避免去重。如果是反范式设计(类似于mongo的嵌入式数组设计),可以在db层使用$in,$addToSet等操作符)使用悲观锁做去重,从逻辑层去重可以使用redis或setex操作完成悲观锁的原子性:constredis=Redis.createClient();letcheckLock=asyncfunction(key){letttl=awaitredis.ttlAsync(key);if(+ttl===-1){//如果ttl为-1,则没有过期时间,立即设置过期时间,避免死锁redis.expire(key,10)}returnPromise.resolve();};letlock=asyncfunction(key){letresult=awaitredis.incrAsync(key);if(+result!==1){checkLock(key)//已经加锁Lock,错误处理}else{//加锁成功,设置过期时间避免死锁redis.expire(key,10);}returnPromise.resolve();};letreleaseLock=function(key){returnredis.setAsync(key,0)};letfollow=asyncfunction(followId,userId){letkey=`lock:follow:${followId}:${userId}`;尝试{等待锁(钥匙);//遵循逻辑:查询、遍历、插入等}catch(err){//错误处理}finally{awaitreleaseLock(key)}};