netty权威指南--------第四章TCP粘包/拆包问题

第三章中的示例用于功能测试一般没有问题,但当压力上来或者发送大报文时,就会存在粘包/拆包问题。

这时就需要使用LineBasedFrameDecoder+StringDecoder


client端请求改为连续的100次


package com.xiaobing.netty.fourth;


import java.net.SocketAddress;


import org.omg.CORBA.Request;


import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;


public class TimeClientHandler extends ChannelHandlerAdapter {




//private static final Logger  LOGGER= new Logger.getLogger(TimeClientHandler.class);

private int counter;
private byte[] req;

public TimeClientHandler() {
req = ("QUERY TIME ORDER" +System.getProperty("line.separator") ).getBytes();


}


/*
* 客户端和服务端TCP链路建立成功后被调用
* @see io.netty.channel.ChannelHandlerAdapter#channelActive(io.netty.channel.ChannelHandlerContext)
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//将请求消息发送至服务端
ByteBuf messageBuf = null;

//以下情况会发生tcp粘包
for(int i=0; i<100; i++){
messageBuf = Unpooled.buffer(req.length);
messageBuf.writeBytes(req);
ctx.writeAndFlush(messageBuf);

}


}


/*
* 服务端返回应答消息时被调用
* @see io.netty.channel.ChannelHandlerAdapter#channelRead(io.netty.channel.ChannelHandlerContext, java.lang.Object)
*/


@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {


// ByteBuf buf = (ByteBuf) msg;
// byte[] req = new byte[buf.readableBytes()];
// buf.readBytes(req);
// String bodyString = new String(req,"UTF-8");
String bodyString = (String)msg;
System.out.println("Now is :" + bodyString + "; the count is :" + ++counter);
}




@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {


ctx.close();
}


}


package com.xiaobing.netty.fourth;


import java.security.acl.Group;


import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;


public class TimeClient {

public void connect(int port , String host) throws Exception{
EventLoopGroup group = new NioEventLoopGroup();
try{
//客户端启动辅助类
Bootstrap b = new Bootstrap();

b.group(group).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
//在初始化时将channelHandler 设置到channelpipeline中
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new TimeClientHandler());

}

});
//发起异步连接
ChannelFuture f = b.connect(host,port).sync();
//等待客户端链路关闭
f.channel().closeFuture().sync();
}finally{

group.shutdownGracefully();

}

}


public static void main(String[] args) throws Exception {


int port = 8080;
new TimeClient().connect(port, "127.0.0.1");
}


}



server端

package com.xiaobing.netty.fourth;


import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;


public class TimeServer {

public void bind(int port) throws Exception{
System.out.println("-------TimeServer start----------");
//服务端接受客户端的连接线程
EventLoopGroup bossGroup = new NioEventLoopGroup();
//网络读写线程
EventLoopGroup workerGroup = new NioEventLoopGroup();
//服务端辅助启动类
ServerBootstrap b = new ServerBootstrap();
try {

b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChildChannelHandler());
//阻塞等待端口绑定完成
System.out.println("-------TimeServer wait----------");
ChannelFuture f = b.bind(port).sync();

//等待服务端链路关闭
f.channel().closeFuture().sync();
}finally{
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}


}

//网络IO处理类
private class ChildChannelHandler extends ChannelInitializer<SocketChannel>{


@Override
protected void initChannel(SocketChannel arg0) throws Exception {
//发生tcp粘包的情况
// arg0.pipeline().addLast(new TimeServerHandler());
System.out.println("-------ChildChannelHandler----------");
arg0.pipeline().addLast(new LineBasedFrameDecoder(1024));
arg0.pipeline().addLast(new StringDecoder());
arg0.pipeline().addLast(new TimeServerHandler());

}

}


public static void main(String[] args) throws Exception {

int port = 8080;
new TimeServer().bind(port);
}


}



package com.xiaobing.netty.fourth;




import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
//对网络事件进行读写
public class TimeServerHandler extends ChannelHandlerAdapter {


private int counter;


@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception{
// ByteBuf buf = (ByteBuf) msg;
// //获取缓冲区可读字节数
// byte[] req =  new byte[buf.readableBytes()];
//
// buf.readBytes(req);
// String body = new String(req,"UTF-8").substring(0,req.length - System.getProperty("line.separator").length());

String  body = (String)msg;

System.out.println("The time server receive order:" + body + "; the count is :" + ++counter);

String currentTimeString  = "QUERY TIME ORDER".equalsIgnoreCase(body)?new java.util.Date(System.currentTimeMillis()).toString() :"BAD ORDER";

currentTimeString = currentTimeString + System.getProperty("line.separator");
//应答消息
ByteBuf resp = Unpooled.copiedBuffer(currentTimeString.getBytes());

ctx.writeAndFlush(resp);
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx){
//将消息发送队列的消息写入socketchannel中
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause){
//发生异常时,关闭ctx
ctx.close();
}


}




郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。