当前位置: 首页 > 科技观察

并发场景下数据写入功能的实现

时间:2023-03-18 14:13:17 科技观察

1.准备工作1.1理论基础在并发场景下,要实现数据的正确写入,需要理解“锁”相关的原理和技术。并发写入数据时,需要考虑是否加锁。根本原因是数据是共享的,数据是会变化的,即多个线程会同时读写同一个数据。如果数据不是共享的,即不同的线程读写不同的数据,则不需要加锁;如果数据是共享的,所有线程只读不写数据,不需要加锁;如果线程写入数据,则需要加锁。当多个线程同时访问同一个数据,并且至少有一个线程会写入数据时,这种情况称为“数据竞争”。在并发场景下,锁的作用是将并发改为串行,保证数据的一致性(更具体地说,通过加锁,解决并发程序执行的原子性、可见性、顺序问题,有感兴趣的同学可以深入相关理论,本文以实战为主,不再展开)。因此,在并发场景下读写数据,首先要分析是否存在“datarace”问题,如果存在则需要“加锁”。如果本地应用程序发生数据竞争,使用本地锁;如果发生在分布式服务中,使用分布式锁;本地锁和分布式锁原理是一样的,不用单独讨论。与锁相关的技术包括“悲观锁”和“乐观锁”。(1)悲观锁我们通常所说的锁,如无特殊说明,均指“悲观锁”。它通过一些技术手段实现线程或服务之间的互斥和同步。使用时,有显式(或隐式)的锁持有/释放操作。因为加锁本身会消耗性能,加锁后并发处理会变成串行,所以加锁是一个影响系统性能的操作;锁的不当应用会导致潜在的死锁/活锁风险;因此,悲观锁的使用需要谨慎。(2)乐观锁乐观锁通常被称为“无锁技术”。它并不是通过“加锁”和将并发改为串行来保证数据一致性,而是通过CAS(CompareAndSwap)来保证数据的一致性。由于CAS通常非常快,进程不需要“加锁”,性能损失小。但是,通过CAS并发写入数据时,通常会伴随着“自旋”,即当有多个并发写入时,只有一个可以写入成功,其他的会在自旋后重新写入,直到写入成功或由于超时/未能超过重试次数。自旋会带来性能开销,频繁自旋的性能开销会超过加锁。所以乐观锁通常用在并发强度不高的场景,在这种场景下的性能要优于悲观锁。在高并发场景下,推荐使用悲观锁。1.2业务场景及分析本文的主题是介绍几种并发写入数据的方案。为此,我们先建立几个常用的业务场景,并进行简单的分析。在写入数据方面,我们讨论了最常见的向数据库写入数据的场景,主要包括新建数据和修改数据两个具体场景。这两种情况并不完全一致,所以我们分开讨论。(1)向数据库写入新数据向数据库写入数据时,如果数据之间是直接相互独立的,即不存在“数据竞争”,那么根据1.1节的理论,是不需要的这个时候要考虑锁的问题。对于存在“数据竞争”的场景,我们考虑写串口代码的场景:假设创建的数据有一个编码字段,形式为“CON_0001”,后半部分的“0001”为序号,需要根据当前最大的序号。Code+1计算出要创建的数据的序列号。这里存在数据范围竞争。并发创建数据时,如果不进行并发控制,会创建多个相同编码的数据。(2)并发更新数据库记录时,如果能保证每次并发请求更新的数据不一样,就不会出现“数据竞争”,不需要加锁;在并发更新同一条数据记录时,如果不进行并发控制,可能会出现一个写请求覆盖另一个写请求,导致最终结果出错的情况。这里我们考虑“visits+1”的场景,即设计一张visits表,每次请求都给visits+1。比如当前访问5次,5个并发请求同时给记录加1,正确的结果是10,但是没有并发控制,结果通常是<10。2.并发插入串码的实现方案2.1业务逻辑分析业务逻辑如下:取出当前数据库中串码最大的记录,从encoding字段中解析出当前最大串码+1,创建一个新记录进入数据库。:privateIntegeraddEntity(){ConcurrentEntity实体=dataMapper.getLatestConcurrentEntity();intnextNumber=entity==null?1:getNextNumberByCode(entity.getCode());字符串代码=String.format("CON_%04d",nextNumber);返回dataMapper.insertConcurrentEntity(newConcurrentEntity(code));}privateintgetNextNumberByCode(Stringcode){intindex=code.lastIndexOf("_");字符串编号=code.substring(index+1);返回Integer.parseInt(number)+1;}在并发场景下,该业务主要存在原子性隐患,即addEntity()中的代码需要整体执行。如果多个线程交替执行逐行代码,一个线程读取最新的串行代码后,代码已经被其他线程更改,而本线程不知道,导致写入了错误的数据。所以在这个业务场景的并发上,主要是避免了原子性和可见性的问题。最直接的方法就是通过加锁来解决。2.2实现方案2.2.1方案一:LockprivatefinalLocklock=newReentrantLock(false);publicIntegeraddEntityByLock(){synchronized(this){returnaddEntity();}}publicIntegeraddEntityByLock2(){锁.lock();尝试{返回addEntity();}最后{lock.unlock();在分布式系统中,可以使用redisson或者curator提供的分布式锁实例化锁。2.2.2方案二:在数据库中加锁不仅可以在代码中使用,也可以直接在数据库中使用。select...forupdate语句可以在数据库中写入锁。另外,由于业务执行的原子性问题,需要将addEntity()中的逻辑放到同一个事务中。@Transactional(isolation=Isolation.REPEATABLE_READ)publicIntegeraddEntityByTransactionWithLock(){returnaddEntity();}这里的事务隔离级别可以是RC或者RR。注意,在此实现中,addEntity()中对ConcurrentEntity的查询已更改为锁定读取方法:@Select("SELECT*FROMconcurrent_entity\n"+"ORDERBYcodeDESC\n"+"LIMIT1\n"+"更新;")ConcurrentEntitygetLatestConcurrentEntityWithWriteLock();2.2.3性能对比对于1000个并发请求,三种方式的性能对比如下:executeConcurrentAddByLock1:1000个并发,耗时1179msexecuteConcurrentAddByLock2:1000个并发,耗时863msAddexecuteConcurrencytakes1284ms2.3本业务中其他解决方案场景,由于串行代码的计算,存在数据竞争问题,所以并发时需要加锁。如果可以避免数据竞争,就可以避免并发问题。对于这种情况,可以将串口码获取的逻辑放到redis中。Redis本身是单线程的,避免了串行代码的数据竞争,从而避免了加锁的开销,而且redis本身是高性能的,所以理论上这个方案的性能只比上面的方案高。3.并发更新访问的实施方案3.1业务分析在数据库中并发更新访问时,“数据竞争”问题也是“原子性”的隐患。如果更新本身是一个原子操作,就不存在并发问题;如果更新操作分两步进行,先读取当前数据,+1后再重写,操作不是原子的,需要加锁。3.2实践方案3.2.1方案1:原子更新publicIntegerincreaseVisitCountAtomically(intid){returndataMapper.increaseConcurrentVisitAtomic(id);}@Update("UPDATEconcurrent_visit\n"+"SETvisit=visit+1,update_time=NOW()\n"+"WHEREid=#{id};")IntegerincreaseConcurrentVisitAtomic(intid);3.2.2方案2:代码中上锁publicIntegerincreaseVisitCountByLock(intid){synchronized(this){returnincreaseVisitCount(id);}}privateIntegerincreaseVisitCount(intid){ConcurrentVisitconcurrentVisit=dataMapper.getConcurrentVisitObject(id);concurrentVisit.increaseVisit().updateUpdateTime();返回dataMapper.updateConcurrentVisit(concurrentVisit);}3.2.3方案3:数据库中上锁@Transactional()publicIntegerincreaseVisitCountByTransaction(intid){returnincreaseVisitCount(id,true);}privateIntegerincreaseVisitCount(intid,booleanwithLock){ConcurrentVisitconcurrentVisit=withLock?dataMapper.getConcurrentVisitObjectWithLock(id):dataMapper.getConcurrentVisitObject(id);返回dataMapper.increaseConcurrentVisit(concurrentVisit.increaseVisit().updateUpdateTime());}@Select("SELECT*FROM"HEconcurrent_visit\n"+=#{id}\n"+"FORUPDATE;")ConcurrentVisitgetConcurrentVisitObjectWithLock(intid);3.2.4解决方案四:使用乐观锁使用乐观锁时,需要一个自增的版本字段(version),每次更新成功时设置版本,版本必须+1,要以版本为比较更新前的字段。如果当前读取的版本与数据库中的版本不一致,则更新失败publicIntegerincreaseVisitCountOptimistically(intid){ConcurrentVisitconcurrentVisit=dataMapper.getConcurrentVisitObject(id);返回dataMapper.increaseConcurrentVisitOptimistically(concurrentVisit.increaseVisit().updateUpdateTime());}vis"@Update("UPDATE_{visit},update_time=#{updateTime},version=#{version}+1\n"+"WHEREid=#{id}andversion=#{version};")整数increaseConcurrentVisitOptimistically(ConcurrentVisitconcurrentVisit);使用Optimistically加锁时,比较不一致,更新会失败,此时需要自旋重试,所以上面的代码可以优化为:publicIntegerincreaseVisitCountOptimisticallyWithRetry(intid){intresult=0;intmaxRetry=5;longinterval=20L;for(inti=0;i0){break;}interval=interval+i*50;helper.sleep(在terval);}returnresult;}3.2.5性能比较发起10000个并发更新操作,结果如下:executeConcurrentAddAtomically:10000并发,花费时间2112msexecuteConcurrentAddByLock:10000并发,花费时间5796msexecuteConcurrentAddByTransaction:10000并发,花费时间3902msexecuteConcurrentAddOptimisticallyWithRetry:10000Concurrent,spendingtime5998msmysql>select*fromconcurrent_visit;+----+-------------+-------+---------+---------------------+---------------------+|id|resourceKey|visit|version|create_time|update_time|+----+-------------+-------+---------+---------------------+---------------------+|1|resource1|39925|9925|2022-03-3111:42:54|2022-04-0112:06:36|+----+-------------+-------+---------+---------------------+---------------------+Itcanbeseenthatwhentheconcurrencyisintense,theperformanceofoptimisticlockingistheworst,andsomerequests,eveniftheyexceedthemaximumnumberofretries,arenotsuccessfullyupdatedandlockedinthedatabase.Theperformanceisbetterthanthatofthecode.Thereasonisthat,thelockonthedatabaseisfullyoptimizedforperformance,andthegranularityofthelockissmaller.Inourbusinessscenario,thegranularityofthelockinthecodeisalreadydifficulttoreducethegranularityofthelock,whichdeterminesthedegreeofconcurrency.Inconcurrentscenarios,thesmallerthelockgranularity,thebetter