I am 3y, a markdown programmer who has stretched one year of CRUD experience across ten years 👨🏻‍💻, and who has long been known as a top-tier player of rote interview questions.

Today we continue updating the Austin project. If you haven't read the series yet, check out my earlier articles and give them a thumbs up along the way. I recommend reading in order rather than skipping ahead; otherwise this article will be hard to follow, because I won't repeat the concepts and business background I covered before.

Today's task is isolating consumption data in the Handler module. Before getting into that, let's look at the previous implementation.

austin-api receives the request and sends it to Kafka under the topic name Austin. austin-handler creates a consumer group named austinGroup that listens to the Austin topic and then sends the messages.

In terms of system architecture, the Austin project can send many types of messages: SMS, WeChat mini program messages, email, and so on.

With a single topic and a single group, have you ever considered this problem: what happens if the interface of one sending channel starts throwing exceptions and timing out?

Right: messages get blocked, because every channel consumes the same topic through the same consumer.

01. Data isolation

How do we get out of this? Simple: use multiple topics and multiple groups.

Does that solve every problem? No. Even within the same channel, different types of messages have different sending characteristics. For example, a marketing push may go out to 40 million people at a single moment.

It is not realistic to finish sending to all 40 million people in a short time, so notification-type push messages would very likely be delayed behind them.

How do we solve this one? Also simple. We had this in mind when we designed the message template: the template has a msgType field that identifies its type, so we can assign a separate group to each message type.

Theoretically, we can give each message type of each channel its own topic and its own group. Data in different topics is isolated, and consumption by different groups is isolated too, so consumption is guaranteed to be isolated.

However, my current practice is a single topic with multiple groups: consumption is isolated, but the topic is shared on the producing side. I think this keeps the code clearer and easier to understand, and we can change it later if it becomes a bottleneck.

02. Design of the consumer end

As decided above, data isolation is achieved with a single topic and multiple groups. For example, I currently define 6 channels (IM, push, email, SMS, mini programs, WeChat service accounts) and 3 message types (notification, marketing, verification code), which is equivalent to 18 consumers.
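To make the 6 x 3 arithmetic concrete, here is a minimal sketch that enumerates one group ID per channel and message type. The enum names and the `channel.type` naming scheme are assumptions made for illustration; the article does not show the project's actual identifiers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

// Hypothetical enums: the channels and message types come from the article,
// but these identifiers are illustrative, not the project's real ones.
enum ChannelType { IM, PUSH, EMAIL, SMS, MINI_PROGRAM, SERVICE_ACCOUNT }

enum MessageType { NOTICE, MARKETING, AUTH_CODE }

final class GroupIds {
    private GroupIds() {}

    // Builds the group ID for one (channel, message type) pair, e.g. "sms.marketing".
    // The "channel.type" format is an assumed naming scheme.
    static String of(ChannelType channel, MessageType type) {
        return channel.name().toLowerCase(Locale.ROOT)
                + "." + type.name().toLowerCase(Locale.ROOT);
    }

    // Enumerates every pair: 6 channels x 3 message types = 18 group IDs.
    static List<String> all() {
        List<String> ids = new ArrayList<>();
        for (ChannelType channel : ChannelType.values()) {
            for (MessageType type : MessageType.values()) {
                ids.add(of(channel, type));
            }
        }
        return ids;
    }
}
```

Each of the 18 strings would then be used as the group ID of one consumer, so that a slow marketing push cannot hold up, say, SMS verification codes.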

After getting a message from Kafka, my tentative plan is to go through a few steps: discard the message if it should be dropped -> deduplicate -> actually send.
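The three steps can be sketched as a small pipeline. This is only an illustration: the discard rule is passed in as a predicate, and deduplication uses an in-memory set of message keys. Both are assumptions, since the article does not say how either step is implemented in the project.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Illustrative pipeline: discard -> deduplicate -> send.
final class SendPipeline {
    // Assumption: the discard rule is supplied by the caller (e.g. a switch in configuration).
    private final Predicate<String> shouldDiscard;
    // Assumption: in-memory dedup keyed by a message key; thread-safe because
    // the pipeline may be called from several worker threads.
    private final Set<String> seenKeys = ConcurrentHashMap.newKeySet();
    private final Consumer<String> sender;

    SendPipeline(Predicate<String> shouldDiscard, Consumer<String> sender) {
        this.shouldDiscard = shouldDiscard;
        this.sender = sender;
    }

    // Returns true only if the message passed both filters and reached the sender.
    boolean process(String messageKey) {
        if (shouldDiscard.test(messageKey)) {
            return false; // step 1: discard
        }
        if (!seenKeys.add(messageKey)) {
            return false; // step 2: deduplicate (key was already seen)
        }
        sender.accept(messageKey); // step 3: actually send
        return true;
    }
}
```

A real deduplication step would more likely consult an external store, which is exactly why it counts as network IO in the discussion that follows.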

Deduplication and sending are both inherently network-IO-intensive. So, to improve throughput, I decided to put the messages consumed from Kafka into a buffer first.

Adding a buffer layer improves throughput but also introduces other problems. For example: when the application restarts, will the unconsumed data in the buffer be lost?

We will see how to solve this later (stay tuned; there is plenty of optimization still to come in this project). For now I still think the advantages of a buffer outweigh the disadvantages, so back to the buffer.

My first reaction to the buffer was to implement the producer-consumer pattern.

To implement this pattern, I initially thought it would be simple: the thread consuming Kafka acts as the producer and puts data onto a blocking queue, and multiple threads are started to consume from that queue.

On second thought, why not just use a thread pool directly? A thread pool is itself an implementation of the producer-consumer pattern.
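A minimal sketch of why a thread pool already is a producer-consumer setup: the caller of `execute` is the producer, the pool's internal blocking queue is the buffer, and the worker threads are the consumers. The pool size, queue capacity, and rejection policy below are assumed values for illustration, not the project's configuration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class BufferedConsumerDemo {
    private BufferedConsumerDemo() {}

    // Hands messageCount simulated messages to a pool and returns how many were handled.
    static int consumeAll(int messageCount) {
        AtomicInteger handled = new AtomicInteger();
        ExecutorService pool = new ThreadPoolExecutor(
                3, 3, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(100),              // the buffer
                new ThreadPoolExecutor.CallerRunsPolicy()); // full queue: producer runs the task itself
        for (int i = 0; i < messageCount; i++) {
            pool.execute(handled::incrementAndGet); // producer side: enqueue one unit of work
        }
        pool.shutdown(); // stop accepting work, let queued tasks finish
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                pool.shutdownNow();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            pool.shutdownNow();
        }
        return handled.get();
    }
}
```

With `CallerRunsPolicy`, a full queue slows the producer down instead of dropping work, which is one simple way to get backpressure on the thread that pulls from Kafka.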

As a result, the architecture becomes the following:

03. Code design

On the consumer side, let's first look at the code for the Receiver. This class looks simple: just a method annotated with @KafkaListener that consumes from Kafka and hands the message to Pending for processing.

I use the @KafkaListener annotation to pull messages from Kafka rather than the low-level Kafka API, for one reason only: there is no need for perfection early in a project; optimize when you actually hit a bottleneck. That said, it still caused me a lot of trouble while writing it.

First problem: @KafkaListener is an annotation, and according to the comments in its source, the values it accepts are limited to Spring EL expressions and values read from configuration. But remember, my goal is to have multiple groups consuming the same topic. I can't very well define one consuming method per group, can I? (I couldn't sleep at night if I wrote code that dumb.)

I dug through tech blogs all night without finding a solution. I even posted on WeChat Moments to ask whether anyone had run into this. The next day I read through the official Spring documentation and finally found a solution that worked for me.

The official documentation really is the most reliable source!

With the solution in hand, the rest is easy. Since I'm isolating every message type of every message channel, I just enumerate them all and I'm done!

My Receiver is a multi-instance (non-singleton) bean, so I just iterate over the list and create one instance per group (the consumers are initialized in the ReceiverStart class).

That solved the problem of dynamically passing a groupId to the @KafkaListener annotation in order to create multiple consumers.

Then I ran into a second problem: Spring has the @Async annotation, which elegantly runs a method call on a thread pool. I had never used @Async before, but I looked at how to use it and how it works, and it felt elegant. With @Async, though, you have to create your own thread pool, and I want a separate thread pool for each consumer. I can't very well define one thread-pool-creating method per group, can I? (I couldn't sleep at night if I wrote code that dumb.)

This time, neither the official site nor the various tech blogs solved my problem. What I wanted was for @Async, in a Spring environment, to accept a thread pool instance dynamically, with the creation of each thread pool instance taking parameters.

In the end I gave up on the @Async annotation and implemented it programmatically:

Here is an implementation of TaskPendingHolder (which simply creates a thread pool for each consumer):
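The original listing is not reproduced in this text, so the following is only a sketch of the idea described: one thread pool per consumer group, looked up by group ID. The class name `TaskPendingHolderSketch`, the `route` method, and all pool parameters are assumptions, not the project's actual code.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Illustrative stand-in for TaskPendingHolder: pool sizes and queue capacity
// are assumed values chosen for the example.
final class TaskPendingHolderSketch {
    private static final int THREADS = 3;
    private static final int QUEUE_CAPACITY = 100;

    private final Map<String, ExecutorService> pools;

    // Creates one dedicated pool per group ID, so a slow group cannot starve the others.
    TaskPendingHolderSketch(List<String> groupIds) {
        Map<String, ExecutorService> map = new HashMap<>();
        for (String groupId : groupIds) {
            map.put(groupId, new ThreadPoolExecutor(
                    THREADS, THREADS, 0L, TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<>(QUEUE_CAPACITY),
                    new ThreadPoolExecutor.CallerRunsPolicy()));
        }
        this.pools = Collections.unmodifiableMap(map);
    }

    // Returns the pool that belongs to the given consumer group.
    ExecutorService route(String groupId) {
        ExecutorService pool = pools.get(groupId);
        if (pool == null) {
            throw new IllegalArgumentException("unknown groupId: " + groupId);
        }
        return pool;
    }

    // Stops accepting new tasks on every pool; queued tasks still run.
    void shutdown() {
        pools.values().forEach(ExecutorService::shutdown);
    }
}
```

A receiver for a given group would call something like `holder.route(groupId).execute(task)`, which is the programmatic replacement for what @Async would otherwise have done.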

The Task implementation is relatively simple for now: it directly calls the corresponding Handler, which then sends the message:
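Since the original listing is missing here as well, this is a hedged sketch of what such a Task could look like: a Runnable that looks up the handler for its channel and delegates to it. The `ChannelHandler` interface, the string channel key, and the constructor shape are all hypothetical.

```java
import java.util.Map;

// Hypothetical handler contract: one implementation per sending channel.
interface ChannelHandler {
    void doHandler(String content);
}

// Illustrative Task: finds the handler registered for its channel and lets it send.
final class TaskSketch implements Runnable {
    private final Map<String, ChannelHandler> handlers;
    private final String channel;
    private final String content;

    TaskSketch(Map<String, ChannelHandler> handlers, String channel, String content) {
        this.handlers = handlers;
        this.channel = channel;
        this.content = content;
    }

    @Override
    public void run() {
        ChannelHandler handler = handlers.get(channel);
        if (handler == null) {
            throw new IllegalStateException("no handler for channel: " + channel);
        }
        handler.doHandler(content); // the handler performs the actual send
    }
}
```

Because the task is a plain Runnable, it can be submitted to whichever thread pool belongs to the consumer group that received the message.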

04. Summary

The code looks simple and the business logic looks easy to understand, but remember that the production projects of some small and mid-sized companies may not have even this much design. Hacking everything together in one go is all too common (the feature works, doesn't it? The code runs, doesn't it? And most importantly, the people can always run away, can't they?).

This article focuses on one idea: when consuming from MQ, multiple groups give you data isolation. And to improve consumption throughput, you can add a layer of buffering (if the consuming work is IO-intensive).

Follow my WeChat official account [Java3y]. Besides technology, I also chat about everyday life there, and some things can only be said quietly ~ [Facing off with interviewers + writing a Java project from scratch] is being updated continuously and intensively! Please give it a star! Original writing is not easy! Please like, bookmark, and share!

Gitee link: gitee.com/austin

GitHub link: github.com/austin