经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 大数据/云/AI » Zookeeper » 查看文章
zookeeper【3】服务发现
来源:cnblogs  作者:撸码识途  时间:2018/11/28 9:58:27  对本文有异议

服务发现:指对集群中的服务上下线做统一管理,每个工作服务器都可以作为数据的发布方,向集群注册自己的基本信息,而让某些监控服务器作为订阅方,订阅工作服务器的基本信息。当工作服务器的基本信息改变时,如服务上下线、服务器的角色或服务范围变更,那么监控服务器可以得到通知并响应这些变化。

实现代码如下:

  1. import com.alibaba.fastjson.JSON;
  2. import org.I0Itec.zkclient.IZkDataListener;
  3. import org.I0Itec.zkclient.ZkClient;
  4. import org.I0Itec.zkclient.exception.ZkNoNodeException;
  5. /**
  6. * 代表工作服务器
  7. */
  8. public class WorkServer {
  9. private ZkClient zkClient;
  10. // ZooKeeper
  11. private String configPath;
  12. // ZooKeeper集群中servers节点的路径
  13. private String serversPath;
  14. // 当前工作服务器的基本信息
  15. private ServerData serverData;
  16. // 当前工作服务器的配置信息
  17. private ServerConfig serverConfig;
  18. private IZkDataListener dataListener;
  19. public WorkServer(String configPath, String serversPath,
  20. ServerData serverData, ZkClient zkClient, ServerConfig initConfig) {
  21. this.zkClient = zkClient;
  22. this.serversPath = serversPath;
  23. this.configPath = configPath;
  24. this.serverConfig = initConfig;
  25. this.serverData = serverData;
  26. this.dataListener = new IZkDataListener() {
  27. public void handleDataDeleted(String dataPath) throws Exception {
  28. }
  29. public void handleDataChange(String dataPath, Object data)
  30. throws Exception {
  31. String retJson = new String((byte[])data);
  32. ServerConfig serverConfigLocal = (ServerConfig) JSON.parseObject(retJson,ServerConfig.class);
  33. updateConfig(serverConfigLocal);
  34. System.out.println("new Work server config is:"+serverConfig.toString());
  35. }
  36. };
  37. }
  38. // 启动服务器
  39. public void start() {
  40. System.out.println("work server start...");
  41. initRunning();
  42. }
  43. // 停止服务器
  44. public void stop() {
  45. System.out.println("work server stop...");
  46. zkClient.unsubscribeDataChanges(configPath, dataListener); // 取消监听config节点
  47. }
  48. // 服务器初始化
  49. private void initRunning() {
  50. registMe(); // 注册自己
  51. zkClient.subscribeDataChanges(configPath, dataListener); // 订阅config节点的改变事件
  52. }
  53. // 启动时向zookeeper注册自己的注册函数
  54. private void registMe() {
  55. String mePath = serversPath.concat("/").concat(serverData.getAddress());
  56. try {
  57. zkClient.createEphemeral(mePath, JSON.toJSONString(serverData)
  58. .getBytes());
  59. } catch (ZkNoNodeException e) {
  60. zkClient.createPersistent(serversPath, true);
  61. registMe();
  62. }
  63. }
  64. // 更新自己的配置信息
  65. private void updateConfig(ServerConfig serverConfig) {
  66. this.serverConfig = serverConfig;
  67. }
  68. }
  1. /**
  2. * 调度类
  3. */
  4. public class SubscribeZkClient {
  5. private static final int CLIENT_QTY = 5; // Work Server数量
  6.  
  7. private static final String ZOOKEEPER_SERVER = "192.168.1.105:2181";
  8. private static final String CONFIG_PATH = "/config";
  9. private static final String COMMAND_PATH = "/command";
  10. private static final String SERVERS_PATH = "/servers";
  11. public static void main(String[] args) throws Exception {
  12. List<ZkClient> clients = new ArrayList<ZkClient>();
  13. List<WorkServer> workServers = new ArrayList<WorkServer>();
  14. ManageServer manageServer = null;
  15. try {
  16. // 创建一个默认的配置
  17. ServerConfig initConfig = new ServerConfig();
  18. initConfig.setDbPwd("123456");
  19. initConfig.setDbUrl("jdbc:mysql://localhost:3306/mydb");
  20. initConfig.setDbUser("root");
  21. // 实例化一个Manage Server
  22. ZkClient clientManage = new ZkClient(ZOOKEEPER_SERVER, 5000, 5000, new BytesPushThroughSerializer());
  23. manageServer = new ManageServer(SERVERS_PATH, COMMAND_PATH,CONFIG_PATH,clientManage,initConfig);
  24. manageServer.start(); // 启动Manage Server
  25. // 创建指定个数的工作服务器
  26. for ( int i = 0; i < CLIENT_QTY; ++i ) {
  27. ZkClient client = new ZkClient(ZOOKEEPER_SERVER, 5000, 5000, new BytesPushThroughSerializer());
  28. clients.add(client);
  29. ServerData serverData = new ServerData();
  30. serverData.setId(i);
  31. serverData.setName("WorkServer#"+i);
  32. serverData.setAddress("192.168.1."+i);
  33. WorkServer workServer = new WorkServer(CONFIG_PATH, SERVERS_PATH, serverData, client, initConfig);
  34. workServers.add(workServer);
  35. workServer.start(); // 启动工作服务器
  36. }
  37. System.out.println("敲回车键退出!\n");
  38. new BufferedReader(new InputStreamReader(System.in)).readLine();
  39. } finally {
  40. System.out.println("Shutting down...");
  41. for ( WorkServer workServer : workServers ) {
  42. try {
  43. workServer.stop();
  44. } catch (Exception e) {
  45. e.printStackTrace();
  46. }
  47. }
  48. for ( ZkClient client : clients ) {
  49. try {
  50. client.close();
  51. } catch (Exception e) {
  52. e.printStackTrace();
  53. }
  54. }
  55. }
  56. }
  57. }
  1. /**
  2. * 服务器基本信息
  3. */
  4. public class ServerData {
  5. private String address;
  6. private Integer id;
  7. private String name;
  8. public String getAddress() {
  9. return address;
  10. }
  11. public void setAddress(String address) {
  12. this.address = address;
  13. }
  14. public Integer getId() {
  15. return id;
  16. }
  17. public void setId(Integer id) {
  18. this.id = id;
  19. }
  20. public String getName() {
  21. return name;
  22. }
  23. public void setName(String name) {
  24. this.name = name;
  25. }
  26. @Override
  27. public String toString() {
  28. return "ServerData [address=" + address + ", id=" + id + ", name="
  29. + name + "]";
  30. }
  31. }
  1. /**
  2. * 配置信息
  3. */
  4. public class ServerConfig {
  5. private String dbUrl;
  6. private String dbPwd;
  7. private String dbUser;
  8. public String getDbUrl() {
  9. return dbUrl;
  10. }
  11. public void setDbUrl(String dbUrl) {
  12. this.dbUrl = dbUrl;
  13. }
  14. public String getDbPwd() {
  15. return dbPwd;
  16. }
  17. public void setDbPwd(String dbPwd) {
  18. this.dbPwd = dbPwd;
  19. }
  20. public String getDbUser() {
  21. return dbUser;
  22. }
  23. public void setDbUser(String dbUser) {
  24. this.dbUser = dbUser;
  25. }
  26. @Override
  27. public String toString() {
  28. return "ServerConfig [dbUrl=" + dbUrl + ", dbPwd=" + dbPwd
  29. + ", dbUser=" + dbUser + "]";
  30. }
  31. }
  1. import com.alibaba.fastjson.JSON;
  2. import org.I0Itec.zkclient.IZkChildListener;
  3. import org.I0Itec.zkclient.IZkDataListener;
  4. import org.I0Itec.zkclient.ZkClient;
  5. import org.I0Itec.zkclient.exception.ZkNoNodeException;
  6. import org.I0Itec.zkclient.exception.ZkNodeExistsException;
  7. import java.util.List;
  8. public class ManageServer {
  9. // zookeeper的servers节点路径
  10. private String serversPath;
  11. // zookeeper的command节点路径
  12. private String commandPath;
  13. // zookeeper的config节点路径
  14. private String configPath;
  15. private ZkClient zkClient;
  16. private ServerConfig config;
  17. // 用于监听servers节点的子节点列表的变化
  18. private IZkChildListener childListener;
  19. // 用于监听command节点数据内容的变化
  20. private IZkDataListener dataListener;
  21. // 工作服务器的列表
  22. private List<String> workServerList;
  23. public ManageServer(String serversPath, String commandPath,
  24. String configPath, ZkClient zkClient, ServerConfig config) {
  25. this.serversPath = serversPath;
  26. this.commandPath = commandPath;
  27. this.zkClient = zkClient;
  28. this.config = config;
  29. this.configPath = configPath;
  30. this.childListener = new IZkChildListener() {
  31. public void handleChildChange(String parentPath,
  32. List<String> currentChilds) throws Exception {
  33. // TODO Auto-generated method stub
  34. workServerList = currentChilds; // 更新内存中工作服务器列表
  35. System.out.println("work server list changed, new list is ");
  36. execList();
  37. }
  38. };
  39. this.dataListener = new IZkDataListener() {
  40. public void handleDataDeleted(String dataPath) throws Exception {
  41. // TODO Auto-generated method stub
  42. // ignore;
  43. }
  44. public void handleDataChange(String dataPath, Object data)
  45. throws Exception {
  46. // TODO Auto-generated method stub
  47. String cmd = new String((byte[]) data);
  48. System.out.println("cmd:"+cmd);
  49. exeCmd(cmd); // 执行命令
  50. }
  51. };
  52. }
  53. private void initRunning() {
  54. zkClient.subscribeDataChanges(commandPath, dataListener);
  55. zkClient.subscribeChildChanges(serversPath, childListener);
  56. }
  57. /*
  58. * 1: list 2: create 3: modify
  59. */
  60. private void exeCmd(String cmdType) {
  61. if ("list".equals(cmdType)) {
  62. execList();
  63. } else if ("create".equals(cmdType)) {
  64. execCreate();
  65. } else if ("modify".equals(cmdType)) {
  66. execModify();
  67. } else {
  68. System.out.println("error command!" + cmdType);
  69. }
  70. }
  71. // 列出工作服务器列表
  72. private void execList() {
  73. System.out.println(workServerList.toString());
  74. }
  75. // 创建config节点
  76. private void execCreate() {
  77. if (!zkClient.exists(configPath)) {
  78. try {
  79. zkClient.createPersistent(configPath, JSON.toJSONString(config)
  80. .getBytes());
  81. } catch (ZkNodeExistsException e) {
  82. zkClient.writeData(configPath, JSON.toJSONString(config)
  83. .getBytes()); // config节点已经存在,则写入内容就可以了
  84. } catch (ZkNoNodeException e) {
  85. String parentDir = configPath.substring(0,
  86. configPath.lastIndexOf('/'));
  87. zkClient.createPersistent(parentDir, true);
  88. execCreate();
  89. }
  90. }
  91. }
  92. // 修改config节点内容
  93. private void execModify() {
  94. // 我们随意修改config的一个属性就可以了
  95. config.setDbUser(config.getDbUser() + "_modify");
  96. try {
  97. zkClient.writeData(configPath, JSON.toJSONString(config).getBytes());
  98. } catch (ZkNoNodeException e) {
  99. execCreate(); // 写入时config节点还未存在,则创建它
  100. }
  101. }
  102. // 启动工作服务器
  103. public void start() {
  104. initRunning();
  105. }
  106. // 停止工作服务器
  107. public void stop() {
  108. zkClient.unsubscribeChildChanges(serversPath, childListener);
  109. zkClient.unsubscribeDataChanges(commandPath, dataListener);
  110. }
  111. }

 

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号