Netty - Network programming (understanding non-blocking mode, with code samples)

1. Overview of non-blocking mode

  • In non-blocking mode, the relevant methods never suspend the calling thread:
    (1) ServerSocketChannel.accept returns null immediately when no connection is pending, and the thread keeps running;
    (2) SocketChannel.read returns 0 when there is no data to read, so the thread is not blocked and is free to read from another SocketChannel or to call ServerSocketChannel.accept;
    (3) When writing, the thread only waits for the data to be copied into the Channel; it does not wait for the Channel to send the data over the network.
  • The drawback is that the thread keeps looping even when there is no new connection and no readable data, burning CPU for nothing.
  • The thread is still effectively blocked while data is being copied; this is the part that AIO improves.
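
  • The return values described above can be checked with a small program. The following is a minimal sketch using only the JDK; the class name NonBlockingReturnValues and its method names are made up for illustration, and port 0 is used so that the operating system picks a free port.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// Hypothetical demo class, not part of the original samples.
class NonBlockingReturnValues {

    /** True if accept() on a non-blocking server channel with no pending client returns null. */
    static boolean acceptReturnsNullWhenIdle() throws IOException {
        try (ServerSocketChannel ssc = ServerSocketChannel.open()) {
            ssc.configureBlocking(false);
            ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            return ssc.accept() == null;
        }
    }

    /** What read() reports on a connected, non-blocking channel when the peer has sent nothing. */
    static int readWithNoData() throws IOException {
        try (ServerSocketChannel ssc = ServerSocketChannel.open()) {
            ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            int port = ((InetSocketAddress) ssc.getLocalAddress()).getPort();
            InetSocketAddress target = new InetSocketAddress(InetAddress.getLoopbackAddress(), port);
            // The server channel is still blocking here, so accept() waits for the client below.
            try (SocketChannel client = SocketChannel.open(target);
                 SocketChannel accepted = ssc.accept()) {
                accepted.configureBlocking(false);
                return accepted.read(ByteBuffer.allocate(16));
            }
        }
    }

    /** Counts how often accept() returns null during an idle window: the busy-polling cost. */
    static long idleAcceptCalls(long millis) throws IOException {
        try (ServerSocketChannel ssc = ServerSocketChannel.open()) {
            ssc.configureBlocking(false);
            ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            long calls = 0;
            long deadline = System.nanoTime() + millis * 1_000_000L;
            while (System.nanoTime() < deadline) {
                if (ssc.accept() == null) {
                    calls++;
                }
            }
            return calls;
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("accept() returned null: " + acceptReturnsNullWhenIdle());
        System.out.println("read() returned: " + readWithNoData());
        System.out.println("idle accept() calls in 100 ms: " + idleAcceptCalls(100));
    }
}
```

    The last number varies by machine, but it is typically large, which makes the wasted CPU visible: every one of those calls did no useful work.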

2. Server-side code example in non-blocking mode (implemented with NIO)

  • Server-side code

    package com.example.nettytest.nio.day3;
    
    import lombok.extern.slf4j.Slf4j;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.*;
    import java.util.ArrayList;
    import java.util.Iterator;
    
    import static com.example.nettytest.nio.day1.ByteBufferUtil.debugRead;
    /**
     * @description: Server-side code example in non-blocking mode (implemented with NIO)
     * @author: xz
     * @create: 2022-08-16 21:21
     */
    @Slf4j
    public class TestServer {
        public static void main(String[] args) throws IOException {
            nioNoBlockServer();
        }
        
        /**
         * Using nio to understand non-blocking mode (single-threaded server)
         * */
        private static void nioNoBlockServer() throws IOException {
            //1. Create a ByteBuffer with a capacity of 16 bytes
            ByteBuffer byteBuffer = ByteBuffer.allocate(16);
            //2. Create the server channel
            ServerSocketChannel ssc = ServerSocketChannel.open();
            ssc.configureBlocking(false);//put ssc into non-blocking mode
            //3. Bind the listening port
            ssc.bind(new InetSocketAddress(8080));
            //4. List of established connections
            ArrayList<SocketChannel> channels = new ArrayList<>();
            while(true){
                //5. accept() establishes a connection with a client; the returned SocketChannel is used to communicate with that client
                SocketChannel sc = ssc.accept();//non-blocking: returns immediately, and sc is null if no connection is pending
                if(sc !=null){
                    log.info("create connected SocketChannel... {}", sc);
                    sc.configureBlocking(false);//put sc into non-blocking mode
                    //6. Add the newly established connection to the channels list
                    channels.add(sc);
                }
                //7. Iterate over the established connections
                for(SocketChannel channel : channels){
                    //8. Receive data sent by the client: read from the channel and write into byteBuffer
                    int read = channel.read(byteBuffer);//non-blocking: returns immediately, and read is 0 if the client has sent nothing
                    if(read>0){
                        //Switch the buffer to read mode
                        byteBuffer.flip();
                        //Print the readable content (read data from byteBuffer)
                        debugRead(byteBuffer);
                        //Switch the buffer back to write mode
                        byteBuffer.clear();
                        log.info("after read channel ... {}", channel);
                    }
                }
            }
        }
    }
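
  • The server listing only handles read > 0. SocketChannel.read also returns -1 once the client has closed its side, and a channel in that state stays in the list and is polled forever. The following is a minimal sketch of one polling pass that closes and removes such channels through an Iterator; the class DisconnectAwarePolling and the method pollOnce are hypothetical names, and it accepts any ReadableByteChannel so that it can be tried without a network.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical helper, not part of the original samples.
class DisconnectAwarePolling {

    /**
     * One pass over the connection list. Channels that report end-of-stream (-1)
     * are closed and removed. Returns the number of channels dropped.
     */
    static int pollOnce(List<? extends ReadableByteChannel> channels, ByteBuffer buffer) throws IOException {
        int dropped = 0;
        // An explicit Iterator allows removal during traversal; removing inside a
        // for-each loop would throw ConcurrentModificationException.
        Iterator<? extends ReadableByteChannel> it = channels.iterator();
        while (it.hasNext()) {
            ReadableByteChannel channel = it.next();
            int read = channel.read(buffer);
            if (read == -1) {
                channel.close();
                it.remove();
                dropped++;
            } else if (read > 0) {
                buffer.flip();   // switch to read mode
                System.out.println("received: " + StandardCharsets.UTF_8.decode(buffer));
                buffer.clear();  // switch back to write mode
            }
            // read == 0: nothing available right now, move on to the next channel
        }
        return dropped;
    }

    public static void main(String[] args) throws IOException {
        List<ReadableByteChannel> channels = new ArrayList<>();
        // Stand-ins for sockets: one already at end-of-stream, one with pending data.
        channels.add(Channels.newChannel(new ByteArrayInputStream(new byte[0])));
        channels.add(Channels.newChannel(new ByteArrayInputStream("hi".getBytes(StandardCharsets.UTF_8))));
        int dropped = pollOnce(channels, ByteBuffer.allocate(16));
        System.out.println("dropped=" + dropped + ", remaining=" + channels.size());
    }
}
```

    In the server, the same loop body could replace the for-each over channels, since SocketChannel implements ReadableByteChannel.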
    

3. Client code example for non-blocking mode (implemented with NIO)

  • Client code

    package com.example.nettytest.nio.day3;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.net.SocketAddress;
    import java.nio.channels.SocketChannel;
    /**
     * @description:
     * @author: xz
     * @create: 2022-08-16 21:45
     */
    public class TestClient {
        public static void main(String[] args) throws IOException {
            SocketChannel sc = SocketChannel.open();
            sc.connect(new InetSocketAddress("localhost", 8080));
            SocketAddress address = sc.getLocalAddress();
            //In debug mode, suspend at the line below, right-click the sc variable, choose Evaluate Expression..., enter sc.write(Charset.defaultCharset().encode("hello")); and execute it
            System.out.println("waiting...");
        }
    }
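
  • The client above sends data only through the debugger. If a debugger is not at hand, the same write can be done in code. The following is a minimal sketch; the class SendingClient and the method send are hypothetical names, and the main method assumes that a server such as TestServer is already listening on localhost:8080.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;

// Hypothetical variant of TestClient that sends without a debugger.
class SendingClient {

    /** Connects to host:port, writes the text, and returns the number of bytes written. */
    static int send(String host, int port, String text) throws IOException {
        try (SocketChannel sc = SocketChannel.open()) {
            sc.connect(new InetSocketAddress(host, port));
            ByteBuffer payload = Charset.defaultCharset().encode(text);
            int written = 0;
            // write() may accept only part of the buffer, so loop until it is drained.
            while (payload.hasRemaining()) {
                written += sc.write(payload);
            }
            return written;
        }
    }

    public static void main(String[] args) throws IOException {
        // Assumption: the server from section 2 is running on localhost:8080.
        System.out.println("bytes written: " + send("localhost", 8080, "hello"));
    }
}
```

    Unlike the debugger approach, this client closes the channel as soon as send returns, so the server sees end-of-stream on that connection afterwards.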
    

4. Utility class code

  • Utility class used to print input and output data

    package com.example.nettytest.nio.day1;
    
    import io.netty.util.internal.StringUtil;
    
    import java.nio.ByteBuffer;
    
    import static io.netty.util.internal.MathUtil.isOutOfBounds;
    import static io.netty.util.internal.StringUtil.NEWLINE;
    
    public class ByteBufferUtil {
        private static final char[] BYTE2CHAR = new char[256];
        private static final char[] HEXDUMP_TABLE = new char[256 * 4];
        private static final String[] HEXPADDING = new String[16];
        private static final String[] HEXDUMP_ROWPREFIXES = new String[65536 >>> 4];
        private static final String[] BYTE2HEX = new String[256];
        private static final String[] BYTEPADDING = new String[16];
    
        static {
            final char[] DIGITS = "0123456789abcdef".toCharArray();
            for (int i = 0; i < 256; i++) {
                HEXDUMP_TABLE[i << 1] = DIGITS[i >>> 4 & 0x0F];
                HEXDUMP_TABLE[(i << 1) + 1] = DIGITS[i & 0x0F];
            }
    
            int i;
    
            // Generate the lookup table for hex dump paddings
            for (i = 0; i < HEXPADDING.length; i++) {
                int padding = HEXPADDING.length - i;
                StringBuilder buf = new StringBuilder(padding * 3);
                for (int j = 0; j < padding; j++) {
                    buf.append("   ");
                }
                HEXPADDING[i] = buf.toString();
            }
    
            // Generate the lookup table for the start-offset header in each row (up to 64KiB).
            for (i = 0; i < HEXDUMP_ROWPREFIXES.length; i++) {
                StringBuilder buf = new StringBuilder(12);
                buf.append(NEWLINE);
                buf.append(Long.toHexString(i << 4 & 0xFFFFFFFFL | 0x100000000L));
                buf.setCharAt(buf.length() - 9, '|');
                buf.append('|');
                HEXDUMP_ROWPREFIXES[i] = buf.toString();
            }
    
            // Generate the lookup table for byte-to-hex-dump conversion
            for (i = 0; i < BYTE2HEX.length; i++) {
                BYTE2HEX[i] = ' ' + StringUtil.byteToHexStringPadded(i);
            }
    
            // Generate the lookup table for byte dump paddings
            for (i = 0; i < BYTEPADDING.length; i++) {
                int padding = BYTEPADDING.length - i;
                StringBuilder buf = new StringBuilder(padding);
                for (int j = 0; j < padding; j++) {
                    buf.append(' ');
                }
                BYTEPADDING[i] = buf.toString();
            }
    
            // Generate the lookup table for byte-to-char conversion
            for (i = 0; i < BYTE2CHAR.length; i++) {
                if (i <= 0x1f || i >= 0x7f) {
                    BYTE2CHAR[i] = '.';
                } else {
                    BYTE2CHAR[i] = (char) i;
                }
            }
        }
    
        /**
         * Print everything
         * @param buffer
         */
        public static void debugAll(ByteBuffer buffer) {
            int oldlimit = buffer.limit();
            buffer.limit(buffer.capacity());
            StringBuilder origin = new StringBuilder(256);
            appendPrettyHexDump(origin, buffer, 0, buffer.capacity());
            System.out.println("+--------+-------------------- all ------------------------+----------------+");
            System.out.printf("position: [%d], limit: [%d]\n", buffer.position(), oldlimit);
            System.out.println(origin);
            buffer.limit(oldlimit);
        }
    
        /**
         * Print Readable Content
         * @param buffer
         */
        public static void debugRead(ByteBuffer buffer) {
            StringBuilder builder = new StringBuilder(256);
            appendPrettyHexDump(builder, buffer, buffer.position(), buffer.limit() - buffer.position());
            System.out.println("+--------+-------------------- read -----------------------+----------------+");
            System.out.printf("position: [%d], limit: [%d]\n", buffer.position(), buffer.limit());
            System.out.println(builder);
        }
    
        public static void main(String[] args) {
            ByteBuffer buffer = ByteBuffer.allocate(10);
            buffer.put(new byte[]{97, 98, 99, 100});
            debugAll(buffer);
        }
    
        private static void appendPrettyHexDump(StringBuilder dump, ByteBuffer buf, int offset, int length) {
            if (isOutOfBounds(offset, length, buf.capacity())) {
                throw new IndexOutOfBoundsException(
                        "expected: " + "0 <= offset(" + offset + ") <= offset + length(" + length
                                + ") <= " + "buf.capacity(" + buf.capacity() + ')');
            }
            if (length == 0) {
                return;
            }
            dump.append(
                    "         +-------------------------------------------------+" +
                            NEWLINE + "         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |" +
                            NEWLINE + "+--------+-------------------------------------------------+----------------+");
    
            final int startIndex = offset;
            final int fullRows = length >>> 4;
            final int remainder = length & 0xF;
    
            // Dump the rows which have 16 bytes.
            for (int row = 0; row < fullRows; row++) {
                int rowStartIndex = (row << 4) + startIndex;
    
                // Per-row prefix.
                appendHexDumpRowPrefix(dump, row, rowStartIndex);
    
                // Hex dump
                int rowEndIndex = rowStartIndex + 16;
                for (int j = rowStartIndex; j < rowEndIndex; j++) {
                    dump.append(BYTE2HEX[getUnsignedByte(buf, j)]);
                }
                dump.append(" |");
    
                // ASCII dump
                for (int j = rowStartIndex; j < rowEndIndex; j++) {
                    dump.append(BYTE2CHAR[getUnsignedByte(buf, j)]);
                }
                dump.append('|');
            }
    
            // Dump the last row which has less than 16 bytes.
            if (remainder != 0) {
                int rowStartIndex = (fullRows << 4) + startIndex;
                appendHexDumpRowPrefix(dump, fullRows, rowStartIndex);
    
                // Hex dump
                int rowEndIndex = rowStartIndex + remainder;
                for (int j = rowStartIndex; j < rowEndIndex; j++) {
                    dump.append(BYTE2HEX[getUnsignedByte(buf, j)]);
                }
                dump.append(HEXPADDING[remainder]);
                dump.append(" |");
    
                // Ascii dump
                for (int j = rowStartIndex; j < rowEndIndex; j++) {
                    dump.append(BYTE2CHAR[getUnsignedByte(buf, j)]);
                }
                dump.append(BYTEPADDING[remainder]);
                dump.append('|');
            }
    
            dump.append(NEWLINE +
                    "+--------+-------------------------------------------------+----------------+");
        }
    
        private static void appendHexDumpRowPrefix(StringBuilder dump, int row, int rowStartIndex) {
            if (row < HEXDUMP_ROWPREFIXES.length) {
                dump.append(HEXDUMP_ROWPREFIXES[row]);
            } else {
                dump.append(NEWLINE);
                dump.append(Long.toHexString(rowStartIndex & 0xFFFFFFFFL | 0x100000000L));
                dump.setCharAt(dump.length() - 9, '|');
                dump.append('|');
            }
        }
    
        public static short getUnsignedByte(ByteBuffer buffer, int index) {
            return (short) (buffer.get(index) & 0xFF);
        }
    }
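
  • Both debugAll and debugRead print the buffer's position and limit, which are the values that flip() and clear() change in the server loop. The following is a minimal JDK-only sketch of how they move; the class BufferModes and the method state are hypothetical names.

```java
import java.nio.ByteBuffer;

// Hypothetical demo class, not part of the original samples.
class BufferModes {

    /** Formats the buffer's cursor state. */
    static String state(ByteBuffer b) {
        return "position=" + b.position() + ", limit=" + b.limit() + ", capacity=" + b.capacity();
    }

    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(16);
        System.out.println("new:   " + state(buffer)); // position=0, limit=16, capacity=16

        buffer.put(new byte[]{97, 98, 99, 100});       // write mode: position advances
        System.out.println("put:   " + state(buffer)); // position=4, limit=16, capacity=16

        buffer.flip();                                 // read mode: limit = old position, position = 0
        System.out.println("flip:  " + state(buffer)); // position=0, limit=4, capacity=16

        buffer.clear();                                // write mode again: position = 0, limit = capacity
        System.out.println("clear: " + state(buffer)); // position=0, limit=16, capacity=16
    }
}
```

    Note that clear() only resets position and limit; the bytes stay in the buffer until they are overwritten.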
    

5. Debugging the non-blocking code samples locally

  • Start the server first, then start two clients. The server console shows that the server has accepted connections from both clients. Because accept is non-blocking, the thread keeps running. As shown in the following figure:

  • In Client 1, right-click the sc variable, click Evaluate Expression..., and in the pop-up window enter sc.write(Charset.defaultCharset().encode("111111")); to make Client 1 send data. As shown in the following figure:


  • On the server side, the data 111111 sent by Client 1 has been received. The server has finished handling this read, but the thread keeps running and polling for more client data. As shown in the following figure:

  • In Client 2, right-click the sc variable, click Evaluate Expression..., and in the pop-up window enter sc.write(Charset.defaultCharset().encode("222222")); to make Client 2 send data. As shown in the following figure:

  • On the server side again, the data 222222 sent by Client 2 has been received. The server has finished handling this read, but the thread keeps running and polling for more client data. As shown in the following figure:

Tags: Netty

Posted by sarabjit on Wed, 17 Aug 2022 03:09:46 +0930