经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 大数据/云/AI » Zookeeper » 查看文章
Rpc-实现Zookeeper注册中心
来源:cnblogs  作者:zko0  时间:2023/2/17 16:17:50  对本文有异议

1.前言

本文章是笔主在声哥的手写RPC框架的学习下,对注册中心的一个拓展。因为声哥某些部分没有保留拓展性,所以本文章的项目与声哥的工程有部分区别,核心内容在Curator的注册发现与注销,思想看准即可。

本文章Git仓库:zko0/zko0-rpc

声哥的RPC项目写的确实很详细,跟学一遍受益匪浅:

何人听我楚狂声的博客

在声哥的项目里使用Nacos作为了服务注册中心。本人拓展添加了ZooKeeper实现服务注册。

Nacos的服务注册和发现,设计的不是非常好,每次服务的发现都需要去注册中心拉取。本人实现ZooKeeper注册中心时,参考了Dubbo的设计原理,结合本人自身想法,添加了本地缓存:

  • Client发现服务后缓存在本地,维护一个服务——实例列表
  • 当监听到注册中心的服务列表发生了变化,Client更新本地列表
  • 当注册中心宕机,Client能够依靠本地的服务列表继续提供服务

问题:

  1. 实现服务注册的本地缓存,还需要实现注册中心的监听,当注册中心的服务发生更改时能够实现动态更新。或者用轮训的方式,定时更新,不过这种方式的服务实时性较差
  2. 当Server宕机,非临时节点注册容易出现服务残留无法清除的问题。所以我建议全部使用临时节点去注册。

2.内容

zookeeper需要简单学一下,知识内容非常简单,搭建也很简单,在此跳过。

如果你感兴趣,可以参考我的ZooKeeper的文章:Zookeeper学习笔记 - zko0

①添加依赖

Curator:(简化ZooKeeper客户端使用)(Netfix研发,捐给Apache,是Apache顶级项目)

这里排除slf4j依赖,因为笔主使用的slf4j存在冲突

  1. <!-- https://mvnrepository.com/artifact/org.apache.curator/curator-recipes -->
  2. <dependency>
  3. <groupId>org.apache.curator</groupId>
  4. <artifactId>curator-recipes</artifactId>
  5. <version>5.2.0</version>
  6. <exclusions>
  7. <exclusion>
  8. <groupId>org.slf4j</groupId>
  9. <artifactId>slf4j-api</artifactId>
  10. </exclusion>
  11. </exclusions>
  12. </dependency>

②代码编写

1.首先创建一个连接类:

  1. @Slf4j
  2. public class ZookeeperUtil {
  3. //内部化构造方法
  4. private ZookeeperUtil(){
  5. }
  6. private static final String SERVER_HOSTNAME= RegisterCenterConfig.getHostName();
  7. private static final Integer SERVER_PORT=RegisterCenterConfig.getServerPort();
  8. private static CuratorFramework zookeeperClient;
  9. public static CuratorFramework getZookeeperClient(){
  10. if (zookeeperClient==null){
  11. synchronized (ZookeeperUtil.class){
  12. if (zookeeperClient==null){
  13. RetryPolicy retryPolic=new ExponentialBackoffRetry(3000,10);
  14. zookeeperClient = CuratorFrameworkFactory.builder()
  15. .connectString(SERVER_HOSTNAME+":"+SERVER_PORT)
  16. .retryPolicy(retryPolic)
  17. // zookeeper根目录为/serviceRegister,不为/
  18. .namespace("serviceRegister")
  19. .build();
  20. zookeeperClient.start();
  21. }
  22. }
  23. }
  24. return zookeeperClient;
  25. }
  26. public static String getServerHostname(){
  27. return SERVER_HOSTNAME;
  28. }
  29. public static Integer getServerPort(){
  30. return SERVER_PORT;
  31. }
  32. }

其中HOST,PORT信息我保存在regiserCenter.properties配置文件夹中,使用类读取:

  1. public class RpcConfig {
  2. //注册中心类型
  3. private static String registerCenterType;
  4. //序列化类型
  5. private static String serializerType;
  6. //负载均衡类型
  7. private static String loadBalanceType;
  8. //配置Nacos地址
  9. private static String registerCenterHost;
  10. private static Integer registerCenterPort;
  11. private static boolean zookeeperDestoryIsEphemeral;
  12. private static String serverHostName;
  13. private static Integer serverPort;
  14. static {
  15. ResourceBundle bundle = ResourceBundle.getBundle("rpc");
  16. registerCenterType=bundle.getString("registerCenter.type");
  17. loadBalanceType=bundle.getString("loadBalance.type");
  18. registerCenterHost=bundle.getString("registerCenter.host");
  19. registerCenterPort = Integer.parseInt(bundle.getString("registerCenter.port"));
  20. try {
  21. zookeeperDestoryIsEphemeral="true".equals(bundle.getString("registerCenter.destory.isEphemeral"));
  22. } catch (Exception e) {
  23. zookeeperDestoryIsEphemeral=false;
  24. }
  25. serializerType=bundle.getString("serializer.type");
  26. serverHostName=bundle.getString("server.hostName");
  27. serverPort=Integer.parseInt(bundle.getString("server.port"));
  28. }
  29. public static String getRegisterCenterType() {
  30. return registerCenterType;
  31. }
  32. public static String getSerializerType() {
  33. return serializerType;
  34. }
  35. public static String getLoadBalanceType() {
  36. return loadBalanceType;
  37. }
  38. public static String getRegisterCenterHost() {
  39. return registerCenterHost;
  40. }
  41. public static Integer getRegisterCenterPort() {
  42. return registerCenterPort;
  43. }
  44. public static String getServerHostName() {
  45. return serverHostName;
  46. }
  47. public static Integer getServerPort() {
  48. return serverPort;
  49. }
  50. public static boolean isZookeeperDestoryIsEphemeral() {
  51. return zookeeperDestoryIsEphemeral;
  52. }
  53. }

下面的代码我和声哥有些不同,我将服务注册,注销方法放在ServerUtils中,服务发现方法放在ClientUtils中:

服务的高一致性存在两种做法:

  • 因为ZooKeeper存在临时节点,注册中心可以实现Client(RPC的Server)断开,注册服务信息的自动丢失
  • 不设置为临时节点,手动的服务注册清除

我这里两种都实现了,虽然做两种方式不同但是功能相同的代码放在一起看起来很奇怪,这里只是做演示。选择其中一种即可。(我建议使用临时节点,当Server宕机,残留的服务信息也能及时清除)

注册实现原理图:

626723792516afae5530e634e691794

接口:

  1. public interface ServiceDiscovery {
  2. InetSocketAddress searchService(String serviceName);
  3. void cleanLoaclCache(String serviceName);
  4. }
  1. public interface ServiceRegistry {
  2. //服务注册
  3. void register(String serviceName, InetSocketAddress inetAddress);
  4. void cleanRegistry();
  5. }

ZooKeeper接口实现:

  1. public class ZookeeperServiceDiscovery implements ServiceDiscovery{
  2. private final LoadBalancer loadBalancer;
  3. public ZookeeperServiceDiscovery(LoadBalancer loadBalancer) {
  4. this.loadBalancer = loadBalancer;
  5. }
  6. @Override
  7. public InetSocketAddress searchService(String serviceName) {
  8. return ZookeeperClientUtils.searchService(serviceName,loadBalancer);
  9. }
  10. @Override
  11. public void cleanLoaclCache(String serviceName) {
  12. ZookeeperClientUtils.cleanLocalCache(serviceName);
  13. }
  14. }
  1. public class ZookeeperServiceRegistry implements ServiceRegistry{
  2. @Override
  3. public void register(String serviceName, InetSocketAddress inetAddress) {
  4. ZookeeperServerUitls.register(serviceName,inetAddress);
  5. }
  6. @Override
  7. public void cleanRegistry() {
  8. ZookeeperServerUitls.cleanRegistry();
  9. }
  10. }

Factory工厂:

  1. public class ServiceFactory {
  2. private static String center = RpcConfig.getRegisterCenterType();
  3. private static String lb= RpcConfig.getLoadBalanceType();
  4. private static ServiceRegistry registry;
  5. private static ServiceDiscovery discovery;
  6. private static Object registerLock=new Object();
  7. private static Object discoveryLock=new Object();
  8. public static ServiceDiscovery getServiceDiscovery(){
  9. if (discovery==null){
  10. synchronized (discoveryLock){
  11. if (discovery==null){
  12. if ("nacos".equalsIgnoreCase(center)){
  13. discovery= new NacosServiceDiscovery(LoadBalancerFactory.getLoadBalancer(lb));
  14. }else if ("zookeeper".equalsIgnoreCase(center)){
  15. discovery= new ZookeeperServiceDiscovery(LoadBalancerFactory.getLoadBalancer(lb));
  16. }
  17. }
  18. }
  19. }
  20. return discovery;
  21. }
  22. public static ServiceRegistry getServiceRegistry(){
  23. if (registry==null){
  24. synchronized (registerLock){
  25. if (registry==null){
  26. if ("nacos".equalsIgnoreCase(center)){
  27. registry= new NacosServiceRegistry();
  28. }else if ("zookeeper".equalsIgnoreCase(center)){
  29. registry= new ZookeeperServiceRegistry();
  30. }
  31. }
  32. }
  33. }
  34. return registry;
  35. }
  36. }

使用Gson序列化InetSocketAddress存在问题,编写Util:

  1. public class InetSocketAddressSerializerUtil {
  2. public static String getJsonByInetSockerAddress(InetSocketAddress address){
  3. HashMap<String, String> map = new HashMap<>();
  4. map.put("host",address.getHostName());
  5. map.put("port",address.getPort()+"");
  6. return new Gson().toJson(map);
  7. }
  8. public static InetSocketAddress getInetSocketAddressByJson(String json){
  9. HashMap<String,String> hashMap = new Gson().fromJson(json, HashMap.class);
  10. String host = hashMap.get("host");
  11. Integer port=Integer.parseInt(hashMap.get("port"));
  12. return new InetSocketAddress(host,port);
  13. }
  14. }

上面主要是注册,发现的逻辑,我把主要方法写在了Utils中:

  1. @Slf4j
  2. public class ZookeeperServerUitls {
  3. private static CuratorFramework client = ZookeeperUtil.getZookeeperClient();
  4. private static final Set<String> instances=new ConcurrentHashSet<>();
  5. public static void register(String serviceName, InetSocketAddress inetSocketAddress){
  6. serviceName=ZookeeperUtil.serviceName2Path(serviceName);;
  7. String uuid = UUID.randomUUID().toString();
  8. serviceName=serviceName+"/"+uuid;
  9. String json = InetSocketAddressSerializerUtil.getJsonByInetSockerAddress(inetSocketAddress);
  10. try {
  11. if (RpcConfig.isZookeeperDestoryIsEphemeral()){
  12. //会话结束节点,创建消失
  13. client.create()
  14. .creatingParentsIfNeeded()
  15. .withMode(CreateMode.EPHEMERAL)
  16. .forPath(serviceName,json.getBytes());
  17. } else {
  18. client.create()
  19. .creatingParentsIfNeeded()
  20. .forPath(serviceName,json.getBytes());
  21. }
  22. }
  23. catch (Exception e) {
  24. log.error("服务注册失败");
  25. throw new RpcException(RpcError.REGISTER_SERVICE_FAILED);
  26. }
  27. //放入map
  28. instances.add(serviceName);
  29. }
  30. public static void cleanRegistry(){
  31. log.info("注销所有注册的服务");
  32. //如果自动销毁,不需要清除
  33. if (RpcConfig.isZookeeperDestoryIsEphemeral()) return;
  34. if (ZookeeperUtil.getServerHostname()!=null&&ZookeeperUtil.getServerPort()!=null&&!instances.isEmpty()){
  35. for (String path:instances) {
  36. try {
  37. client.delete().forPath(path);
  38. } catch (Exception e) {
  39. log.error("服务注销失败");
  40. throw new RpcException(RpcError.DESTORY_REGISTER_FALL);
  41. }
  42. }
  43. }
  44. }
  45. }
  1. @Slf4j
  2. public class ZookeeperClientUtils {
  3. private static CuratorFramework client = ZookeeperUtil.getZookeeperClient();
  4. private static final Map<String, List<InetSocketAddress>> instances=new ConcurrentHashMap<>();
  5. public static InetSocketAddress searchService(String serviceName, LoadBalancer loadBalancer) {
  6. InetSocketAddress address;
  7. //本地缓存查询
  8. if (instances.containsKey(serviceName)){
  9. List<InetSocketAddress> addressList = instances.get(serviceName);
  10. if (!addressList.isEmpty()){
  11. //使用lb进行负载均衡
  12. return loadBalancer.select(addressList);
  13. }
  14. }
  15. try {
  16. String path = ZookeeperUtil.serviceName2Path(serviceName);
  17. //获取路径下所有的实现
  18. List<String> instancePaths = client.getChildren().forPath(path);
  19. List<InetSocketAddress> addressList = new ArrayList<>();
  20. for (String instancePath : instancePaths) {
  21. byte[] bytes = client.getData().forPath(path+"/"+instancePath);
  22. String json = new String(bytes);
  23. InetSocketAddress instance = InetSocketAddressSerializerUtil.getInetSocketAddressByJson(json);
  24. addressList.add(instance);
  25. }
  26. addLocalCache(serviceName,addressList);
  27. return loadBalancer.select(addressList);
  28. } catch (Exception e) {
  29. log.error("服务获取失败====>{}",e);
  30. throw new RpcException(RpcError.SERVICE_NONE_INSTANCE);
  31. }
  32. }
  33. public static void cleanLocalCache(String serviceName){
  34. log.info("服务调用失败,清除本地缓存,重新获取实例===>{}",serviceName);
  35. instances.remove(serviceName);
  36. }
  37. public static void addLocalCache(String serviceName,List<InetSocketAddress> addressList){
  38. //直接替换原本的缓存
  39. instances.put(serviceName,addressList);
  40. }
  41. }

③配置文件

rpc.properties放在resources下

  1. #nacos zookeeper
  2. #registerCenter.type=nacos
  3. registerCenter.type=zookeeper
  4. #registerCenter.host=127.0.0.1
  5. registerCenter.host=101.43.244.40
  6. #zookeeper port default 2181
  7. #registerCenter.port=9000
  8. registerCenter.port=2181
  9. registerCenter.destory.isEphemeral=false
  10. #??random?roundRobin
  11. loadBalance.type=random
  12. #kryo json jdk
  13. serializer.type=kryo
  14. server.hostName=127.0.0.1
  15. server.port=9999

④更多

声哥的代码我做了很多修改,如果上述代码和你参考的项目代码出入比较大,可以查看本文章的工程阅读。

原文链接:https://www.cnblogs.com/zko0/p/17130553.html

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号