Netty3 源码分析 - ChannelHandler

Netty3 源码分析 - ChannelHandler

每个通道关联一个Pipeline,在流水线中拦截处理各种事件的对象就是ChannelHandler,它处理ChannelEvent而后进行传递。
接口ChannelHandler没有提供任何方法,有两个子接口分别用来规范处理上行和下行的通道事件。


ChannelHandler是随ChannelHandlerContext对象提供的,handler通过这个context对象参与这个Pipeline的交互管理,通过它所属的context对象,一个handler可以传递上行或下行事件,动态改变流水线,存储信息到attachment。

状态管理:一个ChannelHandler通常需要存一些状态信息,最简单推荐的方式就是利用成员变量(放在特定的Handler类中),向下面这样:
public class DataServerHandler extends SimpleChannelHandler {

      private boolean loggedIn // 存储状态信息

      @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
        Channel ch = e.getChannel();
        Object o = e.getMessage();
        if (o instanceof LoginMessage) {
            authenticate((LoginMessage ) o);
            loggedIn = true ;
        } else (o instanceof GetDataMessage) {
            if (loggedIn) {
                ch.write( fetchSecret((GetDataMessage ) o));
            } else {
                fail();
            }
        }
    }
}
因为每个Handler实例是服务于一个连接的,所以对于每个新的Channel都应该创建一个新的ChannelHandler实例。
//Create a new handler instance per channel.
// See Bootstrap.setPipelineFactory(ChannelPipelineFactory).
public class DataServerPipelineFactory implements ChannelPipelineFactory {
      public ChannelPipeline getPipeline() {
           return Channels.pipeline( new DataServerHandler());//注意这里
     }
}

但是有时候并不需要很多冗余Handler,做着相同的工作,不需要为每个连接(或Channel)都创建一个这样的Handler,所以可以利用attachment存储状态信息,(或许可以认为这样的Handler是安全的,可重入的)。向下面这样:
@Sharable
public class DataServerHandler2 extends SimpleChannelHandler {

      @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
        Channel ch = e.getChannel();
        Object o = e.getMessage();
        if (o instanceof LoginMessage) {
            authenticate((LoginMessage ) o);
            ctx.setAttachment( true );
        } else (o instanceof GetDataMessage) {
            if (Boolean.TRUE.equals(ctx.getAttachment())) {
                ch.write( fetchSecret((GetDataMessage ) o));
            } else {
                fail();
            }
        }
    }
}

这样就可以为不同的Pipeline增加同一个Handler,如下:
public class DataServerPipelineFactory2 implements ChannelPipelineFactory {

      private static final ChannelHandler SHARED = new DataServerHandler2();

      public ChannelPipeline getPipeline() {
           return Channels.pipeline( new ChannelHandler[] { SHARED });
     }
}

使用ChannelLocal:如果有的状态变量需要从其他Handler或者Handler之外来访问,就需要用
ChannelLocal这个Iterable,相当于从属于这个Channel的全局变量,可以联想TreadLocal
public final class DataServerState {

      public static final ChannelLocal<Boolean> loggedIn = new ChannelLocal<Boolean>() {
           protected Boolean initialValue(Channel channel) {
               return false ;
          }
     };
}
Handler此时这样写:
@ Sharable
public class DataServerHandler extends SimpleChannelHandler {

      @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
        Channel ch = e.getChannel();
        Object o = e.getMessage();
        if (o instanceof LoginMessage) {
            authenticate((LoginMessage ) o);
            DataServerState.loggedIn.set(ch, true );// 更新这个ChannelLocal
        } else (o instanceof GetDataMessage) {
            if (DataServerState.loggedIn.get(ch)) {
                ctx.getChannel().write(fetchSecret(( GetDataMessage) o));
            } else {
                fail();
            }
        }
    }
}
根据每个Channel的状态进行逻辑处理,使用场景:
          // Print the remote addresses of the authenticated clients:
           ChannelGroup allClientChannels = ...;
           for (Channel ch: allClientChannels) {
               if (DataServerState.loggedIn.get(ch)) {
                   System.out.println(ch.getRemoteAddress());
               }
           }

@Sharable注解:如果一个ChannelHandler前面有该注解,意味着可以将该对象的一个实例分配给多个Pipeline,而不会发生竟态条件。否则的话就要为每个Pipeline单独创建一个实例,因为存在非共享的状态如成员变量。










郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。