0xTTEPX

Just do it, deeply...

Follow me on GitHub

Netty总结

write by donaldhan, 2018-02-06 15:17

引言

Netty 是一个易于使用网络通信的客户端/服务器框架,利用Java的高级网络的能力,隐藏其背后的复杂性。同时是一个使用广泛的Java网络编程框架(Netty 在 2011 年获得了Duke’s Choice Award。它活跃和成长于用户社区,像大型公司 Facebook 和 Instagram 以及流行 开源项目如 Infinispan, HornetQ, Vert.x, Apache Cassandra 和 Elasticsearch 等,都利用其强大的网络抽象核心代码。

Netty

上面文章中的Netty源码为Netty的4.1分支,当时的源码版本为4.1.12。

目录

Netty 通道处理器ChannelHandler和适配器定义ChannelHandlerAdapter

通道处理器ChannelHandler,主要有两个事件方法分别为handlerAdded和handlerRemoved,handlerAdded在通道处理器添加到实际上下文后调用,通道处理器准备处理IO事件;handlerRemoved在通道处理器从实际上下文中移除后调用,通道处理器不再处理IO事件。

一个通道处理器关联一个通道处理器上下文ChannelHandlerContext。通道处理器通过一个上下文对象,与它所属的通道管道线交互。通道上下文对象,通道处理器上行或下行传递的事件,动态修改管道,或通过AttributeKey存储特殊的信息。通道处理器内部定义了一个共享注解Sharable,默认访问类型为Protected;添加共享注解的通道处理器,说明通道处理器中的变量可以共享,可以创建一个通道处理器实例,多次添加到通道管道线ChannlePipeline;对于没有共享注解的通道器,在每次添加到管道线上时,都要重新创建一个通道处理器实例。通道处理器只定义了简单的通道处理器添加到通道处理器上下文或从上下文移除的事件操作,没有具体定义读操作(上行UpStream,输入流Inbound,字节流到消息对象ByteToMessage),写操作(下行DownStream,输出流Outbound,消息到字节流MessageToByte)。这操作分别定义在,输入流处理器ChannelInboundHandler,输出流处理器ChannelOutboundHandler,并提供了处理的相应适配器,输入流处理器适配器ChannelInboundHandlerAdapter,输出流通道适配器ChannelOutboundHandlerAdapter,多路复用适配器ChannelDuplexHandler。

通道处理器适配器ChannelHandlerAdapter的设计模式为适配器,这个适配器模式中的 handlerAdded和handlerRemoved事件默认处理器,不做任何事情,这个与MINA中的适配器模式相同。处理IO操作异常,则调用ChannelHandlerContext#fireExceptionCaught方法,触发异常事件,并转发给通道管道线的下一个通道处理器。 看通道处理器适配器的判断通道处理器是否共享注解,首先获取线程的本地变量,从线程本地变量获取线程本地共享注解通道处理器探测结果缓存,如果缓存中存在通道处理器clazz,则返回缓存结果,否则将探测结果添加到缓存中。

Netty Inbound/Outbound通道处理器定义

通道Inbound处理器,主要是处理从peer发送过来的字节流;通道处理器上下文关联的通道注册到事件循环EventLoop时,触发channelRegistered方法;通道处理器上下文关联的通道激活时,触发channelActive方法;通道从peer读取消息时,触发channelRead方法;当上一消息通过#channelRead方法,并被当先读操作消费时,触发channelReadComplete方法,如果通道配置项#AUTO_READ为关闭状态,没有进一步尝试从当前通道读取inbound数据时,直到ChannelHandlerContext#read调用,触发;当用户事件发生时,触发userEventTriggered方法;异常抛出时,触发exceptionCaught方法;当通道可写状态改变时,触发channelWritabilityChanged方法;通道处理器上下文关联的通道注册到事件循环EventLoop,但处于非激活状态,达到生命周期的末端时,触发channelInactive方法;通道处理器上下文关联的通道从事件循环EventLoop移除时,触发channelUnregistered方法。

Inbound通道handler适配器ChannelInboundHandlerAdapter,提供的Inbound通道处理器的所有方法的实现,但实现仅仅是,转发操作给Channel管道线的下一个通道处理器,子类必须重写方法。需要注意的是,在#channelRead方法自动返回后,消息并没有释放。如果你寻找ChannelInboundHandler的实现,可以自动释放接受的到消息可以使用SimpleChannelInboundHandler。

Outbound通道处理器ChannelOutboundHandler主要处理outbound IO操作。当绑定操作发生时,调用bind方法;当连接操作发生时,调用connect方法;read方法拦截通道处理器上下文读操作;当写操发生时,调用write方法,写操作通过Channel管道线写消息,当通道调用#flush方法时,消息将会被刷新,发送出去;当一个刷新操作发生时,调用flush方法,刷新操作将会刷新所有先前已经写,待发送的消息。

Outbound通道Handler适配器ChannelOutboundHandlerAdapter为Outbound通道处理器的基本实现,这个实现仅仅通过通道处理器上下文转发方法的调用。子类必须重写Outbound通道Handler适配器的相关方法。

在Mina中,通道读写全部在一个通道Handler,Mina提供的通道Handler适配器,我们在使用通道处理器时继承它,实现我们需要关注的读写事件。而Netty使用InBound和OutBound将通道的读写分离,同时提供了InBound和OutBound通道Handler的适配器。

Netty 简单Inbound通道处理器SimpleChannelInboundHandler

简单Inbound通道处理器SimpleChannelInboundHandler,内部有连个变量一个为参数类型匹配器,用来判断通道是否可以处理消息,另一个变量autoRelease,用于控制是否在通道处理消息完毕时,释放消息。读取方法channelRead,首先判断跟定的消息类型是否可以被处理,如果是,则委托给channelRead0,channelRead0待子类实现;如果返回false,则将消息转递给Channel管道线的下一个通道处理器;最后,如果autoRelease为自动释放消息,且消息已处理则释放消息。

Netty 消息编码器-MessageToByteEncoder

消息编码器MessageToByteEncoder实际上为一个Outbound通道处理器,内部有一个类型参数处理器TypeParameterMatcher,用于判断消息是否可以被当前编码器处理,不能则传给Channel管道线上的下一个通道处理器;一个preferDirect参数,用于决定,当将消息编码为字节序列时,应该存储在direct类型还是heap类型的字节buffer中。消息编码器主要方法为write方法,write方法首先,判断消息是否可以被当前编码器处理,如果消息可以被编码器处理,根据通道处理器上下文和preferDirect,分配一个字节buf,委托encode方法,编码消息对象到字节buf,encode方法待子类实现;释放消息对应引用参数,如果当前buffer可读,则通道处理器上下文写buffer,否则释放buffer,写空buf,最后释放buf。消息编码器MessageToByteEncoder实际上为一个Outbound通道处理器,这个与Mina中的消息编码器是有区别的,Mina中的消息编码器要和解码器组装成编解码工厂过滤器添加到过滤链上,且编解码工厂过滤器,在过滤链上是由先后顺序的,通道Mina中编码器和通道Handler是两个概念。而Netty中编码器实际为Outbound通道处理器,主要是通过类型参数匹配器TypeParameterMatcher,来判断消息是否可以被编码器处理。

Netty 消息解码器-ByteToMessageDecoder

消息解码器ByteToMessageDecoder,内部有两个buf累计器,分别为MERGE_CUMULATOR累计器buf,累计buf的过程为,首先判断当前累计buf空间不够存储,需要整合的in buf,或当前buf的引用数大于1,或累计buf只可读,三个条件中,有一个满足,则扩展累计buf容量,然后写in buf字节序列到累计器,释放in buf;COMPOSITE_CUMULATOR累计器是将需要整合的buf放在,内部的Component集合中,每个Component描述一个buf信息。

解码器有一个累计buf cumulation用于存放接收的数据;一个累计器Cumulator,默认为MERGE_CUMULATOR,累计接收的字节数据到cumulatio;first表示是否第一次累计字节到累计buf;decodeWasNull表示当前解码器是否成功解码消息,后续调用ByteBuf#discardSomeReadBytes方法;singleDecode表示解码器是否只能解码一次;numReads表示当前已经读取的字节数;discardAfterReads表示在读取多少个字节后,调用ByteBuf#discardSomeReadBytes方法,释放buf空间;解码器有三种状态,初始化状态STATE_INIT,还没有解码,正在解码状态STATE_CALLING_CHILD_DECODE,解码器正在从通道处理器上下文移除状态STATE_HANDLER_REMOVED_PENDING。

需要注意的是,解码器不可共享。读取操作首先判断消息是否为字节buf,是则,创建解码消息List集合out,如果第一次累计字节buf,则累计buf为,消息buf,否则累计器,累计消息buf数据,然后调用解码器#callDecode解码累计buf中的数据,并将解码后的消息添加到out集合中,并遍历解码消息集合,转发消息到Channle管道线上的下一个通道处理器。如果消息类型不是字节buf,直接通知Channle管道线上的下一个通道处理器消息消息。在解码的过程中,如果解码器从通道处理器上下文移除,则处理移除事件。移除解码器,首先判断 解码器状态,如果解码器处于正在解码状态,则解码器状态置为正在移除,并返回,否则判断累计buf是否为空,如果为空,则置空,否则通知通道处理上下文,所属的Channle管道线上的下一通道Handler消费数据。委托给handlerRemoved0方法完成实际的handler移除工作。

解码器的channelReadComplete方法,主要是执行通道处理器上下文read操作,请求从通道读取数据到第一个Inbound字节buf中,如果读取到数据,触发ChannelInboundHandler#channelRead操作;并触发一个ChannelInboundHandler#channelReadComplete时间以便,处理能够决定是否继续读取数据,如果当前解码完成,则通知,管道中的下一个通道处理器的read方法处理数据,如果一个读操作正在放生,则此方法不做什么事情。并触发管道中下一个通道处理器的fireChannelReadComplete事件。

channelInactive事件操作,主要是关闭通道输入流,在关闭之前,如果累计buf不为空,调用callDecode方法,解码字节数,为消息对象,并放入解码消息集合out中,管道中的下一个通道处理器,消费解码消息集合中的消息;最后调用decodeRemovalReentryProtection做最后的解码工作和通道移除工作。

消息解码器ByteToMessageDecoder实际上为Inbound通道处理器,这个与Mina中的消息解码器是有区别的,Mina中的消息解码器要和编码器组装成编解码工厂过滤器添加到过滤链上,且编解码工厂过滤器,在过滤链上是有先后顺序的,通道Mina中解码器和通道Handler是两个概念。

Netty Inbound/Outbound通道Invoker

每个通道Channel拥有自己的管道Pipeline,当通道创建时,管道自动创建,默认为DefaultChannelPipeline。Inbound通道Invoker ChannelInboundInvoker主要是触发管道线ChannelPipeline上的下一个Inbound通道处理器ChannelInboundHandler的相关方法。ChannelInboundInvoker有点Mina过滤器的意味。Outbound通道Invoker ChannelOutboundInvoker主要是触发触发管道线ChannelPipeline上的下一个Outbound通道处理器ChannelOnboundHandler的相关方法,同时增加了一下通道结果创建方法, ChannelOutboundInvoker也有点Mina过滤器的意味,只不过不像ChannelInboundInvoker的方法命名那么相似。

Netty 异步任务-ChannelFuture

netty的异步结果Future继承于JUC的Future,可以异步获取IO操作的结果信息,比如IO操作是否成功完成,如果失败,可以获取失败的原因,是否取消,同时可以使用cancel方法取消IO操作,添加异步结果监听器,、监听IO操作是否完成,并可以移除结果监听器,除这些之外我们还可以异步、同步等待或超时等待IO操作结果。异步结果监听器GenericFutureListener,主要监听一个IO操作是否完成,在异步结果有返回值时,通知监听器。ChannelFuture继承于空异步结果,即没有返回值,所以添加移除监听器,同步异步等待方法为空体。netty所有的IO操作都是异步的,当一个IO操作开始时,不管操作是否完成,一个新的异步操作结果将会被创建。如果因为IO操作没有完成,同时既没有成功,失败,也没有取消,新创建的那么,异步结果并没有完成初始化。如果IO操作完成,不论操作结果成功,失败或取消,异步结果将会标记为完成,同时携带更多的精确信息,比如失败的原因。需要注意的时,失败或取消也属于完成状态。强烈建议使用添加监听器的方式等待IO操作结果,而不await方法,因为监听器模式时非阻塞的,有更好的性能和资源利用率。

通道结果监听器ChannelFutureListener内部有3个监听器,分别为在操作完成时,关闭通道任务关联的通道的监听器CLOSE;当IO操作失败时,关闭通道任务关联的通道的监听器CLOSE_ON_FAILURE;转发通道任务异常到Channel管道的监听器FIRE_EXCEPTION_ON_FAILURE。Promise任务继承了任务Future,但多了以便标记成功、失败和不可取消的方法。 ChannelPromise与ChannelFuture的不同在于ChannelPromise可以标记任务结果。ChannelProgressivePromise与ProgressivePromise,ChannelProgressiveFuture的关系与ChannelPromise与Promise,ChannelFuture的关系类似,只不过ChannelPromise表示异步操作任务,ChannelProgressivePromise表示异步任务的进度,同时Promise类型异步任务都是可写的。

Netty 管道线定义-ChannelPipeline

Channle管道线继承了Inbound、OutBound通道Invoker和Iterable<Entry<String, ChannelHandler»接口,Channel管道线主要是管理Channel的通道处理器,每个通道有一个Channle管道线。Channle管道线主要定义了添加移除替换通道处理器的相关方法,在添加通道处理器的相关方法中,有一个事件执行器group参数,用于中Inbound和Outbound的相关事件,告诉管道,在不同于IO线程的事件执行器组中,执行通道处理器的事件执行方法,以保证IO线程不会被一个耗时任务阻塞,如果你的业务逻辑完全异步或能够快速的完成,可以添加一个事件执行器组。Channel管道线中的Inbound和Outbound通道处理器,主要通过通道处理器上下文的相关fire-INBOUND_ENT和OUTBOUND_OPR事件方法,传播Inbound和Outbound事件给管道中的下一个通道处理器

Netty 默认Channel管道线初始化

netty的log实例,实际是从默认log工厂获取,默认log工厂顺序为SLF4J,LOG4j,JDKLog。 每个通道拥有一个Channel管道线;管道线用于管理,通道事件处理Handler ChannelHandler,管道线管理通道处理器的方式,为通道处理器器上下文模式,即每个通道处理器在管道中,是以通道上下文的形式存在;通道上下文关联一个通道处理器,通道上下文描述通道处理器的上下文,通道上下文拥有一个前驱和后继上下文,即通道上下文在管道线中是一个双向链表,通道处理器上下文通过inbound和oubound两个布尔标志,判断通道处理器是inbound还是outbound。

上下文链表的头部为HeadContext,尾部为TailContext。头部上下文HeadContext的outbound的相关操作,直接委托给管道线所属通道的unsafe(Native API),inbound事件直接触发通道处理器上下文的相关事件,以便通道处理器上下文关联的通道Handler处理相关事件,但读操作实际是通过Channel读取。HeadContext的通道注册方法channelRegistered,主要是执行通道处理器添加回调任务链中的任务。处理器添加回调任务主要是触发触发上下文关联通道处理器的handlerAdded事件,更新上下文状态为添加完毕状态,如果过程中有异常发生,则移除通道上下文。channelUnregistered方法,主要是在通道从选择器反注册时,清空管道线程的通道处理器上下文,并触发上下文关联的通道处理器handlerRemoved事件,更新上下文状态为已移除。

Channel的管道线的通道处理器上下文链的尾部TailContext是一个傀儡,不同于尾部上下文,头部上下文,在处理inbound事件时,触发通道处理器上下文相关的方法,在处理outbound事件时,委托给管道线关联的Channle的内部unsafe。默认Channel管道实现内部有两个回调任务PendingHandlerAdded/RemovedTask,一个是添加通道处理器上下文回调任务,一个是移除通道上下文回调任务,主要是触发上下文关联通道处理器的处理器添加移除事件,并更新相应的上下文状态为已添加或已移除。管道构造,主要是检查管道通道是否为空,初始化管道上下文链的头部与尾部上下文。

netty通道处理器上下文可以说,是Mina中Hanlder和过滤器的集合,整合两者功能,管道线有点Mina过滤链的意味,HeadContext相当于Mina过滤链的头部过滤器,TailContext相当于Mina过滤链的尾部过滤器

Netty 默认Channel管道线-添加通道处理器

添加通道处理器到管道头部,首次检查通道处理器是否为共享模式,如果非共享,且已添加,则抛出异常;检查通道处理器名在管道内,是否存在对应通道处理器上下文,已存在抛出异常;根据事件执行器,处理器名,及处理器,构造处理器上下文;添加处理器上限文到管道上下文链头;如果通道没有注册到事件循环,上下文添加到管道时,创建添加通道处理器回调任务,并将任务添加管道的回调任务链中,当通道注册到事件循环时,触发通道处理器的handlerAdded事件,已注册则创建一个线程,用于调用通道处理器的handlerAdded事件方法,及更新上下文状态为已添加,并交由事件执行器执行;最好调用callHandlerAdded0方法,确保调用通道处理器的handlerAdded事件方法,更新上下文状态为已添加。其他last(添加到管道尾部),before(添加指定上下文的前面),after(添加指定上下文的后面)操作,基本上与addfirst思路基本相同,不同的是添加到管道上下文链的位置。

Netty 默认Channel管道线-通道处理器移除与替换

无论是根据名称,处理器句柄,还是根据类型移除通道处理器,都是首先获取对应的处理器上下文,从管道中移除对应的上下文,如果通道已经从事件循环反注册,则添加移除回调任务到管道回调任务链,否则直接创建线程(触发上下文关联的处理器handlerRemoved事件,更新上下文状态为已移除),有上下文关联的事件执行器执行。

无论是根据名称,处理器句柄,还是根据类型替换通道处理器,都是首先获取对应的处理器上下文,然后添加新上下文到管道中原始上下文的位置,并将原始上下文的前驱和后继同时指向新上下文,以便转发剩余的buf内容;可以简单理解为添加新上下文,移除原始上下文,注意必须先添加,后移除,因为移除操作会触发channelRead和flush事件,而这些事件处理必须在handlerAdded事件后。

Netty 默认Channel管道线-Inbound和Outbound事件处理

管道处理通道注册方法fireChannelRegistered,首先从头部上下文开始,如果上下文已经添加到管道,则触发上下文关联的通道处理器的channelRegistered事件,否则转发事件给上下文所属管道的下一个上下文;其他触发Inbound事件的处理过程与fireChannelRegistered方法思路相同,只不过触发的通道处理器的相应事件。管道处理Inbound事件首先从头部上下文开始,直到尾部上下文,最后默认直接丢弃。

管道处理通道处理器地址绑定bind事件,首先从管道上下文链的尾部开始,寻找Outbound上下文,获取上下文的事件执行器,如果事件执行器线程在当前事件循环中,则触发通道处理器地址绑定事件#invokeBind,否则创建一个线程,执行事件触发操作,并交由事件执行器执行;#invokeBind首先判断通道处理器是否已经添加到管道,如果以添加,则触发Outbound通道处理器的bind事件方法,否则,传递地址绑定事件给管道中的下一个Outbound上下文。管道处理Outbound相关事件,从尾部上下文到头部上下文,到达头部时,交由上下文所属管道关联的Channel的Unsafe处理。

netty 通道处理器上下文定义

通道处理器上下文ChannelHandlerContext,使通道处理器可以与管道和管道中其他处理器进行交互。当IO事件发生时,处理可以将事件转发给所属管道的下一个通道处理器,同时可以动态修改处理器所属的管道。通过上下文可以获取关联通道,处理器,事件执行器,上下文名,所属管道等信息。同时可以通过AttributeKey存储上下文属性,用alloc方法获取通道上下文的字节buf分配器,用于分配buf。

Netty 通道处理器上下文

抽象通道处理器上下文AbstractChannelHandlerContext,拥有一个前驱和后继上下文,用于在管道中传递IO事件;通道处理器总共有四个状态,分别为初始化,正在添加到管道,已添加管道和从管道移除状态;上下文同时关联一个管道;Inbound和Outbound两个用于判断上下文的类型,决定了上下文是处理器Inbound事件还是Outbound事件;一个事件执行器executor,当上下文执行器不在当前事务循环中时,用于执行IO事件操作;同时有一些延时任务,如上下文读任务,上下文刷新任务,读完成任务和通道可写状态改变任务。上下文构造,主要是初始化上下文name,所属管道,事件执行器,上下文类型。上下文关联的通道通道处理器在具体的实现中定义,比如通道处理器上下文默认实现为DefaultChannelHandlerContext,内部关联一个通道处理器。

上下文处理通道fireChannelRegistered事件,如果上下文事件执行器在当前事务循环,则直接在当前线程,执行触发上下文关联通道处理器的channelRegistered事件任务,否则,创建一个线程执行事件任务,并由上下文事务执行器运行;触发上下文关联通道处理器的channelRegistered事件任务,首先判断上下文是否已经添加到管道,已添加,则触发 上下文关联通道处理器的channelRegistered事件,否则转发事件给上下文所属管道的下一个Inbound上下文。其他Inbound事件的处理过程与fireChannelRegistered方法思路相同,只不过触发的是通道处理器的相应事件;如果Inbound事件处理过程中,异常发生,首先检查异常是不是通道处理器的exceptionCaught方法抛出,是,则直接log,否则触发上下文关联通道处理器的exceptionCaught事件。

上下文处理关联通道处理器的地址绑定bind事件,首先从所属管道上下文链的尾部开始,寻找Outbound上下文,找到后,获取上下文的事件执行器,如果事件执行器线程在当前事件循环中,则触发上下文关联通道处理器地址绑定事件,否则创建一个线程,执行事件触发操作,并交由事件执行器执行;触发上下文关联通道处理器地址绑定事件,首先判断上下文关联通道处理器是否已经添加到管道,如果以添加,则触发Outbound通道处理器的bind事件方法,否则,传递地址绑定事件给管道中的下一个Outbound上下文。如果在Outbound事件处理器过程中,出现异常则直接委托给异步任务结果通知工具PromiseNotificationUtil,通知异步任务失败,并log异常日志。

netty的通道处理器上下文和mina的会话有点像,都拥有描述通道Handler的Context;不同的时mina中的会话与通道直接关联,而netty上下文与通道间是通过Channel管道关联起来,mina中的过滤链是依附于会话,而netty上下文依附于Channel管道,mina中的IO事件执行器为IoProcessor,netty中的IO事件的处理委托给事件循环或上下文的子事件执行器。

Netty 通道初始化器ChannelInitializer

通道初始化器ChannelInitializer实际上为Inbound通道处理器,当通道注册到事件循环中后,添加通道初始化器到通道,触发handlerAdded事件,然后将初始化器的上下文放入通道初始化器的上下文Map中,如果放入成功且先前不存在,initChannel(C ch),初始化通道,其中C为当前通道,我们可以获取C的管道,添加通道处理器到管道,这就是通道初始化器的作用。添加完后,从通道的管道中移除初始化器上下文,并从通道初始化器的上下文Map中移除通道初始化器上下文。

Netty 事件执行器组和事件执行器定义及抽象实现

事件循环组EventLoopGroup为一个特殊的事件执行器组EventExecutorGroup,可以注册通道,以便在事件循环中,被后面的选择操作处理器。事件执行器组继承了JUC的调度执行器服务ScheduledExecutorService,用迭代器Iterable管理组内的事件执行器。事件执行器是一个特殊的事件执行器组。Nio多线程事件循环NioEventLoopGroup可以理解为多线程版MultithreadEventExecutorGroup的事件执行器组。

事件执行器组EventExecutorGroup主要提供了关闭事件执行器组管理的执行器的相关方法,获取事件执行器组管理的事件执行器和执行任务线程方法。事件执行器EventExecutor为一个特殊的事件执行器组EventExecutorGroup,提供了获取事件执行器组的下一个事件执行器方法,判断线程是否在当前事件循环中以及创建可写的异步任务结果和进度结果,及已经成功失败的异步结果。 抽象事件执行器组AbstractEventExecutorGroup,所有与调度执行器关联的提交任务和调度任务方法,直接委托给事件执行器组的下一个事件执行器相应方法执行。graceful方式关闭事件执行器组,默认关闭间隔为2s,超时时间为25s,具体定义在抽象事件执行器AbstractEventExecutor中。

抽象事件执行器,继承了抽象执行器服务AbstractExecutorService,提交任务线程,直接委托给父类抽象执行器服务,不支持延时调度的周期间歇性调度任务线程,多个一个安全地执行给定任务线程方法,捕捉执行过程中抛出的异常。由于抽象的事件执行器是一个特殊的事件执行器组,内部事件执行器selfCollection(Collections.singleton(this)),是自己单例集,next方法返回的是自己。

Netty 多线程事件执行器组

多线程事件执行器组MultithreadEventExecutorGroup,内部有一个事件执行器数组存放组内的事件执行器;readonlyChildren为组内事件执行器集的可读包装集Set;terminatedChildren(AtomicInteger),用于记录已关闭的事件执行器数;termination为执行器组terminated异步任务结果;同时有一个事件执行器选择器chooser(EventExecutorChooser)。构造多线程执行器组,首先检查线程数参数,如果执行器不为空,则初始化线程执行器的线程工厂,创建事件执行器集,并根据执行器和相关参数创建事件执行器,实际创建方法为newChild,待子类实现,初始化事件执行器选择器,创建terminated事件执行器监听器,添加terminated事件执行器监听器到terminated异步任务结果,包装事件执行器集为只读集readonlyChildren。获取执行器组的下一个事件执行器方法委托个内存的事件执行器选择器chooser;返回的迭代器为内部只读执行器集的迭代器;而关闭执行器组方法,实际为遍历管理的事件执行器集,关闭执行器;判断执行器组是否关闭和Terminated,当且仅当组内的事件执行器都关闭和Terminated时,才返回true;超时等待Terminated执行器组方法,实际为遍历事件执行器组超时等待时间耗完,则停止Terminated执行器组,否则,超时剩余等待时间timeLeft,Terminated事件执行器。

Netty 多线程事件循环组

事件循环组EventLoopGroup继承了事件执行器组EventExecutorGroup,next方法返回的为事件循环EventLoop,事件循环组主要所做的工作为通道注册。 事件循环EventLoop可理解为已顺序、串行的方式处理提交的任务的事件执行器EventExecutor。事件循环组EventLoopGroup可以理解为特殊的事件执行器组EventExecutorGroup;事件执行器组管理事件执行器,事件循环组管理事件循环。抽象事件循环AbstractEventLoop继承了抽象事件执行器AbstractEventExecutor,实现了事件循环接口。 多线程事件循环组MultithreadEventLoopGroup继承了多线程事件执行器组,实现了事件循环组接口,相关注册通道方法委托给多线程事件循环组的next事件循环,线程工程创建的线程优先级默认为最大线程优先级;默认事件循环线程数为1和可用处理器数的2倍中的最大者,这个线程数就是构造多线程事件执行器组事件执行器数量。

Netty 抽象调度事件执行器

事件循环组EventLoopGroup为一个特殊的事件执行器组EventExecutorGroup,可以注册通道,以便在事件循环中,被后面的选择操作处理器。事件执行器组继承了JUC的调度执行器服务ScheduledExecutorService,用迭代器Iterable管理组内的事件执行器。事件执行器是一个特殊的事件执行器组。Nio多线程事件循环NioEventLoopGroup可以理解为多线程版MultithreadEventExecutorGroup的事件执行器组。

事件执行器组EventExecutorGroup主要提供了关闭事件执行器组管理的执行器的相关方法,获取事件执行器组管理的事件执行器和执行任务线程方法。事件执行器EventExecutor为一个特殊的事件执行器组EventExecutorGroup,提供了获取事件执行器组的下一个事件执行器方法,判断线程是否在当前事件循环中以及创建可写的异步任务结果和进度结果,及已经成功失败的异步结果。抽象事件执行器组AbstractEventExecutorGroup,所有与调度执行器关联的提交任务和调度任务方法,直接委托给事件执行器组的下一个事件执行器相应方法执行。graceful方式关闭事件执行器组,默认关闭间隔为2s,超时时间为25s,具体定义在抽象事件执行器AbstractEventExecutor中。抽象事件执行器,继承了抽象执行器服务AbstractExecutorService,提交任务线程,直接委托给父类抽象执行器服务,不支持延时调度的周期间歇性调度任务线程,多个一个安全地执行给定任务线程方法,捕捉执行过程中抛出的异常。由于抽象的事件执行器是一个特殊的事件执行器组,内部事件执行器selfCollection(Collections.singleton(this)),是自己单例集,next方法返回的是自己。

Netty 单线程事件执行器初始化

单线程事件执行器SingleThreadEventExecutor,内部主要有一个状态变量STATE_UPDATER(AtomicIntegerFieldUpdater),执行器状态以供有4中就绪,开始,正在关闭,已关闭,终止;一个任务队列taskQueue存放待执行的任务线程;一个执行器执行任务taskQueue(LinkedBlockingQueue);一个事件执行器关闭信号量threadLock控制事件执行器的关闭;一个是高可见线程thread,指定当前事件执行器线程,用于判断IO操作线程是否在当前事件循环中;单线程事件执行器构造,主要是初始化父事件执行器,最大任务数,事件执行器,任务队列和任务拒绝策略,默认拒绝策略为直接抛出拒绝执行器异常。由于单线程事件执行器为顺序执行器OrderedEventExecutor,其主要通过taskQueue为LinkedBlockQueue保证任务的顺序执行。

Netty 单线程事件执行器执行任务与graceful方式关闭

单线程事件执行器,执行任务,首先判断任务是否为null,为空抛出空指针异常,否则,判断线程是否在当前事件循环中,在则添加任务到任务队列,否则开启当前单线程事件执行器,并添加任务到任务队列,如果此时事件执行器已关闭,并可以移除任务,则抛出拒绝执行器任务异常;如果需要启动事件执行器唤醒线程,则添加唤醒线程到任务队列。添加,移除,poll任务操作,实际委托给任务队列,添加,移除hook线程操作委托给关闭hooks线程集合。单线程事件执行器take任务,首先从调度任务队列peek头部调度任务,如果任务不为空,则获取调度任务延时时间,如果延时时间大于0,则从任务队列超时poll任务,否则从调度任务队列抓取调度任务,添加到任务队列,并从任务队列poll任务;如果调度任务为空,则从任务队列take一个任务,如果是唤醒任务,则忽略。关闭单线程执行器,首先检查间隔、超时时间,时间单元参数,并且间隔时间要小于超时时间,如果已经关闭,则返回异步关闭任务结果,否则检查线程是否在当前事务循环中,如果是则更新状态为正在关闭,并计算计算关闭间隔和超时时间。

Netty 单线程事件循环

单线程事件循环SingleThreadEventLoop,继承了单线程事件执行器,实现了事件循环接口,内部一个事件循环任务队列,我们可以把单线程事件循环看为一个简单的事件执行器,单线程事件循环中多了一个通道注册的方法,实际注册工作委托给通道关联的UnSafe。

Netty nio事件循环初始化

io事件循环内部有一个取消选择key计数器清理间隔CLEANUP_INTERVAL,用于当取消的选择key达到256个时,重置取消选择key计数器cancelledKeys(int),并重新进行选择操作;选择器自动重构阈值SELECTOR_AUTO_REBUILD_THRESHOLD,默认选择操作发生512次,用于控制当选择器发生多少次选择操作时,重构选择器;选择器状态判断器selectNowSupplier,用于获取Nio事件循环内部选择器的选择操作结果;同时有一个选择器selector,未包装过的选择器unwrappedSelector和一个选择器提供者provider,一个选择key就绪集合selectedKeys(SelectedSelectionKeySet);当选择器的选择操作阻塞时,wakenUp(AtomicBoolean)属性决定是否应该break选择操作过程;一个Nio处理Io事件的时间占比ioRatio(int),默认为50,即IO事件处理时间和其他事件处理时间各占Nio事件循环一半;一个选择策略selectStrategy用于控制选择循环,如果返回结果为-1,则下一步应该阻塞选择操作,如果返回结果为-2,则下一步应该调回IO事件循环,处理IO事件,而不是继续执行选择操作,返回值大于0,表示需要有工作要做,即注册到选择器的选择通道有IO事件就绪。

Nio事件循环初始化,主要是将Nio事件循环组和事件执行器及任务拒绝策略传给父类单线程事件循环(单线程事件执行器),同时打开一个选择器。打开选择器过程,委托给选择器提供者打开一个选择器,如果需要优化选择器的,在当前线程访问控制选择下,加载选择器实现类,不初始化,如果从系统类加载器加载的选择key实现类不是Class实例,或不是裸选择器类型,不进行选择器key集合优化,及选择器为选择器提供者打开的裸选择器;否则在当前线程相同访问控制权限下,获取系统选择器实现类的,选择器就绪key集合selectedKeysField及其代理publicSelectedKeysField,设置选择器就绪key集合selectedKeysField及其代理publicSelectedKeysField访问控制权限,将系统选择器的就绪key集合selectedKeysField及其代理publicSelectedKeysField设值为selectedKeySet(SelectedSelectionKeySet),并将选择器selector包装为SelectedSelectionKeySetSelector。

Netty nio事件循环后续

Nio事件循环启动后,首先选择策略根据选择器结果提供者和任务队列是否有任务生成下一步的操作策略,如果选择操作结果返回为SelectStrategy.CONTINUE,则跳出当前事件循环,如果为SelectStrategy.SELECT,则执行选择操作,并阻塞下一次选择操作,如果需要唤醒选择器,则唤醒;然后重置取消选择key计数器cancelledKeys为0,置是否需要重新选择属性needsToSelectAgain为false,然后处理选择器就绪的选择key,在根据当前IO事件处理时间百分比ioRatio,决定是执行任务队列所有任务还是超时执行任务队列任务,如果ioRatio小于100,则为超时执行任务队列中任务;最后检查事件循环是否正在关闭,是则反注册选择器中的选择通道,并关闭,确定事件循环关闭;上述整个过程在事件循环运行期间,不断地重复。在事件循环的每次处理过程中,在最后都要检查意见事件循环是否关闭,如果正在关闭,则关闭注册到选择器的所有通道,并确保事件循环关闭。

选择操作的过程为,首先重置选择操作计数器为0,计算选择操作延时时间;如果延时时间已过去0.5毫秒,且选择操作计数器当前为0,即第一次执行选择操作,执行立刻执行选择操作,更新选择操作计数器,并跳出当前选择操作过程;如果任务队列中有任务,且wakeUp属性为false,并更新为true成功,则立刻执行选择操作,更新选择操作计数器,跳出当前选择操作过程;如果上面两种情况都不是,则执行超时选择操作,如果有选择key就绪,或原始wakeUp属性为true,或当前wakeUp属性为true,或任务队列有任务,或调度任务队列有调度任务,则跳出当前选择操作;如果选择器重构阈值大于0,且当前选择操作计数器的值大于阈值,则重新构造选择器,即创建新的选择器将原始选择器关联的选择key,注册到新的选择器中。

默认的处理选择器就绪选择key集合过程,为遍历选择key集合,处理就绪的选择key,首先选择key当前必须有效,再判断选择key通道事件循环是否是当前循环,否则直接返回,是则判断就绪key的就绪事件是连接请求事件,写事件还是读事件,如果事件连接操作,则委托通道的Unsafe完成通道连接,并移除连接事件;如果是如果是写事件,则委托通道的Unsafe刷新写请求队列,释放内存;果是读事件,则委托给通道的Unsafe的read方法;如果在处理就绪选择key的过程,需要重新执行选择操作,则立刻执行,并更新当前就绪选择key集合及其迭代器。

Nio事件循环内部有一个选择器,所有注册到选择器的通道都在一个事件循环中,Nio事件循环是单线程事件循环,即单线程事件执行器,在处理选择器的就绪选择key时,当且仅当,就绪选择key关联通道所在的事件循环为当前事件循环时,才出来就绪选择key关联通道的就绪IO事件,从而保证通道的读写等操作线程安全。

Nio事件循环实际的工作就是执行选择操作,并处理选择器的就绪选择key,Nio事件循环与Mina的IoProcessor有点相似,都可以看着一个线程执行器,执行通道的IO事件操作,而不同的是Nio管理的是选择器Selector,而Mina的IoProcessor管理的会话IoSession。

Netty nio事件循环组

nio事件循环组实际为一个多线程事件循环组,主要用于管理nio事件循环;从设置nio事件循环组的IO事件处理时间百分比和重构nio事件循环组选择器方法,可以看出事件循环组继承迭代器的原因;nio事件循环组可以统一设置组内的nio事件循环的IO事件处理时间百分比,而nio事件可以动态变更自己的IO事件处理时间百分比,重构选择器也有这么点意思。nio事件循环组是多线程的,而nio事件循环时单线程的,这个与Mina的IoProcessor和processor的关系有点像。不同的是nio事件循环面向的是选择器Selector,而processor面向的是会话IoSession。

Netty 抽象BootStrap定义

抽象引导程序AbstractBootstrap,内部关联的一个事件循环组EventLoopGroup,一个通道处理器ChannelHandler,一个通道选项集和一个本地Socket地址及一个通道属性集。内部的方法主要配置事件循环组,通道处理,通道选项集,socket地址,及通道属性,通道注册,地址绑定。注册通道到事件循环组,首先由通道工厂创建通道实例,然后初始化通道,初始化工作待子类实现;然后将实际注册工作委托给事件循环组。绑定定socket地址,首先注册通道到事件循环组,待注册完成时,创建一个绑定任务线程完成地址绑定,实际将地址绑定工作委托给通道,并将绑定任务线程交由通道关联的事件循环的事件执行器执行。

Netty ServerBootStrap解析

服务端Bootstrap虽然继承与抽象Bootstrap,但他有自己的child通道选项及属性集,事件循环组和通道处理器,这些是用于配置,当Server通道接收客户端的请求,创建与客户端交互的通道。当构造Server引导配置时,如果传递一个事件循环,则Server通道监听器和客户端交互的通道公用一个事件循环组,否则parentGroup事件循环组用于监听器ServerChannel接受连接,childGroup事件循环组用于处理与客户端交互的通道相关事件和IO操作。

Server引导配置绑定socket地址,首先初始化通道,对于Server引导配置,这个通道为NioServerSocketChannel,初始化通道,即初始化Server通道;初始化Server通道,首先将Server引导配置的父类抽象Bootstrap的选项和属性配置给Server通道,然后添加ServerBootstrapAcceptor到Server通道内部的Channel管道内,然后将Server通道注册到事件循环组parentGroup中,然后通过Server通道#bind方法完成实际socket地址;Server引导配置监听器实际为一个Inbound通道处理器,每当有客户端连接请求时,则创建一个与客户端交互的通道,将child通道选项及属性配置给通道,并将通道注册到childGroup事件循环组,然后将通道处理器添加到与客户端交互的通道内部的Channel管道中。

Netty Bootstrap解析

客户端引导配置的连接操作,首先初始化通道,主要是配置通道的,选型和属性,将通道处理器添加到通道内部的Channel管道中,注册通道到事件循环组,然后委托通道完成实际的连接操作。

Netty 通道接口定义

通道Channel,关联一个事件循环,及通道注册的事件循环EventLoop,一个Channel管道ChannelPipeline,用于存放通道处理器;一个字节buf分配器ByteBufAllocator,用于分配字节buf,还有一些获取通道状态的方式,是否注册到事件循环,通道是否打开,是否可写;另外还要获取通道配置ChannelConfig,通道元数据ChannelMetadata的方法;最重要的是,关联一个Unsafe,用于通道的地址绑定,连接操作以及断开,通道的读写,注册到事件循环以及反注册。

Netty 抽象通道初始化

抽象通道AbstractChannel内部关联一个硬件底层操作类Unsafe,一个事件循环,即通道注册的事件循环EventLoop,一个Channel管道ChannelPipeline,用于存放通道处理器,默认为DefaultChannelPipeline。通道构造主要是初始化通道所属父通道,通道id,底层操作类Unsafe,Channel管道线程,默认的Channel管道线为DefaultChannelPipeline,底层操作类Unsafe为AbstractUnsafe。

Netty 抽象Unsafe定义

抽象Unsafe内部关联一个通道Outbound buf(ChannelOutboundBuffer),一个接收字节buf分配器Hander( RecvByteBufAllocator.Handle)。通道注册到事件循环,首先检查事件循环是否为空,通道是否已注册到事件循环,通道是否兼容事件循环,检查通过后,如果线程在当前事件循环,则委托给register0完成实际注册任务,否则创建一个任务线程,完成通道注册事件循环实际工作register0,并将任务线程交由事件循环执行。register0方法首先确保任务没取消,通道打开,调用doRegister完成注册,确保在实际通知注册任务完成前,调用handlerAdded事件,触发通道已注册事件fireChannelRegistered,如果通道激活且第一次注册,则触发通道已激活事件fireChannelActive,否则如果通道配置为自动读取,则读取数据beginRead,实际委托给doBeginRead方法,待子类实现。这个过程中触发的事件,则传递给通道内部的Channel管道。地址绑定方法委托给doBind,待子类实现。

关闭通道方法,首先确保异步关闭任务没有取消,如果Outbound buf为空,则添加异步结果监听器;再次检查关闭任务有没有执行完,执行完则更新异步任务结果;获取关闭线程执行器,如果关闭执行器不为空,则创建关闭任务线程,并由关闭执行器执行,否则在当前事务循环中执行实际关闭任务。实际关闭任务过程为,调用doClose0完成通道关闭任务,待子类实现,然后设置刷新Outbound 写请求队列数据失败,关闭OutBound buf,如果通道正在刷新,则延迟触发ChannelInactive事件,并反注册,否则直接触发ChannelInactive事件并反注册。写消息,首先检查Outbound buf是否为null,为空,则通道关闭,设置任务失败,否则转换消息,估算消息大小,添加消息到OutBound Buf中。刷新操作,首先将Outbound buf中写请求,添加到刷新队列中,然后将实际刷新工作委托给doWrite,doWrite方法,待子类实现。

Netty 通道Outbound缓冲区

通道Outbound缓存区内部关联一个通道,同时有一个线程本地buf数组,一个未刷新的buf链表和一个刷新buf链表。通道写消息时,消息将会被包装成写请求Entry。

添加消息到通道Outbound缓冲区,首先包装消息为写请求Entry,将写请求Entry添加到未刷新写请求链表上,并更新通道当前待发送的字节数据,如果通道待发送的字节数大于通道写bufsize,则更新通道写状态,并触发ChannelWritabilityChanged事件。触发事件实际操作委托给通道的Channel管道。

添加刷新操作,即遍历未刷新写请求链表,将写请求添加到刷新链表中,如果写请求取消,则更新通道待发送字节数,如果待发送字节数消息,小于通道配置的写buf size,则更新通道可写状态。

移除操作,主要是从刷新写请求链移除链头写请求,并则释放写请求消息,更新写请求任务结果,当前通道待发送字节数和可写状态,并触发相应的事件

从刷新写请求链表,移除writtenBytes个字节数方法removeBytes,自旋,直至从刷新链中移除writtenBytes个字节数,如果链头消息的可读字节数小于writtenBytes,则移除写请求Entry,否则更新writtenBytes,继续从刷新链中的写请求消息中移除writtenBytes个字节数。

将刷新链上的写请求消息,添加到nio buffer数组中方法nioBuffers,主要是将刷新链上的写请求消息包装成direct buf添加到通道Outbound缓存区的nio buf数组中,这个方法主要在NioSocketChannel#doWrite方法重用。方法调用后,#nioBufferCount和#nioBufferSize,将返回当前nio buf数组的长度和可读字节数。

Netty 抽象通道后续

通道的绑定操作、连接,写消息,读操作,刷新操作,反注册、断开连接,关闭通道等操作事件实际调用通道的Channel管道的相关方法,即触发通道相关事件,这些方法是重写了通道OutboundInvoker的相关方法。在抽象Unsafe那篇文章中,我们看到其内部也有绑定、注册,读操作,写操作和关闭操作,这些是通道的实际操作方法。

Netty 抽象nio通道

抽象nio通道AbstractNioChannel内部关联一个可选择通道(SelectableChannel)和一个选择key(selectionKey)。抽象Nio通道构造,主要是初始化通道并配置为非阻塞模式。

注册doRegister工作主要是,注册可选择通道到通道所在事件循环的选择器中。反注册doDeregister,委托给事件循环,取消选择key,即从事件循环关联选择器的选择key集合中移除当前选择key。开始读操作doBeginRead,实际工作为将读操作事件,添加选择key的兴趣事件集

抽象nioUnsafe为特殊的Unsafe,允许访问底层的选择通道。选择通道方法返回的实际为抽象nio通道内部的底层可选择通道。移除读兴趣事件removeReadOp,即从选择key兴趣事件集中,移除读操作事件。连接操作,将实际连接操作委托给doConnect,待子类实现,如果连接成功,则通知异步任务连接成功,如果是第一次连接,则触发通道的激活事件fireChannelActive。完成连接操作,实际工作委托给抽象Nio通道的doFinishConnect方法,待子类实现,完成后更新任务结果,触发通道的激活事件fireChannelActive,如果出现异常,则更新连接任务为异常失败。

Netty 抽象nio字节通道

写通道Outbound缓冲区,即遍历刷新链上的写请求,如果写请求消息为字节buf,则调用doWriteBytes完成实际数据发送操作,待子类扩展,如果写请求消息为文件Region,调用doWriteFileRegion完成实际数据发送操作,待子类扩展,数据发送,则更新通道的数据发送进度,并从刷新链上移除写请求;如果所有写请求发送完毕,则重新添加写操作事件到选择key兴趣事件集,否则继续刷新通道Outbound缓冲区中的写请求。

nio字节Unsafe读操作,从通道接收缓冲区读取数据,通知通道处理读取数据,触发Channel管道线的fireChannelRead事件,待数据读取完毕,触发Channel管道线的fireChannelReadComplete事件,如果在读数据的过程中,通道关闭,则触发通道输入关闭事件(fireUserEventTriggered),如果在读数据的过程中,发生异常,则读取缓存区中没有读完的数据,并通道通道处理剩余数据。

Netty 抽象nio消息通道

抽象Nio消息通道AbstractNioMessageChannel,写通道Outbound缓冲区消息,即遍历通道Outbound缓冲区刷新链,当写消息请求为空时,从选择key兴趣集中移除写操作事件,否则,委托doWriteMessage方法,将消息写到底层通道,doWriteMessage方法待子类扩展,写完,将写请求从刷新链上移除,否则,如果需要,添加写事件到选择key的兴趣事件集。

nio消息Unsafe(NioMessageUnsafe)读操作,从通道接收缓冲区读取数据,通知通道处理读取数据,触发Channel管道线的fireChannelRead事件,待数据读取完毕,触发Channel管道线的fireChannelReadComplete事件,如果在读数据的过程中,通道关闭,则触发通道输入关闭事件(fireUserEventTriggered),如果在读数据的过程中,发生异常,则触发通道fireExceptionCaught事件,如果读任务完毕,且不需自动读,则从选择key兴趣事件集移除读操作事件

Netty NioServerSocketChannel解析

nio服务端socket通道NioServerSocketChannel内部有两个变量,一个为选择器提供者SelectorProvider,一个为通道配置ServerSocketChannelConfig。

通道实际绑定socket地址,首先判断jdk版本信息,如果jdk版本大于1.7 则使用通道bind方法,绑定socket地址,否则为通道关联Socket的bind方法。

doReadMessages方法,实际为当接受客户端的连接请求时,创建一个与客户端交互的socket通道,并添加到读操作结果集中,实际为socket通道集。并将socket通道集交给ServerBootStrap的引导配置监听器ServerBootstrapAcceptor处理,Server引导配置监听器实际为一个Inbound通道处理器,每当有客户端连接请求时,则创建一个与客户端交互的通道,将child通道选项及属性配置给通道,并将通道注册到childGroup事件循环组,然后将通道处理器添加到与客户端交互的通道内部的Channel管道中。 客户端连接服务端时,首先向服务端发送连接请求数据,服务端接受到连接请求时,创建一个与客户端交互的socket通道。

由于服务端通道用于接受客户端的请求,所有不支持连接,写消息,消息过滤等等操作。

Netty 通道配置接口定义

通道配置接口,主要配置通道的字节buf分配器,接受buf分配器,消息size估算器,和通道选项。通通配置有两类分别为Socket通道和ServerSocket通道配置,大部分配置与Socket和SeverSocket的基本相同 。

Netty 默认通道配置初始化

默认通道配置内部关联一个通道,一个消息大小估算器,默认为DefaultMessageSizeEstimator,,尝试写自旋次数默认为6,写操作失败,默认自动关闭通道,连接超时默认为30000ms,同时拥有一个字节buf 分配器和一个接收字节buf 分配器。通道配置构造,主要是初始化配置关联通道和接收字节buf分配器。如果系统属性io.netty.allocator.type,配置为unpooled,则默认的字节buf分配器为UnpooledByteBufAllocator,否则为PooledByteBufAllocator,对于Android平台,默认为UnpooledByteBufAllocator。默认接收字节buf分配器为AdaptiveRecvByteBufAllocator。接收字节buf分配器,主要是控制下一次接收字节buf的容量,如果当前读取字节数大于消息上一次读取的字节buf容量,则减少下一次接收buf的容量,否则增加下一次接收buf的容量。

Netty 默认通道配置后续

默认通道配置内部主要是配置消息大小估算器,字节buf分配器,接收字节buf分配器等属性。默认ServerSocket通道配置,与ServerSocket相关的配置委托给ServerSocket的相关方法,其他委托给父类默认通道配置。默认Socket通道配置,与Socket相关的配置委托给Socket的相关方法,其他委托给父类默认通道配置

Netty NioSocketChannel解析

nio socket通道初始化,主要是创建socket通道,初始化socket通道配置NioSocketChannelConfig。地址绑定操作,如果jdk大于1.7 则socket通道直接绑定地址,否则委托通道内关联的socket。连接操作,直接委托给内部的socket通道连接操作。socket通道读取操作,实际委托给socket通道的read操作,从Socket通道读取数据,写到当前buf。写字节buf,实际委托给socket通道的写操作,从当前buf读取数据,写socket通道中。socket通道写文件region,委托给文件Region的转移数据操作transferTo,从文件Region读取数据,写到通道中。

写通道Outbound缓存区,首先从Outbound缓存区获取刷新链上的写请求对应的字节buf,然后委托给socket通道的写操作,发送数据,发送成功后,从刷新链上移除已经发送的写请求。关闭数据流,就是从事件循环反注册,即事件循环取消选择key,然后如果jdk大于1.7 则委托socket通道关闭输入流,否则委托通道内关联的socket。关闭输出流与关闭输入流思路一致。关闭通道实际为关闭通道输入流和输出流。断开连接实际为close通道。

Netty 字节buf定义

对象引用计数器ReferenceCounted,主要记录对象的引用数量,当引用数量为0时,表示可以回收对象,在调试模式下,如果发现对象出现内存泄漏,可以用touch方法记录操作的相关信息,通过ResourceLeakDetector获取操作的相关信息,以便分析内存泄漏的原因。

字节缓存ByteBuf继承了对象引用计数器ReferenceCounted,拥有一个最大容量限制,如果用户尝试用 #capacity(int)和 #ensureWritable(int)方法,增加buf容量超过最大容量,将会抛出非法参数异常;字节buf有两个索引,一个为读索引readerIndex,一个为写索引writerIndex,读索引不能大于写索引,写索引不能小于读索引,buf可读字节数为writerIndex - readerIndex,buf可写字节数为capacity - writerIndex,buf可写的最大字节数为maxCapacity - writerIndex;

可以使用markReader/WriterIndex标记当前buf读写索引位置,resetReader/WriterIndex方法可以重回先前标记的索引位置;

当内存空间负载过度时,我们可以使用discardReadBytes丢弃一些数据,以节省空间;

我们可以使用ensureWritable检测当buf是否有足够的空间写数据;

提供了getBytes方法,可以将buf中的数据转移到目的ByteBuf,Byte数组,Nio字节buf ByteBuffer,OutputStream,聚集字节通道 GatheringByteChannel和文件通道FileChannel中,这些方法不会修改当前buf读写索引,具体是否修改目的对象索引或位置,见java doc 描述。

提供了setBytes方法,可以将源ByteBuf,Byte数组,Nio字节buf ByteBuffer,InputputStream,分散字节通道ScatteringByteChannel和文件通道FileChannel中的数据转移到当前buf中,这些方法不会修改当前buf的读写索引,至于源对象索引或位置,见java doc 描述。

提供了readBytes方法,可以将buf中的数据转移到目的ByteBuf,Byte数组,Nio字节buf ByteBuffer,OutputStream,聚集字节通道GatheringByteChannel和文件通道FileChannel中,这些方法具体会会修改当前buf读索引,至于会不会修改源对象索引或位置,见java doc 描述。

提供了writeBytes方法,可以将源ByteBuf,Byte数组,Nio字节buf ByteBuffer, InputputStream,分散字节通道ScatteringByteChannel和文件通道FileChannel中的数据写到当前buf中,这些方法会修改当前buf的写索引,至于会不会修改源对象索引或位置,见java doc 描述。

set原始类型方法不会修改读写索引; get原始类型方法不会修改读写索引;

write原始类型方法会修改写索引; read原始类型方法,会修改读索引;

字节buf中的set/get方法不会修改当前buf的读写索引,而write修改写索引,read*会修改读索引;

提供了copy,slice和retainSlice,duplicate和retainedDuplicate方法,用于拷贝,切割,复制当前buf数据,retained*方法会增加buf的引用计数器;

提供nioBuffer和nioBuffers方法,用于包装当前buf可读数据为java nio ByteBuffer和ByteBuffer数组。

Netty 资源泄漏探测器

默认的资源泄漏探测器工厂创建的资源泄漏探测器为ResourceLeakDetector。

资源泄漏探测器,探测等级有四种,关闭DISABLED,SIMPLE简单,高级ADVANCED,终极PARANOID,SIMPLE开启简单的采样资源泄漏探测,仅仅已少量的负载为代价报告是否为泄漏对象;ADVANCED开启高级采样泄漏探测,将会以高负载为代价,报告最近泄漏对象访问的地方;PARANOID开启终极采样泄漏探测,将会以高级负载为代价,报告最近泄漏对象访问的地方(仅仅测试);开启资源探测的情况下,默认等级为SIMPLE。资源探测器内部有一个探测等级和采样间隔,资源类型,泄漏报告Map( ConcurrentMap<String, Boolean>),激活资源集合ConcurrentMap<DefaultResourceLeak, LeakEntry>。

资源泄漏探测器构造,主要初始化资源类型名及探测间隔。

资源泄漏追踪ResourceLeakTracker,主要定义了记录当前调用者栈追踪和额外的主观信息方法,以便资源泄漏探测器可以告诉,泄漏资源最近访问的地方;关闭泄漏资源方法,以便资源泄漏探测器不在警告泄漏资源。

默认资源泄漏DefaultResourceLeak,构造过程为,首先将资源添加到引用队列,如果探测等级大于ADVANCED,则创建记录,然后添加资源泄漏对象到资源泄漏探测器的激活对象Map中。

记录资源泄漏,首先根据资源泄漏线索,创建泄漏线索信息,如果泄漏线索信息集为空,或者与上次记录不同,则添加泄漏信息到泄漏记录集,如果记录超过最大记录数,则从对头移除记录信息。

关闭资源探测对象,就是从资源泄漏探测器移除资源泄漏对象。

报告资源泄漏信息,主要是报告由于泄漏记录数限制,丢弃的记录数,记录的资源泄漏信息,资源泄漏创建信息。

资源泄漏探测器,追踪资源泄漏探测信息,在采样点到达时,首先从资源泄漏引用队列,获取资源泄漏对象对象,关闭资源泄漏对象,并将探测结果添加到探测器的资源泄漏信息Map中,然后报告资源泄漏探测信息,最后创建资源探测对象。

Netty 抽象字节buf解析

字节buf内部有两个索引,一个读索引,一个写索引,两个索引标记,即读写索引对应的标记,buf的最大容量为maxCapacity;buf的构造,主要是初始化最大容量。

弃已读数据方法discardReadBytes,丢弃buf数据时,只修改读写索引和相应的标记,并不删除数据。

get*原始类型方法不会修改当前buf读写索引,getBytes(…,ByteBuf,…)方法不会修改当前buf读写索引,会修改目的buf的写索引。getBytes(…,byte[],…)方法不会修改当前buf读写索引。

set*原始类型方法不会修改当前buf读写索引,setBytes(…,ByteBuf,…)方法不会修改当前buf读写索引,会修改源buf的读索引。setBytes(…,byte[],…)方法不会修改当前buf读写索引。

read原始类型方法会修改当前buf读索引,readBytes(…,ByteBuf,…)方法会修改当前buf读索引,同时会修改目的buf的写索引,readBytes(…,byte[],…)方法会修改当前buf读索引。 read操作实际委托个get*的相关操作,同时更新buf读索引。

跳过length长度的字节,只更新读索引,不删除实际buf数据。

retainedSlice和slice方法返回则的字节buf,实际为字节buf底层unwrap buf,可以理解为字节buf的快照或引用,数据更改相互影响,retainedSlice方法会增加字节buf的引用计数器。

write原始类型方法会修改当前buf写索引,writeBytes(…,ByteBuf,…)方法会修改当前buf写索引,同时会修改目的buf的读索引,readBytes(…,byte[],…)方法会修改当前buf写索引。 write操作实际委托个set*的相关操作,同时更新buf写索引。

retainedDuplicate和duplicate方法返回则的字节buf,实际为字节buf底层unwrap buf,可以理解为字节buf的快照或引用,数据更改相互影响,retainedDuplicate方法会增加字节buf的引用计数器。

Netty 抽象字节buf引用计数器

抽象字节引用计数器AbstractReferenceCountedByteBuf,内部有一个引用计数器,以及原子更新引用计数器的refCntUpdater(AbstractReferenceCountedByteBuf),更新引用计数器,实际通过refCntUpdater CAS操作,释放对象引用的时候,如果引用计数器为0,则释放对象相关资源。

Netty 复合buf概念

复合字节缓冲CompositeByteBuf,内部有一个字节buf数组,用于存放字节buf,每个字节buf添加到复合buf集时,将被包装成一个buf组件,如果添加buf是,复合buf集已满,则将buf集中的所有buf,整合到一个组件buf中,并将原始buf集清空,添加整合后的buf到buf集。复合buf的读写索引为字节buf集的起始索引和size;每个组件buf Component内部记录着字节buf在复合buf中的起始位置和结束位置,及buf可读数据长度。

Netty 抽象字节buf分配器

创建字节buf主要根据字节buf分配器的directByDefault属性,来决定分配buf是否为direct类型还是heap类型;创建direct和heap buf实际通过newDirectBuffer和newHeapBuffer方法,待子类扩展。看出ioBuffer方法创建的字节buf,优先为direct类型,当系统平台不支持Unsafe时,才为heap类型;创建复合buf主要根据字节buf分配器的directByDefault属性,来决定分配buf是否为direct类型还是heap类型;创建复合buf时,如果资源泄漏探测功能开启,则追踪复合buf内存泄漏情况。

Netty Unpooled字节buf分配器

非池类堆字节buf,实际为一个字节数组,直接在Java虚拟机堆内存中,分配字节缓存;非池类Direct buf,实际为一个nio 字节buf,从操作系统实际物理内存中,分配字节缓存。Unpooled创建字节buf,实际委托给内部字节分配器UnpooledByteBufAllocator。

Netty Pooled字节buf分配器

Pooled字节buf分配器,内部有一个堆buf和direct buf分配Region区(PoolArena),每个Region的内存块(PoolChunk)size为chunkSize,每个内存块内存页(PoolSubpage)大小,默认为8k。Pooled 堆buf是基于字节数组,而direct buf是基于nio 字节buf。Pooled字节分配器分配heap和direct buf时,首先获取线程本地buf缓存PoolThreadCache,从buf获取对应的heap或direct分配区,分配区创建buf(PooledByteBuf),然后将buf放到内存块中管理,根据buf的容量,将放到相应tiny,small,normal Memory Region Cache(MemoryRegionCache)中。每个Pooled buf通过内存的Recycler,重用buf。Pool字节buf内部有一个回收器Recycler,管理字节buf,而回收器内部是将对象放在一个线程本地栈中管理。

总结

事件循环组用于创建nio事件循环,可以理解为单线程的事件执行器,每个事件循环关联一个选择器Selector,Socket通道的所有IO事件执行,实际委托给Nio事件循环。每个socket通道关联一个 事件循环和通道处理器管道。通道处理器将被包装成通道上下文,存储在处理器管道中。netty中的消息编码器和解码器实际为通道处理器,以通道上下文形式,依附于通道处理器管道。 一个inbound事件,由管道头向尾被Inbound通道处理器处理。一个Inbound通道处理器,一般处理来自IO线程的数据。Inbound数据,通常通过实际的输入操作,如SocketChannel#read,从远端peer读取。如果inbound事件到达Inbound处理器的尾部,默认将会被抛弃,如果需要关注,可以log 。一个Outbound事件,由管道尾到头被Outbound通道处理器处理。一个Outbound通道处理器通常产生或者转发Outbound数据,比如写请求。如果outbound事件到达Outbound通道处理器的头部,那么将会被通道关联的Io线程处理,如SocketChannel#write。netty的事件循环与Mina的IoProcessor有点相似,都可以看着一个线程执行器,执行通道的IO事件操作,而不同的是Nio管理的是选择器Selector,而Mina的IoProcessor管理的会话IoSession。