百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术分类 > 正文

Redisson实现分布式锁的原理—加解锁

ztj100 2024-11-04 15:16 16 浏览 0 评论

2023的金三银四来的没想象中那么激烈,一个朋友前段时间投了几十家,多数石沉大海,好不容易等来面试机会,就恰好被问到项目中关于分布式锁的应用,后涉及Redisson实现分布式锁的原理,答不上来。

锁的可重入性

我们都知道,Java中synchronized和lock都支持可重入,synchronized的锁关联一个线程持有者和一个计数器。当一个线程请求成功后,JVM会记下持有锁的线程,并将计数器计为1。此时其他线程请求该锁,则必须等待。

而该持有锁的线程如果再次请求这个锁,就可以再次拿到这个锁,同时计数器会递增。当线程退出一个synchronized方法/块时,计数器会递减,如果计数器为0则释放该锁;

在ReentrantLock中,底层的 AQS 对应的state 同步状态值表示线程获取该锁的可重入次数,通过CAS方式进行设置,在默认情况下,state的值为0 表示当前锁没有被任何线程持有,原理类似。

所以如果想要实现可重入性,可能须有一个计数器来控制重入次数,实际Redisson确实是这么做的。

好的我们通过Redisson客户端进行设置,并循环3次,模拟锁重入:

for(int i = 0; i < 3; i++) {      
    RedissonLockUtil.tryLock("distributed:lock:distribute_key", TimeUnit.SECONDS, 20, 100); 
 }

连接Redis客户端进行查看:

可以看到,我们设置的分布式锁是存在一个hash结构中,value看起来是循环的次数3,key就不怎么认识了,那这个key是怎么设置进去的呢,另外为什么要设置成为Hash类型呢?

加锁

我们先来看看普通的分布式锁的上锁流程:

说明:

  1. 客户端在进行加锁时,会校验如果业务上没有设置持有锁时长leaseTime,会启动看门狗来每隔10s进行续命,否则就直接以leaseTime作为持有的时长;
  2. 并发场景下,如果客户端1锁还未释放,客户端2尝试获取,加锁必然失败,然后会通过发布订阅模式来订阅Key的释放通知,并继续进入后续的抢锁流程。
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
      long time = unit.toMillis(waitTime);
      long current = System.currentTimeMillis();
      long threadId = Thread.currentThread().getId();
      Long ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
      if (ttl == null) {
         return true;
      } else {
         // 订阅分布式Key对应的消息,监听其它锁持有者释放,锁没有释放的时候则会等待,直到锁释放的时候会执行下面的while循环
         CompletableFuture subscribeFuture = this.subscribe(threadId);

         subscribeFuture.get(time, TimeUnit.MILLISECONDS);

         try {
            do {
               // 尝试获取锁
               ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
               // 竞争获取锁成功,退出循环,不再竞争。
               if (ttl == null) {
                  return true;
               }
               // 利用信号量机制阻塞当前线程相应时间,之后再重新获取锁
               if (ttl >= 0L && ttl < time) {
                  ((RedissonLockEntry)this.commandExecutor.getNow(subscribeFuture)).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
               } else {
                  ((RedissonLockEntry)this.commandExecutor.getNow(subscribeFuture)).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
               }

               time -= System.currentTimeMillis() - currentTime;
            } while(time > 0L);
         } finally {
            // 竞争锁成功后,取消订阅该线程Id事件
            this.unsubscribe((RedissonLockEntry)this.commandExecutor.getNow(subscribeFuture), threadId);
         }
      }
   }
}
RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
        // 如果设置了持有锁的时长,直接进行尝试加锁操作
         if (leaseTime != -1L) {
            return this.tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
        } else {
            // 未设置加锁时长,在加锁成功后,启动续期任务,初始默认持有锁时间是30s
            RFuture<Long> ttlRemainingFuture = this.tryLockInnerAsync(this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
            ttlRemainingFuture.addListener(new FutureListener<Long>() {
                public void operationComplete(Future<Long> future) throws Exception {
                    if (future.isSuccess()) {
                        Long ttlRemaining = (Long)future.getNow();
                        if (ttlRemaining == null) {
                            RedissonLock.this.scheduleExpirationRenewal(threadId);
                        }
                    }
                }
            });
            return ttlRemainingFuture;
        }
    }

我们都知道Redis执行Lua脚本具有原子性,所以在尝试加锁的下层,Redis主要执行了一段复杂的lua脚本:

-- 不存在该key时
if (redis.call('exists', KEYS[1]) == 0) then
      -- 新增该锁并且hash中该线程id对应的count置1
redis.call('hincrby', KEYS[1], ARGV[2], 1);
-- 设置过期时间
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;

-- 存在该key 并且 hash中线程id的key也存在
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
      -- 线程重入次数++
redis.call('hincrby', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
return redis.call('pttl', KEYS[1]);

参数说明:

  • KEYS[1]:对应我们设置的分布式key,即:distributed:lock:distribute_key
  • ARGV[1]:业务自定义的加锁时长或者默认的30s;
  • ARGV[2]: 具体的客户端初始化连接UUID+线程ID: 9d8f0907-1165-47d2-8983-1e130b07ad0c:1

我们从上面的脚本中可以看出核心逻辑其实不难:

  1. 如果分布式锁Key未被任何端持有,直接根据“客户端连接ID+线程ID” 进行初始化设置,并设置重入次数为1,并设置Key的过期时间;
  2. 否则重入次数+1,并重置过期时间;

锁续命

接下来看看scheduleExpirationRenewal续命是怎么做的呢?

private void scheduleExpirationRenewal(final long threadId) {
   if (!expirationRenewalMap.containsKey(this.getEntryName())) {
      Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
         public void run(Timeout timeout) throws Exception {
            // 执行续命操作
            RFuture<Boolean> future = RedissonLock.this.renewExpirationAsync(threadId);
            future.addListener(new FutureListener<Boolean>() {
               public void operationComplete(Future<Boolean> future) throws Exception {
                  RedissonLock.expirationRenewalMap.remove(RedissonLock.this.getEntryName());
                          ...
                  // 续命成功,继续
                  if ((Boolean)future.getNow()) {
                     RedissonLock.this.scheduleExpirationRenewal(threadId);
                  }
               }
            });
         }
      }, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS);
   }
}

Tip小知识点:

续期是用的什么定时任务执行的?

Redisson用netty的HashedWheelTimer做命令重试机制,原因在于一条redis命令的执行不论成功或者失败耗时都很短,而HashedWheelTimer是单线程的,系统性能开销小。

而在上面的renewExpirationAsync中续命操作的执行核心Lua脚本要做的事情也非常的简单,就是给这个Key的过期时间重新设置为指定的30s.

if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
    redis.call('pexpire', KEYS[1], ARGV[1]);
    return 1;
end;
return 0;

释放锁

释放锁主要是除了解锁本省,另外还要考虑到如果存在续期的情况,要将续期任务删除:

public RFuture<Void> unlockAsync(long threadId) {
   // 解锁
   RFuture<Boolean> future = this.unlockInnerAsync(threadId);
   CompletionStage<Void> f = future.handle((opStatus, e) -> {
      // 解除续期
      this.cancelExpirationRenewal(threadId);
      ...
   });
   return new CompletableFutureWrapper(f);
}

在unlockInnerAsync内部,Redisson释放锁其实核心也是执行了如下一段核心Lua脚本:

    // 校验是否存在
    if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
      return nil;
      end;
    // 获取加锁次数,校验是否为重入锁
    local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
    // 如果为重入锁,重置过期时间,锁本身不释放
    if (counter > 0) then
      redis.call('pexpire', KEYS[1], ARGV[2]);
      return 0;
   // 删除Key
    else redis.call('del', KEYS[1]);
      // 通知阻塞的客户端可以抢锁啦
      redis.call('publish', KEYS[2], ARGV[1]);
      return 1;
      end;
      return nil;

其中:

  • KEYS[1]: 分布式锁
  • KEYS[2]: redisson_lock_channel:{分布式锁} 发布订阅消息的管道名称
  • ARGV[1]: 发布的消息内容
  • ARGV[2]: 锁的过期时间
  • ARGV[3]: 线程ID标识名称

其它问题

  1. 红锁这么火,但真的靠谱么?
  2. Redisson公平锁是什么情况?

作者:宫三公子
链接:https://juejin.cn/post/7210756986462371900
来源:稀土掘金

相关推荐

如何将数据仓库迁移到阿里云 AnalyticDB for PostgreSQL

阿里云AnalyticDBforPostgreSQL(以下简称ADBPG,即原HybridDBforPostgreSQL)为基于PostgreSQL内核的MPP架构的实时数据仓库服务,可以...

Python数据分析:探索性分析

写在前面如果你忘记了前面的文章,可以看看加深印象:Python数据处理...

CSP-J/S冲奖第21天:插入排序

...

C++基础语法梳理:算法丨十大排序算法(二)

本期是C++基础语法分享的第十六节,今天给大家来梳理一下十大排序算法后五个!归并排序...

C 语言的标准库有哪些

C语言的标准库并不是一个单一的实体,而是由一系列头文件(headerfiles)组成的集合。每个头文件声明了一组相关的函数、宏、类型和常量。程序员通过在代码中使用#include<...

[深度学习] ncnn安装和调用基础教程

1介绍ncnn是腾讯开发的一个为手机端极致优化的高性能神经网络前向计算框架,无第三方依赖,跨平台,但是通常都需要protobuf和opencv。ncnn目前已在腾讯多款应用中使用,如QQ,Qzon...

用rust实现经典的冒泡排序和快速排序

1.假设待排序数组如下letmutarr=[5,3,8,4,2,7,1];...

ncnn+PPYOLOv2首次结合!全网最详细代码解读来了

编辑:好困LRS【新智元导读】今天给大家安利一个宝藏仓库miemiedetection,该仓库集合了PPYOLO、PPYOLOv2、PPYOLOE三个算法pytorch实现三合一,其中的PPYOL...

C++特性使用建议

1.引用参数使用引用替代指针且所有不变的引用参数必须加上const。在C语言中,如果函数需要修改变量的值,参数必须为指针,如...

Qt4/5升级到Qt6吐血经验总结V202308

00:直观总结增加了很多轮子,同时原有模块拆分的也更细致,估计为了方便拓展个管理。把一些过度封装的东西移除了(比如同样的功能有多个函数),保证了只有一个函数执行该功能。把一些Qt5中兼容Qt4的方法废...

到底什么是C++11新特性,请看下文

C++11是一个比较大的更新,引入了很多新特性,以下是对这些特性的详细解释,帮助您快速理解C++11的内容1.自动类型推导(auto和decltype)...

掌握C++11这些特性,代码简洁性、安全性和性能轻松跃升!

C++11(又称C++0x)是C++编程语言的一次重大更新,引入了许多新特性,显著提升了代码简洁性、安全性和性能。以下是主要特性的分类介绍及示例:一、核心语言特性1.自动类型推导(auto)编译器自...

经典算法——凸包算法

凸包算法(ConvexHull)一、概念与问题描述凸包是指在平面上给定一组点,找到包含这些点的最小面积或最小周长的凸多边形。这个多边形没有任何内凹部分,即从一个多边形内的任意一点画一条线到多边形边界...

一起学习c++11——c++11中的新增的容器

c++11新增的容器1:array当时的初衷是希望提供一个在栈上分配的,定长数组,而且可以使用stl中的模板算法。array的用法如下:#include<string>#includ...

C++ 编程中的一些最佳实践

1.遵循代码简洁原则尽量避免冗余代码,通过模块化设计、清晰的命名和良好的结构,让代码更易于阅读和维护...

取消回复欢迎 发表评论: