经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 数据库/运维 » Redis » 查看文章
Redis 高阶应用
来源:cnblogs  作者:二价亚铁  时间:2024/7/8 9:59:18  对本文有异议

生成全局唯一 ID

  • 全局唯一 ID 需要满足以下要求:

  • 唯一性:在分布式环境中,要全局唯一

  • 高可用:在高并发情况下保证可用性

  • 高性能:在高并发情况下生成 ID 的速度必须要快,不能花费太长时间

  • 递增性:要确保整体递增的,以便于数据库创建索引

  • 安全性:ID 的规律性不能太明显,以免信息泄露

从上面的要求可以看出,全局 ID 生成器的条件还是比较苛刻的,而 Redis 恰巧可以满足以上要求。

Redis 本身就是就是以性能著称,因此完全符合高性能的要求,其次使用 Redis 的 incr 命令可以保证递增性,配合相应的分布式 ID 生成算法便可以实现唯一性和安全性,Redis 可以通过哨兵、主从等集群方案来保证可用性。因此 Redis 是一个不错的选择。
下面我们就写一个简单的示例,来让大家感受一下,实际工作中大家可以根据需要进行调整:

  1. @Component
  2. public class IDUtil{
  3. //开始时间戳(单位:秒) 2000-01-01 00:00:00
  4. private static final long START_TIMESTAMP = 946656000L;
  5. //Spring Data Redis 提供的 Redis 操作模板
  6. @Resource
  7. private StringRedisTemplate stringRedisTemplate;
  8. /**
  9. * 获取 ID 格式:时间戳+序列号
  10. * @param keyPrefix Redis 序列号前缀
  11. * @return 生成的 ID
  12. */
  13. public long getNextId(String keyPrefix){
  14. //获取当前时间戳
  15. LocalDateTime now = LocalDateTime.now();
  16. long nowTimestamp = now.toEpochSecond(ZoneOffset.UTC);
  17. //获取 ID 时间戳
  18. long timestamp = nowSecond - START_TIMESTAMP;
  19. //获取当前日期
  20. String date = now.format(DateTimeFormatter.ofPattern("yyyyMMdd"));
  21. //生成 key
  22. String key = "incr:" + keyPrefix + ":" + date;
  23. //获取序列号
  24. long count = stringRedisTemplate.opsForValue().increment(key);
  25. //生成 ID 并返回
  26. return timestamp << 32 | count;
  27. }
  28. }

分布式锁

在 JVM 内部会有一个锁监视器来控制线程间的互斥,但在分布式的环境下会有多台机器部署同样的服务,也就是说每台机器都会有自己的锁监视器。而 JVM 的锁监视器只能保证自己内部线程的安全执行,并不能保证不同机器间的线程安全执行,因此也很难避免高并发带来的线程安全问题。因此就需要分布式锁来保证整个集群的线程的安全,而分布式锁需要满足 5 点要求:多进程可见、互斥性、高可用、高性能、安全性
其中核心要求就是多进程之间互斥,而满足这一点的方式有很多,最常见的有三种:mysql、Redis、Zookeeper。

image

通过对比我们发现,其中 Redis 的效果最理想,所以下面就用 Redis 来实现一个简单的分布式锁。

  1. public class DistributedLockUtil {
  2. //分布式锁前缀
  3. private static final String KEY_PREFIX = "distributed:lock:";
  4. //业务名
  5. private String business;
  6. //分布式锁的值
  7. private String value;
  8. //Spring Data Redis 提供的 Redis 操作模板
  9. private StringRedisTemplate stringRedisTemplate;
  10. //私有化无参构造
  11. private DistributedLockUtil(){}
  12. //有参构造
  13. public DistributedLockUtil(String business,StringRedisTemplate stringRedisTemplate){
  14. this.business = business;
  15. this.stringRedisTemplate = stringRedisTemplate;
  16. this.value = UUID.randomUUID().toString();
  17. }
  18. /**
  19. * 尝试获取锁
  20. * @param timeout 超时时间(单位:秒)
  21. * @return 锁是否获取成功
  22. */
  23. public boolean tryLock(long timeout){
  24. //生成分布式锁的 key
  25. StringBuffer keyBuffer = new StringBuffer(KEY_PREFIX);
  26. keyBuffer.append(business);
  27. Boolean success = stringRedisTemplate.opsForValue().setIsAbsent(keyBuffer.toString(),value,timeout, TimeUnit.SECONDS);
  28. //返回结果 注意:为了防止自动拆箱时出现空指针,所以这里用了 equals 判断
  29. return Boolean.TRUE.equals(success);
  30. }
  31. /**
  32. * 释放锁(不安全版)
  33. */
  34. public void unLock(){
  35. //生成分布式锁的 key
  36. StringBuffer keyBuffer = new StringBuffer(KEY_PREFIX);
  37. keyBuffer.append(business);
  38. //获取分布式锁的值
  39. String redisValue = stringRedisTemplate.opsForValue().get(keyBuffer.toString());
  40. //判断值是否一致,防止误删
  41. if (value.equals(redisValue)) {
  42. //当代码执行到这里时,如果 JVM 恰巧执行了垃圾回收(虽然几率极低),就会导致所有线程阻塞等待,因此这里仍然会有线程安全的问题
  43. stringRedisTemplate.delete(keyBuffer.toString());
  44. }
  45. }
  46. /**
  47. * 通过脚本释放锁(彻底解决线程安全问题)
  48. */
  49. public void unLockWithScript(){
  50. //加载 lua 脚本,实际工作中我们可以将脚本设置为常量,并在静态代码块中初始化(脚本内容在下文)
  51. DefaultRedisScript<Long> script = new DefaultRedisScript<>();
  52. script.setLocation(new ClassPathResource("unlock.lua"));
  53. script.setResultType(Long.class);
  54. //生成分布式锁的 key
  55. StringBuffer keyBuffer = new StringBuffer(KEY_PREFIX);
  56. keyBuffer.append(business);
  57. //调用 lua 脚本释放锁
  58. stringRedisTemplate.execute(script,
  59. Collections.singletonList(keyBuffer.toString()),
  60. value);
  61. }
  62. }

lua 脚本内容如下:

  1. -- 判断值是否一致,防止误删
  2. if(redis.call('get',KEYS[1]) == VRGV[1]) then
  3. -- 判断通过,释放锁
  4. return redis.call('del',KEYS[1])
  5. end
  6. -- 判断不通过,返回 0
  7. return 0

虽然通过 lua 脚本解决了线程不安全的问题,但是仍然存在以下问题:

  • 不可重入:同一个线程无法多次获取同一把锁
  • 不可重试:获取锁只能尝试一次,失败就返回 false,没有重试机制
  • 超时释放:锁超时释放虽然可以避免死锁,但如果业务执行耗时较长,也会导致锁释放,存在安全隐患
  • 主从一致性:如果 Redis 提供了主从集群,主从同步存在延迟,当主机宕机时,如果从机还没来得及同步主机的锁数据,则会出现锁失效。

要解决以上问题也非常简单,只需要利用 Redis 的 hash 结构记录线程标识和重入次数就可以解决不可重入的问题。利用信号量和 PubSub 功能实现等待、唤醒,获取锁失败的重试机制即可解决不可重试的问题。而超时释放的问题则可以通过获取锁时为锁添加一个定时任务(俗称看门狗),定期刷新锁的超时时间即可。至于主从一致性问题,我们只需要利用多个独立的 Redis 节点(非主从),必须在所有节点都获取重入锁,才算获取锁成功。

image

有的人可能说了,虽然说起来简单,但真正实现起来也不是很容易呀。对于这种问题,大家不用担心,俗话说得好想要看的更远,需要站在巨人的肩膀上。对于上述的需求,早就有了成熟的开源方案 Redisson ,我们直接拿来用就可以了,无需重复造轮子,具体使用方法可以查看官方文档

轻量化消息队列

虽然市面上有很多优秀的消息中间件如 RocketMQ、Kafka 等,但对于应用场景较为简单,只需要简单的消息传递,比如任务调度、简单的通知系统等,不需要复杂的消息路由、事务支持的业务来说,用那些专门的消息中间件成本就显得过高。因此我们就可以使用 Redis 来做消息队列。
Redis 提供了三种不同的方式来实现消息队列:

  • list 结构:可以使用 list 来模拟消息队列,可以使用 BRPOP 或 BLPOP 命令来实现类似 JVM 阻塞队列的消息队列。
  • PubSub:基于发布/订阅的消息模型,但不支持数据持久化,且消息堆积有上限,超出时数据丢失。
  • Stream:Redis 5.0 新增的数据类型,可以实现一个功能非常完善的消息队列,也是我们实现消息队列的首选。

image

下面我就采用 Redis 的 Stream 实现一个简单的案例来让大家感受一下,实际工作中大家可以根据需要进行调整:

  1. public class RedisQueueUtil{
  2. //Spring Data Redis 提供的 Redis 操作模板
  3. private StringRedisTemplate stringRedisTemplate;
  4. /**
  5. * 获取消息队列中的数据,执行该方法前,一定要确保消费者组已经创建
  6. * @param queueName 队列名
  7. * @param groupName 消费者组名
  8. * @param consumerName 消费者名
  9. * @param type 返回值类型
  10. * @return 消息队列中的数据
  11. */
  12. public <T> T getQueueData(String queueName, String groupName, String consumerName, Class<T> type){
  13. while (true){
  14. try {
  15. //获取消息队列中的信息
  16. List<MapRecord<String,Object,Object>> list = stringRedisTemplate.opsForStream().read(
  17. Consumer.from(groupName,consumerName),
  18. StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
  19. StreamOffset.create(queueName, ReadOffset.lastConsumed())
  20. );
  21. //判断消息是否获取成功
  22. if (list == null || list.isEmpty()){
  23. //如果获取失败,说明没有消息,继续下一次循环
  24. continue;
  25. }
  26. //如果获取成功,则解析消息中的数据
  27. MapRecord<String,Object,Object> record = list.get(0);
  28. Map<Object,Object> values = record.getValue();
  29. String jsonString = JSON.toJSONString(values);
  30. T result = JSON.parseObject(jsonString, type);
  31. // ACK
  32. stringRedisTemplate.opsForStream().acknowledge(queueName,groupName,record.getId());
  33. //返回结果
  34. return result;
  35. }catch (Exception e){
  36. while (true){
  37. try {
  38. //获取 pending-list 队列中的信息
  39. List<MapRecord<String,Object,Object>> list = stringRedisTemplate.opsForStream().read(
  40. Consumer.from(groupName,consumerName),
  41. StreamReadOptions.empty().count(1)),
  42. StreamOffset.create(queueName,ReadOffset.from("0")
  43. );
  44. //判断消息是否获取成功
  45. if (list == null || list.isEmpty()){
  46. //如果获取失败,说明 pending-list 没有异常消息,结束循环
  47. break;
  48. }
  49. //如果获取成功,则解析消息中的数据
  50. MapRecord<String,Object,Object> record = list.get(0);
  51. Map<Object,Object> values = record.getValue();
  52. String jsonString = JSON.toJSONString(values);
  53. T result = JSON.parseObject(jsonString, type);
  54. // ACK
  55. stringRedisTemplate.opsForStream().acknowledge(queueName,groupName,record.getId());
  56. //返回结果
  57. return result;
  58. }catch (Exception ex){
  59. log.error("处理 pending-list 订单异常",ex);
  60. try {
  61. Thread.sleep(50);
  62. }catch (InterruptedException err){
  63. err.printStackTrace();
  64. }
  65. }
  66. }
  67. }
  68. }
  69. }
  70. /**
  71. * 向消息队列中发送数据
  72. * @param queueName 消息队列名
  73. * @param map 要发送数据的集合
  74. */
  75. public void sendQueueData(String queueName, Map<String,Object> map){
  76. StringBuilder builder = new StringBuilder("redis.call('xadd','");
  77. builder.append(queueName).append("','*','");
  78. Set<String> keys = map.keySet();
  79. for(String key:keys){
  80. builder.append(key).append("','").append(map.get(key)).append("','");
  81. }
  82. String script = builder.substring(0, builder.length() - 2);
  83. script += ")";
  84. stringRedisTemplate.execute(new DefaultRedisScript<Long>(script,Long.class),Collections.emptyList());
  85. }
  86. }

原文链接:https://www.cnblogs.com/xw-01/p/18284716

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号