RocketMQ – Index to the Producer series of articles:

I. Introduction of Producer

Producer is the Producer of RocketMQ messages and is responsible for producing the messages. It establishes keep-alive connections with one of the nodes (randomly) in the NameServer cluster, periodically reads Topic routing information from NameServer, and stores routing information in local memory. It establishes a long connection to the Master Broker that provides Topic services, and periodically sends heartbeat to the Master Broker. It will only send messages to the Master Broker and select the appropriate Queue from the Message Queue list to send messages to achieve load balancing. It supports sending messages of various types, such as ordinary messages, transaction messages, timed messages, etc. It can send messages in three ways: synchronous, asynchronous, one-way, etc. A simple interaction diagram of the production side with the Master Broker and NameServer can be easily viewed:

Note: Producers can also query messages with brokers for other functional interactions.

Ii. Producer startup process:

Before understanding the specific production start process, we first put forward a few questions, with the problem to analyze the source code:

  1. What exactly does the message producer do when it starts?

  2. An application needs to send multiple topics, and different topics need to be sent to brokers in different clusters. How to handle this?

We can start by understanding and analyzing the producer-related class diagram relationship:

As you can see from the class diagram, MQProducer can be implemented in two ways.

One is DefaultMQProducer(non-transactional message producer). One is TransactionMQProducer(which supports transaction messages).

Next, a simple analysis will be made on the core parameters or methods of the class:

2.1 the MqAdmin # #

MqAdmin: Core Method resolution (Mq Administration Base Interface)

Void createTopic(final String Key, Final String newTopic, final Int queueNum) throws MQClientException; // Create a topic. Long searchOffset(final MessageQueue MQ, final Long TIMESTAMP) throws MQClientException; // find the maximum physical offset in the MessageQueue long maxOffset(final MessageQueue mq) throws MQClientException; // Find the minimum physical offset in the message queue. long minOffset(final MessageQueue mq) throws MQClientException; // Obtain the earliest stored message time long written variestmsgstoreTime (final MessageQueue MQ) throws MQClientException; MessageExt viewMessage(Final String offsetMsgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException; QueryResult queryMessage(final String topic, final String key, final int maxNum, final Long begin, final long end) throws MQClientException, InterruptedException; // Find messages by subject and message ID. MessageExt viewMessage(String topic, String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException;Copy the code

2.2 Core method analysis

MQProducer: Core method parsing (Producer base Interface) :

// Start void start() throws MQClientException; // Close void shutdown(); / / information according to the topic to obtain corresponding queue List < MessageQueue > fetchPublishMessageQueues (final String topic) throws MQClientException; / / synchronization - messages SendResult send (final Message MSG, final MessageQueue mq) throws MQClientException, RemotingException, MQBrokerException, InterruptedException; Void send(final Message MSG, final MessageQueueSelector selector, final Object ARG, final SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException; SendResult (final Message MSG, final MessageQueueSelector selector, final Object ARG, final long timeout) throws MQClientException, RemotingException, MQBrokerException,InterruptedException; Void sendOneway(final Message MSG, final MessageQueue MQ) throws MQClientException, RemotingException, InterruptedException; // Transaction Message - Send TransactionSendResult sendMessageInTransaction(final Message MSG, final Object ARG) throws MQClientException; Send (final Collection<Message> MSGS) throws MQClientException, RemotingException, MQBrokerException, InterruptedException;Copy the code

Note: Where start() and shutdown() indicate the start and shutdown of the producer.

2.3 clientConfig

ClientConfig: Core attribute method resolution (client configuration)

//nameServer- address, default from: System property: RocketMq. namesrv.addr or environment variable :NAMESRV_ADDR Private String namesrvAddr = System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, System.getenv(MixAll.NAMESRV_ADDR_ENV)); // Instance name, default: - RocketMq. client. Name Private String instanceName = System.getProperty(" RocketMq. client. Name ", "DEFAULT"); // Build mq client ID, example :ip@instanceName @unitname: 172.16.62.75@19312@unitName public String buildMQClientId() {StringBuilder sb = new StringBuilder(); sb.append(this.getClientIP()); sb.append("@"); sb.append(this.getInstanceName()); if (! UtilAll.isBlank(this.unitName)) { sb.append("@"); sb.append(this.unitName); } return sb.toString(); } // Set namesrv address public void setNamesrvAddr(String namesrvAddr) {this.namesrvaddr = namesrvAddr; }Copy the code

Note: namesrvAddr indicates the nameServer address, which can be set by calling the setNamesrvAddr method, or by setting environment variables or system properties. BuildMQClientId Sets the producer Id.

Iii. TransactionMQProducer:(transaction messages, which will be explained separately and ignored in this chapter)

(abbreviated)

DefaultMQProducer :(non-transactional message producer)

// Constructor public DefaultMQProducer(final String producerGroup, RPCHook RPCHook) {this.producerGroup = producerGroup; defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook); Public SendResult send(Message MSG) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { return this.defaultMQProducerImpl.send(msg); } / / boot method public void the start () throws MQClientException {this. DefaultMQProducerImpl. Start (); if (null ! = traceDispatcher) { try { traceDispatcher.start(this.getNamesrvAddr()); } catch (MQClientException e) { log.warn("trace dispatcher start failed ", e); }}}Copy the code

Note: The constructor of DefaultMQProducer, send and start methods are all around DefaultMQProducerImpl. Default producer implementation class, its start method as the core method of producer startup, then the core analysis of the implementation of its start method.

DefaultMQProducerImpl#start

/** * mq-producer start * @param startFactory * @throws MQClientException */ public void start(final Boolean startFactory) throws MQClientException { switch (this.serviceState) { case CREATE_JUST: // 0- Service status set this.serviceState = servicestate. START_FAILED; //1- Check the configuration this.checkconfig (); //2- Change the instanceName of the producer to the process ID. if (! this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) { this.defaultMQProducer.changeInstanceNameToPID(); } //3- Create an MQClientlnstance instance this.mqclientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook); //4- Register producers to MQClientlnstance. boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this); if (! registerOK) { this.serviceState = ServiceState.CREATE_JUST; throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup() + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), null); } / / 5 - the default topic information cache (this) defaultMQProducer) getCreateTopicKey () = 'TBW102') this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo()); MQClientFactory if (startFactory) {mqclientFactory.start (); } log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(), this.defaultMQProducer.isSendMessageWithVIPChannel()); this.serviceState = ServiceState.RUNNING; // Set the state to break; case RUNNING: case START_FAILED: case SHUTDOWN_ALREADY: throw new MQClientException("The producer service state not OK, maybe started once, " + this.serviceState + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null); default: break; } / / 7 to send the heart to all the broker enclosing mQClientFactory. SendHeartbeatToAllBrokerWithLock (); }Copy the code

Analysis is as follows:

0- Service status setting:

The value is set to prevent repeated startup. The enumerated classes are: ServiceState; If the initialization state is not CREATE_JUST, an exception occurs

1- Detection configuration:

private void checkConfig() throws MQClientException { Validators.checkGroup(this.defaultMQProducer.getProducerGroup()); if (null == this.defaultMQProducer.getProducerGroup()) { throw new MQClientException("producerGroup is null", null); } / / production group they belong to Not equal to DEFAULT_PRODUCER if (this. DefaultMQProducer. GetProducerGroup (.) the equals (MixAll. DEFAULT_PRODUCER_GROUP)) { throw new MQClientException("producerGroup can not equal " + MixAll.DEFAULT_PRODUCER_GROUP + ", please specify another one.", null); }}Copy the code

Note: To check the validity of -producerGroup

2- And change the instanceName of the producer to the process ID.

// producerGroup = CLIENT_INNER_PRODUCER if (! this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) { this.defaultMQProducer.changeInstanceNameToPID(); } is called ClientConfig#changeInstanceNameToPID publicvoid changeInstanceNameToPID() {if (this.instanceName.equals("DEFAULT")) { this.instanceName = String.valueOf(UtilAll.getPid()); }}Copy the code

Note: instanceName == DEFAULT, which is changed to the ID of the started process for the MQClientInstance build

3- Create an MQClientlnstance instance

MQClientManager manages MQClientInstance, and its internal data structure is: ConcurrentHashMap, key:clientId, and MQClientManager itself is a singleton mode. The core method is analyzed as follows: MQClientManager

private static MQClientManager instance = new MQClientManager(); Private AtomicInteger factoryIndexGenerator = new AtomicInteger(); Private ConcurrentMap<String/* clientId */, MQClientInstance> factoryTable = new ConcurrentHashMap<String, MQClientInstance>(); / / build return MQClientInstance public MQClientInstance getAndCreateMQClientInstance (final ClientConfig ClientConfig, RPCHook rpcHook) { String clientId = clientConfig.buildMQClientId(); // build mq clientId MQClientInstance instance = this.factorytable. get(clientId); if (null == instance) { instance = new MQClientInstance(clientConfig.cloneClientConfig(), this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook); MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance); if (prev ! = null) { instance = prev; log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId); } else { log.info("Created new MQClientInstance for clientId:[{}]", clientId); } } return instance; }Copy the code

Remark:

ClientConfig buildMQClientId above has analysis, is to build the clientId; GetAndCreateMQClientInstance the goal of this approach is to build or inquiries MQClientInstance MQClientInstance: encapsulates RocketMQ network processing API, It is a network channel through which Producer and Consumer interact with NameServer and Broker.

The advantages and disadvantages of sharing the same MQClientInstance with multiple producers are analyzed:

  1. Advantages: In general, to reduce client resource usage, if all instanceName and unitName are set to the same value, only one instance of MQClientInstance will be created (for producer topics to send messages within the same set of broker clusters)

  2. Disadvantages: What happens if MQClientInstance is reused for multiple topics? This can happen if you start multiple Producers in the same JVM without setting instanceName and unitName. The two producers share an MQClientInstance and messages are routed to the same cluster.

For example, if you have two producers with different NameServer addresses, the intention is for the two producers to distribute messages to different clusters, but because they share an MQClientInstance, This MQClientInstance is built based on the original Producer configuration. The second Producer and the other Producer are considered to be the same instance. The configuration is the same, and the routing of the messages is the same.

4- Register producers with MQClientInstance.

//key:group, value: Private final ConcurrentMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>(); // Add producers to MQClientlnstance management public Boolean registerProducer(final String group, final DefaultMQProducerImpl producer) { if (null == group || null == producer) { return false; } MQProducerInner prev = this.producerTable.putIfAbsent(group, producer); if (prev ! = null) { log.warn("the producer group[{}] exist already.", group); return false; } return true; }Copy the code

Note: DefaultMQProducerImpl implements the interface class MQProducerInner

5- Add the default topic information cache where you need to understand the topicPublishInfoTable data structure

// Key :topic value:TopicPublishInfo Private Final ConcurrentMap<String/* TOPIC */, TopicPublishInfo> topicPublishInfoTable = new ConcurrentHashMap<String, TopicPublishInfo>();Copy the code

TopicPublishInfo:

Analysis, familiar admire familiar taste, MessageQueue and TopicRouteData in NameServer have been analyzed quite clear, analysis is as follows:

Public class TopicPublishInfo {private Boolean orderTopic = false; Private Boolean haveTopicRouterInfo = false; Private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>(); // This value is incremented by 1 each time a message queue is selected, or reset to 0 if integer.max_value is used to select a message queue. private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex(); // Routing information private TopicRouteData TopicRouteData; // Select the queue method, lastBrokerName is actually the brokerName that failed to be sent last time. If it is not empty, BrokerName from which this selection queue is sent selects another brokerName Public MessageQueue selectOneMessageQueue(Final String lastBrokerName) {if (lastBrokerName == null) { return selectOneMessageQueue(); } else {// If the message fails again, the next time the message queue is selected, avoid the Broker that MesageQueue was in. Otherwise, it is very likely to fail again. int index = this.sendWhichQueue.getAndIncrement(); for (int i = 0; i < this.messageQueueList.size(); i++) { int pos = Math.abs(index++) % this.messageQueueList.size(); if (pos < 0) pos = 0; MessageQueue mq = this.messageQueueList.get(pos); if (! mq.getBrokerName().equals(lastBrokerName)) { return mq; } } return selectOneMessageQueue(); } // Use whichQueue to accessorise and fetch value, which is modulo with the number of message queues in the current routing table. MessageQueue(selectOneMessageQueue()) public MessageQueue selectOneMessageQueue() {int index = this.sendWhichQueue.getAndIncrement(); int pos = Math.abs(index) % this.messageQueueList.size(); if (pos < 0) pos = 0; return this.messageQueueList.get(pos); } //messageQueueList- public Boolean ok() {return null! = this.messageQueueList && ! this.messageQueueList.isEmpty(); } public Boolean isHaveTopicRouterInfo() {return haveTopicRouterInfo; }Copy the code

6 – start – MQClientInstance

MQClientInstance#start public void start() throws MQClientException { synchronized (this) { switch (this.serviceState) {  case CREATE_JUST: this.serviceState = ServiceState.START_FAILED; / / 1 > state - set the start failure / / If not specified, & the address from the name server If (null = = this. ClientConfig. GetNamesrvAddr ()) { / / 2 > whether nameSrvAddr address is empty, HTTP get nameSrvAddr enclosing mQClientAPIImpl. FetchNameServerAddr (); } // Start request-response channel : netty this.mQClientAPIImpl.start(); // Start various schedule tasks this.startscheduledTask (); / / 4 > "important" to Start the timer task / / Start pull service enclosing pullMessageService. Start (); / / 5 > consumption related to subsequent interpretation / / Start rebalance service enclosing rebalanceService. Start (); / / 6 > consumption related to subsequent interpretation / / Start push service enclosing defaultMQProducer. GetDefaultMQProducerImpl (.) Start (false); //7> Internally start an mqProducter, startFactory=false log.info(" The client factory [{}] start OK", this.clientid); this.serviceState = ServiceState.RUNNING; >8 Status set run break; case RUNNING: break; case SHUTDOWN_ALREADY: break; case START_FAILED: throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null); default: break; }}}Copy the code

Note: this.startScheduledTask();

7- Send heartbeat to all brokers

(this. MQClientFactory. SendHeartbeatToAllBrokerWithLock ())

public void sendHeartbeatToAllBrokerWithLock() { if (this.lockHeartbeat.tryLock()) { try { this.sendHeartbeatToAllBroker(); 1 > send the heart to all the broker enclosing uploadFilterClassSource (); } catch (final Exception e) {log.error("sendHeartbeatToAllBroker Exception ", e); } finally { this.lockHeartbeat.unlock(); } } else { log.warn("lock heartBeat, but failed."); }}Copy the code

Note: sendHeartbeatToAllBroker, relatively simple,

BrokerVersionTable (ConcurrentHashMap) is maintained for return results, which you should not miss. There are scheduled tasks that periodically send heartbeats to all brokers

Summary: We have seen the producer startup process in 7 steps, which can be roughly divided into: check the relevant configuration, register the relevant build classes (e.g. MQClientInstance related, Netty related, etc.), and then start the relevant scheduled tasks; A brief summary of the producer startup process is as follows: