经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 大数据/云/AI » Zookeeper » 查看文章
zookeeper源码(09)follower处理客户端请求
来源:cnblogs  作者:用户不存在!  时间:2024/2/26 15:08:50  对本文有异议

在zookeeper中,follower也可以接收客户端连接,处理客户端请求,本文将分析follower处理客户端请求的流程:

  • 读请求处理
  • 写请求转发与响应

follower接收转发客户端请求

网络层接收客户端数据包

leader、follower都会启动ServerCnxnFactory组件,用来接收客户端连接、读取客户端数据包、将客户端数据包转发给zk应用层。

在"zookeeper源码(08)请求处理及数据读写流程"一文中已经介绍,ServerCnxn在读取到客户端数据包之后,会调用zookeeperServer的processConnectRequest或processPacket方法:

  • processConnectRequest方法:创建session
  • processPacket方法:处理业务请求

processConnectRequest创建session

  • 会使用sessionTracker生成sessionId、创建session对象
  • 生成一个密码
  • 提交一个createSession类型Request并提交给业务处理器
  1. long createSession(ServerCnxn cnxn, byte[] passwd, int timeout) {
  2. // 生成sessionId、创建session对象
  3. long sessionId = sessionTracker.createSession(timeout);
  4. // 生成密码
  5. Random r = new Random(sessionId ^ superSecret);
  6. r.nextBytes(passwd);
  7. // 提交createSession类型Request
  8. CreateSessionTxn txn = new CreateSessionTxn(timeout);
  9. cnxn.setSessionId(sessionId);
  10. Request si = new Request(cnxn, sessionId, 0, OpCode.createSession, RequestRecord.fromRecord(txn), null);
  11. submitRequest(si);
  12. return sessionId;
  13. }

processPacket处理业务请求

  • 封装Request
  • 验证largeRequest
  • 提交业务层处理器
  1. Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), request, cnxn.getAuthInfo());
  2. int length = request.limit();
  3. if (isLargeRequest(length)) {
  4. // checkRequestSize will throw IOException if request is rejected
  5. checkRequestSizeWhenMessageReceived(length);
  6. si.setLargeRequestSize(length);
  7. }
  8. si.setOwner(ServerCnxn.me);
  9. submitRequest(si);

FollowerRequestProcessor处理器

在follower端,客户端请求会由FollowerRequestProcessor处理:

  1. 把请求提交下游CommitProcessor处理器
  2. 写请求转发给leader处理
  3. 读请求经过CommitProcessor直接转发给FinalRequestProcessor处理器,直接查询数据返回给客户端
  1. public void run() {
  2. try {
  3. while (!finished) {
  4. Request request = queuedRequests.take();
  5. // Screen quorum requests against ACLs first 略
  6. // 转发给CommitProcessor处理器
  7. // 提交到queuedRequests队列
  8. // 写请求还会提交到queuedWriteRequests队列
  9. maybeSendRequestToNextProcessor(request);
  10. // ...
  11. // 写请求需要转发给leader处理
  12. switch (request.type) {
  13. case OpCode.sync:
  14. zks.pendingSyncs.add(request); // 待同步命令
  15. zks.getFollower().request(request);
  16. break;
  17. case OpCode.create:
  18. case OpCode.create2:
  19. case OpCode.createTTL:
  20. case OpCode.createContainer:
  21. case OpCode.delete:
  22. case OpCode.deleteContainer:
  23. case OpCode.setData:
  24. case OpCode.reconfig:
  25. case OpCode.setACL:
  26. case OpCode.multi:
  27. case OpCode.check:
  28. zks.getFollower().request(request);
  29. break;
  30. case OpCode.createSession:
  31. case OpCode.closeSession:
  32. if (!request.isLocalSession()) {
  33. zks.getFollower().request(request);
  34. }
  35. break;
  36. }
  37. }
  38. } catch (Exception e) {
  39. handleException(this.getName(), e);
  40. }
  41. }

转发leader

  1. zks.getFollower().request(request);

Learner转发请求:

  1. void request(Request request) throws IOException {
  2. // 略
  3. ByteArrayOutputStream baos = new ByteArrayOutputStream();
  4. DataOutputStream oa = new DataOutputStream(baos);
  5. oa.writeLong(request.sessionId); // sessionId
  6. oa.writeInt(request.cxid); // 客户端xid
  7. oa.writeInt(request.type); // 业务类型
  8. byte[] payload = request.readRequestBytes(); // 请求体
  9. if (payload != null) {
  10. oa.write(payload);
  11. }
  12. oa.close();
  13. // 封装REQUEST数据包
  14. QuorumPacket qp = new QuorumPacket(Leader.REQUEST, -1, baos.toByteArray(), request.authInfo);
  15. writePacket(qp, true); // 通过网络发给leader服务器
  16. }

leader处理follower请求

LearnerHandler接收REQUEST请求

  1. case Leader.REQUEST:
  2. bb = ByteBuffer.wrap(qp.getData());
  3. sessionId = bb.getLong(); // 解析请求信息
  4. cxid = bb.getInt();
  5. type = bb.getInt();
  6. bb = bb.slice();
  7. Request si;
  8. if (type == OpCode.sync) {
  9. si = new LearnerSyncRequest(
  10. this, sessionId, cxid, type, RequestRecord.fromBytes(bb), qp.getAuthinfo());
  11. } else {
  12. si = new Request(null, sessionId, cxid, type, RequestRecord.fromBytes(bb), qp.getAuthinfo());
  13. }
  14. si.setOwner(this); // 用来判断请求来自follower
  15. learnerMaster.submitLearnerRequest(si); // 提交给业务处理器
  16. requestsReceived.incrementAndGet();

submitLearnerRequest提交业务处理器:

  1. public void submitLearnerRequest(Request si) {
  2. zk.submitLearnerRequest(si);
  3. }

LeaderZooKeeperServer提交业务处理器:

  1. public void submitLearnerRequest(Request request) {
  2. // 提交给PrepRequestProcessor处理器
  3. prepRequestProcessor.processRequest(request);
  4. }

从此处开始走leader处理写请求流程。

leader处理写请求流程回顾

  • PrepRequestProcessor - 做事务设置
  • ProposalRequestProcessor - 发起proposal,将Request转发给SyncRequestProcessor写事务log、本地ack
  • CommitProcessor - 读请求直接调用下游处理器,写请求需要等待足够的ack之后commit再调用下游RequestProcessor处理器
  • ToBeAppliedRequestProcessor - 维护toBeApplied列表
  • FinalRequestProcessor - 把事务应用到ZKDatabase,提供查询功能,返回响应

follower处理leader数据

在follower中,Follower使用processPacket方法处理来自leader的数据包,此处看一下PROPOSAL和COMMIT的逻辑。

PROPOSAL数据包

  1. fzk.logRequest(hdr, txn, digest);

logRequest会使用syncProcessor将事务写入到txnlog文件,之后调用SendAckRequestProcessor处理器给leader发ack数据包。

leader收到超过半数的ack之后会发COMMIT数据包让各个节点将事务应用到ZKDatabase中。

COMMIT数据包

  1. fzk.commit(qp.getZxid());

CommitProcessor处理器会将其提交到committedRequests队列,之后客户端Request会继续向下游FinalRequestProcessor处理器传递。

FinalRequestProcessor处理器

  • 把事务应用到ZKDatabase中
  • 提供查询功能
  • 给客户端返回响应

原文链接:https://www.cnblogs.com/xugf/p/18033571

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号