Java CompletableFuture concurrent programming

Introduction to CompletableFuture

CompletableFuture is used for asynchronous programming in Java, asynchronous usually means non-blocking,
We can make our tasks run separately in other threads separate from the main thread, and can be called through callbacks
In order to get the execution status of the asynchronous task in the main thread, whether it is completed, and whether it is abnormal or not.
CompletableFuture implements the Future, CompletionStage interface, which implements Future
The interface can be compatible with the current thread pool framework, and the CompletionStage interface is asynchronous programming
The interface abstraction of the
CompletableFuture class.

Future and CompletableFuture

In Java, Futrue is usually used to represent a reference to an asynchronous task. For example, we refer to the task as
Put it in the thread pool, and then we will get a Futrue with isDone in the Future
The method to judge whether the task is finished, and the get method can block until the end of the task and then get
Get the result, but overall, this method is still synchronous, because the client needs to constantly block waiting or
The operator constantly polls to know if the task is complete.

The main disadvantage of Future

(1) Manual completion is not supported

I submitted a task, but the execution is too slow, I have obtained the task result through other paths,
Now there is no way to notify the executing thread of the result of this task, so it must be actively canceled or kept
wait for it to finish

(2) Further non-blocking calls are not supported

The get method of Future will block until the task is completed, but want to execute it after getting the task
Additional tasks, because Future does not support callback functions, so this function cannot be implemented

(3) Chain calls are not supported

For the execution result of Future, we want to continue to pass it to the next Future for processing, thus forming
A chained pipline call, which is not possible with Future.

(4) Multiple Future merging is not supported

For example, we have 10 Futures executing in parallel, and we want to run after all Futures have finished running,
Executing certain functions cannot be achieved through Future.

(5) Exception handling is not supported

Future's api does not have any exception handling api, so when running asynchronously, if something goes wrong
It is not well positioned.

Getting Started with CompletableFuture

Using CompletableFuture

Scenario: A CompletableFuture is created in the main thread, and then the main thread calls the get method to
Blocking, and finally we terminate it in a child thread.

/**
* A CompletableFuture is created in the main thread, and then the main thread calls the get method to block, and finally we
 make it terminate in a child thread
* @param args
*/
public static void main(String[] args) throws Exception{
	CompletableFuture<String> future = new CompletableFuture<>();
	new Thread(() -> {
		try{
			System.out.println(Thread.currentThread().getName() + "child thread starts work");
			//Child thread sleeps for 5 seconds
			Thread.sleep(5000);
			//Complete main thread in child thread
			future.complete("success");
		}catch (Exception e){
			e.printStackTrace();
		}
	}, "A").start();
	//The main thread calls the get method to block
	System.out.println("main thread call get The method gets the result as: " + future.get());
	System.out.println("main thread completes,end of blocking!!!!!!");
}

Async task with no return value

/**
* Async task with no return value
* @param args
*/
public static void main(String[] args) throws Exception{
	System.out.println("main thread starts");
	//Run an asynchronous task with no return value
	CompletableFuture<Void> future = CompletableFuture.runAsync(() -
	> {
		try {
			System.out.println("Child thread starts work");
			Thread.sleep(5000);
			System.out.println("child thread completes");
		} catch (Exception e) {
			e.printStackTrace();
		}
	});
	//main thread blocked
	future.get();
	System.out.println("main thread ends");
}

Asynchronous task with return value

/**
* Async task with no return value
* @param args
*/
public static void main(String[] args) throws Exception{
	System.out.println("main thread starts");
	//Run an asynchronous task with a return value
	CompletableFuture<String> future =
	CompletableFuture.supplyAsync(() -> {
		try {
			System.out.println("child thread starts task");
			Thread.sleep(5000);
		} catch (Exception e) {
			e.printStackTrace();
		}
		return "child thread finished!";
	});
	//main thread blocked
	String s = future.get();
	System.out.println("main thread ends, The result of the child thread is:" + s);
}

thread dependency

When a thread depends on another thread, the thenApply method can be used to serialize the two threads
change.

private static Integer num = 10;
/**
* Add 10 to a number, then square it
* @param args
*/
public static void main(String[] args) throws Exception{
	System.out.println("main thread starts");
	CompletableFuture<Integer> future =
	CompletableFuture.supplyAsync(() -> {
		try {
			System.out.println("Add 10 Mission Start");
			num += 10;
		} catch (Exception e) {
			e.printStackTrace();
		}
		return num;
	}).thenApply(integer -> {
		return num * num;
	});
	Integer integer = future.get();
	System.out.println("main thread ends, The result of the child thread is:" + integer);
}

Consumption processing result

thenAccept consumes the processing results, receives the processing results of the task, and consumes the processing results, but returns no results.

public static void main(String[] args) throws Exception{
	System.out.println("main thread starts");
	CompletableFuture.supplyAsync(() -> {
		try {
			System.out.println("Add 10 Mission Start");
			num += 10;
		} catch (Exception e) {
			e.printStackTrace();
		}
		return num;
	}).thenApply(integer -> {
		return num * num;
	}).thenAccept(new Consumer<Integer>() {
		@Override
		public void accept(Integer integer) {
			System.out.println("All child threads are processed,finally called accept,The result is:" + integer);
		}
	});
}

exception handling

exceptionally exception handling, triggered when an exception occurs

public static void main(String[] args) throws Exception{
	System.out.println("main thread starts");
	CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
		int i= 1/0;
		System.out.println("Add 10 Mission Start");
		num += 10;
		return num;
	}).exceptionally(ex -> {
		System.out.println(ex.getMessage());
		return -1;
	});
	System.out.println(future.get());
}

handle is similar to the thenAccept/thenRun method, which is the last processing call, but can handle exceptions at the same time

public static void main(String[] args) throws Exception{
	System.out.println("main thread starts");
	CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
		System.out.println("Add 10 Mission Start");
		num += 10;
		return num;
	}).handle((i,ex) ->{
		System.out.println("Enter handle method");
		if(ex != null){
			System.out.println("An exception occurred,Content is:" + ex.getMessage());
			return -1;
		}else{
			System.out.println("completed normally,Content is: " + i);
			return i;
		}
	});
	System.out.println(future.get());
}

Merge results

thenCompose merges the execution results of two dependent CompletableFutures

public static void main(String[] args) throws Exception{
	System.out.println("main thread starts");
	//Add 10 to the first step
	CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
		System.out.println("Add 10 Mission Start");
		num += 10;
		return num;
	});
	//merge
	CompletableFuture<Integer> future1 = future.thenCompose(i ->
		//Another CompletableFuture
		CompletableFuture.supplyAsync(() -> {
		return i + 1;
	}));
	System.out.println(future.get());
	System.out.println(future1.get());
}

thenCombine merges two CompletableFutures tasks with no dependencies

public static void main(String[] args) throws Exception{
	System.out.println("main thread starts");
	CompletableFuture<Integer> job1 = CompletableFuture.supplyAsync(() -> {
		System.out.println("Add 10 Mission Start");
		num += 10;
		return num;
	});
	CompletableFuture<Integer> job2 = CompletableFuture.supplyAsync(() -> {
		System.out.println("Multiply by 10 mission starts");
		num = num * 10;
		return num;
	});
	//Merge two results
	CompletableFuture<Object> future = job1.thenCombine(job2, new
	BiFunction<Integer, Integer, List<Integer>>() {
		@Override
		public List<Integer> apply(Integer a, Integer b) {
			List<Integer> list = new ArrayList<>();
			list.add(a);
			list.add(b);
			return list;
		}
	});
	System.out.println("The combined result is:" + future.get());
}

Combine the results of multiple tasks with allOf and anyOf
allOf: a series of independent future tasks, do something after all their tasks are executed

/**
* Add 10 to a number, then square it
* @param args
*/
public static void main(String[] args) throws Exception{
	System.out.println("main thread starts");
	List<CompletableFuture> list = new ArrayList<>();
	CompletableFuture<Integer> job1 = CompletableFuture.supplyAsync(() -> {
		System.out.println("Add 10 Mission Start");
		num += 10;
		return num;
	});
	list.add(job1);
	CompletableFuture<Integer> job2 = CompletableFuture.supplyAsync(() -> {
		System.out.println("Multiply by 10 mission starts");
		num = num * 10;
		return num;
	});
	list.add(job2);
	CompletableFuture<Integer> job3 = CompletableFuture.supplyAsync(() -> {
		System.out.println("Minus starts with 10 tasks");
		num = num * 10;
		return num;
	});
	list.add(job3);
	CompletableFuture<Integer> job4 = CompletableFuture.supplyAsync(() -> {
		System.out.println("Divide by 10 task starts");
		num = num * 10;
		return num;
	});
	list.add(job4);
	//Multitasking merge
	List<Integer> collect = list.stream().map(CompletableFuture<Integer>::join)
		.collect(Collectors.toList());
	System.out.println(collect);
}

anyOf: As long as there is one return in multiple futures, the entire task can end without waiting for each future
future ends

/**
* Add 10 to a number, then square it
* @param args
*/
public static void main(String[] args) throws Exception{
	System.out.println("main thread starts");
	CompletableFuture<Integer>[] futures = new CompletableFuture[4];
	CompletableFuture<Integer> job1 = CompletableFuture.supplyAsync(() -> {
		try{
			Thread.sleep(5000);
			System.out.println("Add 10 Mission Start");
			num += 10;
			return num;
		}catch (Exception e){
			return 0;
		}
	});
	futures[0] = job1;
	CompletableFuture<Integer> job2 = CompletableFuture.supplyAsync(() -> {
		try{
			Thread.sleep(2000);
			System.out.println("Multiply by 10 mission starts");
			num = num * 10;
			return num;
		}catch (Exception e){
			return 1;
		}
	});
	futures[1] = job2;
	CompletableFuture<Integer> job3 = CompletableFuture.supplyAsync(() -> {
		try{
			Thread.sleep(3000);
			System.out.println("Minus starts with 10 tasks");
			num = num * 10;
			return num;
		}catch (Exception e){
			return 2;
		}
	});
	futures[2] = job3;
	CompletableFuture<Integer> job4 = CompletableFuture.supplyAsync(() -> {
		try{
			Thread.sleep(4000);
			System.out.println("Divide by 10 task starts");
			num = num * 10;
			return num;
		}catch (Exception e){
			return 3;
		}
	});
	futures[3] = job4;
	CompletableFuture<Object> future = CompletableFuture.anyOf(futures);
	System.out.println(future.get());
}

Tags: Java Multithreading Concurrent Programming JavaSE JUC future

Posted by jonasr on Sat, 17 Sep 2022 01:46:52 +0930