"Netty series": Solving TCP packet sticking and unpacking from start to finish

The series continues this weekend. When communicating over TCP, the problems you hear about most often are packet sticking and unpacking (several messages arriving merged together, or one message arriving split across reads). This article looks at how to solve both.

I. The TCP packet sticking / unpacking problem and its solution

Before solving TCP packet sticking and unpacking, let's first look at one idea, using a demo that reads Int data.

1.1 ReplayingDecoder

1. Write a custom decoder that reads an Int length from the ByteBuf and then the body (important: be sure to understand this code)

public class IntegerHeaderFrameDecoder extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception {
      if (buf.readableBytes() < 4) {
        return;
      }
      buf.markReaderIndex();//Mark the current read pointer.
      int length = buf.readInt();//Read an int from ByteBuf
      if (buf.readableBytes() < length) {
        buf.resetReaderIndex();//Restore to the read pointer just marked
        return; 
      }
      out.add(buf.readBytes(length));
    }
}
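The same check-then-read idea can be tried without Netty. The following is only a minimal sketch: it uses java.nio.ByteBuffer in place of ByteBuf, the names LengthFrameSketch and decodeAll are hypothetical, and it assumes the length field is never negative.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

final class LengthFrameSketch {
    /** Reads every complete [int length][body] frame from buf (which must be in read mode). */
    static List<String> decodeAll(ByteBuffer buf) {
        List<String> out = new ArrayList<>();
        while (buf.remaining() >= 4) {
            buf.mark();                    // plays the role of markReaderIndex()
            int length = buf.getInt();     // assumed non-negative in this sketch
            if (buf.remaining() < length) {
                buf.reset();               // plays the role of resetReaderIndex()
                break;                     // body incomplete: wait for more bytes
            }
            byte[] body = new byte[length];
            buf.get(body);
            out.add(new String(body, StandardCharsets.UTF_8));
        }
        return out;
    }

    public static void main(String[] args) {
        byte[] body = "hello".getBytes(StandardCharsets.UTF_8);
        ByteBuffer wire = ByteBuffer.allocate(64);
        wire.putInt(body.length).put(body, 0, 2); // header plus the first 2 body bytes
        wire.flip();
        System.out.println(decodeAll(wire));       // no complete frame yet
        wire.compact();                            // keep unread bytes, back to write mode
        wire.put(body, 2, 3);                      // the rest of the body arrives
        wire.flip();
        System.out.println(decodeAll(wire));       // the frame is now complete
    }
}
```

The first call returns nothing and leaves the read position untouched, which is exactly what the mark/reset pair in the Netty decoder achieves.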

2. Simplify it with ReplayingDecoder

public class IntegerHeaderFrameDecoder extends ReplayingDecoder<Void> {
    protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception {
       out.add(buf.readBytes(buf.readInt()));
    }
}

 

3. How ReplayingDecoder works (key points, be sure to understand)

a. It uses a special ByteBuf called ReplayingDecoderByteBuf, which extends ByteBuf.

b. ReplayingDecoderByteBuf overrides readXxx() and the other ByteBuf methods. Each one first checks whether enough bytes are readable; if the request cannot be satisfied, it throws the REPLAY Signal (Signal extends Error).

c. ReplayingDecoder overrides the callDecode() method of ByteToMessageDecoder to catch the Signal and reset the readerIndex of the ByteBuf in the catch block.

d. It then waits for more data and calls decode() again, which guarantees that the requested data is eventually read in full.

e. The generic type S in the class definition is a state-machine enum that records the decoding state; it appears in state(S s), checkpoint(S s) and other methods. If you don't need state management, use java.lang.Void.

 

Summary:

ReplayingDecoder is a subclass of ByteToMessageDecoder that wraps the incoming ByteBuf in its own ByteBuf subclass. The overridden readXxx() methods detect when the buffer holds less data than was requested, and decoding resumes only once enough data has arrived. This lets you omit the manual length-checking code.
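To make the replay mechanism more concrete, here is a rough plain-JDK sketch of it. It is not Netty's implementation: ReplaySketch, Replay, CheckedBuf and callDecode are hypothetical stand-ins for Signal, ReplayingDecoderByteBuf and the real callDecode(), and java.nio.ByteBuffer replaces ByteBuf.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

final class ReplaySketch {
    /** Stand-in for Netty's Signal: a preallocated Error without a stack trace. */
    static final class Replay extends Error {
        Replay() { super("replay", null, false, false); }
    }
    static final Replay REPLAY = new Replay();

    /** Wrapper whose read methods check availability before reading. */
    static final class CheckedBuf {
        private final ByteBuffer buf;
        CheckedBuf(ByteBuffer buf) { this.buf = buf; }
        int readInt() {
            if (buf.remaining() < 4) throw REPLAY;
            return buf.getInt();
        }
        byte[] readBytes(int n) {          // n is assumed non-negative
            if (buf.remaining() < n) throw REPLAY;
            byte[] dst = new byte[n];
            buf.get(dst);
            return dst;
        }
    }

    /** User code without any length checks, like the ReplayingDecoder version. */
    static void decode(CheckedBuf in, List<Object> out) {
        out.add(in.readBytes(in.readInt()));
    }

    /** Runs decode() repeatedly and rewinds to the checkpoint when REPLAY is thrown. */
    static void callDecode(ByteBuffer buf, List<Object> out) {
        CheckedBuf in = new CheckedBuf(buf);
        while (buf.hasRemaining()) {
            int checkpoint = buf.position();
            try {
                decode(in, out);
            } catch (Replay replay) {
                buf.position(checkpoint);  // undo the partial read
                return;                    // wait for more bytes
            }
        }
    }

    public static void main(String[] args) {
        List<Object> out = new ArrayList<>();
        ByteBuffer wire = ByteBuffer.allocate(32);
        wire.putInt(3).put((byte) 'a').put((byte) 'b');
        wire.flip();
        callDecode(wire, out);
        System.out.println("after partial data: " + out.size() + " message(s)");
        wire.compact();
        wire.put((byte) 'c');
        wire.flip();
        callDecode(wire, out);
        System.out.println(new String((byte[]) out.get(0), StandardCharsets.UTF_8));
    }
}
```

Because decode() restarts from the checkpoint, everything before the failed read runs again. That is why state kept in fields must be reset, as the caveats below explain.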

 

4. Caveats

1 Some buffer operations are not supported (readBytes(ByteBuffer dst), retain(), release() and others throw an exception directly).
2 Performance can suffer in some cases (for example, when the same part of a message is decoded several times).

 

Extending ReplayingDecoder: an incorrect example and its fix

//This is an incorrect example:
//A message contains two integers. If only the first integer has arrived, decode() is replayed and called a second time,
//so the first integer is offered to the queue twice. The queue size is then no longer 2 and the code does not produce the expected result.
public class MyDecoder extends ReplayingDecoder<Void> {
    private final Queue<Integer> values = new LinkedList<Integer>();
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception {
        // A message contains 2 integers.
        values.offer(buf.readInt());
        values.offer(buf.readInt());
        assert values.size() == 2;
        out.add(values.poll() + values.poll());
    }
}
//The correct approach:
public class MyDecoder extends ReplayingDecoder<Void> {
    private final Queue<Integer> values = new LinkedList<Integer>();
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception {
        // Revert the state of the variable that might have been changed
        // since the last partial decode.
        values.clear();
        // A message contains 2 integers.
        values.offer(buf.readInt());
        values.offer(buf.readInt());
        // Now we know this assertion will never fail.
        assert values.size() == 2;
        out.add(values.poll() + values.poll());
    }
}

 

Implementation of ByteToIntegerDecoder2

public class ByteToIntegerDecoder2 extends ReplayingDecoder<Void> {
    /**
    * @param ctx the channel handler context
    * @param in  ByteBuf holding the incoming message data
    * @param out container that receives the decoded objects
    * @throws Exception if decoding fails
    */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        out.add(in.readInt()); //Read an int and add it to the output, which completes the type conversion
    }
}

 

1.2 Reproducing the packet sticking / unpacking problem (the client sends ten messages to the server)

1. Client startup class NettyClient

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class NettyClient {
    public static void main(String[] args) throws Exception {
        EventLoopGroup worker = new NioEventLoopGroup();
        try {
            //Client bootstrap class
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(worker);
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new ClientHandler());
                }
            });
            ChannelFuture future = bootstrap.connect("127.0.0.1", 5566).sync();
            future.channel().closeFuture().sync();
        } finally {
            worker.shutdownGracefully();
        }
    }
}

 

2. client ClientHandler

public class ClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
    private int count;
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
      System.out.println("Message received from the server:" +
          msg.toString(CharsetUtil.UTF_8));
      System.out.println("Number of messages received from the server:" + (++count));
    }
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
      for (int i = 0; i < 10; i++) {
        ctx.writeAndFlush(Unpooled.copiedBuffer("from client a message!",
            CharsetUtil.UTF_8));
      }
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
      cause.printStackTrace();
      ctx.close();
    }
}

 

3. Server NettyServer

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class NettyServer {
    public static void main(String[] args) throws Exception {
      //The boss thread does not process any business logic; it only accepts connection requests from clients
      EventLoopGroup boss = new NioEventLoopGroup(1);
      //Worker threads; the default number of threads is CPU cores * 2
      EventLoopGroup worker = new NioEventLoopGroup();
      try {
        //Server bootstrap class
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(boss, worker);
        //Configure the server channel
        serverBootstrap.channel(NioServerSocketChannel.class);
        serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
          @Override
          protected void initChannel(SocketChannel ch) throws Exception {
            ch.pipeline()
              .addLast(new ServerHandler());
          }
        }); //Handlers for the worker threads
        ChannelFuture future = serverBootstrap.bind(5566).sync();
        System.out.println("Server startup complete.....");
        //Wait for the server's listening port to close
        future.channel().closeFuture().sync();
      } finally {
        //Graceful shutdown
        boss.shutdownGracefully();
        worker.shutdownGracefully();
      }
    }
}

 

4. ServerHandler

public class ServerHandler extends SimpleChannelInboundHandler<ByteBuf> {
      private int count;
      @Override
      protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
          System.out.println("The server received the message:" +
          msg.toString(CharsetUtil.UTF_8));
          System.out.println("Number of messages received by the server:" + (++count));
          ctx.writeAndFlush(Unpooled.copiedBuffer("ok", CharsetUtil.UTF_8));
      }
}

 

1.3 What TCP packet sticking and unpacking are

TCP delivers data as a stream, and a stream has no message boundaries. When the server receives data from the client, it cannot tell whether it holds one message, several messages, or only part of one. Reading that data without a framing rule is what causes the packet sticking and unpacking problem.

Therefore the server and client must agree on a framing rule before exchanging data. The client packs messages according to the rule, and the server unpacks them according to the same rule. If either side ignores the rule, the server cannot recover the expected data.

1. Solutions (three types)

1. Add a header to each packet and store the data length in the header. The server reads the header first, so it knows where the message boundary is.

2. Send data at a fixed length. Data that exceeds the length is split and sent in several pieces; data that falls short is padded with 0. The receiving end always reads the fixed length.

3. Place a boundary between packets, such as a special delimiter character, so that the receiving end can separate packets at that boundary.
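Solution 1 is implemented in section 1.4. For solutions 2 and 3, here is a small plain-JDK sketch of what the framing rule looks like on raw bytes. The names FramingSketch, toFixedFrames and splitOnDelimiter are hypothetical, and the sketch assumes the payload itself never contains the padding byte 0 at the end or the delimiter byte. Netty ships ready-made decoders for these strategies (FixedLengthFrameDecoder, DelimiterBasedFrameDecoder and LengthFieldBasedFrameDecoder), which this sketch does not use.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class FramingSketch {
    /** Solution 2: cut data into frames of frameSize bytes, padding the last frame with 0. */
    static List<byte[]> toFixedFrames(byte[] data, int frameSize) {
        if (frameSize <= 0) throw new IllegalArgumentException("frameSize must be positive");
        List<byte[]> frames = new ArrayList<>();
        for (int off = 0; off < data.length; off += frameSize) {
            int end = Math.min(data.length, off + frameSize);
            // copyOf pads with zero bytes up to frameSize
            frames.add(Arrays.copyOf(Arrays.copyOfRange(data, off, end), frameSize));
        }
        return frames;
    }

    /** Solution 3: split received bytes on a delimiter; bytes after the last delimiter are incomplete. */
    static List<String> splitOnDelimiter(byte[] stream, byte delimiter) {
        List<String> messages = new ArrayList<>();
        int start = 0;
        for (int i = 0; i < stream.length; i++) {
            if (stream[i] == delimiter) {
                messages.add(new String(stream, start, i - start, StandardCharsets.UTF_8));
                start = i + 1;
            }
        }
        return messages;
    }

    public static void main(String[] args) {
        for (byte[] frame : toFixedFrames("abcde".getBytes(StandardCharsets.UTF_8), 4)) {
            System.out.println(Arrays.toString(frame));
        }
        byte[] received = "msg1\nmsg2\npart".getBytes(StandardCharsets.UTF_8);
        System.out.println(splitOnDelimiter(received, (byte) '\n'));
    }
}
```

In the delimiter case the trailing "part" is not returned, because its closing delimiter has not arrived yet; a real decoder would keep those bytes for the next read.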

 

1.4 Hands-on: solving the TCP packet sticking / unpacking problem

1. Custom protocol

public class MyProtocol {
    private Integer length; //Data header: length of the body
    private byte[] body; //Data body
    public Integer getLength() {
     return length;
    }
    public void setLength(Integer length) {
     this.length = length;
    }
    public byte[] getBody() {
     return body;
    }
    public void setBody(byte[] body) {
     this.body = body;
    }
}

 

2. encoder

public class MyEncoder extends MessageToByteEncoder<MyProtocol> {
  @Override
  protected void encode(ChannelHandlerContext ctx, MyProtocol msg, ByteBuf out) throws Exception {
    out.writeInt(msg.getLength());
    out.writeBytes(msg.getBody());
  }
}
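To see what this encoder puts on the wire, the layout can be reproduced with a plain java.nio.ByteBuffer. This is only an illustration under the assumption that the header is a 4-byte big-endian int, which is the default byte order for both ByteBuffer.putInt and ByteBuf.writeInt; WireFormatSketch and its methods are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class WireFormatSketch {
    /** Builds [4-byte big-endian length][body], the same layout MyEncoder writes. */
    static byte[] encode(byte[] body) {
        ByteBuffer buf = ByteBuffer.allocate(4 + body.length); // big-endian by default
        buf.putInt(body.length).put(body);
        return buf.array();
    }

    /** Formats bytes as space-separated two-digit hex values. */
    static String hex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            sb.append(String.format("%02x ", b));
        }
        return sb.toString().trim();
    }

    public static void main(String[] args) {
        byte[] frame = encode("ok".getBytes(StandardCharsets.UTF_8));
        System.out.println(hex(frame)); // prints "00 00 00 02 6f 6b"
    }
}
```

The first four bytes tell the receiver that exactly two body bytes follow, so it knows where this message ends no matter how TCP splits or merges the stream.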

 

3. decoder

public class MyDecoder extends ReplayingDecoder<Void> {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        int length = in.readInt(); //Read the body length from the header
        byte[] data = new byte[length]; //Allocate a byte array of that length
        in.readBytes(data); //Read the body
        MyProtocol myProtocol = new MyProtocol();
        myProtocol.setLength(length);
        myProtocol.setBody(data);
        out.add(myProtocol);
    }
}

 

4. client ClientHandler

public class ClientHandler extends SimpleChannelInboundHandler<MyProtocol> {
    private int count;
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MyProtocol msg) throws Exception {
        System.out.println("Message received from the server:" + new String(msg.getBody(),
        CharsetUtil.UTF_8));
        System.out.println("Number of messages received from the server:" + (++count));
    }
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        for (int i = 0; i < 10; i++) {
            byte[] data = "from client a message!".getBytes(CharsetUtil.UTF_8);
            MyProtocol myProtocol = new MyProtocol();
            myProtocol.setLength(data.length);
            myProtocol.setBody(data);
            ctx.writeAndFlush(myProtocol);
        }
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)throws Exception {
      cause.printStackTrace();
      ctx.close();
    }
}

 

5. NettyClient

public class NettyClient {
    public static void main(String[] args) throws Exception{
        EventLoopGroup worker = new NioEventLoopGroup();
        try {
            //Client bootstrap class
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(worker);
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
              @Override
              protected void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addLast(new MyEncoder());
                ch.pipeline().addLast(new MyDecoder());
                ch.pipeline().addLast(new ClientHandler());
              }
            });
            ChannelFuture future = bootstrap.connect("127.0.0.1", 5566).sync();
            future.channel().closeFuture().sync();
        } finally {
         worker.shutdownGracefully();
        }
    }
}

 

6. ServerHandler

public class ServerHandler extends SimpleChannelInboundHandler<MyProtocol> {
    private int count;
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MyProtocol msg) throws Exception {
        System.out.println("The server received the message:" + new String(msg.getBody(),
        CharsetUtil.UTF_8));
        System.out.println("Number of messages received by the server:" + (++count));
        byte[] data = "ok".getBytes(CharsetUtil.UTF_8);
        MyProtocol myProtocol = new MyProtocol();
        myProtocol.setLength(data.length);
        myProtocol.setBody(data);
        ctx.writeAndFlush(myProtocol);
    }
}

 

7. NettyServer

public class NettyServer {
    public static void main(String[] args) throws Exception {
        //The boss thread does not process any business logic; it only accepts connection requests from clients
        EventLoopGroup boss = new NioEventLoopGroup(1);
        //Worker threads; the default number of threads is CPU cores * 2
        EventLoopGroup worker = new NioEventLoopGroup();
        try {
            //Server startup class
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(boss, worker);
            //Configure server channel
            serverBootstrap.channel(NioServerSocketChannel.class);
            serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
              @Override
              protected void initChannel(SocketChannel ch) throws Exception {
                  ch.pipeline()
                  .addLast(new MyDecoder())
                  .addLast(new MyEncoder())
                  .addLast(new ServerHandler());
              }
            }); //Processor of worker thread
            ChannelFuture future = serverBootstrap.bind(5566).sync();
            System.out.println("Server startup complete.....");
            //Wait for the server's listening port to close
            future.channel().closeFuture().sync();
        } finally {
            //Graceful shutdown
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }
}

 

8. test

 

II. Analysis of Netty core source code

2.1 analysis of server startup process

1. Create the server Channel

1 The bind() method of the ServerBootstrap object is the entry point.
2 initAndRegister() in AbstractBootstrap creates the Channel.
     The Channel is created through reflection by the newChannel() method of ReflectiveChannelFactory.
3 The NioServerSocketChannel constructor opens a ServerSocketChannel through the underlying JDK NIO SelectorProvider.
4 The AbstractNioChannel constructor sets the channel to non-blocking: ch.configureBlocking(false);
5 The AbstractChannel constructor creates the id, unsafe and pipeline.
6 NioServerSocketChannelConfig gives access to some of the underlying TCP parameters.

 

2. Initialize the server Channel

1 initAndRegister() in AbstractBootstrap initializes the channel; code: init(channel);

2 The init() method in ServerBootstrap sets the ChannelOptions and Attributes.

3 Next, the user-defined child options and attributes are saved to the local variables currentChildOptions and currentChildAttrs
  for later use.

4 If serverBootstrap.handler() was set, that handler is added to the pipeline.

5 The connector ServerBootstrapAcceptor is added. When a new connection comes in, the user-defined childHandler is added to that connection's
  pipeline:
ch.eventLoop().execute(new Runnable() {
    @Override
    public void run() {
      pipeline.addLast(
        new ServerBootstrapAcceptor(ch, currentChildGroup,currentChildHandler, currentChildOptions, currentChildAttrs));
    }
});
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) { 
    //Execute only when the client is connected
    final Channel child = (Channel) msg;
    //Add the defined childHandler to the connected pipeline
    child.pipeline().addLast(childHandler); 
    setChannelOptions(child, childOptions, logger);
    setAttributes(child, childAttrs);
    try {
        childGroup.register(child).addListener(new ChannelFutureListener(){
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                 forceClose(child, future.cause());
                }
            }
        });
    } catch (Throwable t) {
     forceClose(child, t);
    }
}

 

3. Register with the selector

//Registration
1 In the initAndRegister() method: ChannelFuture regFuture = config().group().register(channel);

2 The actual registration is done in io.netty.channel.AbstractChannel.AbstractUnsafe#register()
    2.1 AbstractChannel.this.eventLoop = eventLoop; assigns the eventLoop. All subsequent IO events
        and operations are handled by this eventLoop.
    2.2 register0(promise) calls doRegister() to perform the actual registration.

3 io.netty.channel.nio.AbstractNioChannel#doRegister implements the method:
//Register with the multiplexer through the underlying JDK API
//javaChannel() -- the JDK channel created in the previous step
//eventLoop().unwrappedSelector() -- gets the selector
//The registered interest set is 0, meaning no events of interest yet; the real events are registered later
//This object is registered on the selector as the attachment, so it can be retrieved from the key later
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
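The registration call at the end can be reproduced with the JDK alone. The sketch below is not Netty code: RegisterSketch and registerWithNoInterest are hypothetical names, and a plain Object plays the role of the Netty channel that is passed as the attachment.

```java
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.spi.SelectorProvider;

final class RegisterSketch {
    /** Registers ch on the selector with an empty interest set and an attachment. */
    static SelectionKey registerWithNoInterest(Selector selector, ServerSocketChannel ch,
                                               Object attachment) throws IOException {
        ch.configureBlocking(false);                 // register() requires non-blocking mode
        return ch.register(selector, 0, attachment); // 0 = no events of interest yet
    }

    public static void main(String[] args) throws IOException {
        try (Selector selector = Selector.open();
             ServerSocketChannel ch = SelectorProvider.provider().openServerSocketChannel()) {
            Object owner = new Object();             // stands in for the Netty channel
            SelectionKey key = registerWithNoInterest(selector, ch, owner);
            System.out.println("interest set is empty: " + (key.interestOps() == 0));
            System.out.println("attachment is owner: " + (key.attachment() == owner));
            key.interestOps(SelectionKey.OP_ACCEPT); // the real event is registered later
            System.out.println("accepting: " + (key.interestOps() == SelectionKey.OP_ACCEPT));
        }
    }
}
```

Registering with 0 first lets the channel join the selector before it is ready to accept; the attachment is how the event loop later finds its own channel object from a ready key.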

 

4. Bind the port

1 The entry point is io.netty.bootstrap.AbstractBootstrap#doBind0(), which submits a task to the channel's eventLoop to perform the port binding

2 It calls io.netty.channel.AbstractChannelHandlerContext#bind(java.net.SocketAddress,
  io.netty.channel.ChannelPromise), which again runs on the eventLoop
  
3 Finally the io.netty.channel.socket.nio.NioServerSocketChannel#doBind() method performs the bind operation
//Binding through the underlying JDK channel
@SuppressJava6Requirement(reason = "Usage guarded by java version check")
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
    if (PlatformDependent.javaVersion() >= 7) {
        javaChannel().bind(localAddress, config.getBacklog());
    } else {
        javaChannel().socket().bind(localAddress, config.getBacklog());
    }
}

When is the selector's interest set updated? In the end, in the io.netty.channel.nio.AbstractNioChannel#doBeginRead() method:

protected void doBeginRead() throws Exception {
    // Channel.read() or ChannelHandlerContext.read() was called
    final SelectionKey selectionKey = this.selectionKey;
    if (!selectionKey.isValid()) {
        return;
    }
    readPending = true;
    final int interestOps = selectionKey.interestOps();
    if ((interestOps & readInterestOp) == 0) {
        //Set the event of interest; for the server channel it is OP_ACCEPT
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}
//readInterestOp is assigned in the constructor of NioServerSocketChannel
public NioServerSocketChannel(ServerSocketChannel channel) {
    super(null, channel, SelectionKey.OP_ACCEPT);
    config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
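The bit test in doBeginRead() is easier to follow with concrete numbers. Here is a tiny sketch of the same mask logic using the JDK's SelectionKey constants; InterestOpsSketch and withOp are hypothetical names and no selector is involved.

```java
import java.nio.channels.SelectionKey;

final class InterestOpsSketch {
    /** Returns interestOps with op switched on, leaving it unchanged if op is already set. */
    static int withOp(int interestOps, int op) {
        if ((interestOps & op) == 0) {
            return interestOps | op;
        }
        return interestOps;
    }

    public static void main(String[] args) {
        int ops = 0;                                  // state right after registration
        ops = withOp(ops, SelectionKey.OP_ACCEPT);    // first doBeginRead(): bit gets set
        System.out.println(ops == SelectionKey.OP_ACCEPT);
        ops = withOp(ops, SelectionKey.OP_ACCEPT);    // later calls: nothing changes
        System.out.println(ops == SelectionKey.OP_ACCEPT);
    }
}
```

Using OR instead of a plain assignment keeps any other interest bits that are already set on the key.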

 

2.2 Source code analysis of the connection request process

1. Accepting a new connection

The entry point is
io.netty.channel.nio.NioEventLoop#processSelectedKey(java.nio.channels.SelectionKey,
io.netty.channel.nio.AbstractNioChannel)
    which enters the read() method of NioMessageUnsafe
    
    read() calls io.netty.channel.socket.nio.NioServerSocketChannel#doReadMessages(), which accepts the
    underlying JDK channel, wraps it in a NioSocketChannel and adds it to the List container
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
    SocketChannel ch = SocketUtils.accept(javaChannel());
    try {
        if (ch != null) {
          buf.add(new NioSocketChannel(this, ch));
          return 1;
        }
    } catch (Throwable t) {
        logger.warn("Failed to create a new channel from an accepted socket.", t);
        try {
            ch.close();
        } catch (Throwable t2) {
            logger.warn("Failed to close a socket.", t2);
        }
    }
    return 0;
}
Creating the NioSocketChannel object
new NioSocketChannel(this, ch) creates it with the new operator
    It calls the super constructor, which
        passes the SelectionKey.OP_READ event flag
        creates the id, unsafe and pipeline objects
        sets non-blocking mode: ch.configureBlocking(false);
    It creates the NioSocketChannelConfig object

 

2. Register the read event

In io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe:
for (int i = 0; i < size; i ++) {
    readPending = false;
    pipeline.fireChannelRead(readBuf.get(i)); //Propagate read events
}
In the io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead(java.lang.Object) method:
private void invokeChannelRead(Object msg) {
    if (invokeHandler()) {
        try {
            //Execute channelRead. Note that the head handler runs first and
            //ServerBootstrapAcceptor runs second.
            //Through ServerBootstrapAcceptor, the new connection goes through the same
            //selector registration and event binding logic.
            ((ChannelInboundHandler) handler()).channelRead(this, msg);
        } catch (Throwable t) {
            invokeExceptionCaught(t);
        }
    } else {
        fireChannelRead(msg);
    }
}

 

III. Optimization points when using Netty

Some simple suggestions for using Netty. They are worth a look.

3.1 zero copy

1 ByteBuf uses pooled DirectBuffer memory, so the data does not need to be copied into another byte buffer before it is written to the socket. With heap memory, the JVM first copies the data into a direct buffer and then writes it to the Socket, which adds one more copy.

2 CompositeByteBuf combines multiple ByteBufs into one logical ByteBuf. Adding a ByteBuf to it does not copy the data.

3 The transferTo method of DefaultFileRegion, Netty's file-transfer class, sends a file directly to the target channel, which avoids the usual read-then-write copy loop and improves performance.
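The JDK call behind this kind of file transfer is FileChannel.transferTo. The following is a minimal sketch of that call, not of DefaultFileRegion itself; TransferToSketch and sendFile are hypothetical names, and the sketch assumes a blocking target channel. In the demo the target is an in-memory stream, so no kernel-level zero copy happens there; the benefit only applies when the target is something like a socket channel.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class TransferToSketch {
    /** Sends the whole file to target without reading it into a user-space byte[] first. */
    static long sendFile(Path file, WritableByteChannel target) throws IOException {
        try (FileChannel src = FileChannel.open(file, StandardOpenOption.READ)) {
            long size = src.size();
            long sent = 0;
            while (sent < size) {
                // transferTo may move fewer bytes than requested, so loop until done
                sent += src.transferTo(sent, size - sent, target);
            }
            return sent;
        }
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("transfer-sketch", ".txt");
        try {
            Files.write(tmp, "hello zero copy".getBytes(StandardCharsets.UTF_8));
            ByteArrayOutputStream sink = new ByteArrayOutputStream();
            long sent = sendFile(tmp, Channels.newChannel(sink));
            System.out.println(sent + " bytes: " + sink.toString("UTF-8"));
        } finally {
            Files.deleteIfExists(tmp);
        }
    }
}
```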


 

3.2 task scheduling of EventLoop

channel.eventLoop().execute(new Runnable() {
    @Override
    public void run() {
        channel.writeAndFlush(data);
    }
});

Rather than calling channel.writeAndFlush(data) directly from a business thread, put the task straight into the execution queue of the EventLoop that owns the channel; calling it directly from another thread causes a thread switch. Note: inside writeAndFlush, if the caller is not already on the EventLoop thread, the write is wrapped in a task and handed over to the EventLoop.
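The hand-off between a caller thread and the channel's EventLoop thread can be illustrated without Netty. The sketch below is a simplified model, not Netty's EventLoop: MiniEventLoop, inEventLoop and write are hypothetical names, and a single-thread executor stands in for the loop.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

final class MiniEventLoop implements AutoCloseable {
    private final AtomicReference<Thread> loopThread = new AtomicReference<>();
    private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "mini-event-loop");
        t.setDaemon(true);
        loopThread.set(t);                 // remember which thread is the loop thread
        return t;
    });

    /** True when the calling thread is the loop thread. */
    boolean inEventLoop() {
        return Thread.currentThread() == loopThread.get();
    }

    /** Performs the "write" inline on the loop thread, or queues it from any other thread. */
    CompletableFuture<String> write(String data) {
        CompletableFuture<String> done = new CompletableFuture<>();
        Runnable task = () -> done.complete(Thread.currentThread().getName() + " wrote " + data);
        if (inEventLoop()) {
            task.run();                    // already on the loop thread: no hand-off needed
        } else {
            executor.execute(task);        // other thread: queue the task for the loop thread
        }
        return done;
    }

    @Override
    public void close() {
        executor.shutdown();
    }

    public static void main(String[] args) {
        try (MiniEventLoop loop = new MiniEventLoop()) {
            System.out.println("main in loop? " + loop.inEventLoop());
            System.out.println(loop.write("data").join());
        }
    }
}
```

Whichever thread calls write, the work itself always runs on the single loop thread, which is what keeps per-channel operations free of locks.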

3.3 Reduce ChannelPipeline traversal

public class YourHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        //msg passes through the entire ChannelPipeline, so every outbound handler sees it.
        ctx.channel().writeAndFlush(msg);
        //msg starts at the current handler and only passes the handlers before it, so the path is shorter.
        ctx.writeAndFlush(msg);
    }
}

 

3.4 Reduce the creation of ChannelHandler instances (rarely needs configuring)

If a ChannelHandler is stateless (that is, it does not need to keep any state), annotate it with @Sharable and create a single instance at bootstrap time to reduce GC. Otherwise a new handler object is created for every connection.

@ChannelHandler.Sharable
public class StatelessHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) {}
}
public class MyInitializer extends ChannelInitializer<Channel> {
    private static final ChannelHandler INSTANCE = new StatelessHandler();
    @Override
    public void initChannel(Channel ch) {
     ch.pipeline().addLast(INSTANCE);
    }
}

Note:

Codecs such as ByteToMessageDecoder are stateful and cannot be annotated with @Sharable.

3.5 setting of configuration parameters

On the server side, bossGroup only needs 1 thread, because during initialization the ServerSocketChannel is registered
 on a single eventLoop, and that eventLoop runs on one thread only, so there is no need for
 multiple threads. For the IO threads, to make full use of the CPU while keeping the context-switching overhead low, workGroup is usually
 set to twice the number of CPU cores, which is also the default that Netty provides.

In scenarios that demand low response times, use .childOption(ChannelOption.TCP_NODELAY, true) (for connections accepted by the server)
and .option(ChannelOption.TCP_NODELAY, true) (on the client Bootstrap) to disable the Nagle algorithm and send without waiting.
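For reference, the socket option that ChannelOption.TCP_NODELAY maps to can be set and read back with the JDK alone. This is a sketch on a plain, unconnected SocketChannel rather than a Netty channel; NoDelaySketch and disableNagle are hypothetical names.

```java
import java.io.IOException;
import java.net.StandardSocketOptions;
import java.nio.channels.SocketChannel;

final class NoDelaySketch {
    /** Turns TCP_NODELAY on (Nagle's algorithm off) and returns the value the socket reports. */
    static boolean disableNagle(SocketChannel ch) throws IOException {
        ch.setOption(StandardSocketOptions.TCP_NODELAY, true);
        return ch.getOption(StandardSocketOptions.TCP_NODELAY);
    }

    public static void main(String[] args) throws IOException {
        try (SocketChannel ch = SocketChannel.open()) { // not connected; options can still be set
            System.out.println("TCP_NODELAY enabled: " + disableNagle(ch));
        }
    }
}
```

With Nagle's algorithm disabled, small writes are sent immediately instead of being held back and coalesced, which lowers latency at the cost of more packets.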

That wraps up the Netty topics for now. I will keep sharing Netty-related content in the future, mainly problems encountered at work and new insights.


Tags: Java Netty Network Communications

Posted by jamiller on Sat, 13 Aug 2022 02:34:55 +0930