经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » Java相关 » Spring Boot » 查看文章
Sentinel 源码学习
来源:cnblogs  作者:废物大师兄  时间:2024/2/21 9:26:16  对本文有异议

引入依赖

  1. <dependency>
  2. <groupId>com.alibaba.csp</groupId>
  3. <artifactId>sentinel-core</artifactId>
  4. <version>1.8.7</version>
  5. </dependency>

基本用法

  1. try (Entry entry = SphU.entry("HelloWorld")) {
  2. // 被保护的逻辑
  3. System.out.println("hello world");
  4. } catch (BlockException ex) {
  5. // 处理被流控的逻辑
  6. System.out.println("blocked!");
  7. }

接下来,阅读源码,我们从SphU.entry()开始 

每个SphU#entry()将返回一个Entry。这个类维护了当前调用的一些信息:

  • createTime :这个entry的创建时间,用于响应时间统计
  • current Node :在当前上下文中的资源的统计
  • origin Node :原始节点的统计
  • ResourceWrapper :资源名称

CtSph#entryWithPriority()方法就是整个流控的基本流程:

1、首先,获取当前线程上下文,如果为空,则创建一个

2、然后,查找处理器链

3、最后,依次执行处理器

这是一个典型的责任链

接下来,挨个来看,首先看一下上下文。上下文是一个线程局部变量  ThreadLocal<Context>

如果当前线程还没有上下文,则创建一个

有了Context之后,接下来查找处理器

这些功能插槽(slot chain)有不同的职责:

  • NodeSelectorSlot :负责收集资源的路径,并将这些资源的调用路径,以树状结构存储起来,用于根据调用路径来限流降级;
  • ClusterBuilderSlot :用于存储资源的统计信息以及调用者信息,例如该资源的 RT, QPS, thread count 等等,这些信息将用作为多维度限流,降级的依据;
  • StatisticSlot :用于记录、统计不同纬度的 runtime 指标监控信息;
  • FlowSlot :用于根据预设的限流规则以及前面 slot 统计的状态,来进行流量控制;
  • AuthoritySlot :根据配置的黑白名单和调用来源信息,来做黑白名单控制;
  • DegradeSlot :通过统计信息以及预设的规则,来做熔断降级;
  • SystemSlot :通过系统的状态,例如 load1 等,来控制总的入口流量;

到这里为止,资源有了,上下文有了,处理器链有了,于是,接下来就可以对资源应用所有的处理器了

关于功能插槽的学习就先到这里,下面补充一个知识点:Node

Node 用于保存资源的实时统计信息

StatisticNode 保存三种实时统计指标:

  1. 秒级指标
  2. 分钟级指标
  3. 线程数

DefaultNode 用于保存特定上下文中特定资源名称的统计信息

EntranceNode 代表调用树的入口

总之一句话,Node是用于保存统计信息的。那么,这些指标数据是如何计数的呢?

Sentinel 使用滑动窗口实时记录和统计资源指标。ArrayMetric背后的滑动窗口基础结构是LeapArray。

下面重点看一下StatisticNode

StatisticNode是用于实时统计的处理器插槽。在进入这个槽位时,需要分别计算以下信息:

  • ClusterNode :该资源ID的集群节点统计信息总和
  • Origin node :来自不同调用者/起源的集群节点的统计信息
  • DefaultNode :特定上下文中特定资源名称的统计信息
  • 最后,是所有入口的总和统计

  1. private int calculateTimeIdx(/*@Valid*/ long timeMillis) {
  2. long timeId = timeMillis / windowLengthInMs;
  3. // Calculate current index so we can map the timestamp to the leap array.
  4. return (int)(timeId % array.length());
  5. }
  6. protected long calculateWindowStart(/*@Valid*/ long timeMillis) {
  7. return timeMillis - timeMillis % windowLengthInMs;
  8. }
  9. /**
  10. * Get bucket item at provided timestamp.
  11. *
  12. * @param timeMillis a valid timestamp in milliseconds
  13. * @return current bucket item at provided timestamp if the time is valid; null if time is invalid
  14. */
  15. public WindowWrap<T> currentWindow(long timeMillis) {
  16. if (timeMillis < 0) {
  17. return null;
  18. }
  19. int idx = calculateTimeIdx(timeMillis);
  20. // Calculate current bucket start time.
  21. long windowStart = calculateWindowStart(timeMillis);
  22. /*
  23. * Get bucket item at given time from the array.
  24. *
  25. * (1) Bucket is absent, then just create a new bucket and CAS update to circular array.
  26. * (2) Bucket is up-to-date, then just return the bucket.
  27. * (3) Bucket is deprecated, then reset current bucket.
  28. */
  29. while (true) {
  30. WindowWrap<T> old = array.get(idx);
  31. if (old == null) {
  32. /*
  33. * B0 B1 B2 NULL B4
  34. * ||_______|_______|_______|_______|_______||___
  35. * 200 400 600 800 1000 1200 timestamp
  36. * ^
  37. * time=888
  38. * bucket is empty, so create new and update
  39. *
  40. * If the old bucket is absent, then we create a new bucket at {@code windowStart},
  41. * then try to update circular array via a CAS operation. Only one thread can
  42. * succeed to update, while other threads yield its time slice.
  43. */
  44. WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
  45. if (array.compareAndSet(idx, null, window)) {
  46. // Successfully updated, return the created bucket.
  47. return window;
  48. } else {
  49. // Contention failed, the thread will yield its time slice to wait for bucket available.
  50. Thread.yield();
  51. }
  52. } else if (windowStart == old.windowStart()) {
  53. /*
  54. * B0 B1 B2 B3 B4
  55. * ||_______|_______|_______|_______|_______||___
  56. * 200 400 600 800 1000 1200 timestamp
  57. * ^
  58. * time=888
  59. * startTime of Bucket 3: 800, so it's up-to-date
  60. *
  61. * If current {@code windowStart} is equal to the start timestamp of old bucket,
  62. * that means the time is within the bucket, so directly return the bucket.
  63. */
  64. return old;
  65. } else if (windowStart > old.windowStart()) {
  66. /*
  67. * (old)
  68. * B0 B1 B2 NULL B4
  69. * |_______||_______|_______|_______|_______|_______||___
  70. * ... 1200 1400 1600 1800 2000 2200 timestamp
  71. * ^
  72. * time=1676
  73. * startTime of Bucket 2: 400, deprecated, should be reset
  74. *
  75. * If the start timestamp of old bucket is behind provided time, that means
  76. * the bucket is deprecated. We have to reset the bucket to current {@code windowStart}.
  77. * Note that the reset and clean-up operations are hard to be atomic,
  78. * so we need a update lock to guarantee the correctness of bucket update.
  79. *
  80. * The update lock is conditional (tiny scope) and will take effect only when
  81. * bucket is deprecated, so in most cases it won't lead to performance loss.
  82. */
  83. if (updateLock.tryLock()) {
  84. try {
  85. // Successfully get the update lock, now we reset the bucket.
  86. return resetWindowTo(old, windowStart);
  87. } finally {
  88. updateLock.unlock();
  89. }
  90. } else {
  91. // Contention failed, the thread will yield its time slice to wait for bucket available.
  92. Thread.yield();
  93. }
  94. } else if (windowStart < old.windowStart()) {
  95. // Should not go through here, as the provided time is already behind.
  96. return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
  97. }
  98. }
  99. }

现在,有2个窗口,每个窗口500ms,2个窗口总共1000ms

假设,当前时间戳是1200ms,那么 (1200 / 500) % 2 = 0, 1200 - 1200 % 500 = 1000

这个时候,如果0这个位置没有窗口,则创建一个新的窗口,新窗口的窗口开始时间是1000ms

如果0这个位置有窗口,则继续判断旧窗口的窗口开始时间是否为1000ms,如果是,则表示窗口没有过期,直接返回该窗口。如果旧窗口的开始时间小于1000ms,则表示旧窗口过期了,于是重置旧窗口的统计数据,重新设置窗口开始时间(PS:相当于将窗口向后移动)

窗口(桶)数据保存在MetricBucket中

总结一下:

1、每个线程过来之后,创建上下文,然后依次经过各个功能插槽

2、每个资源都有自己的处理器链,也就是说多次访问同一个资源时,用的同一套处理器链(插槽)

3、Node相当于是一个载体,用于保存资源的实时统计信息

4、第一次进入插槽后,创建一个新Node,后面再补充Node的信息;第二次进入的时候,由于上下文的名称都是一样的,所以不会再创建Node,而是用之前的Node,也就是还是在之前的基础上记录统计信息。可以这样理解,每个DefaultNode就对应一个特定的资源。

5、StatisticNode中保存三种类型的指标数据:每秒的指标数据,每分钟的指标数据,线程数。

6、指标数据统计采用滑动窗口,利用当前时间戳和窗口长度计算数据应该落在哪个窗口数组区间,通过窗口开始时间判断窗口是否过期。实际数据保存在MetricBucket中

最后,千言万语汇聚成这张原理图

NodeSelectorSlot构造调用链路,ClusterBuilderSlot构造统计节点,StatisticSlot利用滑动窗口进行指标统计,然后是流量控制

 

参考文档

https://sentinelguard.io/zh-cn/docs/quick-start.html

https://sentinelguard.io/zh-cn/docs/basic-implementation.html

https://sentinelguard.io/zh-cn/docs/dashboard.html

https://blog.csdn.net/xiaolyuh123/article/details/107937353

https://www.cnblogs.com/magexi/p/13124870.html

https://www.cnblogs.com/mrxiaobai-wen/p/14212637.html

https://www.cnblogs.com/taromilk/p/11750962.html

https://www.cnblogs.com/taromilk/p/11751000.html

https://www.cnblogs.com/wekenyblog/p/17519276.html

https://javadoop.com/post/sentinel

https://www.cnblogs.com/cuzzz/p/17413429.html

原文链接:https://www.cnblogs.com/cjsblog/p/18020907

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号