经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 大数据/云/AI » Zookeeper » 查看文章
Hadoop学习(5)-zookeeper的安装和命令行,java操作
来源:cnblogs  作者:两千个秘密  时间:2019/8/7 8:32:50  对本文有异议

zookeeper是干嘛的呢

Zookeeper的作用
1.可以为客户端管理少量的数据kv
key:是以路径的形式表示的,那就意味着,各key之间有父子关系,比如
/ 是顶层key
用户建的key只能在/ 下作为子节点,比如建一个key: /aa 这个key可以带value数据
也可以建一个key: /bb
也可以建key: /aa/xx

 

 

2.可以为客户端监听指定数据节点的状态,并在数据节点发生变化是,通知客户端

 


Zookeeper 安装步骤
把包上传linux后解压到apps/
[root@hdp-01 ~]# tar -zxvf zookeeper-3.4.6.tar.gz -C apps/
/root/apps/zookeeper-3.4.6/conf下该配置文件
[root@hdp-01 conf]# cp zoo_sample.cfg zoo.cfg
然后vim zoo.cfg
更改为
dataDir=/root/zkdata
最后添加
server.1=hdp-01:2888:3888
server.2=hdp-02:2888:3888
server.3=hdp-03:2888:3888
server.4=hdp-04:2888:3888
接着,在hdp-01上,新建数据目录/root/zkdata,并在目录重生成一个文件myid,内容为1
接着,在hdp-02上,新建数据目录/root/zkdata,并在目录重生成一个文件myid,内容为2
接着,在hdp-03上,新建数据目录/root/zkdata,并在目录重生成一个文件myid,内容为3
接着,在hdp-04上,新建数据目录/root/zkdata,并在目录重生成一个文件myid,内容为4
然后将zookeeper scp给其他机器
启动
[root@hdp-01 ~]# /root/apps/zookeeper-3.4.6/bin/zkServer.sh start
查看状态
[root@hdp-01 ~]# /root/apps/zookeeper-3.4.6/bin/zkServer.sh status

可以自己写一个脚本进行启动名字叫zkmanage.sh
用的时候后面跟上参数,传入$1.
sh ./zkmanage.sh start
或者关闭的时候
sh ./zkmanager.sh stop
脚本代码如下

复制代码
  1. #!/bin/bash
  2. for host in hdp-01 hdp-02 hdp-03 hdp-04
  3. do
  4. echo "${host}:starting...."
  5. ssh $host "/root/apps/zookeeper-3.4.6/bin/zkServer.sh $1"
  6. done
  7. sleep 2
  8. for host in hdp-01 hdp-02 hdp-03 hdp-04
  9. do
  10. ssh $host "/root/apps/zookeeper-3.4.6/bin/zkServer.sh status"
  11. done
复制代码

注意一点,如果有的结点没有启动,一定要看一下是不是这几台机器的时间是不是不对应,如果差别太大是启动不起来的。f**k.

简单补充一点就是,启动之后,这几台机器,有的当leader,有的当follower,只有一个leader,他们谁当leader是根据他们 '投票的形式'的决定的。

只有一个leader

 

 

zookeeper的命令行客户端和java客户端

命令行

在bin/zkCli.sh

这样会连到本机localhost

指定连到哪一台zookeeper

bin/zkcli.sh –server hdp-02:2181

 

两个作用,管理数据和监听

首先是管理数据

 

也可以自己建数据

[zk: hdp-03:2181(CONNECTED) 8] create /aa "hellozk"

created /aa

 

[zk: hdp-03:2181(CONNECTED) 9] ls /

[aa, root, hbase, zookeeper]

 

[zk: hdp-03:2181(CONNECTED) 10] get /aa

"hellozk"

cZxid = 0xc00000023

ctime = Mon Aug 05 14:41:52 CST 2019

mZxid = 0xc00000023

mtime = Mon Aug 05 14:41:52 CST 2019

pZxid = 0xc00000023

cversion = 0

dataVersion = 0

aclVersion = 0

ephemeralOwner = 0x0

dataLength = 9

numChildren = 0

 

 

 

修改数据

[zk: hdp-03:2181(CONNECTED) 11] set /aa hellospark

cZxid = 0xc00000023

ctime = Mon Aug 05 14:41:52 CST 2019

mZxid = 0xc00000024

mtime = Mon Aug 05 14:42:40 CST 2019

pZxid = 0xc00000023

cversion = 0

dataVersion = 1

aclVersion = 0

ephemeralOwner = 0x0

dataLength = 10

numChildren = 0

这个数据版本,你没修改几次就会变成几

也可以在/aa下建立子目录

如果有些命令忘了,可以输入help查看帮助

 

删除就是rmr

[zk: hdp-03:2181(CONNECTED) 13] rmr /aa

 

监听

[zk: hdp-03:2181(CONNECTED) 17] create /aa iamfine

Created /aa

 

[zk: hdp-03:2181(CONNECTED) 18] get /aa watch

然后这时候如果改变了/aa 就让他通知我

在另一台机器上启动一个zookeeper

 

[zk: hdp-03:2181(CONNECTED) 2] set /aa iamnotfine

此时就会有信息

 

但当你再改一次的话,这个连接就不会再提醒了,这个监听只起一次作用。

 

数据类型分为好几种

zookeeper中的znode有多种类型:

1、PERSISTENT  持久的:创建者就算跟集群断开联系,该类节点也会持久存在与zk集群中

2、EPHEMERAL  短暂的:创建者一旦跟集群断开联系,zk就会将这个节点删除

3、SEQUENTIAL  带序号的:这类节点,zk会自动拼接上一个序号,而且序号是递增的

我们一般创建的都是持久的

create –e /bb xxx

这时候就是短暂的

create /cc yyyy

create –s /cc/c qq

然后他们就会自动的在这些子节点下带上序号

 

java客户端

 

 

 需要的jar包

 

  1. import java.io.UnsupportedEncodingException;
  2. import java.util.List;
  3. import org.apache.zookeeper.CreateMode;
  4. import org.apache.zookeeper.KeeperException;
  5. import org.apache.zookeeper.ZooDefs.Ids;
  6. import org.apache.zookeeper.ZooKeeper;
  7. import org.junit.Before;
  8. import org.junit.Test;
  9. public class ZookeeperClientDemo {
  10. ZooKeeper zk = null;
  11. @Before
  12. public void init() throws Exception{
  13. // 构造一个连接zookeeper的客户端对象
  14. zk = new ZooKeeper("hdp-01:2181,hdp-02:2181,hdp-03:2181", 2000, null);
  15. }
  16. //
  17. @Test
  18. public void testCreate() throws Exception{
  19. // 参数1:要创建的节点路径 参数2:数据 参数3:访问权限 参数4:节点类型
  20. String create = zk.create("/zkTest", "hello zk".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  21. System.out.println(create);
  22. zk.close();
  23. }
  24. //
  25. @Test
  26. public void testUpdate() throws Exception {
  27. // 参数1:节点路径 参数2:数据 参数3:所要修改的版本,-1代表任何版本
  28. zk.setData("/zkTest", "我爱你".getBytes("UTF-8"), -1);
  29. zk.close();
  30. }
  31. //
  32. @Test
  33. public void testGet() throws Exception {
  34. // 参数1:节点路径 参数2:是否要监听 参数3:所要获取的数据的版本,null表示最新版本
  35. byte[] data = zk.getData("/zkTest", false, null);
  36. System.out.println(new String(data,"UTF-8"));
  37. zk.close();
  38. }
  39. //查子节点
  40. @Test
  41. public void testListChildren() throws Exception {
  42. // 参数1:节点路径 参数2:是否要监听
  43. // 注意:返回的结果中只有子节点名字,不带全路径
  44. List<String> children = zk.getChildren("/zkTest", false);
  45. for (String child : children) {
  46. System.out.println(child);
  47. }
  48. zk.close();
  49. }
  50. //
  51. @Test
  52. public void testRm() throws InterruptedException, KeeperException{
  53. zk.delete("/zkTest", -1);
  54. zk.close();
  55. }
  56. }

java客户端监听节点是否发生了变化

  1. import java.util.List;
  2. import org.apache.zookeeper.KeeperException;
  3. import org.apache.zookeeper.WatchedEvent;
  4. import org.apache.zookeeper.Watcher;
  5. import org.apache.zookeeper.Watcher.Event.EventType;
  6. import org.apache.zookeeper.Watcher.Event.KeeperState;
  7. import org.apache.zookeeper.ZooKeeper;
  8. import org.junit.Before;
  9. import org.junit.Test;
  10. public class ZookeeperWatchDemo {
  11. ZooKeeper zk = null;
  12. @Before
  13. public void init() throws Exception {
  14. // 构造一个连接zookeeper的客户端对象
  15. zk = new ZooKeeper("hdp-01:2181,hdp-02:2181,hdp-03:2181", 2000, new Watcher() {
  16. @Override
  17. public void process(WatchedEvent event) {
  18. //如果在连接,并且为该节点的数据变化了
  19. if (event.getState() == KeeperState.SyncConnected && event.getType() == EventType.NodeDataChanged) {
  20. System.out.println(event.getPath()); // 收到的事件所发生的节点路径
  21. System.out.println(event.getType()); // 收到的事件的类型
  22. System.out.println("数据变化了啊....."); // 收到事件后,我们的处理逻辑
  23. try {
  24. zk.getData("/mygirls", true, null);
  25. } catch (KeeperException | InterruptedException e) {
  26. e.printStackTrace();
  27. }
  28. //如果在连接,并且是字节点变化了
  29. }else if(event.getState() == KeeperState.SyncConnected && event.getType() == EventType.NodeChildrenChanged){
  30. System.out.println("子节点变化了......");
  31. }
  32. }
  33. });
  34. }
  35. @Test
  36. public void testGetWatch() throws Exception {
  37. //此时监听的逻辑就是new ZooKeeper时的watcher,这里也可以自己写一个watcher,
  38. //但如果自己写的话,就会只运行一次了,不能重复监听
  39. byte[] data = zk.getData("/mygirls", true, null); // 监听节点数据变化
  40. List<String> children = zk.getChildren("/mygirls", true); //监听节点的子节点变化事件
  41. System.out.println(new String(data, "UTF-8"));
  42. //这时候启动的监听线程为一个守护线程,当主线程结束后,就会退出,所以这里让主线程睡眠时间,当主线程结束,他也就没了
  43. //这个守护线程使我们在创建的zookeeper的时候就创建的,
  44. Thread.sleep(Long.MAX_VALUE);
  45. }
  46. }

 

 监听服务器上下线

首先是一个服务器的业务逻辑

 

  1. import java.io.IOException;
  2. import java.io.InputStream;
  3. import java.io.OutputStream;
  4. import java.net.ServerSocket;
  5. import java.net.Socket;
  6. import java.util.Date;
  7. public class TimeQueryService extends Thread{
  8. int port = 0;
  9. public TimeQueryService(int port){
  10. this.port = port;
  11. }
  12. @Override
  13. public void run() {
  14. try {
  15. //javaSocket编程,创建一个指定的端口号接受数据
  16. ServerSocket ss = new ServerSocket(port);
  17. System.out.println("业务线程已绑定端口"+port+"准备接受消费端请求了.....");
  18. while(true){
  19. Socket sc = ss.accept();
  20. InputStream inputStream = sc.getInputStream();
  21. OutputStream outputStream = sc.getOutputStream();
  22. outputStream.write(new Date().toString().getBytes());
  23. }
  24. } catch (IOException e) {
  25. e.printStackTrace();
  26. }
  27. }
  28. }

 

然后服务器上线时,先向zookeeper注册,等待消费者来访问

  1. package cn.edu360.zk.distributesystem;
  2. import org.apache.zookeeper.CreateMode;
  3. import org.apache.zookeeper.ZooDefs.Ids;
  4. import org.apache.zookeeper.ZooKeeper;
  5. import org.apache.zookeeper.data.Stat;
  6. public class TimeQueryServer {
  7. ZooKeeper zk = null;
  8. // 构造zk客户端连接
  9. public void connectZK() throws Exception{
  10. zk = new ZooKeeper("hdp-01:2181,hdp-02:2181,hdp-03:2181,hdp-04:2181", 2000, null);
  11. }
  12. // 注册服务器信息
  13. public void registerServerInfo(String hostname,String port) throws Exception{
  14. /**
  15. * 先判断注册节点的父节点是否存在,如果不存在,则创建
  16. */
  17. Stat stat = zk.exists("/servers", false);
  18. if(stat==null){
  19. zk.create("/servers", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  20. }
  21. // 注册服务器数据到zk的约定注册节点下
  22. String create = zk.create("/servers/server", (hostname+":"+port).getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
  23. System.out.println(hostname+" 服务器向zk注册信息成功,注册的节点为:" + create);
  24. }
  25. public static void main(String[] args) throws Exception {
  26. TimeQueryServer timeQueryServer = new TimeQueryServer();
  27. // 构造zk客户端连接
  28. timeQueryServer.connectZK();
  29. // 注册服务器信息
  30. timeQueryServer.registerServerInfo(args[0], args[1]);
  31. // 启动业务线程开始处理业务
  32. new TimeQueryService(Integer.parseInt(args[1])).start();
  33. }
  34. }

然后是消费者端的业务逻辑

先看一下zookeeper有哪些alive的服务器,然后随便挑一台访问

  1. package cn.edu360.zk.distributesystem;
  2. import java.io.InputStream;
  3. import java.io.OutputStream;
  4. import java.net.Socket;
  5. import java.util.ArrayList;
  6. import java.util.List;
  7. import java.util.Random;
  8. import org.apache.zookeeper.WatchedEvent;
  9. import org.apache.zookeeper.Watcher;
  10. import org.apache.zookeeper.ZooKeeper;
  11. import org.apache.zookeeper.Watcher.Event.EventType;
  12. import org.apache.zookeeper.Watcher.Event.KeeperState;
  13. public class Consumer {
  14. // 定义一个list用于存放最新的在线服务器列表
  15. private volatile ArrayList<String> onlineServers = new ArrayList<>();
  16. // 构造zk连接对象
  17. ZooKeeper zk = null;
  18. // 构造zk客户端连接
  19. public void connectZK() throws Exception {
  20. zk = new ZooKeeper("hdp-01:2181,hdp-02:2181,hdp-03:2181,hdp-04:2181", 2000, new Watcher() {
  21. @Override
  22. public void process(WatchedEvent event) {
  23. if (event.getState() == KeeperState.SyncConnected && event.getType() == EventType.NodeChildrenChanged) {
  24. try {
  25. // 事件回调逻辑中,再次查询zk上的在线服务器节点即可,查询逻辑中又再次注册了子节点变化事件监听
  26. getOnlineServers();
  27. } catch (Exception e) {
  28. e.printStackTrace();
  29. }
  30. }
  31. }
  32. });
  33. }
  34. // 查询在线服务器列表
  35. public void getOnlineServers() throws Exception {
  36. List<String> children = zk.getChildren("/servers", true);
  37. ArrayList<String> servers = new ArrayList<>();
  38. for (String child : children) {
  39. byte[] data = zk.getData("/servers/" + child, false, null);
  40. String serverInfo = new String(data);
  41. servers.add(serverInfo);
  42. }
  43. onlineServers = servers;
  44. System.out.println("查询了一次zk,当前在线的服务器有:" + servers);
  45. }
  46. public void sendRequest() throws Exception {
  47. Random random = new Random();
  48. while (true) {
  49. try {
  50. // 挑选一台当前在线的服务器
  51. int nextInt = random.nextInt(onlineServers.size());
  52. String server = onlineServers.get(nextInt);
  53. String hostname = server.split(":")[0];
  54. int port = Integer.parseInt(server.split(":")[1]);
  55. System.out.println("本次请求挑选的服务器为:" + server);
  56. Socket socket = new Socket(hostname, port);
  57. OutputStream out = socket.getOutputStream();
  58. InputStream in = socket.getInputStream();
  59. out.write("haha".getBytes());
  60. out.flush();
  61. byte[] buf = new byte[256];
  62. int read = in.read(buf);
  63. System.out.println("服务器响应的时间为:" + new String(buf, 0, read));
  64. out.close();
  65. in.close();
  66. socket.close();
  67. Thread.sleep(2000);
  68. } catch (Exception e) {
  69. e.printStackTrace();
  70. }
  71. }
  72. }
  73. public static void main(String[] args) throws Exception {
  74. Consumer consumer = new Consumer();
  75. // 构造zk连接对象
  76. consumer.connectZK();
  77. // 查询在线服务器列表
  78. consumer.getOnlineServers();
  79. // 处理业务(向一台服务器发送时间查询请求)
  80. consumer.sendRequest();
  81. }
  82. }

 

原文链接:http://www.cnblogs.com/wpbing/p/11309761.html

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号