
Background

Our payment scenario has two requirements: business messages, once consumed, must never be lost, and we must take full advantage of a high-spec server, for example by processing messages with a thread pool. If it is not obvious why these two goals are hard to satisfy together, let me walk through it step by step.

The advantages and disadvantages of MQ

MQ is one of the most common tools for handling high-concurrency scenarios: it enables business decoupling, asynchronous processing, and peak shaving.

However, the introduction of this additional middleware also increases the complexity and instability of the system.

Ensuring message reliability

Message reliability needs to be ensured at every stage of the message flow, such as transactional messages on the production side, real-time flush persistence on the broker, and manual ACK on the consumer side.

Here, we do not discuss the safeguard measures on the production and storage ends, but focus on the manual ACK mechanism on the consumer end.

Manual ACK problem

A manual ACK ensures that a message is actually consumed, but you must also guarantee that ACKs are committed in the same order the messages were delivered. Why?

Message queues owe their high performance to sequential file reads and writes. When a consumer pulls messages, it pulls them by offset from these sequential files. If offsets are committed out of order, the message state on the broker becomes incorrect, leading to problems such as message redelivery.

Therefore, if we start a local thread pool to process pulled messages in parallel, there is no guarantee the threads will finish in order, because each thread's processing speed differs. Does that mean we can only pull, process, and ACK messages one at a time, synchronously?
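To see the reordering concretely, here is a small sketch (all names hypothetical): with two worker threads, a slow first message finishes after a fast second one, so ACKing as each task completes would commit offset 2 while offset 1 is still unacknowledged.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class OutOfOrderDemo {

    // Submits two messages to a two-thread pool and records completion order.
    static List<Integer> runDemo() throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        List<Integer> completionOrder = new CopyOnWriteArrayList<>();

        // Message 1 is slow, message 2 is fast; they run concurrently.
        pool.submit(() -> { sleep(200); completionOrder.add(1); });
        pool.submit(() -> { sleep(10);  completionOrder.add(2); });

        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return completionOrder;
    }

    public static void main(String[] args) throws InterruptedException {
        // Typically prints [2, 1]: ACKing in completion order would be wrong.
        System.out.println(runDemo());
    }

    private static void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```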

The solution

The crudest option is to submit a batch of tasks, wait for the whole batch to finish, and then ACK the batch in one go. It works, but it never feels elegant.
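A minimal sketch of that batch-and-wait approach, using a CountDownLatch (class and method names are hypothetical): the whole batch is processed in parallel, but the slowest message gates the single batch ACK.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class BatchAckDemo {

    // Processes one pulled batch in parallel, but only returns (and thus
    // allows the ACK) after every message in the batch has finished:
    // the slowest message gates the whole commit.
    static long processBatch(ExecutorService pool, List<Long> offsets)
            throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(offsets.size());
        for (long offset : offsets) {
            pool.submit(() -> {
                try {
                    // handleMessage(offset) would run here
                } finally {
                    latch.countDown();
                }
            });
        }
        latch.await();                           // block until the batch is done
        return offsets.get(offsets.size() - 1);  // now the last offset is safe to commit
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        System.out.println(processBatch(pool, List.of(100L, 101L, 102L))); // prints 102
        pool.shutdown();
    }
}
```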

Then I found inspiration in AQS (AbstractQueuedSynchronizer) from JUC.

Concurrent utility classes such as CountDownLatch are likewise built for coordinating work across multiple threads.

Our scenario is far less complex than AQS, so borrowing its ideas should be no problem:

  1. Create a double-ended queue; each node maintains its own processing state and the offset of the corresponding message.

  2. The service pulls messages from the message center and enqueues them before submitting them to the local thread pool for execution.

  3. When a message finishes processing, the corresponding node in the queue is notified and its state is updated to completed.

  4. The head of the queue is checked: every completed head node is dequeued and its offset committed, repeating until a head node whose state is still incomplete blocks further commits.

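The four steps above can be sketched as a minimal deque-based commit queue (class and method names are illustrative, not from the article):

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class OffsetCommitQueue {

    // One node per in-flight message: its offset plus a done flag (step 1).
    static final class Node {
        final long offset;
        volatile boolean done;
        Node(long offset) { this.offset = offset; }
    }

    private final Deque<Node> queue = new ArrayDeque<>();
    private long committedOffset = -1;

    // Step 2: enqueue before handing the message to the thread pool.
    public synchronized Node enqueue(long offset) {
        Node node = new Node(offset);
        queue.addLast(node);
        return node;
    }

    // Step 3: a worker marks its node done, then tries to advance the head.
    public synchronized void complete(Node node) {
        node.done = true;
        // Step 4: dequeue every finished head node, remembering the highest
        // committable offset; stop at the first head still in flight.
        while (!queue.isEmpty() && queue.peekFirst().done) {
            committedOffset = queue.pollFirst().offset;
        }
    }

    // The highest offset that is safe to ACK so far (-1 if none yet).
    public synchronized long committedOffset() {
        return committedOffset;
    }
}
```

Even if message 2 finishes first, its offset is not committed until message 1 at the head completes, so the commit order always matches the delivery order.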

Analyzing the scheme

This scheme makes effective use of local threads for parallel processing, while the queue plus the asynchronous notification mechanism guarantees that offsets are ultimately committed in order.

In the worst case (the message at the head node finishes last), the scheme degenerates into the batch approach: every thread finishes, then everything is committed at once. In every other case it does better than waiting for the whole batch.

Implementation of asynchronous notification

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Logger;

public class MSGFuture {

    private static final Logger logger = Logger.getLogger(MSGFuture.class.getName());

    // Global registry of in-flight futures, keyed by message id.
    private static final Map<Long, MSGFuture> FUTURES = new ConcurrentHashMap<Long, MSGFuture>();

    private final long id;          // globally increasing message id
    private final int timeout;      // wait timeout in milliseconds
    private final Lock lock = new ReentrantLock();
    private final Condition done = lock.newCondition();
    private final long start = System.currentTimeMillis();
    private volatile Object response;

    // Request is the business request type carrying the globally increasing id.
    public MSGFuture(Request request, int timeout) {
        this.id = request.getRid();
        this.timeout = timeout > 0 ? timeout : 1000;
        FUTURES.put(id, this);      // register so received() can find this future
    }

    // Called by a worker thread once the message with this id has been processed.
    public static void received(long id, Object response) {
        MSGFuture future = FUTURES.remove(id);
        if (future != null) {
            future.doReceived(response);
        } else {
            logger.warning("response return timeout, id:" + id);
        }
    }

    private void doReceived(Object res) {
        lock.lock();
        try {
            response = res;
            done.signal();          // wake the thread blocked in get()
        } finally {
            lock.unlock();
        }
    }

    // Blocks until the response arrives or the timeout elapses.
    public Object get(int timeout) throws TimeoutException {
        if (!isDone()) {
            long start = System.currentTimeMillis();
            lock.lock();
            try {
                while (!isDone()) {
                    done.await(timeout, TimeUnit.MILLISECONDS);
                    if (isDone() || System.currentTimeMillis() - start > timeout) {
                        break;
                    }
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                lock.unlock();
            }
            if (!isDone()) {
                throw new TimeoutException();
            }
        }
        return returnFromResponse();
    }

    private boolean isDone() {
        return response != null;
    }

    private Object returnFromResponse() {
        return response;
    }
}
```
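The received()/get() pair is essentially a single-slot future built on Lock/Condition. A stripped-down, self-contained version of the same handshake (hypothetical names) makes the pattern easier to see in isolation:

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class MiniFuture {

    private final Lock lock = new ReentrantLock();
    private final Condition done = lock.newCondition();
    private volatile Object response;

    // Called by the worker thread when the result is ready.
    public void set(Object res) {
        lock.lock();
        try {
            response = res;
            done.signal();               // wake the waiting consumer
        } finally {
            lock.unlock();
        }
    }

    // Called by the consumer; blocks until set() runs or the timeout expires.
    // The while loop guards against spurious wakeups.
    public Object get(long timeoutMillis)
            throws InterruptedException, TimeoutException {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        lock.lock();
        try {
            while (response == null) {
                long remaining = deadline - System.currentTimeMillis();
                if (remaining <= 0) {
                    throw new TimeoutException();
                }
                done.await(remaining, TimeUnit.MILLISECONDS);
            }
            return response;
        } finally {
            lock.unlock();
        }
    }
}
```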

Conclusion

Some of you will ask: what does this have to do with AQS?

In truth, it only borrows AQS's ideas: maintaining state, a locking mechanism, and wait/notify coordination. Since both coordinate tasks across multiple threads, similarities are inevitable.

In short, rote-memorizing interview "eight-legged essay" answers is useless on its own; genuine understanding is what really helps.

Copyright notice: This article was originally written by Coder's Path to Technology. Please contact the author before reprinting.