精灵王


  • 首页

  • 文章归档

  • 所有分类

  • 关于我

  • 搜索
设计模式-行为型 设计模式-创建型 设计模式-结构型 设计 系统设计 设计模式之美 分布式 Redis 并发编程 个人成长 周志明的软件架构课 架构 单元测试 LeetCode 工具 位运算 读书笔记 操作系统 MySQL 异步编程 技术方案设计 集合 设计模式 三亚 游玩 转载 Linux 观察者模式 事件 Spring SpringCloud 实战 实战,SpringCloud 源码分析 线程池 同步 锁 线程 线程模型 动态代理 字节码 类加载 垃圾收集器 垃圾回收算法 对象创建 虚拟机内存 内存结构 Java

用Redis实现一个相对可靠的分布式锁

发表于 2021-06-24 | 分类于 架构设计 | 0

将 Redis 作为分布式锁的实现,本质上就是让我们的目标线程到Redis里面去占据一个“萝卜坑”,萝卜就是目标线程要对共享资源访问变更的整个操作动作,当别的线程也想来占坑时,发现已经有萝卜在坑里面了,就需要放弃或者等待坑空余出来。

使用 SETNX 命令

只在键 key 不存在的情况下, 将键 key 的值设置为 value 。

若键 key 已经存在, 则 SETNX 命令不做任何动作。

// 加锁命令:
SETNX  key  value
do something
// 解锁命令
del key

其中key是萝卜坑的唯一标识,根据业务需求组合而来,value值随意。

一个完整的正常业务逻辑就是,先通过SETNX 获得锁,然后做业务处理,最后释放锁。

上面这种实现方式看着非常简单,实现起来也很简单,但是存在很多的问题,比如:

  1. 持有锁的线程因为某些异常(进程退出,网络异常等),导致没有成功的执行解锁命令(del key),会导致其他线程永远也拿不到锁
  2. 当前线程持有锁的情况下,其他线程可以调用del命令错误的删除锁,当前线程是没办法阻止的

为了解决上面第一个问题,我们可以再获得锁后,给key再设置一个过期时间,形成新的命令组合。

// 加锁命令:
SETNX  key  value
// 获得锁后,设置锁过期时间
EXPIRE key seconds
do something
// 解锁命令
del key

其中EXPIRE命令可以给定 key 设置生存时间,当 key 过期时(生存时间为 0 ),它会被Redis自动删除,这样其他线程就可以再次获得该锁了。

上面这种实现能解决正常情况下的问题,但是某些异常情况下,还是可能会出现问题,比如:

  1. 在SETNX 命令执行后,并没有成功执行EXPIRE 命令(也有可能是进程或者网络原因等)
    这样又回到了上面第一个问题,所以这样还是没办法根本解决。
    这里的主要原因是SETNX 命令和EXPIRE命令两步不是一个原子的操作。

使用 SET 扩展命令

为了解决STNX、EXPIRE命令非原子操作带来的问题,我们可以使用SET命令来完成。

Redis的set命令可以通过参数来实现 SETNX 、 SETEX 以及 PSETEX 命令的效果,Redis 将来的版本也可能会移除并废弃 SETNX 、 SETEX 和 PSETEX 这三个命令。

SET key value [EX seconds] [PX milliseconds] [NX|XX]

所以我们的逻辑就变成了:

// 加锁命令,并设置过期时间
SET key value NX EX 10
do something
// 解锁命令
del key

NX : 只在键不存在时, 才对键进行设置操作。 执行 SET key value NX 的效果等同于执行 SETNX key value 。

EX seconds : 将键的过期时间设置为 seconds 秒。 执行 SET key value EX seconds 的效果等同于执行 SETEX key seconds value。

通过SET命令能够解决原子性问题,但是并不能从根本上解决下面两个问题:

  1. 持有锁超时问题:线程A获得锁并设置过期时间为10S,持有锁时长超过10S,Redis key过期被自动删除,线程B这时就可以成功拿到锁,也可能造成后续资源冲突数据不一致。
  2. 锁被其他线程误删:加入以上情形中,A线程持有锁超时,线程B再次获得了锁,A线程执行完再次删除了锁,线程B还没有执行完持有的锁就被释放了。

为了避免上面的锁超时问题,我们可以延长锁的超时时间,但是这同样也解决不了根本问题,这样只有建议不要在耗时过长的场景中使用Redis的分布式锁。

防止误删锁

为了解决线程持有的锁被其他线程误删,我们只需要将锁的value设置成一个唯一的随机数(线程ID等),在执行del命令的之前进行判断,只有当前线程才知道持有锁的随机数,从而完成只有当前线程才能删除释放锁。

// 加锁命令,并设置过期时间,random_value 是当前线程才知道的随机值
SET key random_value NX EX 10
do something
// 值匹配,才可以解锁
if random_value == reids.get(key) 
   del key

但判断 value 和删除 key 是两个独立的操作,也不是原子性的,所以这个时候需要用到Lua脚本来处理,Redis 会保证脚本以原子性(atomic)的方式执行:当某个脚本正在运行的时候,不会有其他脚本或 Redis 命令被执行。

使用 LUA 脚本

执行下面的Redis Lua脚本来释放锁,脚本如下:

if redis.call("get",KEYS[1]) == ARGV[1] then
	return redis.call("del",KEYS[1])
else
	return 0
end

代码解释:先获取指定key的值,然后和传入的arg比较是否相等,相等值删除key,否则直接返回0。

对应Java代码实现:

/** lua 脚本 */
private static final String LUA_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";

@Autowired
private StringRedisTemplate stringRedisTemplate;

public boolean releaseLock(String myKey, String value) {
    DefaultRedisScript<Long> script = new DefaultRedisScript();
    script.setScriptText(LUA_SCRIPT);
    script.setResultType(Long.TYPE);
    Long result = stringRedisTemplate.execute(script, Collections.singletonList(myKey),Collections.singleton(value));
    if ("1".equals(result)) {
        return true;
    }
    return false;
}

这段Lua脚本在执行的时候要把前面的myKey作为ARGV[1]的值传进去,把value作为KEYS[1]的值传进去。

使用Lua脚本优点:

  1. 脚本的原子性

    lua脚本里面的所有命令是作为一个整体执行的, 整个是一个原子操作, 当某个脚本正在运行的时候,不会有其他脚本或 Redis 命令被执行。

  2. 能减少一定的网络开销

    可以将多个命令放入到lua脚本中, 一次发生到Redis,减少网络请求次数。

以上方法都不能避免Redis锁超时提前释放,其他线程乘“虚”而入拿到锁的问题

使用 Redission

为了避免锁超时的情况,Redisson内部提供了一个监控锁的看门狗。

它的作用是在Redisson实例被关闭前,每 lockWatchdogTimeout/3 执行一次,去检查该线程的锁是否存在,如果存在则对锁的过期时间重新设置为 lockWatchdogTimeout,防止锁由于过期提前释放。

默认情况下,看门狗的检查锁的超时时间是30秒钟,可以通过修改Config.lockWatchdogTimeout来另行指定。

Redisson还可以通过加锁的方法提供了leaseTime的参数来指定加锁的时间。超过这个时间后锁便自动解开了。

所以当lock方法有支持leaseTime参数时,其都是不支持看门狗续命的功能的,这种加锁方式还是会出现锁超时的问题。

// 等待锁的最长时间30S
rLock.tryLock(30,TimeUnit.SECONDS);
// 尝试加锁,最多等待30秒,上锁以后60秒自动解锁
rLock.tryLock(30,60,TimeUnit.SECONDS);
// 尝试加锁,等待锁的最长时间30S,持有锁最多60S
rLock.tryLockAsync(30,60,TimeUnit.SECONDS)

// 加锁以后10秒钟自动解锁
// 无需调用unlock方法手动解锁
lock.lock(10, TimeUnit.SECONDS);
// 无限期等待获得锁
rLock.lock();

Redission的看门狗实现部分源代码:

private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
    if (leaseTime != -1) { // 没有指定leaseTime参数,没有开启看门狗
        return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    }
    // commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout()  获取默认的配置参数
    RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    ttlRemainingFuture.addListener(new FutureListener<Long>() {
        @Override
        public void operationComplete(Future<Long> future) throws Exception {
            if (!future.isSuccess()) {
                return;
            }

            Long ttlRemaining = future.getNow();
            // lock acquired
            if (ttlRemaining == null) {
                // 当前线程续命
                scheduleExpirationRenewal(threadId);
            }
        }
    });
    return ttlRemainingFuture;
}

private void scheduleExpirationRenewal(final long threadId) {
    if (expirationRenewalMap.containsKey(getEntryName())) {
        return;
    }

   
    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            // Lua 脚本续命,重新设置过期时间internalLockLeaseTime(默认30S)
            RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                    "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return 1; " +
                    "end; " +
                    "return 0;",
                      Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
            
            future.addListener(new FutureListener<Boolean>() {
                @Override
                public void operationComplete(Future<Boolean> future) throws Exception {
                    expirationRenewalMap.remove(getEntryName());
                    if (!future.isSuccess()) {
                        log.error("Can't update lock " + getName() + " expiration", future.cause());
                        return;
                    }
                    
                    if (future.getNow()) {
                        // reschedule itself
                        scheduleExpirationRenewal(threadId);
                    }
                }
            });
        }
    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS); // 默认每10S检查一次

    if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) {
        task.cancel();
    }
}

分布式锁 Redlock

以上几种基于 Redis 单机实现的分布式锁,假如Redis节点宕机了,那么所有客户端就都无法获得锁了,服务变得不可用。

为了提高可用性,可以给这个Redis节点挂一个Slave,当Master节点不可用的时候,系统自动切到Slave上(failover)。但由于Redis的主从复制(replication)是异步的,这可能导致在failover过程中丧失锁的安全性。

可能的执行序列:

  1. 客户端1从Master获取了锁。
  2. Master宕机了,存储锁的key还没有来得及同步到Slave上。
  3. Slave升级为Master。
  4. 客户端2从新的Master获取到了对应同一个资源的锁。

于是,客户端1和客户端2同时持有了同一个资源的锁。锁的安全性被打破。针对这个问题,Redis作者antirez设计出了Redlock算法。

antirez提出的新的分布式锁的算法Redlock,它是基于N个完全独立的Redis节点(通常情况下N可以设置成5)。运行Redlock算法的客户端依次执行下面各个步骤,来完成获取锁的操作:

  1. 获取当前时间(毫秒数)。
  2. 按顺序依次向N个Redis节点执行获取锁的操作。
    这个获取操作跟前面基于单Redis节点的获取锁的过程相同,包含随机字符串random_value,也包含过期时间(比如PX 30000,即锁的有效时间)。为了保证在某个Redis节点不可用的时候算法能够继续运行,这个获取锁的操作还有一个超时时间(time out),它要远小于锁的有效时间(几十毫秒量级)。客户端在向某个Redis节点获取锁失败以后,应该立即尝试下一个Redis节点。这里的失败,应该包含任何类型的失败,比如该Redis节点不可用,或者该Redis节点上的锁已经被其它客户端持有。
  3. 计算整个获取锁的过程总共消耗了多长时间,计算方法是用当前时间减去第1步记录的时间。
    如果客户端从大多数Redis节点(>= N/2+1)成功获取到了锁,并且获取锁总共消耗的时间没有超过锁的有效时间(lock validity time),那么这时客户端才认为最终获取锁成功;否则,认为最终获取锁失败。
  4. 如果最终获取锁成功了,那么这个锁的有效时间应该重新计算,它等于最初的锁的有效时间减去第3步计算出来的获取锁消耗的时间。
  5. 如果最终获取锁失败了(可能由于获取到锁的Redis节点个数少于N/2+1,或者整个获取锁的过程消耗的时间超过了锁的最初有效时间),那么客户端应该立即向所有Redis节点发起释放锁的操作(即前面介绍的Redis Lua脚本),不管这些节点当时在获取锁的时候成功与否。

这个时候理论上,Redlock已经很完美了,但如果有节点发生崩溃重启,还是会对锁的安全性有影响的。具体的影响程度跟Redis对数据的持久化程度有关。

假设一共有5个Redis节点:A, B, C, D, E。设想发生了如下的事件序列:

  1. 客户端1成功锁住了A, B, C,获取锁成功(但D和E没有锁住)。
  2. 节点C崩溃重启了,但客户端1在C上加的锁没有持久化下来,丢失了。
  3. 节点C重启后,客户端2锁住了C, D, E,获取锁成功。

这样,客户端1和客户端2同时获得了锁(针对同一资源)。

在默认情况下,Redis的AOF持久化方式是每秒写一次磁盘(即执行fsync),因此最坏情况下可能丢失1秒的数据。为了尽可能不丢数据,Redis允许设置成每次修改数据都进行fsync,但这会降低性能。当然,即使执行了fsync也仍然有可能丢失数据(系统原因)

所以,上面分析的由于节点重启引发的锁失效问题,还是有可能出现的。

为了应对这一问题,antirez又提出了延迟重启(delayed restarts)的概念。也就是说,一个节点崩溃后,先不立即重启它,而是等待一段时间再重启,这段时间应该大于锁的有效时间(lock validity time)。这样的话,这个节点在重启前所参与的锁都会过期,它在重启后就不会对现有的锁造成影响。

在最后释放锁的时候,antirez在算法描述中特别强调,客户端应该向所有Redis节点发起释放锁的操作。设想这样一种情况,客户端发给某个Redis节点的获取锁的请求成功到达了该Redis节点,这个节点也成功执行了SET操作,但是它返回给客户端的响应包却丢失了。这在客户端看来,获取锁的请求由于超时而失败了,但在Redis这边看来,加锁已经成功了。因此,释放锁的时候,客户端也应该对当时获取锁失败的那些Redis节点同样发起请求。

实际上,这种情况在异步通信模型中是有可能发生的:客户端向服务器通信是正常的,但反方向却是有问题的。

Redission Redlock

基于Redis的Redisson红锁RedissonRedLock对象,实现了Redis作者介绍的Redlock加锁算法。该对象也可以用来将多个RLock对象关联为一个红锁,每个RLock对象实例可以来自于不同的Redisson实例。

RLock lock1 = redissonInstance1.getLock("lock1");
RLock lock2 = redissonInstance2.getLock("lock2");
RLock lock3 = redissonInstance3.getLock("lock3");

RedissonRedLock lock = new RedissonRedLock(lock1, lock2, lock3);
// 同时加锁:lock1 lock2 lock3
// 红锁在大部分节点上加锁成功就算成功。
lock.lock();
// 给lock1,lock2,lock3加锁,如果没有手动解开的话,10秒钟后将会自动解开
lock.lock(10, TimeUnit.SECONDS);
// 为加锁等待100秒时间,并在加锁成功10秒钟后自动解开
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
...
lock.unlock();

基于ZooKeeper的分布式锁更安全吗?

一般来说,基于Zookeeper加锁的流程大致如下:

  1. 客户端尝试创建一个znode节点,比如/lock
    znode应该被创建成ephemeral(临时节点)的,这是znode的一个特性,它的生命周期和客户端会话绑定, 一旦客户端会话失效, 这个客户端创建的所欲临时节点都会被移除,这就保证了客户端发生异常锁就一定会被释放。
  2. 第一个成功创建的客户端相当于成功拿到锁。其他后面的客户端相当于获取锁失败(创建节点失败)
  3. 客户端访问共享资源完成后,主动将znode节点删掉(释放锁)
  4. 其他客户端接下来继续获取锁

看起来这个分布式锁很完美,没有Redlock过期时间的问题,而且能在需要的时候让锁自动释放。

但仔细考察的话,并不尽然。

ZooKeeper是如何发现客户端已经崩溃的呢?

每个客户端都与ZooKeeper的某台服务器维护着一个Session,这个Session依赖定期的心跳(heartbeat)来保持。如果ZooKeeper长时间收不到客户端的心跳(这个时间称为Sesion的过期时间),就认为Session过期了,通过这个Session所创建的所有的ephemeral类型的znode节点都会被自动删除。

那么是否可能会发生如下的执行流程?

  1. 客户端1创建了znode节点/lock,获得了锁。
  2. 客户端1进入了长时间的GC pause。
  3. 客户端1连接到ZooKeeper的Session被认为过期。
    znode节点/lock就被自动删除。
  4. 客户端2请求成功创建znode节点/lock,从而获得了锁。
  5. 客户端1从GC pause中恢复过来,此时仍然认为自己持有锁。

最后,客户端1和客户端2都认为自己持有了锁,那就有可能发生冲突。

基于ZooKeeper的锁和基于Redis的锁相比在实现特性上有两个不同:

  1. 基于ZooKeeper的锁是依靠Session(心跳)来维持锁的持有状态的,而Redis不支持Sesion。
    Session机制让ZooKeeper避免了基于Redis的锁对于有效时间(lock validity time)到底设置多长的两难问题。
  2. 基于ZooKeeper的锁支持在获取锁失败之后等待锁重新释放的事件。这让客户端对锁的使用更加灵活。
    ZooKeeper有一个watch机制。当客户端试图创建/lock的时候,发现它已经存在了,这时候创建失败,但客户端不一定就此对外宣告获取锁失败。客户端可以进入一种等待状态,等待当/lock节点被删除的时候,ZooKeeper通过watch机制通知它,这样它就可以继续完成创建操作(获取锁)。

看起来,用ZooKeeper实现的分布式锁也不一定就是安全的。该有的问题它还是有。但是,ZooKeeper作为一个专门为分布式应用提供方案的框架,它提供了一些非常好的特性,是Redis之类的方案所没有的。像前面提到的ephemeral类型的znode自动删除的功能就是一个例子。

还有一个很有用的特性是ZooKeeper的watch机制。这个机制可以这样来使用,比如当客户端试图创建/lock的时候,发现它已经存在了,这时候创建失败,但客户端不一定就此对外宣告获取锁失败。客户端可以进入一种等待状态,等待当/lock节点被删除的时候,ZooKeeper通过watch机制通知它,这样它就可以继续完成创建操作(获取锁)。这可以让分布式锁在客户端用起来就像一个本地的锁一样:加锁失败就阻塞住,直到获取到锁为止。这样的特性Redlock就无法实现。

小结,基于ZooKeeper的锁和基于Redis的锁相比在实现特性上有两个不同:

  • 在正常情况下,客户端可以持有锁任意长的时间,这可以确保它做完所有需要的资源访问操作之后再释放锁。这避免了基于Redis的锁对于有效时间(lock validity time)到底设置多长的两难问题。实际上,基于ZooKeeper的锁是依靠Session(心跳)来维持锁的持有状态的,而Redis不支持Sesion。
  • 基于ZooKeeper的锁支持在获取锁失败之后等待锁重新释放的事件。这让客户端对锁的使用更加灵活。

顺便提一下,如上所述的基于ZooKeeper的分布式锁的实现,并不是最优的。它会引发“herd effect”(羊群效应),降低获取锁的性能。

羊群效应:所有请求锁的客户端watch锁的持有者节点,当锁持有者节点znode被删除后,所有请求锁客户端者都会都会接收到watch通知,但是只有一个客户端能拿到锁,这样就会造成很多不必要的网络开销。

解决羊群效应:用公平锁的方式获取锁,多个客户端请求依次顺序监听上一次节点,只有当监听到自己的上一个节点释放锁后才会获取锁。

这样虽然解决了羊群效应,但是也有弊端,假如中间某个节点出现异常,session过期,那后面的节点就永远watch不到获取锁的通知了。

精 灵 王 wechat
👆🏼欢迎扫码关注微信公众号👆🏼
  • 本文作者: 精 灵 王
  • 本文链接: https://jinglingwang.cn/archives/distributed-lock
  • 版权声明: 本博客所有文章除特别声明外,均采用CC BY-NC-SA 3.0 许可协议。转载请注明出处!
# 设计模式-行为型 # 设计模式-创建型 # 设计模式-结构型 # 设计 # 系统设计 # 设计模式之美 # 分布式 # Redis # 并发编程 # 个人成长 # 周志明的软件架构课 # 架构 # 单元测试 # LeetCode # 工具 # 位运算 # 读书笔记 # 操作系统 # MySQL # 异步编程 # 技术方案设计 # 集合 # 设计模式 # 三亚 # 游玩 # 转载 # Linux # 观察者模式 # 事件 # Spring # SpringCloud # 实战 # 实战,SpringCloud # 源码分析 # 线程池 # 同步 # 锁 # 线程 # 线程模型 # 动态代理 # 字节码 # 类加载 # 垃圾收集器 # 垃圾回收算法 # 对象创建 # 虚拟机内存 # 内存结构 # Java
源码分析:线程安全的集合—CopyOnWriteArraySet
Java IO总结
  • 文章目录
  • 站点概览
精 灵 王

精 灵 王

青春岁月,以此为伴

106 日志
14 分类
48 标签
RSS
Github E-mail
Creative Commons
Links
  • 添加友链说明
© 2023 精 灵 王
渝ICP备2020013371号
0%