Preface
This article examines the cancelMessage method of Chronos.
MqPullService
DDMQ/carrera-chronos/src/main/java/com/xiaojukeji/chronos/services/MqPullService.java
public class MqPullService implements Runnable {
private static final Logger LOGGER = LoggerFactory.getLogger(MqPullService.class);
private static final PullConfig PULL_CONFIG = ConfigManager.getConfig().getPullConfig();
private static final Batcher BATCHER = Batcher.getInstance();
private volatile boolean shouldStop = false; private CountDownLatch cdl; private final List<Long> succOffsets = new ArrayList<>(); private final List<Long> failOffsets = new ArrayList<>(); private SimpleCarreraConsumer carreraConsumer; private String mqPullServiceName; private final int INTERNAL_PAIR_COUNT = 5000; private final BlockingQueue<InternalPair> blockingQueue = new ArrayBlockingQueue<>(INTERNAL_PAIR_COUNT); / /... private void cancelMessage(final InternalKey internalKey, final String topic, final int action) { InternalKey tombStoneInternalKey = internalKey.cloneTombstoneInternalKey();if (internalKey.getType() == MsgTypes.DELAY.getValue()) {
MetricService.incPullQps(topic, MetricMsgAction.CANCEL, MetricMsgType.DELAY);
BATCHER.putToDefaultCF(tombStoneInternalKey.genUniqDelayMsgId(),
new CancelWrap(internalKey.genUniqDelayMsgId(), topic).toJsonString(), topic, tombStoneInternalKey, action);
} else if (internalKey.getType() == MsgTypes.LOOP_DELAY.getValue()) {
MetricService.incPullQps(topic, MetricMsgAction.CANCEL, MetricMsgType.LOOP_DELAY);
BATCHER.putLoopTombstoneKey(tombStoneInternalKey, internalKey, topic, action);
} else if (internalKey.getType() == MsgTypes.LOOP_EXPONENT_DELAY.getValue()) {
MetricService.incPullQps(topic, MetricMsgAction.CANCEL, MetricMsgType.LOOP_EXPONENT_DELAY);
BATCHER.putLoopTombstoneKey(tombStoneInternalKey, internalKey, topic, action);
} else {
MetricService.incPullQps(topic, MetricMsgAction.CANCEL, MetricMsgType.UNKNOWN);
LOGGER.error("should not go here, invalid message type: {}, internalKey: {}", internalKey.getType(), internalKey.genUniqDelayMsgId()); }} / /... }Copy the code
- The cancelMessage method first builds tombStoneInternalKey via internalKey.cloneTombstoneInternalKey(). For the MsgTypes.DELAY type it then executes BATCHER.putToDefaultCF(tombStoneInternalKey.genUniqDelayMsgId(), new CancelWrap(internalKey.genUniqDelayMsgId(), topic).toJsonString(), topic, tombStoneInternalKey, action); for the MsgTypes.LOOP_DELAY and MsgTypes.LOOP_EXPONENT_DELAY types it executes BATCHER.putLoopTombstoneKey(tombStoneInternalKey, internalKey, topic, action).
InternalKey
DDMQ/carrera-common/src/main/java/com/xiaojukeji/carrera/chronos/model/InternalKey.java
public class InternalKey {
private static final String SEPARATOR = "-";
private static final int LEN_UUID = 36;
private static final long ONE_DAY_SECONDS = 24 * 60 * 60;
private long timestamp;
private int type;
private long expire;
private long times; private long timed; private long interval; private int innerTopicSeq; private String uuid; private int segmentNum; private int segmentIndex; / /... public InternalKeycloneTombstoneInternalKey() {
InternalKey tombstoneInternalKey = new InternalKey(this);
tombstoneInternalKey.setType(MsgTypes.TOMBSTONE.getValue());
returntombstoneInternalKey; } / /... }Copy the code
- The cloneTombstoneInternalKey method creates a new InternalKey from the current one and sets its type to MsgTypes.TOMBSTONE.getValue().
CancelWrap
DDMQ/carrera-chronos/src/main/java/com/xiaojukeji/chronos/model/CancelWrap.java
public class CancelWrap {
private String uniqDelayMsgId;
private String topic;
public CancelWrap() {
}
public CancelWrap(String uniqDelayMsgId, String topic) {
this.uniqDelayMsgId = uniqDelayMsgId;
this.topic = topic;
}
public String getUniqDelayMsgId() {
return uniqDelayMsgId;
}
public void setUniqDelayMsgId(String uniqDelayMsgId) {
this.uniqDelayMsgId = uniqDelayMsgId;
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public String toJsonString() {
return JsonUtils.toJsonString(this);
}
@Override
public String toString() {
return "CancelWrap{" +
"uniqDelayMsgId='" + uniqDelayMsgId + '\'' + ", topic='" + topic + '\'' + '}'; }}Copy the code
- CancelWrap defines the uniqDelayMsgId and topic attributes and can serialize itself to JSON via toJsonString()
Batcher
DDMQ/carrera-chronos/src/main/java/com/xiaojukeji/chronos/autobatcher/Batcher.java
public class Batcher { private static final Logger LOGGER = LoggerFactory.getLogger(Batcher.class); private static final int PULL_BATCH_ITEM_NUM = ConfigManager.getConfig().getPullConfig().getPullBatchItemNum(); private static final int MSG_BYTE_BASE_LEN = ConfigManager.getConfig().getPullConfig().getMsgByteBaseLen(); private WriteBatch wb = new WriteBatch(); private volatile int itemNum = 0; private static volatile Batcher instance = null; public static volatile ReentrantLock lock = new ReentrantLock(); / /... public void putLoopTombstoneKey(final InternalKey tombstoneInternalKey, InternalKey internalKey, final String topic, final int action) { lock.lock(); Try {// index loop // 1536811267-4-1536911267-3-0-300-0-9e7952e0-b709-11e8-a709-aafb4bfc0bc5
// 1536811567-4-1536911267-3-1-300-0-9e7952e0-b709-11e8-a709-aafb4bfc0bc5
// 1536897967-4-1536911267-3-2-300-0-9e7952e0-b709-11e8-a709-aAFb4BFC0bc5 // Common loop // 153681167-3-153691167-3-0-10-0-9e7952e0-b709-11e8-a709-aafb4bfc0bc5
while(! KeyUtils.afterSeekTimestamp(internalKey.getTimestamp())) { internalKey = internalKey.nextUniqDelayMsgId(); } tombstoneInternalKey.setTimestamp(internalKey.getTimestamp()); tombstoneInternalKey.setTimes(internalKey.getTimed() + 2); tombstoneInternalKey.setTimed(internalKey.getTimed());if(! KeyUtils.isInvalidMsg(tombstoneInternalKey)) { putToDefaultCF(tombstoneInternalKey.genUniqDelayMsgId(), new CancelWrap(internalKey.genUniqDelayMsgId(), topic).toJsonString(), topic, internalKey, action); } } finally { lock.unlock(); }} / /... }Copy the code
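- The putLoopTombstoneKey method advances internalKey with nextUniqDelayMsgId() until KeyUtils.afterSeekTimestamp(internalKey.getTimestamp()) returns true, copies that key's timestamp and timed values onto tombstoneInternalKey (setting times to timed + 2), and then, if KeyUtils.isInvalidMsg(tombstoneInternalKey) is false, adds a CancelWrap record with putToDefaultCF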
summary
The cancelMessage method first builds tombStoneInternalKey via internalKey.cloneTombstoneInternalKey(). For the MsgTypes.DELAY type it then executes BATCHER.putToDefaultCF(tombStoneInternalKey.genUniqDelayMsgId(), new CancelWrap(internalKey.genUniqDelayMsgId(), topic).toJsonString(), topic, tombStoneInternalKey, action); for the MsgTypes.LOOP_DELAY and MsgTypes.LOOP_EXPONENT_DELAY types it executes BATCHER.putLoopTombstoneKey(tombStoneInternalKey, internalKey, topic, action).
doc
- carrera-chronos