This section describes how a Producer sends a message.

Following the demo at the beginning of the section, the code that sends messages is written as follows:

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class SyncProducer {
    public static void main(String[] args) throws Exception {
        // Instantiate the message Producer
        DefaultMQProducer producer = new DefaultMQProducer("GroupTest");
        // Set the address of the NameServer
        producer.setNamesrvAddr("localhost:9876");
        // Start the Producer instance
        producer.start();
        for (int i = 0; i < 100; i++) {
            // Create the message and specify its Topic, Tag, and body
            Message msg = new Message("TopicTest" /* Topic */,
                "TagA" /* Tag */,
                ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
            );
            // Send the message to a Broker
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }
        // If no more messages are sent, close the Producer instance.
        producer.shutdown();
    }
}

The method for sending messages is as follows:

SendResult sendResult = producer.send(msg);

The contents of the send method are as follows:

public SendResult send(
    Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    return this.defaultMQProducerImpl.send(msg);
}

DefaultMQProducer delegates the call to the send method of DefaultMQProducerImpl. DefaultMQProducerImpl, in turn, calls its own sendDefaultImpl, which does the main work of sending. sendDefaultImpl is declared as follows:

private SendResult sendDefaultImpl(
        Message msg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {}

Parameters include:

  • msg: the message to send
  • communicationMode: the communication mode, which is synchronous, asynchronous, or one-way
  • sendCallback: the callback interface used in asynchronous mode, with success and exception notifications
  • timeout: the timeout period

The main steps executed by sendDefaultImpl are as follows:

(1) Ensure that the client is successfully initialized

Make sure DefaultMQProducerImpl is in the RUNNING state

(2) Look up the publish information of the topic

The TopicPublishInfo for the topic is looked up in the client's local cache. If it is missing or not usable, the updateTopicRouteInfoFromNameServer method is called immediately to synchronize it in real time.

TopicPublishInfo represents the routing information of a Topic. As described in the first section, a Topic's data is distributed across multiple Brokers, and on a single Broker it is split into several queues to increase parallelism.

In the diagram above, TopicPublishInfo holds a list of MessageQueue objects and a TopicRouteData. A MessageQueue represents the mapping information of a queue, that is, one of the queues on the Brokers mentioned above. TopicRouteData describes the location of the Brokers and the configuration of the queues.

(3) Check whether routing information exists

If no publish information for the topic was found in the previous step, an exception is thrown and the send ends. Otherwise, go to (4).

(4) Calculate the retry times

Calculate the number of retries according to the communication mode

int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;

In synchronous mode, a failed send is retried up to the configured number of times (2 by default), so at most 3 attempts are made in total. In the other modes, the send is not retried.
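As a small illustration of the expression above, the sketch below evaluates it for each mode. The Mode enum and the RetryBudget class are hypothetical stand-ins introduced only for this example; they are not RocketMQ classes, and the retry count is passed in as a plain parameter rather than read from the producer.

```java
// Hypothetical stand-in for CommunicationMode, covering the three modes named above.
enum Mode { SYNC, ASYNC, ONEWAY }

final class RetryBudget {
    // Same conditional as in sendDefaultImpl, with the producer setting passed in.
    static int timesTotal(Mode mode, int retryTimesWhenSendFailed) {
        return mode == Mode.SYNC ? 1 + retryTimesWhenSendFailed : 1;
    }

    public static void main(String[] args) {
        // With the default of 2 retries, only SYNC gets more than one attempt.
        for (Mode mode : Mode.values()) {
            System.out.println(mode + " -> " + timesTotal(mode, 2) + " attempt(s)");
        }
    }
}
```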

(5) Determine whether to retry

If the attempts have been used up, no further attempt can be made: if a result has been obtained, it is returned; otherwise, an exception is thrown. If attempts remain, go to (6).

(6) Select a broker with short latency

A Broker with low latency is selected with the help of MQFaultStrategy. MQFaultStrategy provides an optional fault-delay mechanism that marks Brokers that are slow to respond as unavailable for a period of time. The diagram below illustrates this:

The sendLatencyFaultEnable attribute of MQFaultStrategy controls whether the fault-delay mechanism is on; the default is false. When it is on, each time a queue under a topic is selected, a Broker with low latency (judged from previous send times) is preferred as long as a qualifying Broker exists; otherwise, random selection is considered.

LatencyFaultTolerance maintains the “available” state of faulty Brokers. For each Broker regarded as “faulty”, LatencyFaultTolerance keeps a corresponding FaultItem, which is defined as follows:

class FaultItem implements Comparable<FaultItem> {
    private final String name; // broker name
    private volatile long currentLatency; // most recent latency
    private volatile long startTimestamp; // time from which the broker is available again

    public FaultItem(final String name) {
        this.name = name;
    }

    // compareTo(...) is omitted from this excerpt

    public boolean isAvailable() {
        return (System.currentTimeMillis() - startTimestamp) >= 0;
    }
}

That is, when a send to a Broker fails, the FaultItem records the Broker's most recent latency and the time from which it will next be available. A Broker is “available” when it does not appear in the faultItemTable maintained by LatencyFaultTolerance, or when the current time has reached the time at which it is next available.
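The availability rule just described can be sketched as follows. This is a simplified illustration, not the RocketMQ implementation: the Item class keeps only the timestamp, markUnavailableUntil is a hypothetical helper name, and the current time is passed in as a parameter so the rule can be checked without a real clock.

```java
import java.util.concurrent.ConcurrentHashMap;

final class AvailabilitySketch {
    // Hypothetical, trimmed-down FaultItem: only the timestamp needed for the check.
    static final class Item {
        volatile long startTimestamp; // time (ms) from which the broker may be used again
    }

    // Keyed by broker name, like the faultItemTable described above.
    final ConcurrentHashMap<String, Item> faultItemTable = new ConcurrentHashMap<>();

    // Hypothetical helper: record that a broker must not be used before `availableAt`.
    void markUnavailableUntil(String brokerName, long availableAt) {
        faultItemTable.computeIfAbsent(brokerName, k -> new Item()).startTimestamp = availableAt;
    }

    // Available if the broker was never recorded, or if its penalty window has elapsed.
    boolean isAvailable(String brokerName, long now) {
        Item item = faultItemTable.get(brokerName);
        return item == null || now - item.startTimestamp >= 0;
    }

    public static void main(String[] args) {
        AvailabilitySketch sketch = new AvailabilitySketch();
        sketch.markUnavailableUntil("broker-a", 30_000L);
        System.out.println(sketch.isAvailable("broker-a", 10_000L)); // inside the penalty window
        System.out.println(sketch.isAvailable("broker-a", 30_000L)); // window has elapsed
        System.out.println(sketch.isAvailable("broker-b", 10_000L)); // never recorded as faulty
    }
}
```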

LatencyFaultTolerance updates the availability of a “faulty” Broker through the updateFaultItem method, which directly updates the most recent latency and the next-available time in the Broker's FaultItem in faultItemTable. The method is given the period for which the specified Broker should be unavailable. Whether a Broker counts as faulty, and for how long it should not be used, is decided by the computeNotAvailableDuration method of MQFaultStrategy, as follows:

private long computeNotAvailableDuration(final long currentLatency) {
    for (int i = latencyMax.length - 1; i >= 0; i--) {
        if (currentLatency >= latencyMax[i])
            return this.notAvailableDuration[i];
    }

    return 0;
}

The definitions of latencyMax and notAvailableDuration are as follows:

private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};

private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

That is, if the latency of the current request reaches a level in latencyMax, the Broker is considered “faulty” and the unavailability time at the same index is taken from notAvailableDuration. As the two arrays show, a latency below 50 ms means the Broker is considered normal and needs no fault-delay handling; because the first two entries of notAvailableDuration are 0, latencies below 550 ms likewise produce no unavailability period. Latency here is the time taken to invoke RemotingClient to send the Netty request, which is explained later.
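To make the lookup concrete, the sketch below copies the method and the two arrays quoted above into a standalone class and prints the result for a few sample latencies. The class name and the sample values are chosen for illustration only.

```java
final class NotAvailableDurationSketch {
    // The two arrays quoted above, index-aligned: latency level -> penalty in ms.
    static final long[] LATENCY_MAX = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    static final long[] NOT_AVAILABLE_DURATION = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

    // Scan from the highest level down and return the penalty of the first level reached.
    static long computeNotAvailableDuration(long currentLatency) {
        for (int i = LATENCY_MAX.length - 1; i >= 0; i--) {
            if (currentLatency >= LATENCY_MAX[i]) {
                return NOT_AVAILABLE_DURATION[i];
            }
        }
        return 0;
    }

    public static void main(String[] args) {
        long[] samples = {30L, 120L, 550L, 2500L, 20000L};
        for (long latency : samples) {
            System.out.println(latency + " ms -> unavailable for "
                + computeNotAvailableDuration(latency) + " ms");
        }
    }
}
```

With these arrays, 30 ms and 120 ms both map to 0, 550 ms maps to 30000 ms, 2500 ms maps to 120000 ms, and 20000 ms maps to 600000 ms.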

Returning to the action of the current step, the selectOneMessageQueue method of MQFaultStrategy is used to select the queue under the given topic, which reads as follows:

The selectOneMessageQueue method above is annotated. When the fault-delay mechanism is on, the method first rotates through the list of queues under the topic to select a queue whose Broker is “available” and whose brokerName equals lastBrokerName (lastBrokerName is the Broker used last time and may be null for the first selection). If no Broker meets the requirement, a queue on a “relatively good” Broker is selected; failing that, a queue is picked at random from the topic's queue list. If the fault-delay mechanism is off, a queue is selected directly from the topic's queue list, taking the most recently used Broker into account.
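The three-stage fallback for the fault-delay case can be sketched roughly as below. This is a simplified illustration under stated assumptions, not the RocketMQ method: QueueRef is a hypothetical stand-in for MessageQueue, availability is supplied as a predicate, “relatively good” is approximated as the Broker with the lowest recorded latency, the final fallback uses rotation instead of a random pick, and the lastBrokerName check is left out.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

final class QueueSelectionSketch {
    // Hypothetical stand-in for MessageQueue: just a broker name and a queue id.
    static final class QueueRef {
        final String brokerName;
        final int queueId;

        QueueRef(String brokerName, int queueId) {
            this.brokerName = brokerName;
            this.queueId = queueId;
        }
    }

    private final AtomicInteger index = new AtomicInteger();

    // Assumes `queues` is non-empty, which step (3) guarantees in the flow above.
    QueueRef select(List<QueueRef> queues, Predicate<String> isAvailable, Map<String, Long> lastLatency) {
        int start = index.getAndIncrement();
        // Stage 1: rotate through the queues and take the first one on an available broker.
        for (int i = 0; i < queues.size(); i++) {
            QueueRef q = queues.get(Math.floorMod(start + i, queues.size()));
            if (isAvailable.test(q.brokerName)) {
                return q;
            }
        }
        // Stage 2: nothing is available, so prefer the broker with the lowest recorded latency.
        QueueRef best = null;
        long bestLatency = Long.MAX_VALUE;
        for (QueueRef q : queues) {
            Long latency = lastLatency.get(q.brokerName);
            if (latency != null && latency < bestLatency) {
                best = q;
                bestLatency = latency;
            }
        }
        if (best != null) {
            return best;
        }
        // Stage 3: no latency data either, so fall back to plain rotation.
        return queues.get(Math.floorMod(start, queues.size()));
    }
}
```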

(7) Call RemotingClient to send the Netty request

The message is sent through MQClientAPIImpl (which uses RemotingClient internally) in one-way, asynchronous, or synchronous mode. The steps are as follows:

  1. Fetch the IP and port of the Broker from the cache. If the cache has no information about the Broker, synchronize it from the NameServer into the cache and then read it from the cache
  2. Wrap a sending context object, SendMessageContext
  3. If the message is not a batch message, set a unique ID. The ID value is: the time difference between application startup and the current time + the current message count
  4. Try to compress the message body. Currently, batch messages are not compressed. A single message that exceeds the configured length is compressed with the Deflater algorithm. Because the upper layer retries if this step fails, a finally block resets the compressed message body to the uncompressed content
  5. If a CheckForbiddenHook exists, execute it
  6. If a sendHook exists, call executeSendMessageHookBefore to run the before-send callback
  7. Wrap the request header, SendMessageRequestHeader
  8. Send the message using the sendMessage method of MQClientAPIImpl to get the SendResult
  9. If a sendHook exists, call executeSendMessageHookAfter to run the after-send callback. executeSendMessageHookAfter is also called when an exception is thrown
  10. After sending, set the body of the Message back to its original value (undoing the compression of the body) in preparation for a retry
  11. Return the SendResult
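The compress-then-restore pattern in the list above can be sketched as follows. This is an illustration only, not the RocketMQ code: Msg and Transport are hypothetical types, the threshold comparison is an assumption, and the body is compressed with java.util.zip.DeflaterOutputStream from the standard library.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.zip.DeflaterOutputStream;

final class CompressRestoreSketch {
    // Hypothetical message type with a mutable body.
    static final class Msg {
        byte[] body;

        Msg(byte[] body) {
            this.body = body;
        }
    }

    // Hypothetical transport; a failed send is signalled with a RuntimeException.
    interface Transport {
        void send(Msg msg);
    }

    static byte[] deflate(byte[] raw) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (DeflaterOutputStream dos = new DeflaterOutputStream(out)) {
            dos.write(raw);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }

    static void sendOnce(Msg msg, int compressOverBytes, Transport transport) {
        byte[] original = msg.body;
        try {
            // Assumed threshold rule: compress only bodies longer than the configured length.
            if (original.length > compressOverBytes) {
                msg.body = deflate(original);
            }
            transport.send(msg);
        } finally {
            // Always restore the uncompressed body so that a retry starts from the original.
            msg.body = original;
        }
    }
}
```

Restoring the body in finally means a retry never compresses an already compressed payload, whether the send succeeded or threw.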

(8) Update the fault state of the broker

If the fault-delay switch is on, the time taken by the send is used to decide whether the Broker should be marked “unavailable” for a period of time.

(9) Check whether the result is valid

Check whether the request succeeded. The result is valid if step (7) ran without an exception and returned a correct result; in that case the result is returned. If it is invalid, the Broker's fault state is updated as in step (8), and the flow jumps back to (5) to retry.
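Steps (5) to (9) together form a retry loop, which can be sketched as below. This is a simplified illustration, not the body of sendDefaultImpl: Attempt and FaultRecorder are hypothetical interfaces, Brokers are plain strings, and a failed send is modelled as a RuntimeException.

```java
import java.util.function.Supplier;

final class SendLoopSketch {
    // Hypothetical: sends to the named broker and returns a result, or throws on failure.
    interface Attempt {
        String sendTo(String broker);
    }

    // Hypothetical: receives the elapsed time of each attempt, as in step (8).
    interface FaultRecorder {
        void update(String broker, long latencyMs, boolean failed);
    }

    static String sendWithRetry(int timesTotal, Supplier<String> brokerSelector,
                                Attempt attempt, FaultRecorder faults) {
        RuntimeException last = null;
        for (int times = 0; times < timesTotal; times++) {   // step (5): attempts left?
            String broker = brokerSelector.get();            // step (6): pick a broker
            long begin = System.currentTimeMillis();
            try {
                String result = attempt.sendTo(broker);      // step (7): send
                faults.update(broker, System.currentTimeMillis() - begin, false); // step (8)
                return result;                               // step (9): valid result
            } catch (RuntimeException e) {
                faults.update(broker, System.currentTimeMillis() - begin, true);  // step (8)
                last = e;                                    // step (9): invalid, retry
            }
        }
        throw new IllegalStateException("send failed after " + timesTotal + " attempt(s)", last);
    }
}
```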

The above is the main process by which a Producer sends a message. It relies on a lot of cached data, most of which is synchronized by the background tasks started during client startup, as mentioned in the previous section. Attached below is a brief diagram from the notes made while reading the source code, which outlines the general flow of a message sent by the Producer:
