经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 大数据/云/AI » Zookeeper » 查看文章
zookeeper源码(06)ZooKeeperServer及子类
来源:cnblogs  作者:用户不存在!  时间:2024/1/22 16:41:11  对本文有异议

ZooKeeperServer

实现了单机版zookeeper服务端功能,子类实现了更加丰富的分布式集群功能:

  1. ZooKeeperServer
  2. |-- QuorumZooKeeperServer
  3. |-- LeaderZooKeeperServer
  4. |-- LearnerZooKeeperServer
  5. |-- FollowerZooKeeperServer
  6. |-- ObserverZooKeeperServer
  7. |-- ReadOnlyZooKeeperServer

主要字段

  1. // tickTime参数默认值
  2. public static final int DEFAULT_TICK_TIME = 3000;
  3. protected int tickTime = DEFAULT_TICK_TIME;
  4. // 默认tickTime * 2
  5. protected int minSessionTimeout = -1;
  6. // 默认tickTime * 20
  7. protected int maxSessionTimeout = -1;
  8. // 会话跟踪
  9. protected SessionTracker sessionTracker;
  10. // 存储组件
  11. private FileTxnSnapLog txnLogFactory = null;
  12. private ZKDatabase zkDb;
  13. // 缓存数据
  14. private ResponseCache readResponseCache;
  15. private ResponseCache getChildrenResponseCache;
  16. // zxid会在启动阶段设置为最新lastZxid
  17. private final AtomicLong hzxid = new AtomicLong(0);
  18. // 请求处理器链入口
  19. protected RequestProcessor firstProcessor;
  20. // 缓存变化的数据
  21. final Deque<ChangeRecord> outstandingChanges = new ArrayDeque<>();
  22. final Map<String, ChangeRecord> outstandingChangesForPath = new HashMap<>();
  23. protected ServerCnxnFactory serverCnxnFactory;
  24. protected ServerCnxnFactory secureServerCnxnFactory;
  25. // 大请求判断使用的参数
  26. private volatile int largeRequestMaxBytes = 100 * 1024 * 1024;
  27. private volatile int largeRequestThreshold = -1;

主要方法

方法定义

  1. // 通过zkDb从dataTree中删除Watcher监听器
  2. void removeCnxn(ServerCnxn cnxn);
  3. // 创建zkDb(为null时)并loadData加载数据
  4. public void startdata() throws IOException, InterruptedException;
  5. // 加载数据、清理session、生成快照
  6. public void loadData() throws IOException, InterruptedException;
  7. // 保存zkDb当前快照
  8. public synchronized File takeSnapshot(boolean syncSnap, boolean isSevere,
  9. boolean fastForwardFromEdits) throws IOException;
  10. // 从指定的输入流解析数据,生成新的zkDb和SessionTrack
  11. public synchronized long restoreFromSnapshot(final InputStream inputStream) throws IOException;
  12. // 使用zkDb.truncateLog(zxid)删除快照数据
  13. public void truncateLog(long zxid) throws IOException;
  14. // 通过zkDb获取dataTree.lastProcessedZxid的值
  15. public long getLastProcessedZxid();
  16. // 提交closeSession类型的Request来关闭会话
  17. private void close(long sessionId);
  18. // 使用zkDb杀掉会话
  19. protected void killSession(long sessionId, long zxid);
  20. // 启动组件
  21. private void startupWithServerState(State state);
  22. // 创建RequestProcessor用来处理请求
  23. protected void setupRequestProcessors();
  24. // 创建SessionTracker
  25. protected void createSessionTracker();
  26. // 为指定的session生成一个密码
  27. byte[] generatePasswd(long id);
  28. // 验证session密码
  29. protected boolean checkPasswd(long sessionId, byte[] passwd);
  30. // 使用sessionTracker创建session、生成密码、提交一个createSession请求
  31. long createSession(ServerCnxn cnxn, byte[] passwd, int timeout);
  32. // 为指定的session绑定owner
  33. public void setOwner(long id, Object owner) throws SessionExpiredException;
  34. // 验证session之后使用finishSessionInit方法确定继续通信或者断开连接
  35. protected void revalidateSession(ServerCnxn cnxn, long sessionId, int sessionTimeout) throws IOException;
  36. public void finishSessionInit(ServerCnxn cnxn, boolean valid);
  37. // checkPasswd->revalidateSession->finishSessionInit
  38. public void reopenSession(ServerCnxn cnxn, long sessionId,
  39. byte[] passwd, int sessionTimeout) throws IOException;
  40. // 把请求提交给requestThrottler之后再陆续调用submitRequestNow处理
  41. public void enqueueRequest(Request si);
  42. // 使用firstProcessor处理请求
  43. public void submitRequestNow(Request si);
  44. // 处理连接请求,网络IO层调用
  45. public void processConnectRequest(
  46. ServerCnxn cnxn, ConnectRequest request) throws IOException, ClientCnxnLimitException;
  47. // 处理业务请求,网络IO层调用
  48. public void processPacket(ServerCnxn cnxn, RequestHeader h, RequestRecord request) throws IOException;
  49. // sasl认证
  50. private void processSasl(
  51. RequestRecord request, ServerCnxn cnxn, RequestHeader requestHeader) throws IOException;
  52. // 处理transaction
  53. public ProcessTxnResult processTxn(TxnHeader hdr, Record txn);
  54. public ProcessTxnResult processTxn(Request request);
  55. private void processTxnForSessionEvents(Request request, TxnHeader hdr, Record txn);
  56. private ProcessTxnResult processTxnInDB(TxnHeader hdr, Record txn, TxnDigest digest);
  57. // Grant or deny authorization to an operation on a node
  58. public void checkACL(ServerCnxn cnxn, List<ACL> acl, int perm, List<Id> ids,
  59. String path, List<ACL> setAcls) throws KeeperException.NoAuthException;
  60. // Check a path whether exceeded the quota
  61. public void checkQuota(String path, byte[] lastData, byte[] data,
  62. int type) throws KeeperException.QuotaExceededException;
  63. private void checkQuota(String lastPrefix, long bytesDiff, long countDiff,
  64. String namespace) throws KeeperException.QuotaExceededException;
  65. // 获取上级父类path
  66. private String parentPath(String path) throws KeeperException.BadArgumentsException;
  67. // 从Request获取有效的path
  68. private String effectiveACLPath(
  69. Request request) throws KeeperException.BadArgumentsException, KeeperException.InvalidACLException;
  70. // 根据Request获取需要的权限类型
  71. private int effectiveACLPerms(Request request);
  72. // 检查写权限
  73. public boolean authWriteRequest(Request request);

loadData方法

加载数据、清理session、生成快照:

  1. public void loadData() throws IOException, InterruptedException {
  2. // 初始化zxid
  3. if (zkDb.isInitialized()) {
  4. setZxid(zkDb.getDataTreeLastProcessedZxid());
  5. } else {
  6. setZxid(zkDb.loadDataBase());
  7. }
  8. // 使用killSession方法杀死过期会话
  9. zkDb.getSessions().stream()
  10. .filter(session -> zkDb.getSessionWithTimeOuts().get(session) == null)
  11. .forEach(session -> killSession(session, zkDb.getDataTreeLastProcessedZxid()));
  12. // 保存快照
  13. // txnLogFactory.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts(), syncSnap)
  14. takeSnapshot();
  15. }

killSession方法

  1. protected void killSession(long sessionId, long zxid) {
  2. // 需要清理临时节点
  3. zkDb.killSession(sessionId, zxid);
  4. if (sessionTracker != null) {
  5. // 删除会话跟踪信息
  6. sessionTracker.removeSession(sessionId);
  7. }
  8. }

startupWithServerState方法

  1. private void startupWithServerState(State state) {
  2. if (sessionTracker == null) {
  3. createSessionTracker();
  4. }
  5. startSessionTracker();
  6. // 创建RequestProcessor用于处理请求
  7. setupRequestProcessors();
  8. // 这是一个限流的组件,不做分析
  9. startRequestThrottler();
  10. registerJMX();
  11. startJvmPauseMonitor();
  12. registerMetrics();
  13. setState(state);
  14. requestPathMetricsCollector.start();
  15. localSessionEnabled = sessionTracker.isLocalSessionsEnabled();
  16. notifyAll();
  17. }

setupRequestProcessors方法(重要)

  1. protected void setupRequestProcessors() {
  2. RequestProcessor finalProcessor = new FinalRequestProcessor(this);
  3. RequestProcessor syncProcessor = new SyncRequestProcessor(this, finalProcessor);
  4. ((SyncRequestProcessor) syncProcessor).start();
  5. firstProcessor = new PrepRequestProcessor(this, syncProcessor);
  6. ((PrepRequestProcessor) firstProcessor).start();
  7. }

RequestProcessor接口:以处理器链方式处理事务,请求总是按顺序处理。standaloneServer、follower和leader有不同的处理器链。请求通过processRequest方法传递给其他RequestProcessor对象,通常情况总是由单个线程调用。当调用shutdown时,RequestProcessor还应关闭与其关联的其他RequestProcessor对象。

FinalRequestProcessor类:处理与请求相关的事务,并提供查询服务,给客户端发响应,位于RequestProcessor链末尾。

SyncRequestProcessor类:将请求记录到磁盘,对请求进行批处理,以有效地执行IO操作。在日志同步到磁盘之前,请求不会传递给下一个RequestProcessor对象。SyncRequestProcessor用于3种不同的情况:

  • Leader - 将请求同步到磁盘,并将其转发给AckRequestProcessor,后者将ack发送回leader自己
  • Follower - 将请求同步到磁盘,并将其转发给SendAckRequestProcessor,后者将ack发送给leader
  • Observer - 将请求同步到磁盘,作为INFORM数据包接收。不将ack发送回leader,因此nextProcessor将为null

PrepRequestProcessor类:通常位于RequestProcessor链开头,为更新请求关联的事务做设置。

createSessionTracker方法

  1. protected void createSessionTracker() {
  2. sessionTracker = new SessionTrackerImpl(this, zkDb.getSessionWithTimeOuts(), tickTime,
  3. createSessionTrackerServerId, getZooKeeperServerListener());
  4. }

不同的子类使用了不同的SessionTracker实现类:

  • LeaderZooKeeperServer - LeaderSessionTracker
  • LearnerZooKeeperServer- LearnerSessionTracker

createSession方法

  1. long createSession(ServerCnxn cnxn, byte[] passwd, int timeout) {
  2. if (passwd == null) {
  3. passwd = new byte[0];
  4. }
  5. // 创建一个session
  6. long sessionId = sessionTracker.createSession(timeout);
  7. // 生成session密码
  8. Random r = new Random(sessionId ^ superSecret);
  9. r.nextBytes(passwd);
  10. // 提交createSession请求,该请求会被RequestProcessor处理
  11. CreateSessionTxn txn = new CreateSessionTxn(timeout);
  12. cnxn.setSessionId(sessionId);
  13. Request si = new Request(cnxn, sessionId, 0, OpCode.createSession, RequestRecord.fromRecord(txn), null);
  14. submitRequest(si);
  15. return sessionId;
  16. }

submitRequestNow方法

  1. public void submitRequestNow(Request si) {
  2. try {
  3. touch(si.cnxn);
  4. boolean validpacket = Request.isValid(si.type);
  5. if (validpacket) {
  6. setLocalSessionFlag(si);
  7. // 使用firstProcessor处理请求
  8. firstProcessor.processRequest(si);
  9. if (si.cnxn != null) {
  10. incInProcess();
  11. }
  12. } else {
  13. // Update request accounting/throttling limits
  14. requestFinished(si);
  15. new UnimplementedRequestProcessor().processRequest(si);
  16. }
  17. } catch (MissingSessionException e) {
  18. // Update request accounting/throttling limits
  19. requestFinished(si);
  20. } catch (RequestProcessorException e) {
  21. // Update request accounting/throttling limits
  22. requestFinished(si);
  23. }
  24. }

processConnectRequest方法

  1. public void processConnectRequest(
  2. ServerCnxn cnxn, ConnectRequest request) throws IOException, ClientCnxnLimitException {
  3. long sessionId = request.getSessionId();
  4. // 略
  5. if (request.getLastZxidSeen() > zkDb.dataTree.lastProcessedZxid) {
  6. // zxid参数有误
  7. throw new CloseRequestException(msg, ServerCnxn.DisconnectReason.CLIENT_ZXID_AHEAD);
  8. }
  9. int sessionTimeout = request.getTimeOut();
  10. byte[] passwd = request.getPasswd();
  11. int minSessionTimeout = getMinSessionTimeout();
  12. if (sessionTimeout < minSessionTimeout) {
  13. sessionTimeout = minSessionTimeout;
  14. }
  15. int maxSessionTimeout = getMaxSessionTimeout();
  16. if (sessionTimeout > maxSessionTimeout) {
  17. sessionTimeout = maxSessionTimeout;
  18. }
  19. cnxn.setSessionTimeout(sessionTimeout);
  20. // We don't want to receive any packets until we are sure that the session is setup
  21. cnxn.disableRecv();
  22. if (sessionId == 0) {
  23. // 创建session
  24. long id = createSession(cnxn, passwd, sessionTimeout);
  25. } else {
  26. validateSession(cnxn, sessionId); // do nothing
  27. // 关闭旧的ServerCnxn
  28. if (serverCnxnFactory != null) {
  29. serverCnxnFactory.closeSession(sessionId, ServerCnxn.DisconnectReason.CLIENT_RECONNECT);
  30. }
  31. if (secureServerCnxnFactory != null) {
  32. secureServerCnxnFactory.closeSession(sessionId, ServerCnxn.DisconnectReason.CLIENT_RECONNECT);
  33. }
  34. cnxn.setSessionId(sessionId);
  35. // 开启新session
  36. reopenSession(cnxn, sessionId, passwd, sessionTimeout);
  37. }
  38. }

processPacket方法

  1. public void processPacket(ServerCnxn cnxn, RequestHeader h, RequestRecord request) throws IOException {
  2. cnxn.incrOutstandingAndCheckThrottle(h);
  3. if (h.getType() == OpCode.auth) {
  4. AuthPacket authPacket = request.readRecord(AuthPacket::new);
  5. String scheme = authPacket.getScheme();
  6. ServerAuthenticationProvider ap = ProviderRegistry.getServerProvider(scheme);
  7. Code authReturn = KeeperException.Code.AUTHFAILED;
  8. // 认证、继续通信或者关闭连接,略
  9. return;
  10. } else if (h.getType() == OpCode.sasl) {
  11. processSasl(request, cnxn, h);
  12. } else {
  13. if (!authHelper.enforceAuthentication(cnxn, h.getXid())) {
  14. return;
  15. } else {
  16. Request si = new Request(
  17. cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), request, cnxn.getAuthInfo());
  18. int length = request.limit();
  19. if (isLargeRequest(length)) { // 判断large请求
  20. checkRequestSizeWhenMessageReceived(length);
  21. si.setLargeRequestSize(length);
  22. }
  23. si.setOwner(ServerCnxn.me);
  24. // 提交请求等待firstProcessor处理
  25. submitRequest(si);
  26. }
  27. }
  28. }

processTxn相关方法

  1. // entry point for quorum/Learner.java
  2. public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
  3. processTxnForSessionEvents(null, hdr, txn);
  4. return processTxnInDB(hdr, txn, null);
  5. }
  6. // entry point for FinalRequestProcessor.java
  7. public ProcessTxnResult processTxn(Request request) {
  8. TxnHeader hdr = request.getHdr();
  9. processTxnForSessionEvents(request, hdr, request.getTxn());
  10. final boolean writeRequest = (hdr != null);
  11. final boolean quorumRequest = request.isQuorum();
  12. // return fast w/o synchronization when we get a read
  13. if (!writeRequest && !quorumRequest) {
  14. return new ProcessTxnResult();
  15. }
  16. synchronized (outstandingChanges) {
  17. ProcessTxnResult rc = processTxnInDB(hdr, request.getTxn(), request.getTxnDigest());
  18. // request.hdr is set for write requests, which are the only ones
  19. // that add to outstandingChanges.
  20. if (writeRequest) {
  21. long zxid = hdr.getZxid();
  22. while (!outstandingChanges.isEmpty() && outstandingChanges.peek().zxid <= zxid) {
  23. ChangeRecord cr = outstandingChanges.remove();
  24. ServerMetrics.getMetrics().OUTSTANDING_CHANGES_REMOVED.add(1);
  25. if (outstandingChangesForPath.get(cr.path) == cr) {
  26. outstandingChangesForPath.remove(cr.path);
  27. }
  28. }
  29. }
  30. // do not add non quorum packets to the queue.
  31. if (quorumRequest) {
  32. getZKDatabase().addCommittedProposal(request);
  33. }
  34. return rc;
  35. }
  36. }
  37. private void processTxnForSessionEvents(Request request, TxnHeader hdr, Record txn) {
  38. int opCode = (request == null) ? hdr.getType() : request.type;
  39. long sessionId = (request == null) ? hdr.getClientId() : request.sessionId;
  40. if (opCode == OpCode.createSession) {
  41. if (hdr != null && txn instanceof CreateSessionTxn) {
  42. CreateSessionTxn cst = (CreateSessionTxn) txn;
  43. // Add the session to the local session map or global one in zkDB.
  44. sessionTracker.commitSession(sessionId, cst.getTimeOut());
  45. }
  46. } else if (opCode == OpCode.closeSession) {
  47. sessionTracker.removeSession(sessionId);
  48. }
  49. }
  50. private ProcessTxnResult processTxnInDB(TxnHeader hdr, Record txn, TxnDigest digest) {
  51. if (hdr == null) {
  52. return new ProcessTxnResult();
  53. } else {
  54. return getZKDatabase().processTxn(hdr, txn, digest);
  55. }
  56. }

QuorumZooKeeperServer

集群模式下的ZooKeeperServer基类:

  • 封装了QuorumPeer用来获取节点信息
  • 封装了UpgradeableSessionTracker做会话追踪

LeaderZooKeeperServer

  1. Just like the standard ZooKeeperServer. We just replace the request processors: PrepRequestProcessor -> ProposalRequestProcessor -> CommitProcessor -> Leader.ToBeAppliedRequestProcessor -> FinalRequestProcessor

实现类概述

集群模式下leader节点使用的ZooKeeperServer实现类:

  • 继承QuorumZooKeeperServer

  • 使用的RequestProcessor与父类不同:

    1. // 构建处理器链
    2. protected void setupRequestProcessors() {
    3. RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    4. RequestProcessor toBeAppliedProcessor =
    5. new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader());
    6. commitProcessor = new CommitProcessor(
    7. toBeAppliedProcessor, Long.toString(getServerId()), false, getZooKeeperServerListener());
    8. commitProcessor.start();
    9. ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this, commitProcessor);
    10. proposalProcessor.initialize();
    11. prepRequestProcessor = new PrepRequestProcessor(this, proposalProcessor);
    12. prepRequestProcessor.start();
    13. firstProcessor = new LeaderRequestProcessor(this, prepRequestProcessor);
    14. setupContainerManager(); // 启动ContainerManager用于删除ttl节点和container节点
    15. }
  • 使用LeaderSessionTracker做会话追踪

  • 与learner节点通信

处理器链

  • FinalRequestProcessor - 处理与请求相关的事务,并提供查询服务,给客户端发响应,位于RequestProcessor链末尾

  • ToBeAppliedRequestProcessor - 维护toBeApplied列表

  • CommitProcessor - 等待commit完成之后调用下游RequestProcessor处理器

  • ProposalRequestProcessor - 发起proposal并将Request转发给内部的SyncRequestProcessor和AckRequestProcessor

    1. public ProposalRequestProcessor(LeaderZooKeeperServer zks, RequestProcessor nextProcessor) {
    2. this.zks = zks;
    3. this.nextProcessor = nextProcessor;
    4. // 内部有维护SyncRequestProcessor和AckRequestProcessor
    5. AckRequestProcessor ackProcessor = new AckRequestProcessor(zks.getLeader());
    6. syncProcessor = new SyncRequestProcessor(zks, ackProcessor);
    7. forwardLearnerRequestsToCommitProcessorDisabled = Boolean.getBoolean(
    8. FORWARD_LEARNER_REQUESTS_TO_COMMIT_PROCESSOR_DISABLED);
    9. }
  • PrepRequestProcessor - 通常位于RequestProcessor链开头,为更新请求关联的事务做设置

  • LeaderRequestProcessor - 负责执行本地会话升级,只有直接提交给leader的Request才能通过这个处理器

LearnerZooKeeperServer

Learner基类:

  • 使用LearnerSessionTracker做会话追踪
  • 使用CommitProcessor、SyncRequestProcessor做处理器链

FollowerZooKeeperServer

实现类概述

与ZooKeeperServer类似,只是处理器链不同:

FollowerRequestProcessor -> CommitProcessor -> FinalRequestProcessor

使用SyncRequestProcessor来记录leader的提案。

处理器链

setupRequestProcessors方法:

  1. protected void setupRequestProcessors() {
  2. RequestProcessor finalProcessor = new FinalRequestProcessor(this);
  3. commitProcessor = new CommitProcessor(
  4. finalProcessor, Long.toString(getServerId()), true, getZooKeeperServerListener());
  5. commitProcessor.start();
  6. firstProcessor = new FollowerRequestProcessor(this, commitProcessor);
  7. ((FollowerRequestProcessor) firstProcessor).start();
  8. syncProcessor = new SyncRequestProcessor(this, new SendAckRequestProcessor(getFollower()));
  9. syncProcessor.start();
  10. }
  • FinalRequestProcessor
  • CommitProcessor
  • FollowerRequestProcessor - 将数据更新请求转发给Leader
  • SyncRequestProcessor
  • SendAckRequestProcessor - 给leader发ACK

ObserverZooKeeperServer

Observer类型节点的ZooKeeperServer实现。

setupRequestProcessors方法:

  1. protected void setupRequestProcessors() {
  2. RequestProcessor finalProcessor = new FinalRequestProcessor(this);
  3. commitProcessor = new CommitProcessor(
  4. finalProcessor, Long.toString(getServerId()), true, getZooKeeperServerListener());
  5. commitProcessor.start();
  6. firstProcessor = new ObserverRequestProcessor(this, commitProcessor);
  7. ((ObserverRequestProcessor) firstProcessor).start();
  8. // 默认false
  9. if (syncRequestProcessorEnabled) {
  10. syncProcessor = new SyncRequestProcessor(this, null);
  11. syncProcessor.start();
  12. }
  13. }

原文链接:https://www.cnblogs.com/xugf/p/17979795

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号