8 ways to implement asynchronous execution, performance boost!

Asynchronous execution is nothing new to developers; it comes up in many scenarios during real-world development. Compared with synchronous execution, it can greatly shorten the time a request takes. Sending SMS messages, sending emails, and asynchronous updates are typical operations that can be handled asynchronously.

What is asynchronous?

First, let's look at a common scenario where a user places an order:

In the synchronous flow, once we start sending the SMS we must wait for that method to finish completely before we can gift the points. If sending the SMS takes a long time, gifting the points has to wait for it. This is a typical synchronous scenario.

In fact, there is no dependency between sending the SMS and gifting the points. By going asynchronous, the two operations can run at the same time, so neither has to wait for the other.
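The difference between the two flows can be sketched with the JDK alone. This is a minimal illustration, not code from a real project: the class OrderFlowDemo, the methods sendSms and giftPoints, and the 200 ms delays are all assumptions made up for the example.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical order flow: sendSms and giftPoints are placeholder names.
class OrderFlowDemo {

    static String sendSms() {
        sleep(200); // simulate a slow SMS gateway
        return "sms sent";
    }

    static String giftPoints() {
        sleep(200); // simulate a slow points service
        return "points gifted";
    }

    // Synchronous: the second call starts only after the first returns.
    static long runSync() {
        long start = System.nanoTime();
        sendSms();
        giftPoints();
        return (System.nanoTime() - start) / 1_000_000;
    }

    // Asynchronous: both calls are submitted right away and run in parallel.
    static long runAsync() {
        long start = System.nanoTime();
        CompletableFuture<String> sms = CompletableFuture.supplyAsync(OrderFlowDemo::sendSms);
        CompletableFuture<String> points = CompletableFuture.supplyAsync(OrderFlowDemo::giftPoints);
        CompletableFuture.allOf(sms, points).join(); // wait for both to finish
        return (System.nanoTime() - start) / 1_000_000;
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("sync  took about " + runSync() + " ms");
        System.out.println("async took about " + runAsync() + " ms");
    }
}
```

The synchronous version takes roughly the sum of the two delays, while the asynchronous version takes roughly the longer of the two.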

That is all asynchrony means, and it really is that simple. Let's go through several ways to implement it.

Eight implementations of asynchrony

  1. Thread

  2. Future

  3. Asynchronous framework CompletableFuture

  4. Spring annotation @Async

  5. Spring ApplicationEvent event

  6. Message queue

  7. Third-party asynchronous frameworks, such as Hutool's ThreadUtil

  8. Guava asynchronous

1. Thread asynchronous

public class AsyncThread extends Thread {

    @Override
    public void run() {
        System.out.println("Current thread name:" + Thread.currentThread().getName() + " Send email success!");
    }

    public static void main(String[] args) {
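        AsyncThread asyncThread = new AsyncThread();
        //start() executes run() on a new thread; calling run() directly would execute it on the main thread
        asyncThread.start();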
    }
}

Of course, creating a new Thread for every task means threads are constantly created and destroyed, which wastes system resources. A thread pool avoids this:

private ExecutorService executorService = Executors.newCachedThreadPool();

public void fun() {
    executorService.submit(new Runnable() {
        @Override
        public void run() {
            log.info("Execute business logic...");
        }
    });
}

Business logic can be encapsulated into Runnable or Callable, and executed by the thread pool.
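As a rough sketch of the difference between the two: a Runnable is fire-and-forget, while a Callable hands a value back through a Future. The class name PoolSubmitDemo and the trivial tasks are assumptions made up for the illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class PoolSubmitDemo {

    static int run() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            // Runnable: no return value, the caller does not wait for it
            Runnable notifyTask = () ->
                    System.out.println(Thread.currentThread().getName() + " sends a notification");
            pool.execute(notifyTask);

            // Callable: produces a value that the caller reads from the Future
            Callable<Integer> sumTask = () -> 1 + 2;
            Future<Integer> future = pool.submit(sumTask);
            return future.get(1, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown(); // let the worker threads exit once the tasks are done
        }
    }

    public static void main(String[] args) {
        System.out.println("callable result: " + run());
    }
}
```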

2. Future asynchronous

@Slf4j
public class FutureManager {

    public String execute() throws Exception {

        ExecutorService executor = Executors.newFixedThreadPool(1);
        Future<String> future = executor.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {

                System.out.println(" --- task start --- ");
                Thread.sleep(3000);
                System.out.println(" --- task finish ---");
                return "this is future execute final result!!!";
            }
        });

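        try {
            //The calling thread blocks here until the result is available
            String result = future.get();
            log.info("Future get result: {}", result);
            return result;
        } finally {
            //Shut the pool down so its worker thread does not keep the JVM alive
            executor.shutdown();
        }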
    }

    @SneakyThrows
    public static void main(String[] args) {
        FutureManager manager = new FutureManager();
        manager.execute();
    }
}

Output result:

 --- task start --- 
 --- task finish ---
Future get result: this is future execute final result!!!

Shortcomings of Future

The shortcomings of Future include the following points:

  1. No passive notification of results: we can submit asynchronous tasks to the thread pool, but when a task finishes, the main thread is not notified. It has to call the get method to actively fetch the result.

  2. Futures are isolated from each other: after a long-running asynchronous task finishes, we often want to use its result for a further computation that is itself asynchronous. With Future, the developer has to wire the two together by hand. Futures cannot form a task flow (pipeline), and each one stands alone. This is why CompletableFuture exists: it can chain multiple Futures into a task flow.

  3. Future has no good error-handling mechanism: if an asynchronous task throws an exception during execution, the caller is not told about it. The only way to find out is to catch the exception thrown by the get method and handle it from there.
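The points above can be illustrated with a small JDK-only comparison. This is a sketch under assumptions: the class FutureLimitsDemo and the parsing task are invented for the example. With a plain Future the failure only surfaces when the caller blocks on get(), while CompletableFuture lets the follow-up steps and the error handler be declared up front.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class FutureLimitsDemo {

    // Plain Future: the caller must block on get() and catch the wrapped exception.
    static String withFuture() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<Integer> task = () -> Integer.parseInt("not a number");
            Future<Integer> future = pool.submit(task);
            return "value " + future.get();
        } catch (ExecutionException e) {
            return "failed: " + e.getCause().getClass().getSimpleName();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } finally {
            pool.shutdown();
        }
    }

    // CompletableFuture: steps are chained into a pipeline with one error handler.
    static String withCompletableFuture(String input) {
        return CompletableFuture
                .supplyAsync(() -> Integer.parseInt(input))
                .thenApply(n -> n * 2)
                .thenApply(n -> "value " + n)
                .exceptionally(ex -> {
                    // Failures from earlier stages arrive wrapped, so unwrap the cause
                    Throwable cause = ex.getCause() != null ? ex.getCause() : ex;
                    return "failed: " + cause.getClass().getSimpleName();
                })
                .join();
    }

    public static void main(String[] args) {
        System.out.println(withFuture());
        System.out.println(withCompletableFuture("21"));
        System.out.println(withCompletableFuture("oops"));
    }
}
```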

3. CompletableFuture asynchronous

public class CompletableFutureCompose {

    /**
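     * thenRunAsync: the subtask is submitted to the asynchronous pool, so it may run on a different thread from the parent task. It takes no input from cf1 and produces no result.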
     */
    @SneakyThrows
    public static void thenRunAsync() {
        CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread() + " cf1 do something....");
            return 1;
        });
        CompletableFuture<Void> cf2 = cf1.thenRunAsync(() -> {
            System.out.println(Thread.currentThread() + " cf2 do something...");
        });
        //Wait for task 1 to complete
        System.out.println("cf1 result->" + cf1.get());
        //Wait for task 2 to complete
        System.out.println("cf2 result->" + cf2.get());
    }

    public static void main(String[] args) {
        thenRunAsync();
    }
}

We don't need to use ExecutorService explicitly: by default, CompletableFuture runs asynchronous tasks on ForkJoinPool. If a business scenario calls for a dedicated asynchronous thread pool, the Async methods also accept a custom Executor.
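A minimal sketch of passing a custom pool, assuming a hypothetical pool whose threads are named biz-pool-N (the class name CustomPoolDemo and the thread names are made up for illustration). The second argument of supplyAsync is the Executor to run on:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class CustomPoolDemo {

    static String run() {
        AtomicInteger counter = new AtomicInteger();
        // Hypothetical pool: named threads are easy to spot in logs and thread dumps
        ExecutorService bizPool = Executors.newFixedThreadPool(2, r -> {
            Thread t = new Thread(r, "biz-pool-" + counter.incrementAndGet());
            t.setDaemon(true);
            return t;
        });
        try {
            // The task runs on bizPool instead of the default ForkJoinPool
            return CompletableFuture
                    .supplyAsync(() -> Thread.currentThread().getName(), bizPool)
                    .join();
        } finally {
            bizPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("task ran on: " + run());
    }
}
```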

4. Spring @Async asynchronous

Custom asynchronous thread pool:

/**
 * Thread pool configuration. Defining several thread pools isolates workloads from one another. A plain @Async uses the default executor; when the project defines several pools, name the one to use in the annotation, for example: @Async("taskName")
 **/
@EnableAsync
@Configuration
public class TaskPoolConfig {

    /**
     * custom thread pool
     *
     * @author: jacklin
     * @since: 2021/11/16 17:41
     **/
    @Bean("taskExecutor")
    public Executor taskExecutor() {
        //Number of processors available to the Java virtual machine (for example, 12)
        int i = Runtime.getRuntime().availableProcessors();
        System.out.println("Available processors: " + i);
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //Core thread pool size
        executor.setCorePoolSize(16);
        //Maximum number of threads
        executor.setMaxPoolSize(20);
        //Configure the queue capacity, the default value is Integer.MAX_VALUE
        executor.setQueueCapacity(99999);
        //Keep-alive time in seconds for idle threads
        executor.setKeepAliveSeconds(60);
        //thread name prefix
        executor.setThreadNamePrefix("asyncServiceExecutor -");
        //Sets the maximum number of seconds this executor should block on shutdown to wait for remaining tasks to finish their execution before the rest of the container proceeds with shutdown
        executor.setAwaitTerminationSeconds(60);
        //Wait for all tasks to end before closing the thread pool
        executor.setWaitForTasksToCompleteOnShutdown(true);
        return executor;
    }
}

AsyncService:

public interface AsyncService {

    //The caller does not wait for an @Async method, so the return type is void (use a Future type if a result is needed)
    void sendSms(String callPrefix, String mobile, String actionType, String content);

    void sendEmail(String email, String subject, String content);
}

@Slf4j
@Service
public class AsyncServiceImpl implements AsyncService {

    @Autowired
    private IMessageHandler messageHandler;

    @Override
    @Async("taskExecutor")
    public void sendSms(String callPrefix, String mobile, String actionType, String content) {
        try {

            //Simulated delay
            Thread.sleep(1000);
            messageHandler.sendSms(callPrefix, mobile, actionType, content);

        } catch (Exception e) {
            log.error("Sending SMS is abnormal -> ", e);
        }
    }


    @Override
    @Async("taskExecutor")
    public void sendEmail(String email, String subject, String content) {
        try {

            //Simulated delay
            Thread.sleep(1000);
            messageHandler.sendEmail(email, subject, content);

        } catch (Exception e) {
            log.error("send email abnormal -> ", e);
        }
    }
}

In real projects, the recommended approach is to give @Async a custom thread pool, as above, so that the thread count and queue size are under your control. Using a bare @Async with the default executor is not recommended.

5. Spring ApplicationEvent asynchronous

Define events:

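@Getter
public class AsyncSendEmailEvent extends ApplicationEvent {

    /**
     * Email address
     **/
    private String email;

    /**
     * Subject
     **/
    private String subject;

    /**
     * Content
     **/
    private String content;

    /**
     * Receiver
     **/
    private String targetUserId;

    //ApplicationEvent has no default constructor, so the event source must be passed to super
    public AsyncSendEmailEvent(Object source, String email, String subject, String content, String targetUserId) {
        super(source);
        this.email = email;
        this.subject = subject;
        this.content = content;
        this.targetUserId = targetUserId;
    }

}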

Define event handlers:

@Slf4j
@Component
public class AsyncSendEmailEventHandler implements ApplicationListener<AsyncSendEmailEvent> {

    @Autowired
    private IMessageHandler messageHandler;

    @Async("taskExecutor")
    @Override
    public void onApplicationEvent(AsyncSendEmailEvent event) {
        if (event == null) {
            return;
        }

        String email = event.getEmail();
        String subject = event.getSubject();
        String content = event.getContent();
        String targetUserId = event.getTargetUserId();
        messageHandler.sendEmailSms(email, subject, content, targetUserId);
    }
}

In addition, when ApplicationEvent is used for asynchronous processing, a compensation mechanism is needed in case the handler fails with an exception. Spring Retry can be used to retry the operation, which helps avoid data inconsistency caused by such failures.
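To show the idea behind retrying a failed step, here is a hand-rolled, JDK-only sketch. It is not Spring Retry and does not use its API: the class RetryDemo, the retry helper, and the simulated "mail server busy" failure are all assumptions for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

class RetryDemo {

    // Calls the task up to maxAttempts times, pausing between failed attempts.
    static <T> T retry(Callable<T> task, int maxAttempts, long backoffMillis) {
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return task.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    try {
                        Thread.sleep(backoffMillis);
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        break; // stop retrying if the thread is interrupted
                    }
                }
            }
        }
        throw new IllegalStateException("all attempts failed", last);
    }

    // Simulated handler that fails twice before succeeding.
    static String demo() {
        AtomicInteger calls = new AtomicInteger();
        return retry(() -> {
            if (calls.incrementAndGet() < 3) {
                throw new IllegalStateException("mail server busy");
            }
            return "sent after " + calls.get() + " attempts";
        }, 5, 10);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

A retried operation should be safe to repeat, otherwise the retry itself can cause the inconsistency it is meant to prevent.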

6. Message queue

Callback event message producer:

@Slf4j
@Component
public class CallbackProducer {

    @Autowired
    AmqpTemplate amqpTemplate;

    public void sendCallbackMessage(CallbackDTO callbackDTO, final long delayTimes) {

        log.info("The producer sends a message, callbackDTO: {}", callbackDTO);

        amqpTemplate.convertAndSend(CallbackQueueEnum.QUEUE_GENSEE_CALLBACK.getExchange(), CallbackQueueEnum.QUEUE_GENSEE_CALLBACK.getRoutingKey(), JsonMapper.getInstance().toJson(callbackDTO), new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                //Set the delay in milliseconds: the x-delay header tells the exchange how long to hold the message before routing it to the queue
                message.getMessageProperties().setHeader("x-delay", delayTimes);
                message.getMessageProperties().setCorrelationId(callbackDTO.getSdkId());
                return message;
            }
        });
    }
}

Callback event message consumer:

@Slf4j
@Component
@RabbitListener(queues = "message.callback", containerFactory = "rabbitListenerContainerFactory")
public class CallbackConsumer {

    @Autowired
    private IGlobalUserService globalUserService;

    @RabbitHandler
    public void handle(String json, Channel channel, @Headers Map<String, Object> map) throws Exception {

        if (map.get("error") != null) {
            //Reject the message and put it back on the queue
            channel.basicNack((Long) map.get(AmqpHeaders.DELIVERY_TAG), false, true);
            return;
        }

        try {

            CallbackDTO callbackDTO = JsonMapper.getInstance().fromJson(json, CallbackDTO.class);
            //Execute business logic
            globalUserService.execute(callbackDTO);
            //The message is successfully manually confirmed, corresponding to the message confirmation mode acknowledge-mode: manual
            channel.basicAck((Long) map.get(AmqpHeaders.DELIVERY_TAG), false);

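        } catch (Exception e) {
            //In manual acknowledge mode the message is neither acked nor nacked on this path, so it stays unacknowledged
            log.error("Callback failed", e);
        }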
    }
}
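The producer/consumer split above can be sketched in a single JVM with a BlockingQueue. This is only an in-process stand-in for the pattern, not RabbitMQ and not the AMQP API: the class InProcessQueueDemo, the message strings, and the stop marker are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class InProcessQueueDemo {

    private static final String STOP = "__stop__"; // hypothetical end-of-stream marker

    static List<String> run() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> handled = new ArrayList<>();

        // Consumer: takes messages off the queue on its own thread
        Thread consumer = new Thread(() -> {
            try {
                for (String msg = queue.take(); !STOP.equals(msg); msg = queue.take()) {
                    handled.add("handled " + msg);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "callback-consumer");
        consumer.start();

        // Producer: returns as soon as the message is enqueued, without waiting for processing
        queue.offer("callback-1");
        queue.offer("callback-2");
        queue.offer(STOP);

        try {
            consumer.join(); // wait only so the demo can return the results
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return handled;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

A real message broker adds what this sketch lacks: persistence, acknowledgements, redelivery, and delivery across processes.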

7. ThreadUtil asynchronous tool class

@Slf4j
public class ThreadUtils {

    public static void main(String[] args) {
        for (int i = 0; i < 3; i++) {
            ThreadUtil.execAsync(() -> {
                ThreadLocalRandom threadLocalRandom = ThreadLocalRandom.current();
                int number = threadLocalRandom.nextInt(20) + 1;
                System.out.println(number);
            });
            log.info("Thread No. {} submitted", i);
        }

        log.info("task finish!");
    }
}

8. Guava asynchronous

Guava's ListenableFuture, as the name suggests, is a Future that can be listened to. It extends and enhances Java's native Future. A Future represents an asynchronous computation whose result becomes available once the task completes. With a plain Future, if we want to show the result to the user or start further computation as soon as it is ready, we have to poll the computation status from another thread, which makes the code complex and inefficient. With ListenableFuture we register a callback that is invoked automatically when the computation completes, so there is no need to wait for the result through the get() method, and the concurrent code becomes simpler.

ListenableFuture is an interface that extends the JDK's Future interface and adds the method void addListener(Runnable listener, Executor executor).

Let's see how to use ListenableFuture. First you need to define an instance of ListenableFuture:

ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
final ListenableFuture<Integer> listenableFuture = executorService.submit(new Callable<Integer>() {
    @Override
    public Integer call() throws Exception {
        log.info("callable execute...");
        TimeUnit.SECONDS.sleep(1);
        return 1;
    }
});

First, create a ListeningExecutorService instance through the static listeningDecorator method of the MoreExecutors class, then call the submit method of that instance to obtain the ListenableFuture object.

The work done by the ListenableFuture is defined in the implementation of the Callable interface. Here it just sleeps for 1 second and then returns the number 1. Once you have a ListenableFuture instance, you can register a callback that runs after the Future completes.

//Recent Guava versions require an explicit Executor for the callback, so the pool created above is passed as the third argument
Futures.addCallback(listenableFuture, new FutureCallback<Integer>() {
    @Override
    public void onSuccess(Integer result) {
        //Successfully executed...
        System.out.println("Get listenable future's result with callback " + result);
    }

    @Override
    public void onFailure(Throwable t) {
        //Exception handling...
        t.printStackTrace();
    }
}, executorService);

Those are the 8 ways to implement asynchronous execution covered in this issue.

Tags: Java programming language

Posted by ltd on Mon, 30 Jan 2023 17:42:28 +1030