经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 大数据/云/AI » Zookeeper » 查看文章
Java zookeeper服务的使用详解
来源:jb51  时间:2022/8/1 17:09:22  对本文有异议

Java语言客户端使用zookeeper

下载zookeeper连接工具,方便我们查看zookeeper存的数据。下载地址:

https://pan.baidu.com/s/1UG5_VcYUZUYUkg04QROLYg?pwd=3ych 提取码: 3ych

下载后解压就可以使用了:

使用页面:

Java语言连接z00keeper

首先引入maven 依赖jar包

  1. <dependency>
  2. <groupId>com.101tec</groupId>
  3. <artifactId>zkclient</artifactId>
  4. <version>0.9</version>
  5. </dependency>

编写Java代码

  1. public class Test001 {
  2. private static final String ADDRES = "127.0.0.1:2181";
  3. private static final int TIMAOUT = 5000;
  4. //计数器
  5. private static CountDownLatch countDownLatch = new CountDownLatch(1);
  6. public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
  7. //zk核心节点+事件通知
  8. //节点路径和界定啊value
  9. /**
  10. * 参数一:连接地址
  11. * 参数二:zk超时时间
  12. * 参数三:事件通知
  13. */
  14. //1、创建zk链接
  15. ZooKeeper zooKeeper = new ZooKeeper(ADDRES, TIMAOUT, new Watcher() {
  16. @Override
  17. public void process(WatchedEvent watchedEvent) {
  18. Event.KeeperState state = watchedEvent.getState();
  19. if(state == Event.KeeperState.SyncConnected){
  20. System.out.println("zk链接成功");
  21. countDownLatch.countDown(); //计数器减 1
  22. }
  23. }
  24. });
  25. //计数器结果必须是为0 才能继续执行
  26. System.out.println("zk正在等待连接");
  27. countDownLatch.await();
  28. System.out.println("开始创建我们的连接");
  29. //2、创建我们的节点
  30. /**
  31. * 参数一:路径名称
  32. * 参数二:节点value
  33. * 参数三:节点权限acl
  34. * 蚕食四:节点类型 临时和永久
  35. */
  36. String s = zooKeeper.create("/kaico/one", "hello,boy".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
  37. System.out.println(s);
  38. zooKeeper.close();
  39. }
  40. }

可以利用连接工具查看操作结果。

zooKeeper类有很多api操作节点,可以创建、删除。

zookeeper Javaapi文档: 点击查看

四种节点类型

第一种:临时节点:会话关闭之后,就自动消失 CreateMode.PERSISTENT_SEQUENTIAL

第二种:临时有序节点 CreateMode.EPHEMERAL

第三种:持久节点:会话关闭之后,持久化到硬盘 CreateMode.PERSISTENT

第四种:持久有序节点 CreateMode.PERSISTENT_SEQUENTIAL

ACL权限

ACL权限模型,实际上就是对树每个节点实现控制.

身份的认证有4种方式:

world:默认方式,相当于全世界都能访问.

auth:代表已经认证通过的用户(cli中可以通过addauth digest user:pwd来添加当前上下文中的授权用户).

digest:即用户名:密码这种方式认证,这也是业务系统中最常用的.

ip:使用lp地址认证。

代码案例:使用账号密码实现权限控制

1、添加有权限控制的节点数据

  1. public class Test002 {
  2. private static final String ADDRES = "127.0.0.1:2181";
  3. private static final int TIMAOUT = 5000;
  4. //计数器
  5. private static CountDownLatch countDownLatch = new CountDownLatch(1);
  6. public static void main(String[] args) throws IOException, InterruptedException, KeeperException, NoSuchAlgorithmException {
  7. /**
  8. * 参数一:连接地址
  9. * 参数二:zk超时时间
  10. * 参数三:事件通知
  11. */
  12. //1、创建zk链接
  13. ZooKeeper zooKeeper = new ZooKeeper(ADDRES, TIMAOUT, new Watcher() {
  14. @Override
  15. public void process(WatchedEvent watchedEvent) {
  16. Event.KeeperState state = watchedEvent.getState();
  17. if(state == Event.KeeperState.SyncConnected){
  18. System.out.println("zk链接成功");
  19. countDownLatch.countDown(); //计数器减 1
  20. }
  21. }
  22. });
  23. //计数器结果必须是为0 才能继续执行
  24. System.out.println("zk正在等待连接");
  25. countDownLatch.await();
  26. System.out.println("开始创建我们的连接");
  27. //创建账号 admin 可以实现读写操作
  28. Id admin = new Id("digest", DigestAuthenticationProvider.generateDigest("admin:admin123"));
  29. ACL acl1 = new ACL(ZooDefs.Perms.ALL, admin);
  30. //创建账号 guest 只允许做读操作
  31. Id guest = new Id("digest", DigestAuthenticationProvider.generateDigest("guest:guest123"));
  32. ACL acl2 = new ACL(ZooDefs.Perms.READ, guest);
  33. ArrayList<ACL> acls = new ArrayList<>();
  34. acls.add(acl1);
  35. acls.add(acl2);
  36. //2、创建我们的节点
  37. /**
  38. * 参数一:路径名称
  39. * 参数二:节点value
  40. * 参数三:节点权限acl
  41. * 蚕食四:节点类型 临时和永久
  42. */
  43. String s = zooKeeper.create("/kaico/acl", "hello,boy".getBytes(), acls, CreateMode.PERSISTENT);
  44. System.out.println(s);
  45. zooKeeper.close();
  46. }
  47. }

2、获取设置了权限的节点数据

  1. public class Test003 {
  2. private static final String ADDRES = "127.0.0.1:2181";
  3. private static final int TIMAOUT = 5000;
  4. //计数器
  5. private static CountDownLatch countDownLatch = new CountDownLatch(1);
  6. public static void main(String[] args) throws IOException, InterruptedException, KeeperException, NoSuchAlgorithmException {
  7. /**
  8. * 参数一:连接地址
  9. * 参数二:zk超时时间
  10. * 参数三:事件通知
  11. */
  12. //1、创建zk链接
  13. ZooKeeper zooKeeper = new ZooKeeper(ADDRES, TIMAOUT, new Watcher() {
  14. @Override
  15. public void process(WatchedEvent watchedEvent) {
  16. Event.KeeperState state = watchedEvent.getState();
  17. if(state == Event.KeeperState.SyncConnected){
  18. System.out.println("zk链接成功");
  19. countDownLatch.countDown(); //计数器减 1
  20. }
  21. }
  22. });
  23. //计数器结果必须是为0 才能继续执行
  24. System.out.println("zk正在等待连接");
  25. countDownLatch.await();
  26. System.out.println("开始创建我们的连接");
  27. //设置一下zookeeper 的账号才有权限获取内容
  28. zooKeeper.addAuthInfo("digest", "guest:guest123".getBytes());
  29. //获取节点的内容
  30. byte[] data = zooKeeper.getData("/kaico/acl", null, new Stat());
  31. System.out.println(new String(data));
  32. zooKeeper.close();
  33. }
  34. }

实现事件监听通知

Zookeeper实现基本的总结:类似于文件存储系统,可以帮助我们解决分布式领域中遇到问题

Zookeeper分布式协调工具

特征:

  • 定义的节点包含key (路径)和 value ,路径不允许有重复保证唯一性.
  • Zookeeper分为四种类型持久、持久序号、临时、临时序号.
  • 持久与临时节点区别:连接如果一旦关闭,当前的节点自动删除;
  • 事件通知监听节点发生的变化删除、修改、子节点

Java代码案例:

  1. public class Test004 {
  2. private static final String ADDRES = "127.0.0.1:2181";
  3. private static final int TIMAOUT = 5000;
  4. //计数器
  5. private static CountDownLatch countDownLatch = new CountDownLatch(1);
  6. public static void main(String[] args) throws IOException, InterruptedException, KeeperException, NoSuchAlgorithmException {
  7. //1、创建zk 连接
  8. ZkClient zkClient = new ZkClient(ADDRES, TIMAOUT);
  9. String parentPath = "/kaico/jing";
  10. //2、监听节点发生的变化,监听子节点是否发生变化,如果发生变化都可以获取到回调通知。
  11. // zkClient.subscribeChildChanges(parentPath, new IZkChildListener() {
  12. // @Override
  13. // public void handleChildChange(String s, List<String> list) throws Exception {
  14. // System.out.println("s:" + s + ",节点发生了变化");
  15. // list.forEach((t)->{
  16. // System.out.println("子节点:" + t);
  17. // });
  18. // }
  19. // });
  20. //监听节点的内容是否发生变化或删除
  21. zkClient.subscribeDataChanges(parentPath, new IZkDataListener() {
  22. @Override
  23. public void handleDataChange(String s, Object o) throws Exception {
  24. System.out.println("修改的节点为:" + s + ",修改之后的值:" + o);
  25. }
  26. @Override
  27. public void handleDataDeleted(String s) throws Exception {
  28. System.out.println("节点:" + s + "被删除");
  29. }
  30. });
  31. //修改值内容
  32. zkClient.writeData(parentPath, "666666666666666");
  33. while (true){
  34. }
  35. // zkClient.close();
  36. }
  37. }

微服务使用zookeeper作为注册中心

调用接口逻辑图

使用zookeeper实现逻辑:

根据服务提供方的名称创建对应的节点,服务提供方的接口所有的ip+端口作为子节点的value的值,这样服务调用方根据服务提供方的名称在zookeeper上找到对应的ip+端口从而可以调用对应的接口,再监听该节点,如果提供接口的机器发生宕机于zookeeper断开连接,子节点也相应的减少了,服务调用方也会收到通知。

分布式锁

分布式锁的概念:解决再多个jvm中最终只能有一个jvm 执行。

zookeeper实现分布式锁的思路:

节点保证唯一、事件通知、临时节点(生命周期和Session会关联)﹒

创建分布式锁原理:

1.多个jvm同时在Zookeeper 上创建相同的临时节点(lockPath).

2. 因为临时节点路径保证唯一的性,只要谁能够创建成功谁就能够获取锁,就可以开始执

行业务逻辑;,

3.如果节点已经给其他请求创建的话或者是创建节点失败,当前的请求实现等待;

释放锁的原理

因为我们采用临时节点,当前节点创建成功,表示获取锁成功;正常执行完业务逻辑调用Session关闭连接方法,当前的节点会删除;----释放锁

其他正在等待请求,采用事件监听如果当前节点被删除的话,又重新进入到获取锁流程;

临时节点+事件通知。

代码实现分布式锁

实现分布式锁的方式有多种:数据库、redis、zookeeper,这里使用zookeeper实现。

实现原理:

因为Zookeeper节点路径保持唯一,不允许重复 且有临时节点特性连接关闭后当前节点会自动消失,从而实现分布式锁。

  • 多请求同时创建相同的节点(lockPath),只要谁能够创建成功 谁就能够获取到锁;
  • 如果创建节点的时候,突然该节点已经被其他请求创建的话则直接等待;
  • 只要能够创建节点成功,则开始进入到正常业务逻辑操作,其他没有获取锁进行等待;
  • 正常业务逻辑流程执行完后,调用zk关闭连接方式释放锁,从而是其他的请求开始进入到获取锁的资源。

使用zookeeper 实现分们式锁的代码案例

利用模板设计模式实现分布式锁

1、定义锁接口 Lock

  1. public interface Lock {
  2. /**
  3. * 获取锁
  4. */
  5. public void getLock();
  6. /**
  7. * 释放锁
  8. */
  9. public void unLock();
  10. }

2、定义抽象类实现锁接口 Lock ,设计其他的方法完成对锁的操作

  1. abstract class AbstractTemplzateLock implements Lock {
  2. @Override
  3. public void getLock() {
  4. // 模版方法 定义共同抽象的骨架
  5. if (tryLock()) {
  6. System.out.println(">>>" + Thread.currentThread().getName() + ",获取锁成功");
  7. } else {
  8. // 开始实现等待
  9. waitLock();// 事件监听
  10. // 重新获取
  11. getLock();
  12. }
  13. }
  14. /**
  15. * 获取锁
  16. * @return
  17. */
  18. protected abstract boolean tryLock();
  19. /**
  20. * 等待锁
  21. * @return
  22. */
  23. protected abstract void waitLock();
  24. /**
  25. * 释放锁
  26. * @return
  27. */
  28. protected abstract void unImplLock();
  29. @Override
  30. public void unLock() {
  31. unImplLock();
  32. }
  33. }

3、利用zookeeper实现锁,继承抽象类 AbstractTemplzateLock

  1. public class ZkTemplzateImplLock extends AbstractTemplzateLock {
  2. //参数1 连接地址
  3. private static final String ADDRES = "192.168.212.147:2181";
  4. // 参数2 zk超时时间
  5. private static final int TIMEOUT = 5000;
  6. // 创建我们的zk连接
  7. private ZkClient zkClient = new ZkClient(ADDRES, TIMEOUT);
  8. /**
  9. * 共同的创建临时节点
  10. */
  11. private String lockPath = "/lockPath";
  12. private CountDownLatch countDownLatch = null;
  13. @Override
  14. protected boolean tryLock() {
  15. // 获取锁的思想:多个jvm同时创建临时节点,只要谁能够创建成功 谁能够获取到锁
  16. try {
  17. zkClient.createEphemeral(lockPath);
  18. return true;
  19. } catch (Exception e) {
  20. // // 如果创建已经存在的话
  21. // e.printStackTrace();
  22. return false;
  23. }
  24. }
  25. @Override
  26. protected void waitLock() {
  27. // 1.使用事件监听 监听lockPath节点是否已经被删除,如果被删除的情况下 有可以重新的进入到获取锁的权限
  28. IZkDataListener iZkDataListener = new IZkDataListener() {
  29. @Override
  30. public void handleDataChange(String s, Object o) throws Exception {
  31. }
  32. @Override
  33. public void handleDataDeleted(String s) throws Exception {
  34. if (countDownLatch != null) {
  35. countDownLatch.countDown();// 计数器变为0
  36. }
  37. }
  38. };
  39. zkClient.subscribeDataChanges(lockPath, iZkDataListener);
  40. // 2.使用countDownLatch等待
  41. if (countDownLatch == null) {
  42. countDownLatch = new CountDownLatch(1);
  43. }
  44. try {
  45. countDownLatch.await();// 如果当前计数器不是为0 就一直等待
  46. } catch (Exception e) {
  47. }
  48. // 3. 如果当前节点被删除的情况下,有需要重新进入到获取锁
  49. zkClient.unsubscribeDataChanges(lockPath, iZkDataListener);
  50. }
  51. @Override
  52. protected void unImplLock() {
  53. if (zkClient != null) {
  54. zkClient.close();
  55. System.out.println(Thread.currentThread().getName() + ",释放了锁>>>");
  56. }
  57. }
  58. }

4、编写使用锁的方法

  1. //main方法使用多线程测试分布式锁
  2. public static void main(String[] args) {
  3. // OrderService orderService = new OrderService();
  4. for (int i = 0; i < 100; i++) {
  5. new Thread(new OrderService()).start();
  6. }
  7. // 单个jvm中多线程同时生成订单号码如果发生重复 如何解决 synchronized或者是lock锁
  8. // 如果在多个jvm中同时生成订单号码如果发生重复如何解决
  9. // 注意synchronized或者是lock锁 只能够在本地的jvm中有效
  10. // 分布式锁的概念
  11. }
  12. //多线程run 方法
  13. public class OrderService implements Runnable {
  14. private OrderNumGenerator orderNumGenerator = new OrderNumGenerator();
  15. private Lock lock = new ZkTemplzateImplLock();
  16. @Override
  17. public void run() {
  18. getNumber();
  19. }
  20. private void getNumber() {
  21. try {
  22. lock.getLock();
  23. Thread.sleep(50);
  24. String number = orderNumGenerator.getNumber();
  25. System.out.println(Thread.currentThread().getName() + ",获取的number:" + number);
  26. // 如果zk超时了,有做数据库写的操作统一直接回滚
  27. } catch (Exception e) {
  28.  
  29. } finally {
  30. lock.unLock();
  31. }
  32. }
  33. // ZkTemplzateImplLock父亲 模版类 AbstractTemplzateLock 父亲 Lock
  34. }
  35. //自动生成订单号的类
  36. public class OrderNumGenerator {
  37. /**
  38. * 序号
  39. */
  40. private static int count;
  41. /**
  42. * 生成我们的时间戳 为订单号码
  43. * @return
  44. */
  45. public String getNumber() {
  46. SimpleDateFormat simpt = new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss");
  47. try {
  48. Thread.sleep(30);
  49. } catch (Exception e) {
  50. }
  51. return simpt.format(new Date()) + "-" + ++count;
  52. }
  53. }

如何防止死锁?

创建zkClient时设置session 连接时间 sessionTimeout。 也就是设置Session连接超时时间,在规定的时间内获取锁后超时啦~自动回滚当前数据库业务逻辑。

注意:等待锁时,zkClient注册的事件最后需要删除。

到此这篇关于Java zookeeper服务的使用详解的文章就介绍到这了,更多相关Java zookeeper服务内容请搜索w3xue以前的文章或继续浏览下面的相关文章希望大家以后多多支持w3xue!

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号