Redis分布式锁架构设计
原理&实现
分布式锁的一个很重要的特性就是互斥性,同一时间内多个调用方加锁竞争,只能有一个调用方加锁成功。而redis是基于单线程模型的,可以利用这个特性让调用方的请求排队,对于并发请求,只会有一个请求能获取到锁。
核心API
redis实现分布式锁也很简单,基于客户端的几个API就可以完成,主要涉及三个核心API:
实现分布式锁
实现加锁
通过jedis.set进行加锁,如果返回值是OK,代表加锁成功
如果加锁失败,则自旋不断尝试获取锁,同时在一定时间内如果仍没有获取到锁,则退出自旋,不再尝试获取锁。
requestId:用于标识当前每个线程自己持有的锁标记
代码编写
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
| public class SingleRedisLock {
JedisPool jedisPool = new JedisPool("192.168.200.128",6379);
protected long internalLockLeaseTime = 30000;
private long timeout = 999999;
SetParams setParams = SetParams.setParams().nx().px(internalLockLeaseTime);
public boolean tryLock(String lockKey, String requestId){
String threadName = Thread.currentThread().getName();
Jedis jedis = this.jedisPool.getResource();
Long start = System.currentTimeMillis();
try{ for (;;){ String lockResult = jedis.set(lockKey, requestId, setParams); if ("OK".equals(lockResult)){ System.out.println(threadName+": 获取锁成功"); return true; } System.out.println(threadName+": 获取锁失败,等待中"); long l = System.currentTimeMillis() - start; if (l>=timeout) { return false; } try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } }finally { jedis.close(); }
} }
|
实现解锁
解锁时,要避免当前线程将别人的锁释放掉。假设线程A加锁成功,当过了一段时间线程A来解锁,但线程A的锁已经过期了,在这个时间节点,线程B也来加锁,因为线程A的锁已经过期,所以线程B时可以加锁成功的。此时,就会出现问题,线程A将线程B的锁给释放了。
对于这个问题,就需要使用到加锁时的requestId。当解锁时要判断当前锁键的value与传入的value是否相同,相同的话,则代表是同一个人,可以解锁。否则不能解锁。
但是对于这个操作,有非常多的人,会先查询做对比,接着相同则删除。虽然思路是对的,但是忽略了一个问题,原子性。判断与删除分成两步执行,则无法保证原子性,一样会出现问题。所以解锁时不仅要保证加锁和解锁是同一个人还要保证解锁的原子性。因此结合lua脚本完成查询&删除操作。
代码编写
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
|
public boolean releaseLock(String lockKey,String requestId){
String threadName = Thread.currentThread().getName(); System.out.println(threadName+":释放锁"); Jedis jedis = this.jedisPool.getResource();
String lua = "if redis.call('get',KEYS[1]) == ARGV[1] then" + " return redis.call('del',KEYS[1]) " + "else" + " return 0 " + "end";
try { Object result = jedis.eval(lua, Collections.singletonList(lockKey), Collections.singletonList(requestId)); if("1".equals(result.toString())){ return true; } return false; }finally { jedis.close(); }
}
|
编写测试类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| public class LoclTest {
public static void main(String[] args) {
for (int i=0;i<5;i++) { Thread thread = new Thread(new LockRunnable()); thread.start(); } }
private static class LockRunnable implements Runnable { @Override public void run() {
SingleRedisLock singleRedisLock = new SingleRedisLock();
String requestId = UUID.randomUUID().toString(); boolean lockResult = singleRedisLock.tryLock("lock", requestId); if (lockResult){
try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } }
singleRedisLock.releaseLock("lock",requestId); } } }
|
此时可以发现,多线程会竞争同一把锁,且没有获取获取到锁的线程会自旋不断尝试去获取锁。每当一个线程将锁释放后,则会有另外一个线程持有锁。依次类推。
存在的问题
单节点问题
锁续期问题
当对业务进行加锁时,锁的过期时间,绝对不能想当然的设置一个值。
假设线程A在执行某个业务时加锁成功并设置锁过期时间。但该业务执行时间过长,业务的执行时间超过了锁过期时间,那么在业务还没执行完时,锁就自动释放了。接着后续线程就可以获取到锁,又来执行该业务。就会造成线程A还没执行完,后续线程又来执行,导致同一个业务逻辑被重复执行。因此对于锁的超时时间,需要结合着业务执行时间来判断,让锁的过期时间大于业务执行时间。
上面的方案是一个基础解决方案,但是仍然是有问题的。
业务执行时间的影响因素太多了,无法确定一个准确值,只能是一个估值。无法百分百保证业务执行期间,锁只能被一个线程占有。
如想保证的话,可以在创建锁的同时创建一个守护线程,同时定义一个定时任务每隔一段时间去为未释放的锁增加过期时间。当业务执行完,释放锁后,再关闭守护线程。 这种实现思想可以用来解决锁续期。
服务单点&集群问题
在单点redis虽然可以完成锁操作,可一旦redis服务节点挂掉了,则无法提供锁操作。
在生产环境下,为了保证redis高可用,会采用异步复制方法进行主从部署。当主节点写入数据成功,会异步的将数据复制给从节点,并且当主节点宕机,从节点会被提升为主节点继续工作。假设主节点写入数据成功,在没有将数据复制给从节点时,主节点宕机。则会造成提升为主节点的从节点中是没有锁信息的,其他线程则又可以继续加锁,导致互斥失效。
Redisson实现分布式锁
redisson是redis官网推荐实现分布式锁的一个第三方类库。
其内部完成的功能非常强大,对各种锁都有实现,同时对于使用者来说非常简单,让使用者能够将更多的关注点放在业务逻辑上。此处重点利用Redisson解决单机Redis锁产生的两个问题。
实现分布式锁
基于redisson实现分布式锁很简单,直接基于lock()&unlock()方法操作即可。
添加依赖
1 2 3 4 5 6 7 8 9 10
| <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-pool2</artifactId> </dependency>
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson-spring-boot-starter</artifactId> <version>3.13.1</version> </dependency>
|
修改配置文件
1 2 3 4 5 6 7 8 9 10
| server: redis: host: 192.168.200.150 port: 6379 database: 0 jedis: pool: max-active: 500 max-idle: 1000 min-idle: 4
|
修改springboot启动类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| @Value("${spring.redis.host}") private String host;
@Value("${spring.redis.port}") private String port;
@Bean public RedissonClient redissonClient(){ RedissonClient redissonClient;
Config config = new Config(); String url = "redis://" + host + ":" + port; config.useSingleServer().setAddress(url);
try { redissonClient = Redisson.create(config); return redissonClient; } catch (Exception e) { e.printStackTrace(); return null; } }
|
定义锁工具类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
| @Component public class RedissonLock {
@Autowired private RedissonClient redissonClient;
public boolean addLock(String lockKey){
try { if (redissonClient == null){ System.out.println("redisson client is null"); return false; }
RLock lock = redissonClient.getLock(lockKey);
lock.lock(10, TimeUnit.SECONDS);
System.out.println(Thread.currentThread().getName()+": 获取到锁");
return true; } catch (Exception e) { e.printStackTrace(); return false; } }
public boolean releaseLock(String lockKey){
try{ if (redissonClient == null){ System.out.println("redisson client is null"); return false; }
RLock lock = redissonClient.getLock(lockKey); lock.unlock(); System.out.println(Thread.currentThread().getName()+": 释放锁"); return true; }catch (Exception e){ e.printStackTrace(); return false; } } }
|
测试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| @SpringBootTest @RunWith(SpringRunner.class) public class RedissonLockTest {
@Autowired private RedissonLock redissonLock;
@Test public void easyLock(){ for (int i=0;i<10;i++) { Thread thread = new Thread(new LockRunnable()); thread.start(); }
try { System.in.read(); } catch (IOException e) { e.printStackTrace(); } }
private class LockRunnable implements Runnable { @Override public void run() { redissonLock.addLock("demo"); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } redissonLock.releaseLock("demo"); } } }
|
执行效果
根据执行效果可知,多线程并发获取所时,当一个线程获取到锁,其他线程则获取不到,并且其内部会不断尝试获取锁,当持有锁的线程将锁释放后,其他线程则会继续去竞争锁。
源码分析
lock()源码分析
在上述加锁方法实现中,最核心就是**getLock()和lock()**。get()源码非常简单,根据当前传入的锁名称创建并返回一个RLock对象。
当获取到RLock对象后,调用其内部的lock()执行加锁操作。根据源码描述,当线程获取锁时,如果没有获取到锁,则会让其进入自旋,直到获取到锁。 如果获取到锁,则会一直保留到调用unLock()手动释放或根据传入的leaseTime时间自动释放。
当前传入两个参数值:锁超时时间,时间单位。主要用于避免死锁的出现,假设持有锁的redis节点宕机,到期后锁可以自动释放。
lock()方法中还会调用lock()的另外一个重载方法,需要传入三个参数:过期时间、时间单位、是否中断。
在三个参数的lock()重载方法中,首先会获取当前线程id,接着调用tryAcquire()方法尝试获取锁,如果返回值为null,代表获取到锁。 如果返回值不是null,则根据当前线程id创建异步任务并放入线程池中,接着进入自旋,在自旋过程中,尝试调用tryAcquire()获取锁,如果获取到则退出自旋。否则会不断的尝试获取锁。
在lock()方法中,最核心的是tryAcquire()。其内部核心实现会调用tryAcquireAsync(),并传入过期时间、时间单位和当前线程id,进行锁的获取。如果leaseTime不为-1,代表设置了有效时间,接着调用tryAcquireAsync()去获取锁。如果是-1的话,则默认把永不过期改为30秒过期,并且创建异步任务,如果没有获取到锁,则什么都不做。如果获取到了锁,则调用scheduleExpirationRenewal()对当前线程id的锁进行延时。
最终的tryLockInnerAsync()则是获取锁的具体实现。可以看到,其内部是基于lua脚本语言完成锁获取的。因为获取锁的过程涉及到了多步,为了保证执行过程的原子性,所以使用了lua,最核心的就是要理解这段lua脚本的执行过程。
对于这款lua脚本来说,KEYS[1]代表需要加锁的key,ARGV[1]代表锁的超时时间,ARGV[2]代表锁的唯一标识。对于这段lua脚本,简单来说:
- 检查锁key是否被占用了,如果没有则设置锁key和唯一标识,初始值为1,并且设置锁key的过期时间。
- 如果锁key存在,并且value也匹配,表示是当前线程持有的锁,那么重入次数加1,并且设置失效时间。
- 返回锁key的失效时间毫秒数。
unLock()源码分析
在释放锁时,unlock()内部会调用unlockAsync()对当前线程持有的锁进行释放。其内部最终会执行unlockInnerAsync()方法完成锁释放并返回结果。
在unlockInnerAsync()中仍然是结合lua脚本完成释放锁操作。
相关参数
- KEYS[1]:当前锁key。
- KEYS[2]:redis消息的ChannelName,每个锁对应唯一的一个 channelName。
- ARGV[1]:redis消息体,用于标记redis的key已经解锁,用于通知其他线程申请锁。
- ARGV[2]:锁超时时间。
- ARGV[3]:锁的唯一标识。
释放流程
- 判断锁key和锁的唯一标识是否匹配,如果不匹配,表示锁已经被占用,那么直接返回。
- 如果是当前线程持有锁,则value值-1,用于重入操作。
- 如果-1后的值大于0,则对锁设置过期时间。
- 如果-1后的值为0,则删除锁key,并发布消息,该锁已被释放。用于通知其他线程申请锁。
锁续期
对于锁续期问题,在单点redis实现分布式锁时已经介绍过了,用于防止业务执行超时或宕机而引起的业务被重复执行。
根据对lock方法的解析,可以发现,当设置完过期时间后,当前锁的过期时间就已经被设定了,不会发生改变,锁到期后则会被自动释放,因此在业务执行中,通过**lock()**方法加锁会造成隐患。
看门狗
所谓的看门狗是redisson用于自动延长锁有效期的实现机制。其本质是一个后台线程,用于不断延长锁key的生存时间。
改造锁示例代码,让锁超时时间为1秒,但是业务执行时,需要耗时3秒,此时执行可以发现,多线程间在上一个锁没有释放的情况下,后续线程又获取到了锁。但是解锁的时候,出现异常,因为加锁时的唯一标识与解锁时的唯一标识发生了改变,造成死锁。
因为业务执行多久无法确定一个准确值,所以在看门狗的实现中,不需要对锁key设置过期时间,当过期时间为-1时,这时会启动一个定时任务,在业务释放锁之前,会一直不停的增加这个锁的有效时间,从而保证在业务执行完毕前,这把锁不会被提前释放掉。
要开启看门狗机制也很简单,只需要将加锁时使用lock()改为tryLock()即可。
并且根据之前lock的源码分析,如果没有设置锁超时,默认过期时间为30秒即watchdog每隔30秒来进行一次续期,该值可以修改。
1
| config.setLockWatchdogTimeout(3000L);
|
进行测试,当加锁后,线程睡眠10秒钟,然后释放锁,可以看到在这段时间内,当前线程会一直持有锁,直到锁释放。在多线程环境下,也是阻塞等待进行锁的获取。
红锁
当在单点redis中实现redis锁时,一旦redis服务器宕机,则无法进行锁操作。因此会考虑将redis配置为主从结构,但在主从结构中,数据复制是异步实现的。假设在主从结构中,master会异步将数据复制到slave中,一旦某个线程持有了锁,在还没有将数据复制到slave时,master宕机。则slave会被提升为master,但被提升为slave的master中并没有之前线程的锁信息,那么其他线程则又可以重新加锁。
redlock算法
redlock是一种基于多节点redis实现分布式锁的算法,可以有效解决redis单点故障的问题。官方建议搭建五台redis服务器对redlock算法进行实现。
红锁流程
在redis官网中,对于redlock算法的实现思想也做了详细的介绍。地址:https://redis.io/topics/distlock。整个实现过程分为五步:
记录获取锁前的当前时间
使用相同的key,value获取所有redis实例中的锁,并且设置获取锁的时间要远远小于锁自动释放的时间。假设锁自动释放时间是10秒,则获取时间应在5-50毫秒之间。通过这种方式避免客户端长时间等待一个已经关闭的实例,如果一个实例不可用了,则尝试获取下一个实例。
客户端通过获取所有实例的锁后的时间减去第一步的时间,得到的差值要小于锁自动释放时间,避免拿到一个已经过期的锁。并且要有超过半数的redis实例成功获取到锁,才算最终获取锁成功。如果不是超过半数,有可能出现多个客户端重复获取到锁,导致锁失效。
当已经获取到锁,那么它的真正失效时间应该为:过期时间-第三步的差值。
如果客户端获取锁失败,则在所有redis实例中释放掉锁。为了保证更高效的获取锁,还可以设置重试策略,在一定时间后重新尝试获取锁,但不能是无休止的,要设置重试次数。
红锁缺点
虽然通过redlock能够更加有效的防止redis单点问题,但是仍然是存在隐患的。
假设redis没有开启持久化,clientA获取锁后,所有redis故障重启,则会导致clientA锁记录消失,clientB仍然能够获取到锁。这种情况虽然发生几率极低,但并不能保证肯定不会发生。
保证的方案就是开始AOF持久化,但是要注意同步的策略,使用每秒同步,如果在一秒内重启,仍然数据丢失。使用always又会造成性能急剧下降。
官方推荐使用默认的AOF策略即每秒同步,且在redis停掉后,要在ttl时间后再重启。 缺点就是ttl时间内redis无法对外提供服务。
红锁实现
redisson对于红锁的实现已经非常完善,通过其内部提供的api既可以完成红锁的操作。
配置红锁
新建配置类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| @Configuration public class RedissonRedLockConfig {
public RedissonRedLock initRedissonClient(String lockKey){
Config config1 = new Config(); config1.useSingleServer().setAddress("redis://192.168.200.150:7000").setDatabase(0); RedissonClient redissonClient1 = Redisson.create(config1);
Config config2 = new Config(); config2.useSingleServer().setAddress("redis://192.168.200.150:7001").setDatabase(0); RedissonClient redissonClient2 = Redisson.create(config2);
Config config3 = new Config(); config3.useSingleServer().setAddress("redis://192.168.200.150:7002").setDatabase(0); RedissonClient redissonClient3 = Redisson.create(config3);
Config config4 = new Config(); config4.useSingleServer().setAddress("redis://192.168.200.150:7003").setDatabase(0); RedissonClient redissonClient4 = Redisson.create(config4);
Config config5 = new Config(); config5.useSingleServer().setAddress("redis://192.168.200.150:7004").setDatabase(0); RedissonClient redissonClient5 = Redisson.create(config5);
RLock rLock1 = redissonClient1.getLock(lockKey); RLock rLock2 = redissonClient2.getLock(lockKey); RLock rLock3 = redissonClient3.getLock(lockKey); RLock rLock4 = redissonClient4.getLock(lockKey); RLock rLock5 = redissonClient5.getLock(lockKey);
RedissonRedLock redissonRedLock = new RedissonRedLock(rLock1,rLock2,rLock3,rLock4,rLock5);
return redissonRedLock; } }
|
测试
新建测试类,完成加锁与解锁操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| @SpringBootTest @RunWith(SpringRunner.class) public class RedLockTest {
@Autowired private RedissonRedLockConfig redissonRedLockConfig;
@Test public void easyLock(){ for (int i=0;i<10;i++) { Thread thread = new Thread(new RedLockTest.RedLockRunnable()); thread.start(); }
try { System.in.read(); } catch (IOException e) { e.printStackTrace(); } }
private class RedLockRunnable implements Runnable { @Override public void run() { RedissonRedLock redissonRedLock = redissonRedLockConfig.initRedissonClient("demo");
try { boolean lockResult = redissonRedLock.tryLock(100, 10, TimeUnit.SECONDS);
if (lockResult){ System.out.println("获取锁成功"); TimeUnit.SECONDS.sleep(3); } } catch (InterruptedException e) { e.printStackTrace(); }finally { redissonRedLock.unlock(); System.out.println("释放锁"); } } } }
|
源码分析
redissonRedLock加锁源码分析
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103
| public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { long newLeaseTime = -1; if (leaseTime != -1) { newLeaseTime = unit.toMillis(waitTime)*2; }
long time = System.currentTimeMillis(); long remainTime = -1; if (waitTime != -1) { remainTime = unit.toMillis(waitTime); } long lockWaitTime = calcLockWaitTime(remainTime);
int failedLocksLimit = failedLocksLimit();
List<RLock> acquiredLocks = new ArrayList<>(locks.size()); for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) { RLock lock = iterator.next(); boolean lockAcquired;
try { if (waitTime == -1 && leaseTime == -1) { lockAcquired = lock.tryLock(); } else { long awaitTime = Math.min(lockWaitTime, remainTime); lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS); } } catch (RedisResponseTimeoutException e) { unlockInner(Arrays.asList(lock)); lockAcquired = false; } catch (Exception e) { lockAcquired = false; }
if (lockAcquired) {
acquiredLocks.add(lock); } else {
if (locks.size() - acquiredLocks.size() == failedLocksLimit()) { break; }
if (failedLocksLimit == 0) { unlockInner(acquiredLocks); if (waitTime == -1 && leaseTime == -1) { return false; } failedLocksLimit = failedLocksLimit(); acquiredLocks.clear(); while (iterator.hasPrevious()) { iterator.previous(); } } else { failedLocksLimit--; } }
if (remainTime != -1) { remainTime -= System.currentTimeMillis() - time; time = System.currentTimeMillis(); if (remainTime <= 0) { unlockInner(acquiredLocks); return false; } } }
if (leaseTime != -1) { List<RFuture<Boolean>> futures = new ArrayList<>(acquiredLocks.size()); for (RLock rLock : acquiredLocks) { RFuture<Boolean> future = ((RedissonLock) rLock).expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS); futures.add(future); }
for (RFuture<Boolean> rFuture : futures) { rFuture.syncUninterruptibly(); } }
return true; }
|