Precautions for using a custom thread pool to handle short-lived socket connections in production

Sockets are used not only for communication between platforms; they are used even more for connecting hardware devices to a platform. When the concurrency requirement is not very high, you can use BIO sockets because they are simple and convenient. If the concurrency is high, NIO sockets are recommended because they scale to many concurrent connections.

If you use long connections, you need to handle heartbeats, client disconnection and reconnection, the definition of the message format, and so on.
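
To illustrate the "message format" point, here is a minimal sketch of one common convention: each message is sent as a 4-byte length followed by a UTF-8 payload, so the receiver knows where a message ends without closing the connection. This is an assumption for illustration, not the format used by the project linked in this article; `FrameCodec`, `writeFrame`, `readFrame` and the 1 MiB limit are hypothetical names and values.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: frames each message as [4-byte big-endian length][UTF-8 payload].
final class FrameCodec {
    // Assumed upper bound that protects the reader from absurd length values.
    private static final int MAX_FRAME_BYTES = 1024 * 1024;

    private FrameCodec() {}

    static void writeFrame(OutputStream out, String message) throws IOException {
        byte[] payload = message.getBytes(StandardCharsets.UTF_8);
        DataOutputStream data = new DataOutputStream(out);
        data.writeInt(payload.length); // length prefix
        data.write(payload);           // message body
        data.flush();
    }

    static String readFrame(InputStream in) throws IOException {
        DataInputStream data = new DataInputStream(in);
        int length = data.readInt();
        if (length < 0 || length > MAX_FRAME_BYTES) {
            throw new IOException("Invalid frame length: " + length);
        }
        byte[] payload = new byte[length];
        data.readFully(payload); // blocks until the whole body has arrived
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        // An in-memory buffer stands in for the socket streams.
        ByteArrayOutputStream wire = new ByteArrayOutputStream();
        writeFrame(wire, "ping");
        writeFrame(wire, "pong");
        InputStream in = new ByteArrayInputStream(wire.toByteArray());
        System.out.println(readFrame(in));
        System.out.println(readFrame(in));
    }
}
```

With real sockets, the same two methods would be called on `socket.getOutputStream()` and `socket.getInputStream()`.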

If you can, I recommend using netty, a powerful framework, because it already wraps a wide range of protocols well. If you are familiar with its API, netty is strongly recommended (I will post my study notes later).

Back to the main topic: precautions for BIO short connections. On the client side a short connection is relatively simple. It only follows the flow connect -> recv / send -> close, and a connection is opened whenever a business request arrives. The code is as follows (all the code below can be found on gitee: https://gitee.com/MonkeyBrothers/study/tree/master/springboot-netty/src/main/java/org/hry/socket/bio/shortlink):

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

public class SocketClient {
    public static void main(String[] args) throws IOException {
        // try-with-resources closes the socket even if reading or writing fails
        try (Socket socket = new Socket("localhost", 8888)) {
            byte[] bytes = new byte[1024];
            InputStream in = socket.getInputStream();
            StringBuilder sb = new StringBuilder();
            int len;
            // read() returns -1 once the server has shut down its output
            while ((len = in.read(bytes)) != -1) {
                sb.append(new String(bytes, 0, len, StandardCharsets.UTF_8));
            }
            // The full message has been received and can be processed here.
            System.out.println("Message received:" + sb);
            // Shut down the input side: this client will not read any more data
            socket.shutdownInput();
            // If the processed data must be returned to the socket server, write it to the output stream.
            OutputStream out = socket.getOutputStream();
            out.write("Here is the processed data".getBytes(StandardCharsets.UTF_8));
            // Shut down the output side so the server sees end-of-stream
            socket.shutdownOutput();
            System.out.println("Data transmission completed");
        }
    }
}

The client connects to the server, processes the data once connected, and closes the socket afterwards to release resources.
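
One detail of the read loop is worth a sketch: decoding each 1024-byte chunk on its own can split a multi-byte character across two reads and produce garbled text. A possible alternative, shown here under the assumption that both sides agree on UTF-8, is to collect the raw bytes first and decode once at the end. `StreamText` and `readAllAsUtf8` are hypothetical names used only for this example.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: reads a stream to end-of-stream and decodes it in one step.
final class StreamText {
    private StreamText() {}

    static String readAllAsUtf8(InputStream in) throws IOException {
        ByteArrayOutputStream collected = new ByteArrayOutputStream();
        byte[] buffer = new byte[1024];
        int len;
        // Same loop as in the client, but the bytes are only stored here.
        while ((len = in.read(buffer)) != -1) {
            collected.write(buffer, 0, len);
        }
        // Decode once, after all bytes have arrived, so no character is cut in half.
        return new String(collected.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        // An in-memory stream stands in for socket.getInputStream().
        byte[] wire = "Here is the data sent by the server".getBytes(StandardCharsets.UTF_8);
        System.out.println("Message received:" + readAllAsUtf8(new ByteArrayInputStream(wire)));
    }
}
```

In the client, the call would be `StreamText.readAllAsUtf8(socket.getInputStream())`; it still returns only after the server shuts down its output.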

BIO server: on the server side there is much more to pay attention to. The first point is that with a large amount of concurrency you should not use BIO sockets; NIO or the netty framework is a better fit for the business. First, let's look at a simple question-and-answer style BIO socket server.

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

public class SocketServer {
    public static void main(String[] args) throws IOException {
        // try-with-resources closes both the accepted socket and the server socket
        try (ServerSocket server = new ServerSocket(8888);
             Socket socket = server.accept()) {
            byte[] bytes = new byte[1024];
            InputStream in = socket.getInputStream();
            OutputStream out = socket.getOutputStream();
            out.write("Here is the data sent by the server".getBytes(StandardCharsets.UTF_8));
            // Tell the client that the server has finished sending. Without this call, the client's read loop never sees end-of-stream
            socket.shutdownOutput();
            StringBuilder sb = new StringBuilder();
            int len;
            while ((len = in.read(bytes)) != -1) {
                sb.append(new String(bytes, 0, len, StandardCharsets.UTF_8));
            }
            // The client's reply has been received in full and can be processed here.
            System.out.println("Message received:" + sb);
            socket.shutdownInput();
        }
    }
}

There is a problem here: this server accepts a single client connection and then exits, so it cannot handle concurrent business requests. The fix is to start a thread for each accepted socket, but that is very resource intensive, so it is only acceptable when the concurrency is not particularly high. Again, don't use BIO for more than about 100 concurrent connections! Remember!!!

Next, simulate low-concurrency socket requests and see how the client and server handle them. First, the client uses a while loop, where `TimeUnit.SECONDS.sleep(1)` sleeps for 1 second, so one request is sent every second (if you want 100 requests per second, you can use a CountDownLatch to control it; there is an example at the end of the article).

public class SocketClient {
    public static void main(String[] args) throws Exception {
        while (true) {
            TimeUnit.SECONDS.sleep(1);
            Socket socket = new Socket("localhost", 8888);
            byte[] bytes = new byte[1024];
            InputStream in = socket.getInputStream();
            StringBuffer sb = new StringBuffer();
            int len;
            while ((len = in.read(bytes)) != -1) {
                sb.append(new String(bytes, 0, len, StandardCharsets.UTF_8));
            }
            // The full message has been received and can be processed here.
            System.out.println("Message received:" + sb.toString());
            // Shut down the input side: this client will not read any more data
            socket.shutdownInput();
            // If you need to return the processed data to the socket server, you can get the output stream for output.
            OutputStream out = socket.getOutputStream();
            out.write("Here is the processed data".getBytes(StandardCharsets.UTF_8));
            // Close the output stream and tell the server that I have finished writing the data
            socket.shutdownOutput();
            System.out.println("Data transmission completed");
            socket.close();
        }
    }
}

The corresponding server needs a thread pool. Java provides several ready-made thread pools through the `Executors` factory class.


Although I used the thread pools provided by the JDK when I first wrote socket code, I found some potential problems after studying thread pools in depth. I therefore recommend a custom thread pool, which lets you control the parameters and give the pool's threads a meaningful name. For comparison, here is the server with the JDK's cached thread pool first:

public class SocketServer {
    public static void main(String[] args) throws IOException {
        Executor executor = Executors.newCachedThreadPool();
        ServerSocket server = new ServerSocket(8888);
        while(true) {
            Socket socket = server.accept();
            executor.execute(() -> {
                // Declaring the socket as a resource closes it even if obtaining a stream fails
                try (Socket client = socket;
                     InputStream in = client.getInputStream();
                     OutputStream out = client.getOutputStream()) {
                    byte[] bytes = new byte[1024];
                    out.write("Here is the data sent by the server".getBytes(StandardCharsets.UTF_8));
                    // Tell the client that the server has finished sending. Without this call, the client never receives the end-of-stream signal
                    client.shutdownOutput();
                    StringBuilder sb = new StringBuilder();
                    int len;
                    while ((len = in.read(bytes)) != -1) {
                        sb.append(new String(bytes, 0, len, StandardCharsets.UTF_8));
                    }
                    // The client's reply has been received in full and can be processed here.
                    System.out.println("Message received:" + sb);
                    client.shutdownInput();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }
    }
}
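
The "potential problems" with the JDK's ready-made pools can be made visible by inspecting their limits. The sketch below assumes the OpenJDK behaviour in which `Executors.newCachedThreadPool()` and `Executors.newFixedThreadPool(n)` return a `ThreadPoolExecutor`, which is why the casts work; `DefaultPoolLimits` and its methods are hypothetical names for this example.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

// Hypothetical helper: reports the limits of two pools created through Executors.
final class DefaultPoolLimits {
    private DefaultPoolLimits() {}

    // Maximum number of threads the cached pool is allowed to create.
    static int cachedPoolMaxThreads() {
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            // Cast relies on the OpenJDK implementation detail noted in the lead-in.
            return ((ThreadPoolExecutor) pool).getMaximumPoolSize();
        } finally {
            pool.shutdown();
        }
    }

    // Number of tasks the fixed pool's queue can hold while all threads are busy.
    static int fixedPoolQueueCapacity() {
        ExecutorService pool = Executors.newFixedThreadPool(10);
        try {
            return ((ThreadPoolExecutor) pool).getQueue().remainingCapacity();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("cached pool max threads: " + cachedPoolMaxThreads());
        System.out.println("fixed pool queue capacity: " + fixedPoolQueueCapacity());
    }
}
```

On OpenJDK both values are `Integer.MAX_VALUE`: the cached pool may start one thread per connection without any practical limit, and the fixed pool may queue waiting connections without any practical limit. A burst of connections can therefore exhaust threads or memory, which is the motivation for choosing the limits yourself.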

I use a custom thread pool in production. Set the specific parameters according to your own business needs.

public class SocketServer {
    public static void main(String[] args) throws IOException {
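        // Custom thread pool: every parameter is explicit and can be tuned to the business load
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                10,                              // core pool size
                10,                              // maximum pool size
                5L,                              // keep-alive time for idle threads above the core size
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(10),   // bounded queue: at most 10 waiting connections
                new MySocketThreadFactory("custom socket Thread pool"),
                // DiscardPolicy silently drops a task when all threads are busy and the queue is full;
                // the socket captured by the dropped task is then never handled or closed by this code
                new ThreadPoolExecutor.DiscardPolicy());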
        ServerSocket server = new ServerSocket(8888);
        while(true) {
            Socket socket = server.accept();
            executor.execute(() -> {
                // Declaring the socket as a resource closes it even if obtaining a stream fails
                try (Socket client = socket;
                     InputStream in = client.getInputStream();
                     OutputStream out = client.getOutputStream()) {
                    byte[] bytes = new byte[1024];
                    out.write("Here is the data sent by the server".getBytes(StandardCharsets.UTF_8));
                    // Tell the client that the server has finished sending. Without this call, the client never receives the end-of-stream signal
                    client.shutdownOutput();
                    StringBuilder sb = new StringBuilder();
                    int len;
                    while ((len = in.read(bytes)) != -1) {
                        sb.append(new String(bytes, 0, len, StandardCharsets.UTF_8));
                    }
                    // The client's reply has been received in full and can be processed here.
                    System.out.println("Message received:" + sb);
                    client.shutdownInput();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }
    }
}
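
One consequence of `DiscardPolicy` deserves attention: a rejected task is dropped without notice, so the socket it was meant to handle is never closed and the client keeps waiting in `read()`. The following is a minimal sketch of one possible alternative, not the handler used in the author's production code: the task owns its connection, and a custom `RejectedExecutionHandler` closes it on rejection. `RejectedConnections`, `ConnectionTask` and `ClosingHandler` are hypothetical names, and a counter stands in for a real `Socket`.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: close the connection of a rejected task instead of dropping it silently.
final class RejectedConnections {
    private RejectedConnections() {}

    // A task that owns one connection; java.net.Socket implements Closeable.
    static final class ConnectionTask implements Runnable, Closeable {
        private final Closeable connection;
        private final Runnable work;

        ConnectionTask(Closeable connection, Runnable work) {
            this.connection = connection;
            this.work = work;
        }

        @Override
        public void run() {
            work.run();
        }

        @Override
        public void close() throws IOException {
            connection.close();
        }
    }

    // Called by the pool when it has neither a free thread nor a free queue slot.
    static final class ClosingHandler implements RejectedExecutionHandler {
        @Override
        public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
            if (task instanceof Closeable) {
                try {
                    ((Closeable) task).close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    // Submits three blocking tasks to a pool with room for two; returns how many connections were closed.
    static int demo() throws InterruptedException {
        AtomicInteger closed = new AtomicInteger();
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1), new ClosingHandler());
        for (int i = 0; i < 3; i++) {
            Closeable fakeConnection = closed::incrementAndGet; // stands in for a Socket
            pool.execute(new ConnectionTask(fakeConnection, () -> {
                try {
                    release.await(); // keep the worker busy until all tasks are submitted
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }));
        }
        release.countDown();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return closed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("connections closed by the handler: " + demo());
    }
}
```

In a server like the one above, this would correspond to `executor.execute(new ConnectionTask(socket, () -> handle(socket)))`, where `handle` is a placeholder for the per-connection logic. The approach relies on `execute`, which passes the original task object to the handler; `submit` wraps the task, so the `instanceof` check would not match.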

The custom thread factory, which is also used in production, is as follows.

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class MySocketThreadFactory implements ThreadFactory {
    /**
     * Pool counter shared by all factory instances, so that each pool gets its own number
     */
    private static final AtomicInteger POOL_NUMBER = new AtomicInteger(1);
    /**
     * Thread group
     */
    private final ThreadGroup threadGroup;
    /**
     * Per-pool thread counter, starting at 1; it is only used in thread names
     */
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    /**
     * Thread name prefix
     */
    public final String namePrefix;

    public MySocketThreadFactory(String name) {
        // SecurityManager is deprecated since Java 17; without one, the current thread's group is used
        SecurityManager s = System.getSecurityManager();
        threadGroup = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
        if (null == name || "".equals(name.trim())) name = "pool";
        namePrefix = name + "-" + POOL_NUMBER.getAndIncrement() + "-socket-thread-";
    }

    @Override
    public Thread newThread(Runnable r) {
        // Resulting names look like "<name>-<pool number>-socket-thread-<thread number>"
        Thread t = new Thread(threadGroup, r, namePrefix + threadNumber.getAndIncrement(), 0);
        if (t.isDaemon()) t.setDaemon(false);
        if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY);
        return t;
    }
}

The BIO socket code above can be used in production directly when the concurrency is not high. Once again, do not use thread-per-connection BIO sockets for high concurrency!

Here is the concurrent test code:

public class SocketClient {
    public static void main(String[] args) throws Exception {
        CountDownLatch downLatch = new CountDownLatch(1);
        IntStream.range(0,25).forEach(v -> new Thread(() ->{
            try {
                downLatch.await();
                Socket socket = new Socket("localhost", 8888);
                byte[] bytes = new byte[1024];
                InputStream in = socket.getInputStream();
                StringBuffer sb = new StringBuffer();
                int len;
                while ((len = in.read(bytes)) != -1) {
                    sb.append(new String(bytes, 0, len, StandardCharsets.UTF_8));
                }
                // The full message has been received and can be processed here.
                System.out.println("Message received:" + sb.toString());
                // Shut down the input side: this client will not read any more data
                socket.shutdownInput();
                // If you need to return the processed data to the socket server, you can get the output stream for output.
                OutputStream out = socket.getOutputStream();
                out.write("Here is the processed data".getBytes(StandardCharsets.UTF_8));
                // Close the output stream and tell the server that I have finished writing the data
                socket.shutdownOutput();
                System.out.println("Data transmission completed");
                socket.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }).start());
        downLatch.countDown();
    }
}

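With these settings the concurrent test does not work properly: some of the 25 simultaneous requests may never be served. The pool has 10 threads and a queue of 10, so it can hold at most 20 connections at a time, and DiscardPolicy silently drops whatever arrives beyond that. At this point you need to adjust the thread pool settings. This shows that BIO sockets under medium concurrency depend heavily on the thread pool.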
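
The capacity limit can be checked without any sockets. The sketch below uses the same pool parameters as the custom pool in this article (10 threads, queue of 10) and submits 25 tasks that all block until the burst has been submitted, which imitates 25 clients connecting at the same moment. `PoolCapacityCheck` and `countRejected` are hypothetical names, and the counting handler replaces `DiscardPolicy` only to make the rejections visible.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper: counts how many tasks of a simultaneous burst a bounded pool rejects.
final class PoolCapacityCheck {
    private PoolCapacityCheck() {}

    static int countRejected(int threads, int queueCapacity, int burst) throws InterruptedException {
        AtomicInteger rejected = new AtomicInteger();
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                threads, threads, 5L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(queueCapacity),
                // Count rejections instead of discarding them silently.
                (task, executor) -> rejected.incrementAndGet());
        for (int i = 0; i < burst; i++) {
            pool.execute(() -> {
                try {
                    release.await(); // simulate a connection that is still being served
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        release.countDown();
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return rejected.get();
    }

    public static void main(String[] args) throws InterruptedException {
        // 25 simultaneous tasks against 10 threads + 10 queue slots
        System.out.println("rejected: " + countRejected(10, 10, 25));
    }
}
```

Because every task stays busy until the whole burst is submitted, exactly 25 - (10 + 10) = 5 tasks are rejected here. In the real server the number depends on how quickly earlier connections finish, so the pool size and queue capacity should be chosen from the expected burst size and handling time.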

That concludes the use of blocking short socket connections. Note that a short connection is blocking, so if one request stalls, it blocks the requests behind it. A thread pool is introduced to deal with this problem, but it is not suitable for high-concurrency scenarios. In those scenarios, NIO sockets or the netty framework are recommended!

Posted by born4flirt on Sat, 16 Apr 2022 15:08:32 +0930