sequence

This paper mainly studies the cancelMessage of Chronos

MqPullService

DDMQ/carrera-chronos/src/main/java/com/xiaojukeji/chronos/services/MqPullService.java

public class MqPullService implements Runnable {
    private static final Logger LOGGER = LoggerFactory.getLogger(MqPullService.class);

    private static final PullConfig PULL_CONFIG = ConfigManager.getConfig().getPullConfig();
    private static final Batcher BATCHER = Batcher.getInstance();
    private volatile boolean shouldStop = false; private CountDownLatch cdl; private final List<Long> succOffsets = new ArrayList<>(); private final List<Long> failOffsets = new ArrayList<>(); private SimpleCarreraConsumer carreraConsumer; private String mqPullServiceName; private final int INTERNAL_PAIR_COUNT = 5000; private final BlockingQueue<InternalPair> blockingQueue = new ArrayBlockingQueue<>(INTERNAL_PAIR_COUNT); / /... private void cancelMessage(final InternalKey internalKey, final String topic, final int action) { InternalKey tombStoneInternalKey = internalKey.cloneTombstoneInternalKey();if (internalKey.getType() == MsgTypes.DELAY.getValue()) {
            MetricService.incPullQps(topic, MetricMsgAction.CANCEL, MetricMsgType.DELAY);

            BATCHER.putToDefaultCF(tombStoneInternalKey.genUniqDelayMsgId(),
                    new CancelWrap(internalKey.genUniqDelayMsgId(), topic).toJsonString(), topic, tombStoneInternalKey, action);
        } else if (internalKey.getType() == MsgTypes.LOOP_DELAY.getValue()) {
            MetricService.incPullQps(topic, MetricMsgAction.CANCEL, MetricMsgType.LOOP_DELAY);

            BATCHER.putLoopTombstoneKey(tombStoneInternalKey, internalKey, topic, action);
        } else if (internalKey.getType() == MsgTypes.LOOP_EXPONENT_DELAY.getValue()) {
            MetricService.incPullQps(topic, MetricMsgAction.CANCEL, MetricMsgType.LOOP_EXPONENT_DELAY);

            BATCHER.putLoopTombstoneKey(tombStoneInternalKey, internalKey, topic, action);
        } else {
            MetricService.incPullQps(topic, MetricMsgAction.CANCEL, MetricMsgType.UNKNOWN);

            LOGGER.error("should not go here, invalid message type: {}, internalKey: {}", internalKey.getType(), internalKey.genUniqDelayMsgId()); }} / /... }Copy the code
  • CancelMessage method first by internalKey. CloneTombstoneInternalKey tombStoneInternalKey () structure, After the MsgTypes. Execution of DELAY type BATCHER. PutToDefaultCF (tombStoneInternalKey. GenUniqDelayMsgId (), New CancelWrap (internalKey genUniqDelayMsgId (), topic). The toJsonString (), topic, tombStoneInternalKey, action); For MsgTypes. LOOP_DELAY and MsgTypes. Execution of LOOP_EXPONENT_DELAY BATCHER. PutLoopTombstoneKey (tombStoneInternalKey internalKey, topic, action)

InternalKey

DDMQ/carrera-common/src/main/java/com/xiaojukeji/carrera/chronos/model/InternalKey.java

public class InternalKey {
    private static final String SEPARATOR = "-";
    private static final int LEN_UUID = 36;
    private static final long ONE_DAY_SECONDS = 24 * 60 * 60;

    private long timestamp;
    private int type;
    private long expire;
    private long times; private long timed; private long interval; private int innerTopicSeq; private String uuid; private int segmentNum; private int segmentIndex; / /... public InternalKeycloneTombstoneInternalKey() {
        InternalKey tombstoneInternalKey = new InternalKey(this);
        tombstoneInternalKey.setType(MsgTypes.TOMBSTONE.getValue());
        returntombstoneInternalKey; } / /... }Copy the code
  • Set the type to MsgTypes cloneTombstoneInternalKey method. The TOMBSTONE. GetValue ()

CancelWrap

DDMQ/carrera-chronos/src/main/java/com/xiaojukeji/chronos/model/CancelWrap.java

public class CancelWrap {
    private String uniqDelayMsgId;
    private String topic;

    public CancelWrap() {
    }

    public CancelWrap(String uniqDelayMsgId, String topic) {
        this.uniqDelayMsgId = uniqDelayMsgId;
        this.topic = topic;
    }

    public String getUniqDelayMsgId() {
        return uniqDelayMsgId;
    }

    public void setUniqDelayMsgId(String uniqDelayMsgId) {
        this.uniqDelayMsgId = uniqDelayMsgId;
    }

    public String getTopic() {
        return topic;
    }

    public void setTopic(String topic) {
        this.topic = topic;
    }

    public String toJsonString() {
        return JsonUtils.toJsonString(this);
    }

    @Override
    public String toString() {
        return "CancelWrap{" +
                "uniqDelayMsgId='" + uniqDelayMsgId + '\'' + ", topic='" + topic + '\'' + '}'; }}Copy the code
  • CancelWrap defines the uniqDelayMsgId and Topic attributes

Batcher

DDMQ/carrera-chronos/src/main/java/com/xiaojukeji/chronos/autobatcher/Batcher.java

public class Batcher { private static final Logger LOGGER = LoggerFactory.getLogger(Batcher.class); private static final int PULL_BATCH_ITEM_NUM = ConfigManager.getConfig().getPullConfig().getPullBatchItemNum(); private static final int MSG_BYTE_BASE_LEN = ConfigManager.getConfig().getPullConfig().getMsgByteBaseLen(); private WriteBatch wb = new WriteBatch(); private volatile int itemNum = 0; private static volatile Batcher instance = null; public static volatile ReentrantLock lock = new ReentrantLock(); / /... public void putLoopTombstoneKey(final InternalKey tombstoneInternalKey, InternalKey internalKey, final String topic, final int action) { lock.lock(); Try {// index loop // 1536811267-4-1536911267-3-0-300-0-9e7952e0-b709-11e8-a709-aafb4bfc0bc5
            // 1536811567-4-1536911267-3-1-300-0-9e7952e0-b709-11e8-a709-aafb4bfc0bc5
            // 1536897967-4-1536911267-3-2-300-0-9e7952e0-b709-11e8-a709-aAFb4BFC0bc5 // Common loop // 153681167-3-153691167-3-0-10-0-9e7952e0-b709-11e8-a709-aafb4bfc0bc5
            while(! KeyUtils.afterSeekTimestamp(internalKey.getTimestamp())) { internalKey = internalKey.nextUniqDelayMsgId(); } tombstoneInternalKey.setTimestamp(internalKey.getTimestamp()); tombstoneInternalKey.setTimes(internalKey.getTimed() + 2); tombstoneInternalKey.setTimed(internalKey.getTimed());if(! KeyUtils.isInvalidMsg(tombstoneInternalKey)) { putToDefaultCF(tombstoneInternalKey.genUniqDelayMsgId(), new CancelWrap(internalKey.genUniqDelayMsgId(), topic).toJsonString(), topic, internalKey, action); } } finally { lock.unlock(); }} / /... }Copy the code
  • PutLoopTombstoneKey method through KeyUtils. AfterSeekTimestamp (internalKey getTimestamp ()) to find internalKey, Then add a CancelWrap record with putToDefaultCF

summary

CancelMessage method first by internalKey. CloneTombstoneInternalKey tombStoneInternalKey () structure, After the MsgTypes. Execution of DELAY type BATCHER. PutToDefaultCF (tombStoneInternalKey. GenUniqDelayMsgId (), New CancelWrap (internalKey genUniqDelayMsgId (), topic). The toJsonString (), topic, tombStoneInternalKey, action); For MsgTypes. LOOP_DELAY and MsgTypes. Execution of LOOP_EXPONENT_DELAY BATCHER. PutLoopTombstoneKey (tombStoneInternalKey internalKey, topic, action)

doc

  • carrera-chronos