0xTTEPX

Just do it, deeply...

Follow me on GitHub

Mina总结

write by donaldhan, 2018-02-03 19:34

引言

Mina是一个网络通信应用框架,是基于TCP/IP、UDP/IP协议栈的通信框架(当然,也可以提供JAVA 对象的序列化服务、虚拟机管道通信服务等),Mina 可以帮助我们快速开发高性能、高扩展性的网络通信应用,Mina提供了事件驱动、异步(Mina的异步IO,默认使用的是JAVA NIO 作为底层支持)操作的编程模型。Mina的相关组件有IoService,IoProcessor,IoFilter,IoHanler,IoBuffer。

上面文章中的Mina源码为Mina的2.0分支,当时的源码版本为2.0.16。

mina

目录

Mina Socket会话配置

NioSocketAcceptor构造是传输入的DefaultSocketSessionConfig参数实际上是初始化AbstractIoService的会话配置选项sessionConfig(IoSessionConfig)。会话配置IoSessionConfig主要是配置IoProcessor每次读操作分配的buffer的容量,一般情况下,不用手动设置这个属性,因为IoProcessor经常会自动调整;设置空闲状态IdleStatus(READER_IDLE,WRITER_IDLE 或 BOTH_IDLE)的空闲时间;UseReadOperation配置项用于优化客户端的读取操作,开启这个选项对服务器应用无效,并可能引起内存泄漏,因此默认为关闭状态。SocketSessionConfig主要是配置Socket,这些属性在java.net.Socket中都可以找到相似或类似的属性,比如发送缓冲区大小,是否保活,地址是否可重用等。从AbstractSocketSessionConfig来看SocketSessionConfig的发送、接收缓冲区大小,是否保活,地址是否可重用等配置默认为true。DefaultSocketSessionConfig构造时,发送和接受缓存区默认为-1,所以在创建NioSocketAcceptor要配置发送和接收缓存区大小; DefaultSocketSessionConfig关联一个parent父对象IoService,即配置的依附对象。#init方法初始化的parent父对象IoService,如果parent父对象为IoService(SocketAcceptor), 则地址默认可重用,否则不可。

Mina 过滤链默认构建器

过滤器链IoFilterChain用Entry存放过滤器对,即每个过滤器IoFilter关联一个后继过滤器NextFilter。我们可以通过滤器名name或过滤器实例ioFilter或过滤器类型获取相应的过滤器或过滤器对应的Entry。fireMessage*/exceptionCaught相关方法为触发IoHandler的相关事件,fireFilterWrite/Close触发的是,会话的相关事件IoSession#write/close。 DefaultIoFilterChainBuilder用entries列表(CopyOnWriteArrayList)来管理过滤器;添加过滤器,移除过滤器,及判断是否包含过滤器都是依赖于CopyOnWriteArrayList的相关功能。buildFilterChain方法是将默认过滤器链构建器的过滤器集合中的过滤器添加到指定的过滤链IoFilterChain上。DefaultIoFilterChainBuilder的过滤器EntryImpl中的getNextFilter并没有实际作用,即无效,这就说明了DefaultIoFilterChainBuilder只用于在创建会话时,构建过滤器链。创建完毕后,对过滤器链构建器的修改不会影响到会话实际的过滤器链IoFilterChain(SocketFilterChain,DatagramFilterChain...)。

Mina 过滤器定义

IoFilter添加到过滤链时,一般以ReferenceCountingIoFilter包装,添加到过滤链,init方法在添加到过滤链时,由ReferenceCountingIoFilter调用,所以可以在init方法初始化一些共享资源。如果过滤器没有包装成ReferenceCountingIoFilter,init方法将不会调用。然后调用onPreAdd通知过滤器将会添加到过滤连上,当过滤器添加到过滤链上时,所有IoHandler事件和IO请求,将会被过滤器拦截当过滤器添加到过滤链上后,将会调用onPostAdd,如果方法有异常,过滤器在过滤链的链尾,ReferenceCountingIoFilter将调用#destroy方法释放共享资源。过滤器IoFilter,主要是监听会话IoSession相关事件(创建,打开,空闲,异常,关闭,接受数据,发送数据) 及IoSesssion的Write与close事件;过滤器后继NextFilter关注的事件与过滤器IoFilter相同,主要是转发相关事件。WriteRequest是会话IoSession写操作write的包装,内部有一个消息对象用于存放write的内容,一个socket地址,即会话写请求的目的socket地址,一个写请求结果返回值WriteFuture,用于获取会话write消息的操作结果。当一个过滤器从过滤链上移除时,#onPreRemove被调用,用于通知过滤器将从过滤链上移除;如果过滤器从过滤链上移除,所有IoHandler事件和IO请求,过滤器不再拦截;#onPostRemove调用通知过滤器已经从过滤链上移除;移除后,如果过滤器在过滤链的链尾,ReferenceCountingIoFilter将调用#destroy方法释放共享资源。

IoFilter生命周期如下: inti->onPreAdd->onPostAdd->(拦截IoHandler相关事件:sessionCreated,Opened,Idle,exceptionCaught,Closed,messageSent,messageReceived;会话相关事件:filterWrite,filterClose)->onPreRemove->onPostRemove->destroy。

Mina 日志过滤器与引用计数过滤器

LoggingFilter拦截IoHandler的sessionCreated事件,将日志输出委托给SessionLog,然后将相关事件传递给过滤器后继,拦截sessionOpened,sessionClosed, filterClose事件思路基本相同。对于sessionIdle,messageSent,messageReceived,filterWrite先判断会话log的info级别是否开启,开启则输出相应事件日志。上面这些事件的日志级别都是Info;exceptionCaught则先判断会话log的warn级别是否开启,开启则输出相应事件日志。

一个过滤器可以多次添加到过滤链上,ReferenceCountingIoFilter用于保证过滤器第一次添加到过滤链上时,初始化过滤器,完全从过滤链上移除时,销毁过滤器,释放资;ReferenceCountingIoFilter内部一个count用于记录包装过滤器filter添加到过滤链上的次数,即过滤链上存在filter的个数,一个filter,即ReferenceCountingIoFilter包装的过滤器,ReferenceCountingIoFilter触发IoHandler和IoSession的相关事件,直接交给包装的内部filter处理。

Mina 过滤链抽象实现

AbstractIoFilterChain内部关联一个IoSession,用EntryImp来包装过滤器,过滤链中用HashMap<String,EntryImpl>来存放过滤器Entry,key为过滤器名,value为过滤器Entry。EntryImpl是过滤器在过滤链上存在的形式,EntryImpl有一个前驱和一个后继,内部包裹一个过滤器 with name,及过滤器的后继过滤器NextFilter。后继过滤器NextFilter的传递IoHandler和IoSession事件的方法,主要是将事件转发给后继Entry对应的过滤器。过滤链头为HeadFilter,链尾为TailFilter。HeadFilter触发IoHandler和IoSession事件时,将事件传递给后继过滤器;但对于IoSession write/close事件除了传递事件外,需要调用实际的事件操作doWrite/doClose,这两个方法需要子类扩展实现。TailFilter触发IoHandler和IoSession事件时,直接调用会话处理器IoHandler的相关事件方法。在sessionOpened事件中,最后如果是SocketConnector创建的会话,则要通知相关ConnectFuture;在sessionClosed事件中,最后还要清空过滤链;messageSent和messageReceived事件,如果消息对象为ByteBuffer,则释放buffer。添加过滤器到过滤链,首先检查过滤链上是否存在过滤器,不存在,才添加;添加过滤器到头部即,插入过滤器到链头的后面,添加过滤器到尾部,即插入过滤器到链尾的前面;添加到指定过滤器前后,思路基本相同;添加前触发过滤器onPreAdd事件,添加后触发过滤器onPostAdd事件;移除过滤器,首先获取过滤器对应的Entry,然后触发过滤器onPreRemove事件,从过滤链name2entry移除Entry,然后触发过滤器onPostRemove事件。过滤链处理相关事件策略为:与IoHanler的相关事件(Session*)处理的顺序为,从链头到链尾-》Iohanlder(这个过程handler处理相关事件);对于会话相关的事件(FilterWrite/close),处理顺序为Iohanlder-》从链尾到链头(这是会话事件,只是在handler的方法中使用会话发送消息,关闭会话,handler并不处理会话事件)。

Mina Socket与报文过滤链

SocketFilterChain发送消息首先获取Socket会话的的写请求队列;mark buffer的位置,主要因为buffer要传给messageSent事件,待消息发送完成,SocketIoProcessor.doFlush方法将会reset buffer到当前mark的位置;根据buffer的实际数据容量来判断是更新调度请求计数器还是更新调度写字节计数器;将写请求添加到session写请求队列中,如果session运行写操作,获取session关联的IoProcessor完成实际的消息发送工作。关闭session,即将会话添加到关联的IoProcessor待移除会话队列。DatagramFilterChain发送消息首先获取报文会话的的写请求队列;mark buffer的位置,主要因为buffer要传给messageSent事件,待消息发送完成,SocketIoProcessor.doFlush方法将会reset buffer到当前mark的位置;根据buffer的实际数据容量来判断是更新调度请求计数器还是更新调度写字节计数器;将写请求添加到session写请求队列中,如果session允许写操作,获取session关联的managerDelegate(DatagramService)完成实际的消息发送工作。 关闭会话委托给session关联的managerDelegate(DatagramService),如果managerDelegate为DatagramConnectorDelegate者直接关闭,如果为DatagramAcceptorDelegate,通知DatagramAcceptorDelegate的监听器会话已关闭,设置会话CloseFuture为已关闭状态。

Mina 协议编解码过滤器一(协议编解码工厂,协议编码器)

协议编解码过滤器ProtocolCodecFilter关联一个协议编解码工厂ProtocolCodecFactory,协议编解码工厂提供协议编码器ProtocolEncoder和解码器ProtocolDecoder;编码器将消息对象编码成二进制或协议数据,解码器将二进制或协议数据解码成消息对象。编码器ProtocolEncoder主要有两个方法,encode和dispose;encode用于,编码上层的消息对象为二进制或协议数据。mina调用编码器的#encode方法将从会话写请求队列中pop的消息,然后调用ProtocolEncoderOutput#write方法将编码后的消息放在ByteBuffer;dispose方法释放编码器资源。ProtocolEncoderAdapter为编码器抽象实现,默认实现了dispose,不做任何事情,对于不需要释放资源的编码器继承ProtocolEncoderAdapter。ProtocolEncoderOutput主要的工作是将协议编码器编码后的字节buffer,缓存起来,等待flush方法调用时,则将数据发送出去。SimpleProtocolEncoderOutput为ProtocolEncoderOutput的简单实现内部有一个buffer队列bufferQueue(Queue),用于存放write(ByteBuffer)方法,传入的字节buffer;mergeAll方法为合并buffer队列的所有buffer数据到一个buffer;flush方法为发送buffer队列中的所有buffer,实际发送工作委托给doFlush方法待子类实现。ProtocolEncoderOutputImpl为协议编解码过滤器的内部类,ProtocolEncoderOutputImpl的doFlush,首先将会话包装成DefaultWriteFuture,将会话,写请求信息传递给NextFilter。

Mina 协议编解码过滤器二(协议解码器)

解码器ProtocolDecoder将二级制或协议内存解码成上层消息对象。mina在读取数据时,调用解码器的#decode,将解码后的消息放到ProtocolDecoderOutput;当会话关闭时,调用finishDecode解码那些在#decode方法中没有处理完的数据。dispose主要是 释放所有与解码器有关的资源。针对不需要#finishDecode和#dispose的解码器,我们可以继承协议解码适配器ProtocolDecoderAdapter。ProtocolDecoderOutput有两个方法,一个write方法,用于解码器,解完消息后回调;一个flush方法,用于刷新所有解码器写到协议解码输出的消息对象。简单协议解码输出SimpleProtocolDecoderOutput,关联一个会话session,一个后继过滤器nextFilter,及一个消息队列messageQueue;所有解码器解码后的消息,回调协议解码输出的write方法,将消息暂时放在消息队列messageQueue中;flush方法主要是将消息队列中的消息传给后继过滤器的messageReceived方法。

Mina 队列Queue

队列Queue实际上为List,用一个对象数组items存放元素,一个实际容量计数器size,一个队头first和一个队尾索引last,默认队列容量为4。push元素时,先判断队列是否已满,如果已满,则扩容队列为原来的两倍。Queue实际为具有队列特性的List。Queue可以随机地访问队列索引。

Mina 协议编解码过滤器三(会话write与消息接收过滤)

协议编解码过滤器构造,主要是初始化协议编码器工厂。一个过滤链上不能存在两个协议编解码器,即唯一。会话写操作来看session#write(filterWrite),首先从写请求获取消息,如果消息为字节buffer,则直接传给后继过滤器,否则从协议编解码工厂获取协议编码器和协议编码器输出,协议编码器encode编码消息,写到协议编码器输出字节buffer队列, 然后协议编码器输出flush字节buffer队列。会话接收消息messageReceived,如果消息非字节buffer,则直接传给后继过滤器,否则获取协议解码器,及协议解码器输出,协议解码器解码字节buffer为上传消息对象,写到协议解码器输出消息队列,最后解码器输出flush消息队列。会话关闭,主要是解码器解码会话未解码数据,写到解码器输出消息队列,最后从会话移除编码器,解码器及解码器输出属性,释放编码器,解码器及解码器输出相关资源,flush解码器输出消息队列。协议编解码器过滤器从过滤链移除后,要释放会话编解码器,及解码输出属性,并释放相关的资源。

Mina 累计协议解码器

累积性协议解码器decode方法,首先从会话获取属性BUFFER对应的buffer,如果会话缓存buffer不为null,则将当前in(ByteBuffer)的内容放到会话缓存buffer中;否直接使用当前in(ByteBuffer),将是否开启会话缓存buffer,置为否;循环尝试使用doDecode方法解码数据,直到doDecode方法返回false,如果doDecode方法返回true,则继续调用doDecode方法解码数据,如果在一次解码数据返回true,但buffer中还有数据,则根据是否开启会话缓存buffer决定将buffer数据进行压缩还是存储在会话中;如果在一次解码数据返回true,但buffer中没有数据,则从会话中移除属性BUFFER,并释放对应的buffer空间。doDecode方法放抽象方法待类型实现。CumulativeProtocolDecoder实现可累性方法主要是通过将buffer存储在会话中,以实现接收数据的可累计性

MINA 多路复用协议编解码器工厂一(多路复用协议编码器)

多路复用的协议编码器添加消息编码器,如果参数为消息类型和消息编码器类型,添加消息类型与默认构造消息编码工厂到消息编码器Map映射type2encoderFactory;如果参数为消息类型和消息编码器实例,添加消息类型与单例消息编码器工厂到消息编码器Map映射type2encoderFactory。多路复用的协议编码器编码消息,首先从会话获取消息编码器状态,从消息编码器状态获取消息对应的编码器(先从消息编码器查找,没有则从消息编码器映射type2encoder查找),如果没有消息对应的解码,则查找消息父接口和类对应的编码器,编码消息。

MINA 多路复用协议编解码器工厂二(多路复用协议解码器)

消息解码器MessageDecoder,有3个方法分别为,decodable方法用于判断解码器是否可以解码buffer数据,返回值(MessageDecoderResult)#OK表示可以解码buffer中的数据,#NOT_OK表示解码器不可解码buffer数据, #NEED_DATA表示需要更多的数据来确认解码器是否可以解码buffer数据;decode用于解码buffer数据,返回值#OK表示解码消息成功,#NEED_DATA,表示需要更多的数据完成消息解码,#NOT_OK由于内容与协议不一致,不能解码当前消息;finishDecode主要是处理#decode方法没有解码完的数据。添加解码器到多路复用协议解码器,如果添加的解码器为Class,则添加默认构造消息解码器工厂到到多路复用协议解码器工厂集decoderFactories;如果添加的解码器为实例,则添加单例解码器工厂,到多路复用协议解码器工厂集。多路复用协议解码器,解码消息的过程为,首先从会话获取解码器状态集,遍历解码器集,找到可以解码消息的解码器,并赋给会话解码器状态当前解码器currentDecoder,最后由currentDecoder解码消息。 多路复用协议编解码器工厂内部关联一个多路复用协议编码器和解码器器,添加消息编码器和解码器到多路复用协议编解码器工厂,实际是添加多路复用协议编解码器中。

Mina Io处理器抽象实现

抽象Io处理器AbstractPollingIoProcessor,主要几个关键内部变量为选择操作超时时间SELECT_TIMEOUT,用于腾出时间,处理空闲的会话; executor处理器内部执行器,用于运行内部处理器Processor;存储Io处理器等线程最大线程id的threadIds(Map);创建会话队列newSessions用于存储新创建的会话;移除会话队列removingSessions用于存放从处理器移除的会话;刷新会话队列flushingSessions,用于存放要发送写请求的会话;次序控制会话队列trafficControllingSessions用于存放会话待读写的会话;Io处理器线程引用processorRef。 添加会话首先添加会话到Io处理器的创建会话队列中,启动处理器线程Processor。处理器的实际工作,尝试10次nbTries选择操作,在每次选择操作过程中,首先进行超时选择操作,然后检查Io处理器是否断开连接,尝试次数nbTries是否为零如果为0,则注册新的选择器;然后遍历创建会话队列,从队列拉取会话,如果会话为不null,则初始化会话,构建会话过滤链(从IoService继承)触发会话过滤链的会话创建和会话打开事件,并记录新创建的会话数量nSessions;更会会话状态,此过程为从会话次序控制队列获取会话,检查会话状态,如果状态为OPENED更新会话的读写状态,如果为OPENING放回次序控制会话队列;如果选择操作返回的SELECTKey的值大于0,即有相关的兴趣操作事件(读写事件),遍历选择后读写等操作就绪的会话,如果会话可读,则读取会话缓存区数据到buffer,触发过滤链消息接收事件MessageReceive,接收完消息后,如果会话输入流关闭则触发过滤链fireInputClosed事件,如果在这过程有异常发生,则触发过滤链异常事件ExceptionCaught,如果会话可写,则添加会话到刷新会话队列;遍历刷新会话队列,根据会话写请求消息类型为IoBuffer还是FileRegion,发送会话数据,发送会话数据后,如果会话还有些请求,则添加会话到队列,如果在这个过程中有异常,则添加会话到会话移除队列;遍历会话移除队列,如果会话为关闭,则尝试关闭会话,并清除会话写请求队列,如果会话数据已发送完,则触发会话过滤链消息发送事件fireMessageSent;更新处理器会话计数器nSessions;遍历处理器所有会话,触发会话过滤器会话空闲时间fireSessionIdle;如果在这个过程中,处理器会话计数器nSessions为0,则清除处理器引用;如果Io处理器正在关闭,则添加所有会话到移除会话队列,释放Io处理器先关的资源。 抽象Io处理器AbstractPollingIoProcessor主要是处理IoProcessor关联会话message*事件,而所有的工作,都是通过处理器线程Processor完成。每当有会话添加到IoProcessor,则启动一个处理器线程Processor,处理会话的读写操作及相关事件。就连IoProcessor资源的释放,也是由处理器线程Processor处理。关闭IoProcessor时,现将处理器关联会话,添加移除会话队列,实际工作由IoProcessor的子类的doDispose方法完成。

Mina Nio处理器

NioProcessor内部有一个选择器Selector,一个可重入读写锁用于控制选择器相关的操作,构造主要是初始化线程执行器和选择器。Nio处理器的选择操作,唤醒等操作,实际通过内部的选择器完成。初始化会话,主要是配置会话通道为非阻塞模式,注册会话通道读事件到选择器。注册新选择器,主要是注册旧选择器的选择key(集合)关联的会话,通道,及通道兴趣事件集到新的选择器;会话时附加在通道选择key的Attachment上。处理器处理会话读写操作,主要是通过会话关联的通道完成。关闭会话主要是关闭会话关联的字节通道和取消会话关联选择key。

Mina 抽象Io会话

抽象会话AbstractIoSession内部有一个关联的IoService和一个IoHandler;一个写请求队列用于存发会话写请求;一个会话属性Map存放会话属性,还有一些读写字节数,消息数,相关吞吐量和上次读写或空闲操作时间计数器。,会话初始化主要为初始化关联service,及关联的IoHandler,实际为Service的IoHandler;初始化所有的会话事件计数器为当前时间。关闭方法默认关闭时,清空写请求队列,并将写请求的结果置为已写,触发过滤过滤链fireFilterClose事件,即不flush会话写请求队列,closeOnFlush方法为,在关闭会话前,flush会话写请求队列。会话读操作,首先获取会话读请求结果队列,从队列poll一个读结果,如果读结果不为空且已关闭,则重新入队列,否则新建一个默认读请求结果,添加到会话等待读请求结果队列。会话写请求,首先保证消息不为null,会话建立连接,并且远端socket地址不为null;如果消息为IoBuffer,确保buffer不为空,如果消息为文件通道/文件类型,则包装消息为DefaultFileRegion/FilenameFileRegion;然后创建写请求DefaultWriteRequest,触发会话过滤链fireFilterWrite事件,如果消息为文件通道,则注册写结果监听器,在消息发送完后,关闭文件通道,返回写结果DefaultWriteFuture。

Mina Nio会话(Socket,DataGram)

NioSession会话主要管理会话关联Io处理器processor,通道channel,选择键SelectionKey和会话过滤链IoFilterChain;IoService交由AbstractIoSession管理。 socket会话NioSocketSession配置继承于关联IoService的会话配置 ;关闭会话,即取消会话关联的选择key和关闭关联通道。

Mina IoService接口定义及抽象实现

抽象service关联一个IoHandler处理会话相关事件,关联一个执行器Executor,负责处理io事件的执行,一个会话配置IOsessionConfig,用于service创建会话时,配置会话,一个过滤链构建器IoFilterChainBuilder,用于构建会话的过滤链,会话数据结构工厂,用于创建会话的属性Map和写请求队列,还有service监听器和统计器。抽象service构造,首先检查会话配置和传输元数据,会话配置必须传输元数据的会话配置类型必须相同,即socket(TCP),会话配置为socketSessionConfig,报文通信(UDP),为DatagramSessionConfig;然后将会话创建监听器serviceActivationListener添加监听器管理器IoServiceListenerSupport;初始化会话配置,IO事件执行器executor和异常监视器。初始化会话就是将service会话数据结构工厂的会话属性添加到具体的会话中,将service会话数据结构工厂的写请求队列,设置到具体的会话中,如果是连接请求会话,则将连接结果添加会话属性中。

Mina Io监听器接口定义及抽象实现

IoAcceptor与IoService不同的是,添加了监听连接请求和地址绑定功能。抽象Io监听器AbstractIoAcceptor绑定地址首先要检查绑定的socket地址与传输元数据的地址类型是否相同,相同则通过bindInternal完成实际的绑定,然后通知Service监听器,Service已激活fireServiceActivated。解绑地址方法,主要是委托unbind0方法完成实际解绑工作,清空绑定地址集合boundAddresses,触发Service监听器无效事件fireServiceDeactivated。

Mina 抽象polling监听器

AbstractPollingIoAcceptor主要变量为Io处理器processor,地址绑定请求队列registerQueue,地址解绑请求队列cancelQueue,监听器绑定的socket地址,与ServerSocketChannel映射关系boundHandles-Map,监听工作线程Acceptor引用acceptorRef。

构造AbstractPollingIoAcceptor,主要是初始化会话配置,Io处理器类型,IO异步事件执行器为空的话默认为CachedThreadPool,然后初始化选择器。地址绑定过程为,创建绑定操作结果,注册绑定请求到注册地址绑定请求队列,创建监听器Acceptor实例并执行。Acceptor主要功能为,地址绑定,监听连接请求,解绑地址,实际工作逻辑为:如果监听器AbstractPollingIoAcceptor已经初始化,首先根据地址绑定队列中的绑定请求,打开一个ServerSocketChannle,注册接收事件OP_ACCEPT到选择器,并将绑定地址与ServerSocketChannle映射管理添加到地址绑定映射集合;执行选择操作,如果实际绑定地址为空,则置空acceptorRef;如果接收连接事件发生,则处理连接请求,遍历接收事件就绪的ServerSocketChannel,ServerSocketChannel创建一个关联processor的会话,初始化会话,添加会话到会话关联io处理器;检查是否有地址解绑请求,如果有解绑请求CancellationRequest,从绑定socket地址与ServerSocketChannle映射map中,移除绑定的socket地址,关闭ServerSocketChannle;最后检查监听器是否正在关闭,如果acceptor正在关闭,则关闭关联processor。Acceptor和AbstractPollingIoAcceptor的关系,与AbstractPollingIoProcessor和Processor的关系很像。地址解绑过程,首先根据解绑地址创建AcceptorOperationFuture,添加到解绑队列,启动Acceptor线程,完成实际解绑工作。AbstractPollingIoAcceptor所有的工作(地址绑定,接收连接,创建会话,添加会话到IO处理器,解绑地址,释放监听器资源)都是在Acceptor线程里完成。

Mina socket监听器(NioSocketAcceptor)

socket监听NioSocketAcceptor,有两个内部变量为选择器selector和选择器提供者selectorProvider。init方法主要工作为打开一个选择器selector。打开一个socket地址,如果选择器提供者不为空,则通过选择器提供者打开一个ServerSocketChannel,否则通过ServerSocketChannel打开一个socket通道服务者;配置通道阻塞模式,及通道关联的SeverSocket的地址重用配置,然后通过SeverSocket绑定地址。监听器接受连接,实际上是委托给绑定地址的ServerSocketChannel,接受客户端的连接,产生一个SocketChannel,再根据SocketChannel和Io处理器创建会话。选择唤醒操作实际委托给内部选择器。

Mina 连接器接口定义及抽象实现(IoConnector)

IoConnector接口给Ioservice增加了连接功能,可以连接服务端。连接操作,首先检查连接器状态,本地地址与远程地址是否为空已经与传输元数据地址类型是否匹配,如果连接器Iohandler为null,创建一个对会话操作事件不处理的IoHandler,最后将实际连接操作委托给connect0,待子类实现。

Mina 抽象Polling连接器(AbstractPollingIoConnector)

抽象拉取连接器内部有一个连接请求队列connectQueue,连接请求取消队列cancelQueue,Io处理器和连接线程引用connectorRef。拉取连接器构造主要初始化会话配置,IO事件执行器和IO处理器。连接操作,首先根据本地socket地址创建SocketChannel,连接远端socket地址,根据IO处理器和SocketChannel构建Io会话,将会话添加到会话关联的IO处理器中,根据SocketChannel和会话初始化sessionInitializer构建连接请求,添加到连接请求队列,最后启动连接器线程。连接器线程首先计算选择超时时间,执行超时选择操作,注册连接请求SocketChannel连接事件到选择器;如果没有任何连接请求SocketChannel需要处理,置空连接器连接线程引用,清空连接请求队列,如果有连接请求已经连接完成,即触发SocketChannel兴趣连接事件,处理连接事件就绪的连接请求,这个过程首先调用finishConnect完成SocketChannel连接后续工作,根据Io处理器和SocketChannel创建会话,初始化会话,添加会话到会话关联的IO处理器;然后处理连接超时的连接请求,即设置连接结果超时异常,添加到连接请求到取消队列;处理取消连接的连接请求,即关闭连接请求关联的SocketChannel。

Mina socket连接器(NioSocketConnector)

socket连接器NioSocketConnector内部关联一个选择器;初始化连接器,为打开一个选择器;连接远端地址,即委托SocketChannel的连接操作;创建socket通道来看,首先打开一个socket通道,配置接收缓存区size,绑定本地socket地址,配置通道为非阻塞模式;注册操作,即注册socket通道的连接事件到选择器。选择操作和唤醒操作直接委托给内部选择器。销毁socket连接器,即关闭选择器。

Mina 报文监听器NioDatagramAcceptor一(初始化,Io处理器)

报文监听器NioDatagramAcceptor,内部有一个注册队列registerQueue,用于存放地址绑定的请求,一个取消队列,用于存放地址解绑请求,一个Map-boundHandles,用于存放socket地址与报文通道映射映射关系,会话管理器sessionRecycler,监控连接Service的会话,如果会话过期,关闭过期的会话,一个通道选择器selector处理报文通道的读写操作事件,一个监听器线程acceptor,用于处理地址绑定和解绑,报文通道读写事件,发送会话消息及销毁监听器工作。报文监听器构造主要是初始化会话配置,IO事件执行器和打开选择器。报文监听器写操作,首先获取会话写请求队列,计算会话最大发送字节数,获取会话写请求buffer;如果写请求为空,则从请求队列poll一个写请求,然后获取写请求buffer及写请求目的socket地址,委托会话关联的报文通道发送数据;如果buffer数据太多或没有写成功,添加写请求到会话请求队列,关注写事件,否则取消关注写事件,置空会话当前写请求,触发会话发送事件。绑定地址,首先添加地址绑定请求到注册队列registerQueue,启动监听器线程acceptor,唤醒选择操作,然后等待地址绑定完成,最后返回报文通道绑定的socket地址集。

Mina 报文监听器NioDatagramAcceptor二(发送会话消息据等)

监听器线程Acceptor,首先执行超时选择操作;处理地址绑定请求,首先从注册队列poll地址绑定请求,遍历绑定请求地址集,根据绑定的socket地址打开一个报文通道,配置通道会话及阻塞模式,绑定socket地址,注册报文通道读操作事件OP_READ到选择器selector,添加socket地址与报文通道映射到boundHandles,通知service监听,服务已开启,触发fireServiceActivated事件; 如果没有报文通道处理,则清空注册队列和取消队列,置空监听器线程; 如果选择操作后,有报文通道的读写事件就绪,则遍历读写操作事件就绪的报文通道,如果是读事件,接受报文通道数据,如果远端地址不为空,创建会话,首先从boundHandles获取远端socket地址关联的报文通道,从会话管理器sessionRecycler,获取远端socket地址会话,以便重用,如果会话管理器中不存在,则根据Io处理器,报文通道及远端socket地址创建报文会话,设置会话选择key,将会话添加会话管理器,监控会话,初始化会话,构建会话过滤链,通知Service监听器发生会话创建事件fireSessionCreated;如果是写事件,则调度Service管理的会话,添加到刷新队列; 处理刷新队列,从刷新队列poll写请求会话,获取会话写请求队列,会话最大读buffer size,获取会话当前写请求,获取写请求消息,写请求远端地址,通过会话关联的报文通道发送会话消息字节序列,数据发送成功,置空会话当前写请求,触发会话过滤链消息发送事件fireMessageSent,否则设置会话重新关注写操作事件,如果刷新会话写请求成功,但会话写请求队列不为空,且未调度,则重新调度会话;处理解绑地址请求队列,首先从取消队列,poll地址解绑请求,遍历地址解绑请求socket地址集合,从socket与报文通道映射集boundHandles移除socket地址,关闭报文通道;通知service管理的会话空闲;如何Io处理器正在关闭,则销毁报文监听器。

Mina 报文连接器(NioDatagramConnector)

由于报文连接器是面向无连接的,与选择器有关的操作要么不支持,要么空体,要么返回空集合;连接操作直接委托报文通道;绑定地址,首先打开一个报文通道,然后由报文通道关联的报文socket完成实际的地址绑定。

总结

socket监听器和连接器关联一个选择器selector和Io处理器processor,Io处理器processor处理所有的IO事件,当有连接请求时创建会话,IO会话关联一个Io处理器processor,通道channel,选择键SelectionKey和会话过滤链IoFilterChain。会话相关的事件,经过过滤链过滤,与IoHanler的相关事件(Session*)处理的顺序为,从链头到链尾-》Iohanlder(这个过程handler处理相关事件);对于会话相关的事件(FilterWrite/close),处理顺序为Iohanlder-》从链尾到链头(这是会话事件,只是在handler的方法中使用会话发送消息,关闭会话,handler并不处理会话事件)。Mina中的消息编码器和解码器是以编辑器工厂过滤器的形式存在于过滤链上。通道IoHandler处理会话的读写等操作。