preface

The half message belongs to the first phase of the RocketMQ transaction and can consist of two parts:

  1. Producer -> Broker sends half messages
  2. The broker processes half messages

Correspond to steps 1 and 2 in the figure.

Look at the source code with a problem, first ask a few questions:

  1. How does producer send half messages?
  2. How does a broker distinguish between ordinary and transactional messages
  1. How is it possible that half messages are not consumed by consumers?

You’ll find the answer after you read it.

Producer Half Sends messages

First, the producer code is as follows. To use the transaction message function, the producer object is declared using TransactionMQProducer.

public static void main(String[] args) throws MQClientException {
    // Create the TransactionMQProducer instance and set the producer group name
    TransactionMQProducer producer = new TransactionMQProducer("transactionGroup");
    // Set the NameServer address
    producer.setNamesrvAddr("127.0.0.1:9876");

    // Add a transaction listener
    producer.setTransactionListener(new TransactionListener() {
        /** * The method to execute the local transaction */
        @Override
        public LocalTransactionState executeLocalTransaction(Message message, Object o) {
            // Perform a local transaction
            doXXX();
            // Returns the execution result
            return LocalTransactionState.xxx;
        }

        /** * message callback executes the method */
        @Override
        public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
			// check the transaction status
            findxxx();
            // Commit a transaction while the message is checked
            returnLocalTransactionState.xxx; }});/ / start the producer
    producer.start();
}

// Send a message
SendResult result = producer.sendMessageInTransaction(msg, null);
Copy the code

Unlike a normal message, sendMessageIntransaction() is used to send a transaction method in line 37, so all the logic sent must be under this method:

@Override
public TransactionSendResult sendMessageInTransaction(final Message msg, final Object arg) throws MQClientException {
    // The local transaction logic must be defined
    if (null= =this.transactionListener) {
        throw new MQClientException("TransactionListener is null".null);
    }

    // Wrap the topic with delay and retry tags
    msg.setTopic(NamespaceUtil.wrapNamespace(this.getNamespace(), msg.getTopic()));
    return this.defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg);
}
Copy the code

Going into the sendMessageInTransaction() method, we’ll focus on the first half of the method:

public TransactionSendResult sendMessageInTransaction(final Message msg, final LocalTransactionExecuter localTransactionExecuter, final Object arg) throws MQClientException {
    TransactionListener transactionListener = getCheckListener();
    if (null == localTransactionExecuter && null == transactionListener) {
        throw new MQClientException("tranExecutor is null".null);
    }

    // ignore DelayTimeLevel parameter
    if(msg.getDelayTimeLevel() ! =0) {
        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
    }
    Validators.checkMessage(msg, this.defaultMQProducer);

    SendResult sendResult;
    // Mark the message as a transaction message with the TRAN_MSG flag
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
    // The producer group is flagged so that the broker can perform a backcheck
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
    try {
        // Send half message
        sendResult = this.send(msg);
    } catch (Exception e) {
        throw new MQClientException("send message Exception", e);
    }
    
    / / a little
}
Copy the code

You can see that the code adds two attributes to the message:

  • PROPERTY_TRANSACTION_PREPARED: Marks it as a transaction message. The broker can determine if it is a transaction message based on whether the MSG has this field.
  • PROPERTY_PRODUCER_GROUP: producer group. This is required when the broker performs a transaction result lookup.

After the message is specially wrapped into a transaction message, the send() method called is the generic message sending method through which all messages are sent.

The Broker processes Half messages

The broker/SRC/main/Java/org/apache/rocketmq/broker/processor/SendMessageProcessor. Java classes, asyncSendMessage () method has a piece of code:

.// Get the transaction property field
String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
// Check whether it is null && is true
if(transFlag ! =null && Boolean.parseBoolean(transFlag)) {
  if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
    response.setCode(ResponseCode.NO_PERMISSION);
    response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending transaction message is forbidden");
    return CompletableFuture.completedFuture(response);
  }
  // Store the prepare message
  putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
} else {
  // Store ordinary messages
  putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner); }...Copy the code

See how to process the prepare message.

@Override
public CompletableFuture<PutMessageResult> asyncPrepareMessage(MessageExtBrokerInner messageInner) {
    return transactionalMessageBridge.asyncPutHalfMessage(messageInner);  //1 Calls asyncPutHalfMessage()
}
Copy the code

AsyncPrepareMessage () calls the asyncPutHalfMessage() method

public CompletableFuture<PutMessageResult> asyncPutHalfMessage(MessageExtBrokerInner messageInner) {
    return store.asyncPutMessage(parseHalfMessageInner(messageInner));
}
Copy the code

Store.asyncputmessage () is the generic method used to store normal messages, so the special handling of half messages is in parseHalfMessageInner()

private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
    // The original subject and the original queue ID of the backup message
    MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
    MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msgInner.getQueueId()));
    msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
    // The topic and queueID of transaction messages are written dead
    msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
    msgInner.setQueueId(0);
    msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
    return msgInner;
}
Copy the code

There are two parts of logic

  1. Back up the original topic and queueId and put the actual topic of the message onREAL_TOPICProperty, where queueId is placedREAL_QIDProperties of the
  2. Override old values with transaction-specific topic and queueId, topic =RMQ_SYS_TRANS_HALF_TOPICQueueId = 0. This means that all half messages will be in the same topic queue.

Question answer

Now we can answer the question

  1. How does producer send half messages?

A: RocketMQ uses a specialized Producer object, TransactionMQProducer, for transaction messages. The Producer’s method of sending messages wraps the messages into transaction messages.

  1. How does a broker distinguish between ordinary and transactional messages

A: The broker distinguishes normal messages from transaction messages by adding the PROPERTY_TRANSACTION_PREPARED flag to the message’s property when the message is sent.

  1. How is it possible that half messages are not consumed by consumers?

A: Messages are processed so that they are assigned to specific topic queues and are isolated. So consumers don’t have access to these messages.