稀土掘金技术社区 01月12日
为什么推荐用Redisson实现分布式锁,看完直呼好好好
index_new5.html
../../../zaker_core/zaker_tpl_static/wap/tpl_guoji1.html

 

本文深入解析了Redisson实现分布式锁的原理。文章从分布式锁的特性出发,如互斥、超时、续期、可重入及专一释放,详细阐述了Redisson如何通过Redis的Hash结构和Lua脚本来保证这些特性。文章还分析了Redisson客户端的创建、锁的获取与释放流程,并着重讲解了其如何利用Redis的发布订阅机制来实现锁的等待和唤醒,以及通过看门狗机制实现锁的续期,确保了在分布式环境下锁的可靠性和健壮性。此外,文章还对比了自旋锁的缺点,强调了Redisson在处理分布式锁问题上的优势。

🔒Redisson 利用 Redis 的 hash 结构存储锁资源,通过 Lua 脚本操作保证线程互斥,未获取锁的线程会订阅锁频道进入阻塞等待。

⏰ Redisson 通过设置 key 的过期时间(默认 30s)实现超时机制,并通过看门狗定时任务(默认 10s)重置过期时间实现锁续期,防止锁因程序崩溃而永久占用。

🔄 Redisson 使用 hash 结构,key 为锁资源,field 为线程标识(uuid + threadId),value 为重入次数,实现可重入锁,保证同一线程多次获取同一把锁。

🔑 Redisson 通过 field 存储线程信息,确保只有加锁的线程才能释放锁,避免了错乱释放的问题。释放锁时,会判断重入次数并发布锁释放消息。

原创 青石路 2025-01-11 09:02 重庆

点击关注公众号,“技术干货” 及时达!

点击关注公众号,“技术干货” 及时达!

写在前面

关于锁,相信大家都不陌生,一般我们用其在多线程环境中控制对共享资源的并发访问;单服务下,用 JDK 中的 synchronizedLock 的实现类可实现对共享资源的并发访问,分布式服务下,JDK 中的锁就显得力不从心了,分布式锁也就应运而生了;分布式锁的实现方式有很多,常见的有如下几种

    基于 MySQL,利用行级悲观锁(select ... for update)

    基于 Redis,利用其 (setnx + expire) 或 set

    基于 Zookeeper,利用其临时目录和事件回调机制

本文不讲这些,网上资料很多,感兴趣的小伙伴自行去查阅;本文的重点是基于 Redis 的 Redisson,从源码的角度来看看为什么推荐用 Redisson 来实现分布式锁;推荐大家先去看看

搞懂了 Redis 的订阅、发布与Lua,Redisson的加锁原理就好理解了

有助于理解后文

分布式锁特点

可以类比 JDK 中的锁

    互斥

    不仅要保证同个服务中不同线程的互斥,还需要保证不同服务间、不同线程的互斥;如何处理互斥,是自旋、还是阻塞 ,还是其他 ?

    超时

    锁超时设置,防止程序异常奔溃而导致锁一直存在,后续同把锁一直加不上

    续期

    程序具体执行的时长无法确定,所以过期时间只能是个估值,那么就不能保证程序在过期时间内百分百能运行完,所以需要进行锁续期,保证业务是在加锁的情况下完成的

    可重入

    可重入锁又名递归锁,是指同一个线程在外层方法已经获得锁,再进入该线程的中层或内层方法会自动获取锁;简单点来说,就是同个线程可以反复获取同一把锁

    专一释放

    通俗点来讲:谁加的锁就只有它能释放这把锁;为什么会出现这种错乱释放的问题了,举个例子就理解了

    线程 T1 对资源 lock_zhangsan 加了锁,由于某些原因,业务还未执行完,锁已经过期自动释放了,此时线程 T2 对资源 lock_zhangsan 加锁成功,T2 还在执行业务的过程中,T1 业务执行完后释放资源 lock_zhangsan 的锁,结果把 T2 加的锁给释放了

    公平与非公平

    公平锁:多个线程按照申请锁的顺序去获得锁,所有线程都在队列里排队,这样就保证了队列中的第一个先得到锁

    非公平锁:多个线程不按照申请锁的顺序去获得锁,而是同时直接去尝试获取锁

    JDK 中的 ReentrantLock 就有公平和非公平两种实现,有兴趣的可以去看看它的源码;多数情况下用的是非公平锁,但有些特殊情况下需要用公平锁

你们可能会有这样的疑问

引入一个简单的分布式锁而已,有必要考虑这么多吗?

虽然绝大部分情况下,我们的程序都是在跑正常流程,但不能保证异常情况 100% 跑不到,出于健壮性考虑,异常情况都需要考虑到;下面我们就来看看 Redisson 是如何实现这些特点的

Redisson实现分布式锁

关于 Redisson,更多详细信息可查看官方文档,它提供了非常丰富的功能,分布式锁 只是其中之一;我们基于 Redisson 3.13.6,来看看分布式锁的实现

    先将 Redis 信息配置给 Redisson,创建出 RedissonClient 实例

    Redis 的部署方式不同,Redisson 配置模式也会不同,详细信息可查看:Configuration,我们就以最简单的 Single mode 来配置

      @Beforepublic void before() {    Config config = new Config();    config.useSingleServer()            .setAddress("redis://192.168.1.110:6379");    redissonClient = Redisson.create(config);}

      通过 RedissonClient 实例获取锁

      RedissonClient 实例创建出来后,就可以通过它来获取锁

        /** * 多线程 * @throws Exception */@Testpublic void multiLock() throws Exception {
        RLock testLock = redissonClient.getLock("multi_lock"); int count = 5; CountDownLatch latch = new CountDownLatch(count);
        for (int i=1; i<=count; i++) { new Thread(() -> { try { System.out.println("线程 " + Thread.currentThread().getName() + " 尝试获取锁"); testLock.lock(); System.out.println(String.format("线程 %s 获取到锁, 执行业务中...", Thread.currentThread().getName())); try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(String.format("线程 %s 业务执行完成", Thread.currentThread().getName())); latch.countDown(); } finally { testLock.unlock(); System.out.println(String.format("线程 %s 释放锁完成", Thread.currentThread().getName())); } }, "t" + i).start(); }
        latch.await(); System.out.println("结束");}

        完整示例代码:redisson-demo

      用 Redisson 实现分布式锁就是这么简单,但光会使用肯定是不够的,我们还得知道其底层实现原理

      知其然,并知其所以然!

      那如何知道其原理呢?当然是看其源码实现

      客户端创建

      客服端的创建过程中,会生成一个 id 作为唯一标识,用以区分分布式下不同节点中的客户端


      id 值就是一个 UUID,客户端启动时生成;至于这个 id 有什么用,大家暂且在脑中留下这个疑问,我们接着往下看

      锁获取

      我们从 lock 开始跟源码


      最终会来到有三个参数的 lock 方法

      private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {    long threadId = Thread.currentThread().getId();        // 尝试获取锁;ttl为null表示锁获取成功; ttl不为null表示获取锁失败,其值为其他线程占用该锁的剩余时间    Long ttl = tryAcquire(-1, leaseTime, unit, threadId);    // lock acquired    if (ttl == null) {        return;    }
      // 锁被其他线程占用而获取失败,使用redis的发布订阅功能来等待锁的释放通知,而非自旋监测锁的释放 RFuture<RedissonLockEntry> future = subscribe(threadId); // 当前线程会阻塞,直到锁被释放时当前线程被唤醒(有超时等待,默认 7.5s,而不会一直等待) // 持有锁的线程释放锁之后,redis会发布消息,所有等待该锁的线程都会被唤醒,包括当前线程 if (interruptibly) { commandExecutor.syncSubscriptionInterrupted(future); } else { commandExecutor.syncSubscription(future); }
      try { while (true) { // 尝试获取锁;ttl为null表示锁获取成功; ttl不为null表示获取锁失败,其值为其他线程占用该锁的剩余时间 ttl = tryAcquire(-1, leaseTime, unit, threadId); // lock acquired if (ttl == null) { break; }
      // waiting for message if (ttl >= 0) { try { // future.getNow().getLatch() 返回的是 Semaphore 对象,其初始许可证为 0,以此来控制线程获取锁的顺序 // 通过 Semaphore 控制当前服务节点竞争锁的线程数量 future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { if (interruptibly) { throw e; } future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } } else { if (interruptibly) { future.getNow().getLatch().acquire(); } else { future.getNow().getLatch().acquireUninterruptibly(); } } } } finally { // 退出锁竞争(锁获取成功或者放弃获取锁),则取消锁的释放订阅 unsubscribe(future, threadId); }// get(lockAsync(leaseTime, unit));}

      主要三个点:尝试获取锁订阅取消订阅

        尝试获取锁

        尝试获取锁主要做了两件事:1、尝试获取锁,2、锁续期;尝试获取锁主要涉及到一段 Lua 代码



        结合 搞懂了 Redis 的订阅、发布与Lua,Redisson的加锁原理就好理解了 来看这段 Lua 脚本,还是很好理解的

        给你们提个问题

        为什么 field = uuid + : + threadId,而不是 field = threadId

        友情提示:从多个服务(也就是多个 Redisson 客户端)来考虑

        这个问题想清楚了,那么前面提到的:在 Redisson 客户端创建的过程中生成的 id(一个随机的 uuid 值),它的作用也就清楚了

        尝试获取锁成功之后,会启动一个定时任务(即 WatchDog,亦称 看门狗)实现锁续期,也涉及到一段 Lua 脚本



        这段脚本很简单,相信你们都能看懂

        默认情况下,锁的过期时间是 30s,锁获取成功之后每隔 10s 进行一次锁续期,重置过期时间成 30s

        若锁已经被释放了,则定时任务也会停止,不会再续期

          用 exists 判断 key 不存在,则用 hash 结构来存放锁,key = 资源名,field = uuid + : + threadId,value 自增 1;设置锁的过期时间(默认是 lockWatchdogTimeout = 30 * 1000 毫秒),并返回 nil

          用 hexists 判断 field = uuid + : + threadId 存在,则该 field 的 value 自增 1,并重置过期时间,最后返回 nil

          这里相当于实现了锁的重入

          上面两种情况都不满足,则说明锁被其他线程占用了,直接返回锁的过期时间

        订阅


        获取锁的过程中,尝试获取锁失败(锁被其他线程锁占有),则会完成对该锁频道的订阅,订阅过程中线程会阻塞;持有锁的线程释放锁时会向锁频道发布消息,订阅了该锁频道的线程会被唤醒,继续去获取锁,

        给你们提个问题

        如果持有锁的线程意外停止了,未向锁频道发布消息,那订阅了锁频道的线程该如何唤醒

        Redisson 其实已经考虑到了,提供了超时机制来处理



        默认超时时长 = 3000 + 1500 * 3 = 7500 毫秒

        再给你们提个问题

        为什么要用 Redis 的发布订阅

        如果我们不用 Redis 的发布订阅,我们该如何实现,自旋?自旋有什么缺点?自旋频率难以掌控,太高会增大 CPU 的负担,太低会不及时(锁都释放半天了才检测到);可以类比 生产者与消费者 来考虑这个问题

        取消订阅

        有订阅,肯定就有取消订阅;当阻塞的线程被唤醒并获取到锁时需要取消对锁频道的订阅,当然,取消获取锁的线程也需要取消对锁频道的订阅

        比较好理解,就是取消当前线程对锁频道的订阅


      锁释放

      我们从 unlock 开始


      代码比较简单,我们继续往下跟


      主要有两点:释放锁取消续期定时任务

        释放锁

        重点在于一个 Lua 脚本



        我们把参数具象化,脚本就好理解了

        KEYS[1] = 锁资源,KEYS[2] = 锁频道

        ARGV[1] = 锁频道消息类型,ARGV[2] = 过期时间,ARGV[3] = uuid + : + threadId

        两个细节:1、重入锁的释放,2、锁彻底释放后的消息发布

          如果当前线程未持有锁,直接返回 nil

          hash 结构的 field 的 value 自减 1,counter = 自减后的 value 值

          如果 counter > 0,表示线程重入了,重置锁的过期时间,返回 0

          如果 counter <= 0,删除锁,并对锁频道发布锁释放消息(频道订阅者则可收到消息,然后唤醒线程去获取锁),返回 1

          上面 1、2 都不满足,则直接返回 nil

        取消续期定时任务



        比较简单,没什么好说的

        总结

        我们从分布式锁的特点出发,来总结下 Redisson 是如何实现这些特点的

          互斥

          Redisson 采用 hash 结构来存锁资源,通过 Lua 脚本对锁资源进行操作,保证线程之间的互斥;互斥之后,未获取到锁的线程会订阅锁频道,然后进入一定时长的阻塞

          超时

          有超时设置,给 hash 结构的 key 加上过期时间,默认是 30s

          续期

          线程获取到锁之后会开启一个定时任务(watchdog 即 看门狗),每隔一定时间(默认 10s)重置 key 的过期时间

          可重入

          通过 hash 结构解决,key 是锁资源,field(值:uuid + : + threadId) 是持有锁的线程,value 表示重入次数

          专一释放

          通过 hash 结构解决,field 中存放了线程信息,释放的时候就能够知道是不是当前线程加上的锁,是才能够进行锁释放

          公平与非公平

          由你们在评论区补充

      点击关注公众号,“技术干货” 及时达!

      阅读原文

      跳转微信打开

      Fish AI Reader

      Fish AI Reader

      AI辅助创作,多种专业模板,深度分析,高质量内容生成。从观点提取到深度思考,FishAI为您提供全方位的创作支持。新版本引入自定义参数,让您的创作更加个性化和精准。

      FishAI

      FishAI

      鱼阅,AI 时代的下一个智能信息助手,助你摆脱信息焦虑

      联系邮箱 441953276@qq.com

      相关标签

      Redisson 分布式锁 Redis Lua脚本 锁续期
      相关文章