Want to join Alibaba, Tencent, or another Internet company? You won't get far without understanding these Java concurrent programming topics

1, Overview

Java's traditional concurrency support is built on multithreading. This article covers the multithreaded approach rather than the more recent reactive programming style; for an introduction to the latter, please refer to Reactive programming.

Multithreaded concurrent programming has two core concepts: atomicity and visibility. Atomicity means that a group of operations either takes effect completely or not at all, so other threads never observe an intermediate state.

Visibility refers to whether changes in data in one thread can be perceived by other threads.

A problem that always deserves attention in multithreaded programming is check-then-act. Programs are full of "test a condition, then act on it" sequences. Such a sequence is harmless in a single thread, but in a multithreaded environment another thread can change the state between the check and the act, so both atomicity and visibility have to be considered. This kind of problem is called a "race condition".

This article covers: race conditions, the Java Memory Model (happens-before), synchronized, ThreadLocal variables, atomic classes, locks, CountDownLatch, and CompletableFuture.

2, Race condition

When multiple threads work on a shared resource, the result may depend on the order in which their operations happen to interleave. The typical case is check-then-act, as shown in the following code:
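The listing referred to here did not survive in this copy of the article. A minimal sketch of the kind of class the next paragraph describes might look like this. The names `LazyValue`, `value`, `get` and `initialize` are assumptions chosen to match the surrounding text, not the original code:

```java
// Hypothetical reconstruction: all names are assumptions, not the original listing.
class LazyValue {
    private String value;          // shared state, not protected by any lock
    private int initializeCalls;   // counts how often initialize() ran

    String get() {
        if (value == null) {          // check
            value = initialize();     // act: another thread may have passed the check too
        }
        return value;
    }

    private String initialize() {
        initializeCalls++;
        return "expensive value";
    }

    int initializeCalls() {
        return initializeCalls;
    }
}
```

Called from a single thread, get() runs initialize() exactly once. The problem only appears when several threads interleave between the check and the act.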

When several threads call the get method on the same instance of this class, the initialize method may run more than once, because get is not an atomic operation. This can be solved by making get a synchronized method or by holding value in an atomic class.
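Both fixes can be sketched roughly as follows. The class names are hypothetical, and the atomic variant is only one possible reading of "changing value to an atomic class": it assumes the value is cheap to create, since compareAndSet guarantees a single published value, not a single initialization attempt.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Fix 1: synchronized makes the whole check-then-act sequence exclusive.
class SynchronizedLazyValue {
    private String value;
    private final AtomicInteger initializeCalls = new AtomicInteger();

    synchronized String get() {
        if (value == null) {
            value = initialize();
        }
        return value;
    }

    private String initialize() {
        initializeCalls.incrementAndGet();
        return "expensive value";
    }

    int initializeCalls() {
        return initializeCalls.get();
    }
}

// Fix 2: publish the value with an atomic compare-and-set.
class AtomicLazyValue {
    private final AtomicReference<String> value = new AtomicReference<>();

    String get() {
        if (value.get() == null) {
            value.compareAndSet(null, "expensive value");  // only the first CAS succeeds
        }
        return value.get();
    }
}

class LazyValueDemo {
    // Calls get() from several threads and reports how often initialize() ran.
    static int initializeCallsWith(int threadCount) {
        SynchronizedLazyValue lazy = new SynchronizedLazyValue();
        Thread[] threads = new Thread[threadCount];
        for (int i = 0; i < threadCount; i++) {
            threads[i] = new Thread(lazy::get);
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return lazy.initializeCalls();
    }
}
```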

Let's look at another race condition.

class Waiter implements Runnable {
    private boolean shouldFinish;

    void finish() {
        shouldFinish = true;
    }

    public void run() {
        long iteration = 0;
        while (!shouldFinish) {
            iteration++;
        }

        System.out.println("Finished after: " + iteration);
    }
}
public class DataRace {

    public static void main(String[] args) throws InterruptedException {
        Waiter waiter = new Waiter();
        Thread waiterThread = new Thread(waiter);
        waiterThread.start();  // Execute the run method of waiter in another thread, which determines whether to exit the loop by judging the value of shouldFinish variable
        waiter.finish(); // Modify the value of the shouldFinish variable in the main thread
        waiterThread.join();
    }
}

Normally, the loop in the run method exits once waiter.finish() has been called, but it is also possible for run to loop forever. We can delay the call to waiter.finish() to provoke this. Modify the main method:

public class DataRace {

    public static void main(String[] args) throws InterruptedException {
        Waiter waiter = new Waiter();
        Thread waiterThread = new Thread(waiter);
        waiterThread.start();
        Thread.sleep(10L);  // Call finish() after a 10 ms delay; the program then typically never exits, because run() may never see shouldFinish become true
        waiter.finish();
        waiterThread.join();
    }
}

When you run this program again, you will most likely find that run never leaves its loop, even though waiter.finish() has set shouldFinish to true. The reason is that the other thread is not guaranteed to see the write: without synchronization it may keep using a stale value of shouldFinish (the JIT compiler is even allowed to hoist the read out of the loop).

You can solve this problem by declaring the shouldFinish variable volatile.
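A minimal sketch of that fix, reusing the Waiter logic under the hypothetical names `VolatileWaiter` and `VolatileDataRaceFix`. The helper method and its timeout are assumptions added so the behaviour can be checked:

```java
class VolatileWaiter implements Runnable {
    // volatile: a write by one thread is visible to later reads in other threads
    private volatile boolean shouldFinish;

    void finish() {
        shouldFinish = true;
    }

    @Override
    public void run() {
        long iteration = 0;
        while (!shouldFinish) {   // re-reads the field on every iteration
            iteration++;
        }
        System.out.println("Finished after: " + iteration);
    }
}

class VolatileDataRaceFix {
    // Returns true if the waiter thread terminated within the timeout.
    static boolean stopsWithin(long timeoutMillis) {
        VolatileWaiter waiter = new VolatileWaiter();
        Thread waiterThread = new Thread(waiter);
        waiterThread.start();
        try {
            Thread.sleep(10L);             // same delay as in the failing example
            waiter.finish();
            waiterThread.join(timeoutMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !waiterThread.isAlive();
    }
}
```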

This behaviour follows from the happens-before rules of the Java Memory Model. A write to a variable by one thread is only guaranteed to be visible to another thread if a happens-before relationship connects the write to the read. The synchronized and volatile constructs, as well as Thread.start() and Thread.join(), establish happens-before relationships. The rules are as follows:

  1. Program order rule: within a single thread, each operation happens-before every operation that comes later in program order.
  2. Volatile rule: a write to a volatile variable happens-before every subsequent read of that variable.
  3. Lock rule: unlocking a lock happens-before every subsequent locking of that same lock.
  4. Thread start() rule: if thread A starts thread B, everything A did before calling start() is visible in B. That is, the call to start() happens-before any operation in thread B.
  5. Thread join() rule: if thread A waits for thread B with join(), then once join() returns A can see everything B did. In other words, any operation in thread B happens-before the return of join().
  6. Transitivity rule: if A happens-before B and B happens-before C, then A happens-before C.

Therefore, once shouldFinish is declared volatile, rule 2 applies: the write in finish() happens-before the subsequent reads in run(), so the reading thread sees the updated value. Without the volatile keyword, no happens-before relationship connects the write and the reads.
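Rules 4 and 5 can be illustrated without volatile or locks. In the following sketch (the class `StartJoinVisibility` and its fields are hypothetical), the plain field written before start() is visible inside the worker, and the worker's write is visible after join() returns:

```java
class StartJoinVisibility {
    private int input;    // plain fields: no volatile, no locks
    private int output;

    int compute(int value) {
        input = value;                                         // written before start()
        Thread worker = new Thread(() -> output = input * 2);  // rule 4: the worker sees input
        worker.start();
        try {
            worker.join();                                     // rule 5: output is visible after join()
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return output;
    }
}
```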

3, synchronized

synchronized provides a pessimistic locking mechanism. Code guarded by synchronized is mutually exclusive: only one thread can hold a given lock at a time, which ensures both atomicity and visibility. synchronized can be applied to a method or to a block of code. On a static method the lock is the Class object; on an instance method it is the instance (this); on a block it is the object named in parentheses.

Locking a code block:
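The original listing is missing at this point. A minimal sketch of a synchronized block, assuming a hypothetical `BlockCounter` class with a dedicated lock object, could be:

```java
class BlockCounter {
    private final Object lock = new Object();  // dedicated lock object (an assumption of this sketch)
    private int count;

    void increment() {
        synchronized (lock) {   // only one thread at a time may hold this monitor
            count++;
        }
    }

    int current() {
        synchronized (lock) {   // reading under the same lock guarantees visibility
            return count;
        }
    }
}
```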

Locking a method:
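The original listing is missing here as well. A minimal sketch of synchronized methods, assuming a hypothetical `MethodCounter` class, shows both the instance form and the static form:

```java
class MethodCounter {
    private static int created;
    private int count;

    // static synchronized: the lock is MethodCounter.class
    static synchronized int register() {
        return ++created;
    }

    // instance synchronized: the lock is this
    synchronized void increment() {
        count++;
    }

    synchronized int current() {
        return count;
    }
}
```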

4, ThreadLocal

Although synchronized achieves atomicity, pessimistic locking costs performance. If a variable does not need to be shared between threads, a ThreadLocal variable can be used instead: each thread gets its own copy, which avoids the problems caused by several threads modifying the same variable at the same time.

class ThreadLocalDemo {
    private final ThreadLocal<Transaction> currentTransaction = ThreadLocal.withInitial(NullTransaction::new);

    Transaction currentTransaction() {
        Transaction current = currentTransaction.get();
        if (current.isNull()) {
            current = new TransactionImpl();
            currentTransaction.set(current);
        }
        return current;
    }
}

interface Transaction {
    boolean isNull();
}

class NullTransaction implements Transaction {
    public boolean isNull() {
        return true;
    }
}

class TransactionImpl implements Transaction {
    public boolean isNull() {
        return false;
    }
}
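To see the per-thread isolation in action, here is a smaller self-contained sketch. It assumes a hypothetical `PerThreadCounter` class rather than the Transaction types above; each thread that calls incrementAndGet() counts independently:

```java
class PerThreadCounter {
    // Every thread lazily receives its own Integer, starting at 0.
    private static final ThreadLocal<Integer> COUNTER = ThreadLocal.withInitial(() -> 0);

    static int incrementAndGet() {
        int next = COUNTER.get() + 1;
        COUNTER.set(next);
        return next;
    }

    // Increments the counter `times` times in a fresh thread and returns what that thread saw last.
    static int countInNewThread(int times) {
        int[] seen = new int[1];
        Thread worker = new Thread(() -> {
            for (int i = 0; i < times; i++) {
                seen[0] = incrementAndGet();
            }
        });
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen[0];
    }
}
```

The worker thread's increments do not affect the value seen by the calling thread, and no locking is needed because nothing is shared.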

5, Atomics

Another way to simplify concurrent programming is to use the atomic classes in java.util.concurrent.atomic. They guarantee atomicity and visibility, are convenient to use, and avoid the check-then-act problem in a multithreaded environment.

import java.util.concurrent.atomic.AtomicBoolean;

public class Atomic {

    public static void main(String[] args) {
        AtomicRun atomicRun = new AtomicRun();
        Thread waiterThread1 = new Thread(atomicRun);
        Thread waiterThread2 = new Thread(atomicRun);
        waiterThread1.start();
        waiterThread2.start();
    }
}

class AtomicRun implements Runnable {
    private final AtomicBoolean shouldFinish = new AtomicBoolean(false);

    public void run() {
        if (shouldFinish.compareAndSet(false, true)) {
            System.out.println("initialized only once");
        }
    }
}

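Since shouldFinish is an AtomicBoolean, shouldFinish.compareAndSet is a single atomic operation: only one of the two threads can change the value from false to true, so the message is printed exactly once and no thread acts on a stale value.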
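The same idea applies to counters. As a rough sketch (the class `AtomicCounterSketch` and its helper method are hypothetical), AtomicInteger.incrementAndGet() performs the read-modify-write as one atomic step, so no increments are lost even without a lock:

```java
import java.util.concurrent.atomic.AtomicInteger;

class AtomicCounterSketch {
    // Starts threadCount threads that each increment the shared counter, then returns the total.
    static int countWith(int threadCount, int incrementsPerThread) {
        AtomicInteger counter = new AtomicInteger();
        Thread[] threads = new Thread[threadCount];
        for (int i = 0; i < threadCount; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < incrementsPerThread; j++) {
                    counter.incrementAndGet();   // atomic read-modify-write
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return counter.get();
    }
}
```

With a plain int and counter++ in place of the AtomicInteger, the total would usually come out lower than threadCount * incrementsPerThread.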

6, Locks


The java.util.concurrent.locks package provides the same functionality as synchronized and extends it. For example, you can query the state of a lock, try to acquire it with a timeout, and acquire it interruptibly. For read-heavy, write-light workloads, ReadWriteLock can also improve performance.

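import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class LockDemo {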
    private final Lock lock = new ReentrantLock();
    private int counter0;

    public static void main(String[] args) {
        LockDemo lockDemo = new LockDemo();
        lockDemo.increment();
        System.out.println("count is: " + lockDemo.getCounter0());
    }

    public int getCounter0() {
        return counter0;
    }

    void increment() {
        lock.lock();
        try {
            counter0++;
        } finally {
            lock.unlock();
        }

    }
}

class ReadWriteLockDemo {
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private int counter1;

    void increment() {
        lock.writeLock().lock();
        try {
            counter1++;
        } finally {
            lock.writeLock().unlock();
        }
    }

    int current() {
        lock.readLock().lock();
        try {
            return counter1;
        } finally {
            lock.readLock().unlock();
        }
    }
}

When using locks, always call unlock in a finally block, because an explicit lock is not released automatically when an exception is thrown. If unlock is skipped, the lock stays held and every other thread that tries to acquire it will block forever.
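The extra capabilities mentioned above (querying lock state, bounded waiting) can be sketched with ReentrantLock.tryLock and isLocked. The class `TryLockCounter` is a hypothetical example, not part of the original article:

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

class TryLockCounter {
    private final ReentrantLock lock = new ReentrantLock();
    private int counter;

    // Gives up and returns false instead of blocking forever if the lock is not free in time.
    boolean tryIncrement(long timeoutMillis) {
        try {
            if (!lock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS)) {
                return false;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();   // waiting for the lock was interrupted
            return false;
        }
        try {
            counter++;
            return true;
        } finally {
            lock.unlock();                        // always release in finally
        }
    }

    // State query that has no equivalent with synchronized.
    boolean isLocked() {
        return lock.isLocked();
    }

    int current() {
        lock.lock();
        try {
            return counter;
        } finally {
            lock.unlock();
        }
    }
}
```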

7, CountDownLatch

CountDownLatch is generally used to coordinate the progress of multiple threads, for example when one thread has to wait for three other threads to complete before it continues.

CountDownLatch is similar to a counter. When a thread calls the await method of CountDownLatch, it will enter the blocking state. Other threads call the countDown method to reduce the counter by one. When the counter is reduced to 0, the operation blocked by the await method will be unblocked and continue to execute.

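import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CountDownLatchDemo {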
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(2);

        CountDownLatch latch = new CountDownLatch(1);
        Receiver receiver = new Receiver(latch);
        executorService.submit(receiver);
        latch.await();
        System.out.println("latch done");
        executorService.shutdown();
    }
}

class Receiver implements Runnable {

    private CountDownLatch latch;

    public Receiver(CountDownLatch latch) {
        this.latch = latch;
    }

    public void run() {
        latch.countDown();
    }
}
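The demo above uses a count of one. The "wait for three threads" scenario from the introduction could look roughly like this. The class `ThreeWorkersLatch` is hypothetical, and incrementing a counter stands in for real work:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class ThreeWorkersLatch {
    // Runs workerCount tasks and returns how many had finished when await() returned.
    static int runWorkers(int workerCount) {
        ExecutorService pool = Executors.newFixedThreadPool(workerCount);
        CountDownLatch latch = new CountDownLatch(workerCount);
        AtomicInteger finished = new AtomicInteger();
        try {
            for (int i = 0; i < workerCount; i++) {
                pool.submit(() -> {
                    try {
                        finished.incrementAndGet();   // stand-in for real work
                    } finally {
                        latch.countDown();            // count down even if the work fails
                    }
                });
            }
            latch.await();                            // blocks until every worker has counted down
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return finished.get();
    }
}
```

Calling countDown() in a finally block mirrors the unlock advice from the previous section: a worker that throws would otherwise leave the waiting thread blocked.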

8, CompletableFuture

CompletableFuture is a general-purpose tool for asynchronous, multithreaded programming introduced in Java 8. Parallel streams also provide multithreaded concurrency, so a rule of thumb helps when choosing between them: use CompletableFuture when IO is involved, and parallel streams for pure computation without IO.

The reason is that parallelStream runs on the JVM's common ForkJoinPool, which has only a small number of threads (by default, the number of CPU cores minus one), and the stream API offers no way to specify a different pool. IO or other high-latency operations can easily occupy every thread in that pool. CompletableFuture lets you pass in an executor, so different kinds of work can use different thread pools and pools can be isolated by business function.

First, we simulate a delayed IO method for subsequent demonstrations:

public static Long getPrice(String prod) {
    delay();  // Simulate the latency of a remote service
    Long price = ThreadLocalRandom.current().nextLong(0, 1000);
    System.out.println("Executing in " + Thread.currentThread().getName() + ", get price for " + prod + " is " + price);
    return price;
}

private static void delay() {
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();  // Restore the interrupt flag instead of swallowing it
    }
}

1. supplyAsync: this method is used to create an asynchronous task

ExecutorService executor = Executors.newFixedThreadPool(10);

CompletableFuture<Long> future = CompletableFuture.supplyAsync(() -> getPrice("accept"), executor);

supplyAsync is a factory method that returns a CompletableFuture. Its argument is a Supplier, which can be written as a lambda expression (the sibling method runAsync takes a Runnable instead). The executor argument specifies the thread pool on which the task runs.

2. thenAcceptAsync: this method receives the result of a CompletableFuture and passes it to the given consumer

future.thenAcceptAsync(p -> {
    System.out.println("Executing in " + Thread.currentThread().getName() + ", async price is: " + p);
}, executor);  // only the Async variant accepts an executor

Take the return result in the first step (the return value of getPrice) as input to perform the operation.

3. thenApply: this method receives the result of a CompletableFuture, transforms it, and returns a new CompletableFuture, similar to the map operation of a stream.

CompletableFuture<String> result = future.thenApply(p -> p + "1");

This step converts the CompletableFuture<Long> from the first step into a CompletableFuture<String>.

4. thenCompose: chains two CompletableFutures. The result of the first is used as the input of the second.

CompletableFuture<Long> future1 = CompletableFuture
        .supplyAsync(() -> getPrice("compose"));
CompletableFuture<String> result = future1.thenCompose(
    i -> CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(2000);  // Thread.sleep throws a checked exception, so it must be handled inside the lambda
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return i + "World";
    })
);

5. thenCombine: combines the results of two independent CompletableFutures

CompletableFuture<Long> future1 = CompletableFuture.supplyAsync(() -> getPrice("combine1"));
CompletableFuture<Long> future2 = CompletableFuture.supplyAsync(() -> getPrice("combine2"));
CompletableFuture<Long> result = future1.thenCombine(future2, (f1, f2) -> f1 + f2);

Once future1 and future2 have both completed, this code adds their results and returns a new CompletableFuture.
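The snippets in steps 1 to 5 are fragments. As a rough, self-contained sketch of how they fit together, the following hypothetical `PipelineSketch` class replaces getPrice with a fixed input so that the result is deterministic:

```java
import java.util.concurrent.CompletableFuture;

class PipelineSketch {
    static String describe(long base) {
        // supplyAsync: start an asynchronous computation (on the common pool here)
        CompletableFuture<Long> price = CompletableFuture.supplyAsync(() -> base);

        // thenApply: transform the result, like map on a stream
        CompletableFuture<Long> doubled = price.thenApply(p -> p * 2);

        // thenCompose: feed the result into a second asynchronous step, like flatMap
        CompletableFuture<Long> plusOne = doubled.thenCompose(
                p -> CompletableFuture.supplyAsync(() -> p + 1));

        // thenCombine: merge two futures once both have completed
        CompletableFuture<String> text = plusOne.thenCombine(price,
                (total, original) -> original + " -> " + total);

        return text.join();
    }
}
```

For example, PipelineSketch.describe(10) returns "10 -> 21".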

6. exceptionally: exception handling

exceptionally is the simplest exception-handling method of CompletableFuture. It supplies a default value when an exception occurs.

CompletableFuture<Long> future1 = CompletableFuture.supplyAsync(() -> getPrice("exception1"));

CompletableFuture<Long> future2 = CompletableFuture
                .supplyAsync(() -> (1L / 0) ) // Simulate a failure: throws ArithmeticException
                // Return a default value when an exception occurs. Without this handler, the exception would surface in the later join()
                .exceptionally((ex) -> {
                    System.out.println("Executing in " + Thread.currentThread().getName() + ", got exception " + ex);
                    return 0L;
                });

CompletableFuture<Long> result = future1.thenCombine(future2, (f1, f2) -> f1 + f2);

try {
    System.out.println("Executing in " + Thread.currentThread().getName() + " ,combine price is: " + result.join());
} catch (CompletionException ex) {
    System.out.println("Executing in " + Thread.currentThread().getName() + " ,combine price error: " + ex);
}

7. Running CompletableFutures in parallel

Suppose we have a list of products and need to call getPrice for each item. We can combine a stream with CompletableFuture.

List<Long> prices = Stream.of("1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12")
                .map(p -> CompletableFuture.supplyAsync(() -> getPrice(p), executor)) // Start one CompletableFuture per element
                .collect(Collectors.toList()) // Force every future to be created before any join
                .stream()
                .map(CompletableFuture::join) // Wait for each CompletableFuture to complete
                .collect(Collectors.toList());

Note that there are two collect steps. The pipeline cannot be simplified to the following:

List<Long> prices2 = Stream.of("1", "2", "3")
                .map(p -> CompletableFuture.supplyAsync(() -> getPrice(p)))
                .map(CompletableFuture::join)
                .collect(Collectors.toList());

It looks more concise, but there is a serious problem. Streams are lazy and push one element at a time through the whole pipeline, so for each element the first map creates a CompletableFuture and the second map immediately blocks on join before the next future is even created. The calls effectively run serially.
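Another way to express "start everything, then wait" is CompletableFuture.allOf, which returns a future that completes when all the given futures have completed. The sketch below is an alternative to the two-collect version, not the article's method. The class `AllOfSketch` and the `lookupPrice` stand-in (which replaces getPrice with a deterministic value) are assumptions:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

class AllOfSketch {
    // Hypothetical stand-in for getPrice: derives a "price" from the product name.
    static Long lookupPrice(String product) {
        return (long) product.length();
    }

    static List<Long> pricesFor(List<String> products) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            // Create all futures first so the tasks can run concurrently.
            List<CompletableFuture<Long>> futures = products.stream()
                    .map(p -> CompletableFuture.supplyAsync(() -> lookupPrice(p), executor))
                    .collect(Collectors.toList());

            // Wait until every future has completed.
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();

            // All futures are done, so join() returns immediately here.
            return futures.stream()
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList());
        } finally {
            executor.shutdown();
        }
    }
}
```

The results come back in the same order as the input list, because the futures are joined in list order regardless of which task finished first.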


Posted by webshifter on Wed, 13 Apr 2022 20:18:05 +0930