Thread pool for concurrent programming

1, Basic concepts of thread pool

From the following construction method of thread pool source code, you can have a general understanding of the basic concept of thread pool:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }

This is the most complete parameter when creating a thread. Explain the above parameters:

corePoolSize: number of core threads, the number of threads allowed to exist for a long time in the thread pool;

maxPoolSize: maximum number of threads, the maximum number of threads allowed in the thread pool;

keepAliveTime: the maximum time allowed for threads that exceed the number of core threads to survive;

Unit: the maximum time unit allowed for threads that exceed the number of core threads to survive;

workQueue: a waiting queue that stores threads that are temporarily unable to execute, that is, threads that exceed the number of core threads;

handler: reject policy. When the waiting queue is full and the thread with the maximum number of threads is running in the thread pool, how to handle it if there are more threads.

The following is an illustration of the whole process after a thread comes in:

II. Common queues of thread pool

LinkedBlockingQueue

Ordinary blocking queue. The maximum length of the queue is integer MAX;

DelayQueue

A blocking queue that can be sorted according to time, but it should be noted that the classes of the elements in the queue need to implement the Delayed interface, and the compareTo method needs to be implemented to change the interface. The reason is that the shorter the waiting time in the DelayQueue, the earlier it runs; DelayQueue is realized through PriorityQueue, and the bottom layer of PriorityQueue is realized through tree structure;

SynchronousQueue

A blocking queue with a length of 0. This queue will not put any information inside. It is only used to schedule threads. When a thread reads the information of the SynchronousQueue queue, because its own capacity is 0, the thread will wait. At this time, when another thread "stores" information into the queue, the thread will directly get the information for processing. It plays the role of task scheduling. It should be noted that the capacity of Synchronous is 0, so you can only execute the put() method and cannot call the add() method. Otherwise, the following exception will be thrown

Exception in thread "main" java.lang.IllegalStateException: Queue full
	at java.util.AbstractQueue.add(AbstractQueue.java:98)
	at pool.D_SynchronousQueue.main(D_SynchronousQueue.java:12)

TransferQueue

It is also an information transmission queue, but the difference between it and synchronous queue is that it has capacity. It should be noted that its particularity will be enabled only when its transfer() method is executed. After a thread executes the transfer() method, the thread will not break the link with the TransferQueue. It will wait for another thread to process the information it stores before breaking the link.

3, Reject strategy

The reject policy refers to how the thread pool handles when the waiting queue is full and the number of threads reaches the maximum number of threads before adding a task. jdk itself has four rejection strategies:

AbortPolicy: throw an exception;

Discard policy: directly discard the task;

Discard old policy: discard the earliest task in the waiting queue;

CallerRunsPolicy: the thread calling the thread pool executes the task.

In addition to the above jdk built-in rejection policy, we can also customize the rejection policy. In actual development, most cases use the custom rejection policy. The following is a general description of how to customize the rejection policy:

public class D_RejectHandler implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        // The first step is to print the task information to be executed in order to let us know what the task of "reject" is
        // The second step is to compensate for the tasks to be 'rejected'. You can make a loop attempt to judge whether the thread pool can be executed during the loop
        // Step 3: if the task r cannot be executed in step 2, put r into the cache or message queue for compensation
    }
}

4, jdk's own thread pool

SingleThreadPool

Single thread pool: the number of core threads and the maximum number of threads are 1. The waiting queue is an unbounded queue with a survival time of 0s. The creation method is as follows:

    // Create SingleThreadPool
    ExecutorService singleThreadPool = Executors.newSingleThreadExecutor();
    // Thread pool execution thread
    singleThreadPool.execute(()-> System.out.println("SingleThreadPool")); 
    // Create source code
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
    // ThreadFactory can also be passed in when creating a single thread pool. The thread name created by the default ThreadFactory is unfriendly
    // We can customize ThreadFactory to handle thread names in a friendly way
    public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>(),
                                    threadFactory));
    }

When using a single thread pool, the waiting queue is LinkedBlockingQueue, and its maximum value is integer Max, if there are too many threads, it may cause OOM.

CachedThreadPool

CachedThreadPool, the number of core threads is 0, and the maximum number of threads is integer Max, the thread survival time is 60s, and the waiting queue is synchronous queue. As described in the common queue above, the capacity of this queue is 0, which is equivalent to a pipeline. Through this queue, we can ensure the sequential execution of threads. See the following code:

    // Create CachedThreadPool
    ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
    // Add thread to thread pool
    cachedThreadPool.execute(()-> System.out.println("CachedThreadPool"));
    // Create source code
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
    // Create the source code and specify CachedThredPool
    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    }    

This thread pool is not recommended because the maximum number of threads is integer Max, if there are too many threads, the cpu is basically switching threads, and the program will get stuck.

FixedThreadPool

The number of fixed threads is the same as the maximum number of threads. Therefore, the survival time is 0s and the waiting queue is LinkedBlockingQueue.

    // Create FixedThreadPool
    ExecutorService fixedThreadPool = Executors.newFixedThreadPool(10);
    // Thread pool add thread
    fixedThreadPool.execute(()-> System.out.println("FixedThreadPool"));
    // Create the source code and specify the number of core threads / maximum number of threads
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
    // Create the source code and specify the number of core threads / maximum threads and ThreadFactory
    public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
    }

Because the task queue is the same as SingleThreadPool, when there are too many threads, it will also cause OOM.

ScheduledThreadPool

Timed task thread pool. The number of core threads is specified when creating. The maximum number of threads is integer Max, the survival time is 0, which means that as long as the thread is idle, it will die. The waiting queue uses DelayedWorkQueue. The thread pool can specify scheduled tasks.

    // Create ScheduledThreadPool
    ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(10);
    // Perform scheduled tasks
    scheduledThreadPool.scheduleAtFixedRate(
                ()-> System.out.println("FixedThreadPool"), 0, 500, TimeUnit.SECONDS);
    
    // Create the source code and specify the number of core threads
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }
    // Create the source code and specify the number of core threads and threadFactory
    public static ScheduledExecutorService newScheduledThreadPool(
            int corePoolSize, ThreadFactory threadFactory) {
        return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
    }
    public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), threadFactory);
    }

When using the built-in thread pool of the above jdk, if ThreadFactory is not specified, the default ThreadFactory will be used, which is:

        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            // Thread name prefix
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }

When using jdk's own thread pool, you cannot specify a rejection policy. The default rejection policy, AbortPolicy, is used.

5, Part of the thread pool source code

Here is a brief description of some of the source code of ThreadPoolExecutor:

    // A number of int type. The size of int is 8 bytes and 32 bits. The first three bits represent the status of thread pool and the last 29 bits represent the number of threads
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    // Gets the number of bits representing the number of threads in the thread pool
    private static final int COUNT_BITS = Integer.SIZE - 3;
    // Calculate the maximum number of threads allowed in the thread pool
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // Five states of thread pool, 1 Running: running status; 2.shutdown: the shutdown() method is called;
    // 3.stop: called the shutdown now() method; 4.tidying: sorting stage; 5.terminated: end of thread pool operation
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    // Get thread pool status
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    // Gets the number of working threads in the thread pool
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    // ctl is generated according to the status of thread pool and the number of working threads
    private static int ctlOf(int rs, int wc) { return rs | wc; }
    // Is thread pool status c less than s
    private static boolean runStateLessThan(int c, int s) {
        return c < s;
    }
    // Whether thread pool status c is greater than or equal to s
    private static boolean runStateAtLeast(int c, int s) {
        return c >= s;
    }

execute method:

    public void execute(Runnable command) {
        // If the added task is empty, an exception will be thrown directly
        if (command == null)
            throw new NullPointerException();
        // Get current thread status
        int c = ctl.get();
        // If the number of worker threads in the thread pool is less than the number of core threads, a core thread is created
        if (workerCountOf(c) < corePoolSize) {
            // addWorker method. true represents the creation of core threads, and false represents non core threads
            if (addWorker(command, true))
                return;
            // Get the thread pool state again to prevent the thread pool state from being changed during thread creation
            c = ctl.get();
        }
        // If the thread pool status is running and the task is added to the queue successfully
        if (isRunning(c) && workQueue.offer(command)) {
            // Get thread pool status again
            int recheck = ctl.get();
            // If the current thread pool state is not running and the task is successfully removed from the waiting queue, the reject policy is executed
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // Failed to create a non core thread, execute the reject policy
        else if (!addWorker(command, false))
            reject(command);
    }   }

6, Two special thread pools

WorkStealingPool

In the thread pools we mentioned above, all threads take tasks from a waiting queue. Each thread in worksealingpool has a corresponding waiting queue. When the thread execution ends, it will take tasks from its own queue. If there are no tasks in the queue corresponding to the thread, it will take tasks from the queues of other threads. From the source code of creating worksealingpool, we can see that it is actually a ForkJoinPool, as follows

    public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }

Its general work flow chart is as follows:

ForkJoinPool

This thread pool is suitable for scenarios where a task is large and can be split. Its working principle is shown in the figure below:

The following is an introduction to the thread pool through a piece of code:

 

    static int[] nums = new int[100000];
    static int counts = 30000;
    static Random random = new Random();
    static {
        for(int i = 0; i < nums.length; i++){
            nums[i] = random.nextInt(100);
        }
        System.out.println("Add by common method"+Arrays.stream(nums).sum());
    }

    /**
     * ForkJoinPool The tasks inside must implement recursive task (recursive action)
     * RecursiveTask There is a return value
     * RecursiveAction No return value
     */
    static class MyTask extends RecursiveTask<Integer>{
        int start, end;
        MyTask(int start, int end){
            this.start = start;
            this.end = end;
        }
        /**
         * Specific implementation of task splitting
         * @return
         */
        @Override
        protected Integer compute() {
            // If the total quantity is less than the limit of task splitting, it will be calculated directly
            if (end - start <= counts){
                int result = 0;
                for (int i = start; i < end; i++){
                    result += nums[i];
                }
                System.out.println("start:"+start+",end:"+end+",result:"+result);
                return result;
            }else {
                // If the total quantity is greater than the splitting limit, the task is split recursively
                int middle = start + (end - start)/2;
                MyTask taskOne = new MyTask(start, middle);
                MyTask taskTwo = new MyTask(middle, end);
                taskOne.fork();
                taskTwo.fork();
                return taskOne.join() + taskTwo.join();
            }
        }
    }
    public static void main(String[] args) {
        // Create ForkJoinPool
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        // Create execution task
        MyTask myTask = new MyTask(0, nums.length);
        // Hand over the task to the thread pool
        forkJoinPool.execute(myTask);
        long result = myTask.join();
        System.out.println("adopt forkJoinPool Calculation results:"+result);
    }

Let's take another look at the implementation results:

Add by common method 4969283
start:50000,end:75000,result:1246136
start:0,end:25000,result:1244464
start:75000,end:100000,result:1244184
start:25000,end:50000,result:1234499
 adopt forkJoinPool Calculation results:4969283

Let's analyze the execution results. According to the output inside, we can clearly find that the total calculation task of a data is divided into four parts for execution, namely [0,25000], [25000,50000], [50000,75000], [75000,100000). This splitting rule is processed through the value we set and the task splitting rule.

7, Supplementary knowledge

Executor: it is a task executor that separates thread creation and thread running. There is only one execute method in it, and the input parameter is Runnable. The source code is as follows:

public interface Executor {
    // Used to perform tasks
    void execute(Runnable command);
}

ExecutorService: it inherits from Executor and improves the life cycle of task Executor, and further improves the task Executor. The thread pool we call is implemented based on ExecutorService. Take a look at the source code of this interface:

public interface ExecutorService extends Executor {

    // Close the thread pool and no longer receive new tasks.
    // And will not wait for the task that has been submitted to complete.
    void shutdown();

    // Try to stop all running and waiting threads and return the collection of waiting threads
    List<Runnable> shutdownNow();

    // Gets whether the current thread has been closed
    boolean isShutdown();

    // Returns true if all tasks are completed after shutdown
    // Note that the premise is to execute shutdown / shutdown now first
    boolean isTerminated();

    // Until all tasks are completed after receiving the shutdown request, or after the specified time, or before the current thread is interrupted
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;

   // Submit a task with a return value and process it asynchronously. After it is handed over to the thread pool, it will not wait for the result,
    // You can call the get() method to block and wait, and return the execution result
    <T> Future<T> submit(Callable<T> task);

    // Submit a runnable task asynchronously. The general process is the same as above
    <T> Future<T> submit(Runnable task, T result);

    // Submit a runnable task asynchronously. The general process is the same as above, but calling the get() method returns null
    Future<?> submit(Runnable task);

}

Completablefuture is a more advanced management class. It can manage both tasks and the running results of tasks. You can have a general understanding of it through the following code:

        // Create a completable future and leave the tasks to be performed to it for management
        CompletableFuture<Integer> first = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(500);
            }catch (Exception e){
                e.printStackTrace();
            }
           return 1;
        });
        CompletableFuture<Integer> second = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            }catch (Exception e){
                e.printStackTrace();
            }
            return 2;
        });
        CompletableFuture<Integer> third = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1500);
            }catch (Exception e){
                e.printStackTrace();
            }
            return 3;
        });
        long start = System.currentTimeMillis();
        // The above three asynchronous tasks are blocked until any task is completed
        System.out.println(CompletableFuture.anyOf(first, second, third).get());
        System.out.println("any Time consuming:"+(System.currentTimeMillis() - start));
        // The above three asynchronous tasks need to be blocked until the execution of all tasks is completed
        CompletableFuture.allOf(first, second, third).get();
        System.out.println("all Time consuming:"+(System.currentTimeMillis() - start));
            System.out.println("first:"+first.get()+"second:"+second.get()+"third:"+third.get());
        System.out.println("Output time:"+(System.currentTimeMillis() - start));

Take a look at the output:

1
any Time: 519
all Time: 1501
first:1second:2second:2
 Output time: 1501

Tags: Java Multithreading

Posted by Chas267 on Fri, 15 Apr 2022 17:00:28 +0930