1. Background

This article is a short introduction to parallel processing in Java.

Note: I call an article "short" if it can be read in about 10 minutes, which makes it suitable for quick reading.

2. Knowledge

Parallel computing generally refers to a computing model in which many instructions are executed simultaneously. A computation is broken down into small parts, which are then solved concurrently.

That is, it breaks down into several steps:

  • 1. Break a large task into subtasks; subtasks can themselves be split further.
  • 2. Execute all subtasks simultaneously.
  • 3. After execution, there may be an aggregation step, such as summing or averaging.

Put more simply: split first -> compute at the same time -> finally “summarize”.
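The three steps above can be sketched in Java as follows. This is a minimal illustration, not code from the article's sources: the names SplitComputeSummarize, sumRange and parallelSum are made up for the example, and the task is split only once, into two halves.

```java
import java.util.concurrent.CompletableFuture;

class SplitComputeSummarize {
    // The work of one subtask: sum data[from, to) sequentially.
    static long sumRange(int[] data, int from, int to) {
        long sum = 0;
        for (int i = from; i < to; i++) {
            sum += data[i];
        }
        return sum;
    }

    static long parallelSum(int[] data) {
        // Step 1: split the task into two halves.
        int mid = data.length / 2;
        // Step 2: run both halves asynchronously (on the common pool by default).
        CompletableFuture<Long> left =
                CompletableFuture.supplyAsync(() -> sumRange(data, 0, mid));
        CompletableFuture<Long> right =
                CompletableFuture.supplyAsync(() -> sumRange(data, mid, data.length));
        // Step 3: "summarize" by adding the two partial results.
        return left.thenCombine(right, Long::sum).join();
    }

    public static void main(String[] args) {
        int[] data = {1, 2, 3, 4, 5, 6, 7, 8};
        System.out.println(parallelSum(data)); // prints 36
    }
}
```

For an array this small the overhead of scheduling the two halves outweighs any gain; the point is only to show the shape of split, compute, and summarize.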

Why parallelism? What are the advantages?

  • 1. To save time. Parallelism suits large-scale computing scenarios. In theory, n processors can finish a perfectly divisible task up to n times faster than a single processor; in practice the gain is smaller, because some work stays sequential and coordination has a cost.
  • 2. To use the hardware fully. Early computers were single-core, but modern CPUs are multi-core and servers often have several CPUs, so parallel computing can make full use of the hardware's performance.

3. Parallel processing in Java

The Stream API (java.util.stream), added in JDK 8, brings production-ready functional-style programming to the Java libraries, making it easier for developers to write concise and efficient code.

Another strength of Stream is its built-in support for parallel processing. Example:

    import java.util.Arrays;
    import java.util.Collection;

    // Fragment: Task and Status are not defined in this article.
    // Task is assumed to expose an int getPoints() accessor.
    final Collection<Task> tasks = Arrays.asList(
        new Task(Status.OPEN, 5),
        new Task(Status.OPEN, 13),
        new Task(Status.CLOSED, 8)
    );

    // Process all tasks in parallel and sum their points
    final int totalPoints = tasks
        .stream()
        .parallel()
        .map(task -> task.getPoints()) // or map(Task::getPoints)
        .reduce(0, Integer::sum);

    System.out.println("Total points (all tasks): " + totalPoints);

For the tasks collection above, the code calculates the sum of points over all tasks. It uses the parallel() method to process the tasks in parallel and the reduce() method to combine the partial results into the final total.
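Since the article does not show the Task and Status types, here is a self-contained sketch that can be compiled and run. The Task class and Status enum below are hypothetical stand-ins written for this example, and the sum uses mapToInt(...).sum() on a parallel stream instead of map(...).reduce(...), which avoids boxing each value into an Integer.

```java
import java.util.Arrays;
import java.util.List;

class TaskPointsExample {
    // Hypothetical stand-ins: the article does not show Status or Task.
    enum Status { OPEN, CLOSED }

    static final class Task {
        private final Status status;
        private final int points;

        Task(Status status, int points) {
            this.status = status;
            this.points = points;
        }

        Status getStatus() { return status; }
        int getPoints() { return points; }
    }

    // Sums the points of all tasks on a parallel stream.
    static int totalPoints(List<Task> tasks) {
        return tasks.parallelStream()
                .mapToInt(Task::getPoints)
                .sum();
    }

    // Same idea, restricted to tasks with the given status.
    static int totalPoints(List<Task> tasks, Status status) {
        return tasks.parallelStream()
                .filter(task -> task.getStatus() == status)
                .mapToInt(Task::getPoints)
                .sum();
    }

    public static void main(String[] args) {
        List<Task> tasks = Arrays.asList(
                new Task(Status.OPEN, 5),
                new Task(Status.OPEN, 13),
                new Task(Status.CLOSED, 8));
        System.out.println("Total points (all tasks): " + totalPoints(tasks));               // 26
        System.out.println("Total points (open tasks): " + totalPoints(tasks, Status.OPEN)); // 18
    }
}
```

Summing is safe to parallelize because addition is associative and the lambdas do not touch shared mutable state; the result is the same regardless of how the stream is split.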

4. Going further

Implement parallel processing with a thread pool

JDK 1.5 introduced the concurrency package (java.util.concurrent), which includes ThreadPoolExecutor. An example:

    import java.util.concurrent.Callable;
    import java.util.concurrent.CompletionService;
    import java.util.concurrent.ExecutorCompletionService;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.stream.LongStream;

    public class ExecutorServiceTest {
        public static final int THRESHOLD = 10_000;
        public static long[] numbers;

        public static void main(String[] args) throws Exception {
            numbers = LongStream.rangeClosed(1, 10_000_000).toArray();
            ExecutorService executor =
                    Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() + 1);
            CompletionService<Long> completionService = new ExecutorCompletionService<Long>(executor);

            // Split: submit one task per THRESHOLD-sized chunk
            int taskSize = (int) (numbers.length / THRESHOLD);
            for (int i = 1; i <= taskSize; i++) {
                final int key = i;
                completionService.submit(new Callable<Long>() {
                    @Override
                    public Long call() throws Exception {
                        return sum((key - 1) * THRESHOLD, key * THRESHOLD);
                    }
                });
            }

            // Summarize: add up the partial sums as the tasks complete
            long sumValue = 0;
            for (int i = 0; i < taskSize; i++) {
                sumValue += completionService.take().get();
            }
            System.out.println("sumValue = " + sumValue);
            executor.shutdown();
        }

        // Sums numbers[start, end) sequentially
        private static long sum(int start, int end) {
            long sum = 0;
            for (int i = start; i < end; i++) {
                sum += numbers[i];
            }
            return sum;
        }
    }
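The example above creates numbers.length / THRESHOLD tasks, which covers the whole array only when the length is a multiple of THRESHOLD (it is here: 10,000,000 / 10,000). A possible variation, sketched below, clamps the last chunk so that a remainder is not dropped, and shuts the pool down in a finally block. The class name ChunkedSum and the parameters chunkSize and threads are made up for this example, and chunkSize is assumed to be positive.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ChunkedSum {
    // Sums numbers in chunks of at most chunkSize elements on a fixed-size pool.
    // Assumes chunkSize > 0 and threads > 0.
    static long sum(long[] numbers, int chunkSize, int threads) {
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        try {
            List<Callable<Long>> tasks = new ArrayList<>();
            for (int start = 0; start < numbers.length; start += chunkSize) {
                final int from = start;
                // Clamp the end index so the last, shorter chunk is still included.
                final int to = Math.min(start + chunkSize, numbers.length);
                tasks.add(() -> {
                    long partial = 0;
                    for (int i = from; i < to; i++) {
                        partial += numbers[i];
                    }
                    return partial;
                });
            }
            long total = 0;
            // invokeAll blocks until every task has finished.
            for (Future<Long> result : executor.invokeAll(tasks)) {
                total += result.get();
            }
            return total;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while summing", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("a chunk failed", e.getCause());
        } finally {
            // Always release the pool's threads, even if a task failed.
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        long[] numbers = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
        // Chunks: [0,3) [3,6) [6,9) [9,10)
        System.out.println(sum(numbers, 3, 2)); // prints 55
    }
}
```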

Use the fork/join framework

The purpose of the fork/join framework is to recursively split a parallelizable task into smaller tasks and then combine the results of the subtasks into an overall result. The relevant code is as follows:

    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.ForkJoinTask;
    import java.util.concurrent.RecursiveTask;
    import java.util.stream.LongStream;

    public class ForkJoinTest extends RecursiveTask<Long> {
        private static final long serialVersionUID = 1L;
        private final long[] numbers;
        private final int start;
        private final int end;
        public static final long THRESHOLD = 10_000;

        public ForkJoinTest(long[] numbers) {
            this(numbers, 0, numbers.length);
        }

        private ForkJoinTest(long[] numbers, int start, int end) {
            this.numbers = numbers;
            this.start = start;
            this.end = end;
        }

        @Override
        protected Long compute() {
            int length = end - start;
            // Small enough: sum this range directly
            if (length <= THRESHOLD) {
                return computeSequentially();
            }
            // Fork the left half so it runs asynchronously on the pool
            ForkJoinTest leftTask = new ForkJoinTest(numbers, start, start + length / 2);
            leftTask.fork();
            // Compute the right half in the current thread
            ForkJoinTest rightTask = new ForkJoinTest(numbers, start + length / 2, end);
            Long rightResult = rightTask.compute();
            // Wait for the left half, then merge the two results
            Long leftResult = leftTask.join();
            return leftResult + rightResult;
        }

        private long computeSequentially() {
            long sum = 0;
            for (int i = start; i < end; i++) {
                sum += numbers[i];
            }
            return sum;
        }

        public static void main(String[] args) {
            System.out.println(forkJoinSum(10_000_000));
        }

        public static long forkJoinSum(long n) {
            long[] numbers = LongStream.rangeClosed(1, n).toArray();
            ForkJoinTask<Long> task = new ForkJoinTest(numbers);
            return new ForkJoinPool().invoke(task);
        }
    }

The code above recursively splits the work into subtasks and submits them to a thread pool (a ForkJoinPool) for execution.
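For comparison, the same sum can be written as a parallel stream. By default a parallel stream runs on the common fork/join pool (ForkJoinPool.commonPool()), so the splitting and merging written by hand above happen inside the library. A minimal sketch; the class name ParallelStreamSum and the method parallelSum are made up for this example.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.stream.LongStream;

class ParallelStreamSum {
    // Sums 1..n on a parallel stream; the stream framework does the fork/join work.
    static long parallelSum(long n) {
        return LongStream.rangeClosed(1, n).parallel().sum();
    }

    public static void main(String[] args) {
        // The parallelism level depends on the machine, so no output is shown for it.
        System.out.println("Common pool parallelism: "
                + ForkJoinPool.commonPool().getParallelism());
        System.out.println(parallelSum(10_000_000)); // prints 50000005000000
    }
}
```

Writing a RecursiveTask by hand is still worthwhile when the split rule or threshold needs to be controlled explicitly, or when the task should run on a dedicated pool rather than the shared one.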

5. Reference:

Zh.wikipedia.org/wiki/%E5%B9…

END