服务发现:指对集群中的服务上下线做统一管理,每个工作服务器都可以作为数据的发布方,向集群注册自己的基本信息,而让某些监控服务器作为订阅方,订阅工作服务器的基本信息。当工作服务器的基本信息改变时,如服务上下线、服务器的角色或服务范围变更,那么监控服务器可以得到通知并响应这些变化。
实现代码如下:
- import com.alibaba.fastjson.JSON;
- import org.I0Itec.zkclient.IZkDataListener;
- import org.I0Itec.zkclient.ZkClient;
- import org.I0Itec.zkclient.exception.ZkNoNodeException;
- /**
- * 代表工作服务器
- */
- public class WorkServer {
- private ZkClient zkClient;
- // ZooKeeper
- private String configPath;
- // ZooKeeper集群中servers节点的路径
- private String serversPath;
- // 当前工作服务器的基本信息
- private ServerData serverData;
- // 当前工作服务器的配置信息
- private ServerConfig serverConfig;
- private IZkDataListener dataListener;
- public WorkServer(String configPath, String serversPath,
- ServerData serverData, ZkClient zkClient, ServerConfig initConfig) {
- this.zkClient = zkClient;
- this.serversPath = serversPath;
- this.configPath = configPath;
- this.serverConfig = initConfig;
- this.serverData = serverData;
- this.dataListener = new IZkDataListener() {
- public void handleDataDeleted(String dataPath) throws Exception {
- }
- public void handleDataChange(String dataPath, Object data)
- throws Exception {
- String retJson = new String((byte[])data);
- ServerConfig serverConfigLocal = (ServerConfig) JSON.parseObject(retJson,ServerConfig.class);
- updateConfig(serverConfigLocal);
- System.out.println("new Work server config is:"+serverConfig.toString());
- }
- };
- }
- // 启动服务器
- public void start() {
- System.out.println("work server start...");
- initRunning();
- }
- // 停止服务器
- public void stop() {
- System.out.println("work server stop...");
- zkClient.unsubscribeDataChanges(configPath, dataListener); // 取消监听config节点
- }
- // 服务器初始化
- private void initRunning() {
- registMe(); // 注册自己
- zkClient.subscribeDataChanges(configPath, dataListener); // 订阅config节点的改变事件
- }
- // 启动时向zookeeper注册自己的注册函数
- private void registMe() {
- String mePath = serversPath.concat("/").concat(serverData.getAddress());
- try {
- zkClient.createEphemeral(mePath, JSON.toJSONString(serverData)
- .getBytes());
- } catch (ZkNoNodeException e) {
- zkClient.createPersistent(serversPath, true);
- registMe();
- }
- }
- // 更新自己的配置信息
- private void updateConfig(ServerConfig serverConfig) {
- this.serverConfig = serverConfig;
- }
- }
- /**
- * 调度类
- */
- public class SubscribeZkClient {
- private static final int CLIENT_QTY = 5; // Work Server数量
-
- private static final String ZOOKEEPER_SERVER = "192.168.1.105:2181";
- private static final String CONFIG_PATH = "/config";
- private static final String COMMAND_PATH = "/command";
- private static final String SERVERS_PATH = "/servers";
- public static void main(String[] args) throws Exception {
- List<ZkClient> clients = new ArrayList<ZkClient>();
- List<WorkServer> workServers = new ArrayList<WorkServer>();
- ManageServer manageServer = null;
- try {
- // 创建一个默认的配置
- ServerConfig initConfig = new ServerConfig();
- initConfig.setDbPwd("123456");
- initConfig.setDbUrl("jdbc:mysql://localhost:3306/mydb");
- initConfig.setDbUser("root");
- // 实例化一个Manage Server
- ZkClient clientManage = new ZkClient(ZOOKEEPER_SERVER, 5000, 5000, new BytesPushThroughSerializer());
- manageServer = new ManageServer(SERVERS_PATH, COMMAND_PATH,CONFIG_PATH,clientManage,initConfig);
- manageServer.start(); // 启动Manage Server
- // 创建指定个数的工作服务器
- for ( int i = 0; i < CLIENT_QTY; ++i ) {
- ZkClient client = new ZkClient(ZOOKEEPER_SERVER, 5000, 5000, new BytesPushThroughSerializer());
- clients.add(client);
- ServerData serverData = new ServerData();
- serverData.setId(i);
- serverData.setName("WorkServer#"+i);
- serverData.setAddress("192.168.1."+i);
- WorkServer workServer = new WorkServer(CONFIG_PATH, SERVERS_PATH, serverData, client, initConfig);
- workServers.add(workServer);
- workServer.start(); // 启动工作服务器
- }
- System.out.println("敲回车键退出!\n");
- new BufferedReader(new InputStreamReader(System.in)).readLine();
- } finally {
- System.out.println("Shutting down...");
- for ( WorkServer workServer : workServers ) {
- try {
- workServer.stop();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- for ( ZkClient client : clients ) {
- try {
- client.close();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- }
- }
/**
 * Basic information describing one server: address, numeric id and name.
 * Plain mutable bean with getters and setters for each property.
 */
public class ServerData {
    private String address;
    private Integer id;
    private String name;

    /** @return the server address, or {@code null} if not set */
    public String getAddress() {
        return this.address;
    }

    /** @param address the server address */
    public void setAddress(String address) {
        this.address = address;
    }

    /** @return the server id, or {@code null} if not set */
    public Integer getId() {
        return this.id;
    }

    /** @param id the server id */
    public void setId(Integer id) {
        this.id = id;
    }

    /** @return the server name, or {@code null} if not set */
    public String getName() {
        return this.name;
    }

    /** @param name the server name */
    public void setName(String name) {
        this.name = name;
    }

    /** Renders all three properties; unset ones appear as {@code null}. */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("ServerData [address=");
        text.append(address);
        text.append(", id=").append(id);
        text.append(", name=").append(name);
        text.append("]");
        return text.toString();
    }
}
/**
 * Configuration data: database URL, user and password.
 * Plain mutable bean with getters and setters for each property.
 */
public class ServerConfig {
    // Placeholder shown by toString() instead of the real password
    private static final String MASKED = "******";

    private String dbUrl;
    private String dbPwd;
    private String dbUser;

    /** @return the database URL, or {@code null} if not set */
    public String getDbUrl() {
        return dbUrl;
    }

    /** @param dbUrl the database URL */
    public void setDbUrl(String dbUrl) {
        this.dbUrl = dbUrl;
    }

    /** @return the database password, or {@code null} if not set */
    public String getDbPwd() {
        return dbPwd;
    }

    /** @param dbPwd the database password */
    public void setDbPwd(String dbPwd) {
        this.dbPwd = dbPwd;
    }

    /** @return the database user, or {@code null} if not set */
    public String getDbUser() {
        return dbUser;
    }

    /** @param dbUser the database user */
    public void setDbUser(String dbUser) {
        this.dbUser = dbUser;
    }

    /**
     * Renders the configuration for logs and diagnostics. The password is
     * never included: a set password is shown as a fixed mask, an unset one
     * as {@code null}. Use {@link #getDbPwd()} when the real value is needed.
     */
    @Override
    public String toString() {
        String shownPwd = (dbPwd == null) ? "null" : MASKED;
        return "ServerConfig [dbUrl=" + dbUrl + ", dbPwd=" + shownPwd
                + ", dbUser=" + dbUser + "]";
    }
}
- import com.alibaba.fastjson.JSON;
- import org.I0Itec.zkclient.IZkChildListener;
- import org.I0Itec.zkclient.IZkDataListener;
- import org.I0Itec.zkclient.ZkClient;
- import org.I0Itec.zkclient.exception.ZkNoNodeException;
- import org.I0Itec.zkclient.exception.ZkNodeExistsException;
- import java.util.List;
- public class ManageServer {
- // zookeeper的servers节点路径
- private String serversPath;
- // zookeeper的command节点路径
- private String commandPath;
- // zookeeper的config节点路径
- private String configPath;
- private ZkClient zkClient;
- private ServerConfig config;
- // 用于监听servers节点的子节点列表的变化
- private IZkChildListener childListener;
- // 用于监听command节点数据内容的变化
- private IZkDataListener dataListener;
- // 工作服务器的列表
- private List<String> workServerList;
- public ManageServer(String serversPath, String commandPath,
- String configPath, ZkClient zkClient, ServerConfig config) {
- this.serversPath = serversPath;
- this.commandPath = commandPath;
- this.zkClient = zkClient;
- this.config = config;
- this.configPath = configPath;
- this.childListener = new IZkChildListener() {
- public void handleChildChange(String parentPath,
- List<String> currentChilds) throws Exception {
- // TODO Auto-generated method stub
- workServerList = currentChilds; // 更新内存中工作服务器列表
- System.out.println("work server list changed, new list is ");
- execList();
- }
- };
- this.dataListener = new IZkDataListener() {
- public void handleDataDeleted(String dataPath) throws Exception {
- // TODO Auto-generated method stub
- // ignore;
- }
- public void handleDataChange(String dataPath, Object data)
- throws Exception {
- // TODO Auto-generated method stub
- String cmd = new String((byte[]) data);
- System.out.println("cmd:"+cmd);
- exeCmd(cmd); // 执行命令
- }
- };
- }
- private void initRunning() {
- zkClient.subscribeDataChanges(commandPath, dataListener);
- zkClient.subscribeChildChanges(serversPath, childListener);
- }
- /*
- * 1: list 2: create 3: modify
- */
- private void exeCmd(String cmdType) {
- if ("list".equals(cmdType)) {
- execList();
- } else if ("create".equals(cmdType)) {
- execCreate();
- } else if ("modify".equals(cmdType)) {
- execModify();
- } else {
- System.out.println("error command!" + cmdType);
- }
- }
- // 列出工作服务器列表
- private void execList() {
- System.out.println(workServerList.toString());
- }
- // 创建config节点
- private void execCreate() {
- if (!zkClient.exists(configPath)) {
- try {
- zkClient.createPersistent(configPath, JSON.toJSONString(config)
- .getBytes());
- } catch (ZkNodeExistsException e) {
- zkClient.writeData(configPath, JSON.toJSONString(config)
- .getBytes()); // config节点已经存在,则写入内容就可以了
- } catch (ZkNoNodeException e) {
- String parentDir = configPath.substring(0,
- configPath.lastIndexOf('/'));
- zkClient.createPersistent(parentDir, true);
- execCreate();
- }
- }
- }
- // 修改config节点内容
- private void execModify() {
- // 我们随意修改config的一个属性就可以了
- config.setDbUser(config.getDbUser() + "_modify");
- try {
- zkClient.writeData(configPath, JSON.toJSONString(config).getBytes());
- } catch (ZkNoNodeException e) {
- execCreate(); // 写入时config节点还未存在,则创建它
- }
- }
- // 启动工作服务器
- public void start() {
- initRunning();
- }
- // 停止工作服务器
- public void stop() {
- zkClient.unsubscribeChildChanges(serversPath, childListener);
- zkClient.unsubscribeDataChanges(commandPath, dataListener);
- }
- }