TB-level data comparison and sorting on a single machine
Overall process
In the previous chapter, we covered splitting the large file. Today, let's continue with sorting the data.
The logic of sorting each file is not complex: read the data into memory, sort it, and write it back to disk. Because the split files are completely independent of each other, the work can be parallelized. Let's look at the overall structure first.
This mainly involves the following components:
- Parallel sort controller, whose main function is to control the number of tasks added to the thread pool.
- Thread pool, which is used to execute the current sorting task.
- Sort task, which encapsulates the work for one file: reading the data from the file into memory, sorting the contents in memory, and finally writing the sorted contents back to the original file (a sketch of the read/write helpers follows this list).
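The read and write steps are handled by two helper methods, fileToList and listToFile, which are referenced in the code later but not listed in this article. Here is a minimal sketch of what they might look like, assuming one record per line of plain text (the class name SortFileIo is just for the sketch, not from the repository):

```java
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.List;

public class SortFileIo {

  /** Read the whole split file into memory, one record per line (assumed format). */
  public List<String> fileToList(File fileItem) throws IOException {
    return Files.readAllLines(fileItem.toPath(), StandardCharsets.UTF_8);
  }

  /** Overwrite the original file with the sorted records. */
  public void listToFile(List<String> sorted, File fileItem) throws IOException {
    Files.write(fileItem.toPath(), sorted, StandardCharsets.UTF_8);
  }
}
```

In the real code the I/O errors would be handled inside these helpers rather than declared on the method signatures.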
Parallel controller
Let's first talk about why a parallel controller is needed at all. There are two reasons:
- The first reason relates to the thread pool's rejection policy. The general behavior of a thread pool is: tasks are first handed to the core threads; once all core threads are busy, new tasks are placed in the task queue; once the queue is also full, extra threads are started up to the maximum pool size; and once the maximum number of threads is running and yet another task arrives, the rejection policy kicks in, which by default throws a RejectedExecutionException. So when submitting tasks, you need to control how many are in flight; otherwise a submission fails with RejectedExecutionException and that task is simply lost. That is why I need parallel control. Readers familiar with thread pools may suggest CallerRunsPolicy, which runs the task in the submitting thread when the maximum threads are busy and the queue is full. In fact that is not ideal here: if the submitting thread has other work to do, it gets tied up running the task instead (a minimal sketch after this list shows the default rejection in action).
- The second reason is that once a task is submitted to the thread pool, we do not know when it will finish. If all sorting tasks have been submitted but some are still running, and we move on to the next stage without waiting, then (especially when testing with files that are not particularly large) the next stage may read a file that has not been fully written yet and therefore read wrong data.
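To make the first point concrete, here is a minimal, self-contained sketch (not from the original code) of how a saturated pool with the default AbortPolicy rejects further submissions:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class RejectDemo {
  public static void main(String[] args) {
    // 1 core thread, 1 max thread, queue of 1 -> the third task has nowhere to go
    ThreadPoolExecutor pool =
        new ThreadPoolExecutor(
            1, 1, 5, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(1),
            new ThreadPoolExecutor.AbortPolicy());

    Runnable slow =
        () -> {
          try {
            Thread.sleep(1000);
          } catch (InterruptedException ignored) {
          }
        };

    pool.execute(slow); // runs on the single worker thread
    pool.execute(slow); // waits in the queue
    try {
      pool.execute(slow); // worker busy + queue full -> rejected
    } catch (RejectedExecutionException e) {
      System.out.println("Task rejected: " + e);
    }
    pool.shutdown();
  }
}
```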
Based on these two points, I think we need to implement a parallel controller.
That covers why we need a parallel controller. Now let's talk about how to implement it.
My plan is like this:
Let me explain
- After a task is submitted to the thread pool, we get back a Future object.
```java
/**
 * Asynchronous execution of one file's sort
 *
 * @param fileItem file to sort
 * @return Future for the asynchronous result
 */
private Future<?> runTask(File fileItem) {
    // Run in parallel through the thread pool
    Future<?> future =
        ScheduleTaskThreadPool.INSTANCE.submit(
            () -> {
                long startTime = System.currentTimeMillis();
                // Read the file into a collection in memory
                List oneFileList = this.fileToList(fileItem);
                // Collections.sort is a stable sort: equal elements keep their original order
                Collections.sort(oneFileList);
                // Quick sort is an unstable alternative; it may change the relative order of equal elements
                // QuickSort.quickSortList(oneFileList);
                // Write the sorted contents back to the original file
                this.listToFile(oneFileList, fileItem);
                long endTime = System.currentTimeMillis();
                return endTime - startTime;
            });

    return future;
}
```
- Add that Future to a local queue. When the thread pool reaches full load (the wait queue is occupied and the maximum number of threads is running), remove the Future at the head of the queue and wait for that task to finish; this throttles the number and speed of submissions. And don't forget: after all submissions are done, wait for every remaining task to finish.
```java
/**
 * Sort every split file through the thread pool
 *
 * @param dataList list of split files
 */
protected void fileSoft(File[] dataList) {
    System.out.println("Start to execute the thread pool task and output the information");
    ScheduleTaskThreadPool.INSTANCE.outPoolInfo();

    LinkedList<Future> rsp = new LinkedList<>();

    try {
        for (File fileItem : dataList) {
            // Asynchronous call for data processing
            Future<?> future = this.runTask(fileItem);
            // Add the Future to the end of the queue
            rsp.add(future);
            // Statistics counter
            dataStatistics.dataAdd(1);

            // First check whether the thread pool is full
            if (ScheduleTaskThreadPool.INSTANCE.isFull()) {
                // Get and remove the head of the queue
                Future item = rsp.poll();
                // A single failed task must not affect the others, so handle its exception
                // individually and let the process continue
                try {
                    Object value = item.get();
                    // Print task information
                    System.out.println("Time use:" + value);
                    ScheduleTaskThreadPool.INSTANCE.outPoolInfo();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
            }
        }

        // Every remaining task must complete before moving on
        for (Future item : rsp) {
            Object value = item.get();
            // Print task information
            System.out.println("Time use:" + value);
            ScheduleTaskThreadPool.INSTANCE.outPoolInfo();
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}
```
At this point, the parallel task controller is ready.
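As a rough usage sketch (the directory path and the main method are my own assumptions, not taken from the repository), driving the controller could look like this, assuming the main method lives in the same class as fileSoft:

```java
public static void main(String[] args) {
    // Directory holding the split files produced in the previous chapter (path is an assumption)
    File splitDir = new File("/tmp/bigfile/split");
    File[] dataList = splitDir.listFiles();
    if (dataList == null || dataList.length == 0) {
        System.out.println("No split files found, nothing to sort");
        return;
    }

    // Submit one sort task per split file; fileSoft throttles the submissions
    // and waits until every task has finished
    new BigFileSort().fileSoft(dataList);

    // Release the worker threads once all files have been sorted
    ScheduleTaskThreadPool.INSTANCE.shutdown();
}
```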
Thread pool
There is not much to say here: just use the thread pool that ships with Java. The one thing to note is that the core thread count, maximum thread count, wait queue size, and rejection policy should all be chosen according to the resources of the machine.
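For example (a sketch of one possible sizing rule, not from the original code, which hard-codes the values), the thread counts could be derived from the number of available processors:

```java
public class PoolSizing {
  public static void main(String[] args) {
    // One possible sizing rule: scale the pool to the machine instead of hard-coding it
    int cores = Runtime.getRuntime().availableProcessors();
    int minThreadNum = Math.max(2, cores / 2);        // core threads
    int maxThreadNum = Math.max(minThreadNum, cores); // maximum threads
    int waitNum = maxThreadNum * 2;                   // wait queue capacity

    System.out.println("core=" + minThreadNum + ", max=" + maxThreadNum + ", queue=" + waitNum);
  }
}
```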
```java
public class ScheduleTaskThreadPool {

  /** Thread pool object for task scheduling */
  public static final ScheduleTaskThreadPool INSTANCE = new ScheduleTaskThreadPool();

  /**
   * The number of core threads in the thread pool. When a task is submitted, the pool creates a
   * new thread to execute it until the current number of threads equals corePoolSize;
   * if the current number of threads equals corePoolSize, further submitted tasks are put into
   * the blocking queue and wait to be executed;
   * if the pool's prestartAllCoreThreads() method is called, all core threads are created and
   * started in advance.
   */
  private static final int MIN_THREAD_NUM = 2;

  /**
   * The maximum number of threads allowed in the thread pool. If the blocking queue is full and
   * tasks keep being submitted, new threads are created to execute them,
   *
   * <p>as long as the current number of threads is less than maximumPoolSize.
   */
  private static final int MAX_THREAD_NUM = 4;

  /**
   * The capacity of the wait queue. Once all core threads are busy, submitted tasks wait here
   * until a worker thread becomes free or the queue fills up.
   */
  private static final int WAIT_NUM = 8;

  /**
   * How long an idle thread stays alive, i.e. how long a thread survives with no task to
   * execute. In seconds.
   *
   * <p>By default this parameter only matters when the number of threads is greater than
   * corePoolSize.
   */
  private static final int KEEPALIVE = 5;

  /**
   * workQueue must be a BlockingQueue. When the number of threads in the pool exceeds
   * corePoolSize, tasks enter the blocking queue and wait; the queue is what gives the pool its
   * blocking behavior.
   *
   * <p>(1) No queueing, hand tasks directly to threads: use a SynchronousQueue. If no thread is
   * available to run the task immediately (all threads in the pool are working), the attempt to
   * add the task to the queue fails, so a new thread is created to handle it (expanding from
   * corePoolSize towards maximumPoolSize). Executors.newCachedThreadPool() uses this strategy.
   *
   * <p>(2) Unbounded queue: use LinkedBlockingQueue (linked-list based, FIFO). In theory an
   * unlimited number of tasks can queue, so new tasks keep joining the queue while the
   * corePoolSize threads are working; the pool never grows past corePoolSize, and
   * maximumPoolSize has no effect.
   *
   * <p>(3) Bounded queue: use ArrayBlockingQueue (array based, FIFO) with a maximum length.
   * A bounded queue prevents resource exhaustion, but once the queue and maximumPoolSize are
   * both exceeded, submitted tasks are rejected, which is harder to tune and control.
   *
   * <p>Queue of waiting tasks
   */
  private ArrayBlockingQueue queue = new ArrayBlockingQueue(WAIT_NUM);

  /** Thread factory: a custom factory lets every new thread get a recognizable name */
  private ThreadFactory factory = new ScheduleTaskThreadFactory();

  /**
   * Rejection policies:
   *
   * <p>1. ThreadPoolExecutor.AbortPolicy: discards the task and throws a
   * RejectedExecutionException.
   *
   * <p>2. ThreadPoolExecutor.DiscardPolicy: also discards the task, but without throwing an
   * exception.
   *
   * <p>3. ThreadPoolExecutor.DiscardOldestPolicy: discards the task at the head of the queue
   * and retries submitting the current task (repeating this process).
   *
   * <p>4. ThreadPoolExecutor.CallerRunsPolicy: the thread that submitted the task runs it
   * itself.
   */
  private ThreadPoolExecutor pool =
      new ThreadPoolExecutor(
          MIN_THREAD_NUM,
          MAX_THREAD_NUM,
          KEEPALIVE,
          TimeUnit.SECONDS,
          queue,
          factory,
          new ThreadPoolExecutor.AbortPolicy());

  /**
   * Submit a task with a return value to the thread pool
   *
   * @param task
   */
  public Future<?> submit(Callable task) {
    return pool.submit(task);
  }

  /**
   * Submit a task to the thread pool
   *
   * @param task
   */
  public Future<?> submit(Runnable task) {
    return pool.submit(task);
  }

  /**
   * Whether the thread pool is currently at full load
   *
   * @return
   */
  public boolean isFull() {
    return pool.getPoolSize() == pool.getMaximumPoolSize();
  }

  public void outPoolInfo() {
    StringBuilder outData = new StringBuilder();
    outData.append("pool Core Size:").append(pool.getCorePoolSize()).append(",");
    outData.append("curr pool size:").append(pool.getPoolSize()).append(",");
    outData.append("max pool Size:").append(pool.getMaximumPoolSize()).append(",");
    outData.append("queue size:").append(pool.getQueue().size()).append(",");
    outData.append("task completed size:").append(pool.getCompletedTaskCount()).append(",");
    outData.append("active count size:").append(pool.getActiveCount()).append(",");
    outData.append("task count size:").append(pool.getTaskCount()).append(",");
    outData.append(Symbol.LINE);
    outData.append(Symbol.LINE);
    System.out.println(outData.toString());
  }

  public void shutdown() {
    pool.shutdown();
  }
}
```
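The custom ScheduleTaskThreadFactory is referenced above but not listed in this article. A minimal version that only gives each worker thread a recognizable name might look like this (a sketch, not necessarily the repository's implementation):

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/** Names each worker thread so it can be identified in thread dumps and logs. */
public class ScheduleTaskThreadFactory implements ThreadFactory {

  /** Running counter used to build a unique thread name. */
  private final AtomicInteger threadIndex = new AtomicInteger(1);

  @Override
  public Thread newThread(Runnable runnable) {
    Thread thread = new Thread(runnable, "schedule-task-pool-" + threadIndex.getAndIncrement());
    // Keep the JVM alive until the pool is shut down explicitly
    thread.setDaemon(false);
    return thread;
  }
}
```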
Summary:
Looking back at the analysis, this is not as complicated as it might seem. As long as the parallel controller and the thread pool are handled well, the rest is not difficult. For the parallel controller, pay special attention to matching the submission speed to what the thread pool can absorb, otherwise a RejectedExecutionException will be thrown. The thread pool itself is the easy part: just use the one that comes with Java.
With that, parallel sorting is done. If you want to see the full code, check out my GitHub:
https://github.com/kkzfl22/datastruct/blob/master/src/main/java/com/liujun/datastruct/datacompare/bigfilecompare/common/BigFileSort.java
In the next section, I will continue with de-duplicating the ordered files and producing the comparison output.