前提 在很早很早之前,写过一篇文章介绍过Redis
中的red lock
的实现,但是在生产环境中,笔者所负责的项目使用的分布式锁组件一直是Redisson
。Redisson
是具备多种内存数据网格特性的基于Java
编写的Redis
客户端框架(Redis Java Client with features of In-Memory Data Grid
),基于Redis
的基本数据类型扩展出很多种实现的高级数据结构,具体见其官方的简介图:
本文要分析的R(ed)Lock
实现,只是其中一个很小的模块,其他高级特性可以按需选用。下面会从基本原理、源码分析和基于Jedis
仿实现等内容进行展开。本文分析的Redisson
源码是2020-01
左右Redisson
项目的main
分支源码,对应版本是3.14.1
。
基本原理 red lock
的基本原理其实就”光明正大地”展示在Redis
官网的首页文档中(具体链接是https://redis.io/topics/distlock
):
摘录一下简介进行翻译:在许多环境中不同进程必须以互斥方式使用共享资源进行操作时,分布式锁是一个非常有用的原语。此试图提供一种更规范的算法来实现Redis的分布式锁。我们提出了一种称为Redlock
的算法,它实现了DLM
(猜测是Distributed Lock Manager
的缩写,分布式锁管理器),我们认为它比普通的单实例方法更安全。
算法的三个核心特征(三大最低保证):
Safety property
(安全性):互斥。确保在任何给定时刻下,只有一个客户端可以持有锁Liveness property A
(活性A
):无死锁。即使存在曾经锁定资源的客户端崩溃或者出现网络分区异常,确保锁总是能够成功获取Liveness property B
(活性B
):容错性。只要大多数Redis
节点处于正常运行状态,客户端就可以获取和释放锁文档中还指出了目前算法对于故障转移的实现还存在明显的竞态条件问题(描述的应该是Redis
主从架构下的问题):
客户端A
获取Redis
主节点中的锁(假设锁定的资源为X
) 在Redis
主节点把KEY
同步到Redis
从节点之前,Redis
主节点崩溃 Redis
从节点因为故障晋升为主节点此时,客户端B
获取资源X
的锁成功,问题是资源X
的锁在前面已经被客户端A
获取过,这样就出现了并发问题 算法的实现很简单,单个Redis
实例下加锁命令如下:
1 SET $resource_name $random_value NX PX $ttl
这里的Nx
和PX
是SET
命令的增强参数,自从Redis
的2.6.12
版本起,SET
命令已经提供了可选的复合操作符:
EX
:设置超时时间,单位是秒PX
:设置超时时间,单位是毫秒NX
:IF NOT EXIST
的缩写,只有KEY
不存在的前提下才会设置K-V
,设置成功返回1
,否则返回0
XX
:IF EXIST
的缩写,只有在KEY
存在的前提下才会设置K-V
,设置成功返回1
,否则返回0
单个Redis
实例下解锁命令如下:
1 2 3 4 5 6 7 # KEYS[1] = $resource_name # ARGV[1] = $random_value if redis.call("get",KEYS[1]) == ARGV[1] then return redis.call("del",KEYS[1]) else return 0 end
使用Redisson中的RLock 使用RLock
要先实例化Redisson
,Redisson
已经适配了Redis
的哨兵、集群、普通主从和单机模式,因为笔者本地只安装了单机Redis
,所以这里使用单机模式配置进行演示。实例化RedissonClient
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 static RedissonClient REDISSON;@BeforeClass public static void beforeClass () throws Exception { Config config = new Config(); config.useSingleServer() .setTimeout(10000 ) .setAddress("redis://127.0.0.1:6379" ); REDISSON = Redisson.create(config); }
加锁和解锁:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 @Test public void testLockAndUnLock () throws Exception { String resourceName = "resource:x" ; RLock lock = REDISSON.getLock(resourceName); Thread threadA = new Thread(() -> { try { lock.lock(); process(resourceName); } finally { lock.unlock(); System.out.println(String.format("线程%s释放资源%s的锁" , Thread.currentThread().getName(), resourceName)); } }, "threadA" ); Thread threadB = new Thread(() -> { try { lock.lock(); process(resourceName); } finally { lock.unlock(); System.out.println(String.format("线程%s释放资源%s的锁" , Thread.currentThread().getName(), resourceName)); } }, "threadB" ); threadA.start(); threadB.start(); Thread.sleep(Long.MAX_VALUE); } private void process (String resourceName) { String threadName = Thread.currentThread().getName(); System.out.println(String.format("线程%s获取到资源%s的锁" , threadName, resourceName)); try { Thread.sleep(1000 ); } catch (InterruptedException ignore) { } } 线程threadB获取到资源resource:x的锁 线程threadB释放资源resource:x的锁 线程threadA获取到资源resource:x的锁 线程threadA释放资源resource:x的锁
更多的时候,我们会选用带等待时间周期和锁最大持有时间的API
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 @Test public void testTryLockAndUnLock () throws Exception { String resourceName = "resource:x" ; int waitTime = 500 ; int leaseTime = 1000 ; Thread threadA = new Thread(() -> { process(resourceName, waitTime, leaseTime); }, "threadA" ); Thread threadB = new Thread(() -> { process(resourceName, waitTime, leaseTime); }, "threadB" ); threadA.start(); threadB.start(); Thread.sleep(Long.MAX_VALUE); } private void process (String resourceName, int waitTime, int leaseTime) { RLock lock = REDISSON.getLock(resourceName); try { String threadName = Thread.currentThread().getName(); boolean tryLock = lock.tryLock(waitTime, leaseTime, TimeUnit.MILLISECONDS); if (tryLock) { try { System.out.println(String.format("线程%s获取到资源%s的锁" , threadName, resourceName)); Thread.sleep(800 ); } finally { lock.unlock(); System.out.println(String.format("线程%s释放资源%s的锁" , Thread.currentThread().getName(), resourceName)); } } else { System.out.println(String.format("线程%s获取资源%s的锁失败,等待时间:%d ms" , threadName, resourceName, waitTime)); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } 线程threadA获取到资源resource:x的锁 线程threadB获取资源resource:x的锁失败,等待时间:500 ms 线程threadA释放资源resource:x的锁
为了使用的时候更加简单,可以参考spring-tx
中的编程式事务那样进行轻度封装:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 @RequiredArgsConstructor private static class RedissonLockProvider { private final RedissonClient redissonClient; public <T> T executeInLock (String resourceName, LockAction lockAction) { RLock lock = redissonClient.getLock(resourceName); try { lock.lock(); lockAction.onAcquire(resourceName); return lockAction.doInLock(resourceName); } finally { lock.unlock(); lockAction.onExit(resourceName); } } public <T> T executeInLock (String resourceName, int waitTime, int leaseTime, LockAction lockAction) throws InterruptedException { RLock lock = redissonClient.getLock(resourceName); boolean tryLock = lock.tryLock(waitTime, leaseTime, TimeUnit.MILLISECONDS); if (tryLock) { try { lockAction.onAcquire(resourceName); return lockAction.doInLock(resourceName); } finally { lock.unlock(); lockAction.onExit(resourceName); } } return null ; } public void executeInLockWithoutResult (String resourceName, int waitTime, int leaseTime, LockActionWithoutResult lockAction) throws InterruptedException { RLock lock = redissonClient.getLock(resourceName); boolean tryLock = lock.tryLock(waitTime, leaseTime, TimeUnit.MILLISECONDS); if (tryLock) { try { lockAction.onAcquire(resourceName); lockAction.doInLock(resourceName); } finally { lock.unlock(); lockAction.onExit(resourceName); } } } public void executeInLockWithoutResult (String resourceName, LockActionWithoutResult lockAction) { RLock lock = redissonClient.getLock(resourceName); try { lock.lock(); lockAction.onAcquire(resourceName); lockAction.doInLock(resourceName); } finally { lock.unlock(); lockAction.onExit(resourceName); } } } @FunctionalInterface interface LockAction { default void onAcquire (String resourceName) { } <T> T doInLock (String resourceName) ; default void onExit (String resourceName) { } } @FunctionalInterface interface LockActionWithoutResult { default void onAcquire (String resourceName) { } void doInLock (String resourceName) ; default void onExit (String resourceName) { } }
使用RedissonLockProvider
(仅供参考):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 @Test public void testRedissonLockProvider () throws Exception { RedissonLockProvider provider = new RedissonLockProvider(REDISSON); String resourceName = "resource:x" ; Thread threadA = new Thread(() -> { provider.executeInLockWithoutResult(resourceName, new LockActionWithoutResult() { @Override public void onAcquire (String resourceName) { System.out.println(String.format("线程%s获取到资源%s的锁" , Thread.currentThread().getName(), resourceName)); } @Override public void doInLock (String resourceName) { try { Thread.sleep(800 ); } catch (InterruptedException ignore) { } } @Override public void onExit (String resourceName) { System.out.println(String.format("线程%s释放资源%s的锁" , Thread.currentThread().getName(), resourceName)); } }); }, "threadA" ); Thread threadB = new Thread(() -> { provider.executeInLockWithoutResult(resourceName, new LockActionWithoutResult() { @Override public void onAcquire (String resourceName) { System.out.println(String.format("线程%s获取到资源%s的锁" , Thread.currentThread().getName(), resourceName)); } @Override public void doInLock (String resourceName) { try { Thread.sleep(800 ); } catch (InterruptedException ignore) { } } @Override public void onExit (String resourceName) { System.out.println(String.format("线程%s释放资源%s的锁" , Thread.currentThread().getName(), resourceName)); } }); }, "threadB" ); threadA.start(); threadB.start(); Thread.sleep(Long.MAX_VALUE); } 线程threadA获取到资源resource:x的锁 线程threadA释放资源resource:x的锁 线程threadB获取到资源resource:x的锁 线程threadB释放资源resource:x的锁
Redisson中RLock的实现原理 Redisson
中RLock
的实现是基本参照了Redis
的red lock
算法进行实现,不过在原始的red lock
算法下进行了改良,主要包括下面的特性:
互斥 无死锁 可重入,类似于ReentrantLock
,同一个线程可以重复获取同一个资源的锁(一般使用计数器实现),锁的重入特性一般情况下有利于提高资源的利用率 续期 ,这个是一个比较前卫解决思路,也就是如果一个客户端对资源X
永久锁定,那么并不是直接对KEY
生存周期设置为-1
,而是通过一个守护线程每隔固定周期延长KEY
的过期时间,这样就能实现在守护线程不被杀掉的前提下,避免客户端崩溃导致锁无法释放长期占用资源的问题 锁状态变更订阅,依赖于org.redisson.pubsub.LockPubSub
,用于订阅和通知锁释放事件 不是完全参考red lock
算法的实现,数据类型选用了HASH
,配合Lua
脚本完成多个命令的原子性 续期或者说延长KEY
的过期时间在Redisson
使用watch dog
实现,理解为用于续期的守护线程,底层依赖于Netty
的时间轮HashedWheelTimer
和任务io.netty.util.Timeout
实现,俗称看门狗 ,下面会详细分析。
先看RLock
的类图:
这里有一个疑惑点,RedissonRedLock(RedissonMultiLock的子类)的注释中提到RedLock locking algorithm implementation for multiple locks. It manages all locks as one. 但从直观上看,RedissonLock才是整个锁体系的核心,里面的实现思路也是遵从red lock算法的。
RedissonLock
就是RLock
的直接实现,也是分布式锁实现的核心类,从源码中看到Redisson#getLock()
就是直接实例化RedissonLock
1 2 3 4 5 6 7 8 9 10 11 public class Redisson implements RedissonClient { @Override public RLock getLock (String name) { return new RedissonLock(connectionManager.getCommandExecutor(), name); } }
因此只需要围绕RedissonLock
的源码进行分析即可。RedissonLock
的类继承图如下:
这里需要有几点认知:
RedissonLock
实现了java.util.concurrent.locks.Lock
接口中除了newCondition()
方法外的所有方法,也就是可以基本无缝适配Lock
接口,对于习惯Lock
接口的API
的使用者来说是一个福音RedissonLock
基本所有同步API
都依赖于异步API
的实现,也就是RLock
的实现依赖于RLockAsync
的实现,底层依赖的是Netty
的io.netty.util.concurrent.Promise
,具体见RedissonPromise
,如果用过JUC
中的Future
的开发者应该比较熟悉Future#get()
,这里的做法类似右边的几个父类的简单功能描述如下:RObjectAsync
:所有Redisson
对象的基础接口,提供一些内存测量、对象拷贝、移动等的异步方法RObject
:RObjectAsync
的同步版本RExpirableAsync
:提供对象TTL
相关的异步方法RExpirable
:RExpirableAsync
的同步版本RedissonObject
:直接实现类RObject
接口中的方法RedissonExpirable
:主要是实现了RExpirable
接口中的方法 接着先看RedissonLock
的构造函数和核心属性:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 private static final ConcurrentMap<String, ExpirationEntry> EXPIRATION_RENEWAL_MAP = new ConcurrentHashMap<>();protected long internalLockLeaseTime;final String id;final String entryName;protected final LockPubSub pubSub;final CommandAsyncExecutor commandExecutor;public RedissonLock (CommandAsyncExecutor commandExecutor, String name) { super (commandExecutor, name); this .commandExecutor = commandExecutor; this .id = commandExecutor.getConnectionManager().getId(); this .internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(); this .entryName = id + ":" + name; this .pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub(); } public static class ExpirationEntry { private final Map<Long, Integer> threadIds = new LinkedHashMap<>(); private volatile Timeout timeout; public ExpirationEntry () { super (); } public void addThreadId (long threadId) { Integer counter = threadIds.get(threadId); if (counter == null ) { counter = 1 ; } else { counter++; } threadIds.put(threadId, counter); } public boolean hasNoThreads () { return threadIds.isEmpty(); } public Long getFirstThreadId () { if (threadIds.isEmpty()) { return null ; } return threadIds.keySet().iterator().next(); } public void removeThreadId (long threadId) { Integer counter = threadIds.get(threadId); if (counter == null ) { return ; } counter--; if (counter == 0 ) { threadIds.remove(threadId); } else { threadIds.put(threadId, counter); } } public void setTimeout (Timeout timeout) { this .timeout = timeout; } public Timeout getTimeout () { return timeout; } }
这里需要关注一下Config
中的lockWatchdogTimeout
参数:
翻译一下大意:lockWatchdogTimeout
参数只有在没有使用leaseTimeout
参数定义的成功获取到锁的场景(简单来说就是不设置时限的加锁)下生效,如果看门狗在下一个lockWatchdogTimeout
周期内不进行续期,那么锁就会过期释放(从源码上看,每三分之一lockWatchdogTimeout
就会执行一次续期任务,每次通过pexpire
把KEY
的存活周期延长lockWatchdogTimeout
),lockWatchdogTimeout
的默认值为30000
,也就是30
秒。
这里先列举一下RedissonLock
中获取名称的方法,以便后面分析这些名称作为K-V
结构的KEY
时候使用:
id
:由配置实例化时候实例化的UUID
实例生成,从源码上分析每个连接方式的Redisson
实例有唯一的UUID
,ConnectionManager
初始化的时候会调用UUID id = UUID.randomUUID()
,笔者认为可以理解为Redisson
实例在某个应用程序进程中的唯一标识,毕竟一般情况下,一个应用程序应该只会应用一种Redisson
的连接方式getEntryName()
:返回的是UUID + : + $KEY
,例如559cc9df-bad8-4f6c-86a4-ffa51b7f1c36:resource:x
getName()
:返回的是$KEY
,例如resource:x
getChannelName()
:返回的是redisson_lock__channel:{$KEY}
,例如redisson_lock__channel:{resource:x}
getLockName(long threadId)
:返回的是UUID + : + $threadId
,例如559cc9df-bad8-4f6c-86a4-ffa51b7f1c36:1
接着看加锁的方法,核心实现主要是:
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException
:lock
方法体系public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException
:tryLock
方法体系先看只包含锁最大持有时间的lock()
方法体系:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 private void lock (long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException { long threadId = Thread.currentThread().getId(); Long ttl = tryAcquire(leaseTime, unit, threadId); if (ttl == null ) { return ; } RFuture<RedissonLockEntry> future = subscribe(threadId); if (interruptibly) { commandExecutor.syncSubscriptionInterrupted(future); } else { commandExecutor.syncSubscription(future); } try { while (true ) { ttl = tryAcquire(leaseTime, unit, threadId); if (ttl == null ) { break ; } if (ttl >= 0 ) { try { future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { if (interruptibly) { throw e; } future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } } else { if (interruptibly) { future.getNow().getLatch().acquire(); } else { future.getNow().getLatch().acquireUninterruptibly(); } } } } finally { unsubscribe(future, threadId); } } private Long tryAcquire (long leaseTime, TimeUnit unit, long threadId) { return get(tryAcquireAsync(leaseTime, unit, threadId)); } private <T> RFuture<Long> tryAcquireAsync (long leaseTime, TimeUnit unit, long threadId) { if (leaseTime != -1 ) { return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); ttlRemainingFuture.onComplete((ttlRemaining, e) -> { if (e != null ) { return ; } if (ttlRemaining == null ) { scheduleExpirationRenewal(threadId); } }); return ttlRemainingFuture; } <T> RFuture<T> tryLockInnerAsync (long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { internalLockLeaseTime = unit.toMillis(leaseTime); return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hset', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);" , Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); }
先留意一下属性internalLockLeaseTime
,它在tryLockInnerAsync()
方法内被重新赋值,在leaseTime == -1L
的前提下,它被赋值为lockWatchdogTimeout
,这个细节很重要,决定了后面续期方法(看门狗)的调度频率。另外,leaseTime != -1L
不会进行续期,也就是不会启动看门狗机制。
接着需要仔细分析一下tryLockInnerAsync()
中执行的LUA
脚本,笔者把它提取出来通过注释进行描述:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 if (redis.call('exists' , KEYS[1 ]) == 0 ) then redis.call('hset' , KEYS[1 ], ARGV[2 ], 1 ); redis.call('pexpire' , KEYS[1 ], ARGV[1 ]); return nil ; end ;if (redis.call('hexists' , KEYS[1 ], ARGV[2 ]) == 1 ) then redis.call('hincrby' , KEYS[1 ], ARGV[2 ], 1 ); redis.call('pexpire' , KEYS[1 ], ARGV[1 ]); return nil ; end ;return redis.call('pttl' , KEYS[1 ]);
这里画一个图演示一下这个Lua
脚本中三段代码出现的逻辑:
剩下一个scheduleExpirationRenewal(threadId)
方法还没有分析,里面的逻辑就是看门狗的定期续期逻辑:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 private void scheduleExpirationRenewal (long threadId) { ExpirationEntry entry = new ExpirationEntry(); ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry); if (oldEntry != null ) { oldEntry.addThreadId(threadId); } else { entry.addThreadId(threadId); renewExpiration(); } } private void renewExpiration () { ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ee == null ) { return ; } Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() { @Override public void run (Timeout timeout) throws Exception { ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ent == null ) { return ; } Long threadId = ent.getFirstThreadId(); if (threadId == null ) { return ; } RFuture<Boolean> future = renewExpirationAsync(threadId); future.onComplete((res, e) -> { if (e != null ) { log.error("Can't update lock " + getName() + " expiration" , e); return ; } if (res) { renewExpiration(); } }); } }, internalLockLeaseTime / 3 , TimeUnit.MILLISECONDS); ee.setTimeout(task); } protected RFuture<Boolean> renewExpirationAsync (long threadId) { return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return 1; " + "end; " + "return 0;" , Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); }
基于源码推断出续期的机制由入参leaseTime
决定:
当leaseTime == -1
的前提下(一般是lock()
和lockInterruptibly()
这类方法调用),续期任务的调度周期为lockWatchdogTimeout / 3
,锁的最大持有时间(KEY
的过期时间)被刷新为lockWatchdogTimeout
当leaseTime != -1
的前提下(一般是lock(long leaseTime, TimeUnit unit)
和lockInterruptibly(long leaseTime, TimeUnit unit)
这类方法调用指定leaseTime
不为-1
),这种情况下会直接设置锁的过期时间为输入值转换为ms
单位的时间量,不会启动续期机制 提取续期的Lua
脚本如下:
1 2 3 4 5 6 7 8 if (redis.call('hexists' , KEYS[1 ], ARGV[2 ]) == 1 ) then redis.call('pexpire' , KEYS[1 ], ARGV[1 ]); return 1 ; end ;return 0 ;
到此为止,不带waitTime
参数的加锁和续期逻辑基本分析完毕,而带waitTime
参数的tryLock(long waitTime, long leaseTime, TimeUnit unit)
实现其实和只存在leaseTime
参数的lock(long leaseTime, TimeUnit unit, boolean interruptibly)
实现底层调用的方法是一致的,最大的区别是会在尝试获取锁操作之后基于前后的System.currentTimeMillis()
计算出时间差和waitTime
做对比,决定需要阻塞等待还是直接超时获取锁失败返回,处理阻塞等待的逻辑是客户端本身的逻辑,这里就不做详细展开,因为源码实现也不是十分优雅(太多long currentTime = System.currentTimeMillis()
的代码段了)。接着花点功夫分析一下解锁的实现,包括一般情况下的解锁unlock()
和强制解锁forceUnlockAsync()
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 @Override public void unlock () { try { get(unlockAsync(Thread.currentThread().getId())); } catch (RedisException e) { if (e.getCause() instanceof IllegalMonitorStateException) { throw (IllegalMonitorStateException) e.getCause(); } else { throw e; } } } @Override public RFuture<Void> unlockAsync () { long threadId = Thread.currentThread().getId(); return unlockAsync(threadId); } @Override public RFuture<Void> unlockAsync (long threadId) { RPromise<Void> result = new RedissonPromise<Void>(); RFuture<Boolean> future = unlockInnerAsync(threadId); future.onComplete((opStatus, e) -> { if (e != null ) { cancelExpirationRenewal(threadId); result.tryFailure(e); return ; } if (opStatus == null ) { IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + id + " thread-id: " + threadId); result.tryFailure(cause); return ; } cancelExpirationRenewal(threadId); result.trySuccess(null ); }); return result; } protected RFuture<Boolean> unlockInnerAsync (long threadId) { return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + "return nil;" + "end; " + "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + "if (counter > 0) then " + "redis.call('pexpire', KEYS[1], ARGV[2]); " + "return 0; " + "else " + "redis.call('del', KEYS[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; " + "end; " + "return nil;" , Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId)); } void cancelExpirationRenewal (Long threadId) { ExpirationEntry task = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (task == null ) { return ; } if (threadId != null ) { task.removeThreadId(threadId); } if (threadId == null || task.hasNoThreads()) { Timeout timeout = task.getTimeout(); if (timeout != null ) { timeout.cancel(); } EXPIRATION_RENEWAL_MAP.remove(getEntryName()); } } @Override public boolean forceUnlock () { return get(forceUnlockAsync()); } @Override public RFuture<Boolean> forceUnlockAsync () { cancelExpirationRenewal(null ); return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('del', KEYS[1]) == 1) then " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1 " + "else " + "return 0 " + "end" , Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE); }
这里列出一般情况下解锁和强制解锁的Lua
脚本,分析如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 if (redis.call('hexists' , KEYS[1 ], ARGV[3 ]) == 0 ) then return nil ; end ;local counter = redis.call('hincrby' , KEYS[1 ], ARGV[3 ], -1 );if (counter > 0 ) then redis.call('pexpire' , KEYS[1 ], ARGV[2 ]); return 0 ; else redis.call('del' , KEYS[1 ]); redis.call('publish' , KEYS[2 ], ARGV[1 ]); return 1 ; end ;return nil ;if (redis.call('del' , KEYS[1 ]) == 1 ) then redis.call('publish' , KEYS[2 ], ARGV[1 ]); return 1 else return 0 end
其他辅助方法都相对简单,这里弄个简单的”流水账”记录一番:
isLocked()
:基于getName()
调用Redis
的EXISTS $KEY
命令判断是否加锁isHeldByThread(long threadId)
和isHeldByCurrentThread()
:基于getName()
和getLockName(threadId)
调用Redis
的HEXISTS $KEY $LOCK_NAME
命令判断HASH
中对应的field-value
是否存在,存在则说明锁被对应线程ID
的线程持有getHoldCount()
:基于getName()
和getLockName(threadId)
调用Redis
的HGET $KEY $LOCK_NAME
命令,用于获取线程对于某一个锁的持有量(注释叫holds
,其实就是同一个线程对某一个锁的KEY
的续期次数)订阅和发布 部分设计到大量Netty
组件使用相关的源码,这里不详细展开,这部分的逻辑简单附加到后面这个流程图中。最后,通过一个比较详细的图分析一下Redisson
的加锁和解锁流程。
带有waitTime
参数的加锁流程(图右边的流程基本不变,主要是左边的流程每一步都要计算时间间隔):
假设不同进程的两个不同的线程X
和Y
去竞争资源RESOURCE
的锁,那么可能的流程如下:
最后再概括一下Redisson
中实现red lock
算法使用的HASH
数据类型:
KEY
代表的就是资源或者锁,创建、存在性判断,延长生存周期和删除操作总是针对KEY
进行的 FIELD
代表的是锁名称lockName()
,但是其实它由Redisson
连接管理器实例的初始化UUID
拼接客户端线程ID
组成,严格来说应该是获取锁的客户端线程唯一标识VALUE
代表的是客户端线程对于锁的持有量,从源码上看应该是KEY
被续期的次数基于Jedis实现类似Redisson的分布式锁功能 前面的章节已经比较详细分析了Redisson
中分布式锁的实现原理,这里使用Jedis
和多线程技巧做一个类似的实现。为了简单起见,这里只实现一个无入参的lock()
方法(类似于Redisson
中leaseTime == -1
的场景)和unlock()
方法。定义接口RedLock
:
1 2 3 4 5 6 public interface RedLock { void lock (String resource) throws InterruptedException ; void unlock (String resource) ; }
为了简单起见,笔者把所有实现逻辑都写在实现类RedisRedLock
中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 @RequiredArgsConstructor public class RedisRedLock implements RedLock { private final JedisPool jedisPool; private final String uuid; private static final String WATCH_DOG_TIMEOUT_STRING = "30000" ; private static final long WATCH_DOG_TASK_DURATION = 10000L ; private static final String CHANNEL_PREFIX = "__red__lock:" ; private static final String UNLOCK_STATUS_STRING = "0" ; private static final String LOCK_LUA = "if (redis.call('exists', KEYS[1]) == 0) then\n" + " redis.call('hset', KEYS[1], ARGV[2], 1);\n" + " redis.call('pexpire', KEYS[1], ARGV[1]);\n" + " return nil;\n" + "end;\n" + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then\n" + " redis.call('hincrby', KEYS[1], ARGV[2], 1);\n" + " redis.call('pexpire', KEYS[1], ARGV[1]);\n" + " return nil;\n" + "end;\n" + "return redis.call('pttl', KEYS[1]);" ; private static final String UNLOCK_LUA = "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then\n" + " return nil;\n" + "end;\n" + "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);\n" + "if (counter > 0) then\n" + " redis.call('pexpire', KEYS[1], ARGV[2]);\n" + " return 0;\n" + "else\n" + " redis.call('del', KEYS[1]);\n" + " redis.call('publish', KEYS[2], ARGV[1]);\n" + " return 1;\n" + "end;" ; private static final String RENEW_LUA = "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return 1; " + "end; " + "return 0;" ; private static final ExecutorService SUB_PUB_POOL = Executors.newCachedThreadPool(); private static final ScheduledExecutorService WATCH_DOG_POOL = new ScheduledThreadPoolExecutor( Runtime.getRuntime().availableProcessors() * 2 ); private static class ThreadEntry { private final ConcurrentMap<Long, Integer> threadCounter = Maps.newConcurrentMap(); private volatile WatchDogTask watchDogTask; public synchronized void addThreadId (long threadId) { Integer counter = threadCounter.get(threadId); if (counter == null ) { counter = 1 ; } else { counter++; } threadCounter.put(threadId, counter); } public synchronized boolean hasNoThreads () { return threadCounter.isEmpty(); } public synchronized Long getFirstThreadId () { if (threadCounter.isEmpty()) { return null ; } return threadCounter.keySet().iterator().next(); } public synchronized void removeThreadId (long threadId) { Integer counter = threadCounter.get(threadId); if (counter == null ) { return ; } counter--; if (counter == 0 ) { threadCounter.remove(threadId); } else { threadCounter.put(threadId, counter); } } public void setWatchDogTask (WatchDogTask watchDogTask) { this .watchDogTask = watchDogTask; } public WatchDogTask getWatchDogTask () { return watchDogTask; } } @Getter private static class SubPubEntry { private final String key; private final Semaphore latch; private final SubscribeListener subscribeListener; public SubPubEntry (String key) { this .key = key; this .latch = new Semaphore(0 ); this .subscribeListener = new SubscribeListener(key, latch); } } private static final ConcurrentMap<String, ThreadEntry> THREAD_ENTRY_MAP = Maps.newConcurrentMap(); @Override public void lock (String resource) throws InterruptedException { long threadId = Thread.currentThread().getId(); String lockName = uuid + ":" + threadId; String entryName = uuid + ":" + resource; Long ttl = acquire(resource, lockName, threadId, entryName); if (Objects.isNull(ttl)) { return ; } SubPubEntry subPubEntry = subscribeAsync(resource); try { for (; ; ) { ttl = acquire(resource, lockName, threadId, entryName); if (Objects.isNull(ttl)) { return ; } if (ttl > 0L ) { subPubEntry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } } } finally { unsubscribeSync(subPubEntry); } } private Long acquire (String key, String lockName, long threadId, String entryName) { Object result = execute0(jedis -> jedis.eval(LOCK_LUA, Lists.newArrayList(key), Lists.newArrayList(WATCH_DOG_TIMEOUT_STRING, lockName))); if (Objects.nonNull(result)) { return Long.parseLong(String.valueOf(result)); } ThreadEntry entry = new ThreadEntry(); ThreadEntry oldEntry = THREAD_ENTRY_MAP.putIfAbsent(entryName, entry); if (oldEntry != null ) { oldEntry.addThreadId(threadId); } else { entry.addThreadId(threadId); Runnable renewAction = () -> executeWithoutResult(jedis -> jedis.eval(RENEW_LUA, Lists.newArrayList(key), Lists.newArrayList(WATCH_DOG_TIMEOUT_STRING, lockName))); WatchDogTask watchDogTask = new WatchDogTask(new AtomicReference<>(renewAction)); entry.setWatchDogTask(watchDogTask); WATCH_DOG_POOL.scheduleWithFixedDelay(watchDogTask, 0 , WATCH_DOG_TASK_DURATION, TimeUnit.MILLISECONDS); } return null ; } private SubPubEntry subscribeAsync (String key) { SubPubEntry subPubEntry = new SubPubEntry(key); SUB_PUB_POOL.submit(() -> { SubscribeListener subscribeListener = subPubEntry.getSubscribeListener(); executeWithoutResult(jedis -> jedis.subscribe(subscribeListener, subscribeListener.getChannelName())); return null ; }); return subPubEntry; } private void unsubscribeSync (SubPubEntry subPubEntry) { SubscribeListener subscribeListener = subPubEntry.getSubscribeListener(); subscribeListener.unsubscribe(subscribeListener.getChannelName()); } @Override public void unlock (String resource) { long threadId = Thread.currentThread().getId(); String entryName = uuid + ":" + resource; String lockName = uuid + ":" + threadId; String channelName = CHANNEL_PREFIX + resource; Object result = execute0(jedis -> jedis.eval(UNLOCK_LUA, Lists.newArrayList(resource, channelName), Lists.newArrayList(UNLOCK_STATUS_STRING, WATCH_DOG_TIMEOUT_STRING, lockName))); ThreadEntry threadEntry = THREAD_ENTRY_MAP.get(entryName); if (Objects.nonNull(threadEntry)) { threadEntry.removeThreadId(threadId); if (threadEntry.hasNoThreads() && Objects.nonNull(threadEntry.getWatchDogTask())) { threadEntry.getWatchDogTask().cancel(); } } if (Objects.isNull(result)) { throw new IllegalMonitorStateException(); } } private static class SubscribeListener extends JedisPubSub { @Getter private final String key; @Getter private final String channelName; @Getter private final Semaphore latch; public SubscribeListener (String key, Semaphore latch) { this .key = key; this .channelName = CHANNEL_PREFIX + key; this .latch = latch; } @Override public void onMessage (String channel, String message) { if (Objects.equals(channelName, channel) && Objects.equals(UNLOCK_STATUS_STRING, message)) { latch.release(); } } } @RequiredArgsConstructor private static class WatchDogTask implements Runnable { private final AtomicBoolean running = new AtomicBoolean(true ); private final AtomicReference<Runnable> actionReference; @Override public void run () { if (running.get() && Objects.nonNull(actionReference.get())) { actionReference.get().run(); } else { throw new WatchDogTaskStopException("watch dog cancel" ); } } public void cancel () { actionReference.set(null ); running.set(false ); } } private <T> T execute0 (Function<Jedis, T> function) { try (Jedis jedis = jedisPool.getResource()) { return function.apply(jedis); } } interface Action { void apply (Jedis jedis) ; } private void executeWithoutResult (Action action) { try (Jedis jedis = jedisPool.getResource()) { action.apply(jedis); } } private static class WatchDogTaskStopException extends RuntimeException { @Override public synchronized Throwable fillInStackTrace () { return this ; } } public static void main (String[] args) throws Exception { String resourceName = "resource:x" ; RedLock redLock = new RedisRedLock(new JedisPool(new GenericObjectPoolConfig()), UUID.randomUUID().toString()); Thread threadA = new Thread(() -> { try { redLock.lock(resourceName); process(resourceName); } catch (InterruptedException e) { e.printStackTrace(); } finally { redLock.unlock(resourceName); System.out.println(String.format("线程%s释放资源%s的锁" , Thread.currentThread().getName(), resourceName)); } }, "threadA" ); Thread threadB = new Thread(() -> { try { redLock.lock(resourceName); process(resourceName); } catch (InterruptedException e) { e.printStackTrace(); } finally { redLock.unlock(resourceName); System.out.println(String.format("线程%s释放资源%s的锁" , Thread.currentThread().getName(), resourceName)); } }, "threadB" ); threadA.start(); threadB.start(); Thread.sleep(Long.MAX_VALUE); } private static void process (String resourceName) { String threadName = Thread.currentThread().getName(); System.out.println(String.format("线程%s获取到资源%s的锁" , threadName, resourceName)); try { Thread.sleep(1000 ); } catch (InterruptedException ignore) { } } }
上面的实现短时间内编写完,没有做详细的DEBUG
,可能会有纰漏。某次执行结果如下:
1 2 3 4 线程threadB获取到资源resource:x的锁 线程threadB释放资源resource:x的锁 线程threadA获取到资源resource:x的锁 线程threadA释放资源resource:x的锁
小结 Redisson
中的red lock
实现,应用到下面的核心技术:
合理应用Redis
的基本数据类型HASH
Redis
的订阅发布Lua
脚本的原子性Netty
中的Promise
实现Netty
中的时间轮HashedWheelTimer
和对应的定时任务(HashedWheel)Timeout
Semaphore
进行带期限、永久或者可中断的阻塞以及唤醒,替代CountDownLatch
中的无等待期限阻塞上面的核心技术相对合理地应用,才能实现一个高效而且容错能力相对比较高的分布式锁方案,但是从目前来看,Redisson
仍未解决red lock
算法中的故障转移缺陷,笔者认为这个有可能是Redis
实现分布式锁方案的一个底层缺陷,此方案在Redis
单实例中是相对完善 ,一旦应用在Redis
集群(普通主从、哨兵或者Cluster
),有几率会出现前文提到的节点角色切换导致多个不同客户端获取到同一个资源对应的锁的问题。暂时无解。
参考资料:
画图用的是ProcessOn
:https://www.processon.com/view/link/5ffc540de0b34d2060d2d715
(c-2-w e-a-20210110 2021年的第一篇文章,希望这一年不要这么鸽,这个系列的下一篇是《冷饭新炒:理解JDK中UUID的底层实现》)