Envy neither the mandarin ducks nor the immortals: one line of code can take half a day. Original by Taste of Little Sister (WeChat official account: XjjDog). Sharing is welcome; please credit the source.

Many developers like lambda expressions, which let you define short, concise functions and show off your coding prowess. Of course, this conciseness can be a disadvantage at companies that measure work by lines of code.

The code snippet below, for example, reads like a poem. But if you don’t use it well, it can be fatal.

// sum() on an IntStream returns an int, not a List<Integer>
int sumOfWeights =
widgets.stream()
             .filter(b -> b.getColor() == RED)
             .sorted((x,y) -> Integer.compare(x.getWeight(), y.getWeight()))
             .mapToInt(Widget::getWeight)
             .sum();

The key function in this code is stream. It turns an ordinary list into a stream, which you can then process step by step, like a pipeline. Everyone who has used it seems to like it.

Not familiar with these functions? See: What does flatMap mean?

The question

What happens if we change stream to parallelStream?

Literally, streams go from serial to parallel.
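To make the switch concrete, here is a minimal sketch that runs the same pipeline both ways and records which threads did the work. The class name StreamThreads and the method threadsUsed are made up for illustration. By default, a parallel stream runs on the calling thread plus the workers of the shared ForkJoinPool.commonPool().

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical demo class, not part of the original article.
class StreamThreads {

    // Returns the names of the threads that processed the elements.
    static Set<String> threadsUsed(boolean parallel) {
        List<Integer> numbers = IntStream.range(0, 30).boxed().collect(Collectors.toList());
        Set<String> names = ConcurrentHashMap.newKeySet();
        (parallel ? numbers.parallelStream() : numbers.stream())
                .forEach(k -> names.add(Thread.currentThread().getName()));
        return names;
    }

    public static void main(String[] args) {
        System.out.println("stream:         " + threadsUsed(false));
        System.out.println("parallelStream: " + threadsUsed(true));
    }
}
```

The sequential run only ever reports the calling thread. The parallel run typically adds several common pool worker threads, and how many depends on the machine.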

Since it’s parallel, there must be some thread safety issues in it. We’re not talking about thread-safe collections here, though; that topic is too basic. Knowing to use thread-safe collections in code that is not otherwise thread-safe is a basic skill by now.

This time, the problem is the performance of parallel streams.

We let the code do the talking.

In the following code, eight threads are started, all of which are using parallel streams for data computation. In the execution logic, we let each task sleep for 1 second so that we can simulate the time-consuming wait for some I/O requests.

With a plain stream, the program returns after 30 seconds. With a parallel stream, we expect it to return in a little over one second; otherwise it would not deserve the name.

Our tests showed otherwise: we had to wait a long time before all the tasks completed.

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class ParallelStreamTest {

    static void parallelTest() {
        // 30 tasks, numbered 0 to 29
        List<Integer> numbers = Arrays.asList(
                0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29
        );
        final long begin = System.currentTimeMillis();
        numbers.parallelStream().map(k -> {
            try {
                Thread.sleep(1000); // simulate a blocking I/O request
                System.out.println((System.currentTimeMillis() - begin) + "ms => " + k + " \t" + Thread.currentThread());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return k;
        }).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");
        // Start eight threads; their parallel streams all share the common pool.
        for (int i = 0; i < 8; i++) {
            new Thread(() -> parallelTest()).start();
        }
    }
}

The pitfall

In fact, this code takes different amounts of time to execute on different machines.

Anything parallel has a degree of parallelism. Set it too low and you get no real parallelism; set it too high and you waste time on context switching. I was dismayed to find that many senior developers, who know every thread pool parameter by heart and tune them carefully, happily use parallelStream in I/O-intensive services without a second thought.

To understand this parallelism, we need to look at the concrete constructor. This code is found in the ForkJoinPool class.

try {  // ignore exceptions in accessing/parsing properties
    String pp = System.getProperty
        ("java.util.concurrent.ForkJoinPool.common.parallelism");
    if (pp != null)
        parallelism = Integer.parseInt(pp);
    fac = (ForkJoinWorkerThreadFactory) newInstanceFromSystemProperty(
        "java.util.concurrent.ForkJoinPool.common.threadFactory");
    handler = (UncaughtExceptionHandler) newInstanceFromSystemProperty(
        "java.util.concurrent.ForkJoinPool.common.exceptionHandler");
} catch (Exception ignore) {
}

if (fac == null) {
    if (System.getSecurityManager() == null)
        fac = defaultForkJoinWorkerThreadFactory;
    else // use security-managed default
        fac = new InnocuousForkJoinWorkerThreadFactory();
}
if (parallelism < 0 && // default 1 less than #cores
    (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
    parallelism = 1;
if (parallelism > MAX_CAP)
    parallelism = MAX_CAP;

As you can see, the degree of parallelism is controlled by the system property shown below. If the property is not set, the default degree of parallelism is the number of CPU cores minus 1.

Clearly, this feature is designed for CPU-intensive workloads. If you feed it a pile of blocking tasks, it goes from parallel execution to something that looks like serial execution.

-Djava.util.concurrent.ForkJoinPool.common.parallelism=N
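To see why the eight-thread test is so slow, a back-of-the-envelope estimate helps. The sketch below prints the common pool parallelism on the current machine and computes a rough lower bound for the test. The class ParallelismEstimate and the method lowerBoundSeconds are hypothetical helpers, and the estimate assumes that every calling thread also works on its own stream, so at most parallelism + callers tasks can sleep at the same time. Real runs can be slower because the stream does not split work perfectly evenly.

```java
import java.util.concurrent.ForkJoinPool;

// Hypothetical helper for illustration only.
class ParallelismEstimate {

    // Rough lower bound, in seconds, for `callers` threads that each push
    // `tasksPerCaller` one-second blocking tasks through the common pool.
    // Assumption: each caller thread also executes tasks of its own stream,
    // so at most (parallelism + callers) tasks sleep at the same time.
    static long lowerBoundSeconds(int callers, int tasksPerCaller, int parallelism) {
        long totalTasks = (long) callers * tasksPerCaller;
        long workers = (long) parallelism + callers;
        return (totalTasks + workers - 1) / workers; // ceiling division
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        int parallelism = ForkJoinPool.commonPool().getParallelism();
        System.out.println("cores = " + cores + ", common pool parallelism = " + parallelism);
        System.out.println("8 callers x 30 tasks: at least "
                + lowerBoundSeconds(8, 30, parallelism) + " s");
    }
}
```

With a parallelism of 7, which is the default on an 8-core machine, the estimate for the test above is at least 16 seconds, a long way from the one second we hoped for.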

Even if you use -Djava.util.concurrent.ForkJoinPool.common.parallelism=N to set an initial value, there is still a problem.

The reason is that the parallelism variable is final and cannot be changed once it is set. In other words, the parameter above takes effect only once.

One developer might use the following code to set the parallelism to 20.

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");

Another developer may set the value to 30 in the same way. Which value actually takes effect in the project depends on which one is in place when the JVM loads the ForkJoinPool class and initializes the common pool.

This approach is not very reliable.
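The sketch below illustrates the "valid only once" behavior under a simple assumption: the common pool has already been initialized by the time the property is set. The class LatePropertyDemo and the method setPropertyAfterInit are hypothetical names used only for this example.

```java
import java.util.concurrent.ForkJoinPool;

// Hypothetical demo class for illustration only.
class LatePropertyDemo {

    // Returns {parallelism before, parallelism after} a late System.setProperty call.
    static int[] setPropertyAfterInit(String value) {
        // Touching the common pool forces its one-time initialization.
        int before = ForkJoinPool.commonPool().getParallelism();
        // The property is only read during that initialization, so this call comes too late.
        System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", value);
        int after = ForkJoinPool.commonPool().getParallelism();
        return new int[] {before, after};
    }

    public static void main(String[] args) {
        int[] r = setPropertyAfterInit("20");
        System.out.println("before = " + r[0] + ", after = " + r[1]);
    }
}
```

Both numbers come out the same, whatever the machine default is. Any code that touched a parallel stream or the common pool earlier in the JVM's life has already fixed the value.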

A solution

We can separate different types of tasks by providing our own ForkJoinPool, that is, by changing how the work is submitted.

The code shown below enables task separation by explicit code submission.

// A dedicated pool with its own degree of parallelism
ForkJoinPool pool = new ForkJoinPool(30);

final long begin = System.currentTimeMillis();
try {
    // A parallel stream started from inside this pool runs on this pool's workers
    pool.submit(() ->
            numbers.parallelStream().map(k -> {
                try {
                    Thread.sleep(1000);
                    System.out.println((System.currentTimeMillis() - begin) + "ms => " + k + " \t" + Thread.currentThread());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return k;
            }).collect(Collectors.toList())).get();
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
} finally {
    pool.shutdown(); // release the worker threads
}

In this way, different scenarios can have different degrees of parallelism. As with CountDownLatch, we have to manage the resources manually.
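One way to keep that manual management in a single place is a small wrapper that creates the pool, runs the work, and always shuts the pool down. This is only a sketch: DedicatedPool and runWith are hypothetical names, and it relies on the current JDK behavior that a parallel stream started from a ForkJoinPool worker runs in that worker's pool, which the stream API documentation does not guarantee.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical helper for illustration only.
class DedicatedPool {

    // Runs `task` inside a temporary ForkJoinPool and always shuts the pool down.
    // join() rethrows task failures as unchecked exceptions.
    static <T> T runWith(int parallelism, Callable<T> task) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            return pool.submit(task).join();
        } finally {
            pool.shutdown(); // release the worker threads
        }
    }

    public static void main(String[] args) {
        List<Integer> numbers = IntStream.range(0, 30).boxed().collect(Collectors.toList());
        // Record which thread handled each element.
        List<String> threads = runWith(30, () ->
                numbers.parallelStream()
                        .map(k -> Thread.currentThread().getName())
                        .collect(Collectors.toList()));
        System.out.println(threads);
    }
}
```

The printed thread names belong to the dedicated pool rather than to the common pool, so blocking work submitted this way no longer competes with other parallel streams in the application.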

This method adds a fair amount of code, and the result is not merely inelegant but downright ugly. The white swan has become an ugly duckling. Do you still love it?

Xjjdog is a WeChat public account that keeps programmers from going astray. It focuses on infrastructure and Linux. Backed by ten years of architecture experience and tens of billions of daily requests, it explores the world of high concurrency with you and offers a different flavor.