Enabling message trace

Broker side

  • traceTopicEnable: defaults to false. When it is set to true, the broker initializes the default topic for storing trace data, RMQ_SYS_TRACE_TOPIC, at startup.
  • traceOn: defaults to true. When it is set to false, clients do not send trace data to the broker.

Producer

When constructing the producer, set enableMsgTrace to true. customizedTraceTopic may be null, in which case the default trace topic is used. The other overloaded constructors work the same way.

    /**
     * Constructor specifying producer group, enabled msgTrace flag and customized trace topic name.
     *
     * @param producerGroup Producer group, see the name-sake field.
     * @param enableMsgTrace Switch flag instance for message trace.
     * @param customizedTraceTopic The name value of message trace topic. If you don't config, you can use the default
     * trace topic name.
     */
    public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace, final String customizedTraceTopic) {
        this(null, producerGroup, null, enableMsgTrace, customizedTraceTopic);
    }

Consumer

When constructing the consumer, set enableMsgTrace to true. customizedTraceTopic may be null, in which case the default trace topic is used. The other overloaded constructors work the same way.

    /**
     * Constructor specifying consumer group, enabled msg trace flag and customized trace topic name.
     *
     * @param consumerGroup Consumer group.
     * @param enableMsgTrace Switch flag instance for message trace.
     * @param customizedTraceTopic The name value of message trace topic. If you don't config, you can use the default trace topic name.
     */
    public DefaultMQPushConsumer(final String consumerGroup, boolean enableMsgTrace, final String customizedTraceTopic) {
        this(null, consumerGroup, null, new AllocateMessageQueueAveragely(), enableMsgTrace, customizedTraceTopic);
    }

Where message trace data is stored

  • Message trace data is stored in the RocketMQ broker itself. Each trace record is sent to a dedicated topic just like an ordinary message.
  • The ID and KEYS of the original message are used as the KEYS of the trace message, so the trace data for a given message can be looked up by key.
  • Not using an external storage medium avoids a dependency on third-party components.
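
The key scheme in the second bullet can be sketched in plain Java with no RocketMQ dependency. This is an illustration only: the class `TraceKeySketch` and the method `traceKeys` are hypothetical names, not part of the RocketMQ client, and the sketch assumes that multiple keys on the original message are separated by whitespace.

```java
import java.util.LinkedHashSet;
import java.util.Set;

final class TraceKeySketch {
    /**
     * Collects the original message ID plus each of its keys.
     * The result is what a trace message would be indexed under.
     */
    static Set<String> traceKeys(String msgId, String originalKeys) {
        Set<String> keys = new LinkedHashSet<>();
        if (msgId != null && !msgId.isEmpty()) {
            keys.add(msgId);
        }
        if (originalKeys != null) {
            // Assumption: keys of the original message are whitespace-separated.
            for (String key : originalKeys.trim().split("\\s+")) {
                if (!key.isEmpty()) {
                    keys.add(key);
                }
            }
        }
        return keys;
    }

    public static void main(String[] args) {
        // Hypothetical message ID and business keys.
        System.out.println(traceKeys("MSG-ID-0001", "order-1001 user-42"));
    }
}
```

Because the message ID is always among the keys, a lookup by ID finds the trace message even when the original message carried no business keys.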

How the producer collects trace data

When the producer is initialized, it registers a SendMessageHook that collects context information before and after each message is sent. After the send completes, the trace data is delivered to the broker asynchronously.

    @Override
    public void sendMessageBefore(SendMessageContext context) {
        // If the message is itself trace data, do not record it
        if (context == null || context.getMessage().getTopic().startsWith(((AsyncTraceDispatcher) localDispatcher).getTraceTopicName())) {
            return;
        }
        //build the context content of TuxeTraceContext
        TraceContext tuxeContext = new TraceContext();
        tuxeContext.setTraceBeans(new ArrayList<TraceBean>(1));
        context.setMqTraceContext(tuxeContext);
        tuxeContext.setTraceType(TraceType.Pub);
        tuxeContext.setGroupName(NamespaceUtil.withoutNamespace(context.getProducerGroup()));
        //build the data bean object of message trace
        TraceBean traceBean = new TraceBean();
        // The trajectory data collected before sending is as follows
        traceBean.setTopic(NamespaceUtil.withoutNamespace(context.getMessage().getTopic()));
        traceBean.setTags(context.getMessage().getTags());
        traceBean.setKeys(context.getMessage().getKeys());
        traceBean.setStoreHost(context.getBrokerAddr());
        traceBean.setBodyLength(context.getMessage().getBody().length);
        traceBean.setMsgType(context.getMsgType());
        traceBean.setClientHost(((AsyncTraceDispatcher)localDispatcher).getHostProducer().getmQClientFactory().getClientId());
        tuxeContext.getTraceBeans().add(traceBean);
        // Collect some data to context before sending
    }

    @Override
    public void sendMessageAfter(SendMessageContext context) {
        // If the message is itself trace data, do not record it
        if (context == null || context.getMessage().getTopic().startsWith(((AsyncTraceDispatcher) localDispatcher).getTraceTopicName())
            || context.getMqTraceContext() == null) {
            return;
        }
        if (context.getSendResult() == null) {
            return;
        }

        if (context.getSendResult().getRegionId() == null || !context.getSendResult().isTraceOn()) { // if switch is false, skip it
            return;
        }

        TraceContext tuxeContext = (TraceContext) context.getMqTraceContext();
        // The traceBean contains the information collected before sending
        TraceBean traceBean = tuxeContext.getTraceBeans().get(0);
        // traceBeans actually contains only one entry
        int costTime = (int) ((System.currentTimeMillis() - tuxeContext.getTimeStamp()) / tuxeContext.getTraceBeans().size());
        tuxeContext.setCostTime(costTime);
        if (context.getSendResult().getSendStatus().equals(SendStatus.SEND_OK)) {
            tuxeContext.setSuccess(true);
        } else {
            tuxeContext.setSuccess(false);
        }
        tuxeContext.setRegionId(context.getSendResult().getRegionId());
        traceBean.setMsgId(context.getSendResult().getMsgId());
        traceBean.setOffsetMsgId(context.getSendResult().getOffsetMsgId());
        // Calculate the storage time: it is considered half of the total time, so this is not an accurate value
        traceBean.setStoreTime(tuxeContext.getTimeStamp() + costTime / 2);
        // Prepare to send track data asynchronously, not immediately
        localDispatcher.append(tuxeContext);
    }
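
The store-time estimate in sendMessageAfter is simple arithmetic, and a small standalone sketch makes the approximation concrete. The class and method names below are hypothetical and the timestamps are made-up values; only the two formulas are taken from the hook above.

```java
final class StoreTimeEstimate {
    /** Average send cost per trace bean, in milliseconds. */
    static int costTime(long sendStartMillis, long nowMillis, int beanCount) {
        return (int) ((nowMillis - sendStartMillis) / beanCount);
    }

    /** Estimated broker store time: send start plus half of the measured cost. */
    static long estimatedStoreTime(long sendStartMillis, int costTimeMillis) {
        return sendStartMillis + costTimeMillis / 2;
    }

    public static void main(String[] args) {
        long start = 1_000L; // hypothetical timestamp taken before sending
        long end = 1_030L;   // hypothetical timestamp when the send result arrives
        int cost = costTime(start, end, 1);
        System.out.println("costTime=" + cost + " ms, storeTime=" + estimatedStoreTime(start, cost));
    }
}
```

With a 30 ms round trip, the estimate places the store time 15 ms after the send started. The real store time could fall anywhere in that window, which is why the value should be read as approximate.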

How the consumer collects trace data

Like the producer, the consumer registers a hook, in this case a ConsumeMessageHook. The main difference is timing: the producer sends one trace record after the message is sent, whereas the consumer collects part of the data and sends it once before consuming, then collects more data and sends it again after consuming. Each consumption therefore produces two trace records. If consumption fails and is retried, each retry records two more.

    @Override
    public void consumeMessageBefore(ConsumeMessageContext context) {
        if (context == null || context.getMsgList() == null || context.getMsgList().isEmpty()) {
            return;
        }
        TraceContext traceContext = new TraceContext();
        context.setMqTraceContext(traceContext);
        traceContext.setTraceType(TraceType.SubBefore);//
        traceContext.setGroupName(NamespaceUtil.withoutNamespace(context.getConsumerGroup()));//
        List<TraceBean> beans = new ArrayList<TraceBean>();
        for (MessageExt msg : context.getMsgList()) {
            if (msg == null) {
                continue;
            }
            String regionId = msg.getProperty(MessageConst.PROPERTY_MSG_REGION);
            String traceOn = msg.getProperty(MessageConst.PROPERTY_TRACE_SWITCH);

            if (traceOn != null && traceOn.equals("false")) {
                // If trace switch is false ,skip it
                continue;
            }
            TraceBean traceBean = new TraceBean();
            traceBean.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic()));//
            traceBean.setMsgId(msg.getMsgId());//
            traceBean.setTags(msg.getTags());//
            traceBean.setKeys(msg.getKeys());//
            traceBean.setStoreTime(msg.getStoreTimestamp());//
            traceBean.setBodyLength(msg.getStoreSize());//
            traceBean.setRetryTimes(msg.getReconsumeTimes());//
            traceBean.setClientHost(((AsyncTraceDispatcher)localDispatcher).getHostConsumer().getmQClientFactory().getClientId());
            traceContext.setRegionId(regionId);//
            beans.add(traceBean);
        }
        if (beans.size() > 0) {
            traceContext.setTraceBeans(beans);
            traceContext.setTimeStamp(System.currentTimeMillis());
            localDispatcher.append(traceContext); // Send once before consumption
        }
    }

    @Override
    public void consumeMessageAfter(ConsumeMessageContext context) {
        if (context == null || context.getMsgList() == null || context.getMsgList().isEmpty()) {
            return;
        }
        TraceContext subBeforeContext = (TraceContext) context.getMqTraceContext();

        if (subBeforeContext.getTraceBeans() == null || subBeforeContext.getTraceBeans().size() < 1) {
            // If subbefore bean is null ,skip it
            return;
        }
        TraceContext subAfterContext = new TraceContext();
        subAfterContext.setTraceType(TraceType.SubAfter);//
        subAfterContext.setRegionId(subBeforeContext.getRegionId());//
        subAfterContext.setGroupName(NamespaceUtil.withoutNamespace(subBeforeContext.getGroupName()));//
        subAfterContext.setRequestId(subBeforeContext.getRequestId());//
        subAfterContext.setSuccess(context.isSuccess());//

        // Calculate the cost time for processing messages
        int costTime = (int) ((System.currentTimeMillis() - subBeforeContext.getTimeStamp()) / context.getMsgList().size());
        subAfterContext.setCostTime(costTime);//
        subAfterContext.setTraceBeans(subBeforeContext.getTraceBeans());
        String contextType = context.getProps().get(MixAll.CONSUME_CONTEXT_TYPE);
        if (contextType != null) {
            subAfterContext.setContextCode(ConsumeReturnType.valueOf(contextType).ordinal());
        }
        localDispatcher.append(subAfterContext);// Send once after consumption
    }

How trace data is sent

When a client finishes sending or consuming a message, the last step of the hook is to put the trace record into a blocking queue. An asynchronous thread takes records from the queue, wraps them into a send task, and submits the task to a thread pool, which sends the data to the broker.

  1. The queue of pending trace records has a default size of 1024. If the queue is full, the current record is logged and then discarded.
  2. An asynchronous thread polls the queue continuously, fetching up to 100 records at a time (fewer if a 5 ms wait yields no further record), wraps them into a send task, and submits the task to the thread pool that sends trace messages.
  3. The send task groups the batch by topic and sends each group to the trace topic as one batch. The message ID and KEYS of the original messages become the KEYS of the trace message. The message body is the metadata of the original messages: when a batch covers several messages, their metadata is concatenated, and each message's metadata ends with a field separator so that it can be split apart at query time.
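
The first two steps above can be sketched with the standard library alone. This is not the RocketMQ dispatcher: `TraceQueueSketch` and its methods are hypothetical names, the trace record is reduced to a plain String, and the sketch assumes that "waiting 5 ms" means a timed poll per record that ends the batch early when it returns nothing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

final class TraceQueueSketch {
    static final int QUEUE_SIZE = 1024; // default capacity stated in the text
    static final int BATCH_SIZE = 100;  // maximum records per send task
    static final long POLL_MILLIS = 5;  // assumed wait per record

    private final BlockingQueue<String> queue = new ArrayBlockingQueue<>(QUEUE_SIZE);

    /** Non-blocking append: logs and drops the record when the queue is full. */
    boolean append(String traceRecord) {
        boolean accepted = queue.offer(traceRecord);
        if (!accepted) {
            System.err.println("trace queue full, dropping: " + traceRecord);
        }
        return accepted;
    }

    /** Collects up to BATCH_SIZE records, stopping early once a timed poll comes back empty. */
    List<String> nextBatch() {
        List<String> batch = new ArrayList<>(BATCH_SIZE);
        try {
            for (int i = 0; i < BATCH_SIZE; i++) {
                String record = queue.poll(POLL_MILLIS, TimeUnit.MILLISECONDS);
                if (record == null) {
                    break; // nothing arrived in time, send what we have
                }
                batch.add(record);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
        return batch;
    }

    public static void main(String[] args) {
        TraceQueueSketch dispatcher = new TraceQueueSketch();
        for (int i = 0; i < 250; i++) {
            dispatcher.append("trace-" + i);
        }
        List<String> batch;
        while (!(batch = dispatcher.nextBatch()).isEmpty()) {
            // A real dispatcher would hand the batch to a sender thread pool here.
            System.out.println("would submit a send task with " + batch.size() + " records");
        }
    }
}
```

Using offer rather than put is the important design choice: the business thread that sends or consumes a message never blocks on tracing, at the price of losing trace records under sustained overload.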

How to query trace data

Because trace data is sent to the designated trace topic, and the ID and KEYS of the original message serve as the KEYS of the trace message, you can use the target message's ID as a key to look up the related messages in the trace topic and then parse their bodies. A parsed record whose message ID field matches the target message ID is the trace data you are looking for. Under normal circumstances a message has three trace records: one for sending and two for consumption (one before and one after local consumption).
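
The matching step is needed because one trace message may carry the records of several original messages, so a key lookup can return bodies that also contain records for other messages. The following sketch shows the filtering idea only. The class `TraceQuerySketch`, the two separator characters, and the record layout (trace type first, message ID second) are assumptions made for illustration and should not be read as the client's actual encoding.

```java
import java.util.ArrayList;
import java.util.List;

final class TraceQuerySketch {
    // Assumed separators for this sketch only.
    static final String FIELD_SEP = String.valueOf((char) 1);  // between fields of one record
    static final String RECORD_SEP = String.valueOf((char) 2); // at the end of each record

    /** Returns the records in a trace message body whose msgId field equals targetMsgId. */
    static List<String[]> matching(String body, String targetMsgId) {
        List<String[]> hits = new ArrayList<>();
        for (String record : body.split(RECORD_SEP)) {
            if (record.isEmpty()) {
                continue;
            }
            String[] fields = record.split(FIELD_SEP);
            // Assumed layout: fields[0] = trace type, fields[1] = msgId.
            if (fields.length > 1 && fields[1].equals(targetMsgId)) {
                hits.add(fields);
            }
        }
        return hits;
    }

    public static void main(String[] args) {
        // A hypothetical body that batches records for two different messages.
        String body = "Pub" + FIELD_SEP + "ID-1" + RECORD_SEP
                    + "Pub" + FIELD_SEP + "ID-2" + RECORD_SEP;
        for (String[] fields : matching(body, "ID-2")) {
            System.out.println(fields[0] + " record for " + fields[1]);
        }
    }
}
```

Running the same filter over every trace message returned by the key lookup and collecting the hits would yield the send record and the two consumption records for the target message.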