🔥码云GVP开源项目 12k star Uniapp+ElementUI 功能强大 支持多语言、二开方便! 广告
Nio事件循环在NioEventLoop中,主要功能: * 处理网络I/O读写事件 * 执行系统任务和定时任务 在主循环中我们可以看到netty对I/O任务和提交到事件循环中的系统任务的调度。 ![EventLoop事件循环](http://web.uxiaowo.com/netty/Future/EventLoopTask.png) ## 6.1 I/O事件 1. 由于NIO的I/O读写需要使用选择符,因此,netty在NioEventLoop初始化时,会使用SelectorProvider打开selector。在类加载时,netty会从系统设置中读取相关配置参数: * sun.nio.ch.bugLevel 用来修复JDK的NIO在Selector.open()的一个BUG * io.netty.selectorAutoRebuildThreshold select()多少次数后重建selector ``` static { int selectorAutoRebuildThreshold = SystemPropertyUtil.getInt("io.netty.selectorAutoRebuildThreshold", 512); if (selectorAutoRebuildThreshold < MIN_PREMATURE_SELECTOR_RETURNS) { selectorAutoRebuildThreshold = 0; } SELECTOR_AUTO_REBUILD_THRESHOLD = selectorAutoRebuildThreshold; } ``` 2. NioEventLoop的构造方法中,会调用provider.openSelector()打开Selector;如果设置`io.netty.noKeySetOptimization`为true,则会启动优化,优化内容是将Selector的selectedKeys和publicSelectedKeys属性设置为可写并替换为Netty实现的集合以提供效率。 ``` private Selector openSelector() { final Selector selector; try { selector = provider.openSelector(); } catch (IOException e) { throw new ChannelException("failed to open a new selector", e); } if (DISABLE_KEYSET_OPTIMIZATION) { return selector; } // 下面是优化程序,此处省略 ... return selector; } ``` 3. NioEventLoop最核心的地方在于事件循环,具体代码在NioEventLoop.java在run方法中 * 首先根据默认的选择策略DefaultSelectStrategy判断本次循环是否select,具体逻辑为:如果当前有任务则使用selectNow立刻查询是否有准备就绪的I/O;如果当前没有任务则返回SelectStrategy.SELECT,并将wakenUp设置为false,并调用select()进行查询。 ``` protected void run() { for (;;) { // 事件循环 try { // select策略 switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { case SelectStrategy.CONTINUE: continue; case SelectStrategy.SELECT: select(wakenUp.getAndSet(false)); // select() if (wakenUp.get()) { selector.wakeup(); // 唤醒select()的线程 } default: // fallthrough } .... 后续处理 ``` * select()时需要判断当前是否有scheduledTask(定时任务),如果有则需要计算任务delay的时间,如果定时任务需要立刻执行了,那么必须马上selectNow()并返回,之后执行任务。如果没有scheduledTask,会判断当前是否有任务在等待列表,如果有任务时将wakenUp设置为true并selectNow();如果没有任务,那么会 selector.select(1000); 阻塞等待1s,直到有I/O就绪,或者有任务等待,或需要唤醒时退出,否则,会继续循环,直到前面的几种情况发生后退出。 * 之后,事件循环开始处理IO和任务。如果查询到有IO事件,会调用processSelectedKeysOptimized(优化的情况下),对SelectionKey进行处理。 ``` if (ioRatio == 100) { try { processSelectedKeys(); } finally { runAllTasks(); } } else { final long ioStartTime = System.nanoTime(); try { processSelectedKeys(); } finally { final long ioTime = System.nanoTime() - ioStartTime; // io花费的时间 runAllTasks(ioTime * (100 - ioRatio) / ioRatio); // 按照iorate计算task的时间 } } ``` * processSelectedKeysOptimized处理I/O,主要是NIO的select操作,处理相关的事件。 ``` private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); ...... try { int readyOps = k.readyOps(); if ((readyOps & SelectionKey.OP_CONNECT) != 0) { int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); unsafe.finishConnect(); } if ((readyOps & SelectionKey.OP_WRITE) != 0) { ch.unsafe().forceFlush(); } if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); if (!ch.isOpen()) { // Connection already closed - no need to handle write. return; } } } catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); } } ``` ## 6.2 任务处理 * runAllTasks执行提交到EventLoop的任务,首先从scheduledTaskQueue获取需要执行的任务,加入到taskQueue,然后依次执行taskQueue的任务。 ``` protected boolean runAllTasks() { assert inEventLoop(); boolean fetchedAll; boolean ranAtLeastOne = false; do { fetchedAll = fetchFromScheduledTaskQueue(); // 获取定时任务 if (runAllTasksFrom(taskQueue)) { ranAtLeastOne = true; } } while (!fetchedAll); // keep on processing until we fetched all scheduled tasks. if (ranAtLeastOne) { lastExecutionTime = ScheduledFutureTask.nanoTime(); } afterRunningAllTasks(); return ranAtLeastOne; } ``` * ioRatio不为100时,会调用runAllTasks(ioTime * (100 - ioRatio) / ioRatio),首先计算出I/O处理的事件,然后按照比例为执行task分配事件,内部主要逻辑与runAllTasks()主要逻辑相同。