经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 大数据/云/AI » Zookeeper » 查看文章
Rpc-实现Client对ZooKeeper的服务监听
来源:cnblogs  作者:zko0  时间:2023/2/20 15:17:31  对本文有异议

1、前言

在上一篇文章中,完成了ZooKeeper注册中心,添加了一个简单的本地缓存

但是,存在一些问题:

  1. 当本地缓存OK,ZooKeeper对应服务有新的实例时,本地缓存不会自动更新
  2. 当ZooKeeper对应服务实例关闭,本地缓存不会监控到实例消失

2、编写

之前我们是将缓存直接放在ZooKeeperClientUtils中的,维护一个Map集合。我们将缓存部分移动到ZooKeeperClientCache中,缓存数据从这里获取:

我们监听树上所有节点的变化情况,对于包含实例的变化,每次获取对应的服务信息,然后通过Clinet查询现存的对应服务的实例,进行更新。

watchPathSet维护了Client调用过的服务集合,对于调用过的服务才开启本地的缓存,并且进行更新。

instances即为本地缓存集合

  1. @Slf4j
  2. public class ZookeeperClientCache {
  3. private static final Map<String, List<InetSocketAddress>> instances=new ConcurrentHashMap<>();
  4. private static final Set<String> watchPathSet=new ConcurrentHashSet<>();
  5. private static CuratorFramework zookeeperClient;
  6. private static boolean isListening=false;
  7. //将服务加入监听set中
  8. public static void addListenService(String service){
  9. //开启服务监听
  10. openListen();
  11. //path路径放入
  12. watchPathSet.add(ZookeeperUtil.serviceName2Path(service));
  13. }
  14. //添加本地缓存,同时开启监听服务
  15. public static void addLocalCache(String serviceName,List<InetSocketAddress> addressList){
  16. //直接替换原本的缓存
  17. instances.put(serviceName,addressList);
  18. //将服务加入监听set
  19. addListenService(serviceName);
  20. }
  21. public static void cleanLocalCache(String serviceName){
  22. log.info("服务调用失败,清除本地缓存,重新获取实例===>{}",serviceName);
  23. instances.remove(serviceName);
  24. }
  25. public static boolean containsKey(String serviceName){
  26. return instances.containsKey(serviceName);
  27. }
  28. public static List<InetSocketAddress> getOrDefault(String serviceName){
  29. return instances.getOrDefault(serviceName,null);
  30. }
  31. public static List<InetSocketAddress> getInstances(String serviceName){
  32. try {
  33. String path = ZookeeperUtil.serviceName2Path(serviceName);
  34. //获取路径下所有的实现
  35. List<String> instancePaths = zookeeperClient.getChildren().forPath(path);
  36. List<InetSocketAddress> addressList = new ArrayList<>();
  37. for (String instancePath : instancePaths) {
  38. byte[] bytes = zookeeperClient.getData().forPath(path+"/"+instancePath);
  39. String json = new String(bytes);
  40. InetSocketAddress instance = InetSocketAddressSerializerUtil.getInetSocketAddressByJson(json);
  41. addressList.add(instance);
  42. }
  43. return addressList;
  44. } catch (Exception e) {
  45. log.error("服务获取失败====>{}",e);
  46. throw new RpcException(RpcError.SERVICE_NONE_INSTANCE);
  47. }
  48. }
  49. private static synchronized void openListen(){
  50. //已初始化过
  51. if (isListening){
  52. return;
  53. }
  54. //注入client
  55. if (zookeeperClient==null) {
  56. zookeeperClient=ZookeeperUtil.getZookeeperClient();
  57. }
  58. TreeCache cache = TreeCache.newBuilder(zookeeperClient, "/cn/zko0/myRpc/api").setCacheData(true).build();
  59. cache.getListenable().addListener((c, event) -> {
  60. if ( event.getData() != null )
  61. {
  62. System.out.println("type=" + event.getType() + " path=" + event.getData().getPath());
  63. //可以通过event.type来进行节点的处理,我这里直接多节点每次行为做reload
  64. if (event.getData().getPath().contains("Service/")){
  65. //是服务节点,做更新
  66. String path = event.getData().getPath();
  67. //去除尾部实例段
  68. path=path.substring(0,path.lastIndexOf("/"));
  69. String serviceName = ZookeeperUtil.path2ServiceName(path);
  70. if (watchPathSet.contains(path)) {
  71. log.info("更新本地缓存");
  72. List<InetSocketAddress> addressList = getInstances(serviceName);
  73. addLocalCache(serviceName,addressList);
  74. }
  75. }
  76. }
  77. else
  78. {
  79. System.out.println("type=" + event.getType());
  80. }
  81. });
  82. try {
  83. cache.start();
  84. } catch (Exception e) {
  85. throw new RuntimeException(e);
  86. }
  87. isListening=true;
  88. }
  89. }

创建完Cache类,只需要修改之前ZooKeeperClientUtils中,从当前类改为Cache类获取即可:

image-20230220133343196

完整代码:

  1. @Slf4j
  2. public class ZookeeperClientUtils {
  3. private static CuratorFramework client = ZookeeperUtil.getZookeeperClient();
  4. public static InetSocketAddress searchService(String serviceName, LoadBalancer loadBalancer) {
  5. InetSocketAddress address;
  6. //本地缓存查询
  7. if (ZookeeperClientCache.containsKey(serviceName)){
  8. List<InetSocketAddress> addressList = ZookeeperClientCache.getOrDefault(serviceName);
  9. if (!addressList.isEmpty()){
  10. //使用lb进行负载均衡
  11. return loadBalancer.select(addressList);
  12. }
  13. }
  14. try {
  15. String path = ZookeeperUtil.serviceName2Path(serviceName);
  16. //获取路径下所有的实现
  17. List<String> instancePaths = client.getChildren().forPath(path);
  18. List<InetSocketAddress> addressList = new ArrayList<>();
  19. for (String instancePath : instancePaths) {
  20. byte[] bytes = client.getData().forPath(path+"/"+instancePath);
  21. String json = new String(bytes);
  22. InetSocketAddress instance = InetSocketAddressSerializerUtil.getInetSocketAddressByJson(json);
  23. addressList.add(instance);
  24. }
  25. ZookeeperClientCache.addLocalCache(serviceName,addressList);
  26. return loadBalancer.select(addressList);
  27. } catch (Exception e) {
  28. log.error("服务获取失败====>{}",e);
  29. throw new RpcException(RpcError.SERVICE_NONE_INSTANCE);
  30. }
  31. }
  32. }

3、测试

实现上述代码,下面是服务监听的简单测试

开启Server,Client:

image-20230220134850987

关闭Server,Server自动进行服务的注销:

image-20230220135154126

Client服务监控:

image-20230220135320003

原文链接:https://www.cnblogs.com/zko0/p/17137124.html

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号