当前位置: 首页 > 科技观察

使用DB实现分布式锁的思路

时间:2023-03-22 02:01:54 科技观察

概述之前参与过一个库存系统,由于其业务比较复杂,开发了很多应用来支持。这样,一条库存数据可能会有多个应用同时修改库存数据。比如计划任务域xx.cron,SystemA域和SystemB域有几个JAVA应用程序,可能同时修改同一个库存数据。如果没有协调,就会有脏数据。跨JAVA进程的线程协调,可以使用DB或Redis等外部环境。下面介绍如何使用DB实现分布式锁。设计本文设计的分布式锁的交互模式如下:1.根据业务字段生成transaction_id,线程安全地创建锁资源2.根据transaction_id申请锁3.释放锁,动态创建锁资源使用synchronized关键字时,必须指定一个锁对象。synchronized(obj){}进程中的线程可以根据obj进行同步。obj在这里可以理解为锁对象。线程要想进入synchronized代码块,必须先持有obj对象的锁。这种锁是JAVA中内置的锁,创建它的过程是线程安全的。那么借助DB,如何保证创建锁的过程是线程安全的呢?您可以在数据库中使用UNIQUEKEY功能。一旦出现重复键,由于UNIQUEKEY的唯一性,会抛出异常。在JAVA中是SQLIntegrityConstraintViolationException。createtabledistributed_lock(idBIGINTUNSIGNEDPRIMARYKEYAUTO_INCREMENTCOMMENT'自增主键',transaction_idvarchar(128)NOTNULLDEFAULT''COMMENT'事务id',last_update_timeTIMESTAMPDEFAULTCURRENT_TIMESTAMPONUPDATECURRENT_TIMESTAMPNOTNULLCOMMENT'***更新时间',create_timeTIMESTAMPDEFAULT'0000-00-0000:00:00'NOTNULLCOMMENT'创建时间',UNIQUEKEY`idx_transaction_id`(`transaction_id`))transaction_id为交易Id,例如可以用仓库+条码+销售模式组合成一个transaction_id,表示某仓库某销售模式下的某条码资源。不同的条形码,当然有不同的transaction_ids。如果两个应用创建相同transaction_id的锁资源,则只能创建一个应用成功。如果成功插入一条distributed_lock记录,则表示成功创建了一个锁资源。DB连接池列表设计在写操作频繁的业务系统中,通常会进行数据库的划分,以降低单库写的压力,提高写操作的吞吐量。如果采用分库,业务数据自然会分配到各个库中。在这个水平拆分的多数据库上使用DB分布式锁,可以自定义一个DataSource列表。并暴露一个getConnection(StringtransactionId)方法,根据transactionId找到对应的Connection。实际代码如下:packagedlock;importcom.alibaba.druid.pool.DruidDataSource;importorg.springframework.stereotype.Component;importjavax.annotation.PostConstruct;importjava.io.FileInputStream;importjava.io.IOException;importjava.sql.Connection;importjava。util.ArrayList;importjava.util.List;importjava.util.Properties;@ComponentpublicclassDataSourcePool{privateListdlockDataSources=newArrayList();@PostConstructprivatevoidinitDataSourceList()throwsIOException{Propertiesproperties=newProperties();FileInputStreamfis=newFileInputStream("db.properties");属性。加载(fis);IntegerlockNum=Integer.valueOf(properties.getProperty(“DLOCK_NUM”));for(inti=0;i“DLOCK_USER_”+i);Stringpassword=properties.getProperty(“DLOCK_PASS_”+i);IntegerinitSize=Integer.valueOf(properties.getProperty("DLOCK_INIT_SIZE_"+i));IntegermaxSize=Integer.valueOf(properties.getProperty("DLOCK_MAX_SIZE_"+i));Stringurl=properties.getProperty("DLOCK_URL_"+i);DruidDataSourcedataSource=createDataSource(user,password,initSize,maxSize,url);dlockDataSources.add(dataSource);}}privateDruidDataSourcecreateDataSource(Stringuser,Stringpassword,IntegerinitSize,IntegermaxSize,Stringurl){DruidDataSourcedataSource=newDruidDataSource();dataSource.setDriverClassName("com.mysql.jdbc.Driver");dataSource.setUsername(用户);dataSource.setPassword(密码);dataSource.setUrl(url);dataSource.setInitialSize(initSize);dataSource.setMaxActive(maxSize);returndataSource;}publicConnectiongetConnection(StringtransactionId)throwsException{if(dlockDataSources.size()0){returnnull;}if(transactionId==null||"".equals(transactionId)){thrownewRuntimeException("transactionId是必须的");}inthascode=transactionId.hashCode();if(hascode0){hascode=-hascode;}returndlockDataSources.get(hascode%dlockDataSources.size()).getConnection();}}首先编写一个initDataSourceList方法,并利用Spring的PostConstruct注解初化一个数据从db.properties中读取的源列表相关的数据库配置。DLOCK_NUM=2DLOCK_USER_0="user1"DLOCK_PASS_0="pass1"DLOCK_INIT_SIZE_0=2DLOCK_MAX_SIZE_0=10DLOCK_URL_0="jdbc:mysql://localhost:3306/test1"DLOCK_USER_1="user1"DLOCK_PASS_1="pass1"DLOCK_INIT_SIZE_1=2DLOCK_1=2DLOCK_INIT_SIZE_1=2DLOCK_1mysql://localhost:3306/test2"DataSource使用阿里的DruidDataSource。然后最重要的一个实现getConnection(StringtransactionId)方法。实现原理很简单,获取transactionId的hashcode,对DataSource的长度取模即可。连接池列表设计好之后,就可以往distributed_lock表中插入数据了。packagedlock;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.stereotype.Component;importjava.sql.*;@ComponentpublicclassDistributedLock{@AutowiredprivateDataSourcePooldataSourcePool;/***根据transactionId创建锁资源*/publicStringcreateLock(StringtransactionId)throwsException{if(transactionId==null){thrownewRuntimeException("transactionId是必须的");}Connectionconnection=null;Statementstatement=null;try{connection=dataSourcePool.getConnection(transactionId);connection.setAutoCommit(false);statement=connection。createStatement();statement.executeUpdate("INSERTINTOdistributed_lock(transaction_id)VALUES('"+transactionId+"')");connection.commit();returntransactionId;}catch(SQLIntegrityConstraintViolationExceptionicv){//说明已经生成了。if(connection!=null){connection.rollback();}returntransactionId;}catch(Exceptione){if(connection!=null){connection.rollback();}throwe;}finally{if(statement!=null){statement.close();}if(connection!=null){connection.close();}}}}根据transactionId锁定线程,然后使用DB的selectforupdate特性锁定线程。当多个线程根据同一个transactionId并发操作selectforupdate时,只有一个线程可以成功,其他线程阻塞,直到selectforupdate成功的线程使用commit操作,阻塞中所有线程的其中一个线程开始工作。我们在上面的DistributedLock类中创建了一个lock方法。publicbooleanlock(StringtransactionId)throwsException{Connectionconnection=null;PreparedStatementpreparedStatement=null;ResultSetresultSet=null;try{connection=dataSourcePool.getConnection(transactionId);preparedStatement=connection.prepareStatement("SELECT*FROMdistributed_lockWHEREtransaction_id=?FORUPDATE1State");准备,transactionId);resultSet=preparedStatement.executeQuery();if(!resultSet.next()){connection.rollback();returnfalse;}returntrue;}catch(Exceptione){if(connection!=null){connection.rollback();}throwe;}finally{if(preparedStatement!=null){preparedStatement.close();}if(resultSet!=null){resultSet.close();}if(connection!=null){connection.close();}}}实现解锁操作当线程执行完任务后,必须手动进行解锁操作,这样之前锁定的线程才能继续工作。在我们上面的实现中,其实就是获取当时selectforupdate成功的线程对应的Connection,执行commit操作。那么如何获得呢?我们可以利用ThreadLocal。首先在DistributedLock类中定义privateThreadLocalthreadLocalConn=newThreadLocal();每次调用lock方法时,将Connection放在ThreadLocal中。我们修改锁方法。publicbooleanlock(StringtransactionId)throwsException{Connectionconnection=null;PreparedStatementpreparedStatement=null;ResultSetresultSet=null;try{connection=dataSourcePool.getConnection(transactionId);threadLocalConn.set(connection);preparedStatement=connection.prepareStatement("SELECT*FROMdistributed_lockidWHERE?transaction);preparedStatement.setString(1,transactionId);resultSet=preparedStatement.executeQuery();if(!resultSet.next()){connection.rollback();threadLocalConn.remove();returnfalse;}returntrue;}catch(Exceptione){if(connection!=null){connection.rollback();threadLocalConn.remove();}抛出;}最后{if(preparedStatement!=null){preparedStatement.close();}if(resultSet!=null){resultSet.close();}if(connection!=null){connection.close();}}}这样,在获取到Connection时,将其设置为ThreadLocal,如果lock方法异常,则将其移除来自ThreadLocal。通过这几步,我们可以实现解锁操作。我们在DistributedLoc中添加一个unlock方法k.publicvoidunlock()throwsException{Connectionconnection=null;try{connection=threadLocalConn.get();if(!connection.isClosed()){connection.commit();connection.close();threadLocalConn.remove();}}catch(Exceptione){if(connection!=null){connection.rollback();connection.close();}threadLocalConn.remove();throwe;}}缺点是毕竟要用DB实现分布式锁,仍然会对DB造成一定的压力。当时考虑使用DB进行分发的一个重要原因是我们的应用是后端应用,通常流量不会很大,但关键是要保证库存数据的正确性。对于前端库存系统等操作,比如添加购物车占用库存,最好不要使用DB实现分布式锁。进一步思考如果要锁定多份数据,如何实现呢?例如,对于库存操作,需要修改物理库存和虚拟库存。当你想锁定实物库存时,你也需要锁定虚拟库存。其实并不难。参考lock方法,写一个multiLock方法,提供多个transactionId入参,for循环处理。这个后续有时间补上。

最新推荐
猜你喜欢