If you were given a huge file containing 100 million lines of data and asked to convert it into a production database, what would you do?

The problem above comes from a real business requirement: migrating historical data from a legacy system to a new production system via offline files.

Given the tight schedule and the sheer volume of data, the design settled on two approaches:

  • Split the file
  • Multithreaded import

Split the file

First we can write a small program, or use the split command to split the large file into smaller files.

```bash
# Split largeFile.txt into smaller files of 100,000 lines each,
# using numeric suffixes of width 4 (smallFile_0000, smallFile_0001, ...)
split -l 100000 -d -a 4 largeFile.txt smallFile_
```
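If the split command is not available (for example on Windows), the "small program" route works just as well. Below is a minimal sketch, assuming the file is UTF-8 and reusing the largeFile.txt / smallFile_ names from the command above (the FileSplitter class name is just for illustration):

```java
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;

public class FileSplitter {

    public static void main(String[] args) throws Exception {
        int linesPerFile = 100_000;
        int fileIndex = 0;
        int lineCount = 0;
        BufferedWriter writer = null;
        try (BufferedReader reader = Files.newBufferedReader(
                Paths.get("largeFile.txt"), StandardCharsets.UTF_8)) {
            String line;
            while ((line = reader.readLine()) != null) {
                // Roll over to a new small file every 100,000 lines
                if (lineCount % linesPerFile == 0) {
                    if (writer != null) {
                        writer.close();
                    }
                    writer = Files.newBufferedWriter(
                            Paths.get(String.format("smallFile_%04d", fileIndex++)),
                            StandardCharsets.UTF_8);
                }
                writer.write(line);
                writer.newLine();
                lineCount++;
            }
        } finally {
            if (writer != null) {
                writer.close();
            }
        }
    }
}
```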

There are two main reasons why we chose to split large files first:

First, if the program reads the big file directly and crashes halfway through, it loses all reading progress and has to start over from the beginning.

After splitting the file, once a small file has been fully imported, we can move it into a designated folder.

This way, even if the application goes down and restarts, it only needs to process the files that are still left.
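A minimal sketch of that checkpointing, assuming a "finished" directory name of our own choosing and using java.nio.file.Files:

```java
// After a small file has been fully imported, move it into the "finished" directory
// (the directory name is just an example) so that a restart skips it
Path source = Paths.get("smallFile_0001");
Path finishedDir = Paths.get("finished");
Files.createDirectories(finishedDir);
Files.move(source, finishedDir.resolve(source.getFileName()), StandardCopyOption.REPLACE_EXISTING);
```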

Second, a single file can only be read by one application at a time, which caps the import speed.

After splitting the file, we can deploy multiple nodes and scale horizontally. Each node reads a portion of the files, multiplying the import speed.

Multithreaded import

Once the file has been split, the next step is to read each small file and import its contents.

When splitting, we set each small file to 100,000 lines. Reading 100,000 lines into the application at once could push heap usage too high and cause frequent “Full GC”, so the code below streams the file and reads it line by line.

Of course, if the files are small enough after splitting, or the application’s heap is large enough, we can simply load a whole file into memory and process it there, which is simpler.
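For that simpler case, a small sketch could look like the following, assuming UTF-8 and reusing the convertToDB(line) method that the rest of the article uses to write one line to the database:

```java
// For files small enough to fit in memory: read every line at once, then import them
List<String> allLines = Files.readAllLines(Paths.get("file path"), StandardCharsets.UTF_8);
for (String line : allLines) {
    convertToDB(line);
}
```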

The line-by-line reading code is as follows:

```java
File file = ...;
try (LineIterator iterator = IOUtils.lineIterator(new FileInputStream(file), "UTF-8")) {
    while (iterator.hasNext()) {
        String line = iterator.nextLine();
        convertToDB(line);
    }
}
```

The code above uses the LineIterator class from Commons IO, which reads the file contents with a BufferedReader and wraps them in the iterator pattern, so we can read the file iteratively with ease.

If you are on JDK 1.8 or later, you can use the JDK’s own Files class to turn the file into a Stream and read it as follows:

```java
Files.lines(Paths.get("file path"), Charset.defaultCharset()).forEach(line -> {
    convertToDB(line);
});
```

In fact, if you look at the source of Files#lines, the principle is similar to LineIterator above: it, too, is wrapped in an iterator.

Problems introduced by multithreading

The reading code above is not hard to write, but it is inefficient, because a single thread can only move on to the next line after the previous one has been imported.

To speed up the import, let’s add a few more threads and import concurrently.

For multithreading we naturally reach for a thread pool; the modified code is as follows:

```java
File file = ...;
ExecutorService executorService = new ThreadPoolExecutor(
        5,
        10,
        60,
        TimeUnit.MINUTES,
        // Queue capacity equals the number of lines in one file, assumed to be 100,000
        new ArrayBlockingQueue<>(10 * 10000),
        new ThreadFactoryBuilder().setNameFormat("test-%d").build());

try (LineIterator iterator = IOUtils.lineIterator(new FileInputStream(file), "UTF-8")) {
    while (iterator.hasNext()) {
        String line = iterator.nextLine();
        executorService.submit(() -> {
            convertToDB(line);
        });
    }
}
```

In the above code, each line read is handed directly to the thread pool for execution.

We know that thread pools work as follows:

  1. If fewer than the core number of threads are running, a new thread is created to execute the task.
  2. If the core threads are all busy, the task is placed in the queue.
  3. If the queue is full, an additional thread is created (up to the maximum pool size) to execute the task.
  4. If the maximum number of threads has been reached and the queue is full, the rejection policy is executed.

Thread pool execution flowchart

Since we set the core thread count to 5 above, the core threads fill up quickly and subsequent tasks can only wait in the queue; once the queue and the extra threads up to the maximum are also full, new tasks are rejected.
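To see those four steps in action, here is a small standalone sketch with deliberately tiny, hypothetical numbers (2 core threads, a queue of 2, a maximum of 4 threads), so that queuing and rejection are easy to observe:

```java
// Tiny demo: 2 core threads, queue capacity 2, maximum 4 threads
ThreadPoolExecutor pool = new ThreadPoolExecutor(
        2, 4, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2));

for (int i = 0; i < 10; i++) {
    final int taskId = i;
    try {
        // Steps 1-3: core threads first, then the queue, then extra threads up to the maximum
        pool.execute(() -> {
            try {
                Thread.sleep(1_000); // simulate a slow import
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    } catch (RejectedExecutionException e) {
        // Step 4: queue full and maximum threads reached -> the default AbortPolicy rejects the task
        System.out.println("task " + taskId + " was rejected");
    }
}
pool.shutdown();
```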

To avoid having tasks rejected by the thread pool, we could adopt one of the following solutions:

  • Make the queue large enough to hold every line of the file
  • Make the maximum number of threads larger than the total number of lines in the file

Both solutions share the same problem. With the first, loading the entire file’s contents into memory takes up too much memory.

The second creates too many threads, which also takes up too much memory.

Once memory usage grows beyond what GC can reclaim, the result may be frequent “Full GC”, or even “OOM”, slowing the import down.

There is, of course, a third option that combines the first two: set a suitable queue length and a suitable maximum thread count. However, this **"suitable"** degree is genuinely hard to pin down, and the “OOM” risk remains.

Therefore, to solve this problem, two approaches were explored:

  • Batch execution with CountDownLatch
  • Extending the thread pool

Batch execution with CountDownLatch

CountDownLatch, provided by the JDK, allows the main thread to wait until the child threads have finished executing before continuing.

Using this feature, we can modify the multithreaded import code; the main logic is as follows:

```java
try (LineIterator iterator = IOUtils.lineIterator(new FileInputStream(file), "UTF-8")) {
    List<String> lines = Lists.newArrayList();
    List<ConvertTask> tasks = Lists.newArrayList();
    while (iterator.hasNext()) {
        String line = iterator.nextLine();
        lines.add(line);
        // Every 1000 lines, create a new asynchronous task
        if (lines.size() == 1000) {
            tasks.add(new ConvertTask(Lists.newArrayList(lines)));
            lines.clear();
        }
        // Every 10 tasks, execute the batch asynchronously and wait for it to finish
        if (tasks.size() == 10) {
            asyncBatchExecuteTask(tasks);
        }
    }
    // Wrap up the remaining lines as one last task
    tasks.add(new ConvertTask(Lists.newArrayList(lines)));
    // Execute the final batch
    asyncBatchExecuteTask(tasks);
}
```

In this code, each asynchronous task imports 1000 rows of data; after 10 tasks have accumulated, asyncBatchExecuteTask is called to run them asynchronously on the thread pool.

```java
/**
 * Execute a batch of tasks asynchronously and wait for all of them to finish
 *
 * @param tasks the batch of conversion tasks
 */
private static void asyncBatchExecuteTask(List<ConvertTask> tasks) throws InterruptedException {
    CountDownLatch countDownLatch = new CountDownLatch(tasks.size());
    for (ConvertTask task : tasks) {
        task.setCountDownLatch(countDownLatch);
        executorService.submit(task);
    }
    // The main thread waits here until every task in the batch has finished
    countDownLatch.await();
    // Clear the batch so the next one can be accumulated
    tasks.clear();
}
```

The asyncBatchExecuteTask method creates a CountDownLatch, and the main thread calls its await method to wait until all asynchronous tasks have completed.

The logic of the ConvertTask asynchronous task is as follows:

```java
/**
 * Asynchronous conversion task
 *
 * Note: countDownLatch.countDown() must be called even if the task fails,
 * otherwise the main thread will block forever.
 */
private static class ConvertTask implements Runnable {

    private CountDownLatch countDownLatch;

    private List<String> lines;

    public ConvertTask(List<String> lines) {
        this.lines = lines;
    }

    public void setCountDownLatch(CountDownLatch countDownLatch) {
        this.countDownLatch = countDownLatch;
    }

    @Override
    public void run() {
        try {
            for (String line : lines) {
                convertToDB(line);
            }
        } finally {
            // Count down in finally, so the main thread is released even on failure
            countDownLatch.countDown();
        }
    }
}
```

The ConvertTask class is simple: it iterates over its lines and imports them into the database, then calls countDownLatch#countDown once the import is done.

Once every asynchronous task has called countDownLatch#countDown, the main thread wakes up and continues reading the file.

Although this approach solves the problem above, it has to accumulate a fixed number of tasks each time before the batch can be executed asynchronously.

In addition, the next batch cannot start until the current batch has finished, so each batch takes as long as its slowest task.

That leaves the thread pool idle for part of the time. Is there a way to keep squeezing the thread pool so it never stops working?

Extending the thread pool

Going back to the original problem, reading the file and importing it is essentially a **producer-consumer** model.

The main thread, as the producer, reads the file and places lines on a queue.

Asynchronous threads, as consumers, constantly take lines off the queue and import them into the database.

“Once the queue is full, the producer should block until the consumer consumes the task.”

In fact, when we use a thread pool we are already using a “producer-consumer” model, since the pool is backed by a blocking queue.

So why doesn’t the thread pool block when the queue is full?

This is because the thread pool internally calls the queue’s offer method, which does not block when the queue is full; it simply returns false.
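The difference is easy to see with a BlockingQueue on its own (a small sketch):

```java
BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
System.out.println(queue.offer("first"));   // true  - the queue now holds one element
System.out.println(queue.offer("second"));  // false - queue full, offer returns immediately

// put() is the blocking counterpart: it waits until space becomes available
// (it throws InterruptedException, so it needs handling if uncommented)
// queue.put("second");  // would block here until another thread takes "first"
```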

Is there a way to make the main thread block when it adds a task and the thread pool’s queue is full?

Yes. We can write a custom thread pool rejection policy that calls BlockingQueue.put, so the producer blocks when the queue is full.

```java
RejectedExecutionHandler rejectedExecutionHandler = new RejectedExecutionHandler() {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        if (!executor.isShutdown()) {
            try {
                // Block the producer until the queue has room for the task
                executor.getQueue().put(r);
            } catch (InterruptedException e) {
                // should not be interrupted
            }
        }
    }
};
```

With this in place, once the thread pool’s queue is full, the main thread blocks.

With this approach, we can directly use the multithreaded import code mentioned above.

```java
ExecutorService executorService = new ThreadPoolExecutor(
        5,
        10,
        60,
        TimeUnit.MINUTES,
        new ArrayBlockingQueue<>(100),
        new ThreadFactoryBuilder().setNameFormat("test-%d").build(),
        (r, executor) -> {
            if (!executor.isShutdown()) {
                try {
                    // When the queue is full, the main thread blocks here
                    executor.getQueue().put(r);
                } catch (InterruptedException e) {
                    // should not be interrupted
                }
            }
        });

File file = new File("file path");
try (LineIterator iterator = IOUtils.lineIterator(new FileInputStream(file), "UTF-8")) {
    while (iterator.hasNext()) {
        String line = iterator.nextLine();
        executorService.submit(() -> convertToDB(line));
    }
}
```

Summary

For a very large file, we can split it into several small files and then deploy multiple application instances to speed up reading.

We can also use multithreading to import concurrently, but we need to keep in mind that once the thread pool is full, subsequent tasks will be rejected.

By extending the thread pool with a custom rejection policy, we can make the main thread block instead.

That’s all for today. If you have other or better solutions, feel free to leave a comment and discuss.

