Clear requirements

The problem I want to solve here is the out-of-order processing of MQ messages during consumption.

A quick introduction to the image above. Each message carries its own sequence number (packet number), which the message parser checks against the sequence number of the previous packet it has recorded. If the difference between the two is exactly 1, the message is in order: the adapter looks up the specific handler registered for the message's functionId, the handler processes the message, and the parser's previous-packet number is updated. If the difference is less than or equal to 0, the message has already been consumed (a repeated message). Any other difference means the message arrived out of order. Out-of-order messages are placed in a delay queue sorted by message sequence number, and a timer periodically takes them from the queue and feeds them back into the message parser for verification.

Code implementation

The message body

package com.fire.plan.mq;

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/**
 * A single MQ message together with the bookkeeping needed to re-order it.
 *
 * <p>Implements {@link Delayed} so instances can be held in a
 * {@link java.util.concurrent.DelayQueue}. {@link #getDelay(TimeUnit)} always
 * reports {@code 0}, so a queued message is immediately eligible for polling and
 * the queue effectively orders messages by {@code businessId} only.
 *
 * @author zwd
 * @date 2021/8/28
 * @email [email protected]
 */
public class MessageContent implements Delayed {
    // Source id (original note: "Different dirty message ids" -- exact meaning unclear, TODO confirm).
    Integer sourceId;
    // Business id; compareTo orders messages by this value.
    Integer businessId;
    // Message to be sent.
    String msg;
    // Identifies the specific handler method that should process this message.
    String functionId;
    // Creation timestamp. NOTE(review): MessageSolver stores epoch *seconds* here despite the name.
    long createTimeMillis;
    // Number of times this message has been put into the queue.
    int count;

    /** @return how many times this message has entered the queue */
    public int getCount() {
        return count;
    }

    /** @param count new queue-entry count */
    public void setCount(int count) {
        this.count = count;
    }

    /** @return the stored creation timestamp */
    public long getCreateTimeMillis() {
        return createTimeMillis;
    }

    /** @param createTimeMillis creation timestamp to store */
    public void setCreateTimeMillis(long createTimeMillis) {
        this.createTimeMillis = createTimeMillis;
    }

    /** @return a fresh {@link Builder} */
    public static Builder builder() {
        return new Builder();
    }

    /**
     * Always returns {@code 0}: the message never has to wait before it can be polled.
     *
     * @param unit requested time unit (ignored)
     * @return {@code 0}
     */
    @Override
    public long getDelay(TimeUnit unit) {
        return 0;
    }

    /**
     * Orders messages by ascending {@code businessId}.
     *
     * @param o the other element; must be a {@code MessageContent}
     * @return negative, zero or positive as this id is less than, equal to or greater than the other
     * @throws ClassCastException if {@code o} is not a {@code MessageContent}
     * @throws NullPointerException if either {@code businessId} is {@code null}
     */
    @Override
    public int compareTo(Delayed o) {
        MessageContent other = (MessageContent) o;
        return Integer.compare(this.businessId, other.businessId);
    }

    /** Fluent builder for {@link MessageContent}. */
    static class Builder {
        private Integer sourceId;
        private Integer businessId;
        private String msg;
        private String functionId;

        public Builder() {
        }

        public Builder sourceId(Integer sourceId) {
            this.sourceId = sourceId;
            return this;
        }

        public Builder businessId(Integer businessId) {
            this.businessId = businessId;
            return this;
        }

        public Builder msg(String msg) {
            this.msg = msg;
            return this;
        }

        public Builder functionId(String functionId) {
            this.functionId = functionId;
            return this;
        }

        /**
         * Copies the collected values into a new message; {@code count} and
         * {@code createTimeMillis} keep their default value of zero.
         *
         * @return the new message
         */
        public MessageContent build() {
            MessageContent messageContent = new MessageContent();
            messageContent.sourceId = this.sourceId;
            messageContent.businessId = this.businessId;
            messageContent.msg = this.msg;
            messageContent.functionId = this.functionId;
            return messageContent;
        }
    }
}

Message handler

package com.fire.plan.mq;

/ * * *@author zwd
 * @date 2021/8/28
 * @email [email protected]
 */
public interface MessageHandler {
    void processMsg(MessageContent messageContent);
}
Copy the code
/ * * *@author zwd
 * @date 2021/8/28
 * @email [email protected]
 */
public class MessageHandlerOne implements MessageHandler{
    @Override
    public void processMsg(MessageContent messageContent) {
        System.out.println(messageContent.businessId + " is being process"); }}Copy the code
/ * * *@author zwd
 * @date 2021/8/28
 * @email [email protected]
 */
public class MessageHandlerTwo implements MessageHandler{
    @Override
    public void processMsg(MessageContent messageContent) {
        System.out.println(messageContent.businessId + " is being process"); }}Copy the code
/ * * *@author zwd
 * @date 2021/8/28
 * @email [email protected]
 */
public class MessageHandlerThree implements MessageHandler{
    @Override
    public void processMsg(MessageContent messageContent) {
        System.out.println(messageContent.businessId + " is being process"); }}Copy the code

Message adapter

/ * * *@author zwd
 * @date 2021/8/28
 * @email [email protected]
 */
public interface MessageAdaptive {
    MessageHandler adapter(MessageContent messageContent);
}
Copy the code
package com.fire.plan.mq;

import java.util.HashMap;

/ * * *@author zwd
 * @date 2021/8/28
 * @email [email protected]
 */
public class MessageAdaptiveMq implements MessageAdaptive{
    private HashMap<String, MessageHandler> handlerMapping = new HashMap<>();

    public void registry(String functionId, MessageHandler messageHandler){
        handlerMapping.put(functionId, messageHandler);
    }

    @Override
    public MessageHandler adapter(MessageContent messageContent) {

        returnhandlerMapping.get(messageContent.functionId); }}Copy the code

Message parser

package com.fire.plan.mq;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.DelayQueue;

/ * * *@author zwd
 * @date 2021/8/28
 * @email [email protected]
 */
public class MessageSolver {
    private static volatile MessageSolver messageSolver = null;
    Integer preBusinessId;
    DelayQueue<MessageContent> delayed;
    MessageAdaptive messageAdaptive;
    long timeout = 6;

    private MessageSolver(a){
        this.delayed =  new DelayQueue<>();
        preBusinessId = 0;
        TimerTask timerTask = new TimerTask() {
            @Override
            public void run(a) {
                if(! delayed.isEmpty()) { MessageContent messageContent = delayed.poll();long currentTimeMillis = System.currentTimeMillis() / 1000;
                    long createTimeMillis = messageContent.createTimeMillis;
                    if (currentTimeMillis - createTimeMillis > timeout) {
                        // Store the message to db
                        System.out.println(messageContent.businessId + " will be store to db");
                    } else {
                        // Check the pre-packet againmessageSolver.messageSolver(messageContent); }}}}; Timer timer =new Timer();
        //500ms Check the pre-packet once
        timer.schedule(timerTask, 0 ,500);
    }

    public static MessageSolver getMessageSolver(a){
        if (messageSolver == null) {
            synchronized (MessageSolver.class) {
                if (messageSolver == null) {
                    messageSolver = newMessageSolver(); }}}return messageSolver;
    }

    public void messageSolver(MessageContent messageContent){
        int flag = messageContent.businessId - messageSolver.preBusinessId;
        if (flag == 1) {
            MessageHandler adapter = messageAdaptive.adapter(messageContent);
            adapter.processMsg(messageContent);
            // Update the pre-packet number
            this.preBusinessId = messageContent.businessId;
        }else if (flag <= 0){
            System.out.println("this message has been consumed");
        }else {
            if (messageContent.getCount() == 0)  messageContent.setCreateTimeMillis(System.currentTimeMillis() / 1000);
            messageContent.setCount(messageContent.getCount() + 1); delayed.add(messageContent); }}public long getTimeout(a) {
        return timeout;
    }

    public void setTimeout(long timeout) {
        this.timeout = timeout;
    }

    public MessageAdaptive getMessageAdaptive(a) {
        return messageAdaptive;
    }

    public void setMessageAdaptive(MessageAdaptive messageAdaptive) {
        this.messageAdaptive = messageAdaptive; }}Copy the code

Test

package com.fire.plan.mq;

/ * * *@author zwd
 * @date 2021/8/28
 * @email [email protected]
 */
public class Main {
    public static void main(String[] args) throws InterruptedException {
        MessageContent message1 = MessageContent.builder()
                .sourceId(1).businessId(1).msg("first message").functionId("function01").build();
        MessageContent message2 = MessageContent.builder()
                .sourceId(1).businessId(2).msg("second message").functionId("function02").build();
        MessageContent message3 = MessageContent.builder()
                .sourceId(1).businessId(3).msg("third message").functionId("function03").build();

        MessageAdaptiveMq messageAdaptiveMq = new MessageAdaptiveMq();
        messageAdaptiveMq.registry("function01".new MessageHandlerOne());
        messageAdaptiveMq.registry("function02".new MessageHandlerTwo());
        messageAdaptiveMq.registry("function03".newMessageHandlerThree()); MessageSolver messageSolver = MessageSolver.getMessageSolver(); messageSolver.setMessageAdaptive(messageAdaptiveMq); messageSolver.messageSolver(message3); messageSolver.messageSolver(message1); messageSolver.messageSolver(message2); }}Copy the code

Output:

1 is being process
2 is being process
3 is being process