1, Thread and process
1. What is a process / thread?
Process: a process is a running activity of a program with certain independent functions about a certain data set. It is the basic unit of operating system dynamic execution. In traditional operating system, process is not only the basic allocation unit, but also the basic execution unit.
Thread: usually a process can contain several threads. Of course, there is at least one thread in a process. Otherwise, there is no meaning of existence. Threads can make use of the resources owned by processes. In the operating system that introduces threads, processes are usually regarded as the basic unit of allocating resources, while threads are regarded as the basic unit of independent operation and scheduling. Because threads are smaller than processes and basically do not have system resources, the cost of scheduling them will be much smaller, It can improve the degree of concurrent execution among multiple programs more efficiently.
Generally speaking:
- Process: a process is a program running in the background, which is related to the operating system.
- Thread: a process consists of multiple threads.
2. The difference between process and thread
-
Thread is the smallest unit of program execution, while process is the smallest unit of resources allocated by operating system;
-
A process consists of one or more threads. Threads are different execution routes of code in a process
-
Processes are independent of each other, but threads in the same process share program memory space (including code segments, data sets, heaps, etc.) and some process level resources (such as open files and signals, etc.), and threads in one process are not visible in other processes;
-
Scheduling and switching: thread context switching is much faster than process context switching
3. Thread state
Thread.State enumeration class:
public enum State { /** * The thread state of the thread that has not been started (just new, start method not called). */ NEW, /** * The thread state of the runnable thread. The runnable thread is executing in the Java virtual machine, but it may be waiting for other resources from the operating system, such as the processor. * (Call the start method, waiting for the operating system to schedule) */ RUNNABLE, /** * The thread state of the thread that was blocked while waiting for the monitor to lock. * * The blocked thread is waiting for the monitor lock to enter the synchronization block / method, or reentering the synchronization block / method after calling Object.wait(). * * The blocked state of a thread cannot be completed by entering the synchronization method / code block. This is because the lock associated with the synchronization method / code block cannot be acquired. */ BLOCKED, /** * The thread state of the waiting thread * * The thread is waiting because one of the following methods is called * Object.wait() * Thread.join() * LockSupport.park() * * The thread in the waiting state is waiting for another thread to perform a specific operation * * The thread in the waiting state waits for wake-up (notify or notifyAll) before it has a chance to obtain the cpu time fragment to continue execution. */ WAITING, /** * The thread state of the waiting thread with the specified waiting time. * Thread.sleep * Object.wait(long) * Thread.join(long) * LockSupport.parkNanos * LockSupport.parkUntil */ TIMED_WAITING, /** * The thread has finished executing */ TERMINATED; }
- The relationship between waiting and blocked was as follows
The waiting queue is associated with the waiting state, and the synchronized queue is associated with the blocked state. When a thread migrates from the waiting queue to the synchronized queue, the thread state will be changed from waiting to blocked. It can be said that the blocked state is the only way for the thread in the waiting state to rejuvenate.
4. The difference between wait() / sleep()?
The wait/sleep function is to pause the current thread. What's the difference?
- Let go to sleep, let go of the lock
- Hold your hand tightly and go to sleep. When you wake up, you still have a lock in your hand
5. What is concurrency? What is parallel?
-
concurrent: multiple threads are accessing the same resource at the same time, and multiple threads are accessing the same point. Example: Xiaomi 9 at 10 a.m. this morning, limited purchase, Spring Festival tickets, e-commerce second kill
-
parallel: multiple tasks are performed together and then aggregated. Example: soak instant noodles, boil water in an electric kettle, tear seasoning and pour it into a bucket
6. The difference between notify and notifyAll
- notify(): only one waiting thread will be awakened, and it cannot guarantee which thread will be awakened, depending on the thread scheduler.
- notifyAll(): all threads waiting for the lock will be awakened, and all awakened threads will compete for the lock, so wait() should be placed in the loop.
2, Lock
- Low coupling and high cohesion: thread, operation (call method exposed by resource class), resource class
- First judge (while judge), then work (execute business), and finally notify (notifyAll())
- Prevent false wake-up
1. Object lock and class lock
(1) Object lock
In the synchronization code block of object lock, only one thread can execute these code blocks in a single instance, and the execution of each instance does not affect each other.
- Locks are not static
private Object lock = new Object(); public void test(){ synchronized (lock){ System.out.println("do some thing"); } }
- synchronized modifies non static methods
public synchronized void test(){ System.out.println("do some thing"); }
- Using this object
public void test(){ synchronized (this){ System.out.println("do some thing"); } }
(2) Class lock
Class information exists in the JVM method area, and the entire JVM has only one copy. The method area is shared by all threads, so class lock is shared by all threads.
- The lock is static
private static Object lock = new Object(); public void test(){ synchronized (lock){ System.out.println("do some thing"); } }
- synchronized modifies static methods
public static synchronized void test(){ System.out.println("do some thing"); }
- Use the xxx.class class class as a lock
class Test{ public void test(){ synchronized (Test.class) { System.out.println("do some thing"); } } }
2. ReentrantLock
class Counter { private Integer number = 0; //Create a ReentrantLock Lock lock = new ReentrantLock(); //Multiple conditions can be defined to wake up the thread waiting in the specified condition (see example 3 for details) Condition condition = lock.newCondition(); public void increment() throws InterruptedException { //Get lock lock.lock(); //After obtaining the lock, try should be followed closely to prevent the deadlock caused by the error of the code in the middle try { //To judge the execution condition of a thread, use while instead of if while (number != 0) { condition.await(); } //Execute business logic number++; System.out.println(Thread.currentThread().getName() + ": " + number); //Wake up other threads condition.signalAll(); } catch (Exception e) { e.printStackTrace(); }finally { //To unlock, write on the first line of finally lock.unlock(); } } public synchronized void descrement() throws InterruptedException { lock.lock(); try { while (number == 0) { condition.await(); } number--; System.out.println(Thread.currentThread().getName() + ": " + number); condition.signalAll(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } }
3. ReadWriteLock
Read write lock is actually a pair of locks, a read lock (shared lock) and a write lock (mutex, exclusive lock).
- Read + read: it is equivalent to no lock. The lock will be recorded in redis, and it will be locked successfully
- Read + Write: read lock, write wait
- Write + read: write lock, read need to wait
- Write + Write: blocking mode
Example: customize a cache class to simulate reading and writing
public class ReadWriteLockDemo { public static void main(String[] args) { MyCache myCache = new MyCache(); for (int i = 0; i < 20; i++) { final int z = i; new Thread(() -> { myCache.put(String.valueOf(z), String.valueOf(z)); myCache.get(String.valueOf(z)); }, String.valueOf(i)).start(); } } } class MyCache { private Map<String, Object> cache = new HashMap<>(); private ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); public void put(String key, Object val) { readWriteLock.writeLock().lock(); try { System.out.println(Thread.currentThread().getName() + "Start writing"); cache.put(key, val); System.out.println(Thread.currentThread().getName() + "Write complete:" + key); } catch (Exception e) { e.printStackTrace(); } finally { readWriteLock.writeLock().unlock(); } } public void get(String key) { readWriteLock.readLock().lock(); try { System.out.println(Thread.currentThread().getName() + "Start reading"); cache.get(key); System.out.println(Thread.currentThread().getName() + "Read complete:" + key); } catch (Exception e) { e.printStackTrace(); } finally { readWriteLock.readLock().unlock(); } } }
3, Thread safety class
1. The ArrayList is not secure
When adding data to ArraysList concurrently, Java. Util. Concurrent modificationexception will be reported
terms of settlement:
- The essence of using Vector is to add synchronized to the modified method
- Using Collections.synchronizedList() to convert an unsafe List into a safe List is essentially synchronized on the method
- Using CopyOnWriteArrayList, the bottom layer uses ReentrantLock
CopyOnWriteArrayList copy on write
Source code of CopyOnWriteArrayList when adding elements:
public boolean add(E e) { final ReentrantLock lock = this.lock; // To get the lock, only one thread can write data lock.lock(); try { Object[] elements = getArray(); int len = elements.length; // Make a copy of the original data. When writing data, it does not affect other threads to read data from the list Object[] newElements = Arrays.copyOf(elements, len + 1); // Insert new data into the new array newElements[len] = e; // Replace the old array with the new one setArray(newElements); return true; } finally { lock.unlock(); } }
2. HashSet is not secure
- The Collections.synchronizedSet() method turns an unsafe Set into a safe Set.
- Using CopyOnWriteArraySet, the bottom layer uses CopyOnWriteArrayList
3. HashMap is not secure
- The Collections.synchronizedMap() method turns an unsafe Map into a safe one.
- Using concurrent HashMap
4, Callable interface
Callable is located under the java.util.concurrent package. It is also an interface, in which the call() method is declared. However, this method is called a generic interface, and the type returned by the call() function is the V type passed in. Future is to cancel the execution result of a specific Runnable or callable task, query whether it is completed, and obtain the result. To get the returned result, you can call get(), which blocks until the task returns the result. Because future is just an interface, it can't be directly used to create objects, so there is FutureTask. The FutureTask class implements the RunnableFuture interface. RunnableFuture inherits the Runnable interface and the future interface, so it can be executed by thread as Runnable, and it can be used as future to get the return value of callable. Use a diagram to illustrate.
Therefore, when we want to run Callable through a Thread, but Thread does not support passing Callable instances in construction methods, we need to package a Callable as Runnable through FutureTask, and then get the return value of Callable after running through FutureTask.
public static void main(String[] args) throws ExecutionException, InterruptedException { //A paradigm is a return value type Callable<String> callable = () -> { System.out.println("callable thread "); return "666"; }; FutureTask<String> futureTask = new FutureTask<>(callable); new Thread(futureTask, "A").start(); //Calling the get() method will block the current Thread until the Thread is executed System.out.println(futureTask.get()); }
5, Blocking queue
A blocking queue is a queue:
Thread 1 adds elements to the blocking queue, and thread 2 removes elements from the blocking queue
-
When the queue is empty, the operation of getting elements from the queue will be blocked; Threads trying to get elements from an empty queue will be blocked until other threads insert new elements into the empty queue
-
When the queue is full, adding elements from the queue will be blocked; Threads that attempt to add new elements to the full queue will be blocked until other threads remove one or more elements from the queue or clear it completely, making the queue idle and adding new elements later
Use of blocking queues:
In the field of multithreading: the so-called blocking, in some cases, the thread will be suspended (blocking), once the conditions are met, the suspended thread will be automatically aroused. The advantage of BlockingQueue is that we don't need to care about when we need to block the thread or when we need to wake up the thread, because you can handle all this by yourself. Before the release of concurrent package, in the multithreaded environment, every programmer must control these details by himself, especially considering the efficiency and thread safety, which will bring a lot of complexity to our program.
1. Common queue
- ArrayBlockingQueue: a bounded blocking queue composed of an array structure.
- LinkedBlockingQueue: bounded by linked list structure (but the default size is integer. Max)_ Value) blocks the queue.
- Priority blocking queue: unbounded blocking queue supporting priority sorting.
- DelayQueue: delay unbounded blocking queue implemented by priority queue.
- Synchronous queue: does not store blocking queues for elements, that is, queues for individual elements.
- LinkedTransferQueue: an unbounded blocking queue composed of linked lists.
- Linked blocking deque: bidirectional blocking queue composed of linked lists.
Succession relationship:
2. General method
Method type | Throw an exception | Special value | block | overtime |
---|---|---|---|---|
insert | add() | offer() | put() | offer(e, timeout, unit) |
remove | remove() | poll() | take() | poll(timeout, unit) |
Check (view team leader elements) | element() | peek() | Not available | Not available |
noun | When the queue is full, insert | When the queue is empty, remove it | When the queue is empty, check |
---|---|---|---|
Throw an exception | Throw IllegalStateException: Queue full exception | Throw NoSuchElementException exception | Throw NoSuchElementException exception |
Special value | Return false | Returns null | Returns null |
block | Block until the queue is full | Block until the queue is not empty | |
overtime | Block until you can insert a value (return true) or timeout (return false) | Block until you can get a value or time out (return null) |
6, Line pool (Executor)
1. Advantages of thread pool
The work of the thread pool is to control the number of running threads, put tasks into the queue during processing, and then start these tasks after the thread is created. If the number of threads exceeds the maximum number, the threads that exceed the maximum number will queue up and wait for other threads to finish executing, and then take tasks from the queue to execute.
Its main features are: thread reuse; Control the maximum concurrent number; Manage threads.
- Reduce resource consumption. By reusing the created threads, the consumption caused by thread creation and destruction can be reduced.
- Improve the response speed. When a task arrives, it can be executed immediately without waiting for thread creation.
- Improve the manageability of threads. Thread is a scarce resource. If it is created without restriction, it will not only consume system resources, but also reduce the stability of the system. Using thread pool can carry out unified allocation, tuning and monitoring.
Dependency
2. Seven parameters of thread pool
- corePoolSize: the number of core threads in the thread pool. When a task is submitted, the thread pool creates a new thread to execute the task until the current number of threads equals corePoolSize. If the current number of threads is corePoolSize, the tasks that continue to be submitted are saved to the queue and waiting to be executed. If the prestartAllCoreThreads() method of the thread pool is executed, the thread pool creates and starts all core threads in advance.
- maximumPoolSize: the maximum number of simultaneous threads in the thread pool. If the current blocking queue is full and the task continues to be submitted, a new thread is created to execute the task, provided that the current number of threads is less than maximumPoolSize.
- keepAliveTime: the lifetime of redundant idle threads. When the number of threads in the current pool exceeds corePoolSize, when the idle time reaches keepAliveTime, the redundant threads will be destroyed until there are only corePoolSize threads left. By default, this parameter is only useful when the number of threads is greater than corePoolSize.
- Unit: the unit of keepAliveTime
- workQueue: used to save the blocking queue waiting for tasks to be executed. When the number of threads in the thread pool exceeds its core pool size, the thread will enter the blocking queue to wait. Generally speaking, we should try to use bounded queues: arrayblocking queue, linkedblocking queue, synchronous queue, priority blocking queue.
- ThreadFactory: to create a thread factory. Through the custom thread factory, you can set a recognized thread name for each new thread and set all threads as guard threads. The default ThreadFactory in Executors static factory, and the naming rule of thread is "pool number thread number".
- handler: the saturation policy of the thread pool. When the blocking queue is full and there are no idle working threads, if you continue to submit a task, you must adopt a policy to process the task. The thread pool provides four policies:
a) AbortPolicy: throw an exception directly, the default policy.
b) CallerRunsPolicy: uses the thread of the caller to execute the task.
c) DiscardOldestPolicy: discards the top task in the blocking queue and executes the current task.
d) DiscardPolicy: discard the task directly.
Of course, the RejectedExecutionHandler interface can also be implemented according to the application scenario, and the saturation policy can be customized, such as logging or persistent storage of tasks that cannot be processed.
3. Working mechanism of thread pool
1) If fewer threads are currently running than corePoolSize, a new thread is created to perform the task.
2) If the running thread is equal to or more than corePoolSize, the task is added to the BlockingQueue.
3) If the task cannot be added to the BlockingQueue (the bounded queue is full), a new thread is created to process the task.
4) If creating a new thread will cause the currently running thread to exceed the maximumPoolSize, the task will be rejected and the RejectedExecutionHandler.rejectedExecution() method will be called.
4. Submit task
There are two ways to submit tasks in Java thread pool: execute() and submit(). The main differences are as follows:
-
execute() submits Runnable tasks, while submit() submits Callable or Runnable tasks.
-
The execute() method is used to submit tasks that do not need a return value, so it is impossible to determine whether the task was successfully executed by the thread pool. The submit() method is used to submit a task that needs a return value. The thread pool will return an object of type future, through which you can judge whether the task is successfully executed, and you can get the return value through the get() method of future. The get() method will block the current thread until the task is completed, and use get (long timeout, The TimeUnit unit method will block the current thread and return immediately after a period of time. At this time, the task may not be finished.
-
When execute() submits, if there is an exception, it will directly throw the exception. When submit() encounters an exception, it usually does not throw the exception immediately. Instead, it will temporarily store the exception and wait for you to call the Future.get() method before throwing the exception.
5. Close the thread pool
You can close the thread pool by calling the SHUTDOWN () or SHUTDOWN now () methods of the thread pool. Their principle is to traverse the worker threads in the thread pool, and then call the interrupt() method of the thread one by one to interrupt the thread, so the task that cannot respond to the interrupt may never be terminated. However, there are some differences between them. Firstly, SHUTDOWN now() sets the state of thread pool to STOP, and then attempts to STOP all threads that are executing or suspending tasks, and returns the list of tasks waiting to be executed. However, shutdown() just sets the state of thread pool to SHUTDOWN, and then interrupts all threads that are not executing tasks. The isShutdown() method returns true as long as either of the two SHUTDOWN methods is called. When all tasks are closed, it means that the thread pool is closed successfully. At this time, calling the isTerminaed() method will return true. As for which method should be called to close the thread pool, it should be determined by the task characteristics submitted to the thread pool. Generally, the SHUTDOWN () method is called to close the thread pool. If the task does not have to be completed, the SHUTDOWN now () method can be called.
6. Configure thread pool parameters reasonably
First of all, no matter from which point of view, it is recommended to use bounded queue. The bounded queue can increase the stability and early warning ability of the system, and it can be set larger as needed, such as 5000. If we set the unbounded queue at that time, there would be more and more queues in the thread pool, which might be full of memory (OOM) and make the whole system unavailable.
In order to configure the thread pool reasonably, we must first analyze the task characteristics from the following aspects:
(1) The nature of the task
CPU intensive tasks, IO intensive tasks and hybrid tasks.
a) CPU intensive tasks (most of the time is used for calculation, logic judgment and other operations) should be configured with threads as small as possible. For example, the number of core threads is the number of server CPUs + 1. The number of server CPU threads can be obtained through the Runtime.getRuntime().availableProcessors() method.
b) As IO intensive tasks (most of the situation is that the CPU is waiting for I/O read / write operation) threads are not always executing tasks, it is necessary to configure as many threads as possible, such as twice the number of server CPUs. There is a formula to calculate the optimal number of threads for IO tasks
Nthreads = NCPU * UCPU * (1 + W/C) Among them: NCPU Is the number of cores in the processor UCPU It's expected CPU Utilization (the value should be between 0 and 1) W/C Is the ratio of waiting time to calculation time. Waiting time and computing time we are in Linux Use related vmstat Command or top Command view.
c) For hybrid tasks, if it can be split, it can be divided into a CPU intensive task and an IO intensive task. As long as the time difference between the two tasks is not too big, the throughput of the decomposed task will be higher than that of the serial task. If the execution time of these two tasks is too different, there is no need to decompose them.
(2) Priority of tasks
Priority blocking queue can be used to process tasks with different priorities, which can let tasks with higher priority execute first.
(3) Task execution time
Tasks with different execution time can be handed over to thread pools of different sizes, or priority queues can be used to let tasks with short execution time execute first.
(4) Task dependency
Whether to rely on other system resources, such as database connection. The task that depends on the database connection pool, because the thread needs to wait for the database to return the result after submitting SQL. The longer the waiting time is, the longer the CPU idle time is. Then the number of threads should be set to be larger, so as to make better use of the CPU.
7. Examples
1. JDK predefines three thread pools
In JDK1.8, five kinds of thread pools have been predefined. In addition to the work steaming pool, which is newly added in JDK1.8, the other four are available in JDK1.5. The use of the five predefined thread pools is very simple, so we will not use the code to demonstrate them one by one. The following is a brief introduction:
(1)FixedThreadPool
Create an API for FixedThreadPool that uses a fixed number of threads. It is suitable for application scenarios where the current number of threads needs to be limited in order to meet the requirements of resource management. It is suitable for servers with heavy load.
parameter | value | explain |
---|---|---|
corePoolSize | Specify when creating | |
maximumPoolSize | Same as corePoolSize | |
keepAliveTime | 0L | Redundant idle threads are terminated immediately |
BlockingQueue | LinkedBlockingQueue (the capacity is integer. Max)_ VALUE) |
Creation method:
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
(2)SingleThreadExecutor
The API of singlethreadexecution using single thread is created to ensure that each task is executed in sequence when necessary; And at any point in time, there will not be multiple threads active application scenarios.
parameter | value | explain |
---|---|---|
corePoolSize | 1 | |
maximumPoolSize | 1 | |
keepAliveTime | 0L | Redundant idle threads are terminated immediately |
BlockingQueue | LinkedBlockingQueue (the capacity is integer. Max)_ VALUE) |
Creation method:
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
(3)CachedThreadPool
Create an API that creates a CachedThreadPool of new threads as needed. Unlimited thread pool is suitable for small programs that perform many short-term asynchronous tasks, or servers with light load.
parameter | value | explain |
---|---|---|
corePoolSize | 0 | |
maximumPoolSize | Integer.MAX_VALUE | It can be understood that there is no limit to the maximum number of threads |
keepAliveTime | 60s | |
BlockingQueue | SynchronousQueue | The synchronous queue capacity is 1, which means that if the main thread submits tasks faster than the threads in the thread pool process tasks, the CachedThreadPool will continue to create new threads. In extreme cases, CachedThreadPool will run out of CPU and memory resources due to creating too many threads. |
Creation method:
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
2. Custom thread pool
In Alibaba specification:
Thread pool is not allowed to be created by Executors, but by ThreadPoolExecutor, which makes students more clear about the running rules of thread pool and avoids the risk of resource exhaustion. Note: the disadvantages of the thread pool object returned by Executors are as follows:
1) FixedThreadPool and SingleThreadPool:
the allowed request queue length is Integer.MAX_VALUE, a large number of requests may accumulate, resulting in OOM.
2)CachedThreadPool:
the number of creation threads allowed is Integer.MAX_VALUE, may create a large number of threads, resulting in OOM.
public static void main(String[] args) { ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 5, 2L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(3), new ThreadPoolExecutor.AbortPolicy()); try { for (int i = 0; i < 10; i++) { executor.execute(() -> { System.out.println(Thread.currentThread().getName()); }); } } catch (Exception e) { e.printStackTrace(); } finally { executor.shutdown(); } }
7, ForkJoin (branch merge)
reference resources: https://www.cnblogs.com/hongshaodian/p/12452105.html
1. Principle
ForkJoin uses divide and rule. The idea of "divide and rule" is to divide a big problem that is difficult to solve directly into some smaller problems of the same scale, so as to break through each one and divide and rule. The strategy of divide and conquer is: for a problem of scale n, if the problem can be solved easily (for example, the scale n is small), it can be solved directly, otherwise it can be decomposed into m smaller subproblems, which are independent of each other and have the same form as the original problem. Multiple threads solve these subproblems recursively, and then combine the solutions of each subproblem to get the solution of the original problem, This algorithm design strategy is called divide and conquer. Use a graph to represent the fork join principle.
2. Job theft
Work stealing means that when the tasks of the current thread have been executed, they will be automatically fetched to the Task queue of other threads to continue execution. In the ForkJoinPool, multiple threads are continuously executing tasks. Each thread not only performs tasks in its own job, but also gets tasks of other busy working threads according to the idle situation of its own working threads. In this way, it can reduce thread blocking or idle time and improve CPU utilization.
3. Use of forkjoin
ForkJoin uses two classes to accomplish the above two things: ForkjoinTask and ForkJoinPool. To use the ForkJoin framework, we must first create a ForkJoin task. It provides the operation mechanism of fork and join in tasks. Usually, we do not directly inherit the ForkjoinTask class, we only need to directly inherit its subclasses.
/** * Inherits the subclass of ForkjoinTask, and provides the operation mechanism to execute fork and join * RecursiveTask<V>: There is a return value, and the return value type is type V * RecursiveAction: no return value */ class MyTask extends RecursiveTask<Integer> { private static final int ADJUST_VALUE = 10; private int begin; private int end; private int result; public MyTask(int begin, int end) { this.begin = begin; this.end = end; } @Override protected Integer compute() { if (end - begin <= ADJUST_VALUE) { // When the difference between left and right is less than or equal to 10, the sum is calculated for (int i = begin; i <= end; i++) { result += i; } } else { // When the difference between the left and the right is greater than 10, half is divided into small tasks int middle = (end + begin) / 2; MyTask task01 = new MyTask(begin, middle); MyTask task02 = new MyTask(middle + 1, end); // Using fork() method to perform split small tasks task01.fork(); task02.fork(); // Put the execution results of task 1 and task 2 together result = task01.join() + task02.join(); } return result; } } public class ForkJoinDemo { public static void main(String[] args) { // Create a task from 0 to 100 MyTask myTask = new MyTask(0, 100); // Execution using the ForkJoinPool thread pool ForkJoinPool forkJoinPool = new ForkJoinPool(); try { //Submit task ForkJoinTask<Integer> forkJoinTask = forkJoinPool.submit(myTask); //Get the results System.out.println(forkJoinTask.get()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } finally { //Close thread pool forkJoinPool.shutdown(); } } }
8, Completable future (asynchronous callback)
reference resources: A summary of the new feature of JDK1.8: complete future
A complete Future represents a task. He can use the Future method. We can also do something that the executorService mentioned before can't do with futures.
Before, future needs to wait for isDone to be true to know that the task is finished. Or the get method will block when it is called. With the use of completable future, we can use then, when and other operations to prevent the above blocking and polling isDone.
public static void main(String[] args) throws ExecutionException, InterruptedException { // runAsync() has no return value // supplyAsync() has a return value CompletableFuture.runAsync(() -> { // int i = 4 / 0; System.out.println("runAsync()In progress"); }).whenCompleteAsync((t, u) -> { //When there is no error in asynchronous execution, execute here System.out.println("runAsync()Successful implementation"); }).exceptionally(e -> { //When an error is reported, execute here System.out.println(e.getMessage()); System.out.println("runAsync()Execution failed"); return null; }) //Complete future, which uses asynchronous tasks, is created as a guard thread //Here, we need to call get() method to block the main thread to prevent the main thread from exiting after execution .get(); System.out.println("========="); //There is a return value String s = CompletableFuture.supplyAsync(() -> { // int i = 4 / 0; return "123"; }).whenCompleteAsync((t, u) -> { System.out.println(t); System.out.println(u); }).exceptionally(f -> { System.out.println(f.getMessage()); return "456"; }) //Complete future, which uses asynchronous tasks, is created as a guard thread //Here we need to call the get() method to prevent the main thread from exiting after execution .get(); System.out.println("---------" + s); }
Note: the complete future operation using asynchronous tasks is created as a guard thread. So we don't call the get() method to block the main thread. The main thread is finished. The completion of all threads will lead to a problem, that is, the daemons exit.
Several methods are introduced
- allOf/anyOf: the input parameter of these two methods is a completable future array. allOf() is returned when all tasks are completed. But it's a return value of Void. anyOf() returns when a task in the completable future group of the input parameter is completed. The return result is the result of the first completed task.
- getNow(): this method returns the result of the task when the task is finished. If the task is not finished, it returns your input parameter.
- get(): block the thread and wait for the return value
- whenXXX, the method that is called after the execution of a task.
- whenComplete(): when the main thread executes the whenComplete() method, the task of completeablefuture has been completed, so use the main thread to call whenComplete(); If the task is not completed, whenComplete() will be called with the thread used to complete the completable future task. So the main thread may be blocked.
- When completeasync (): this method is to create a new asynchronous thread to execute. So it doesn't block.
- Then compose (): the thread used to finish the last task can use the previous result to execute and return the new result
- thenCombine(): combines the results of two tasks.
- Then run (): after this task is finished, run the next task
9, Auxiliary class
1. CountDownLatch
After all the six students in the class have left, lock the door.
public static void main(String[] args) throws InterruptedException { CountDownLatch count = new CountDownLatch(6); for (int i = 0; i < 6; i++) { new Thread(() ->{ System.out.println(Thread.currentThread().getName() + "Students out of the classroom"); //Each time a thread finishes executing, the counter is decremented by 1 count.countDown(); }, String.valueOf(i)).start(); } //When count is not 0, the current thread waits count.await(); System.out.println("lock the door"); }
CountDownLatch has two main methods. When one or more threads call the await () method, these threads will block. When other threads call countDown() method, the counter will be decreased by 1 (the thread calling countDown() method will not block). When the counter value becomes 0, the thread blocked by await() method will be awakened and continue to execute.
2. CyclicBarrier
Each episode to seven dragon balls, summon a dragon
CountDownLatch is minus, CyclicBarrier is plus. When the number of waiting threads reaches the specified number, the corresponding method is executed
public static void main(String[] args) throws InterruptedException { // When the waiting thread reaches 7, the following thread will be executed CyclicBarrier cyclicBarrier = new CyclicBarrier(7, () -> { System.out.println("Call for Dragon"); }); for (int i = 0; i < 7; i++) { new Thread(() ->{ System.out.println("Collected the second " + Thread.currentThread().getName() + " A dragon ball"); try { // Thread waiting, when the number of waiting threads reaches the specified threshold, it can continue to execute cyclicBarrier.await(); System.out.println(Thread.currentThread().getName() + "completion of enforcement"); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } }, String.valueOf(i)).start(); } }
Cyclic Barrier literally means a Barrier that can be used cyclically. What it wants to do is to block a group of threads when they reach a Barrier (also known as a synchronization point). The Barrier will not open until the last thread reaches the Barrier, and all threads blocked by the Barrier will continue to work. Thread through the await() of CyclicBarrier
Methods to enter the barrier.
3. Semaphore
Semaphore is mainly used for two purposes, one is used for mutual exclusion of multiple shared resources, the other is used to control the number of concurrent threads.
In terms of semaphore, we define two operations
- Acquire: when a thread calls the acquire() operation, it either acquires the semaphore successfully (semaphore minus 1), or it waits until a thread releases the semaphore, or it times out.
- release: the semaphore is actually incremented by 1 and the waiting thread wakes up.
Example: 4 parking spaces occupied by 10 vehicles
public static void main(String[] args) throws InterruptedException { //Create semaphore, 4 parking spaces Semaphore semaphore = new Semaphore(4); for (int i = 0; i < 10; i++) { new Thread(() -> { try { // If you want to seize the parking space, the signal will be reduced by 1. If you can't grab the parking space, it will be blocked. semaphore.acquire(); // If you try to grab the parking space, you will return to true if you get it. If you can't get it, you will return to false and it won't block //boolean b = semaphore.tryAcquire(); System.out.println(Thread.currentThread().getName() + " Grab a parking space"); Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); }finally { System.out.println(Thread.currentThread().getName() + " Leave the parking space"); // Release the occupied parking space, add 1 to the semaphore semaphore.release(); } }, String.valueOf(i)).start(); } }
When the semaphore is specified as 1 (new Semaphore(1)), it is the same as a lock.
10, Examples
1. Selling tickets
- Low coupling and high cohesion: thread, operation (call method exposed), resource class
- Judgment / work / notice
- Prevent false wake-up
class Ticket {//Resources //ticket private int number = 30; public synchronized void saleTicket() { if (number > 0) { System.out.println(Thread.currentThread().getName() + "\t Sell No" + (number--) + "\t What's left:" + number); } } } /** * Title: three conductors sell 30 tickets * Enterprise level routine + template of multithreading programming * 1.Under the premise of high cohesion and low coupling, thread operation (call method exposed to the outside) resource class */ public class SaleTicket { public static void main(String[] args) { Ticket ticket = new Ticket(); new Thread(new Runnable() { @Override public void run() { for (int i = 1; i <= 40; i++) { ticket.saleTicket(); } } }, "A").start(); //This is not in line with Ali's specification: you must use braces in if/else/for/while/do statements, even if there is only one line of code new Thread(() -> { for (int i = 1; i <= 40; i++) ticket.saleTicket(); }, "B").start(); new Thread(new Runnable() { @Override public void run() { for (int i = 1; i <= 40; i++) { ticket.saleTicket(); } } }, "C").start(); } }
2. Counter
Define a variable to realize multiple threads adding 1 to the variable and multiple threads printing "1, 0" alternately to the variable - 1 for multiple rounds
class Counter { private Integer number = 0; public synchronized void increment() throws InterruptedException { while (number != 0) { this.wait(); } number++; System.out.println(Thread.currentThread().getName() + ": " + number); this.notifyAll(); } public synchronized void descrement() throws InterruptedException { while (number == 0) { this.wait(); } number--; System.out.println(Thread.currentThread().getName() + ": " + number); this.notifyAll(); } } public class ThreadWaitNotifyDemo { public static void main(String[] args) { Counter counter = new Counter(); new Thread(() -> { try { for (int i = 0; i < 10; i++) { counter.increment(); } } catch (InterruptedException e) { e.printStackTrace(); } }, "A").start(); new Thread(() -> { try { for (int i = 0; i < 10; i++) { counter.descrement(); } } catch (InterruptedException e) { e.printStackTrace(); } }, "B").start(); new Thread(() -> { try { for (int i = 0; i < 10; i++) { counter.increment(); } } catch (InterruptedException e) { e.printStackTrace(); } }, "C").start(); new Thread(() -> { try { for (int i = 0; i < 10; i++) { counter.descrement(); } } catch (InterruptedException e) { e.printStackTrace(); } }, "D").start(); } }
3. Sequential execution of threads
Thread A prints "A", thread B prints "B", thread C prints "C", and prints multiple rounds in the order of ABCABC
class Print { Lock lock = new ReentrantLock(); Condition condition1 = lock.newCondition(); Condition condition2 = lock.newCondition(); Condition condition3 = lock.newCondition(); int flag = 1; public void printA() { lock.lock(); try { while (flag != 1) { condition1.await(); } System.out.println("A"); flag = 2; condition2.signalAll(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } public void printB() { lock.lock(); try { while (flag != 2) { condition2.await(); } System.out.println("B"); flag = 3; condition3.signalAll(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } public void printC() { lock.lock(); try { while (flag != 3) { condition3.await(); } System.out.println("C"); flag = 1; condition1.signalAll(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } } public class ThreadOrderAccess { public static void main(String[] args) { Print print = new Print(); new Thread(() -> { for (int i = 0; i < 10; i++) { print.printA(); } }).start(); new Thread(() -> { for (int i = 0; i < 10; i++) { print.printB(); } }).start(); new Thread(() -> { for (int i = 0; i < 10; i++) { print.printC(); } }).start(); } }