preface

This article looks at RocketMQ's MQFaultStrategy.

MQFaultStrategy

rocketmq-client-4.6.0-sources.jar!/org/apache/rocketmq/client/latency/MQFaultStrategy.java

public class MQFaultStrategy {
    private final static InternalLogger log = ClientLogger.getLog();
    private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl();

    private boolean sendLatencyFaultEnable = false;

    private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

    public long[] getNotAvailableDuration() {
        return notAvailableDuration;
    }

    public void setNotAvailableDuration(final long[] notAvailableDuration) {
        this.notAvailableDuration = notAvailableDuration;
    }

    public long[] getLatencyMax() {
        return latencyMax;
    }

    public void setLatencyMax(final long[] latencyMax) {
        this.latencyMax = latencyMax;
    }

    public boolean isSendLatencyFaultEnable() {
        return sendLatencyFaultEnable;
    }

    public void setSendLatencyFaultEnable(final boolean sendLatencyFaultEnable) {
        this.sendLatencyFaultEnable = sendLatencyFaultEnable;
    }

    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
        if (this.sendLatencyFaultEnable) {
            try {
                int index = tpInfo.getSendWhichQueue().getAndIncrement();
                for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                    if (pos < 0)
                        pos = 0;
                    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                    if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                        if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                            return mq;
                    }
                }

                final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
                int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
                if (writeQueueNums > 0) {
                    final MessageQueue mq = tpInfo.selectOneMessageQueue();
                    if (notBestBroker != null) {
                        mq.setBrokerName(notBestBroker);
                        mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                    }
                    return mq;
                } else {
                    latencyFaultTolerance.remove(notBestBroker);
                }
            } catch (Exception e) {
                log.error("Error occurred when selecting message queue", e);
            }

            return tpInfo.selectOneMessageQueue();
        }

        return tpInfo.selectOneMessageQueue(lastBrokerName);
    }

    public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
        if (this.sendLatencyFaultEnable) {
            long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
            this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
        }
    }

    private long computeNotAvailableDuration(final long currentLatency) {
        for (int i = latencyMax.length - 1; i >= 0; i--) {
            if (currentLatency >= latencyMax[i])
                return this.notAvailableDuration[i];
        }

        return 0;
    }
}
  • MQFaultStrategy defines the latencyFaultTolerance, sendLatencyFaultEnable, latencyMax, and notAvailableDuration attributes. When sendLatencyFaultEnable is false, its selectOneMessageQueue method simply returns tpInfo.selectOneMessageQueue(lastBrokerName). When sendLatencyFaultEnable is true, it first obtains index from tpInfo.getSendWhichQueue().getAndIncrement(), then traverses tpInfo.getMessageQueueList(), computing pos as Math.abs(index++) % tpInfo.getMessageQueueList().size() and resetting it to 0 if it is negative. For each candidate it checks latencyFaultTolerance.isAvailable(mq.getBrokerName()); if the broker is available and either null == lastBrokerName or mq.getBrokerName().equals(lastBrokerName), it returns that MessageQueue.
  • Still with sendLatencyFaultEnable set to true, if the traversal of tpInfo.getMessageQueueList() returns no queue, the method picks notBestBroker via latencyFaultTolerance.pickOneAtLeast() and reads its writeQueueNums with tpInfo.getQueueIdByBroker(notBestBroker). If writeQueueNums is greater than 0, it selects a MessageQueue with tpInfo.selectOneMessageQueue() and, when notBestBroker is not null, sets its brokerName to notBestBroker and its queueId to tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums before returning it. If writeQueueNums is less than or equal to 0, it calls latencyFaultTolerance.remove(notBestBroker). If none of these steps returned a MessageQueue, it falls back to tpInfo.selectOneMessageQueue().
  • When sendLatencyFaultEnable is true, updateFaultItem computes duration with computeNotAvailableDuration (passing 30000 in place of currentLatency when isolation is true) and then calls latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration). computeNotAvailableDuration traverses latencyMax from the last element backwards and returns notAvailableDuration[i] for the first i where currentLatency >= latencyMax[i]; if no threshold matches, it returns 0.
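To make the threshold lookup concrete, here is a small standalone sketch that copies the default latencyMax and notAvailableDuration arrays from the listing above and replays the backward scan. The class name NotAvailableDurationDemo and the helper durationFor are made up for illustration and are not part of RocketMQ.

```java
// Standalone sketch; NotAvailableDurationDemo and durationFor are hypothetical names.
class NotAvailableDurationDemo {
    // Default values copied from the MQFaultStrategy listing (milliseconds).
    static final long[] LATENCY_MAX = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    static final long[] NOT_AVAILABLE_DURATION = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

    // Same backward scan as computeNotAvailableDuration: the highest threshold
    // that currentLatency reaches decides the penalty.
    static long computeNotAvailableDuration(long currentLatency) {
        for (int i = LATENCY_MAX.length - 1; i >= 0; i--) {
            if (currentLatency >= LATENCY_MAX[i]) {
                return NOT_AVAILABLE_DURATION[i];
            }
        }
        return 0;
    }

    // Mirrors the isolation branch of updateFaultItem: an isolated broker
    // is scored as if its latency were 30000 ms.
    static long durationFor(long currentLatency, boolean isolation) {
        return computeNotAvailableDuration(isolation ? 30000 : currentLatency);
    }

    public static void main(String[] args) {
        long[] samples = {30L, 80L, 600L, 1500L, 20000L};
        for (long latency : samples) {
            System.out.println(latency + " ms -> unavailable for " + durationFor(latency, false) + " ms");
        }
        System.out.println("isolated -> unavailable for " + durationFor(10L, true) + " ms");
    }
}
```

With the default arrays, any latency below 550 ms maps to a duration of 0, and an isolated broker always maps to 600000 ms because 30000 exceeds the last threshold of 15000.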

DefaultMQProducerImpl

rocketmq-client-4.6.0-sources.jar!/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java

public class DefaultMQProducerImpl implements MQProducerInner {
    private final InternalLogger log = ClientLogger.getLog();
    private final Random random = new Random();
    private final DefaultMQProducer defaultMQProducer;
    private final ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable =
        new ConcurrentHashMap<String, TopicPublishInfo>();
    private final ArrayList<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>();
    private final RPCHook rpcHook;
    private final BlockingQueue<Runnable> asyncSenderThreadPoolQueue;
    private final ExecutorService defaultAsyncSenderExecutor;
    private final Timer timer = new Timer("RequestHouseKeepingService", true);
    protected BlockingQueue<Runnable> checkRequestQueue;
    protected ExecutorService checkExecutor;
    private ServiceState serviceState = ServiceState.CREATE_JUST;
    private MQClientInstance mQClientFactory;
    private ArrayList<CheckForbiddenHook> checkForbiddenHookList = new ArrayList<CheckForbiddenHook>();
    private int zipCompressLevel = Integer.parseInt(System.getProperty(MixAll.MESSAGE_COMPRESS_LEVEL, "5"));
    private MQFaultStrategy mqFaultStrategy = new MQFaultStrategy();
    private ExecutorService asyncSenderExecutor;

    //......

    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
        return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);
    }

    public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
        this.mqFaultStrategy.updateFaultItem(brokerName, currentLatency, isolation);
    }

    //......
}
  • The selectOneMessageQueue and updateFaultItem methods of DefaultMQProducerImpl both delegate to mqFaultStrategy.

summary

  • MQFaultStrategy defines the latencyFaultTolerance, sendLatencyFaultEnable, latencyMax, and notAvailableDuration attributes. With sendLatencyFaultEnable set to false, selectOneMessageQueue returns tpInfo.selectOneMessageQueue(lastBrokerName). With it set to true, the method walks tpInfo.getMessageQueueList() in round-robin order starting from tpInfo.getSendWhichQueue().getAndIncrement() and returns the first MessageQueue whose broker passes latencyFaultTolerance.isAvailable and for which null == lastBrokerName or mq.getBrokerName().equals(lastBrokerName).
  • If that traversal returns nothing, the method takes notBestBroker from latencyFaultTolerance.pickOneAtLeast(). When its writeQueueNums is greater than 0, a MessageQueue from tpInfo.selectOneMessageQueue() is pointed at notBestBroker with queueId tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums; otherwise latencyFaultTolerance.remove(notBestBroker) is called. If no MessageQueue has been returned by then, the method falls back to tpInfo.selectOneMessageQueue().
  • With sendLatencyFaultEnable set to true, updateFaultItem computes duration through computeNotAvailableDuration and passes it to latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration). computeNotAvailableDuration scans latencyMax from the end and returns notAvailableDuration[i] for the first i where currentLatency >= latencyMax[i], or 0 if none matches.
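The round-robin scan in the first bullet can be tried in isolation with the simplified sketch below. It is not RocketMQ code: Queue stands in for MessageQueue, a plain set of penalized broker names stands in for LatencyFaultTolerance, and the fallback to pickOneAtLeast is left out, so select returns null where the real method would continue.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified sketch; every type and method name here is hypothetical.
class QueueSelectionSketch {
    // Stand-in for MessageQueue: only a broker name and a queue id.
    static final class Queue {
        final String brokerName;
        final int queueId;

        Queue(String brokerName, int queueId) {
            this.brokerName = brokerName;
            this.queueId = queueId;
        }
    }

    private final List<Queue> queues;
    private final AtomicInteger sendWhichQueue = new AtomicInteger(0);
    // Stand-in for LatencyFaultTolerance: brokers that are currently penalized.
    private final Set<String> unavailableBrokers = new HashSet<>();

    QueueSelectionSketch(List<Queue> queues) {
        this.queues = queues;
    }

    void markUnavailable(String brokerName) {
        unavailableBrokers.add(brokerName);
    }

    boolean isAvailable(String brokerName) {
        return !unavailableBrokers.contains(brokerName);
    }

    // Mirrors the first loop of selectOneMessageQueue as listed in this article.
    // Returns null when no queue qualifies; the real method then falls back.
    Queue select(String lastBrokerName) {
        int index = sendWhichQueue.getAndIncrement();
        for (int i = 0; i < queues.size(); i++) {
            int pos = Math.abs(index++) % queues.size();
            if (pos < 0) {
                pos = 0; // Math.abs(Integer.MIN_VALUE) is still negative
            }
            Queue mq = queues.get(pos);
            if (isAvailable(mq.brokerName)
                    && (lastBrokerName == null || mq.brokerName.equals(lastBrokerName))) {
                return mq;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        QueueSelectionSketch sketch = new QueueSelectionSketch(Arrays.asList(
                new Queue("broker-a", 0), new Queue("broker-a", 1),
                new Queue("broker-b", 0), new Queue("broker-b", 1)));
        sketch.markUnavailable("broker-a");
        for (int i = 0; i < 3; i++) {
            Queue mq = sketch.select(null);
            System.out.println(mq.brokerName + ":" + mq.queueId);
        }
    }
}
```

In this run broker-a is penalized, so every call skips its two queues and lands on a broker-b queue, which is the behavior the latency fault switch is meant to provide.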

doc

  • MQFaultStrategy