Hi, I'm Why (歪歪).

Today I'd like to share an extended thread pool with you. I think the idea behind the extension is very clever.

Don't worry, the title may sound like clickbait, but it isn't: I don't think you'll be asked about this in an interview, yet you may well run into it at work.

To introduce this thread pool, let me give you a scenario that makes it easier to understand.

Take this meme for example.

Let's say we have two programmers. Let's call them Fugui ("riches") and Wangcai ("prosperous wealth").

The images above depict a day at work for these two programmers. Let's describe it in programmatic terms.

First, we create an object that represents what a programmer is doing:

public class CoderDoSomeThing {

    private String name;
    private String doSomeThing;

    public CoderDoSomeThing(String name, String doSomeThing) {
        this.name = name;
        this.doSomeThing = doSomeThing;
    }

    @Override
    public String toString() {
        // Used later when the task is printed from the thread pool.
        return name + ": " + doSomeThing;
    }
}

Then we use code to describe what Fugui and Wangcai do:

public class NbThreadPoolTest {

    public static void main(String[] args) {
        CoderDoSomeThing rich1 = new CoderDoSomeThing("Fugui", "start Idea");
        CoderDoSomeThing rich2 = new CoderDoSomeThing("Fugui", "set up the database, connect Tomcat, CRUD an output");
        CoderDoSomeThing rich3 = new CoderDoSomeThing("Fugui", "corners of the mouth rising wildly");
        CoderDoSomeThing rich4 = new CoderDoSomeThing("Fugui", "interface access error");
        CoderDoSomeThing rich5 = new CoderDoSomeThing("Fugui", "mentality collapses, uninstall Idea");

        CoderDoSomeThing www1 = new CoderDoSomeThing("Wangcai", "start Idea");
        CoderDoSomeThing www2 = new CoderDoSomeThing("Wangcai", "set up the database, connect Tomcat, CRUD an output");
        CoderDoSomeThing www3 = new CoderDoSomeThing("Wangcai", "corners of the mouth rising wildly");
        CoderDoSomeThing www4 = new CoderDoSomeThing("Wangcai", "interface access error");
        CoderDoSomeThing www5 = new CoderDoSomeThing("Wangcai", "mentality collapses, uninstall Idea");
    }
}

A quick look at the variable names shows that I put some thought into them.

Fugui means "riches", so his variable names start with rich.

Wangcai is a classic dog's name, and "wang wang wang" sounds like "woof woof woof", so his variable names start with www.

And from the class name, NbThreadPoolTest, you can tell I'm going to use a thread pool.

In fact, Fugui and Wangcai each do their own thing without interfering with each other. In other words, each of them should be his own thread.

That sounds like a job for a thread pool.

So I changed the program to use a thread pool, like this:

public class NbThreadPoolTest {

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(5);

        List<CoderDoSomeThing> coderDoSomeThingList = new ArrayList<>();
        coderDoSomeThingList.add(new CoderDoSomeThing("Fugui", "start Idea"));
        coderDoSomeThingList.add(new CoderDoSomeThing("Fugui", "set up the database, connect Tomcat, CRUD an output"));
        coderDoSomeThingList.add(new CoderDoSomeThing("Fugui", "corners of the mouth rising wildly"));
        coderDoSomeThingList.add(new CoderDoSomeThing("Fugui", "interface access error"));
        coderDoSomeThingList.add(new CoderDoSomeThing("Fugui", "mentality collapses, uninstall Idea"));
        coderDoSomeThingList.add(new CoderDoSomeThing("Wangcai", "start Idea"));
        coderDoSomeThingList.add(new CoderDoSomeThing("Wangcai", "set up the database, connect Tomcat, CRUD an output"));
        coderDoSomeThingList.add(new CoderDoSomeThing("Wangcai", "corners of the mouth rising wildly"));
        coderDoSomeThingList.add(new CoderDoSomeThing("Wangcai", "interface access error"));
        coderDoSomeThingList.add(new CoderDoSomeThing("Wangcai", "mentality collapses, uninstall Idea"));

        coderDoSomeThingList.forEach(coderDoSomeThing ->
                executorService.execute(() -> System.out.println(coderDoSomeThing)));
    }
}

The program above wraps Fugui's and Wangcai's activities in a list, then iterates over the list and dumps its contents into the thread pool.

After the program runs, one possible output looks like this:

At first glance, Fugui and Wangcai are doing things at the same time.

But on closer inspection, each of them is doing things in the wrong order.

For example, Wangcai looks a little "schizophrenic": he has only just started Idea, yet the corners of his mouth are already rising wildly.

And this brings me to what I actually want.

What do I want?

I want Fugui and Wangcai to do things at the same time, while also ensuring that each of them does his own things in a certain order, namely the order in which I submitted them to the thread pool.

To put it more formally:

I need a thread pool that guarantees that tasks grouped by some dimension are executed in the order in which they were submitted. Such a thread pool can increase throughput through parallel processing (multiple threads) while ensuring that tasks within the same group run in strict order.

In my earlier example, the "dimension" is the person's name: Fugui and Wangcai.

How would you implement that?

Analysis

What would I do?

First of all, I’m pretty sure that JDK thread pools can’t do this.

Because, from the perspective of how a thread pool works, parallelism and ordering cannot both be satisfied at once.

You know what I mean?

For example, if I wanted a thread pool that guarantees ordering, it would look like this:

A thread pool with only one thread guarantees ordering.
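As a quick illustration (my own demo, not from the original post): a single-thread pool has one worker draining a FIFO queue, so tasks always run in submission order.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SingleThreadOrderDemo {
    public static void main(String[] args) throws InterruptedException {
        // One worker thread plus a FIFO queue: tasks run strictly in submission order.
        ExecutorService pool = Executors.newSingleThreadExecutor();
        List<String> log = new CopyOnWriteArrayList<>();
        for (int i = 1; i <= 5; i++) {
            int step = i;
            pool.execute(() -> log.add("step-" + step));
        }
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        // Always prints [step-1, step-2, step-3, step-4, step-5]
        System.out.println(log);
    }
}
```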

But does it make sense?

It makes some sense, because the work no longer occupies the main thread. But it doesn't make much sense, because it gives up the crucial "multithreading" capability.

So how do we bring parallelism back in this scenario?

Wait a moment: it looks like we already have a thread pool that guarantees ordering.

So if we scale it horizontally and create more of them, don't we get parallelism?

With multiple thread pools of only one thread each, I can also map each value of the "dimension" to its own thread pool.

This is what it looks like in the program:

The area marked ① creates multiple thread pools with only one thread each, to guarantee ordered consumption.

The area marked ② maps each name to a thread pool through a map. This is just one option: we could also take the user id modulo the pool count to locate the corresponding thread pool, for example odd user ids go to one pool and even ids go to another.

That way we don't need one single-thread pool per value of the "dimension": the pools are reusable, with only a small amount of extra plumbing.

The area marked ③ looks up the corresponding thread pool in the map by name.
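The original screenshots aren't reproduced here, so as a sketch of the idea (class and variable names are my own), the three marked areas could look like this:

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class OrderedByNameDemo {
    // Minimal stand-in for the CoderDoSomeThing object from earlier.
    record Task(String name, String action) {}

    public static void main(String[] args) {
        // ① several pools with exactly one thread each, so each pool consumes in order
        ExecutorService pool0 = Executors.newSingleThreadExecutor();
        ExecutorService pool1 = Executors.newSingleThreadExecutor();

        // ② map each value of the "dimension" (here: the name) to a fixed pool
        Map<String, ExecutorService> poolMap = Map.of("Fugui", pool0, "Wangcai", pool1);

        List<Task> tasks = List.of(
                new Task("Fugui", "start Idea"),
                new Task("Wangcai", "start Idea"),
                new Task("Fugui", "interface access error"),
                new Task("Wangcai", "mentality collapses, uninstall Idea"));

        // ③ route each task to the pool its name maps to: tasks for the same
        // name run serially, while different names run in parallel
        for (Task t : tasks) {
            poolMap.get(t.name()).execute(() -> System.out.println(t.name() + ": " + t.action()));
        }

        poolMap.values().forEach(ExecutorService::shutdown);
    }
}
```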

From the output, there is no problem:

Reading this, some of you will say: isn't this cheating?

We agreed on one thread pool, but you've gone and used several.

If that's how you see it, your perspective is too narrow.

Think of it as one large thread pool that contains multiple thread pools of one thread each.

So the pattern opens up.

What I wrote above is a very simple demo, mainly to introduce the idea behind this scheme.

What I really want to introduce is an open source project based on this idea.

It was written by an engineer at a big company. I took a look at the source code and applauded: it's really well done.

Let me first show you a usage example and its output:

As the example shows, it's very simple to use.

The differences from the JDK's native usage are the parts I've boxed in the screenshot.

First, you build a KeyAffinityExecutor object instead of a native thread pool.

The name KeyAffinityExecutor contains one uncommon word: Affinity.

Translated, it means something like a close connection or natural attraction:

So KeyAffinityExecutor can be read as "a thread pool with affinity to a key", which feels like quite an apt name once you understand its function and scope.

You then call the executeEx method on the KeyAffinityExecutor object, passing one extra parameter: the dimension that identifies tasks of the same group, for example the name field.

As the example shows, it's very well packaged: ready to use out of the box.
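Since the screenshot isn't reproduced here, a sketch of the call shape, based on the project's documentation (the artifact isn't on my classpath here, so treat the exact signatures as assumptions):

```java
// Sketch only: requires the more-lambdas dependency.
KeyAffinityExecutor<String> executor =
        KeyAffinityExecutor.newSerializingExecutor(3, 200, "MY-POOL-%d");

// The extra first argument is the "dimension": tasks sharing the same key
// (here, the person's name) are executed in submission order.
executor.executeEx("Fugui", () -> System.out.println("Fugui: start Idea"));
executor.executeEx("Wangcai", () -> System.out.println("Wangcai: start Idea"));
executor.executeEx("Fugui", () -> System.out.println("Fugui: interface access error"));
```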

KeyAffinityExecutor usage

Let’s start with the usage of this class.

The corresponding open source project address is this:

Github.com/PhantomThie…

If you want to use it, import the following Maven address:

<dependency>
    <groupId>com.github.phantomthief</groupId>
    <artifactId>more-lambdas</artifactId>
    <version>0.1.55</version>
</dependency>

The core code is this interface:

com.github.phantomthief.pool.KeyAffinityExecutor

There are a lot of comments in this interface; you can pull the project down and take a look.

What I'll show you here is the interface-level comment in which the author describes his tool.

It is an executor that consumes tasks in order, with affinity to the specified Key.

KeyAffinityExecutor is a special task thread pool.

It ensures that tasks with the same Key are executed in the order in which they were submitted. This is useful in scenarios where you want to increase throughput through parallel processing while ensuring that a certain range of tasks run in strict order.

The built-in implementation of KeyAffinityExecutor maps the specified Key to a fixed single-threaded thread pool, and maintains a configurable number of such single-threaded pools to provide a degree of task parallelism.

Note that the KeyAffinityExecutor defined by this interface does not require tasks with the same Key to run on the same thread. An implementation may work that way, but it is not mandatory, so do not rely on such an assumption.

Many people ask: what's the difference between this and maintaining an array of thread pools yourself and simply taking the key modulo the array length?

In most scenarios there is little difference, but when data skew occurs, tasks hashed to the same slot can be delayed behind the hot, skewed data.

When concurrency is low (the threshold is configurable), this implementation posts to the most idle thread pool, isolating skewed data as much as possible and reducing its impact on other data.

In this introduction, the author briefly explains the project's application scenarios and internal principles, which match our earlier analysis.

In addition, two things deserve special attention.

The first place is here:

If a custom object is used as the dimension that distinguishes tasks, be sure to override its hashCode and equals methods so that it can be identified correctly.

The reasoning is the same as for HashMap: if the keys are objects, they must override hashCode and equals.

This is programming fundamentals, so I'll just mention it and move on.
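To make the reminder concrete (my own example, not from the project), a custom key only routes consistently because hashCode and equals are overridden:

```java
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

public class KeyIdentityDemo {
    // A custom object used as the "dimension". Without hashCode/equals,
    // two logically identical keys would land in different map entries,
    // and therefore potentially in different pools.
    static final class CoderKey {
        private final String name;

        CoderKey(String name) { this.name = name; }

        @Override
        public boolean equals(Object o) {
            return o instanceof CoderKey other && Objects.equals(name, other.name);
        }

        @Override
        public int hashCode() { return Objects.hash(name); }
    }

    public static void main(String[] args) {
        Map<CoderKey, String> poolByKey = new ConcurrentHashMap<>();
        poolByKey.put(new CoderKey("Fugui"), "pool-0");
        // A second, equal instance finds the same entry only because
        // hashCode and equals are overridden.
        System.out.println(poolByKey.get(new CoderKey("Fugui"))); // pool-0
    }
}
```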

The second point that needs clarifying is the core idea.

The author did not use simple modulo mapping, because simple modulo can suffer from data skew.

Here is my personal understanding of the author's thinking.

First, let me explain what modulo-induced data skew looks like, with a simple example:

In the code snippet above, I added a new character, the "Fishing Master" (a master of slacking off), and gave the object an id field.

Suppose we take the id field modulo 2:

What happens is that the Fishing Master and Fugui end up sharing the same thread pool.

Obviously, because the Fishing Master slacks off so frequently, "slacking" becomes hot data, which skews the thread pool numbered 0 and affects Fugui's normal work.
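A tiny illustration of why modulo routing collides (the concrete id values are my own assumption):

```java
public class ModuloRoutingDemo {
    public static void main(String[] args) {
        int poolCount = 2;
        // Assumed ids: Wangcai = 1, Fugui = 2, Fishing Master = 4.
        int[] ids = {1, 2, 4};
        String[] names = {"Wangcai", "Fugui", "Fishing Master"};
        for (int i = 0; i < ids.length; i++) {
            // id % 2: even ids land on pool 0, odd ids on pool 1
            System.out.println(names[i] + " -> pool " + (ids[i] % poolCount));
        }
        // Fugui (2) and the Fishing Master (4) both map to pool 0, so the
        // Fishing Master's hot tasks delay Fugui's tasks queued behind them.
    }
}
```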

So what is KeyAffinityExecutor's strategy?

It picks the most idle thread pool to post to.

How do you understand that?

Again, suppose we build a thread pool like this:

KeyAffinityExecutor executorService =
        KeyAffinityExecutor.newSerializingExecutor(3, 200, "MY-POOL-%d");

The first parameter, 3, means it will build three thread pools with one thread each inside this executor.

So when we submit tasks by the id dimension, and there happen to be exactly three ids, we fill the pools exactly:

There is no data skew at this time.

But what if I change the first argument from 3 to 2?

KeyAffinityExecutor executorService =
        KeyAffinityExecutor.newSerializingExecutor(2, 200, "MY-POOL-%d");

The submission logic is unchanged, except that I added a delay to the tasks with ids 1 and 2. The purpose is to observe how the data with id 3 is handled:

Needless to say, when the Fishing Master's task is submitted, no pool is idle. What happens then?

At this point, in the author's words, "the most idle thread pool is selected for posting."

Let me illustrate with this data:

So when the Fishing Master's task is executed, one of the only two pools must be chosen.

How to choose?

The one with the lowest concurrency is selected.

Since the tasks carry a delay, we can observe that one pool's concurrency is 5 and the other's is 6.

Therefore, when the Fishing Master's task is executed, the pool with a concurrency of 5 is chosen to handle it.

Data skew still occurs in this scenario, but the premise has changed: it happens only when no idle threads are available.

That's why the author says "isolate skewed data as much as possible."

The biggest difference between the two schemes is how fully thread resources are used: with simple modulo, data skew can occur even while idle threads are still available.

KeyAffinityExecutor ensures that skew only happens once the pools have no threads to spare.

With that, you can appreciate the subtle difference between the two schemes.

KeyAffinityExecutor source

There isn't much source code; these are all the classes:

But the vast majority of the source code uses lambdas, basically functional programming, so if you're weak in that area it will be a bit harder to read.

If you want to dig into the source code, my advice is to pull the project locally and start from its test cases:

Github.com/PhantomThie…

I'll point out some of the key places I looked at, so that you have a thread to follow when you read it yourself.

First of all, we must start from its construction method. The author documents the meaning of each parameter very clearly:

Suppose our constructor call looks like this, meaning we build three thread pools with one thread each, each with a queue size of 200:

KeyAffinityExecutor executorService =
        KeyAffinityExecutor.newSerializingExecutor(3, 200, "WHY-POOL-%d");

First we need to find where the "thread pool with only one thread" is built.

That logic is hidden in this method:

com.github.phantomthief.pool.KeyAffinityExecutorUtils#executor(java.lang.String, int)

Here you can see the "thread pool with only one thread" we've been talking about; the queue length can also be specified:

This method returns a Supplier interface, which you’ll need in a moment.

Next, where does the number 3 come in?

It is hidden in the build method, which eventually calls this constructor:

com.github.phantomthief.pool.impl.KeyAffinityImpl#KeyAffinityImpl

You can make a breakpoint at this point and Debug it.

Let me explain the key parameters I've boxed in this section:

First is the count parameter, which we set to 3, so range(0, 3) yields 0, 1, 2.

Then there is supplier, the Supplier returned by the executor method; you can see it wraps a thread pool.

Then comes a crucial operation: map(ValueRef::new).

The ValueRef object in this operation is the key point:

com.github.phantomthief.pool.impl.KeyAffinityImpl.ValueRef

The concurrency variable inside this object is the crux.

Remember "pick the most idle executor (thread pool)"?

How does it judge idleness?

By the concurrency variable.

The corresponding code is here:

com.github.phantomthief.pool.impl.KeyAffinityImpl#select

When execution reaches this breakpoint, the key has not been mapped before, so a thread pool needs to be assigned to it.

To assign one, it loops over the all collection, which holds the ValueRef objects:

So comparingInt(ValueRef::concurrency) simply selects the thread pool with the lowest concurrency.

If a thread pool has never been used, or currently has no tasks, its concurrency must be 0, and it will be selected first.

If all thread pools are in use, the one with the lowest concurrency is selected.
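This isn't the project's actual code, but the selection logic boils down to something like the following sketch (ValueRef here is my simplified stand-in):

```java
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class LeastConcurrencyDemo {
    // Simplified stand-in for KeyAffinityImpl.ValueRef: a single-thread
    // pool plus a counter of tasks currently bound to it.
    static final class ValueRef {
        final ExecutorService executor = Executors.newSingleThreadExecutor();
        final AtomicInteger concurrency = new AtomicInteger();

        int concurrency() { return concurrency.get(); }
    }

    // Pick the pool with the lowest concurrency, mirroring the
    // comparingInt(ValueRef::concurrency) call in the real implementation.
    static ValueRef select(List<ValueRef> all) {
        return all.stream()
                .min(Comparator.comparingInt(ValueRef::concurrency))
                .orElseThrow();
    }

    public static void main(String[] args) {
        List<ValueRef> all = List.of(new ValueRef(), new ValueRef(), new ValueRef());
        all.get(0).concurrency.set(5); // busy
        all.get(1).concurrency.set(6); // busier
        // all.get(2) is idle (concurrency 0), so it wins
        System.out.println(select(all) == all.get(2)); // true
        all.forEach(ref -> ref.executor.shutdown());
    }
}
```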

I've only given you the general idea here; to really understand it, read the source yourself.

If you're familiar with lambdas, you'll find it elegant and pleasant to read.

If you don’t know lambdas…

Why don’t you start learning?

I also found two familiar things in the source.

Here is the first one, my friends:

Isn’t this just a dynamic adjustment of thread pool parameters?

The second is this:

This queue is adapted from RabbitMQ, which I have also written about, and the author highlights these three modifications:

  • Added {@link #setCapacity(int)} and {@link #getCapacity()}
  • Changed the {@link #capacity} boundary check from == to >=
  • Changed some signal() calls to signalAll()

The author also mentions that the RabbitMQ version has an NPE bug.

I won't go into detail here; if you're interested, compare the two versions of the code and you should be able to spot the problem.

A word about Dubbo

Why bring up Dubbo?

Because I seem to have found traces of KeyAffinityExecutor in Dubbo too.

Why only "seem"?

Because it was never merged into the code base.

The corresponding link is here:

Github.com/apache/dubb…

This pull request touches quite a few files:

Inside, we can find familiar things:

The idea is the same, but you'll notice that even with the same idea, two different people structure the code very differently.

Dubbo's version defines the code hierarchy more clearly: for example, it defines an abstract AbstractKeyAffinity class, then provides both a random and a least-concurrency implementation.

The differences are in details like these.

But in the end this code wasn't adopted, and the contributor came up with an alternative:

Github.com/apache/dubb…

In this commit, he mainly adds this class:

org.apache.dubbo.common.threadpool.serial.SerializingExecutor

As the name suggests, this class emphasizes serial execution.

Take a look at its test case and you'll see how to use it:

First, its constructor takes another thread pool as a parameter.

Tasks are then submitted through the SerializingExecutor's execute method.

Inside the task, all it does is read a value from the map by key, add one, and put it back.

We all know that with multiple threads this operation is not thread safe, and the final result would normally be less than the number of loops.

With a single thread, however, it is perfectly fine.

So how do you turn a multi-threaded pool into, effectively, a single thread?

That’s what SerializingExecutor does.

Its principle is extremely simple; the core is only a few lines of code.

First, it maintains a queue of its own:

Submitted tasks are put into this queue.

And then execute them one by one.

How does it guarantee one-by-one execution?

Across several methods, an AtomicBoolean is used for control:

This enables the serialization of multi-threaded tasks.
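A compressed sketch of the mechanism (my own condensation; the real Dubbo class handles more edge cases): a private queue plus an AtomicBoolean ensures at most one thread of the underlying pool drains tasks at a time.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;

public class MiniSerializingExecutor implements Executor {
    private final Executor underlying;          // the thread pool handed to us
    private final Queue<Runnable> queue = new ConcurrentLinkedQueue<>();
    private final AtomicBoolean draining = new AtomicBoolean();

    public MiniSerializingExecutor(Executor underlying) {
        this.underlying = underlying;
    }

    @Override
    public void execute(Runnable task) {
        queue.add(task);
        drain();
    }

    private void drain() {
        // Only one "drainer" at a time: the CAS guarantees that at most one
        // thread of the underlying pool is running queued tasks.
        if (draining.compareAndSet(false, true)) {
            underlying.execute(() -> {
                try {
                    Runnable next;
                    while ((next = queue.poll()) != null) {
                        next.run();
                    }
                } finally {
                    draining.set(false);
                    // Re-check: a task may have been enqueued after poll()
                    // returned null but before the flag was cleared.
                    if (!queue.isEmpty()) {
                        drain();
                    }
                }
            });
        }
    }
}
```

Because every task funnels through a single drainer, even an unsynchronized counter incremented from many threads comes out exact.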

What surprises me is that the SerializingExecutor class currently has no usage scenario inside Dubbo itself.

However, one day you may find this odd-looking feature useful: someone hands you a thread pool, but at a certain step of your process the tasks must run serially, and you can't touch their thread pool. At that moment you can remember that Dubbo has a ready-made, elegant, and rather impressive solution.

One last word

Okay, if you've read this far: a share, a read, or a like, whatever you can manage, I won't refuse. Writing is tiring and needs some positive feedback.

Here’s one for readers:

This article is also collected on my personal blog; you're welcome to visit:

www.whywhy.vip/