经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » Java相关 » Java » 查看文章
如何实现一个分布式锁
来源:cnblogs  作者:二价亚铁  时间:2024/7/13 21:59:44  对本文有异议

如何实现一个分布式锁

本篇内容主要介绍如何使用 Java 语言实现一个注解式的分布式锁,主要是通过注解+AOP 环绕通知来实现。

1. 锁注解

我们首先写一个锁的注解

  1. /**
  2. * 分布式锁注解
  3. */
  4. @Retention(RetentionPolicy.RUNTIME)
  5. @Target({ElementType.METHOD})
  6. @Documented
  7. public @interface RedisLock {
  8. long DEFAULT_TIMEOUT_FOR_LOCK = 5L;
  9. long DEFAULT_EXPIRE_TIME = 60L;
  10. String key() default "your-biz-key";
  11. long expiredTime() default DEFAULT_EXPIRE_TIME;
  12. long timeoutForLock() default DEFAULT_TIMEOUT_FOR_LOCK;
  13. }

expiredTime 是设置锁的过期时间,timeoutForLock 是设置等待锁的超时时间。如果没有等待获得锁的超时时间这个功能,那么其他线程在获取锁失败时只能直接失败,无法进行排队等待。

我们如何使用这个注解呢,很容易,在需要加锁的业务方法上直接用就行.如下,我们有一个库存服务类,它有一个扣减库存方法,该方法将数据库中的一个库存商品的数量减一。在并发场景下,如果我们没有对其进行资源控制,必然会发生库存扣减不一致现象。

  1. public class StockServiceImpl {
  2. @RedisLock(key = "stock-lock", expiredTime = 10L, timeoutForLock = 5L)
  3. public void deduct(Long stockId) {
  4. Stock stock = this.getById(1L);
  5. Integer count = stock.getCount();
  6. stock.setCount(count - 1);
  7. this.updateById(stock);
  8. }
  9. }

2. 在 AOP 切面中进行加锁处理

我们需要使用 AOP 来处理什么?自然是处理使用@RedisLock的方法,因此我们写一个切点表达式,它匹配所有标有 @RedisLock 注解的方法。
接着,我们将此切点表达式与 @Around 注解结合使用,以创建环绕通知,在目标方法执行前后执行我们的加锁解锁逻辑。

因此,基本的逻辑我们就理清了,代码大致长下面这个样子:

  1. public class RedisLockAspect {
  2. private final RedisTemplate<String, Object> redisTemplate;
  3. // 锁的redis key前缀
  4. private static final String DEFAULT_KEY_PREFIX = "lock:";
  5. // 匹配所有标有 @RedisLock 注解的方法
  6. @Pointcut("@annotation(com.kelton.lock.annotation.RedisLock)")
  7. public void lockAnno() {
  8. }
  9. @Around("lockAnno()")
  10. public void invoke(ProceedingJoinPoint joinPoint) throws Exception {
  11. // 获取拦截方法上的RedisLock注解
  12. RedisLock annotation = getLockAnnotationOnMethod(joinPoint);
  13. // 获取锁key
  14. String key = getKey(annotation);
  15. // 锁过期时间
  16. long expireTime = annotation.expiredTime();
  17. // 获取锁的等待时间
  18. long timeoutForLock = annotation.timeoutForLock();
  19. // 在这里加锁
  20. someCodeForLock...
  21. // 执行业务
  22. joinPoint.proceed();
  23. // 在这里解锁
  24. someCodeForUnLock...
  25. }

我们在加锁的时候,需要用上 timeoutForLock 这个属性,我们通过自旋加线程休眠的方式,来达到在一段时间内等待获取锁的目的。如果自旋时间结束后,还没获取锁,则抛出异常,这里可以根据自己情况而定。自旋加锁代码如下:

  1. // 自旋获取锁
  2. long endTime = System.currentTimeMillis() + timeoutForLock * 1000;
  3. boolean acquired = false;
  4. String uuid = UUID.randomUUID().toString();
  5. while(System.currentTimeMillis() < endTime) {
  6. Boolean absent = redisTemplate.opsForValue()
  7. .setIfAbsent(key, uuid, expireTime, TimeUnit.SECONDS);
  8. if (Boolean.TRUE.equals(absent)) {
  9. acquired = true;
  10. break;
  11. } else {
  12. // 获取不到锁,尝试休眠100毫秒后重试
  13. Thread.sleep(100);
  14. }
  15. }
  16. // 超时未获取到锁, 抛出异常,可根据自己业务而定
  17. if (!acquired) {
  18. throw new RuntimeException("获取锁异常");
  19. }

我们发现上面加锁的时候设置了一个 uuid 作为 value 值,这是为了在锁释放的时候,不误删其他线程上的锁,随后,我们就可以执行被 AOP 切中的方法,执行结束释放锁。代码如下:

  1. try {
  2. // 执行业务
  3. joinPoint.proceed();
  4. } catch (Throwable e) {
  5. log.error("业务执行出错!");
  6. } finally {
  7. // 解锁时进行校验,只删除自己线程加的锁
  8. String value = (String) redisTemplate.opsForValue().get(key);
  9. if (uuid.equals(value)) {
  10. redisTemplate.delete(key);
  11. } else {
  12. log.warn("锁已过期!");
  13. }
  14. }

到这里,我们就以注解+AOP 的方式实现了分布式锁的功能。当然,以上只实现了分布式锁的简单功能,还缺少了分布式锁的 key 自动续约防止锁过期功能,以及锁重入功能。

目前,RedisLockAspect的完整代码如下:

  1. @Component
  2. @Aspect
  3. @Slf4j
  4. @AllArgsConstructor
  5. public class RedisLockAspect {
  6. // 匹配所有标有 @RedisLock 注解的方法
  7. @Pointcut("@annotation(com.kelton.lock.annotation.RedisLock)")
  8. public void lockAnno() {
  9. }
  10. @Around("lockAnno()")
  11. public void invoke(ProceedingJoinPoint joinPoint) throws Exception {
  12. // 获取拦截方法上的RedisLock注解
  13. RedisLock annotation = getLockAnnotationOnMethod(joinPoint);
  14. String key = getKey(annotation);
  15. // 锁过期时间
  16. long expireTime = annotation.expiredTime();
  17. // 获取锁的等待时间
  18. long timeoutForLock = annotation.timeoutForLock();
  19. // 自旋获取锁
  20. long endTime = System.currentTimeMillis() + timeoutForLock * 1000;
  21. boolean acquired = false;
  22. String uuid = UUID.randomUUID().toString();
  23. while(System.currentTimeMillis() < endTime) {
  24. Boolean absent = redisTemplate.opsForValue()
  25. .setIfAbsent(key, uuid, expireTime, TimeUnit.SECONDS);
  26. if (Boolean.TRUE.equals(absent)) {
  27. acquired = true;
  28. break;
  29. } else {
  30. // 获取不到锁,尝试休眠100毫秒后重试
  31. Thread.sleep(100);
  32. }
  33. }
  34. // 超时未获取到锁, 抛出异常,可根据自己业务而定
  35. if (!acquired) {
  36. throw new RuntimeException("获取锁异常");
  37. }
  38. try {
  39. // 执行业务
  40. joinPoint.proceed();
  41. } catch (Throwable e) {
  42. log.error("业务执行出错!");
  43. } finally {
  44. // 解锁时进行校验,只删除自己线程加的锁
  45. String value = (String) redisTemplate.opsForValue().get(key);
  46. if (uuid.equals(value)) {
  47. redisTemplate.delete(key);
  48. } else {
  49. log.warn("锁已过期!");
  50. }
  51. }
  52. }
  53. private String getKey(RedisLock redisLock) {
  54. if (Objects.isNull(redisLock)) {
  55. return DEFAULT_KEY_PREFIX + "default";
  56. }
  57. return DEFAULT_KEY_PREFIX + redisLock.key();
  58. }
  59. private RedisLock getLockAnnotationOnMethod(ProceedingJoinPoint joinPoint) {
  60. MethodSignature signature = (MethodSignature) joinPoint.getSignature();
  61. Method method = signature.getMethod();
  62. return method.getAnnotation(RedisLock.class);
  63. }
  64. }

3. key 自动续约防止锁过期

我们接着完善该分布式锁,为其添加 key 自动续约防止锁过期的功能。我们的思路与Redission的watch dog类似,开启一个后台线程,来定时检查需要续约的锁。我们如何判断一个锁是否需要续约呢,我们可以简单定义一个续约分界线,比如在锁过期时间的三分之二的时间点及之后,对锁进行续约。

3.1 定义一个续约任务4

我们来定义一个锁续约任务,那我们需要什么信息呢?
我们至少需要锁的 key,锁要设置的过期时间。这是两个最基本的信息。
要判断在锁过期时间的三分之二的时间点及之后进行续约,那么我们还需要记录锁上次续约的时间点。
此外,我们还可以为锁续约任务添加最大续约次数限制,这可以避免某些执行时间特别久的任务不断占用锁。所以我们还需要记录当前锁续约次数和最大续约次数。
对超过最大续约次数的锁的线程,我们直接将其停止,因此我们也记录一下该锁的线程。
结合上面的分析,我们定义的锁续约任务类如下:

  1. public class LockRenewTask {
  2. /**
  3. * key
  4. */
  5. private final String key;
  6. /**
  7. * 过期时间。单位:秒
  8. */
  9. private final long expiredTime;
  10. /**
  11. * 锁的最大续约次数
  12. */
  13. private final int maxRenewCount;
  14. /**
  15. * 锁的当前续约次数
  16. */
  17. private int currentRenewCount;
  18. /**
  19. * 最新更新时间
  20. */
  21. private LocalDateTime latestRenewTime;
  22. /**
  23. * 业务线程
  24. */
  25. private final Thread thread;
  26. public LockRenewTask(String key, long expiredTime, int maxRenewCount, Thread thread) {
  27. this.key = key;
  28. this.expiredTime = expiredTime;
  29. this.maxRenewCount = maxRenewCount;
  30. this.thread = thread;
  31. this.latestRenewTime = LocalDateTime.now();
  32. }
  33. /**
  34. * 是否到达续约时间
  35. * @return
  36. */
  37. public boolean isTimeToRenew() {
  38. LocalDateTime now = LocalDateTime.now();
  39. Duration duration = Duration.between(latestRenewTime, now);
  40. return duration.toSeconds() >= ((double)(this.expiredTime / 3) * 2);
  41. }
  42. /**
  43. * 是否达到最大续约次数
  44. * @return
  45. */
  46. public boolean exceedMaxRenewCount() {
  47. return this.currentRenewCount >= this.maxRenewCount;
  48. }
  49. public synchronized void renew() {
  50. this.currentRenewCount++;
  51. this.latestRenewTime = LocalDateTime.now();
  52. }
  53. // 取消业务方法
  54. public void cancel() {
  55. thread.interrupt();
  56. }
  57. public String getKey() {
  58. return key;
  59. }
  60. public long getExpiredTime() {
  61. return expiredTime;
  62. }
  63. }

我们添??了一些关于锁续约的方法:

  • isTimeToRenew(): 判断是否可以对锁进行续约
  • exceedMaxRenewCount(): 判断是否达到最大续约次数
  • renew(): 来标记一次续约操作
  • cancel(): 取消业务方法

3.2 定义一个锁续约任务处理器

接着,我们定义一个定时执行该续约任务的 handler。该 handler 也比较简答,核心逻辑是持有一个类型为 List<LockRenewTask>taskList 来添加续约任务,且使用一个 ScheduledExecutorService 来定时遍历该 taskList 来执行续约任务。该 handler 再对外暴露一个 addRenewTask 方法,方便外部调用来添加续约任务到 taskList 中。

  1. @Slf4j
  2. @Component
  3. public class LockRenewHandler {
  4. @Autowired
  5. private RedisTemplate<String, Object> redisTemplate;
  6. /**
  7. * 保障对 taskList的添加删除操作是线程安全的
  8. */
  9. private final ReentrantLock taskListLock = new ReentrantLock();
  10. private final List<LockRenewTask> taskList = new ArrayList<>();
  11. private final ScheduledExecutorService taskExecutorService;
  12. {
  13. taskExecutorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());
  14. taskExecutorService.scheduleAtFixedRate(() -> {
  15. try {
  16. executeRenewTask();
  17. } catch (Exception e) {
  18. //错误处理
  19. }
  20. }, 1, 2, TimeUnit.SECONDS);
  21. }
  22. /**
  23. * 添加续约任务
  24. */
  25. public void addRenewTask(LockRenewTask task) {
  26. taskListLock.lock();
  27. try {
  28. taskList.add(task);
  29. } finally {
  30. taskListLock.unlock();
  31. }
  32. }
  33. /**
  34. * 执行续约任务
  35. */
  36. private void executeRenewTask() {
  37. log.info("开始执行续约任务");
  38. if (CollectionUtils.isEmpty(taskList)) {
  39. return;
  40. }
  41. // 需要删除的任务,暂存这个集合中 取消
  42. List<LockRenewTask> cancelTask = new ArrayList<>();
  43. // 获取任务副本
  44. List<LockRenewTask> copyTaskList = new ArrayList<>(taskList);
  45. for (LockRenewTask task : copyTaskList) {
  46. try {
  47. // 判断 Redis 中是否存在 key
  48. if (!redisTemplate.hasKey(task.getKey())) {
  49. cancelTask.add(task);
  50. continue;
  51. }
  52. // 大于等于最大续约次数
  53. if (task.exceedMaxRenewCount()) {
  54. // 停止续约任务
  55. task.cancel();
  56. cancelTask.add(task);
  57. continue;
  58. }
  59. // 到达续约时间
  60. if (task.isTimeToRenew()) {
  61. log.info("续约任务:{}", task.getKey());
  62. redisTemplate.expire(task.getKey(), task.getExpiredTime(), TimeUnit.SECONDS);
  63. task.renew();
  64. }
  65. } catch (Exception e) {
  66. //错误处理
  67. log.error("处理任务出错:{}", task);
  68. }
  69. }
  70. // 加锁,删除 taskList 中需要移除的任务
  71. taskListLock.lock();
  72. try {
  73. taskList.removeAll(cancelTask);
  74. // 清理cancelTask,避免堆积,产生内存泄露
  75. cancelTask.clear();
  76. } finally {
  77. taskListLock.unlock();
  78. }
  79. }
  80. }

总结一下 LockRenewHandler的主要作用:它负责管理和执行续约任务,以延长 Redis 中键的过期时间。

  • 添加续约任务:addRenewTask() 方法允许添加新的续约任务到内部列表 taskList 中。
  • 执行续约任务:executeRenewTask() 方法定期执行续约任务。它检查每个任务的状态,并根据需要续约 Redis 中的键。
  • 移除完成的任务:维护一个 cancelTask 列表,用于存储需要从 taskList 中移除的任务。在 executeRenewTask() 方法中,它会将完成的任务添加到 cancelTask 列表中,并在之后将其从 taskList 中移除。

大概的工作流程如下:

  • 续约任务被添加到 taskList 中。

  • executeRenewTask() 方法定期执行,它检查每个任务的状态:

    • 如果 Redis 中不再存在该键,则取消任务。
    • 如果任务的续约次数达到上限,则取消任务。
    • 如果是时候续约了,则续约 Redis 中的键并更新任务的续约次数,记录续约时间点。
  • 完成的任务被添加到 cancelTask 列表中。

  • executeRenewTask() 方法获取 taskList 的副本,并从副本中移除 cancelTask 中的任务,并且在完成移除任务操作后清空cancelTask

  • 更新后的 taskList 被保存回类中。

两个需要注意的点

  • 我们遍历taskList时拷贝了一份副本进行遍历,因为taskList是可变的,这样可以避免在遍历的时候产生并发修改问题。
  • cancelTask需要清理,避免产生内存泄漏。

通过这种方式,LockRenewHandler 可以确保 Redis 中的键在需要时得到续约,并自动移除完成或失败的任务。

3.3 添加锁续约任务

在上面 3.1 节和 3.2 节我们定义好了锁续约任务和处理锁续约任务的核心代码,接下来我们需要在第 2 节加锁解锁的 AOP 处理逻辑上进行一点小小的修改,主要就是在执行加锁之后,执行业务代码之前,添加上锁续约任务。修改位置如下:

  1. public void invoke(ProceedingJoinPoint joinPoint) throws Exception {
  2. ... // 省略代码
  3. try {
  4. // 添加锁续约任务
  5. LockRenewTask task = new LockRenewTask(key, annotation.expiredTime(), annotation.maxRenew(), Thread.currentThread());
  6. lockRenewHandler.addRenewTask(task);
  7. log.info("添加续约任务, key:{}", key);
  8. // 执行业务
  9. joinPoint.proceed();
  10. } catch (Throwable e) {
  11. log.error("业务执行出错!");
  12. } finally {
  13. // 解锁时进行校验,只删除自己线程加的锁
  14. String value = (String) redisTemplate.opsForValue().get(key);
  15. if (uuid.equals(value)) {
  16. redisTemplate.delete(key);
  17. } else {
  18. log.warn("锁已过期!");
  19. }
  20. }
  21. ... // 省略代码
  22. }

到这里,我们的分布式锁已经相当完善了,把锁自动续约的功能也加上了。当然,还没有实现锁的可重入性。

原文链接:https://www.cnblogs.com/xw-01/p/18300808

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号