剖析分布式锁

我们不生产代码,我们是代码的搬运工

前不久,阿里大牛虾总再次抛出了分布式锁的讨论,对照之前项目中实现的redis分布式锁总结一下

天才是1%的灵感,加上99%的汗水;编程是1%的编码,加上99%的在Google/StackOverflow/Github上找代码
残酷的现实是,找来的代码可能深藏bug,而不知

在多核多线程环境中,通过锁机制,在某一个时间点上,只能有一个线程进入临界区代码,从而保证临界区中操作数据的一致性

怎么样才是把好锁?

可以保证在分布式部署的应用集群中,同一个方法在同一时间只能被一台机器上的一个线程执行。
这把锁要是一把可重入锁(避免死锁)
支持阻塞和非阻塞:和 ReentrantLock 一样支持 lock 和 trylock 以及 tryLock(long timeOut)
这把锁最好是一把公平锁(根据业务需求考虑要不要这条)
有高可用的获取锁和释放锁功能
获取锁和释放锁的性能要好

分布式锁三要素

  1. 外部存储

    分布式锁是在分布式部署环境中给多个主机提供锁服务,需要另外的存储载体

  2. 全局唯一标识

    在多线程环境中,锁可以使一个对象引用,也可以是变量,都有唯一的标识来区分锁保护的不同资源;
    在分布式环境下,也需要,比如对某一特定用户资源操作,业务+userId即可唯一标识

  3. 至少有两种状态,获取和释放

    锁至少需要两种状态:加锁(lock)和解锁(unlock)。
    用状态区分当前尝试获取的锁是否已经被其他操作占用,
    被占用只有等待锁释放后才能尝试获取锁并加锁,保护共享资源

实现

理论知识知道得再多,还得落地才行;只要遵从三要素,就能打造一把好锁,不要拘泥于某一种工具。

网上有很多实现方式,主要是”外部存储“使用了不同的组件,比如数据库,redis,zk,由于这些组件各自特性的不同,实现复杂度各有不同

这儿主要说下在实际工作中使用到的两种方式,数据库与redis

数据库

数据库,任何系统都需要的组件,常规手法,都是使用version来实现乐观锁

version

比如A、B操作员同时读取一余额为1000元的账户,A操作员为该账户增加100元,B操作员同时为该账户扣除50元,A先提交,B后提交。最后实际账户余额为1000-50=950元,但本该为1000+100-50=1050。这就是典型的并发问题

假设数据库中帐户信息表中有一个version字段,当前值为1;而当前帐户余额字段(balance)为1000元。假设操作员A先更新完,操作员B后更新。
a、操作员A此时将其读出(version=1),并从其帐户余额中增加100(1000+100=1100)。
b、在操作员A操作的过程中,操作员B也读入此用户信息(version=1),并从其帐户余额中扣除50(1000-50=950)。
c、操作员A完成了修改工作,将数据版本号加一(version=2),连同帐户增加后余额(balance=1100),提交至数据库更新,此时由于提交数据版本大于数据库记录当前版本,数据被更新,数据库记录version更新为2。
d、操作员B完成了操作,也将版本号加一(version=2)试图向数据库提交数据(balance=950),但此时比对数据库记录版本时发现,操作员B提交的数据版本号为2,数据库记录当前版本也为2,不满足 “提交版本必须大于记录当前版本才能执行更新 “的乐观锁策略,因此,操作员B的提交被驳回。
这样,就避免了操作员B用基于version=1的旧数据修改的结果覆盖操作员A的操作结果的可能。

1
set balance=1100,version=version+1 where id=#{id} and version=#{version};


version简单,除了对业务数据表有侵入性,还有一些场景是胜任不了

比如,在操作一个数量之前,需要确认一下能不能操作

1
2
3
4
5
6
7
int countLimit = select count from limit where id = ${id};

if(countlimit>0){
set balance=1100,version=version+1 where id=#{id} and version=#{version};
}

update count;

这儿操作了多张表,此时就需要再配合事务,才能保证原子性

redis

由于db性能的限制,而redis性能卓越,很多时候会选择redis实现方式

怎么使用redis正确地实现分布式锁,需要了解两方面

  1. 实现分布式锁时,使用到的redis命令
  2. 网上示例可能都有毒

redis命令

setnx 命令(『SET if Not eXists』(如果不存在,则 SET)的简写):
设置成功,返回 1
设置失败,返回 0
该命令是原子操作

getset 命令:
自动将key对应到value并且返回原来key对应的value。如果key存在但是对应的value不是字符串,就返回错误。
返回值:返回之前的旧值,如果之前Key不存在将返回nil。
该命令是原子操作。

get 命令:
get 获取key的值,如果存在,则返回;如果不存在,则返回nil;

del 命令:
del 删除key及key对应的值,如果key不存在,程序忽略

SET 命令:
set key value [EX seconds] [PX milliseconds] [NX|XX]
将字符串值 value 关联到 key
如果 key 已经持有其他值, SET 就覆写旧值,无视类型。
对于某个原本带有生存时间(TTL)的键来说, 当 SET 命令成功在这个键上执行时, 这个键原有的 TTL 将被清除。

可选参数从 Redis 2.6.12 版本开始,SET 命令的行为可以通过一系列参数来修改:

EX second :设置键的过期时间为 second 秒。 SET key value EX second 效果等同于 SETEX key second value

PX millisecond:设置键的过期时间为 millisecond 毫秒。 SET key value PX millisecond 效果等同于 PSETEX key millisecond value

NX :只在键不存在时,才对键进行设置操作。 SET key value NX 效果等同于 SETNX key value

XX:只在键已经存在时,才对键进行设置操作。

示例

原来项目中使用分布式锁,整个逻辑:

  1. setnx(lockkey, 当前时间+过期超时时间),如果返回 1,则获取锁成功;如果返回 0 则没有获取到锁,转向 2。
  2. get(lockkey) 获取值 oldExpireTime ,并将这个 value 值与当前的系统时间进行比较,如果小于当前系统时间,则认为这个锁已经超时,可以允许别的请求重新获取,转向 3。
  3. 计算 newExpireTime = 当前时间+过期超时时间,然后 getset(lockkey, newExpireTime) 会返回当前 lockkey 的值currentExpireTime。判断 currentExpireTime 与 oldExpireTime 是否相等,如果相等,说明当前 getset 设置成功,获取到了锁。如果不相等,说明这个锁又被别的请求获取走了,那么当前请求可以直接返回失败,或者继续重试。
  4. 在获取到锁之后,当前线程可以开始自己的业务处理,当处理完毕后,比较自己的处理时间和对于锁设置的超时时间,如果小于锁设置的超时时间,则直接执行 delete 释放锁;如果大于锁设置的超时时间,则不需要再锁进行处理。

获取锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
private boolean acquireLock(Jedis j,String lock) throws Exception{
int timeOut = timeoutSeconds*1000;
boolean acquired = false;
long start = System.currentTimeMillis();
int times = 0;
do {
String value = String.valueOf(System.currentTimeMillis() + timeOut + 1);
// 第一个得到这个锁
if (j.setnx(lock, value) == 1) {
logger.info("第一次获取全局锁:{} 成功", lock);
acquired = true;
break;
}
// j.expire(lock, timeoutSeconds); 网络抖动,可能失败
String currentValue = j.get(lock);

// 小于时,可能是上次没有清除,自上次超时后没有别的线程操作过
if (currentValue != null && Long.valueOf(currentValue) < System.currentTimeMillis()) {
// 这是同步操作,只会一个成功
String oldValue = j.getSet(lock, value);
// 别的线程没有赋上值,当前成功得到锁
if (oldValue != null && oldValue.equals(currentValue)) {
acquired = true;
logger.info("获取全局锁:{} 成功,尝试了{}次,经过了{}ms",lock,times,System.currentTimeMillis()-start);
break;
}
}
times++;
Thread.sleep(100);
} while (start + timeOut > System.currentTimeMillis());
if(!acquired){
logger.info("获取全局锁:{} 失败,尝试了{}次",lock,times);
}
return acquired;
}

解锁

1
2
3
4
5
6
7
8
9
private void releaseLock(Jedis j,String lock){
String currentValue = j.get(lock);
if(currentValue != null){
if(System.currentTimeMillis() < Long.valueOf(currentValue) ){
j.del(lock);
logger.info("释放锁{}",lock);
}
}
}

示例缺陷

特地从多年前的项目中把这段代码找出来,当年写完,心里还挺美

网上有很多资料也是差不多样的,但事实并不那么完美,甚至是错误的

加锁
  • 使用jedis.setnx()和jedis.expire()组合实现加锁
    1
    2
    3
    4
    5
    Long result = jedis.setnx(lockKey, value); 
    if (result == 1) {
    // 若在这里程序突然崩溃,则无法设置过期时间,将发生死锁
    jedis.expire(lockKey, expireTime);
    }

这个问题很明显,setnx与expire不是同一个事务,不俱备原子性;程序崩溃或者网络抖动都会出现死锁问题

  • System.currentTimeMillis()
    这个需要各个client时间必须一致,一旦不一致,就可能加锁失败

  • getSet()
    如果锁为了灵活性,会把timeout作为入参

当锁过期的时候,如果多个客户端同时执行jedis.getSet()方法,那么虽然最终只有一个客户端可以加锁,但是这个客户端的锁的过期时间可能被其他客户端覆盖

解锁
  • jedis.del()直接删除

这种不先判断锁的拥有者而直接解锁的方式,会导致任何客户端都可以随时进行解锁,即使这把锁不是它的

有种错误改进,增加参数传入requestId

1
2
3
4
5
6
7
public static void releaseLock(Jedis jedis, String lockKey, String requestId) { 
// 判断加锁与解锁是不是同一个客户端
if (requestId.equals(jedis.get(lockKey))) {
// 若在此时,这把锁突然不是这个客户端的,则会误解锁
jedis.del(lockKey);
}
}

还是原子性的问题
如代码注释,问题在于如果调用jedis.del()方法的时候,这把锁已经不属于当前客户端的时候会解除他人加的锁。那么是否真的有这种场景?答案是肯定的,比如客户端A加锁,一段时间之后客户端A解锁,在执行jedis.del()之前,锁突然过期了,此时客户端B尝试加锁成功,然后客户端A再执行del()方法,则将客户端B的锁给解除了

缺陷总结

心里认为本来很简单的事,代码大概:

1
2
3
4
5
6
7
8
Lock lock = DistributedReentrantLock.newLock("testlock11");//定义testlock11为key的锁,默认可重入锁
if(lock.tryLock()){
    try{
     xxxxxx
    }finally{
      lock.unlock(); //释放testlock11为key的锁,释放需要放在finally里,防止出异常导致锁没有及时释放
    }
  }

为了提高性能,通过redis原子性接口SETNX:

  1. 使用SETNX命令获取锁,若返回0(key已存在,锁已存在)则获取失败,反之获取成功
  2. 为了防止获取锁后程序出现异常,导致其他线程/进程调用SETNX命令总是返回0而进入死锁状态,需要为该key设置一个“合理”的过期时间释放锁
  3. 使用DEL命令将锁数据删除

结果为了弥补setnx()与expire()两个接口的原子性问题,引入了一堆问题,外强中干

缺陷修正

加锁

Redis 2.6.12版本后,增强了set()命令

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* 尝试获取分布式锁
* @param jedis Redis客户端
* @param lockKey 锁
* @param requestId 请求标识
* @param expireTime 超期时间
* @return 是否获取成功
*/
public static boolean tryGetDistributedLock(Jedis jedis, String lockKey, String requestId, int expireTime) {
String result = jedis.set(lockKey, requestId, "NX", "PX", expireTime);
if (LOCK_SUCCESS.equals(result)) {
return true;
}
return false;
}

加锁就一行代码:jedis.set(String key, String value, String nxxx, String expx, int time),
这个set()方法一共有五个入参:

  1. 第一个为key,我们使用key来当锁,因为key是唯一的
  2. 第二个为value,我们传的是requestId,通过给value赋值为requestId,就知道这把锁是哪个请求加的了,在解锁的时候就可以有依据。requestId可以使用UUID.randomUUID().toString()方法生成
  3. 第三个为nxxx,这个参数我们填的是NX,意思是SET IF NOT EXIST,即当key不存在时,我们进行set操作;若key已经存在,则不做任何操作;
  4. 第四个为expx,这个参数我们传的是PX,意思是我们要给这个key加一个过期的设置,具体时间由第五个参数决定
  5. 第五个为time,与第四个参数相呼应,代表key的过期时间

高可用:

  1. set()加入了NX参数,可以保证如果已有key存在,则不会调用成功,也就是只有一个客户端能持有锁,满足互斥性
  2. 由于我们对锁设置了过期时间,即使锁的持有者后续发生崩溃而没有解锁,锁也会因为到了过期时间而自动解锁(即key被删除),不会发生死锁
  3. 将value赋值为requestId,代表加锁的客户端请求标识,那么在解锁的时候就可以进行校验是否是同一个客户端,防止锁交叉
解锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* 释放分布式锁
* @param jedis Redis客户端
* @param lockKey 锁
* @param requestId 请求标识
* @return 是否释放成功
*/
public static boolean releaseDistributedLock(Jedis jedis, String lockKey, String requestId) {

String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));

if (RELEASE_SUCCESS.equals(result)) {
return true;
}
return false;
}

首先获取锁对应的value值,检查是否与requestId相等,如果相等则删除锁(解锁)

使用eval()配置lua保证原子性

在eval命令执行Lua代码的时候,Lua代码将被当成一个命令去执行,并且直到eval命令执行完成,Redis才会执行其他命令

有效时间

为什么需要一个有效时间呢?主要就是防止死锁

疑难

  • 执行业务代码操作共享资源的时间大于设置锁的过期时间?

客户端需要设置接口访问超时,接口超时时间需要远远小于锁超时时间,比如锁自动释放的时间是10s,那么接口超时大概设置5-50ms

【虽然能解决问题,但时间设置成了难点,微服务中多少接口,而且接口的timeout都是可配置的,不能每次调整接口timeout时,还是考虑一下锁的timeout】

  • GC的STW

客户端1获得了锁,正准备处理共享资源的时候,发生了Full GC直到锁过期。这样,客户端2又获得了锁,开始处理共享资源。在客户端2处理的时候,客户端1 Full GC完成,也开始处理共享资源,这样就出现了2个客户端都在处理共享资源的情况

续命丸

引入锁续约机制,也就是获取锁之后,释放锁之前,会定时进行锁续约,比如以3min间隔周期进行锁续约

这样如果应用重启了,最多3min等待时间,不会因为时间太长导致的死锁问题,也不会因为时间太短导致被其他线程抢占的问题,也就是锁分布式锁不需要设置过期时间,过期时间对于这个锁来说是滑动的

Redission

虾总给了总结性阐述:

首先启动Daemon线程,一直循环检测所有的分布式key,异步递延分布锁的过期时间,只要在处理业务逻辑,就递延分布锁过期时间3min。
每次添加分布式锁key,同时会生成一个uuid token,定义一个ConcurrentHashMap构造一个全局map维护所有的分布式key,上面Daemon线程会遍历这个map,每次解锁需要比对这个token,token一致才能解锁。
这样以来如果应用重启了,最多会有3min等待时间,不会导致时间太长导致的死锁问题,也不会因为时间太短导致的被其他线程抢占的问题,也就是锁分布式锁不需要设置过期时间,过期时间对于这个锁来说是滑动的

跟随虾总思路,找到了一个开源组件:Redisson

Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。

相对于平时使用的jedis,redission进行比较高的抽象

redission中的lock主要是RLock接口,继承的juc的Lock接口

1
public interface RLock extends Lock, RExpirable, RLockAsync


Lock

先看lock(),有两种形式,一个不带leaseTime,一个带leaseTime

1
2
public void lock() ;
public void lock(long leaseTime, TimeUnit unit) ;

边看源码,边解释

两个方法共用了lockInterruptibly()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return;
}
RFuture<RedissonLockEntry> future = subscribe(threadId);
commandExecutor.syncSubscription(future);
try {
while (true) {
ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
break;
}
// waiting for message
if (ttl >= 0) {
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
getEntry(threadId).getLatch().acquire();
}
}
} finally {
unsubscribe(future, threadId);
}
// get(lockAsync(leaseTime, unit));
}

  1. 尝试获取锁tryAcquire
  2. 获取失败,订阅此channel的消息(订阅的意义,在解锁时就会发现)
  3. 进入循环,不停的尝试获取锁,其中使用了JUC的Semaphore
  4. 一旦获取成功,则跳出循环
  5. 取消订阅

尝试获取锁tryAcquire里面会用到两个核心方法tryAcquireAsync(),tryLockInnerAsync()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Override
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
if (leaseTime != -1) {
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.addListener(new FutureListener<Long>() {
@Override
public void operationComplete(Future<Long> future) throws Exception {
if (!future.isSuccess()) {
return;
}

Long ttlRemaining = future.getNow();
// lock acquired
if (ttlRemaining == null) {
scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}

  • 1.根据锁的持续时间不同,处理也不同
  • 2.没有设置持续时间,那就是阻塞型,一直等待
    • 2.1.为了防止业务方法执行时间超过锁timeout,则定时续约scheduleExpirationRenewal()
  • 3.设置了持续时间,则不需要进行续约
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
private void scheduleExpirationRenewal(final long threadId) {
if (expirationRenewalMap.containsKey(getEntryName())) {
return;
}
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {

RFuture<Boolean> future = renewExpirationAsync(threadId);

future.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
expirationRenewalMap.remove(getEntryName());
if (!future.isSuccess()) {
log.error("Can't update lock " + getName() + " expiration", future.cause());
return;
}

if (future.getNow()) {
// reschedule itself
scheduleExpirationRenewal(threadId);
}
}
});
}

}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);

if (expirationRenewalMap.putIfAbsent(getEntryName(), new ExpirationEntry(threadId, task)) != null) {
task.cancel();
}
}

protected RFuture<Boolean> renewExpirationAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.<Object>singletonList(getName()),
internalLockLeaseTime, getLockName(threadId));
}
  1. 以internalLockLeaseTime/3间隔时间,定时续约
  2. 如果当前client自身有并发时,通过putIfAbsent保证只有一个task
  3. 续约:当lock存在时,使用pexpire设置过期时间
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);

return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}

protected String getLockName(long threadId) { 
return id + ":" + threadId;
}
  • 1.lockname不存在
    • 1.1.hset(lockname,uuid+threadid,1),value=uuid+threadid,有uuid可以区分各个client,有threadid区分各个线程,这样锁就具备了可重入性
    • 1.2.pexpire设置过期时间,防止client挂掉,造成死锁
      • 2.lockname存在
    • 2.1.hexists(lockname,uuid+threadid),这样保证了是同一个锁在同一个client
    • 2.2.hincrby 再次进锁,计数器+1
    • 2.3.pexpire 再次设置超时
      • 3.lockname存在,并且不在同一client
    • 3.1.pttl 返回剩余有效时长

unLock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
@Override
public RFuture<Void> unlockAsync(final long threadId) {
final RPromise<Void> result = new RedissonPromise<Void>();
RFuture<Boolean> future = unlockInnerAsync(threadId);

future.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
if (!future.isSuccess()) {
cancelExpirationRenewal(threadId);
result.tryFailure(future.cause());
return;
}

Boolean opStatus = future.getNow();
if (opStatus == null) {
IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
+ id + " thread-id: " + threadId);
result.tryFailure(cause);
return;
}
if (opStatus) {
cancelExpirationRenewal(null);
}
result.trySuccess(null);
}
});

return result;
}

void cancelExpirationRenewal(Long threadId) {
ExpirationEntry task = expirationRenewalMap.get(getEntryName());
if (task != null && (threadId == null || task.getThreadId() == threadId)) {
expirationRenewalMap.remove(getEntryName());
task.getTimeout().cancel();
}
}
  1. 从方法名看,虽然对外好像是直接解锁,但内部是异步执行的
  2. unlockInnerAsync()进行解锁
  3. 从expirationRenewalMap移除,并把task.cancel()

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
    "if (redis.call('exists', KEYS[1]) == 0) then " +
    "redis.call('publish', KEYS[2], ARGV[1]); " +
    "return 1; " +
    "end;" +
    "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
    "return nil;" +
    "end; " +
    "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
    "if (counter > 0) then " +
    "redis.call('pexpire', KEYS[1], ARGV[2]); " +
    "return 0; " +
    "else " +
    "redis.call('del', KEYS[1]); " +
    "redis.call('publish', KEYS[2], ARGV[1]); " +
    "return 1; "+
    "end; " +
    "return nil;",
    Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));

    }
  4. lockname不存在,说明已经解锁,publish channelname unlockmessage;return 1

  5. lockname存在,但对于uuid+id不存在,说明不是加锁的client,return nil
  6. lockname存在,并且是当前加锁client
  7. 对lockname uuid+id进行-1,如果counter>0则走5,如果=0 则走6
  8. counter>0 说明锁重入了,计数器-1,并expire
  9. counter=0 说明最终解锁,直接del key,并publish channelname unlockmessage;return 1

redission缺陷

使用cluster时

一个场景:A在向主机1请求到锁成功后,主机1宕机了。现在从机1a变成了主机。但是数据没有同步,从机1a是没有A的锁的。那么B又可以获得一个锁。这样就会造成数据错误。

redlock主要思想就是做数据冗余。建立5台独立的集群,当我们发送一个数据的时候,要保证3台(n/2+1)以上的机器接受成功才算成功,否则重试或报错

redlock实现会更复杂,但从他的算法上看,有zk选举的味道。对于更高可用分布锁,可以借助zk本身特性去实现

总结

对于锁,主要考虑性能与安全,即要保持锁的活跃性,又得保证锁的安全性

分布式锁,除了以上两点,还要考虑实现时的三要素

对于redission,对于锁部分的源码,还有很多的内容,很多的细节需要挖掘,此篇就不写了,太长。

后面再结合JUC,写篇更详细的源码分析

参考资料

Redis分布式锁的正确实现方式

redission

公众号:码农戏码
欢迎关注微信公众号『码农戏码』