This weekend, the article continues. When using TCP protocol for communication, the most frequently heard problems are packet sticking and unpacking. This article will take a look at how to solve the problems of sticking and unpacking.
A TCP packet sticking / unpacking problem and its solution
In solving TCP packet sticking and unpacking, let's first look at an idea. Let's take a look at the Demo of reading Int data and experience this idea.
1.1 ReplayingDecoder
1. customize the decoder to read an Int from ByteBuf. (focus, be sure to understand this code)
public class IntegerHeaderFrameDecoder extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception { if (buf.readableBytes() < 4) { return; } buf.markReaderIndex();//Mark the current read pointer. int length = buf.readInt();//Read an int from ByteBuf if (buf.readableBytes() < length) { buf.resetReaderIndex();//Restore to the read pointer just marked return; } out.add(buf.readBytes(length)); } }
2. use ReplayingDecoder for optimization ()
public class IntegerHeaderFrameDecoder extends ReplayingDecoder<Void> { protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception { out.add(buf.readBytes(buf.readInt())); } }
3. instructions for replayingdecoder (key points, to be understood)
A enables a special ByteBuf, called ReplayingDecoderByteBuf, and extends ByteBuf
b rewrites the readXxx() and other methods of ByteBuf. It will first check the readability of bytes. Once it detects that the request is not satisfied, it will directly throw replace (replace inherits ERROR)
c ReplayingDecoder overrides the callDecode() method of ByteToMessageDecoder to capture Signal and reset the readerIndex of ByteBuf in the catch block.
d. continue to wait for data until data is available, so as to ensure that the data to be read is read.
The generic S in the definition of class e is a state machine enumeration class that records the decoding state. It can be found in state(S s), checkpoint(S s) and other methods. You can also use java Lang.void.
Summary:
ReplayingDecoder is a subclass of ByteToMessageDecoder, which extends ByteBuf. From the readXxx() and other methods written, the data in the current ByteBuf is less than the proxy data. The data can be retrieved only after the data is satisfied. You can omit the manual implementation of this code.
4. attention
1 buffer Partial operation of( readBytes(ByteBuffer dst),retain(),release()Wait for an exception to be thrown directly) 2 In some cases, performance will be affected (such as decoding the same segment of messages multiple times)
Inherit ReplayingDecoder, error examples and modifications
//This is an example of an error: //The message contains two integer s, and the "decode" method in the code will be called twice. At this time, the queue size is not equal to 2, and this code does not achieve the expected result. public class MyDecoder extends ReplayingDecoder<Void> { private final Queue<Integer> values = new LinkedList<Integer>(); @Override public void decode(ByteBuf buf, List<Object> out) throws Exception { // A message contains 2 integers. values.offer(buf.readInt()); values.offer(buf.readInt()); assert values.size() == 2; out.add(values.poll() + values.poll()); } }
//The right approach: public class MyDecoder extends ReplayingDecoder<Void> { private final Queue<Integer> values = new LinkedList<Integer>(); @Override public void decode(ByteBuf buf, List<Object> out) throws Exception { // Revert the state of the variable that might have been changed // since the last partial decode. values.clear(); // A message contains 2 integers. values.offer(buf.readInt()); values.offer(buf.readInt()); // Now we know this assertion will never fail. assert values.size() == 2; out.add(values.poll() + values.poll()); } }
Implementation of ByteToIntegerDecoder2
public class ByteToIntegerDecoder2 extends ReplayingDecoder<Void> { /** * @param ctx Up and down text * @param in ByteBuf message data input * @param out Containers exported after conversion * @throws Exception */ @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { out.add(in.readInt()); //Read int type data, put into to output, and complete data type conversion } }
1.2 the problem of unpacking and sticking packets reappears (the client sends ten pieces of data to the server)
1. client startup class
public class NettyClient { public static void main(String[] args) throws Exception{ EventLoopGroup worker = new NioEventLoopGroup(); try { //Server startup class Bootstrap bootstrap = new Bootstrap(); bootstrap.group(worker); bootstrap.channel(NioSocketChannel.class); bootstrap.handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new ClientHandler()); } }); ChannelFuture future = bootstrap.connect("127.0.0.1", 5566).sync(); future.channel().closeFuture().sync(); } finally { worker.shutdownGracefully(); } } }
2. client ClientHandler
public class ClientHandler extends SimpleChannelInboundHandler<ByteBuf> { private int count; @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { System.out.println("Message received from the server:" + msg.toString(CharsetUtil.UTF_8)); System.out.println("Number of messages received from the server:" + (++count)); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { for (int i = 0; i < 10; i++) { ctx.writeAndFlush(Unpooled.copiedBuffer("from client a message!", CharsetUtil.UTF_8)); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
3. server NettyServer
public class NettyServer { public static void main(String[] args) throws Exception { //The main thread does not process any business logic, but only receives connection requests from customers EventLoopGroup boss = new NioEventLoopGroup(1); //Work as a thread, the default number of threads is: cpu*2 EventLoopGroup worker = new NioEventLoopGroup(); try { //Server startup class ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(boss, worker); //Configure server channel serverBootstrap.channel(NioServerSocketChannel.class); serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline() .addLast(new ServerHandler()); } }); //Processor of worker thread ChannelFuture future = serverBootstrap.bind(5566).sync(); System.out.println("Server startup complete....."); //Wait for the server listening end mouth to close future.channel().closeFuture().sync(); } finally { //Elegant close boss.shutdownGracefully(); worker.shutdownGracefully(); } } }
4. ServerHandler
public class ServerHandler extends SimpleChannelInboundHandler<ByteBuf> { private int count; @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { System.out.println("The server received the message:" + msg.toString(CharsetUtil.UTF_8)); System.out.println("Number of messages received by the server:" + (++count)); ctx.writeAndFlush(Unpooled.copiedBuffer("ok", CharsetUtil.UTF_8)); } }
1.3 what is TCP packet sticking and unpacking
TCP is delivered by streams. Streams are data without boundaries. The server accepts the client data, and does not know whether it is one or more. When the server reads the data, the packet sticking problem occurs.
Therefore, when the server and client transfer data, they should formulate unpacking rules. The client sticks packets according to the rule, and the server unpacks according to the rule. If the rule is violated arbitrarily, the server cannot get the expected data.
1. solutions (three types)
1. Add a header to the sent packet, and store the data in the header. The server can read the data according to this header, so that it knows where the boundary is. 2. Send data at a fixed long degree. If the data is exceeded, it will be sent several times. If it is not _ filled with 0, the receiving end will receive it at a fixed long degree. 3. Set boundaries between data packets, such as adding special symbols, so that the receiving end can separate different data packets through this boundary.
1.4 actual combat: solve the problem of TCP packet sticking / unpacking
1. custom agreement
public class MyProtocol { private Integer length; //Data header: long degrees private byte[] body; //Data body public Integer getLength() { return length; } public void setLength(Integer length) { this.length = length; } public byte[] getBody() { return body; } public void setBody(byte[] body) { this.body = body; } }
2. encoder
public class MyEncoder extends MessageToByteEncoder<MyProtocol> { @Override protected void encode(ChannelHandlerContext ctx, MyProtocol msg, ByteBuf out) throws Exception { out.writeInt(msg.getLength()); out.writeBytes(msg.getBody()); } }
3. decoder
public class MyDecoder extends ReplayingDecoder<Void> { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { int length = in.readInt(); //Get long degrees byte[] data = new byte[length]; //Define byte array according to long degree in.readBytes(data); //Read data MyProtocol myProtocol = new MyProtocol(); myProtocol.setLength(length); myProtocol.setBody(data); out.add(myProtocol); } }
4. client ClientHandler
public class ClientHandler extends SimpleChannelInboundHandler<MyProtocol> { private int count; @Override protected void channelRead0(ChannelHandlerContext ctx, MyProtocol msg) throws Exception { System.out.println("Message received from the server:" + new String(msg.getBody(), CharsetUtil.UTF_8)); System.out.println("Number of messages received from the server:" + (++count)); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { for (int i = 0; i < 10; i++) { byte[] data = "from client a message!".getBytes(CharsetUtil.UTF_8); MyProtocol myProtocol = new MyProtocol(); myProtocol.setLength(data.length); myProtocol.setBody(data); ctx.writeAndFlush(myProtocol); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)throws Exception { cause.printStackTrace(); ctx.close(); } }
5. NettyClient
public class NettyClient { public static void main(String[] args) throws Exception{ EventLoopGroup worker = new NioEventLoopGroup(); try { //Server startup class Bootstrap bootstrap = new Bootstrap(); bootstrap.group(worker); bootstrap.channel(NioSocketChannel.class); bootstrap.handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new MyEncoder()); ch.pipeline().addLast(new MyDecoder()); ch.pipeline().addLast(new ClientHandler()); } }); ChannelFuture future = bootstrap.connect("127.0.0.1", 5566).sync(); future.channel().closeFuture().sync(); } finally { worker.shutdownGracefully(); } } }
6. ServerHandler
public class ServerHandler extends SimpleChannelInboundHandler<MyProtocol> { private int count; @Override protected void channelRead0(ChannelHandlerContext ctx, MyProtocol msg) throws Exception { System.out.println("The server received the message:" + new String(msg.getBody(), CharsetUtil.UTF_8)); System.out.println("Number of messages received by the server:" + (++count)); byte[] data = "ok".getBytes(CharsetUtil.UTF_8); MyProtocol myProtocol = new MyProtocol(); myProtocol.setLength(data.length); myProtocol.setBody(data); ctx.writeAndFlush(myProtocol); } }
7. NettyServer
public class NettyServer { public static void main(String[] args) throws Exception { //The main thread does not process any business logic, but only receives connection requests from customers EventLoopGroup boss = new NioEventLoopGroup(1); //Work as a thread, the default number of threads is: cpu*2 EventLoopGroup worker = new NioEventLoopGroup(); try { //Server startup class ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(boss, worker); //Configure server channel serverBootstrap.channel(NioServerSocketChannel.class); serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline() .addLast(new MyDecoder()) .addLast(new MyEncoder()) .addLast(new ServerHandler()); } }); //Processor of worker thread ChannelFuture future = serverBootstrap.bind(5566).sync(); System.out.println("Server startup complete....."); //Wait for the server listening end mouth to close future.channel().closeFuture().sync(); } finally { //Elegant close boss.shutdownGracefully(); worker.shutdownGracefully(); } } }
8. test
II. Analysis of Netty core source code
2.1 analysis of server startup process
1. create a server Channel
1 ServerBootstrap Object's bind()The law is also the law 2 AbstractBootstrap Medium initAndRegister()Enter line create Channel establish Channel Reason for ReflectiveChannelFactory In reflection classes newChannel()Fang method completion. 3 NioServerSocketChannel In the construction Fang method of jdk nio Underlying SelectorProvider open ServerSocketChannel. 4 stay AbstractNioChannel In the construction Fang method of, set channel For not blocking: ch.configureBlocking(false); 5 Passed AbstractChannel The construction Fang method of creates id,unsafe,pipeline Content. 6 adopt NioServerSocketChannelConfig obtain tcp Some parameters of the bottom layer
2. initialize the server Channel
1 AbstractBootstrap Medium initAndRegister()Go to initialization channel,code: init(channel); 2 stay ServerBootstrap Medium init()Fang method setting channelOptions as well as Attributes. 3 Next, save the user-defined parameters and attributes to local variables currentChildOptions,currentChildAttrs,with For later use 4 If set serverBootstrap.handler()If yes, it will add enter to pipeline Medium. 5 Add connector ServerBootstrapAcceptor,After a new connection is added, the defined childHandler Add enter to connected pipeline Medium:
ch.eventLoop().execute(new Runnable() { @Override public void run() { pipeline.addLast( new ServerBootstrapAcceptor(ch, currentChildGroup,currentChildHandler, currentChildOptions, currentChildAttrs)); } });
@Override @SuppressWarnings("unchecked") public void channelRead(ChannelHandlerContext ctx, Object msg) { //Execute only when the client is connected final Channel child = (Channel) msg; //Add the defined childHandler to the connected pipeline child.pipeline().addLast(childHandler); setChannelOptions(child, childOptions, logger); setAttributes(child, childAttrs); try { childGroup.register(child).addListener(new ChannelFutureListener(){ @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { forceClose(child, future.cause()); } } }); } catch (Throwable t) { forceClose(child, t); } }
3. register the selector
//Registration 1 initAndRegister()Fang in law ChannelFuture regFuture = config().group().register(channel); 2 stay io.netty.channel.AbstractChannel.AbstractUnsafe#Complete the actual registration in register() 2.1 AbstractChannel.this.eventLoop = eventLoop; Enter eventLoop Assignment of, subsequent IO event The operation will be carried out by the eventLoop Hold. 2.2 Adjustment register0(promise)Medium doRegister()Enter the actual registration 3 io.netty.channel.nio.AbstractNioChannel#doRegister has implemented the method
//Register multiplexers through the jdk bottom layer //javaChannel() -- the channel created in the previous step //eventLoop().unwrappedSelector() -- get selector //The event of interest registered is 0, indicating that there is no event of interest. After that, the event will be re registered //Register this object to the selector in the form of attachment, and then get the contents of the current object selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
4. binding port
1 entrance in io.netty.bootstrap.AbstractBootstrap#doBind0(), start a thread to perform line binding end port operations 2 Adjustment io.netty.channel.AbstractChannelHandlerContext#bind(java.net.SocketAddress, io.netty.channel.ChannelPromise)Start thread execution again 3 Final adjustment io.netty.channel.socket.nio.NioServerSocketChannel#doBind() method in line binding operation
//Binding through the channel at the bottom of jdk @SuppressJava6Requirement(reason = "Usage guarded by java version check") @Override protected void doBind(SocketAddress localAddress) throws Exception { if (PlatformDependent.javaVersion() >= 7) { javaChannel().bind(localAddress, config.getBacklog()); } else { javaChannel().socket().bind(localAddress, config.getBacklog()); } }
When to update the primary and secondary events of the selector? Finally in io netty.channel.nio.Abstractniochannel\dobeginread() method
protected void doBeginRead() throws Exception { // Channel.read() or ChannelHandlerContext.read() was called final SelectionKey selectionKey = this.selectionKey; if (!selectionKey.isValid()) { return; } readPending = true; final int interestOps = selectionKey.interestOps(); if ((interestOps & readInterestOp) == 0) { selectionKey.interestOps(interestOps | readInterestOp); //set up Events of interest are OP_ACCEPT } } //Assignment is made in the construction method of NioServerSocketChannel public NioServerSocketChannel(ServerSocketChannel channel) { super(null, channel, SelectionKey.OP_ACCEPT); config = new NioServerSocketChannelConfig(this, javaChannel().socket()); }
2.2 connection request process source code analysis
1. new connection access
Entrance at io.netty.channel.nio.NioEventLoop#processSelectedKey(java.nio.channels.SelectionKey, io.netty.channel.nio.AbstractNioChannel)in Inlet NioMessageUnsafe of read()Fang method Adjustment io.netty.channel.socket.nio.NioServerSocketChannel#doReadMessages() method, creating jdk Underlying channel,Packaged as NioSocketChannel Add to List In container
@Override protected int doReadMessages(List<Object> buf) throws Exception { SocketChannel ch = SocketUtils.accept(javaChannel()); try { if (ch != null) { buf.add(new NioSocketChannel(this, ch)); return 1; } } catch (Throwable t) { logger.warn("Failed to create a new channel from an accepted socket.", t); try { ch.close(); } catch (Throwable t2) { logger.warn("Failed to close a socket.", t2); } } return 0; }
establish NioSocketChannel object new NioSocketChannel(this, ch),adopt new The Fang incremental line creation of Adjustment super Construction method of Transmission SelectionKey.OP_READ Event ID establish id,unsafe,pipeline object Set not blocking ch.configureBlocking(false); establish NioSocketChannelConfig object
2. register read events
stay io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe In: for (int i = 0; i < size; i ++) { readPending = false; pipeline.fireChannelRead(readBuf.get(i)); //Propagate read events }
stay io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead(java.lang.Object) method In law private void invokeChannelRead(Object msg) { if (invokeHandler()) { try { //Execute channelRead. It should be noted that the first execution is HeadHandler, and the second execution is ServerBootstrapAcceptor //Enter the same registered selector as the new connection through ServerBootstrapAcceptor Logical registration and event binding ((ChannelInboundHandler) handler()).channelRead(this, msg); } catch (Throwable t) { invokeExceptionCaught(t); } } else { fireChannelRead(msg); } }
III. using Netty optimization points
Some simple suggestions when using Netty. It's worth seeing.
3.1 zero copy
1 Bytebuf is a DirectBuffer type that enables pooling. There is no need to copy into the byte buffer. If the heap memory is used, the JVM will copy it to the heap first, and then write to the Socket. This will increase the number of copies.
2 CompositeByteBuf encapsulates multiple bytebufs into "bytebufs". Process copying is not required when adding bytebufs.
3. The transferTo method of the DefaultFileRegion of netty's "file transfer" class sends "files to the" standard "channel, which eliminates the need for" circular copying "and improves performance.
08
3.2 task scheduling of EventLoop
channel.eventLoop().execute(new Runnable() { @Override public void run() { channel.writeAndFlush(data) } });
Instead of using channel writeAndFlush(data); The task scheduling of the EventLoop is directly put into the execution queue of the EventLoop corresponding to the channel, which will cause thread switching. Note: at the bottom of writeandflush, if it is not executed through EventLoop, a new thread will be started.
3.3 reduce the adjustment of ChannelPipline
public class YourHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) { //msg. All handler s must pass through the entire channelpipeline. ctx.channel().writeAndFlush(msg); //From the current handler one to the tail of pipline, use is shorter. ctx.writeAndFlush(msg); } }
3.4 reduce the creation of ChannelHandler (basically not configured)
If the channelhandler is in a stateless (that is, it does not need to save any state parameters), then use the Use Sharable annotation and create only one instances during bootstrap to reduce GC. Otherwise, the handler object will be new for each connection.
@ChannelHandler.Shareable public class StatelessHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) {} } public class MyInitializer extends ChannelInitializer<Channel> { private static final ChannelHandler INSTANCE = new StatelessHandler(); @Override public void initChannel(Channel ch) { ch.pipeline().addLast(INSTANCE); } }
be careful:
Codecs such as ByteToMessageDecoder are stateful and cannot be annotated with "Sharable".
3.5 setting of configuration parameters
Server side bossGroup Just set it to 1 because ServerSocketChannel During the initialization phase, only Register to a eventLoop Up, this eventLoop Only one threads are running, so it is not necessary to set to Multithreading. But IO Thread, in order to make full use of use CPU,At the same time, consider reducing the overhead of online and offline text switching. Generally workGroup Set to CPU Twice the number of cores, which is Netty The default value provided. In the scenario with high requirements for response time, make use.childOption(ChannelOption.TCP_NODELAY, true) and.option(ChannelOption.TCP_NODELAY, true)To ban nagle Algorithm, send without waiting.
The knowledge points related to Netty have been shared. In the future, I still share Netty related content. Mainly problems encountered in work and new insights.
--------------------------------------------------------------END-----------------------------------------------------------------------------
Introduction to official account
In my spare time, I have studied the dark horse architecture course for some time. Continue to share some dry goods, such as Netty, MySQL, cache, middleware, midrange, automated deployment, design patterns, JVM, Spring source code, and MyBatis source code. If you want to know more about the official account, please reply to [story] on the official account.
Official account benefits
Pay attention to the official account, reply to the [Architecture Interview] and obtain the interview questions of the architecture course. If you have any suggestions or want to say, please send a message directly to the official account. The little sister in charge of operation will deal with it immediately after seeing it. Please wait patiently! Crab crab!