Hello, I am King Xiang, this is my blog garden, you are welcome, you are welcome again~
When most digital farmers are developing, they either deal with synchronous applications or deal with asynchronous. But if you can learn to use CompletableFuture, you will have a magical ability: turn synchronization into asynchronous (a bit like the feeling of traveling through several time and space at the same time after using the moonlight treasure box). How to do it? Let's look at the code.
Add a shop class Shop:
/**
* shop class
*
* @author King of Xiang
*/
public class Shop {
private String name = "";
public Shop(String name) {
this.name = name;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
private double calculatePrice(String product) {
delay();
return 10 * product.charAt(0);
}
private void delay() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// Get the price at the same time
public double getPrice(String word) {
return calculatePrice(word);
}
// Add asynchronous query: convert synchronous method to asynchronous method
public Future<Double> getPriceAsync(String product) {
CompletableFuture<Double> future = new CompletableFuture<>();
new Thread(() -> {
double price = calculatePrice(product);
// When the task that needs to be calculated for a long time ends and returns the result, set Future return value
future.complete(price);
}).start();
// There is no need to wait for the calculation that has not ended, and return directly future object
return future;
}
}
Then add two more test methods, one synchronous and one asynchronous, corresponding to the synchronous and asynchronous methods in the store class:
// Test the synchronization method
public static void testGetPrice() {
Shop friend = new Shop("a treasure");
long start = System.nanoTime();
double price = friend.getPrice("MacBook pro");
System.out.printf(friend.getName() + " price is: %.2f%n", price);
long invocationTime = (System.nanoTime() - start) / 1_000_000;
System.out.println("Synchronous calls take time:" + invocationTime + " msecs");
// Other time-consuming operations (sleep)
doSomethingElse();
long retrievalTime = (System.nanoTime() - start) / 1_000_000;
System.out.println("The synchronous method returns the required time:" + retrievalTime + " msecs");
}
// Test asynchronous methods
public static void testGetPriceAsync() throws InterruptedException, ExecutionException {
Shop friend = new Shop("a certain east");
long start = System.nanoTime();
Future<Double> futurePrice = friend.getPriceAsync("MacBook pro");
long invocationTime = (System.nanoTime() - start) / 1_000_000;
System.out.println("Async methods take time:" + invocationTime + " msecs");
// Other time-consuming operations (sleep)
doSomethingElse();
// from future The price is read from the object, blocking if the price is unknown
double price = futurePrice.get();
System.out.printf(friend.getName() + " price is: %.2f%n", price);
long retrievalTime = (System.nanoTime() - start) / 1_000_000;
System.out.println("The time it takes for an async method to return:" + retrievalTime + " msecs");
}
The reason why microseconds are used here is because the amount of code is too small, and if you use milliseconds, you can't see the difference at all. After running, you will find that the asynchronous time is greatly shortened.
Suppose now that we have made a website, we need to query the price of the same product on different e-commerce platforms (assuming such an interface has been implemented), then obviously, if we want to find out the prices of all platforms, we need to call them one by one, Like this (for a more realistic effect, the returned price has been adjusted a little):
private double calculatePrice(String product) {
delay();
return new Random().nextDouble() * product.charAt(0) * product.charAt(1);
}
/**
* test client
*
*/
public class ClientTest {
private List<Shop> shops = Arrays.asList(
new Shop("taobao.com"),
new Shop("tmall.com"),
new Shop("jd.com"),
new Shop("amazon.com")
);
// Returns the item price for each store by name
public List<String> findPrice(String product) {
List<String> list = shops.stream()
.map(shop ->
String.format("%s price is %.2f RMB",
shop.getName(), shop.getPrice(product)))
.collect(Collectors.toList());
return list;
}
// Synchronous way to achieve findPrices method to query each store
public void test() {
long start = System.nanoTime();
List<String> list = findPrice("IphoneX");
System.out.println(list);
System.out.println("Done in " + (System.nanoTime() - start) / 1_000_000 + " ms");
}
public static void main(String[] args) {
ClientTest client = new ClientTest();
client.test();
}
}
Since the synchronous method is called, the result query is slow - uncle can't bear it!
Wouldn’t it be faster if we could query all e-commerce platforms at the same time? You can try, using parallel streams in streaming computing:
// Returns the item price for each store by name
public List<String> findPrice(String product) {
List<String> list = shops.parallelStream()// Use parallel streams
.map(shop ->
String.format("%s price is %.2f RMB",
shop.getName(), shop.getPrice(product)))
.collect(Collectors.toList());
return list;
}
After changing it, try again, it is much faster!
We can use the CompletableFuture we have learned to transform it again:
// use CompletableFuture make an asynchronous request
// Here two different Stream Pipelining instead of putting two one after the other on the same pipeline that processes the stream map operate
// There is actually a reason for this: considering the latency characteristics between stream operations, if the stream is processed in a single pipeline, requests to different merchants can only succeed if they are executed synchronously and sequentially
// Therefore, each create CompletableFuture The object can only perform the action and notification of querying the specified merchant after the previous operation is completed. join()The method returns the result of the calculation
public List<String> findPrice(String product) {
List<CompletableFuture<String>> futures =
shops.parallelStream()
.map(shop -> CompletableFuture.supplyAsync(
() -> String.format("%s price is %.2f RMB",
shop.getName(), shop.getPrice(product))))
.collect(Collectors.toList());
return futures.stream()
// Wait for all asynchronous operations to finish ( join and Future in the interface get have the same meaning)
.map(CompletableFuture::join)
.collect(Collectors.toList());
}
This way, a new CompletableFuture object cannot be created until the previous operation has completely completed. And using two different Stream pipelines, you can also create a new CompletableFuture object before the previous CompletableFuture is executed. Its execution process is as follows:
Is there any room for improvement? Of course there is! But the code is too complex, and in most cases, all the code listed above is enough to solve 90% of the problems in real work. However, I still post the code of CompletableFuture combined with the custom Executor, so that there is a general concept (doing not encourage bullshitting).
// use custom Executor configure CompletableFuture
public List<String> findPrice(String product) {
// A custom executor for the "best price finder" app Execotor
Executor executor = Executors.newFixedThreadPool(Math.min(shops.size(), 100),
(Runnable r) -> {
Thread thread = new Thread(r);
// Use daemon threads, this way will not prevent the shutdown of the program
thread.setDaemon(true);
return thread;
}
);
// put the executor Execotor passed as the second parameter to supplyAsync factory method
List<CompletableFuture<String>> futures = shops.stream()
.map(shop -> CompletableFuture.supplyAsync(
() -> String.format("%s price is %.2f RMB",
shop.getName(), shop.getPrice(product)), executor))
.collect(Collectors.toList());
return futures.stream()
// Wait for all asynchronous operations to finish ( join and Future in the interface get have the same meaning)
.map(CompletableFuture::join)
.collect(Collectors.toList());
}
This is basically what CompletableFuture is all about. To sum up, there are two ways to perform parallel computing on sets:
1. Either convert it to a parallel stream, and then use operations such as map to work
2. Either enumerate each element in the collection, create a new thread, and operate within CompletableFuture
CompletableFuture provides more flexibility, it can adjust the size of the thread pool to ensure that the overall calculation will not be blocked due to I/O of the thread. So the usage suggestion is:
1. If the operation is computationally intensive and there is no I/O operation, it is recommended to use parallel stream()
2. If the parallel computing unit also involves waiting for I/O operations (including network connection waiting), then using CompletableFuture is more flexible.
Thank you for your visit! For questions about technology, products, operations and management, please follow and leave a message. Welcome to harassment, very honored~