Background

One of our company's ETL projects downloads CSV files from platforms such as blob storage and HDFS, parses them, and loads the data into the business MySQL database.

The volume is about 20 files per hour (mostly arriving around minute 50 of each hour). Each file contains 80,000 to 200,000 records, which are stored in different tables.

To load a file, we parse it and split the records into batches of 1,000 rows for batch insert (to keep this post short, only the loading step is covered). The batches are inserted in parallel using stream.parallel() from JDK 1.8.
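The batching step described above might look roughly like the following. This is a minimal sketch, not the project's actual code: the names BatchSketch, partition, saveAll, and saveBatch are hypothetical, and the stand-in for the batch INSERT simply returns the batch size.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class BatchSketch {
    // Split rows into consecutive batches of at most batchSize elements.
    static <T> List<List<T>> partition(List<T> rows, int batchSize) {
        List<List<T>> batches = new ArrayList<>();
        for (int from = 0; from < rows.size(); from += batchSize) {
            int to = Math.min(from + batchSize, rows.size());
            batches.add(rows.subList(from, to));
        }
        return batches;
    }

    // Apply saveBatch to every batch in parallel and return the summed results.
    static <T> int saveAll(List<T> rows, int batchSize, Function<List<T>, Integer> saveBatch) {
        AtomicInteger saved = new AtomicInteger();
        partition(rows, batchSize).parallelStream()
                .forEach(batch -> saved.addAndGet(saveBatch.apply(batch)));
        return saved.get();
    }

    public static void main(String[] args) {
        List<Integer> rows = IntStream.range(0, 2500).boxed().collect(Collectors.toList());
        // Hypothetical stand-in for a real batch INSERT: report the batch size as "rows written".
        int total = saveAll(rows, 1000, List::size);
        System.out.println(partition(rows, 1000).size() + " batches, " + total + " rows");
    }
}
```

Because parallelStream() is called from an ordinary thread here, the batches run on the shared common pool, which is the situation the rest of this post examines.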

The problem

After the job had been running for a while, we found that as the number of files grew, loading them into the database took longer and longer. Analysis showed that each instance had only about 8 threads doing the loading, and once all of them were busy, the remaining work had to wait.

Troubleshooting

The database-loading utility code is as follows:

    // The parsed rows, already split into batches
    List<List<T>> dayList = ...;
    long start = System.currentTimeMillis();
    AtomicInteger atomicInteger = new AtomicInteger(0);
    dayList.parallelStream().forEach(batch -> {
        Integer integer;
        try {
            // function performs the batch insert
            integer = function.apply(batch);
        } catch (Exception e) {
            log.error("[ListPageHelper.saveBatchOrSize]-------->{}", e.toString());
            // Connection pool timed out: retry once
            if (e instanceof CannotAcquireLockException) {
                log.error("[CannotAcquireLockException]-------->{}", e.toString());
                integer = function.apply(batch);
            } else {
                throw e;
            }
        }
        atomicInteger.addAndGet(integer);
    });
    long end = System.currentTimeMillis();
    // list is the full, unsplit input (declared outside this excerpt)
    log.info("data--------->{}, time-consuming--------->{}", list.size(), (end - start));

I knew that Java's stream.parallel() is built on the Fork/Join framework, but I had never read its source. I took this opportunity to read it and found several issues:

  1. By default, the common ForkJoinPool has (number of CPUs - 1) worker threads, and the thread that starts the parallel stream also takes part in the work.
  2. The common ForkJoinPool is global (one per JVM), which means that every parallel stream in the whole project shares that same small set of threads unless you create a pool yourself.
  3. Tasks are forked and then executed by whichever workers are free; work submitted later has to wait until workers become idle. For example, with two lists to traverse, list1 (1-1000) and list2 (1001-2000), if list1's forEach starts first, list2 waits until list1 frees up threads.
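To see the shared pool in action, a small experiment like the one below can help. It is only a sketch: the class and method names are hypothetical, and the worker thread names it prints come from the JDK's default thread factory, so they are an implementation detail rather than a documented contract.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

class CommonPoolSketch {
    // Collect the names of all threads that execute the body of a parallel stream.
    static Set<String> threadsUsedByParallelStream() {
        Set<String> names = ConcurrentHashMap.newKeySet();
        IntStream.range(0, 10_000).parallel()
                .forEach(i -> names.add(Thread.currentThread().getName()));
        return names;
    }

    public static void main(String[] args) {
        System.out.println("CPUs: " + Runtime.getRuntime().availableProcessors());
        System.out.println("common pool parallelism: "
                + ForkJoinPool.commonPool().getParallelism());
        // Typically the calling thread plus some ForkJoinPool.commonPool-worker-N threads.
        System.out.println("threads used: " + threadsUsedByParallelStream());
    }
}
```

Running two such streams at the same time from different threads shows them competing for the same workers.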

Based on the above, our problems were:

  1. Only a limited number of threads can write to the database at any one time.
  2. Although the data within one file is loaded concurrently, different files are not loaded concurrently with each other. This does not fit our business logic, because the data is useful to the business only once all dimensions have been loaded.

Improvement ideas (loading logic only)

  1. Short-term workaround (minimal change): keep the current pattern, but make different files load in parallel.
  2. Long-term strategy (requires restructuring): move to a producer-consumer model in which download concurrency and consumption concurrency can each be controlled freely.
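The long-term idea can be sketched as follows. This is an illustrative outline under several assumptions, not the design we actually built: the class name, the bounded queue size of 16, the sentinel value, and the "file-N" strings standing in for downloaded and parsed files are all hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ProducerConsumerSketch {
    private static final String DONE = "__done__"; // sentinel that tells a consumer to stop

    // Producers enqueue "files", consumers dequeue and "load" them. Returns files loaded.
    static int run(int files, int producers, int consumers) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(16); // bounded: back-pressure
        AtomicInteger nextFile = new AtomicInteger();
        AtomicInteger loaded = new AtomicInteger();
        ExecutorService producerPool = Executors.newFixedThreadPool(producers);
        ExecutorService consumerPool = Executors.newFixedThreadPool(consumers);
        try {
            for (int p = 0; p < producers; p++) {
                producerPool.execute(() -> {
                    int id;
                    while ((id = nextFile.getAndIncrement()) < files) {
                        put(queue, "file-" + id); // stand-in for download + parse
                    }
                });
            }
            for (int c = 0; c < consumers; c++) {
                consumerPool.execute(() -> {
                    while (!DONE.equals(take(queue))) {
                        loaded.incrementAndGet(); // stand-in for the batch insert
                    }
                });
            }
            producerPool.shutdown();
            producerPool.awaitTermination(1, TimeUnit.MINUTES);
            for (int c = 0; c < consumers; c++) {
                put(queue, DONE); // one sentinel per consumer
            }
            consumerPool.shutdown();
            consumerPool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            producerPool.shutdownNow();
            consumerPool.shutdownNow();
        }
        return loaded.get();
    }

    private static void put(BlockingQueue<String> q, String s) {
        try {
            q.put(s);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    private static String take(BlockingQueue<String> q) {
        try {
            return q.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("loaded " + run(20, 2, 4) + " files");
    }
}
```

The point of the two separate pools is that the number of concurrent downloads and the number of concurrent database writers can be tuned independently.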

Problem solving

Method 1: Increase the number of threads in the global ForkJoinPool. This speeds up loading, but different files still cannot be loaded concurrently.

    // Sets the parallelism of the global (common) ForkJoinPool to 20 threads.
    // This must run before the common pool is first used.
    System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");

Method 2:

  1. Create a new ForkJoinPool for each execution.
  2. Submit the stream.parallel() work to that new ForkJoinPool. The code is as follows:
    // The parsed rows, already split into batches
    List<List<T>> dayList = ......;
    long start = System.currentTimeMillis();
    AtomicInteger atomicInteger = new AtomicInteger(0);
    ForkJoinPool forkJoinPool = new ForkJoinPool(4);
    try {
        forkJoinPool.submit(() -> {
            dayList.parallelStream().filter(CollectionUtils::isNotEmpty).forEach(batch -> {
                Integer integer;
                try {
                    // function performs the batch insert
                    integer = function.apply(batch);
                } catch (Exception e) {
                    log.error("[ListPageHelper.saveBatchOrSize]-------->{}", e.toString());
                    // Connection pool timed out: retry once
                    if (e instanceof CannotAcquireLockException) {
                        log.error("[CannotAcquireLockException]-------->{}", e.toString());
                        integer = function.apply(batch);
                    } else {
                        throw e;
                    }
                }
                atomicInteger.addAndGet(integer);
            });
        }).join();
    } finally {
        // Release the pool's threads even if a batch failed
        forkJoinPool.shutdown();
    }
    long end = System.currentTimeMillis();
    // list is the full, unsplit input (declared outside this excerpt)
    log.info("data--------->{}, time-consuming--------->{}", list.size(), (end - start));
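A stripped-down experiment can confirm that Method 2 really moves the work off the common pool. The sketch below uses hypothetical names and relies on the default worker thread naming (ForkJoinPool-N-worker-M for a custom pool), which is an implementation detail. Running a parallel stream inside a custom pool is also a widely used trick rather than a documented guarantee of the Stream API.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

class CustomPoolSketch {
    // Run a parallel stream from inside a dedicated pool and return the thread names used.
    static Set<String> runInOwnPool(int parallelism) {
        Set<String> names = ConcurrentHashMap.newKeySet();
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            pool.submit(() -> {
                IntStream.range(0, 10_000).parallel()
                        .forEach(i -> names.add(Thread.currentThread().getName()));
            }).join();
        } finally {
            pool.shutdown();
        }
        return names;
    }

    public static void main(String[] args) {
        // Expect only names of the dedicated pool's workers, none from the common pool.
        System.out.println(runInOwnPool(4));
    }
}
```

Since each call gets its own pool, two files processed by two such calls no longer compete for the same workers.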

A question

Why does a parallelStream started inside a task submitted to a ForkJoinPool use that pool's threads? After all, parallelStream normally runs on the ForkJoinPool common pool, which is shared across all threads.

Introduction to several important classes in the Fork/Join framework

  • ForkJoinPool: implements ExecutorService, so it is a thread executor like the other thread pools, which are also ExecutorService implementations.
  • ForkJoinTask: implements Future and can be regarded as the task itself.
  • ForkJoinWorkerThread: a subclass of Thread that executes the tasks.

Strictly speaking, a ForkJoinTask is not a task by itself, because it does not implement the Runnable interface. Its subclass AdaptedRunnableAction does implement Runnable: it uses the adapter pattern to wrap a Runnable's logic as a ForkJoinTask.
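The adapter is also reachable through the public API: ForkJoinTask.adapt(Runnable) wraps a plain Runnable as a ForkJoinTask (in the JDK 8 source it does so with the AdaptedRunnableAction class mentioned above, as far as I can tell). A minimal sketch, with a hypothetical class name:

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.atomic.AtomicBoolean;

class AdaptSketch {
    // Wrap a Runnable as a ForkJoinTask, run it in a pool, and report whether it ran.
    static boolean runAdapted() {
        AtomicBoolean ran = new AtomicBoolean(false);
        Runnable plain = () -> ran.set(true);
        ForkJoinTask<?> task = ForkJoinTask.adapt(plain); // adapter around the Runnable
        ForkJoinPool pool = new ForkJoinPool(1);
        try {
            pool.invoke(task); // executes the task and waits for it to finish
        } finally {
            pool.shutdown();
        }
        return ran.get();
    }

    public static void main(String[] args) {
        System.out.println("adapted Runnable ran: " + runAdapted());
    }
}
```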

A ForkJoinPool holds a WorkQueue[] array. Each WorkQueue holds a ForkJoinTask[] array and a reference to its owning ForkJoinWorkerThread. The ForkJoinWorkerThread in turn holds references to its ForkJoinPool and its WorkQueue.

[Diagram: structure of ForkJoinPool, WorkQueue, and ForkJoinWorkerThread]

Source code call flow diagram

Explanation

The submit process

  1. ForkJoinPool#submit() wraps the task in the adapter class ForkJoinTask.AdaptedRunnableAction and submits it.

  2. ForkJoinPool#externalPush() checks whether the workQueues exist. If not, the WorkQueue[] is created.

  3. ForkJoinPool#signalWork() creates a worker thread if too few threads are active.

  4. ForkJoinPool#createWorker() creates the worker thread.

  5. ForkJoinWorkerThread(ForkJoinPool pool)

    • Creates the ForkJoinWorkerThread object, which holds a reference to the ForkJoinPool.
    • Calls pool.registerWorker(this) to obtain its WorkQueue object.
  6. ForkJoinPool#registerWorker(ForkJoinWorkerThread wt) registers the thread object with a WorkQueue and adds that WorkQueue to the ForkJoinPool's WorkQueue[].

  7. WorkQueue(ForkJoinPool pool, ForkJoinWorkerThread owner) creates the WorkQueue object, which holds references to the ForkJoinPool and the ForkJoinWorkerThread.
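One visible consequence of the steps above is that worker threads are created on demand rather than when the pool is constructed. The sketch below checks this with the public getPoolSize() method. The class name is hypothetical, and the exact number of workers started after one submit is not something to rely on.

```java
import java.util.concurrent.ForkJoinPool;

class LazyWorkersSketch {
    // Returns {pool size before any submit, pool size right after one task completed}.
    static int[] poolSizes() {
        ForkJoinPool pool = new ForkJoinPool(4);
        try {
            int before = pool.getPoolSize(); // no workers have been started yet
            pool.submit(() -> { }).join();   // the submit path signals and creates a worker
            int after = pool.getPoolSize();  // at least the worker that ran the task
            return new int[] { before, after };
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] sizes = poolSizes();
        System.out.println("before submit: " + sizes[0] + ", after submit: " + sizes[1]);
    }
}
```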

The fork process

  1. ForkJoinTask#fork() is called.

  2. fork() checks whether the calling thread is a ForkJoinWorkerThread.

    • If it is, fork() calls push on the current thread's WorkQueue directly (this is why a parallelStream started inside a ForkJoinPool uses that pool's threads).
    • If it is not, fork() calls ForkJoinPool.common.externalPush(this) on the global pool.
  3. WorkQueue#push(ForkJoinTask task) adds the task to the current thread's WorkQueue and then calls ForkJoinPool#signalWork(), so that idle or newly created worker threads in the same ForkJoinPool can pick the task up.
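The pool selection in step 2 can be observed with the public static methods ForkJoinTask.inForkJoinPool() and ForkJoinTask.getPool(). The following is a small sketch with hypothetical class names: a child task forked from a worker of a custom pool reports which pool executed it.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

class ForkTargetSketch {
    // A task that reports which pool hosts the thread executing it.
    static class WhichPool extends RecursiveTask<ForkJoinPool> {
        @Override
        protected ForkJoinPool compute() {
            return ForkJoinTask.getPool();
        }
    }

    // Fork a child from inside a worker of the given pool; return the pool the child ran in.
    static ForkJoinPool forkFromInside(ForkJoinPool pool) {
        return pool.submit(() -> {
            WhichPool child = new WhichPool();
            child.fork();        // caller is a worker thread: pushed onto its own queue
            return child.join(); // run by this worker or stolen by a sibling in the same pool
        }).join();
    }

    public static void main(String[] args) {
        ForkJoinPool custom = new ForkJoinPool(2);
        try {
            System.out.println("main thread in a pool? " + ForkJoinTask.inForkJoinPool());
            System.out.println("child ran in custom pool? " + (forkFromInside(custom) == custom));
        } finally {
            custom.shutdown();
        }
    }
}
```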

The join process

  1. ForkJoinTask#join() calls doJoin().
  2. doJoin() calls exec(), which runs the actual logic of the split-off task.
  3. getRawResult() returns the result of the execution.
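For readers who have not used the framework directly, the fork and join steps fit together as in the classic divide-and-conquer sum below. It is a generic illustration, not code from our project. The class name and the threshold of 100 are arbitrary choices.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class SumTask extends RecursiveTask<Long> {
    private static final int THRESHOLD = 100; // below this size, sum sequentially
    private final int from;
    private final int to; // inclusive

    SumTask(int from, int to) {
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from < THRESHOLD) {
            long sum = 0;
            for (int i = from; i <= to; i++) {
                sum += i;
            }
            return sum;
        }
        int mid = from + (to - from) / 2;
        SumTask left = new SumTask(from, mid);
        SumTask right = new SumTask(mid + 1, to);
        left.fork();                       // queue the left half for this or another worker
        long rightSum = right.compute();   // compute the right half in the current thread
        return left.join() + rightSum;     // wait for (or run) the left half, then combine
    }

    // Sum the integers from..to (inclusive) using the common pool.
    static long sum(int from, int to) {
        return ForkJoinPool.commonPool().invoke(new SumTask(from, to));
    }

    public static void main(String[] args) {
        System.out.println(sum(1, 1000));
    }
}
```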

Step 2 of the fork process is where the pool is chosen: a forked task goes to the pool of the thread that calls fork(). parallelStream() submits its subtasks by calling fork(), so a parallel stream started from a worker thread of a custom ForkJoinPool runs in that pool rather than in the common pool.