1. Business scenarios

In an online store, coupons are issued after a successful payment.

Consider this scenario in an e-commerce system: the server crashes right after the payment succeeds, just as the coupon is about to be issued. The user has paid successfully but never receives the coupon. The natural fix is a transaction that makes payment and coupon issuance atomic: either both succeed or both fail, never one without the other.

However, this tightly couples payment and coupon issuance: a failure to issue the coupon now causes the payment itself to fail.

The solution is to introduce a message queue (MQ) to decouple the two.

However, if MQ is unavailable or down, the payment succeeds but the coupon is never issued.

In this case, distributed transactions need to be introduced.

2. Transaction messages

Distributed transactions are an abstract concept.

What about the concrete implementation?

There are many different implementations.

This article focuses on RocketMQ transaction messages.

Flowchart of a transaction message

Process steps:
  • 1. The producer sends a half message to MQ
  • 2. MQ replies with an ACK
  • 3. The producer executes the local transaction: the order payment. If the payment succeeds, it sends COMMIT to MQ; if the payment fails, it sends ROLLBACK
  • 4. If the COMMIT or ROLLBACK from step 3 never reaches MQ, a timer in MQ scans the pending half messages and calls back the producer to check the outcome of the local transaction. The producer returns COMMIT if the transaction succeeded and ROLLBACK if it failed
  • 5. When MQ receives COMMIT, it copies the half message to the real topic
  • 6. The consumer consumes the message and issues the coupon
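
The steps above can be sketched with a tiny in-memory model. This is not RocketMQ code: ToyBroker, its methods, and its TxState enum are hypothetical names made up for illustration, and the real broker stores half messages in an internal topic rather than a map.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Toy in-memory model of the half-message flow (hypothetical, not the RocketMQ API).
class ToyBroker {
    enum TxState { COMMIT, ROLLBACK, UNKNOWN }

    private final Map<Integer, String> halfMessages = new HashMap<>(); // invisible to consumers
    private final List<String> realTopic = new ArrayList<>();          // visible to consumers
    private int nextId = 0;

    // Steps 1-2: store the half message and acknowledge it with an id.
    int sendHalf(String body) {
        halfMessages.put(nextId, body);
        return nextId++;
    }

    // Steps 3 and 5: the producer reports the result of the local transaction.
    void endTransaction(int id, TxState state) {
        String body = halfMessages.get(id);
        if (body == null || state == TxState.UNKNOWN) {
            return; // nothing to resolve yet; the message stays a half message
        }
        halfMessages.remove(id);
        if (state == TxState.COMMIT) {
            realTopic.add(body); // only now can a consumer see it
        }
    }

    // Step 4: for every unresolved half message, ask the producer what happened.
    void checkBack(Function<String, TxState> checker) {
        for (Integer id : new ArrayList<>(halfMessages.keySet())) {
            endTransaction(id, checker.apply(halfMessages.get(id)));
        }
    }

    List<String> visibleMessages() { return realTopic; }
    int pendingHalfMessages() { return halfMessages.size(); }
}
```

If a COMMIT is lost, the message simply stays pending until checkBack resolves it, which is the situation step 4 is designed for.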

3. How to use it

You now have an idea of the flow of transaction messages.

Next, let's see how to use them.

Again, take issuing a coupon after payment as the example.

3.1 Sending the half message; MQ replies with an ACK


    @Override
    public void finishedOrder(String orderNo, String phoneNumber) {
        try {
            // Build the transaction message; topic: order finished.
            // orderInfo is the order payload (its construction is omitted here).
            Message msg = new Message(orderFinishedTopic, JSON.toJSONString(orderInfo).getBytes(StandardCharsets.UTF_8));

            // Send the half message
            TransactionSendResult transactionSendResult = orderFinishedTransactionMqProducer.sendMessageInTransaction(msg, null);
        } catch (MQClientException e) {
            // Sending the half message failed; handle or log the error here
        }
    }

3.2 Performing local transactions: payment

    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        try {
            // Change the status of the order (the payment itself)
            orderService.payOrder();

            // Local transaction succeeded: commit the half message
            return LocalTransactionState.COMMIT_MESSAGE;
        } catch (Exception e) {
            // Local transaction failed: roll back the half message
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
    }

3.3 MQ Timer Callback Querying the half message Status

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        try {
            // Query the order status
            Integer orderStatus = orderService.getOrderStatus();
            if (Objects.equals(orderStatus, OrderStatusEnum.FINISHED.getStatus())) {
                // Order finished: return COMMIT
                return LocalTransactionState.COMMIT_MESSAGE;
            } else {
                // Otherwise: return ROLLBACK
                return LocalTransactionState.ROLLBACK_MESSAGE;
            }
        } catch (Exception e) {
            // Failed to query the order status
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
    }
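
One possible variation on the check callback, as a sketch rather than the article's implementation: when the order status cannot be determined (the query throws, or the payment is still in progress), returning UNKNOW instead of ROLLBACK leaves the half message pending so the broker can ask again later. The State enum below mirrors the three values of LocalTransactionState; OrderStatus, CheckDecision, and the lookup supplier are hypothetical names.

```java
import java.util.function.Supplier;

class CheckDecision {
    // Mirrors the three values of RocketMQ's LocalTransactionState.
    enum State { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

    // Hypothetical order statuses, for illustration only.
    enum OrderStatus { PAYING, FINISHED, FAILED }

    static State decide(Supplier<OrderStatus> statusLookup) {
        try {
            OrderStatus status = statusLookup.get();
            if (status == OrderStatus.FINISHED) {
                return State.COMMIT_MESSAGE;   // payment done: deliver the message
            }
            if (status == OrderStatus.FAILED) {
                return State.ROLLBACK_MESSAGE; // payment definitely failed: drop the message
            }
            return State.UNKNOW;               // still in progress: let the broker check again
        } catch (RuntimeException e) {
            return State.UNKNOW;               // lookup failed: the outcome is not known yet
        }
    }
}
```

The trade-off is that an order stuck in an undetermined state keeps being checked until the broker's maximum number of checks is reached.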

3.4 The consumer consumes the message and issues the coupon

 @Bean(value = "orderFinishedConsumer")
    public DefaultMQPushConsumer finishedConsumer(@Qualifier(value = "orderFinishedMessageListener") OrderFinishedMessageListener orderFinishedMessageListener) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(orderFinishedConsumerGroup);
        consumer.setNamesrvAddr(namesrvAddress);
        // Topic: Finish the order
        consumer.subscribe(orderFinishedTopic, "*");
        consumer.setMessageListener(orderFinishedMessageListener);
        consumer.start();
        return consumer;
    }
    
Listener: OrderFinishedMessageListener
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        // Issue a coupon
        couponService.distributeCoupon();
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
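
The listener above always reports success. A hedged sketch of a slightly more defensive consumer follows: it asks for redelivery when the coupon cannot be issued and ignores a redelivered message whose coupon was already issued. The Status enum mirrors the two values of ConsumeConcurrentlyStatus; CouponConsumerSketch, the in-memory set, and the couponServiceUp flag are hypothetical stand-ins for a real coupon service and a persistent record.

```java
import java.util.HashSet;
import java.util.Set;

class CouponConsumerSketch {
    // Mirrors the two values of RocketMQ's ConsumeConcurrentlyStatus.
    enum Status { CONSUME_SUCCESS, RECONSUME_LATER }

    private final Set<String> issuedOrders = new HashSet<>(); // hypothetical record of issued coupons
    int couponsIssued = 0;

    Status onMessage(String orderNo, boolean couponServiceUp) {
        if (issuedOrders.contains(orderNo)) {
            return Status.CONSUME_SUCCESS; // redelivered message: coupon already issued
        }
        if (!couponServiceUp) {
            return Status.RECONSUME_LATER; // ask the broker to deliver the message again later
        }
        couponsIssued++;                   // stands in for couponService.distributeCoupon()
        issuedOrders.add(orderNo);
        return Status.CONSUME_SUCCESS;
    }
}
```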

4. Knowing how, and knowing why

Having read this far, you know how to use transaction messages.

Next, you need to understand the underlying principles: look at the source code.

Step1: first look at the code that sends the half message:

Step2: enter the code:

Step3: DefaultMQProducerImpl#sendMessageInTransaction is called.

    public TransactionSendResult sendMessageInTransaction(final Message msg, ...
        // ... a chunk of code omitted ...
        SendResult sendResult = null;
        // Add a property to the message marking it as a transaction message, i.e. a half message, by setting it to "true". (This property is used later.)
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
        try {
            // Send the message: key point 0
            sendResult = this.send(msg);
        } catch (Exception e) {
            throw new MQClientException("send message Exception", e);
        }

        LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
        Throwable localException = null;
        switch (sendResult.getSendStatus()) {
            // Message sent successfully
            case SEND_OK: {
                try {
                    if (sendResult.getTransactionId() != null) {
                        msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
                    }
                    String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
                    if (null != transactionId && !"".equals(transactionId)) {
                        msg.setTransactionId(transactionId);
                    }
                    if (null != localTransactionExecuter) {
                        localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
                    } else if (transactionListener != null) {
                        log.debug("Used new transaction API");
                        // executeLocalTransaction must be implemented by the user's listener to run the local transaction
                        localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
                    }
                    if (null == localTransactionState) {
                        localTransactionState = LocalTransactionState.UNKNOW;
                    }

                    if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
                        log.info("executeLocalTransactionBranch return {}", localTransactionState);
                        log.info(msg.toString());
                    }
                } catch (Throwable e) {
                    log.info("executeLocalTransactionBranch exception", e);
                    log.info(msg.toString());
                    localException = e;
                }
            }
            break;
            case FLUSH_DISK_TIMEOUT:
            case FLUSH_SLAVE_TIMEOUT:
            case SLAVE_NOT_AVAILABLE:
                localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
                break;
            default:
                break;
        }

        try {
            // Finally, send an RPC request to the broker to commit or roll back the transaction
            this.endTransaction(sendResult, localTransactionState, localException);
        } catch (Exception e) {
            log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
        }
        // The assembly result is returned
        TransactionSendResult transactionSendResult = new TransactionSendResult();
        transactionSendResult.setSendStatus(sendResult.getSendStatus());
        transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
        transactionSendResult.setMsgId(sendResult.getMsgId());
        transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
        transactionSendResult.setTransactionId(sendResult.getTransactionId());
        transactionSendResult.setLocalTransactionState(localTransactionState);
        return transactionSendResult;
    }

In summary, DefaultMQProducerImpl#sendMessageInTransaction does the following:

  • Performs simple data validation
  • Adds a property to the message to mark it as a transaction message
  • Sends the message and gets the send result (key point 0)
  • Handles the message differently depending on the send result
  • If the message was sent successfully, executes the local transaction (the payment) and obtains its result (key point 1)
  • Finally, depending on the result of the local transaction, sends a commit or rollback to the broker (key point 2)
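
The decision logic in that summary can be condensed into a small stand-alone sketch. The two enums mirror the values of RocketMQ's SendStatus and LocalTransactionState; SendFlowSketch and stateToReport are hypothetical names, and the real method also handles message properties and logging that are left out here.

```java
import java.util.function.Supplier;

class SendFlowSketch {
    enum SendStatus { SEND_OK, FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT, SLAVE_NOT_AVAILABLE }
    enum State { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

    // Decide which state is reported to the broker after the half message was sent.
    static State stateToReport(SendStatus sendStatus, Supplier<State> localTransaction) {
        if (sendStatus != SendStatus.SEND_OK) {
            // The half message was not stored safely: roll back without running the payment.
            return State.ROLLBACK_MESSAGE;
        }
        try {
            State result = localTransaction.get();
            return result == null ? State.UNKNOW : result;
        } catch (Throwable e) {
            // The local transaction threw: the state stays UNKNOW and the check-back decides later.
            return State.UNKNOW;
        }
    }
}
```

Note that the local transaction only runs after the half message is confirmed as SEND_OK, so a payment is never executed for a message the broker may have lost.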

That is the overall process, without going into much detail.

Next, let’s dive into some details:

Key point 0: sendResult = this.send(msg); Following the call, we see that the underlying send method ends up calling DefaultMQProducerImpl#sendKernelImpl.

Step4: proceed to SendMessageProcessor#sendMessage

Step5: for a transaction message, continue into TransactionalMessageServiceImpl#prepareMessage -> TransactionalMessageBridge#putHalfMessage -> TransactionalMessageBridge#parseHalfMessageInner

Step6: next, look at key point 1, namely transactionListener.executeLocalTransaction(msg, arg);

public interface TransactionListener {
    /**
     * When send transactional prepare(half) message succeed, this method will be invoked to execute local transaction.
     *
     * @param msg Half(prepare) message
     * @param arg Custom business parameter
     * @return Transaction state
     */
    LocalTransactionState executeLocalTransaction(final Message msg, final Object arg);

    /**
     * When no response to prepare(half) message. broker will send check message to check the transaction status, and this
     * method will be invoked to get local transaction status.
     *
     * @param msg Check message
     * @return Transaction state
     */
    LocalTransactionState checkLocalTransaction(final MessageExt msg);
}

As you can see, this is an interface with two methods: executeLocalTransaction executes the local transaction, and checkLocalTransaction checks its status. Both must be provided by an implementing class.

For example, section 3.2 implements executeLocalTransaction to perform the payment, and section 3.3 implements checkLocalTransaction to query the order status.

Step7: key point 2, this.endTransaction(sendResult, localTransactionState, localException);

    public void endTransaction(...
        // ... a chunk of code omitted ...
        // Transaction id
        String transactionId = sendResult.getTransactionId();
        // Broker address
        final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
        EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
        requestHeader.setTransactionId(transactionId);
        requestHeader.setCommitLogOffset(id.getOffset());
        // Send a different result to the broker depending on the outcome of the local transaction
        switch (localTransactionState) {
            case COMMIT_MESSAGE:
                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
                break;
            case ROLLBACK_MESSAGE:
                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
                break;
            case UNKNOW:
                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
                break;
            default:
                break;
        }

        requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
        requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
        requestHeader.setMsgId(sendResult.getMsgId());
        String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
        // Send to the broker
        this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
            this.defaultMQProducer.getSendMsgTimeout());
    }
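
The commitOrRollback value set in endTransaction is a pair of bits inside the message's system flag. The sketch below reproduces the constant values as I understand them from RocketMQ's MessageSysFlag; treat them as an assumption and verify them against the version you run. TxFlagSketch and describe are hypothetical names.

```java
class TxFlagSketch {
    // Values believed to match RocketMQ's MessageSysFlag (bits 2 and 3 of the system flag).
    static final int TRANSACTION_NOT_TYPE = 0;
    static final int TRANSACTION_PREPARED_TYPE = 0x1 << 2;
    static final int TRANSACTION_COMMIT_TYPE = 0x2 << 2;
    static final int TRANSACTION_ROLLBACK_TYPE = 0x3 << 2;

    // Extract the transaction bits; the rollback value doubles as the bit mask.
    static int transactionValue(int sysFlag) {
        return sysFlag & TRANSACTION_ROLLBACK_TYPE;
    }

    static String describe(int sysFlag) {
        switch (transactionValue(sysFlag)) {
            case TRANSACTION_PREPARED_TYPE: return "half message";
            case TRANSACTION_COMMIT_TYPE:   return "commit";
            case TRANSACTION_ROLLBACK_TYPE: return "rollback";
            default:                        return "not decided";
        }
    }
}
```

An UNKNOW local state maps to TRANSACTION_NOT_TYPE, which carries no decision, so the half message stays pending until a later check-back resolves it.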

At this point, we have sent the message from the producer to the broker.

Next, we need to understand how the broker handles transaction messages.

Step8: how the broker checks back on transaction messages

The comments in the code explain it directly; see TransactionalMessageCheckService#onWaitEnd

@Override
    protected void onWaitEnd() {
        // timeout is the transactionTimeOut value from the broker configuration: the transaction timeout. A message becomes eligible for a status check only once (message store time + timeout) < current system time.
        long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
        // checkMax is the transactionCheckMax value from the broker configuration: the maximum number of checks per message. Beyond that, the message is discarded by default, which amounts to a ROLLBACK.
        int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();
        long begin = System.currentTimeMillis();
        log.info("Begin to check prepare message, begin time:{}", begin);
        // Check back (the key part): org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl#check
        this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());
        log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin);
    }

Step9: enter the check method: TransactionalMessageServiceImpl#check.

The comments explain each step.

@Override
    public void check(long transactionTimeout, int transactionCheckMax,
        AbstractTransactionalMessageCheckListener listener) {
        try {
            // Topic: RMQ_SYS_TRANS_HALF_TOPIC
            String topic = TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC;
            // Get all queues under the topic RMQ_SYS_TRANS_HALF_TOPIC
            Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);
            // Data verification
            if (msgQueues == null || msgQueues.size() == 0) {
                log.warn("The queue of topic is empty :" + topic);
                return;
            }
            log.debug("Check topic={}, queues={}", topic, msgQueues);
            // Walk through the queue
            for (MessageQueue messageQueue : msgQueues) {
                long startTime = System.currentTimeMillis();
                // Get the corresponding op queue (RMQ_SYS_TRANS_OP_HALF_TOPIC)
                // RMQ_SYS_TRANS_HALF_TOPIC: the topic for prepare (half) messages; a transaction message is first stored here.
                // RMQ_SYS_TRANS_OP_HALF_TOPIC: when the broker receives a commit or rollback request for a transaction message, it stores a record under this topic.
                MessageQueue opQueue = getOpQueue(messageQueue);
                // Consume offset of the half message queue
                long halfOffset = transactionalMessageBridge.fetchConsumeOffset(messageQueue);
                // Consume offset of the op queue
                long opOffset = transactionalMessageBridge.fetchConsumeOffset(opQueue);

                log.info("Before check, the queue={} msgOffset={} opOffset={}", messageQueue, halfOffset, opOffset);
                // If one of the queues has an offset less than 0, the queue is skipped
                if (halfOffset < 0 || opOffset < 0) {
                    log.error("MessageQueue: {} illegal offset read: {}, op offset: {},skip this queue", messageQueue,
                        halfOffset, opOffset);
                    continue;
                }
                // doneOpOffset and removeMap mainly serve to avoid calling the transaction check interface repeatedly
                List<Long> doneOpOffset = new ArrayList<>();
                HashMap<Long, Long> removeMap = new HashMap<>();
                PullResult pullResult = fillOpRemoveMap(removeMap, opQueue, opOffset, halfOffset, doneOpOffset);
                if (null == pullResult) {
                    log.error("The queue={} check msgOffset={} with opOffset={} failed, pullResult is null",
                        messageQueue, halfOffset, opOffset);
                    continue;
                }
                // single thread
                // Number of empty messages
                int getMessageNullCount = 1;
                //RMQ_SYS_TRANS_HALF_TOPIC#queueId's latest offset
                long newOffset = halfOffset;
                // Offset of RMQ_SYS_TRANS_HALF_TOPIC
                long i = halfOffset;
                while (true) {
                    // Limit the maximum processing time per session to 60s
                    if (System.currentTimeMillis() - startTime > MAX_PROCESS_TIME_LIMIT) {
                        log.info("Queue={} process time reach max={}", messageQueue, MAX_PROCESS_TIME_LIMIT);
                        break;
                    }
                    // If removeMap contains the current offset, this half message has already been processed: move on to the next one.
                    // removeMap is filled by fillOpRemoveMap above.
                    // fillOpRemoveMap pulls messages from the RMQ_SYS_TRANS_OP_HALF_TOPIC topic;
                    // if the half-queue offset recorded in an op message is greater than or equal to the current progress of RMQ_SYS_TRANS_HALF_TOPIC#queueId,
                    // it is added to removeMap, meaning that half message has already been committed or rolled back.
                    if (removeMap.containsKey(i)) {
                        log.info("Half offset {} has been committed/rolled back", i);
                        Long removedOpOffset = removeMap.remove(i);
                        doneOpOffset.add(removedOpOffset);
                    } else {
                        // Get the message from the RMQ_SYS_TRANS_HALF_TOPIC queue at queue offset i
                        GetResult getResult = getHalfMsg(messageQueue, i);
                        MessageExt msgExt = getResult.getMsg();
                        // If the message is empty
                        if (msgExt == null) {
                            // MAX_RETRY_COUNT_WHEN_HALF_NULL = 1
                            // If the number of retries is exceeded, break out of the while loop and end the status check for this message queue
                            if (getMessageNullCount++ > MAX_RETRY_COUNT_WHEN_HALF_NULL) {
                                break;
                            }
                            // If there is no new message (PullStatus.NO_NEW_MSG), end the status check for this message queue.
                            if (getResult.getPullResult().getPullStatus() == PullStatus.NO_NEW_MSG) {
                                log.debug("No new msg, the miss offset={} in={}, continue check={}, pull result={}", i,
                                    messageQueue, getMessageNullCount, getResult.getPullResult());
                                break;
                            } else {
                                log.info("Illegal offset, the miss offset={} in={}, continue check={}, pull result={}",
                                    i, messageQueue, getMessageNullCount, getResult.getPullResult());
                                // For any other reason, set offset i to getResult.getPullResult().getNextBeginOffset() and pull again
                                i = getResult.getPullResult().getNextBeginOffset();
                                newOffset = i;
                                continue;
                            }
                        }
                        // Decide whether this message should be discarded or skipped.
                        // needDiscard: if the message has been checked back more than the maximum number of times allowed,
                        // it is discarded, i.e. the transaction message is not committed and consumers never see it.
                        // On each check, the TRANSACTION_CHECK_TIMES property is incremented by 1. By default, the maximum number of checks is 15.

                        // needSkip: if the transaction message is older than the file retention time
                        // (72 hours by default; see RocketMQ's expired-file cleanup for details), the message is skipped.
                        if (needDiscard(msgExt, transactionCheckMax) || needSkip(msgExt)) {
                            listener.resolveDiscardMsg(msgExt);
                            newOffset = i + 1;
                            i++;
                            continue;
                        }
                        // The message was stored after this check round started: leave it for a later round and break out of the while loop
                        if (msgExt.getStoreTimestamp() >= startTime) {
                            log.debug("Fresh stored. the miss offset={}, check it later, store={}", i,
                                new Date(msgExt.getStoreTimestamp()));
                            break;
                        }
                        // Age of the message = current system time - the message's born timestamp
                        long valueOfCurrentMinusBorn = System.currentTimeMillis() - msgExt.getBornTimestamp();
                        // checkImmunityTime: the period during which a new message is exempt from checks
                        // transactionTimeout: transaction message timeout
                        long checkImmunityTime = transactionTimeout;
                        // checkImmunityTimeStr: the immunity time set by the user on the message, if any
                        String checkImmunityTimeStr = msgExt.getUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS);
                        if (null != checkImmunityTimeStr) {
                            // checkImmunityTime is derived from checkImmunityTimeStr
                            checkImmunityTime = getImmunityTime(checkImmunityTimeStr, transactionTimeout);
                            if (valueOfCurrentMinusBorn < checkImmunityTime) {
                                if (checkPrepareQueueOffset(removeMap, doneOpOffset, msgExt)) {
                                    // Recent progress = current message progress +1
                                    newOffset = i + 1;
                                    i++;
                                    continue;
                                }
                            }
                        } else {
                            // If the message's age is still within the immunity time, break out of the while loop
                            if ((0 <= valueOfCurrentMinusBorn) && (valueOfCurrentMinusBorn < checkImmunityTime)) {
                                log.debug("New arrived, the miss offset={}, check it later checkImmunity={}, born={}", i,
                                    checkImmunityTime, new Date(msgExt.getBornTimestamp()));
                                break;
                            }
                        }
                        List<MessageExt> opMsg = pullResult.getMsgFoundList();
                        // Check whether you need to check back based on the following criteria:
                        // The message has been stored for longer than the transaction timeout
                        boolean isNeedCheck = (opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime)
                            || (opMsg != null && (opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout))
                            || (valueOfCurrentMinusBorn <= -1);

                        if (isNeedCheck) {
                            if (!putBackHalfMsgQueue(msgExt, i)) {
                                continue;
                            }
                            // Focus: transaction callback (asynchronous)
                            listener.resolveHalfMsg(msgExt);
                        } else {
                            // Load processed messages for filtering
                            pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset);
                            log.debug("The miss offset:{} in messageQueue:{} need to get more opMsg, result is:{}", i,
                                messageQueue, pullResult);
                            continue;
                        }
                    }
                    newOffset = i + 1;
                    i++;
                }
                // Save the check-back progress of the half message queue
                if (newOffset != halfOffset) {
                    transactionalMessageBridge.updateConsumeOffset(messageQueue, newOffset);
                }
                long newOpOffset = calculateOpOffset(doneOpOffset, opOffset);
                // Save the progress of the op queue
                if (newOpOffset != opOffset) {
                    transactionalMessageBridge.updateConsumeOffset(opQueue, newOpOffset);
                }
            }
        } catch (Throwable e) {
            log.error("Check error", e);
        }
    }
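
Stripped of the op-queue bookkeeping, the per-message decision in check comes down to three outcomes. The following is a deliberately simplified sketch under that assumption: CheckRuleSketch, Action, and decide are hypothetical names, and the real method also handles skipped files, user-defined immunity times, and re-appending the half message.

```java
class CheckRuleSketch {
    enum Action { DISCARD, WAIT, CHECK_BACK }

    // Simplified decision for one pending half message.
    static Action decide(int checkTimes, int checkMax, long bornMillis, long nowMillis, long immunityMillis) {
        if (checkTimes >= checkMax) {
            return Action.DISCARD;    // checked too many times: give up on the message
        }
        long age = nowMillis - bornMillis;
        if (age >= 0 && age < immunityMillis) {
            return Action.WAIT;       // too young: the producer may still commit on its own
        }
        return Action.CHECK_BACK;     // old enough and unresolved: ask the producer
    }
}
```

For example, with a 6-second immunity time a message born 2 seconds ago is left alone, while one born 9 seconds ago triggers a check-back.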

Step10: look further into resolveHalfMsg

public void resolveHalfMsg(final MessageExt msgExt) {
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    // For each half message that needs a check, ask the producer for the local transaction result
                    sendCheckMessage(msgExt);
                } catch (Exception e) {
                    LOGGER.error("Send check message error!", e);
                }
            }
        });
    }
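
The point of resolveHalfMsg is that the scanning loop only submits work and never waits for the producer's answer. A minimal sketch of that hand-off, using a plain thread pool: AsyncCheckSketch, the pool size, and the list standing in for the network call are all assumptions for illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class AsyncCheckSketch {
    final List<String> checked = new CopyOnWriteArrayList<>();
    private final ExecutorService executor = Executors.newFixedThreadPool(2);

    // Hand the check to a worker thread so the scanning loop is not blocked.
    void resolveHalfMsg(String msgId) {
        executor.execute(() -> {
            try {
                checked.add(msgId); // stands in for sending the check request to the producer
            } catch (Exception e) {
                System.err.println("Send check message error: " + e);
            }
        });
    }

    // Stop accepting work and wait for submitted checks to finish.
    boolean shutdownAndWait() {
        executor.shutdown();
        try {
            return executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```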

Step11: continue into the sendCheckMessage(msgExt) method

    /**
     * Send check message
     * @param msgExt
     * @throws Exception
     */
    public void sendCheckMessage(MessageExt msgExt) throws Exception {
        CheckTransactionStateRequestHeader checkTransactionStateRequestHeader = new CheckTransactionStateRequestHeader();
        checkTransactionStateRequestHeader.setCommitLogOffset(msgExt.getCommitLogOffset());
        checkTransactionStateRequestHeader.setOffsetMsgId(msgExt.getMsgId());
        checkTransactionStateRequestHeader.setMsgId(msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
        checkTransactionStateRequestHeader.setTransactionId(checkTransactionStateRequestHeader.getMsgId());
        checkTransactionStateRequestHeader.setTranStateTableOffset(msgExt.getQueueOffset());
        // The original topic
        msgExt.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));
        // Id of the original queue
        msgExt.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));

        msgExt.setStoreSize(0);
        String groupId = msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
        Channel channel = brokerController.getProducerManager().getAvailableChannel(groupId);
        if (channel != null) {
            // Call back to the producer to query the status of the local transaction
            brokerController.getBroker2Client().checkProducerTransactionState(groupId, channel, checkTransactionStateRequestHeader, msgExt);
        } else {
            LOGGER.warn("Check transaction failed, channel is null. groupId={}", groupId);
        }
    }

That covers the transaction message flow and its main implementation details.

If you have any other questions, leave them in the comments section or send me a private message.

5. Question: other implementations of distributed transactions

The transaction message above is one implementation of a distributed transaction.

Transaction messages follow a two-phase commit approach.

Question: what other concrete ways are there to implement distributed transactions?

Feel free to leave a comment.

6. Follow-up articles

  • RocketMQ - Getting started (updated)
  • RocketMQ - Architecture and roles (updated)
  • RocketMQ - Message sending (updated)
  • RocketMQ - Message consumption
  • RocketMQ - Broadcast mode and cluster mode for consumers (updated)
  • RocketMQ - Ordered messages (updated)
  • RocketMQ - Delayed messages (updated)
  • RocketMQ - Batch messages
  • RocketMQ - Message filtering
  • RocketMQ - Transaction messages (updated)
  • RocketMQ - Message storage
  • RocketMQ - High availability
  • RocketMQ - High performance
  • RocketMQ - Primary/secondary replication
  • RocketMQ - Disk flush mechanism
  • RocketMQ - Idempotence
  • RocketMQ - Message retry
  • RocketMQ - Dead letter queue


Feel free to follow along; the upcoming articles are packed with practical content.