Netty 4.X 用户指南

----Netty 4.X  用户指南  为本人自己翻译,其中不免有差错,还请谅解,也请指正。

Preface、The Problem、The Solution 、Geting  started、Before Geting Started 略过。

写一个 什么都不做(丢弃消息)的服务端

这个世界上最简单的协议不是“hello world”,而是”DISACRD“。这是一种对于你接收的任何数据都不做任何响应的协议。
为实现DISCARD协议,你需要做的唯一事情是忽略所有你接收到的数据。让我们直接从handler实现开始,handler的实现操控着由Netty生成的I/O事件。

package io.netty.example.discard;

import io.netty.buffer.ByteBuf;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * Handles a server-side channel.
 */
public class DiscardServerHandler extends ChannelInboundHandlerAdapter { // (1)

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) { // (2)
        // Discard the received data silently.
        ((ByteBuf) msg).release(); // (3)
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();
    }
}

  1. DiscardServerHandler 继承自ChannelInboundHandlerAdapter,这是ChannelInboundHand实现。ChannelInboundHandler提供了多种你可以重写的事件处理器。
  2. 在这里我们重写了channelRead()事件处理器方法。当新数据被一个客户端接收时,该方法会随着接收的数据一同被调用。在这个例子中,接收的数据类型是ByteBuf。
  3. 为实现DISCARD协议,处理器需要忽略接收的数据。ByteBuf 是一种引用计数对象,必须通过release()方法显示的释放掉。通常channelRead()处理器方法的实现通过如下方式实现:
    <span style="font-size:14px;">@Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        try {
            // Do something with msg
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }</span>

  4. exceptionCaught()事件处理器方法会随着Throwable被调用,当来自于Netty的I/O错误引起的异常或者是当处理器处理事件过程中抛出的异常发生时。在大部分情况下,捕获的异常应该被记录并且它所关联的channel 应该被关闭,尽管这个方法的实现因你想如何处理一个异常场景而异。例如,你想在连接关闭之前发送错误代码响应消息。
到目前为止一切还好,我们已经实现了DISCARD server的上半部分。接下来的工作是写main()方法与DiscardServerHandler一起启动这个 服务。
package io.netty.example.discard;

import io.netty.bootstrap.ServerBootstrap;

import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * Discards any incoming data.
 */
public class DiscardServer {

    private int port;

    public DiscardServer(int port) {
        this.port = port;
    }

    public void run() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap(); // (2)
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class) // (3)
             .childHandler(new ChannelInitializer<SocketChannel>() { // (4)
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ch.pipeline().addLast(new DiscardServerHandler());
                 }
             })
             .option(ChannelOption.SO_BACKLOG, 128)          // (5)
             .childOption(ChannelOption.SO_KEEPALIVE, true); // (6)

            // Bind and start to accept incoming connections.
            ChannelFuture f = b.bind(port).sync(); // (7)

            // Wait until the server socket is closed.
            // In this example, this does not happen, but you can do that to gracefully
            // shut down your server.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        } else {
            port = 8080;
        }
        new DiscardServer(port).run();
    }
}
  1. NioEventLoopGroup 是控制I/O操作的一种多线程event循环。Netty为不同种传输提供了多种EventLoopGroup 的实现。我们在这个例子中实现的是sever端的应用,所以两个NioEventLoopGroup会被用到。第一个,常常称之为‘boss’,接受进来的连接。第二个,常称之为‘woker’,控制着接收到的连接的通行,一旦boss接受了连接并将注册接受到的连接给woker。有多少线程被使用以及他们如何映射到channel 取决于EventLoopGroup的实现并且甚至于通过构造函数实现可配置。
  2. ServerBootstrap 是一个建立server的帮助类。你可以用Channel接口直接建立。无论如何,请注意这是一个繁杂的过程,尽可能不要这样做。(作者的意思是:封装成工具类的方式会更好)
  3. 这里,我们指定了NioServerSocketChannel 类用于实例化一个新的Channel接收进来的连接。
  4. 这里指定的处理器将一直被新接受的Channel所评估。ChannelInitializer 专门用于帮助用户配置一个新的Channel。这就像你想为一个新的Channel配置ChangePipeline通过添加一些像:DiscardServerHandler的处理器去实现你的网络应用一样。随着应用程序变得复杂,很可能你会更多的处理程序添加到管道并提取这最终匿名类到顶级类. 
  5. 你也可以设置这些参数到Channel的实现中。我们要协议个TCP/IP server,所以我们允许设置socket选项,例如:tcpNoDelay和keepAlive。请参见ChannelOption的API文档针对于ChannelConfig的ChannelOption配置。
  6. 你是否注意到option()和childoption()?option()用于NioServerSocketChannel接受进入的连接。childoption()用于父类ServerChannel接受的,这指的是NioServerSockethannel.
  7. 我们可以准备好开始了。剩下的就是绑定端口并启动 server。这里我们绑定的网卡端口是8080。现在你可以根据你需要多次调用bind()方法(但是需要绑定不同的地址)。

祝贺!你刚刚完成了你已经熟练掌握了Netty的第一个server。


深入学习接收数据

现在已经完成了第一个服务端,我们需要测试它是否能真正工作。最简单的方法就是使用telnet命令测试。例如:你可以在命令行键入 ‘telnet localhost 8080‘ 或者其他命令。
然而,我们能说服务端可以运行的很好吗?我们无法真正知道因为这是一个DISCARD SERVER。 你根本无法获取任何响应。为证明它是真正运行的,让我们修改下服务端打印它所接收的数据。
我们已经知道无论何时数据是否接收,channelRead()方法都将被调用。让我们在DiscardServerHandler的channelRead()方法中加入一些代码。

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    ByteBuf in = (ByteBuf) msg;
    try {
        while (in.isReadable()) { // (1)
            System.out.print((char) in.readByte());
            System.out.flush();
        }
    } finally {
        ReferenceCountUtil.release(msg); // (2)
    }
}

  1. 无效的循环完全可以简化为如下:System.out.println(in.toString(io.netty.util.CharsetUtil.US_ASCII))
  2. 你也可以选择这种方法释放:in.release()
如果你再次运行telnet命令,你将会看到server端打印接收到的数据。
完整的discard server源码位置在发布版本的的io.netty.example.discard包中。

写一个ECHO 服务端

至此,我们对消费数据还没有任何响应。然而,服务器应该对请求做出回应。接下来让我们学习当任何地方收到的数据时返回时,如何通过ECHO协议的实现向一个客户端响应消息。
这和前面提到的的DISCARD SERVER 仅有的不同是,它对接收到的数据做出了响应而不是将接收的数据打印到控制台。因此,很有必要再次修改channelRead()方法。

@Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ctx.write(msg); // (1)
        ctx.flush(); // (2)
    }

  1. ChannelHandlerContext对象提供了多种可以使你触发各种I/O事件和操作的方法(操作)。这里我们调用write(Object),逐个的接收信息。请注意我们并没有向在DISCARD 例子中那样不释放接收的信息。这是因为Netty会当你输出这些信息再释放。
  2. ctx.write(Object)并没有使这些数据输出到线上。它在内部做了缓冲,接着通过ctx.flush()排出到线上。当然还可以简洁的通过ctx.writeAndFlush(msg)实现。
如果你运行telnet命令,你将会看到你接收到你通过命令发送的数据。
这段的源码在发布版本中的io.netty.example.echo包中。

写一个计时服务端

在这一部分实现的是TIME协议。它不同于前面的例子是一旦它发送信息,它会包含一个32位整数并且不接受任何请求,且会丢失连接。在这个例子中,你将学习如何构建和发送信息,并且完成关闭连接。
由于我们要准备忽略任何接收的数据但这次发送消息一旦建立连接,我们不再准备使用channelRead()方法。代替的,方法是我们应该重写channelActive()方法。下面是实现:
package io.netty.example.time;

public class TimeServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(final ChannelHandlerContext ctx) { // (1)
        final ByteBuf time = ctx.alloc().buffer(4); // (2)
        time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));

        final ChannelFuture f = ctx.writeAndFlush(time); // (3)
        f.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) {
                assert f == future;
                ctx.close();
            }
        }); // (4)
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

  1. 如代码中解释的一样,当一个连接被简历并且准备产生数据交互时,channelActive()方法被调用。
  2. 为发送一条新的消息,我们需要分配一段新的buffer用于容纳这个消息。我们要写一个32为的整数,因此我们需要一个至少容量为4byte的ByteBuf。可以通过ChannelHandlerContext.alloc()获取当前ByteBufAllocator和分配一段新的Buffer。
  3. 像以前一样,我们写构造信息。
    但是,等等,flip在哪里?在NIO中发送消息之前,我们不是应该调用java.nio.ByteBuffer.flip()吗?ByteBuf没有这样的方法,因为它有两个指示器;一个是读操作,另一个是写操作。读的索引不改变时,当你想ByteBuf写一些东西时写的索引会不断增长。读的位置和写的位置分别代表消息的开始和结束。
    相比之下,在不调用flip方法情况下,NIO buffer 没有提供一个清楚的方法支出消息内容的开始和结束。当你忘记反转缓冲区时你会遇到麻烦,因为没有或者错误的数据被发送。就像一个错误在NETTY中没有发生一样,因为我们对于不同的操作类型有不同的指示器。你将会发现自从你用了它之后你的生活变的越来越简单--人生没有反转出来。
    另一个需要指出的是,ChannelHandlerContext.write()(包括writeAndFlush())方法返回一个ChannelFuture。ChannelFuture代表一个I/O操作尚未发生。它意味着,任何请求操作可能已经执行,因为在NETTY中所有的操作是异步的。例如,接下来的代码可能关闭了连接,尽管是在消息发送之前。
    Channel ch = ...;
    ch.writeAndFlush(message);
    ch.close();
    因此,你需要在channelFuture结束时调用close()方法,他通过write()方法返回,并且当写操作已经做时,他会通知它的监听者。请注意这里,close()方法也有可能立刻关闭连接,并将返回channelFuture。
  4. 当一个写的请求完成时,接下来我们如何获取通知。这就和向返回的ChannelFuture中添加一个ChannelFutureListener一样简单。这里,当操作执行时,我们创建一个用于关闭Channel的新的匿名的ChannelFutureLiistener 。
当然,可以用预定义listener简化代码:
f.addListener(ChannelFutureListener.CLOSE);
为测试我们的服务端是否如愿工作,你可以用如下UNIX rdate 命令:
$ rdate -o <port> -p <host>
这里的<port> 是main方法中的端口,<host> 一般是localhost。

写一个计时客户端

不像DISCARD和ECHO server 一样,由于人类无法将32进制数据转化为时间和日期所以对于TIME协议,我们需要一个客户端。在本段,我们讨论如何确定服务端正确工作和如何用NETTY写客户端。
在NETTY中服务端和客户端的最大也是唯一的不同点是不同的对于用到的BootStrap和Channel的实现不同。请看下面的代码:
package io.netty.example.time;

public class TimeClient {
    public static void main(String[] args) throws Exception {
        String host = args[0];
        int port = Integer.parseInt(args[1]);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            Bootstrap b = new Bootstrap(); // (1)
            b.group(workerGroup); // (2)
            b.channel(NioSocketChannel.class); // (3)
            b.option(ChannelOption.SO_KEEPALIVE, true); // (4)
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new TimeClientHandler());
                }
            });

            // Start the client.
            ChannelFuture f = b.connect(host, port).sync(); // (5)

            // Wait until the connection is closed.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }
}

  1. Bootstrap与ServerBootStrap非常相似,除了它是非服务端channels,比如客户端或者是无连接传输模式的通道。
  2. 如果你仅仅指定一个EventLoopGroup,它将会作为boss group和worker group。然而,boss工作者不能用于客户端。
  3. 代替NioServerSocketChannel的NioSocketChannel用于创建客户算Channel。
  4. 请注意这里我们没有想ServerBootStrap一样用chileOption(),因为客户算SocketChannel没有父类。
  5. 在客户端我们应该调用connect()方法代替bind()方法。
如你所看到的,这和服务端代码并没有什么不同的。那么ChannelHandler是什么样的实现呢?它应该从服务端接收32位的整型并转换为人类可读的格式,打印转换的时间并关闭连接。
package io.netty.example.time;

import java.util.Date;

public class TimeClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf m = (ByteBuf) msg; // (1)
        try {
            long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
            System.out.println(new Date(currentTimeMillis));
            ctx.close();
        } finally {
            m.release();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
  1. 在TCP/IP协议中,Netty将读取的数据发送到‘ByteBuf‘。
以上看起来非常简单,并且与服务端例子并没有多少不同。然而,客户端handler有时会拒绝服务随之抛出IndexOutOfBoundsException。我们将在下一部分讨论为什么会发生如上问题。

处理基于流的传输

对于Socket Buffer的一个小的警告

基于流的传输,像TCP/IP,接收到的数据会存储到socket 接收缓存中。不幸的是,基于刘的缓冲缓存不是有序的包而是有序的byte。这就意味着,即使你将两条信息以两个独立的包发送出去,操作系统也不会将他们做为两条信息对待,而是一堆byte。因此,对于你是否读的精确或者是否同行远程写,都没有保证的。例如,让我们假设操作系统的TCP/IP 堆已经接收了三个包:

由于基于流协议的一般属性,在应用中读取的数据有较高的几率是碎片格式的。

因此,接收部分,不管它是服务器端或客户端,应该将整理的磁盘碎片进入到到一个或多个有意义的框架,以便可在这里我们以很容易地理解应用程序逻辑。在上面的示例中,接收到的数据应该如下形式:

第一种解决方案

让我们回到TIME 客户端的例子中。在这里有同样的问题。对于庞大的数据来说32 bit的数据微乎其微,并且也不太可能碎片化。然而,问题是还是会碎片化,而且碎片化随着传输的不断增长可能性更大。
package io.netty.example.time;

import java.util.Date;

public class TimeClientHandler extends ChannelInboundHandlerAdapter {
    private ByteBuf buf;

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        buf = ctx.alloc().buffer(4); // (1)
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) {
        buf.release(); // (1)
        buf = null;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf m = (ByteBuf) msg;
        buf.writeBytes(m); // (2)
        m.release();

        if (buf.readableBytes() >= 4) { // (3)
            long currentTimeMillis = (buf.readInt() - 2208988800L) * 1000L;
            System.out.println(new Date(currentTimeMillis));
            ctx.close();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
  1. ChannelHandler 有两个生存周期监听方法:handlerAdded和handlerRemoved。您可以执行任意初始化任务,只要它不会阻碍很长一段时间。
  2. 首先,所有接收到的数据必须累积到buf中。
  3. 接着,handler必须检查,如果buf中与足够的数据,就像例子中buf可以容纳4byte数据一样,就要执行实际的业务逻辑。另外,当有更多的数据到达,Netty会再次调用channelREAD()方法,知道积累够4byte的数据。

第二种解决方案

尽管第一种解决方案已经解决了TIME 客户端的问题,但是修改后的handler看起来并不是那么清晰。想象一下如果有一个更为复杂的协议,有多个字段组成而且是多种很长的字段。你的ChannelInboundHandler的实现将会很快变的不可控。
或许你已经注意到,你可以添加多个ChannelHandler到ChannelPipeline中,因此所以,你可以将一整个ChannelHandler 分解为多个模块,以降低你的应用的复杂性。例如,你可以将
TimeClientHandler分解为两个handler:
  • TimeDecoder 处理碎片的问题,并且最初的简略版本是TimeClientHandler。
幸运的是,Netty提供了可扩展的class,可以帮助你拿来即用。
package io.netty.example.time;

public class TimeDecoder extends ByteToMessageDecoder { // (1)
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { // (2)
        if (in.readableBytes() < 4) {
            return; // (3)
        }

        out.add(in.readBytes(4)); // (4)
    }
}
  1. ByteToMessageDecoder 继承了ChannelInboundHandler,它可以使你更容易的处理碎片问题。
  2. 无论何时接收新数据,ByteToMessageDecoder 都会调用decode()方法内部维护累计缓冲区。
  3. 当累积的缓冲区中没有足够数据,decode()方法可以决定不向out添加任何东西。当有更多的数据接收到时,ByteToMessageDecoder 会再次调用 decode()方法。
  4. 如果decode()添加一个对象到out中,这意味着转译器成功转换了一个信息。ByteToMessageDecoder 将会丢弃读了部分的累计缓冲区。请技术,你无需转译混杂的信息。ByteToMessageDecoder 会一直调用decode()方法直到不在向out中添加任何东西。
现在我们有另一个handler插入到ChannelPipeline中,我们应该TimeClient中的ChannelInitializer。
b.handler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler());
    }
});
如果你是个喜欢挑战的人,你可能会试着将ReplayingDecoder进一步简化。你可以查阅API指南,了解更多信息。
public class TimeDecoder extends ReplayingDecoder<Void> {
    @Override
    protected void decode(
            ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        out.add(in.readBytes(4));
    }
}
另外,Netty提供了开箱即用的解码器,使您可以很容易的实现大多数协议并且帮助你实现避免了整体不可维护的处理程序的问题。请参考如下包,获取更多详细例子:
  • io.netty.example.factorial 针对二进制协议
  • io.netty.example.telnet  针对基于文本行的协议

  用POJO 代替bytebuf

至此我们所讨论的协议信息都是用ByteBuf作为私有数据结构。在这部分,我们将在TIME协议的客户端和服务端的例子中用POJO代替bytebuf。
在你的ChannelHandler中用POJO的优势是非常明显的;从ByteBuf中提取出代码信息 从handler 会变的更加可控和可多次利用。在客户端和服务端的例子中,我们仅读了32 位整数,但是真并不是用bytebuf的直接原因。然而,你将会发现在你实现一个真实世界的协议这是非常需要分离的。
首先,让我们定义一个叫UnixTime的新类型。
package io.netty.example.time;

import java.util.Date;

public class UnixTime {

    private final int value;

    public UnixTime() {
        this((int) (System.currentTimeMillis() / 1000L + 2208988800L));
    }

    public UnixTime(int value) {
        this.value = value;
    }

    public int value() {
        return value;
    }

    @Override
    public String toString() {
        return new Date((value() - 2208988800L) * 1000L).toString();
    }
}

我们现在可以修改TimeDecoder生产一个UnixTime从而代替ByteBuf。
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
    if (in.readableBytes() < 4) {
        return;
    }

    out.add(new UnixTime(in.readInt()));
}
更新decoder的同时, TimeClientHandler不再用bytebuf.
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    UnixTime m = (UnixTime) msg;
    System.out.println(m);
    ctx.close();
}
更加的简单和优雅了,不是吗? 同样的方法也可以应用与服务端。让我们首先修改下TimeServerHandler。
@Override
public void channelActive(ChannelHandlerContext ctx) {
    ChannelFuture f = ctx.writeAndFlush(new UnixTime());
    f.addListener(ChannelFutureListener.CLOSE);
}
现在,仅剩下的是编码,编码继承自ChannelOutboundHandler,用于将unixtime 转换为bytebuf.这要比写一个解码器更加简单,因为这里没必要当将信息编码时,处理和整合包碎片。
package io.netty.example.time;

public class TimeEncoder extends ChannelOutboundHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        UnixTime m = (UnixTime) msg;
        ByteBuf encoded = ctx.alloc().buffer(4);
        encoded.writeInt(m.value());
        ctx.write(encoded, promise); // (1)
    }
}
  1. 在这一行有很重要的事情值得重视。
首先,当编译数据实际上是写入时,我们通过原有的ChannelPromise,以便Netty标记成功和失败。
其次,我们并没有调用ctx.flush()。void flush(ChannelHandlerContext ctx) 是一个单独的handler方法,其目的是重写flush()操作。
要进一步简化,你可以使用MessageToByteEncoder:
public class TimeEncoder extends MessageToByteEncoder<UnixTime> {
    @Override
    protected void encode(ChannelHandlerContext ctx, UnixTime msg, ByteBuf out) {
        out.writeInt(msg.value());
    }
}
最后剩余的任务是在服务端TimeServerHandler之前将TimeEncoder 插入到ChannelPipeline。

 关闭你的应用

关闭netty应用和你通过shutdownGracefully关闭所有的 EventLoopGroup一样简单。当EventLoopGroup 停止的时候,它将会提醒你返回一个Future并且所有的channel属于已经关闭的group。

总结

在本文中,我们已经通过一些例子快速学习了如何在netty中写一个完整的的可以工作的网络应用。
在上面的章节中,有太多的详细的信息。我们也鼓励您在io.netty.example包中再次回顾netty例子。
也请注意在社区中你可以发问并有一些主意可以帮助你,这样能保持Netty的持续改进并且文档会基于你的反馈。


郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。