当前位置: 首页 > 科技观察

基于SpringBoot和数据库表记录的方式实现分布式锁

时间:2023-03-17 16:23:45 科技观察

同进程不同线程操作共享资源时,我们只需要对资源进行加锁,比如使用JUC下的工具,保证操作的正确性.对于不熟悉JUC的同学,可以阅读以下文章:浅谈Synchronized同步优化的JUC基石——不安全类但是,我们的系统为了高可用,总是存在多个副本,分布在不同的机器上。同进程中的锁机制不再起作用。为了保证多副本系统对共享资源的访问,我们引入了分布式锁。分布式锁的主要实现方式有:基于数据库,又细分为基于数据库的表记录、悲观锁、乐观锁。基于缓存的,比如Redis,它是基于Zookeeper的。今天来演示一下最简单的分布式锁。解决方案——基于数据库表记录的分布式锁的主要原理是利用数据库的唯一索引(对数据库索引不了解的同学可以参考我的另一篇文章Mysql索引浅谈)比如有下面一张表:CREATETABLE`test`.`Untitled`(`id`int(11)NOTNULLAUTO_INCREMENTCOMMENT'自增数',`name`varchar(255)NOTNULLCOMMENT'锁名',`survival_time`int(11)NOTNULLCOMMENT'生存时间,单位ms',`create_time`timestamp(3)NOTNULLDEFAULTCURRENT_TIMESTAMP(3)COMMENT'创建时间',`thread_name`varchar(255)NOTNULLCOMMENT'线程名',PRIMARYKEY(`id`)USINGBTREE,UNIQUEINDEX`uk_name`(`name`)USINGBTREE)ENGINE=InnoDBROW_FORMAT=Dynamic;name字段添加了唯一索引,如果有多个new操作有相同的name值,数据库只能保证只有一个操作成功,其他操作将被拒绝,出现“Duplicatekey”错误。然后,当系统1即将获得分布式锁时,它尝试向数据库中插入一条name="key"的记录。如果插入成功,则表示获取锁成功。如果其他系统想要获取分布式锁,也需要向数据库中插入同名记录。当然数据库会报错,插入失败就意味着这些系统获取锁失败。当系统1要释放锁时,删除这条记录即可。thread_name列可以用来保证只有自己创建的锁才能主动释放。我们希望实现的分布式锁有如下效果:获取锁是阻塞的,获取不到就一直阻塞。锁会失效,超过锁的生命周期后会自动释放。这样可以避免一些系统因为宕机而不能主动释放锁的问题。大致流程图如下:使用到的依赖如下:SpringBootMyBatis-plusLombok项目的工程目录为:pom文件中使用到的依赖:org.springframework.bootspring-boot-starter-weborg.projectlomboklombok<版本>1.18.6mysqlmysql-connector-javacom.baomidoumybatis-plus-boot-starter3.3.1com.baomidoumybatis-plus-extension3.3.1org.springframework.bootspring-boot-starter-testtest配置项为:server:port:9091spring:datasource:driver-class-name:com.mysql.cj.jdbc.Driverurl:jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=Asia/Shanghai用户名:root密码:a123logging:level:root:info用于映射数据库字符段的实体类为:packagecom.yang.lock1.entity;importcom.baomidou.mybatisplus.annotation.IdType;importcom.baomidou.mybatisplus.annotation.TableField;importcom.baomidou.mybatisplus.annotation.TableId;importcom.baomidou.mybatisplus.annotation.TableName;importlombok.AllArgsConstructor;importlombok.Data;importlombok.NoArgsConstructor;importjava.util.Date;/***@authorqcy*@create2020/08/2515:03:47*/@Data@NoArgsConstructor@TableName(value="t_lock")publicclassLock{/***自增序号*/@TableId(value="id",type=IdType.AUTO)privateIntegerid;/***锁名*/privateStringname;/***生存时间,单位ms*/privateintsurvivalTime;/***锁创建时间*/privateDatecreateTime;/***线程名*/privateStringThreadName;}Dao层:packagecom.yang.lock1.dao;importcom.baomidou.mybatisplus.core.mapper.BaseMapper;importcom.yang.lock1.entity.Lock;importorg.apache.ibatis.annotations.Mapper;/***@authorqcy*@create2020/08/2515:06:24*/@MapperpublicinterfaceLockDaoextendsBaseMapper{}服务接口层:packagecom.yang.lock1.service;importcom.baomidou.mybatisplus.extension.service.IService;importcom.yang.lock1.entity.Lock;/***@authorqcy*@create2020/08/2515:07:44*/publicinterfaceLockServiceextendsIService{/***阻塞获取分布式锁**@paramname锁名*@paramsurvivalTimes生存时间*/voidlock(Stringname,intsurvivalTime);/***释放锁**@paramname锁名*/publicvoidunLock(Stringname);}服务实现层:packagecom.yang.lock1.service.impl;importcom.baomidou.mybatisplus.extension.service.impl。ServiceImpl;importcom.yang.lock1.dao.LockDao;importcom.yang.lock1.entity.Lock;导入portcom.yang.lock1.service.LockService;importlombok.extern.slf4j.Slf4j;importorg.springframework.dao.DuplicateKeyException;importorg.springframework.stereotype.Service;importjava.util.Date;/***@authorqcy*@create2020/08/2515:08:25*/@Slf4j@ServicepublicclassLockServiceImplextendsServiceImplimplementsLockService{@Overridepublicvoidlock(Stringname,intsurvivalTime){StringthreadName="system1-"+Thread.currentThread().getName();while(true){Locklock=this.lambdaQuery().eq(Lock::getName,name).one();if(lock==null){//描述没有锁Locklk=newLock();lk.setName(name);路克。setSurvivalTime(survivalTime);lk.setThreadName(threadName);try{save(lk);log.info(threadName+"获取锁成功");return;}catch(DuplicateKeyExceptione){//继续重试log.info(threadName+"获取锁失败");continue;}}//此时有锁,判断锁是否过期Datenow=newDate();DateexpireDate=newDate(lock.getCreateTime().getTime()+lock.getSurvivalTime());if(expireDate.before(now)){//锁已过期booleanresult=removeById(lock.getId());if(result){log.info(threadName+"deletedexpiredlock");}//尝试获取锁Locklk=newLock();lk.setName(name);lk.setSurvivalTime(survivalTime);lk.setThreadName(threadName);try{save(lk);log.info(threadName+"获取锁成功");return;}catch(DuplicateKeyExceptione){log.info(threadName+"获取锁失败");}}}}@OverridepublicvoidunLock(Stringname){//释放锁时需要注意只释放自己创建的锁StringthreadName="system1-"+Thread.currentThread().getName();Locklock=lambdaQuery().eq(Lock::getName,name).eq(Lock::getThreadName,threadName).one();if(lock!=null){booleanb=removeById(lock.getId());if(b){log.info(threadName+"已释放锁");}else{log.info(threadName+"准备释放锁,但锁已过期,被其他客户端强行释放");}}else{log.info(threadName+"准备释放锁锁,但是锁过期了被其他客户端强行释放了");}}}测试类如下:packagecom.yang.lock1;importcom.yang.lock1.service.LockService;importlombok.extern.slf4j.Slf4j;importorg.junit.Test;importorg.junit.runner.RunWith;importorg.springframework.boot.test.context.SpringBootTest;importorg.springframework.test.context.junit4.SpringRunner;importjavax.annotation.Resource;/***@authorqcy*@create2020/08/2515:10:54*/@Slf4j@RunWith(SpringRunner.class)@SpringBootTestpublicclassLock1ApplicationTest{@ResourceLockServicelockService;@TestpublicvoidtestLock(){log.info("system1准备获取锁");lockService.lock("key",6*1000);try{//模拟业务耗时Thread.sleep(4*1000));}catch(Exceptione){e.printStackTrace();}finally{lockService.unLock("key");}}}复制代码并将system1更改为system2现在,同时启动两个系统:system1如下:system2的输出如下:23.037秒,system1尝试获取锁,23.650秒成功,持有分布式锁。第26秒,system2尝试获取锁,被阻塞。在27.701秒时,system1释放了锁,system2在27.749秒时获取了锁,并在31秒时释放了锁。现在我们将system1的业务时长改为10秒,就可以模拟system2释放system1的超时锁的场景。先启动system1,再启动system2。此时system1的输出如下:system2的输出如下:14秒时,system1获取到锁,然后因为业务耗时突然超出预期,需要运行10秒。在此期间,system1创建的锁超过了它的生命周期。此时system2在19秒时删除了过期锁,然后获取了锁。24秒时,system1回头发现自己的锁已经被释放,终于system2正常释放了自己的锁。基于数据库的分布式锁,还有悲观锁和乐观锁,我再开一页。