Netty可靠性设计一

NIO闲聊

自从JAVA1.4推出NIO起,JAVA网络编程进入了一个全新的时代,传统网络IO(OIO)是傻等式的,一旦IO操作发起,那么用户线程就陷入很傻很天真的等待中,直到IO操作结束或者发生了断连,而NIO则要聪明许多是事件触发式的,只有当前有IO事件发生了,才会通知用户线程执行IO操作,当前操作结束之后不会阻塞等待可以执行其他的业务操作等待下一次事件,就好比上银行取钱,一种方式排队傻等直到排到你,一种是登记排号,登记完之后该干嘛干嘛去,等轮到你的时候业务员电话通知你去办理,脑子正常的都会喜欢后面那种方式。相比OIO的线程和连接的N对N,NIO只需少数几个线程处理N个连接,这就是著名的多路复用模型,对于Netty来讲,它并不是标准的多路复用,Netty的IO事件和非IO事件由同一个线程池进行调度,通过一个参数来控制IO事件和非IO事件的执行时间,这种方式下控制复杂一些,但是CPU使用率更高,因为会有很少的上下文切换(Context Switch)。

但是天下没有免费的午餐,NIO带来了性能方面的优势,但是相比OIO,在使用上复杂度飙升。比如我想执行一次写操作,OIO几行代码就搞定了:

OutputStream os = s.getOutputStream();
String str = "test";
os.write(str.getBytes());
而只用NIO则需要一坨代码:

while (running) {
	SocketChannel socketChanel = serverSocketChanel.accept();
	socketChanel.configureBlocking(false);
	socketChanel.register(selector, SelectionKey.OP_READ
			| SelectionKey.OP_WRITE);
	int count = selector.select();
	if (count > 0) {
		Set<SelectionKey> keys = selector.selectedKeys();
		Iterator<SelectionKey> iter = keys.iterator();
		while (iter.hasNext()) {
			SelectionKey key = iter.next();
			if (key.isWritable()) {
				ByteBuffer buff = ByteBuffer.allocate(1024);
				buff.put("<html>test</html>".getBytes());
				System.out.println("writable");
				buff.flip();
				socketChanel.write(buff);
				buff.clear();
			}
			iter.remove();
		}
	}
}

这也可以理解,还是拿前面取钱例子,第二种方式下,用户是爽了,但是银行需要搞一套排号 的系统,还要比较负责有耐心的业务员,总之银行很麻烦。除此之外NIO还带来了更多可靠性问题。Netty牛的地方就是屏蔽NIO编程的复杂性,简化编程模型,让开发者聚焦业务逻辑,而且针对NIO中的一些可靠性问题就行了处理。下面对Netty对几个可靠性问题处理进行学习。

连接超时处理

在OIO中,连接超时处理非常简单,只需调用一个setConnectTimeout方法设置连接超时时间即可,但是JDK的NIO API中并没有提供设置超时时间的方法,显然无论从服务器资源还是用户体验的角度,连接必须要用超时时间,一方面服务器的句柄资源是有限的,既然我不能为你服务那么请你尽早放手吧,长时间无法连接上服务器时非常有必要释放句柄资源,另一方面在用户发起操作时如果系统长时间不给响应显然不是一种好体验,这时候也非常有必要在超时之后提示用户当前无法连接上服务器。Netty对外提供了设置连接超时时间的API,通过ChannleOption.CONNECT_TIMEOUT_MILLIS设置超时时间timetou,Netty的处理是在调用建连之后马上启动一个延时任务,该任务在timeout时间过后执行,任务中执行关闭操作,连接建立成功之后取消这个任务,处理代码在AbstractNioUnsafe类的connect方法中:

@Override
public void connect(
        final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
    if (!promise.setUncancellable() || !ensureOpen(promise)) {
        return;
    }

    try {
        if (connectPromise != null) {
            throw new IllegalStateException("connection attempt already made");
        }

        boolean wasActive = isActive();
        if (doConnect(remoteAddress, localAddress)) {
            fulfillConnectPromise(promise, wasActive);
        } else {
            connectPromise = promise;
            requestedRemoteAddress = remoteAddress;

            // Schedule connect timeout.
            int connectTimeoutMillis = config().getConnectTimeoutMillis();
            if (connectTimeoutMillis > 0) {
                connectTimeoutFuture = eventLoop().schedule(new OneTimeTask() {
                    @Override
                    public void run() {
                        ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
                        ConnectTimeoutException cause =
                                new ConnectTimeoutException("connection timed out: " + remoteAddress);
                        if (connectPromise != null && connectPromise.tryFailure(cause)) {
                            close(voidPromise());
                        }
                    }
                }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
            }

            promise.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (future.isCancelled()) {
                        if (connectTimeoutFuture != null) {
                            connectTimeoutFuture.cancel(false);
                        }
                        connectPromise = null;
                        close(voidPromise());
                    }
                }
            });
        }
    } catch (Throwable t) {
        if (t instanceof ConnectException) {
            Throwable newT = new ConnectException(t.getMessage() + ": " + remoteAddress);
            newT.setStackTrace(t.getStackTrace());
            t = newT;
        }
        promise.tryFailure(t);
        closeIfClosed();
    }
}

分包传输

在TCP协议中,分包传输是非常,一份信息可能分几次达到目的地,在OIO中这是没有问题的,因为OIO是傻等式的,不读到完整的信息它是不会罢手的,但是NIO就不同了,它是基于事件的,只有有数据来了它才会去读取,那么问题来了,在读取到数据之后对数据进行业务解析时该如何处理?比如说我想解析一个整形数,但是当前只读取到了两个字节的数据,还有两个字节的数据在后面的传输包中,由于NIO的非阻塞性,业务数据的解析时机成了一个大问题,因为可能无法一次取到完整的数据。

基于上面这个问题,Netty框架设计了一个ReplayingDecoder来解决这种场景中的问题,ReplayingDecoder的核心原理是,当ReplayingDecoder在进行数据解析时,如果发现当前ByteBuf中所有可读数据并不完整,比如我想解析出一个整型数,但是ByteBuf中数据小于4个字节,那么此时会抛出一个Signal类型的Error,抛Error的操作在ReplayingDecoderBuffer中进行,一个ByteBuf的装饰器。在ReplayingDecoder会捕捉一个Error,捕捉到Signal之后会把ByteBuf中的读指针还原到之前的断点处(checkpoint,默认是ByteBuf的其实读位置),然后结束这次解析操作,等待下一次IO读事件。如果只是简单的整形数解析问题不大,但是如果数据解析逻辑复杂是,这种处理方式存在一个问题,在网络条件比较糟糕时,解析逻辑会反复执行多次,如果解析过程是一个耗CPU的操作,那么这对CPU是个大负担。可以通过ReplayingDecoder中的断点和状态机来解决这个问题,使用者可以在ReplayingDecoder中保存之前的解析结果、状态和读指针断点,举个例子,我要解析8个字节的数据,把前后四个字节都解析成整形数,并且把这两个数据相加当做解析结果,代码如下:

public class TowIntegerReplayingDecoder extends ReplayingDecoder<Integer> {

	private static final int PARSE_1 = 1;
	private static final int PARSE_2 = 2;
	private int number1;
	private int number2;

	@Override
	protected void decode(ChannelHandlerContext ctx, ByteBuf in,
			List<Object> out) throws Exception {
		switch (state()) {
		case PARSE_1:
			number1 = in.readInt();
			checkpoint(PARSE_2);
			break;
		case PARSE_2:
			number2 = in.readInt();
			checkpoint(PARSE_1);
			out.add(number1 + number2);
			break;
		default:
			break;
		}

	}

}
在代码中,把解析分成两个阶段,当一个阶段解析完成之后,记录第一个阶段的解析结果,并且更新解析状态和读指针,这样如果由于数据不完整导致第二阶段的解析无法完成,下次IO事件触发时,该解析器会直接进入第二阶段的解析,而不会重复第一阶段的解析,这样会减少重复解析大概率。基于这种设计,ReplayingDecoder必须是Channel独有的,它的实例不能被共享,没有Channel实例必须有个单独的ReplayingDecoder解析器实例,而且不能添加Sharable注解,因为它是用状态了,如果在多个Channel中共享了,那么状态就乱套了。


郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。