经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 数据库/运维 » RocketMQ » 查看文章
【RocketMQ】主从同步实现原理
来源:cnblogs  作者:shanml  时间:2022/12/5 9:12:08  对本文有异议

主从同步的实现逻辑主要在HAService中,在DefaultMessageStore的构造函数中,对HAService进行了实例化,并在start方法中,启动了HAService

  1. public class DefaultMessageStore implements MessageStore {
  2. public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,
  3. final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {
  4. // ...
  5. if (!messageStoreConfig.isEnableDLegerCommitLog()) {
  6. // 初始化HAService
  7. this.haService = new HAService(this);
  8. } else {
  9. this.haService = null;
  10. }
  11. // ...
  12. }
  13. public void start() throws Exception {
  14. // ...
  15. if (!messageStoreConfig.isEnableDLegerCommitLog()) {
  16. // 启动HAService
  17. this.haService.start();
  18. this.handleScheduleMessageService(messageStoreConfig.getBrokerRole());
  19. }
  20. // ...
  21. }
  22. }

HAService的构造函数中,创建了AcceptSocketServiceGroupTransferServiceHAClient,在start方法中主要做了如下几件事:

  1. 调用AcceptSocketService的beginAccept方法,这一步主要是进行端口绑定,在端口上监听从节点的连接请求(可以看做是运行在master节点的);
  2. 调用AcceptSocketService的start方法启动服务,这一步主要为了处理从节点的连接请求,与从节点建立连接(可以看做是运行在master节点的);
  3. 调用GroupTransferService的start方法,主要用于在主从同步的时候,等待数据传输完毕(可以看做是运行在master节点的);
  4. 调用HAClient的start方法启动,里面与master节点建立连接,向master汇报主从同步进度并存储master发送过来的同步数据(可以看做是运行在从节点的);
  1. public class HAService {
  2. public HAService(final DefaultMessageStore defaultMessageStore) throws IOException {
  3. this.defaultMessageStore = defaultMessageStore;
  4. // 创建AcceptSocketService
  5. this.acceptSocketService =
  6. new AcceptSocketService(defaultMessageStore.getMessageStoreConfig().getHaListenPort());
  7. this.groupTransferService = new GroupTransferService();
  8. // 创建HAClient
  9. this.haClient = new HAClient();
  10. }
  11. public void start() throws Exception {
  12. // 开始监听从服务器的连接
  13. this.acceptSocketService.beginAccept();
  14. // 启动服务
  15. this.acceptSocketService.start();
  16. // 启动GroupTransferService
  17. this.groupTransferService.start();
  18. // 启动
  19. this.haClient.start();
  20. }
  21. }

监听从节点连接请求

AcceptSocketServicebeginAccept方法里面首先获取了ServerSocketChannel,然后进行端口绑定,并在selector上面注册了OP_ACCEPT事件的监听,监听从节点的连接请求:

  1. public class HAService {
  2. class AcceptSocketService extends ServiceThread {
  3. /**
  4. * 监听从节点的连接
  5. *
  6. * @throws Exception If fails.
  7. */
  8. public void beginAccept() throws Exception {
  9. // 创建ServerSocketChannel
  10. this.serverSocketChannel = ServerSocketChannel.open();
  11. // 获取selector
  12. this.selector = RemotingUtil.openSelector();
  13. this.serverSocketChannel.socket().setReuseAddress(true);
  14. // 绑定端口
  15. this.serverSocketChannel.socket().bind(this.socketAddressListen);
  16. // 设置非阻塞
  17. this.serverSocketChannel.configureBlocking(false);
  18. // 注册OP_ACCEPT连接事件的监听
  19. this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
  20. }
  21. }
  22. }

处理从节点连接请求

AcceptSocketService的run方法中,对监听到的连接请求进行了处理,处理逻辑大致如下:

  1. 从selector中获取到监听到的事件;
  2. 如果是OP_ACCEPT连接事件,创建与从节点的连接对象HAConnection,与从节点建立连接,然后调用HAConnection的start方法进行启动,并创建的HAConnection对象加入到连接集合中,HAConnection中封装了Master节点和从节点的数据同步逻辑
  1. public class HAService {
  2. class AcceptSocketService extends ServiceThread {
  3. @Override
  4. public void run() {
  5. log.info(this.getServiceName() + " service started");
  6. // 如果服务未停止
  7. while (!this.isStopped()) {
  8. try {
  9. this.selector.select(1000);
  10. // 获取监听到的事件
  11. Set<SelectionKey> selected = this.selector.selectedKeys();
  12. // 处理事件
  13. if (selected != null) {
  14. for (SelectionKey k : selected) {
  15. // 如果是连接事件
  16. if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
  17. SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();
  18. if (sc != null) {
  19. HAService.log.info("HAService receive new connection, "
  20. + sc.socket().getRemoteSocketAddress());
  21. try {
  22. // 创建HAConnection,建立连接
  23. HAConnection conn = new HAConnection(HAService.this, sc);
  24. // 启动
  25. conn.start();
  26. // 添加连接
  27. HAService.this.addConnection(conn);
  28. } catch (Exception e) {
  29. log.error("new HAConnection exception", e);
  30. sc.close();
  31. }
  32. }
  33. } else {
  34. log.warn("Unexpected ops in select " + k.readyOps());
  35. }
  36. }
  37. selected.clear();
  38. }
  39. } catch (Exception e) {
  40. log.error(this.getServiceName() + " service has exception.", e);
  41. }
  42. }
  43. log.info(this.getServiceName() + " service end");
  44. }
  45. }
  46. }

等待主从复制传输结束

GroupTransferService的run方法主要是为了在进行主从数据同步的时候,等待从节点数据同步完毕。

在运行时首先进会调用waitForRunning进行等待,因为此时可能还有没有开始主从同步,所以先进行等待,之后如果有同步请求,会唤醒该线程,然后调用doWaitTransfer方法等待数据同步完成:

  1. public class HAService {
  2. class GroupTransferService extends ServiceThread {
  3. public void run() {
  4. log.info(this.getServiceName() + " service started");
  5. // 如果服务未停止
  6. while (!this.isStopped()) {
  7. try {
  8. // 等待运行
  9. this.waitForRunning(10);
  10. // 如果被唤醒,调用doWaitTransfer等待主从同步完成
  11. this.doWaitTransfer();
  12. } catch (Exception e) {
  13. log.warn(this.getServiceName() + " service has exception. ", e);
  14. }
  15. }
  16. log.info(this.getServiceName() + " service end");
  17. }
  18. }
  19. }

在看doWaitTransfer方法之前,首先看下是如何判断有数据需要同步的。

Master节点中,当消息被写入到CommitLog以后,会调用submitReplicaRequest方法处主从同步,首先判断当前Broker的角色是否是SYNC_MASTER,如果是则会构建消息提交请求GroupCommitRequest,然后调用HAServiceputRequest添加到请求集合中,并唤醒GroupTransferService中在等待的线程:

  1. public class CommitLog {
  2. public CompletableFuture<PutMessageStatus> submitReplicaRequest(AppendMessageResult result, MessageExt messageExt) {
  3. if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
  4. HAService service = this.defaultMessageStore.getHaService();
  5. if (messageExt.isWaitStoreMsgOK()) {
  6. if (service.isSlaveOK(result.getWroteBytes() + result.getWroteOffset())) {
  7. // 构建GroupCommitRequest
  8. GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(),
  9. this.defaultMessageStore.getMessageStoreConfig().getSlaveTimeout());
  10. // 添加请求
  11. service.putRequest(request);
  12. // 唤醒GroupTransferService中在等待的线程
  13. service.getWaitNotifyObject().wakeupAll();
  14. return request.future();
  15. }
  16. else {
  17. return CompletableFuture.completedFuture(PutMessageStatus.SLAVE_NOT_AVAILABLE);
  18. }
  19. }
  20. }
  21. return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
  22. }
  23. }

doWaitTransfer方法中,会判断CommitLog提交请求集合requestsRead是否为空,如果不为空,表示有消息写入了CommitLog,Master节点需要等待将数据传输给从节点:

  1. push2SlaveMaxOffset记录了从节点已经同步的消息偏移量,判断push2SlaveMaxOffset是否大于本次CommitLog提交的偏移量,也就是请求中设置的偏移量;
  2. 获取请求中设置的等待截止时间;
  3. 开启循环,判断数据是否还未传输完毕,并且未超过截止时间,如果是则等待1s,然后继续判断传输是否完毕,不断进行,直到超过截止时间或者数据已经传输完毕;
    (向从节点发送的消息最大偏移量push2SlaveMaxOffset超过了请求中设置的偏移量表示本次同步数据传输完毕);
  4. 唤醒在等待数据同步完毕的线程;
  1. public class HAService {
  2. // CommitLog提交请求集合
  3. private volatile LinkedList<CommitLog.GroupCommitRequest> requestsRead = new LinkedList<>();
  4. class GroupTransferService extends ServiceThread {
  5. private void doWaitTransfer() {
  6. // 如果CommitLog提交请求集合不为空
  7. if (!this.requestsRead.isEmpty()) {
  8. // 处理消息提交请求
  9. for (CommitLog.GroupCommitRequest req : this.requestsRead) {
  10. // 判断传输到从节点最大偏移量是否超过了请求中设置的偏移量
  11. boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
  12. // 获取截止时间
  13. long deadLine = req.getDeadLine();
  14. // 如果从节点还未同步完毕并且未超过截止时间
  15. while (!transferOK && deadLine - System.nanoTime() > 0) {
  16. // 等待
  17. this.notifyTransferObject.waitForRunning(1000);
  18. // 判断从节点同步的最大偏移量是否超过了请求中设置的偏移量
  19. transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
  20. }
  21. // 唤醒
  22. req.wakeupCustomer(transferOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
  23. }
  24. this.requestsRead = new LinkedList<>();
  25. }
  26. }
  27. }
  28. }

启动HAClient

HAClient可以看做是在从节点上运行的,主要进行的处理如下:

  1. 调用connectMaster方法连接Master节点,Master节点上也会运行,但是它本身就是Master没有可连的Master节点,所以可以忽略;
  2. 调用isTimeToReportOffset方法判断是否需要向Master节点汇报同步偏移量,如果需要则调用reportSlaveMaxOffset方法将当前的消息同步偏移量发送给Master节点;
  3. 调用processReadEvent处理网络请求中的可读事件,也就是处理Master发送过来的消息,将消息存入CommitLog
  1. public class HAService {
  2. class HAClient extends ServiceThread {
  3. @Override
  4. public void run() {
  5. log.info(this.getServiceName() + " service started");
  6. while (!this.isStopped()) {
  7. try {
  8. // 连接Master节点
  9. if (this.connectMaster()) {
  10. // 是否需要报告消息同步偏移量
  11. if (this.isTimeToReportOffset()) {
  12. // 向Master节点发送同步偏移量
  13. boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
  14. if (!result) {
  15. this.closeMaster();
  16. }
  17. }
  18. this.selector.select(1000);
  19. // 处理读事件,也就是Master节点发送的数据
  20. boolean ok = this.processReadEvent();
  21. if (!ok) {
  22. this.closeMaster();
  23. }
  24. // ...
  25. } else {
  26. this.waitForRunning(1000 * 5);
  27. }
  28. } catch (Exception e) {
  29. log.warn(this.getServiceName() + " service has exception. ", e);
  30. this.waitForRunning(1000 * 5);
  31. }
  32. }
  33. log.info(this.getServiceName() + " service end");
  34. }
  35. }
  36. }

连接主节点

connectMaster方法中会获取Master节点的地址,并转换为SocketAddress对象,然后向Master节点请求建立连接,并在selector注册OP_READ可读事件监听:

  1. public class HAService {
  2. class HAClient extends ServiceThread {
  3. // 当前的主从复制进度
  4. private long currentReportedOffset = 0;
  5. private boolean connectMaster() throws ClosedChannelException {
  6. if (null == socketChannel) {
  7. String addr = this.masterAddress.get();
  8. if (addr != null) {
  9. // 将地址转为SocketAddress
  10. SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);
  11. if (socketAddress != null) {
  12. // 连接master
  13. this.socketChannel = RemotingUtil.connect(socketAddress);
  14. if (this.socketChannel != null) {
  15. // 注册OP_READ可读事件监听
  16. this.socketChannel.register(this.selector, SelectionKey.OP_READ);
  17. }
  18. }
  19. }
  20. // 获取CommitLog中当前最大的偏移量
  21. this.currentReportedOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
  22. // 更新上次写入时间
  23. this.lastWriteTimestamp = System.currentTimeMillis();
  24. }
  25. return this.socketChannel != null;
  26. }
  27. }

发送主从同步消息拉取偏移量

isTimeToReportOffset方法中,首先获取当前时间与上一次进行主从同步的时间间隔interval,如果时间间隔interval大于配置的发送心跳时间间隔,表示需要向Master节点发送从节点消息同步的偏移量,接下来会调用reportSlaveMaxOffset方法发送同步偏移量,也就是说从节点会定时向Master节点发送请求,反馈CommitLog中同步消息的偏移量

  1. public class HAService {
  2. class HAClient extends ServiceThread {
  3. // 当前从节点已经同步消息的偏移量大小
  4. private long currentReportedOffset = 0;
  5. private boolean isTimeToReportOffset() {
  6. // 获取距离上一次主从同步的间隔时间
  7. long interval =
  8. HAService.this.defaultMessageStore.getSystemClock().now() - this.lastWriteTimestamp;
  9. // 判断是否超过了配置的发送心跳包时间间隔
  10. boolean needHeart = interval > HAService.this.defaultMessageStore.getMessageStoreConfig()
  11. .getHaSendHeartbeatInterval();
  12. return needHeart;
  13. }
  14. // 发送同步偏移量,传入的参数是当前的主从复制偏移量currentReportedOffset
  15. private boolean reportSlaveMaxOffset(final long maxOffset) {
  16. this.reportOffset.position(0);
  17. this.reportOffset.limit(8); // 设置数据传输大小为8个字节
  18. this.reportOffset.putLong(maxOffset);// 设置同步偏移量
  19. this.reportOffset.position(0);
  20. this.reportOffset.limit(8);
  21. for (int i = 0; i < 3 && this.reportOffset.hasRemaining(); i++) {
  22. try {
  23. // 向Master节点发送拉取偏移量
  24. this.socketChannel.write(this.reportOffset);
  25. } catch (IOException e) {
  26. log.error(this.getServiceName()
  27. + "reportSlaveMaxOffset this.socketChannel.write exception", e);
  28. return false;
  29. }
  30. }
  31. // 更新发送时间
  32. lastWriteTimestamp = HAService.this.defaultMessageStore.getSystemClock().now();
  33. return !this.reportOffset.hasRemaining();
  34. }
  35. }
  36. }

处理网络可读事件

processReadEvent方法中处理了可读事件,也就是处理Master节点发送的同步数据, 首先从socketChannel中读取数据到byteBufferRead中,byteBufferRead是读缓冲区,读取数据的方法会返回读取到的字节数,对字节数大小进行判断:

  • 如果可读字节数大于0表示有数据需要处理,调用dispatchReadRequest方法进行处理;
  • 如果可读字节数为0表示没有可读数据,此时记录读取到空数据的次数,如果连续读到空数据的次数大于3次,将终止本次处理;
  1. class HAClient extends ServiceThread {
  2. // 读缓冲区,会将从socketChannel读入缓冲区
  3. private ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
  4. private boolean processReadEvent() {
  5. int readSizeZeroTimes = 0;
  6. while (this.byteBufferRead.hasRemaining()) {
  7. try {
  8. // 从socketChannel中读取数据到byteBufferRead中,返回读取到的字节数
  9. int readSize = this.socketChannel.read(this.byteBufferRead);
  10. if (readSize > 0) {
  11. // 重置readSizeZeroTimes
  12. readSizeZeroTimes = 0;
  13. // 处理数据
  14. boolean result = this.dispatchReadRequest();
  15. if (!result) {
  16. log.error("HAClient, dispatchReadRequest error");
  17. return false;
  18. }
  19. } else if (readSize == 0) {
  20. // 记录读取到空数据的次数
  21. if (++readSizeZeroTimes >= 3) {
  22. break;
  23. }
  24. } else {
  25. log.info("HAClient, processReadEvent read socket < 0");
  26. return false;
  27. }
  28. } catch (IOException e) {
  29. log.info("HAClient, processReadEvent read socket exception", e);
  30. return false;
  31. }
  32. }
  33. return true;
  34. }
  35. }
消息写入ComitLog

dispatchReadRequest方法中会将从节点读取到的数据写入CommitLog,dispatchPosition记录了已经处理的数据在读缓冲区中的位置,从读缓冲区byteBufferRead获取剩余可读取的字节数,如果可读数据的字节数大于一个消息头的字节数(12个字节),表示有数据还未处理完毕,反之表示消息已经处理完毕结束处理。
对数据的处理逻辑如下:

  1. 从缓冲区中读取数据,首先获取到的是消息在master节点的物理偏移量masterPhyOffset;
  2. 向后读取8个字节,得到消息体内容的字节数bodySize;
  3. 获取从节点当前CommitLog的最大物理偏移量slavePhyOffset,如果不为0并且不等于masterPhyOffset,表示与Master节点的传输偏移量不一致,也就是数据不一致,此时终止处理
  4. 如果可读取的字节数大于一个消息头的字节数 + 消息体大小,表示有消息可处理,继续进行下一步;
  5. 计算消息体在读缓冲区中的起始位置,从读缓冲区中根据起始位置,读取消息内容,将消息追加到从节点的CommitLog中
  6. 更新dispatchPosition的值为消息头大小 + 消息体大小,dispatchPosition之前的数据表示已经处理完毕;
  1. class HAClient extends ServiceThread {
  2. // 已经处理的数据在读缓冲区中的位置,初始化为0
  3. private int dispatchPosition = 0;
  4. // 读缓冲区
  5. private ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
  6. private boolean dispatchReadRequest() {
  7. // 消息头大小
  8. final int msgHeaderSize = 8 + 4; // phyoffset + size
  9. // 开启循环不断读取数据
  10. while (true) {
  11. // 获可读取的字节数
  12. int diff = this.byteBufferRead.position() - this.dispatchPosition;
  13. // 如果字节数大于一个消息头的字节数
  14. if (diff >= msgHeaderSize) {
  15. // 获取消息在master节点的物理偏移量
  16. long masterPhyOffset = this.byteBufferRead.getLong(this.dispatchPosition);
  17. // 获取消息体大小
  18. int bodySize = this.byteBufferRead.getInt(this.dispatchPosition + 8);
  19. // 获取从节点当前CommitLog的最大物理偏移量
  20. long slavePhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
  21. if (slavePhyOffset != 0) {
  22. // 如果不一致结束处理
  23. if (slavePhyOffset != masterPhyOffset) {
  24. log.error("master pushed offset not equal the max phy offset in slave, SLAVE: "
  25. + slavePhyOffset + " MASTER: " + masterPhyOffset);
  26. return false;
  27. }
  28. }
  29. // 如果可读取的字节数大于一个消息头的字节数 + 消息体大小
  30. if (diff >= (msgHeaderSize + bodySize)) {
  31. // 将度缓冲区的数据转为字节数组
  32. byte[] bodyData = byteBufferRead.array();
  33. // 计算消息体在读缓冲区中的起始位置
  34. int dataStart = this.dispatchPosition + msgHeaderSize;
  35. // 从读缓冲区中根据消息的位置,读取消息内容,将消息追加到从节点的CommitLog中
  36. HAService.this.defaultMessageStore.appendToCommitLog(
  37. masterPhyOffset, bodyData, dataStart, bodySize);
  38. // 更新dispatchPosition的值为消息头大小+消息体大小
  39. this.dispatchPosition += msgHeaderSize + bodySize;
  40. if (!reportSlaveMaxOffsetPlus()) {
  41. return false;
  42. }
  43. continue;
  44. }
  45. }
  46. if (!this.byteBufferRead.hasRemaining()) {
  47. this.reallocateByteBuffer();
  48. }
  49. break;
  50. }
  51. return true;
  52. }
  53. }

HAConnection

HAConnection中封装了Master节点与从节点的网络通信处理,分别在ReadSocketServiceWriteSocketService中。

ReadSocketService

ReadSocketService启动后处理监听到的可读事件,前面知道HAClient中从节点会定时向Master节点汇报从节点的消息同步偏移量,Master节点对汇报请求的处理就在这里,如果从网络中监听到了可读事件,会调用processReadEvent处理读事件:

  1. public class HAConnection {
  2. class ReadSocketService extends ServiceThread {
  3. @Override
  4. public void run() {
  5. HAConnection.log.info(this.getServiceName() + " service started");
  6. while (!this.isStopped()) {
  7. try {
  8. this.selector.select(1000);
  9. // 处理可读事件
  10. boolean ok = this.processReadEvent();
  11. if (!ok) {
  12. HAConnection.log.error("processReadEvent error");
  13. break;
  14. }
  15. // ...
  16. } catch (Exception e) {
  17. HAConnection.log.error(this.getServiceName() + " service has exception.", e);
  18. break;
  19. }
  20. }
  21. // ...
  22. HAConnection.log.info(this.getServiceName() + " service end");
  23. }
  24. }
  25. }

处理可读事件

processReadEvent中从网络中处理读事件的方式与上面HAClientdispatchReadRequest类似,都是将网络中的数据读取到读缓冲区中,并用一个变量记录已读取数据的位置,processReadEvent方法的处理逻辑如下:

  1. 从socketChannel读取数据到读缓冲区byteBufferRead中,返回读取到的字节数;
  2. 如果读取到的字节数大于0,进入下一步,如果读取到的字节数为0,记录连续读取到空字节数的次数是否超过三次,如果超过终止处理;
  3. 判断剩余可读取的字节数是否大于等于8,前面知道,从节点发送同步消息拉取偏移量的时候设置的字节大小为8,所以字节数大于等于8的时候表示需要读取从节点发送的偏移量;
  4. 计算数据在缓冲区中的位置,从缓冲区读取从节点发送的同步偏移量readOffset;
  5. 更新processPosition的值,processPosition表示读缓冲区中已经处理数据的位置;
  6. 更新slaveAckOffset为从节点发送的同步偏移量readOffset的值;
  7. 如果当前Master节点记录的从节点的同步偏移量slaveRequestOffset小于0,表示还未进行同步,此时将slaveRequestOffset更新为从节点发送的同步偏移量;
  8. 如果从节点发送的同步偏移量比当前Master节点的最大物理偏移量还要大,终止本次处理;
  9. 调用notifyTransferSome,更新Master节点记录的向从节点同步消息的偏移量;
  1. public class HAConnection {
  2. class ReadSocketService extends ServiceThread {
  3. // 读缓冲区
  4. private final ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
  5. // 读缓冲区中已经处理的数据位置
  6. private int processPosition = 0;
  7. private boolean processReadEvent() {
  8. int readSizeZeroTimes = 0;
  9. // 如果没有可读数据
  10. if (!this.byteBufferRead.hasRemaining()) {
  11. this.byteBufferRead.flip();
  12. // 处理位置置为0
  13. this.processPosition = 0;
  14. }
  15. // 如果数据未读取完毕
  16. while (this.byteBufferRead.hasRemaining()) {
  17. try {
  18. // 从socketChannel读取数据到byteBufferRead中,返回读取到的字节数
  19. int readSize = this.socketChannel.read(this.byteBufferRead);
  20. // 如果读取数据字节数大于0
  21. if (readSize > 0) {
  22. // 重置readSizeZeroTimes
  23. readSizeZeroTimes = 0;
  24. // 获取上次处理读事件的时间戳
  25. this.lastReadTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
  26. // 判断剩余可读取的字节数是否大于等于8
  27. if ((this.byteBufferRead.position() - this.processPosition) >= 8) {
  28. // 获取偏移量内容的结束位置
  29. int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);
  30. // 从结束位置向前读取8个字节得到从点发送的同步偏移量
  31. long readOffset = this.byteBufferRead.getLong(pos - 8);
  32. // 更新处理位置
  33. this.processPosition = pos;
  34. // 更新slaveAckOffset为从节点发送的同步进度
  35. HAConnection.this.slaveAckOffset = readOffset;
  36. // 如果记录的从节点的同步进度小于0,表示还未进行同步
  37. if (HAConnection.this.slaveRequestOffset < 0) {
  38. // 更新为从节点发送的同步进度
  39. HAConnection.this.slaveRequestOffset = readOffset;
  40. log.info("slave[" + HAConnection.this.clientAddr + "] request offset " + readOffset);
  41. } else if (HAConnection.this.slaveAckOffset > HAConnection.this.haService.getDefaultMessageStore().getMaxPhyOffset()) {
  42. // 如果从节点发送的拉取偏移量比当前Master节点的最大物理偏移量还要大
  43. log.warn("slave[{}] request offset={} greater than local commitLog offset={}. ",
  44. HAConnection.this.clientAddr,
  45. HAConnection.this.slaveAckOffset,
  46. HAConnection.this.haService.getDefaultMessageStore().getMaxPhyOffset());
  47. return false;
  48. }
  49. // 更新Master节点记录的向从节点同步消息的偏移量
  50. HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset);
  51. }
  52. } else if (readSize == 0)
  53. // 判断连续读取到空数据的次数是否超过三次
  54. if (++readSizeZeroTimes >= 3) {
  55. break;
  56. }
  57. } else {
  58. log.error("read socket[" + HAConnection.this.clientAddr + "] < 0");
  59. return false;
  60. }
  61. } catch (IOException e) {
  62. log.error("processReadEvent exception", e);
  63. return false;
  64. }
  65. }
  66. return true;
  67. }
  68. }
  69. }

前面在GroupTransferService中可以看到是通过push2SlaveMaxOffset的值判断本次同步是否完成的,在notifyTransferSome方法中可以看到当Master节点收到从节点反馈的消息拉取偏移量时,对push2SlaveMaxOffset的值进行了更新:

  1. public class HAService {
  2. // 向从节点推送的消息最大偏移量
  3. private final GroupTransferService groupTransferService;
  4. public void notifyTransferSome(final long offset) {
  5. // 如果传入的偏移大于push2SlaveMaxOffset记录的值,进行更新
  6. for (long value = this.push2SlaveMaxOffset.get(); offset > value; ) {
  7. // 更新向从节点推送的消息最大偏移量
  8. boolean ok = this.push2SlaveMaxOffset.compareAndSet(value, offset);
  9. if (ok) {
  10. this.groupTransferService.notifyTransferSome();
  11. break;
  12. } else {
  13. value = this.push2SlaveMaxOffset.get();
  14. }
  15. }
  16. }
  17. }

WriteSocketService

WriteSocketService用于Master节点向从节点发送同步消息,处理逻辑如下:

  1. 根据从节点发送的主从同步消息拉取偏移量slaveRequestOffset进行判断:

    • 如果slaveRequestOffset值为-1,表示还未收到从节点报告的同步偏移量,此时睡眠一段时间等待从节点发送消息拉取偏移量;
    • 如果slaveRequestOffset值不为-1,表示已经开始进行主从同步进行下一步;
  2. 判断nextTransferFromWhere值是否为-1,nextTransferFromWhere记录了下次需要传输的消息在CommitLog中的偏移量,如果值为-1表示初次进行数据同步,此时有两种情况:

    • 如果从节点发送的拉取偏移量slaveRequestOffset为0,就从当前CommitLog文件最大偏移量开始同步;
    • 如果slaveRequestOffset不为0,则从slaveRequestOffset位置处进行数据同步;
  3. 判断上次写事件是否已经将数据都写入到从节点

    • 如果已经写入完毕,判断距离上次写入数据的时间间隔是否超过了设置的心跳时间,如果超过,为了避免连接空闲被关闭,需要发送一个心跳包,此时构建心跳包的请求数据,调用transferData方法传输数据;
    • 如果上次的数据还未传输完毕,调用transferData方法继续传输,如果还是未完成,则结束此处处理;
  4. 根据nextTransferFromWhere从CommitLog中获取消息,如果未获取到消息,等待100ms,如果获取到消息,从CommitLog中获取消息进行传输:
    (1)如果获取到消息的字节数大于最大传输的大小,设置最最大传输数量,分批进行传输;
    (2)更新下次传输的偏移量地址也就是nextTransferFromWhere的值;
    (3)从CommitLog中获取的消息内容设置到将读取到的消息数据设置到selectMappedBufferResult中;
    (4)设置消息头信息,包括消息头字节数、拉取消息的偏移量等;
    (5)调用transferData发送数据;

  1. public class HAConnection {
  2. class WriteSocketService extends ServiceThread {
  3. private final int headerSize = 8 + 4;// 消息头大小
  4. @Override
  5. public void run() {
  6. HAConnection.log.info(this.getServiceName() + " service started");
  7. while (!this.isStopped()) {
  8. try {
  9. this.selector.select(1000);
  10. // 如果slaveRequestOffset为-1,表示还未收到从节点报告的拉取进度
  11. if (-1 == HAConnection.this.slaveRequestOffset) {
  12. // 等待一段时间
  13. Thread.sleep(10);
  14. continue;
  15. }
  16. // 初次进行数据同步
  17. if (-1 == this.nextTransferFromWhere) {
  18. // 如果拉取进度为0
  19. if (0 == HAConnection.this.slaveRequestOffset) {
  20. // 从master节点最大偏移量从开始传输
  21. long masterOffset = HAConnection.this.haService.getDefaultMessageStore().getCommitLog().getMaxOffset();
  22. masterOffset =
  23. masterOffset
  24. - (masterOffset % HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
  25. .getMappedFileSizeCommitLog());
  26. if (masterOffset < 0) {
  27. masterOffset = 0;
  28. }
  29. // 更新nextTransferFromWhere
  30. this.nextTransferFromWhere = masterOffset;
  31. } else {
  32. // 根据从节点发送的偏移量开始数据同步
  33. this.nextTransferFromWhere = HAConnection.this.slaveRequestOffset;
  34. }
  35. log.info("master transfer data from " + this.nextTransferFromWhere + " to slave[" + HAConnection.this.clientAddr
  36. + "], and slave request " + HAConnection.this.slaveRequestOffset);
  37. }
  38. // 判断上次传输是否完毕
  39. if (this.lastWriteOver) {
  40. // 获取当前时间距离上次写入数据的时间间隔
  41. long interval =
  42. HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;
  43. // 如果距离上次写入数据的时间间隔超过了设置的心跳时间
  44. if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
  45. .getHaSendHeartbeatInterval()) {
  46. // 构建header
  47. this.byteBufferHeader.position(0);
  48. this.byteBufferHeader.limit(headerSize);
  49. this.byteBufferHeader.putLong(this.nextTransferFromWhere);
  50. this.byteBufferHeader.putInt(0);
  51. this.byteBufferHeader.flip();
  52. // 发送心跳包
  53. this.lastWriteOver = this.transferData();
  54. if (!this.lastWriteOver)
  55. continue;
  56. }
  57. } else {
  58. // 未传输完毕,继续上次的传输
  59. this.lastWriteOver = this.transferData();
  60. // 如果依旧未完成,结束本次处理
  61. if (!this.lastWriteOver)
  62. continue;
  63. }
  64. // 根据偏移量获取消息数据
  65. SelectMappedBufferResult selectResult =
  66. HAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);
  67. if (selectResult != null) {// 获取消息不为空
  68. // 获取消息内容大小
  69. int size = selectResult.getSize();
  70. // 如果消息的字节数大于最大传输的大小
  71. if (size > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) {
  72. // 设置为最大传输大小
  73. size = HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize();
  74. }
  75. long thisOffset = this.nextTransferFromWhere;
  76. // 更新下次传输的偏移量地址
  77. this.nextTransferFromWhere += size;
  78. selectResult.getByteBuffer().limit(size);
  79. // 将读取到的消息数据设置到selectMappedBufferResult
  80. this.selectMappedBufferResult = selectResult;
  81. // 设置消息头
  82. this.byteBufferHeader.position(0);
  83. // 设置消息头大小
  84. this.byteBufferHeader.limit(headerSize);
  85. // 设置偏移量地址
  86. this.byteBufferHeader.putLong(thisOffset);
  87. // 设置消息内容大小
  88. this.byteBufferHeader.putInt(size);
  89. this.byteBufferHeader.flip();
  90. // 发送数据
  91. this.lastWriteOver = this.transferData();
  92. } else {
  93. // 等待100ms
  94. HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100);
  95. }
  96. } catch (Exception e) {
  97. HAConnection.log.error(this.getServiceName() + " service has exception.", e);
  98. break;
  99. }
  100. }
  101. HAConnection.this.haService.getWaitNotifyObject().removeFromWaitingThreadTable();
  102. // ...
  103. HAConnection.log.info(this.getServiceName() + " service end");
  104. }
  105. }
  106. }

发送数据

transferData方法的处理逻辑如下:

  1. 发送消息头数据;
  2. 消息头数据发送完毕之后,发送消息内容,前面知道从CommitLog中读取的消息内容放入到了selectMappedBufferResult,将selectMappedBufferResult的内容发送给从节点;
  1. public class HAConnection {
  2. class WriteSocketService extends ServiceThread {
  3. private boolean transferData() throws Exception {
  4. int writeSizeZeroTimes = 0;
  5. // 写入消息头
  6. while (this.byteBufferHeader.hasRemaining()) {
  7. // 发送消息头数据
  8. int writeSize = this.socketChannel.write(this.byteBufferHeader);
  9. if (writeSize > 0) {
  10. writeSizeZeroTimes = 0;
  11. // 记录发送时间
  12. this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
  13. } else if (writeSize == 0) {
  14. if (++writeSizeZeroTimes >= 3) {
  15. break;
  16. }
  17. } else {
  18. throw new Exception("ha master write header error < 0");
  19. }
  20. }
  21. if (null == this.selectMappedBufferResult) {
  22. return !this.byteBufferHeader.hasRemaining();
  23. }
  24. writeSizeZeroTimes = 0;
  25. // 消息头数据发送完毕之后,发送消息内容
  26. if (!this.byteBufferHeader.hasRemaining()) {
  27. while (this.selectMappedBufferResult.getByteBuffer().hasRemaining()) {
  28. // 发送消息内容
  29. int writeSize = this.socketChannel.write(this.selectMappedBufferResult.getByteBuffer());
  30. if (writeSize > 0) {
  31. writeSizeZeroTimes = 0;
  32. this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
  33. } else if (writeSize == 0) {
  34. if (++writeSizeZeroTimes >= 3) {
  35. break;
  36. }
  37. } else {
  38. throw new Exception("ha master write body error < 0");
  39. }
  40. }
  41. }
  42. // ...
  43. return result;
  44. }
  45. }
  46. }

总结

主从同步流程

有新消息写入之后的同步流程

参考
丁威、周继锋《RocketMQ技术内幕》

RocketMQ版本:4.9.3

原文链接:https://www.cnblogs.com/shanml/p/16950178.html

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号