经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » Java相关 » 设计模式 » 查看文章
也谈Reactor模式
来源:cnblogs  作者:莱布尼茨  时间:2018/10/18 9:00:19  对本文有异议

何谓Reactor模式?它是实现高性能IO的一种设计模式。网上资料有很多,有些写的也很好,但大多不知其所以然。这里博主按自己的思路简单介绍下,有不对的地方敬请指正。


BIO

Java1.4(2002年)以前,IO都是Blocking的,也就是常说的BIO,它在等待请求、读、写(返回)三个环节都是阻塞的。在等待请求阶段,系统无法知道请求何时到达,因此需要一个主线程一直守着,当有请求进来时,将请求分发给读写线程。如图:

代码如下:

  1. ExecutorService executor = Excutors.newFixedThreadPollExecutor(100);//线程池
  2. ServerSocket serverSocket = new ServerSocket();
  3. serverSocket.bind(8088);
  4. while(!Thread.currentThread.isInturrupted()){//主线程死循环等待新连接到来
  5. Socket socket = serverSocket.accept();
  6. executor.submit(new ConnectIOnHandler(socket));//为新的连接创建新的线程
  7. }
  8. class ConnectIOnHandler extends Thread{
  9. private Socket socket;
  10. public ConnectIOnHandler(Socket socket){ this.socket = socket; }
  11. public void run(){
  12. while(!Thread.currentThread.isInturrupted()&&!socket.isClosed()){//死循环处理读写事件
  13. String someThing = socket.read()....//读取数据
  14. if(someThing!=null){
               ......//处理数据
               
    socket.write()....//写数据
  15. }
  16. }
  17. }

需知,请求进来(accept),并不表示数据马上达到了,可能隔一段时间才会传进来,这个时候socket.read()也是一直阻塞的状态。socket.write()也同理,当向磁盘或其它socket写数据时,也要等对方准备好才能写入,在对方准备阶段,socket.write()也是阻塞的。这两个环节可能的无效阻塞导致读写线程的低效。


NIO

Java1.4开始,引入了NIO。NIO有三个概念:Selector、Buffer、Channel。与BIO的区别是,请求进来后,并不会马上分派IO线程,而是依靠操作系统底层的多路复用机制(select/poll/epoll等),在监听到socket读写就绪之后,再分配IO线程(实际可由当前线程[使用Buffer和Channel]直接读写,因为读写本身的效率很高),这就避免了线程等待。且与BIO多线程方式相比,使用I/O多路复用技术,系统不必创建和维护庞大的线程池,从而大大减小了开销。这部分工作是NIO的核心,由Selector负责,本质上是多路复用的Java封装。而Buffer和Channel又封装了一层socket的读写,应该为的是将IO与业务代码彻底分离。以下图示为本人理解:

如图示,与BIO中监听线程职责不同,Selector监听的不只是连接请求,还有读写就绪事件,当某个事件发生时,即通知注册了该事件的Channel,由Channel操作socket读写Buffer。虚线表示需要具体的NIO框架或业务代码自己处理,比如Channel如何注册以及注册何种事件,Channel处理IO的方式(如在当前线程处理还是新开线程,若新开线程,则可看作是AIO模式)等。NIO只是提供了一套机制,具体使用还是需要编程实现(Reactor模式就是OO的一种实现)。

示例代码(摘自Java NIO详解

服务端:

  1. 1 package cn.blog.test.NioTest;
  2. 2
  3. 3
  4. 4 import java.io.IOException;
  5. 5 import java.net.InetSocketAddress;
  6. 6 import java.nio.ByteBuffer;
  7. 7 import java.nio.channels.*;
  8. 8 import java.nio.charset.Charset;
  9. 9 import java.util.Iterator;
  10. 10 import java.util.Set;
  11. 11
  12. 12
  13. 13 public class MyNioServer {
  14. 14 private Selector selector; //创建一个选择器
  15. 15 private final static int port = 8686;
  16. 16 private final static int BUF_SIZE = 10240;
  17. 17
  18. 18 private void initServer() throws IOException {
  19. 19 //创建通道管理器对象selector
  20. 20 this.selector=Selector.open();
  21. 21
  22. 22 //创建一个通道对象channel
  23. 23 ServerSocketChannel channel = ServerSocketChannel.open();
  24. 24 channel.configureBlocking(false); //将通道设置为非阻塞
  25. 25 channel.socket().bind(new InetSocketAddress(port)); //将通道绑定在8686端口
  26. 26
  27. 27 //将上述的通道管理器和通道绑定,并为该通道注册OP_ACCEPT事件
  28. 28 //注册事件后,当该事件到达时,selector.select()会返回(一个key),如果该事件没到达selector.select()会一直阻塞
  29. 29 SelectionKey selectionKey = channel.register(selector,SelectionKey.OP_ACCEPT);
  30. 30
  31. 31 while (true){ //轮询
  32. 32 selector.select(); //这是一个阻塞方法,一直等待直到有数据可读,返回值是key的数量(可以有多个)
  33. 33 Set keys = selector.selectedKeys(); //如果channel有数据了,将生成的key访入keys集合中
  34. 34 Iterator iterator = keys.iterator(); //得到这个keys集合的迭代器
  35. 35 while (iterator.hasNext()){ //使用迭代器遍历集合
  36. 36 SelectionKey key = (SelectionKey) iterator.next(); //得到集合中的一个key实例
  37. 37 iterator.remove(); //拿到当前key实例之后记得在迭代器中将这个元素删除,非常重要,否则会出错
  38. 38 if (key.isAcceptable()){ //判断当前key所代表的channel是否在Acceptable状态,如果是就进行接收
  39. 39 doAccept(key);
  40. 40 }else if (key.isReadable()){
  41. 41 doRead(key);
  42. 42 }else if (key.isWritable() && key.isValid()){
  43. 43 doWrite(key);
  44. 44 }else if (key.isConnectable()){
  45. 45 System.out.println("连接成功!");
  46. 46 }
  47. 47 }
  48. 48 }
  49. 49 }
  50. 50
  51. 51 public void doAccept(SelectionKey key) throws IOException {
  52. 52 ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
  53. 53 System.out.println("ServerSocketChannel正在循环监听");
  54. 54 SocketChannel clientChannel = serverChannel.accept();
  55. 55 clientChannel.configureBlocking(false);
  56. 56 clientChannel.register(key.selector(),SelectionKey.OP_READ);
  57. 57 }
  58. 58
  59. 59 public void doRead(SelectionKey key) throws IOException {
  60. 60 SocketChannel clientChannel = (SocketChannel) key.channel();
  61. 61 ByteBuffer byteBuffer = ByteBuffer.allocate(BUF_SIZE);
  62. 62 long bytesRead = clientChannel.read(byteBuffer);
  63. 63 while (bytesRead>0){
  64. 64 byteBuffer.flip();
  65. 65 byte[] data = byteBuffer.array();
  66. 66 String info = new String(data).trim();
  67. 67 System.out.println("从客户端发送过来的消息是:"+info);
  68. 68 byteBuffer.clear();
  69. 69 bytesRead = clientChannel.read(byteBuffer);
  70. 70 }
  71. 71 if (bytesRead==-1){
  72. 72 clientChannel.close();
  73. 73 }
  74. 74 }
  75. 75
  76. 76 public void doWrite(SelectionKey key) throws IOException {
  77. 77 ByteBuffer byteBuffer = ByteBuffer.allocate(BUF_SIZE);
  78. 78 byteBuffer.flip();
  79. 79 SocketChannel clientChannel = (SocketChannel) key.channel();
  80. 80 while (byteBuffer.hasRemaining()){
  81. 81 clientChannel.write(byteBuffer);
  82. 82 }
  83. 83 byteBuffer.compact();
  84. 84 }
  85. 85
  86. 86 public static void main(String[] args) throws IOException {
  87. 87 MyNioServer myNioServer = new MyNioServer();
  88. 88 myNioServer.initServer();
  89. 89 }
  90. 90 }
View Code

客户端:

  1. 1 package cn.blog.test.NioTest;
  2. 2
  3. 3
  4. 4 import java.io.IOException;
  5. 5 import java.net.InetSocketAddress;
  6. 6 import java.nio.ByteBuffer;
  7. 7 import java.nio.channels.SelectionKey;
  8. 8 import java.nio.channels.Selector;
  9. 9 import java.nio.channels.SocketChannel;
  10. 10 import java.util.Iterator;
  11. 11
  12. 12 public class MyNioClient {
  13. 13 private Selector selector; //创建一个选择器
  14. 14 private final static int port = 8686;
  15. 15 private final static int BUF_SIZE = 10240;
  16. 16 private static ByteBuffer byteBuffer = ByteBuffer.allocate(BUF_SIZE);
  17. 17
  18. 18 private void initClient() throws IOException {
  19. 19 this.selector = Selector.open();
  20. 20 SocketChannel clientChannel = SocketChannel.open();
  21. 21 clientChannel.configureBlocking(false);
  22. 22 clientChannel.connect(new InetSocketAddress(port));
  23. 23 clientChannel.register(selector, SelectionKey.OP_CONNECT);
  24. 24 while (true){
  25. 25 selector.select();
  26. 26 Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
  27. 27 while (iterator.hasNext()){
  28. 28 SelectionKey key = iterator.next();
  29. 29 iterator.remove();
  30. 30 if (key.isConnectable()){
  31. 31 doConnect(key);
  32. 32 }else if (key.isReadable()){
  33. 33 doRead(key);
  34. 34 }
  35. 35 }
  36. 36 }
  37. 37 }
  38. 38
  39. 39 public void doConnect(SelectionKey key) throws IOException {
  40. 40 SocketChannel clientChannel = (SocketChannel) key.channel();
  41. 41 if (clientChannel.isConnectionPending()){
  42. 42 clientChannel.finishConnect();
  43. 43 }
  44. 44 clientChannel.configureBlocking(false);
  45. 45 String info = "服务端你好!!";
  46. 46 byteBuffer.clear();
  47. 47 byteBuffer.put(info.getBytes("UTF-8"));
  48. 48 byteBuffer.flip();
  49. 49 clientChannel.write(byteBuffer);
  50. 50 //clientChannel.register(key.selector(),SelectionKey.OP_READ);
  51. 51 clientChannel.close();
  52. 52 }
  53. 53
  54. 54 public void doRead(SelectionKey key) throws IOException {
  55. 55 SocketChannel clientChannel = (SocketChannel) key.channel();
  56. 56 clientChannel.read(byteBuffer);
  57. 57 byte[] data = byteBuffer.array();
  58. 58 String msg = new String(data).trim();
  59. 59 System.out.println("服务端发送消息:"+msg);
  60. 60 clientChannel.close();
  61. 61 key.selector().close();
  62. 62 }
  63. 63
  64. 64 public static void main(String[] args) throws IOException {
  65. 65 MyNioClient myNioClient = new MyNioClient();
  66. 66 myNioClient.initClient();
  67. 67 }
  68. 68 }
View Code

在早期的JDK1.4和1.5 update10版本之前,Selector基于select/poll模型实现,是基于IO复用技术的非阻塞IO,不是异步IO。在JDK1.5 update10和linux core2.6以上版本,sun优化了Selctor的实现,底层使用epoll替换了select/poll。另据说Buffer指向的并非堆内内存,NIO使用 Native 函数库直接分配堆外内存,然后通过一个存储在 Java 堆的 DirectByteBuffer 对象作为这块内存的引用进行操作,避免了在 Java 堆和 Native 堆中来回复制数据。

NIO的实现解析可参看:深入浅出NIO Socket实现机制


Reactor模式

NIO为实现Reactor模式提供了基础,上面的NIO图示其实就是Reactor模式的雏形,只是Reactor以OO的方式抽象出了几个概念,使得职责划分更加明确。

  • Reactor:Reactor是IO事件的派发者,对应NIO的Selector;
  • Acceptor:Acceptor接受client连接,建立对应client的Handler,并向Reactor注册此Handler,对应NIO中注册Channel和事件触发时的判断分支(上述NIO服务端示例代码的38-46行);
  • Handler:IO处理类,对应NIO中Channel[使用socket]操作Buffer的过程。

基于上述三个角色画出Reactor模式图如下:

如此,Reactor模式便非常清晰地展现在我们眼前。那么业务线程如何与Reactor交互呢?由前文所知,数据存取于Buffer,具体操作由Handler负责。socket.read()将数据读入Buffer,需要一种机制将Buffer引用推送给业务线程;同样,业务线程返回的数据需要写入Buffer,按Reactor模式,写入后还需要注册write事件,socket可写后write()。如果直接调用的话,至少Handler和业务代码会耦合在一起,常见的解耦方式是定义接口,或使用消息中间件。


其它

话说回来,由于相对短暂的历史以及相对封闭的环境,.Net社区缺少很多概念的演化、探究和讨论,这也导致了.Neter们这些概念的缺失。虽然从语言层面上来说,C#和Java大同小异,前者甚至一定程度的有语法上的便利,然而只有认识到了其背后的思想和模式,才能真正用好这门语言,这就是.Neter需要了解Java及其历史的原因,毕竟.Net一开始就是参照着Java来的。

比如.Net里的堆栈概念,就算一些经典书籍都没有非常深入的说明,而Java方面的资料就很多了,参看深入理解JVM—JVM内存模型

 

其它参考资料:

NIO浅析

深入理解Java NIO

网络通信socket连接数上限

 

转载请注明本文出处:https://www.cnblogs.com/newton/p/9776821.html

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号