当前位置: 首页 > 科技观察

说说高并发下如何防止重载?_0

时间:2023-03-19 21:08:35 科技观察

最近的一次测试给我带来了一个bug,说我提供的批量复制商品的接口产生了重复的商品数据。追根溯源后发现,这件事情并没有想象中的那么简单,可以说是一波三折。1.需求产品有需求:用户选择了一些品牌,点击确定按钮后,系统需要根据默认品牌的产品数据复制一批新产品。拿到这个需求的时候觉得太简单了,三遍五遍二分就搞定了。我提供了一个复制商品的基础接口,调用到商城系统。当时的流程是这样的:如果每次复制的产品数量不大,使用同步接口调用方案问题也不会太大。2.性能优化,但由于每次需要复制的项目较多,可能有几千条。如果每次都使用同步接口复制产品,可能会出现性能问题。所以后来我把复制商品的逻辑改成使用mq异步处理。改造后的流程图:复制商品的结果需要通知商城系统:这个方案看起来还不错。但后来出了点问题。3.出了点问题。测试给我们带来了一个bug,说我提供的批量复制商品的接口产生了重复的商品数据。经过排查,发现商城系统出于性能的考虑,也改成了异步的。他们并没有在界面中直接调用基础系统的复制商品接口,而是在作业中调用。在他们看来,流程图是这样的:用户调用商城的接口时,会往请求记录表中写入一条数据,然后在另一个作业中,异步调用基础系统的接口复制产品。但实际情况是这样的:商城系统存在bug,在请求记录表中,同一个请求产生了重复的数据。这样做的结果是,在作业中调用基础系统复制商品接口时,会重复发送请求。恰好现在底层系统使用RocketMQ进行异步处理。因为商城的job会一次取一批数据(比如:20条记录),并且在很短的时间内多次调用接口(其实就是for循环),可能会出现这样的情况其中不断调用相同的请求参数复制商品接口。因此,存在并发插入重复数据的问题。为什么会出现这个问题?4、多线程消费RocketMQ消费者,出于性能考虑,默认使用多线程并发消费,最多支持64个线程。例如:@RocketMQMessageListener(topic="${com.susan.topic:PRODUCT_TOPIC}",consumerGroup="${com.susan.group:PRODUCT_TOPIC_GROUP}")@ServicepublicclassMessageReceiverimplementsRocketMQListener{@OverridepublicvoidonMessage(MessageExtmessage){Stringmessage=newString(message.getBody(),StandardCharsets.UTF_8);doSamething(消息);}}也就是说,如果在很短的时间内连续发送重复的消息,就会阻塞不同的线程消费。即使代码中有这样的判断:ProductoldProduct=query(hashCode);if(oldProduct==null){productMapper.insert(product);}在插入数据之前,先判断数据是否已经存在,只有存在不存在将被插入。但是由于并发的情况,不同的线程判断商品数据不存在,于是同时进行insert操作,于是产生了重复数据。如下图所示:5.顺序消费为了解决上述并发消费重复消息的问题,我们从两个方面着手:商城系统修复了产生重复记录的bug。基础系统将消息改为单线程顺序消费。仔细想了想,如果仅仅依靠商城系统修复bug,以后很难避免类似的重复商品问题,比如:如果用户在很短的时间内多次点击创建商品按钮时间,或者商城系统发起重试。因此,基础系统还需要进一步处理。其实RocketMQ本身是支持顺序消费的,需要消息的生产者和消费者一起改变。生产者更改为:rocketMQTemplate.asyncSendOrderly(topic,message,hashKey,newSendCallback(){@OverridepublicvoidonSuccess(SendResultsendResult){log.info("sendMessagesuccess");}@OverridepublicvoidonException(Throwablee){log.error("发送消息失败!");}});关键是调用rocketMQTemplate对象的asyncSendOrderly方法来发送顺序消息。Consumer改为:@RocketMQMessageListener(topic="${com.susan.topic:PRODUCT_TOPIC}",consumeMode=ConsumeMode.ORDERLY,consumerGroup="${com.susan.group:PRODUCT_TOPIC_GROUP}")@ServicepublicclassMessageReceiverimplementsRocketMQListener{@OverridepublicvoidonMessage(MessageExtmessage){Stringmessage=newString(message.getBody(),StandardCharsets.UTF_8);doSamething(消息);}}接收消息的重点是RocketMQMessageListener注解中的consumeMode参数,设置为ConsumeMode.ORDERLY,这样就可以顺序消费消息了。修改后的关键流程图如下:双方修改后,复制产品部分将不再出现重复产品的问题。但是,在修复了这个bug之后,我想了很久。复制产品只是创建产品的入口之一。如果还有其他条目,在复制产品功能的同时创建一个新产品如何?不会有重复的产品问题吗?虽然,这种可能性非常非常小。但是,一旦出现重复商品的问题,后期要合并商品的数据就会很麻烦。有了这次的教训,我们要谨防岁月流逝。无论是用户还是你自己的内部系统,从不同的入口创建产品都需要解决创建重复产品的问题。那么,如何解决这个问题呢?6、唯一索引解决了商品数据重复的问题。最快、成本最低和最有效的方法是为表建立唯一索引。想法是好的,但是我们这里有一个规范:所有的业务表必须逻辑删除。而我们都知道,如果要删除表中的某条记录,如果使用delete语句来操作的话。例如:从productwhereid=123中删除;这种删除操作是物理删除,即记录被删除后,通过SQL语句基本查不到。(不过可以通过其他技术手段找回,那是后话)还有一种墓碑,主要是通过update语句来操作的。例如:updateproductsetdelete_status=1,edit_time=now(3)whereid=123;逻辑删除需要在表中额外增加一个删除状态字段,记录数据是否被删除。在所有的业务查询中,都需要过滤掉被删除的数据。通过这种方式删除数据后,数据还在表中,只是对删除状态的数据进行了逻辑过滤。事实上,不可能给这个逻辑删除的表加上唯一索引。为什么?假设product表中的name和model已经添加了唯一索引,如果用户删除了一条记录,delete_status设置为1,后来用户发现不对,又添加了完全相同的产品。由于唯一索引的存在,用户第二次添加商品会失败。即使删除了商品,也无法再次添加。这个问题显然有点严重。可能有人会说:把name、model、delete_status这三个字段同时做成唯一索引不就行了吗?答:这样做确实可以解决用户逻辑删除某个商品后重新添加相同商品后无法添加的问题。但是如果第二次添加的item又被删除了。用户第三次添加相同的商品,没有问题吗?可见,如果表中有逻辑删除功能,不方便创建唯一索引。7、分布式锁接下来解决数据重复问题的第二种方案可能是:加分布式锁。目前最常用的分布式锁性能最高的可能就是redis分布式锁了。使用redis分布式锁的伪代码如下:try{Stringresult=jedis.set(lockKey,requestId,"NX","PX",expireTime);if("OK".equals(result)){doSamething();返回真;}returnfalse;}finally{unlock(lockKey,requestId);}但是需要在finally代码块中释放锁。其中lockKey由product表中的name和model组成,requestId是每次请求的唯一标识,以便每次都能正确释放锁。还需要设置一个过期时间expireTime,防止锁释放失败,锁会一直存在,导致后续请求获取不到锁。如果只有一个项目,或者需要复制和添加少量项目,那么加分布式锁是没有问题的。主要流程如下:在复制和添加商品前可以先尝试加锁。如果加锁成功,则检查该item是否存在,如果不存在,则添加该item。另外,在这个过程中,如果加锁失败,或者查询时商品不存在,则直接返回。加分布式锁的目的是保证查询商品和添加商品这两个操作是原子操作。但是现在的问题是我们这次需要复制和添加大量的产品。如果每增加一个产品就加一个分布式锁,会极大地影响性能。显然,对于批处理接口,添加redis分布式锁并不是一个理想的方案。8.统一的mq异步处理我们之前已经讲过了。在批量复制商品的界面中,我们使用RocketMQ的顺序消息,通过单线程异步复制的方式添加商品,可以暂时解决商品重复的问题。但是那只改变了一个用于添加产品的条目,还有其他用于添加产品的条目。能不能把添加商品的底层逻辑统一起来,最后调用同一个代码。然后通过RocketMQ的顺序消息,单线程异步添加商品。主要流程如下图所示:这样确实可以解决重复产品的问题。但同时也带来另外两个问题:现在所有添加商品的功能都改为异步了,添加商品的接口如何同步返回数据?这需要修改前端交互,否则会影响用户体验。以前添加商品的入口是多线程的,现在只有一个线程可以添加商品。由于此修改,添加产品的整体效率有所降低。因此,在综合考虑各种因素后,最终否决了这个方案。9.insertonduplicatekeyupdate其实在mysql中有这样一种语法,即:insertonduplicatekeyupdate。添加数据时,如果mysql发现数据不存在,则直接插入。如果发现数据已经存在,则进行更新操作。但是要求表中有唯一索引或者PRIMARYKEY,这样当两个值相同时,才会触发更新操作,否则就插入。现在的问题是PRIMARYKEY是product表的主键,是按照雪花算法提前生成的,不可能产生重复数据。但是由于product表的逻辑删除功能,无法在product表中创建唯一索引。因此insertonduplicatekeyupdate方案暂时不可用。另外insertonduplicatekeyupdate在高并发的情况下可能会出现死锁问题,需要特别注意。有兴趣的朋友也可以私聊我。其实insertonduplicatekeyupdate的实战,我在另一篇文章《??我用kafka两年踩过的一些非比寻常的坑??》介绍过,有兴趣的朋友可以看看。10.insertignoremysql中还有这样的语法,即:insert...ignore。insert语句执行过程中:mysql发现如果数据重复则忽略,否则插入。主要用于忽略插入重复数据产生的Duplicateentry'XXX'forkey'XXXX'异常。但是,它还要求表中存在唯一索引或PRIMARYKEY。但是由于product表的逻辑删除功能,无法在product表中创建唯一索引。可见这个方案行不通。温馨提示,使用insert...ignore也可能会造成死锁。11、我们之前讲过防重表,因为逻辑删除功能,product表加唯一索引不起作用。后面提到加分布式锁,或者通过mq单线程异步添加商品,影响创建商品的性能。那么,如何解决问题呢?能不能换个思路,加一张防重表,在防重表中增加商品表的名称和型号字段作为唯一索引。例如:CREATETABLE`product_unique`(`id`bigint(20)NOTNULLCOMMENT'id',`name`varchar(130)DEFAULTNULLCOMMENT'name',`model`varchar(255)NOTNULLCOMMENT'specification',`user_id`bigint(20)unsignedNOTNULLCOMMENT'创建用户id',`user_name`varchar(30)NOTNULLCOMMENT'创建用户名',`create_date`datetime(3)NOTNULLDEFAULTCURRENT_TIMESTAMP(3)COMMENT'createTime',PRIMARYKEY(`id`),UNIQUEKEY`ux_name_model`(`name`,`model`))ENGINE=InnoDBDEFAULTCHARSET=utf8mb4COMMENT='商品反重表';表中的id可以用于commodity表的id,表中的name和model就是商品表的name和model,只是在这个防重表中添加了这两个字段的唯一索引。视野一下子被打开了。添加商品数据前,先添加反重表。如果添加成功,则表示可以正常添加商品。如果添加失败,说明存在重复数据。如果添加防重表失败,后续业务处理根据实际业务需要确定。如果业务允许添加一批商品,如果存在重复,则直接抛出异常,并提示用户:系统检测到重复商品,请刷新页面重试。例如:try{transactionTemplate.execute((status)->{productUniqueMapper.batchInsert(productUniqueList);productMapper.batchInsert(productList);returnBoolean.TRUE;});}catch(DuplicateKeyExceptione){thrownewBusinessException("systemdetection如果看到重复的商品,请刷新页面重试");}批量插入数据时,如果出现重复数据,捕获DuplicateKeyException,转化为类似BusinessException的运行时业务异常。还有一种业务场景,要求即使有重复的商品也不抛出异常,这样业务流程才能正常进行。例如:try{transactionTemplate.execute((status)->{productUniqueMapper.insert(productUnique);productMapper.insert(product);returnBoolean.TRUE;});}catch(DuplicateKeyExceptione){product=productMapper.query(product);}插入数据时,如果有重复数据,捕获DuplicateKeyException,在catch代码块中再次查询商品数据,直接返回数据库中已有的商品。如果调用同步添加商品的接口,这里的重点是返回已有数据的id,业务系统需要使用这个id进行后续操作。当然,在执行execute之前,还是要检查产品数据是否存在。如果存在,则直接返回已有的数据。如果不存在,则执行execute方法。这一步一定不能错过。例如:ProductoldProduct=productMapper.query(product);if(Objects.nonNull(oldProduct)){returnoldProduct;}try{transactionTemplate.execute((status)->{productUniqueMapper.insert(productUnique);productMapper.insert(product);returnBoolean.TRUE;});}catch(DuplicateKeyExceptione){product=productMapper.query(product);}返回产品;请注意:防重表操作和添加商品必须在同一个事务中,否则会出现问题。对了,需要对产品的删除功能做特殊处理。当逻辑删除产品表时,必须物理删除防重表。只需要使用产品表id作为查询条件即可。老实说,解决重复数据问题的方法有很多。没有最好的方案,只有最适合业务场景的最优方案。另外,如果你对重复数据导出的幂等性问题感兴趣,可以看我的另一篇文章《??高并发下如何保证接口的幂等性???》,里面有很详细的介绍。