Source: Coffee latte

1. Introduction

When you were young, you must have imagined that if you could be as good as Naruto, you could play games and attend class at the same time. Unfortunately, manga is manga, and no such technique exists in reality: you either attend class honestly, or you skip class to play games. Although we cannot perform the shadow-clone technique in real life, we can realize that wish in the world of computers.

2. Being in two places at once in a computer

The ability to be in two places at once in a computer did not come naturally. In 1971, Intel introduced the world's first general-purpose microprocessor, the 4004, with 2,300 transistors. Gordon Moore, one of the company's co-founders, had already formulated what became known as Moore's Law: every 18 months, the number of transistors on a chip would double. The 4004's original clock frequency was 740 kHz (740,000 cycles per second). Almost 50 years later, when you buy a computer you will find clock frequencies around 4.0 GHz (4 billion cycles per second).

But the higher the frequency, the smaller the benefit:

  • By one estimate, every 1 GHz increase in clock frequency raises power consumption by about 25 watts, and once chip power exceeds 150 watts, existing air-cooling systems can no longer dissipate the heat. Some CPUs run hot enough to fry an egg.

  • Pipelines become too long, lowering the efficiency per unit of frequency. In practice, a chip with a higher clock frequency can deliver worse overall performance than one with a lower clock.

  • Gordon Moore himself believes Moore's Law will expire within the next 10 to 20 years.

With single-core frequency hitting a bottleneck, multi-core CPUs emerged at the right historical moment, improving performance while reducing power consumption. Multi-core CPUs have gradually become the mainstream of the market, which also makes our multi-threaded programming easier.

When it comes to multi-core CPUs, we have to mention GPUs. The term may be unfamiliar, but you have surely heard of graphics cards. I did CUDA programming for a while, and I realized that this is real parallel computing. Everyone knows images are made of pixels: a 1920×1080 image, for example, has about 2.1 million of them. If you wanted to transform every pixel of such an image in Java, you would have to loop 2.1 million times; even with a multi-threaded 8-core CPU, each thread would still loop hundreds of thousands of times. With CUDA, however, up to 3 × 65535 × 512 = 100,661,760 (about 100 million) threads can execute in parallel, so images at this scale are processed almost instantly. But CUDA is mainly suited to graphics workloads, where huge numbers of pixels must be handled at the same time; its instruction set is small, so the logic cannot be too complex.

GPUs are mentioned here only to broaden the introduction; readers who are interested are welcome to discuss them with the author.

3. Parallelism in applications

When it comes to making a service high-performance, asynchronization and parallelization are the first things that come to mind. My previous article, "Asynchronization, your high concurrency killer", introduced optimization through asynchronization; interested readers can take a look. Parallelization can be combined with asynchronization, or applied on its own.

Consider a requirement: when you place a takeout order, creating the order may also require querying user information, discount information, merchant information, dish information, and so on. These can be called synchronously, as shown in the picture below:

Suppose each of the five query services takes 50 ms on average; calling them serially costs at least 250 ms. Now notice that none of the five services depends on any other, and whichever returns first can simply be taken first. So can we use a multithreaded "shadow clone" to fetch the information from all five services at the same time?

The optimization is as follows:

These five query services can be queried in parallel, ideally cutting the time to 50 ms. That is easy to say, but how do we actually put it into practice?

3.1 CountDownLatch/Phaser

CountDownLatch and Phaser are synchronization classes provided by the JDK: CountDownLatch arrived in version 1.5 and Phaser in version 1.7. Briefly, CountDownLatch can be thought of as a counter: its await() method blocks until a timeout expires or the counter reaches zero, while other threads call countDown() as they each complete their goal. This mechanism lets us coordinate concurrent work. Phaser, shown in the sketch below, can play the same role while letting parties register and deregister dynamically.
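A minimal Phaser sketch of my own (not the article's order example), where each task registers itself as a party and deregisters when done:

import java.util.concurrent.Phaser;

public class PhaserDemo {
    public static void main(String[] args) {
        // Register the main thread as one party
        Phaser phaser = new Phaser(1);
        for (int i = 0; i < 5; i++) {
            phaser.register(); // one extra party per task
            final int taskId = i;
            new Thread(() -> {
                System.out.println("Task " + taskId + " on " + Thread.currentThread().getName());
                phaser.arriveAndDeregister(); // task done, leave the phaser
            }).start();
        }
        // Blocks until every still-registered task has arrived or deregistered
        phaser.arriveAndAwaitAdvance();
        System.out.println("All tasks finished");
    }
}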

Back to CountDownLatch: we can use the following code to implement our order requirement:

import java.util.concurrent.*;

public class CountDownTask {

    private static final int CORE_POOL_SIZE = 4;
    private static final int MAX_POOL_SIZE = 12;
    private static final long KEEP_ALIVE_TIME = 5L;
    private static final int QUEUE_SIZE = 1600;

    protected static final ExecutorService THREAD_POOL = new ThreadPoolExecutor(
            CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(QUEUE_SIZE));

    public static void main(String[] args) throws InterruptedException {
        // Create a counter: one count per lookup task
        CountDownLatch countDownLatch = new CountDownLatch(5);
        OrderInfo orderInfo = new OrderInfo();

        THREAD_POOL.execute(() -> {
            System.out.println("Customer task, thread: " + Thread.currentThread().getName());
            orderInfo.setCustomerInfo(new CustomerInfo());
            countDownLatch.countDown();
        });
        THREAD_POOL.execute(() -> {
            System.out.println("Discount task, thread: " + Thread.currentThread().getName());
            orderInfo.setDiscountInfo(new DiscountInfo());
            countDownLatch.countDown();
        });
        THREAD_POOL.execute(() -> {
            System.out.println("Food task, thread: " + Thread.currentThread().getName());
            orderInfo.setFoodListInfo(new FoodListInfo());
            countDownLatch.countDown();
        });
        THREAD_POOL.execute(() -> {
            System.out.println("Tenant task, thread: " + Thread.currentThread().getName());
            orderInfo.setTenantInfo(new TenantInfo());
            countDownLatch.countDown();
        });
        THREAD_POOL.execute(() -> {
            System.out.println("Other task, thread: " + Thread.currentThread().getName());
            orderInfo.setOtherInfo(new OtherInfo());
            countDownLatch.countDown();
        });

        // Wait for all five tasks, or give up after one second
        countDownLatch.await(1, TimeUnit.SECONDS);
        System.out.println("Main thread: " + Thread.currentThread().getName());
    }
}

We set up a thread pool (the exact configuration depends on your business and machine), execute our tasks concurrently (fetching user information, food information, and so on), and finally block with await() until the results come back or the timeout fires.

3.2 CompletableFuture

You have probably noticed that CountDownLatch meets our needs, but there is still a problem: our business code has to be coupled with CountDownLatch calls. For example, after fetching the user's information we must remember to execute countDownLatch.countDown(). Clearly, business code should not have to care about this logic, and if we forget a call during development, await() will simply block until the timeout and surface as puzzling errors.

That is why JDK1.8 provides CompletableFuture, a multifunctional, non-blocking Future. (A Future represents an asynchronous result and provides methods for checking whether the computation is complete, waiting for completion, and retrieving the result.) My previous post on asynchronization covered CompletableFuture in detail; check it out if you are interested.

We use CompletableFuture.allOf to aggregate the individual futures into one big CompletableFuture, then block on its get() method:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

public class CompletableFutureParallel {

    private static final int CORE_POOL_SIZE = 4;
    private static final int MAX_POOL_SIZE = 12;
    private static final long KEEP_ALIVE_TIME = 5L;
    private static final int QUEUE_SIZE = 1600;

    protected static final ExecutorService THREAD_POOL = new ThreadPoolExecutor(
            CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(QUEUE_SIZE));

    public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
        OrderInfo orderInfo = new OrderInfo();
        // Collect the futures so we can wait on all of them together
        List<CompletableFuture<Void>> futures = new ArrayList<>();

        futures.add(CompletableFuture.runAsync(() -> {
            System.out.println("Customer task, thread: " + Thread.currentThread().getName());
            orderInfo.setCustomerInfo(new CustomerInfo());
        }, THREAD_POOL));
        futures.add(CompletableFuture.runAsync(() -> {
            System.out.println("Discount task, thread: " + Thread.currentThread().getName());
            orderInfo.setDiscountInfo(new DiscountInfo());
        }, THREAD_POOL));
        futures.add(CompletableFuture.runAsync(() -> {
            System.out.println("Food task, thread: " + Thread.currentThread().getName());
            orderInfo.setFoodListInfo(new FoodListInfo());
        }, THREAD_POOL));
        futures.add(CompletableFuture.runAsync(() -> {
            System.out.println("Other task, thread: " + Thread.currentThread().getName());
            orderInfo.setOtherInfo(new OtherInfo());
        }, THREAD_POOL));

        // Aggregate into one future and block until all complete or we time out
        CompletableFuture<Void> allDoneFuture =
                CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
        allDoneFuture.get(10, TimeUnit.SECONDS);
        System.out.println(orderInfo);
    }
}

As you can see, CompletableFuture lets us meet the requirement quickly, though that is far from all it can do.
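For instance, futures can be chained and combined without blocking. A small sketch of my own (not part of the article's order example), combining two independent results:

import java.util.concurrent.CompletableFuture;

public class CombineDemo {
    public static void main(String[] args) {
        CompletableFuture<Integer> price = CompletableFuture.supplyAsync(() -> 100);
        CompletableFuture<Integer> discount = CompletableFuture.supplyAsync(() -> 20);
        // Combine the two results once both futures complete
        CompletableFuture<Integer> toPay = price.thenCombine(discount, (p, d) -> p - d);
        // Attach a callback instead of blocking on get()
        toPay.thenAccept(v -> System.out.println("to pay: " + v)).join();
    }
}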

3.3 Fork/Join

With CompletableFuture we can already execute multiple tasks in parallel, but it still depends on our thread pool, and that pool uses a single blocking queue: whenever a thread finishes a task and fetches the next one from the queue, there is bound to be contention. For this, JDK1.7 provides ForkJoinTask and ForkJoinPool.

ForkJoinPool gives each worker thread its own work queue and uses a work-stealing algorithm to prevent thread starvation. A worker takes tasks from its own queue in LIFO order but steals tasks from other threads' queues in FIFO order, which reduces lock contention.

There are many examples of this framework on the web; here is how we can use it to fulfill our order requirement from above:

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class OrderTask extends RecursiveTask<OrderInfo> {

    @Override
    protected OrderInfo compute() {
        System.out.println("Executing " + this.getClass().getSimpleName()
                + ", thread: " + Thread.currentThread().getName());
        CustomerTask customerTask = new CustomerTask();
        TenantTask tenantTask = new TenantTask();
        DiscountTask discountTask = new DiscountTask();
        FoodTask foodTask = new FoodTask();
        OtherTask otherTask = new OtherTask();
        // Fork all five subtasks, then join their results into one OrderInfo
        invokeAll(customerTask, tenantTask, discountTask, foodTask, otherTask);
        return new OrderInfo(customerTask.join(), tenantTask.join(),
                discountTask.join(), foodTask.join(), otherTask.join());
    }

    public static void main(String[] args) {
        ForkJoinPool forkJoinPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors() - 1);
        System.out.println(forkJoinPool.invoke(new OrderTask()));
    }
}

class CustomerTask extends RecursiveTask<CustomerInfo> {
    @Override
    protected CustomerInfo compute() {
        System.out.println("Executing " + this.getClass().getSimpleName()
                + ", thread: " + Thread.currentThread().getName());
        return new CustomerInfo();
    }
}

class TenantTask extends RecursiveTask<TenantInfo> {
    @Override
    protected TenantInfo compute() {
        System.out.println("Executing " + this.getClass().getSimpleName()
                + ", thread: " + Thread.currentThread().getName());
        return new TenantInfo();
    }
}

class DiscountTask extends RecursiveTask<DiscountInfo> {
    @Override
    protected DiscountInfo compute() {
        System.out.println("Executing " + this.getClass().getSimpleName()
                + ", thread: " + Thread.currentThread().getName());
        return new DiscountInfo();
    }
}

class FoodTask extends RecursiveTask<FoodListInfo> {
    @Override
    protected FoodListInfo compute() {
        System.out.println("Executing " + this.getClass().getSimpleName()
                + ", thread: " + Thread.currentThread().getName());
        return new FoodListInfo();
    }
}

class OtherTask extends RecursiveTask<OtherInfo> {
    @Override
    protected OtherInfo compute() {
        System.out.println("Executing " + this.getClass().getSimpleName()
                + ", thread: " + Thread.currentThread().getName());
        return new OtherInfo();
    }
}

We define an OrderTask plus five subtasks that fetch the individual pieces of information. In compute() we fork the five subtasks with invokeAll, then join their results to fulfill our parallelization requirement.

3.4 parallelStream

JDK1.8 introduces the parallel stream API, which makes parallel processing easy when working with collections. Here is a simple example that sums the numbers from 1 to 100:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.LongAdder;

public class ParallelStream {
    public static void main(String[] args) {
        List<Integer> list = new ArrayList<>();
        for (int i = 1; i <= 100; i++) {
            list.add(i);
        }
        // LongAdder is safe to update from multiple threads
        LongAdder sum = new LongAdder();
        list.parallelStream().forEach(integer -> {
            System.out.println("Current thread: " + Thread.currentThread().getName());
            sum.add(integer);
        });
        System.out.println(sum);
    }
}

Under the hood, parallelStream uses the same Fork/Join framework, and its default parallelism is the number of available CPUs minus one.
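If the default does not fit, the parallelism can be adjusted. A minimal sketch of my own showing two common approaches: raising the common pool's parallelism through a system property, or submitting the stream to a dedicated ForkJoinPool so it does not compete with other parallel streams:

import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ParallelismConfig {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // Option 1: raise the common pool's parallelism JVM-wide
        // (must be set before the common pool is first used)
        System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "8");

        List<Integer> list = IntStream.rangeClosed(1, 100).boxed().collect(Collectors.toList());

        // Option 2: run the parallel stream inside a dedicated ForkJoinPool;
        // its tasks then execute in this pool instead of the common pool
        ForkJoinPool pool = new ForkJoinPool(4);
        int sum = pool.submit(() -> list.parallelStream().mapToInt(Integer::intValue).sum()).get();
        System.out.println(sum); // 5050
        pool.shutdown();
    }
}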

3.5 Sharding

Imagine a requirement: from time to time we send vouchers to all users whose ids fall within a certain range, say several million users. Sending them from a single machine could take a very long time, so distributed scheduling frameworks such as elastic-job provide a sharding feature: with 50 machines, users with id % 50 == 0 are handled on machine 0, those with id % 50 == 1 on machine 1, and so on, so the execution time is effectively split across the machines. A sketch of the idea follows.
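A minimal sketch of the sharding idea, assuming the classic elastic-job SimpleJob API; userIdsInRange() and sendVoucher() are hypothetical placeholders for your own data access and delivery code:

import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;

import java.util.Collections;

// Hypothetical sketch: each machine is assigned a sharding item and only
// processes the user ids that map to it.
public class VoucherShardingJob implements SimpleJob {

    @Override
    public void execute(ShardingContext context) {
        int shardItem = context.getShardingItem();        // which shard this machine owns
        int shardTotal = context.getShardingTotalCount(); // e.g. 50 machines in total
        for (long userId : userIdsInRange()) {
            if (userId % shardTotal == shardItem) {
                sendVoucher(userId); // only this machine's share of the users
            }
        }
    }

    // Placeholder: query the id range from your user store
    private Iterable<Long> userIdsInRange() {
        return Collections.emptyList();
    }

    // Placeholder: actually deliver the voucher
    private void sendVoucher(long userId) {
    }
}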

4. Precautions for parallelization

  • Thread safety: in the parallelStream example we used LongAdder rather than updating a plain int or long directly, because those are not safe under concurrent modification. Thread safety deserves special attention; see the sketch after this list.

  • Reasonable parameter configuration: as you can see, there are many parameters to configure, such as the thread pool size, the waiting queue size, the degree of parallelism, and the waiting timeout. These need continual tuning against your own business to avoid undersized queues or unreasonable timeouts.
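A quick demonstration of my own of why plain variables are unsafe in a parallel stream, contrasting a plain long with a LongAdder:

import java.util.concurrent.atomic.LongAdder;
import java.util.stream.IntStream;

public class UnsafeSum {
    static long plainSum = 0; // not thread-safe: updates can be lost under contention

    public static void main(String[] args) {
        LongAdder safeSum = new LongAdder();
        IntStream.rangeClosed(1, 1_000_000).parallel().forEach(i -> {
            plainSum += i;  // data race: read-modify-write is not atomic
            safeSum.add(i); // LongAdder handles concurrent updates correctly
        });
        System.out.println("plain long: " + plainSum); // usually less than 500000500000
        System.out.println("LongAdder:  " + safeSum);  // always 500000500000
    }
}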

5. Finally

This article covered what parallelization is, a bit of its history, the ways to implement it in Java, and what to watch out for. Hopefully you now have a good sense of parallelization. Finally, two quick questions for you:

  1. When we run a set of tasks in parallel, what happens if one of them throws an exception?

  2. Sometimes a task's information is not strongly depended on; if something goes wrong, we can simply do without that piece of data. When we parallelize, how should we handle an exception in such a task?

-END-