The overall architecture

According to the architectural design, namesrv acts as the registry with which brokers register. Producers and consumers maintain long-lived connections to namesrv, pull broker information from it, and then communicate with the brokers directly.

Namesrv nodes do not communicate with each other; each holds its own copy of the brokers' registration information. In the distributed world, a typical cluster deployment has master/slave nodes or an elected leader, but namesrv nodes are fully independent and never talk to one another.
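
Because every namesrv node is independent, a client is simply given the full list of namesrv addresses instead of discovering a leader. A minimal sketch (the addresses and the group name here are made up):

import org.apache.rocketmq.client.producer.DefaultMQProducer;

public class NamesrvAddrExample {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("example_producer_group");
        // Semicolon-separated list; each namesrv holds a full copy of the
        // routing data, so the client only needs one of them to be alive.
        producer.setNamesrvAddr("192.168.0.1:9876;192.168.0.2:9876");
        producer.start();
    }
}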

RocketMQ has a unique registry design.

Namesrv principle

Before talking about namesrv, I need to introduce how RocketMQ abstracts its entities. What entities are there, and how do they relate? This is a bit like drawing an ER diagram when designing a database.

Entity design

This part matters: once you understand how the entities are designed, you can see how namesrv stores its data.

Namespace: my personal understanding is that the Namespace is used for isolation (the same purpose as Apollo's Namespace, although Apollo's is more powerful). For example, suppose you have three environments, qa/pre/release, that all use the same topic names. You can specify a namespace when creating a producer or consumer to keep them apart.
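
A sketch of the client side, assuming a client version whose DefaultMQProducer constructor accepts a namespace (the names here are made up):

import org.apache.rocketmq.client.producer.DefaultMQProducer;

// "qa" is the namespace; the pre and release environments would pass "pre"
// or "release" while keeping the same topic names.
DefaultMQProducer producer = new DefaultMQProducer("qa", "order_producer_group");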

RouteInfoManager: manages routing metadata

// Topic queue table; a topic is created with 4 queues by default
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
// Broker information: the cluster name, broker name, and addresses of the master and slave brokers
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
// Broker cluster information: the broker names belonging to each cluster
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
// Broker liveness information, which namesrv replaces each time it receives a heartbeat packet
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
// Broker FilterServer list, used for class-mode message filtering
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

BrokerData: Broker information encapsulation

/** the brokerClusterName setting in broker.conf */
private String cluster;
/** the brokerName setting in broker.conf */
private String brokerName;
private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;

Relationship between cluster, brokerName, and brokerId

Within the same cluster, a brokerName (identifying a master node and its slaves) must be unique. For a master-slave broker group, the brokerName is the same across all of its nodes, but the brokerId differs: the master broker's brokerId is always 0, and a slave broker's brokerId is a non-zero number.
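
To make the relationship concrete, here is a minimal sketch of what brokerAddrs could hold for one master-slave group (the addresses are made up):

// brokerName "broker-a" is shared by every node of the group within the cluster
HashMap<Long, String> brokerAddrs = new HashMap<>();
brokerAddrs.put(0L, "192.168.0.10:10911"); // brokerId 0 => the master
brokerAddrs.put(1L, "192.168.0.11:10911"); // non-zero brokerId => a slave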

BrokerLiveInfo: the broker's liveness information

/** Last update time; namesrv refreshes this whenever it receives a heartbeat from the broker. */
private long lastUpdateTimestamp;
private DataVersion dataVersion;
// The Netty channel, i.e. the underlying socket connection
private Channel channel;
// See BrokerController#getHAServerAddr; the format is ip:port
private String haServerAddr;

QueueData: the queue metadata of a topic on one broker

private String brokerName;
// Number of read queues
private int readQueueNums;
// Number of write queues
private int writeQueueNums;
/** 6: readable and writable; 4: read-only (writes forbidden); 2: write-only (reads forbidden) */
private int perm;
private int topicSynFlag;

For readQueueNums and writeQueueNums in RocketMQ, 99% of the time the two values are the same; they only diverge temporarily while a topic is being scaled up or down. For example, suppose your topic has 128 queues and you want to shrink to 64: first set writeQueueNums to 64 (so no new messages land in the upper 64 queues), and once the remaining 64 queues have been fully consumed, set readQueueNums to 64 as well.

# Step 1: stop writing to the upper 64 queues, but keep reading all 128
mqadmin updateTopic -c cluster -n localhost:9876 -t topic_test -r 128 -w 64
# Step 2: once the upper queues are drained, shrink the read side too
mqadmin updateTopic -c cluster -n localhost:9876 -t topic_test -r 64 -w 64

Namesrv startup

NamesrvController#initialize

The startup process is relatively simple: it is mostly loading configuration. Here is one relatively important piece of code: periodically scanning for expired brokers.

// Scan expired brokers with a 5-second delay and a 10-second interval
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        NamesrvController.this.routeInfoManager.scanNotActiveBroker();
    }
}, 5, 10, TimeUnit.SECONDS);

CRUD: broker registration / heartbeat

BrokerController#start()

  1. After the broker starts, it waits 10 seconds and then sends a heartbeat packet to every namesrv every 30 seconds:
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

    @Override
    public void run() {
        try {
            BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
        } catch (Throwable e) {
            log.error("registerBrokerAll Exception", e);
        }
    }
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
// brokerConfig.getRegisterNameServerPeriod() defaults to 30000
  2. After the broker sends the request, namesrv processes it in the following method:

DefaultRequestProcessor#registerBrokerWithFilterServer()

The main job of this method is to maintain the clusterAddrTable, brokerAddrTable, topicQueueTable, brokerLiveTable and filterServerTable inside RouteInfoManager. To keep the whole update consistent, it simply takes a lock around the operation.
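
A simplified sketch of that path (not RocketMQ's actual code; master/slave handling, topic updates and the result object are omitted) to show which tables one heartbeat touches under the write lock:

import java.util.*;
import java.util.concurrent.locks.*;

public class RegistrationSketch {
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final Map<String, Set<String>> clusterAddrTable = new HashMap<>();
    private final Map<String, Map<Long, String>> brokerAddrTable = new HashMap<>();
    private final Map<String, Long> brokerLiveTable = new HashMap<>(); // addr -> last heartbeat

    public void registerBroker(String clusterName, String brokerName,
                               long brokerId, String brokerAddr) throws InterruptedException {
        this.lock.writeLock().lockInterruptibly();
        try {
            // cluster -> set of broker names
            clusterAddrTable.computeIfAbsent(clusterName, k -> new HashSet<>()).add(brokerName);
            // brokerName -> (brokerId -> address)
            brokerAddrTable.computeIfAbsent(brokerName, k -> new HashMap<>()).put(brokerId, brokerAddr);
            // brokerAddr -> liveness timestamp, replaced on every heartbeat
            brokerLiveTable.put(brokerAddr, System.currentTimeMillis());
            // (topicQueueTable and filterServerTable are maintained the same way)
        } finally {
            this.lock.writeLock().unlock();
        }
    }
}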

CRUD: namesrv removes failed brokers

  1. Every 10 seconds, namesrv scans for expired brokers:
// Scan expired brokers with a 5-second delay and a 10-second interval
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        NamesrvController.this.routeInfoManager.scanNotActiveBroker();
    }
}, 5, 10, TimeUnit.SECONDS);
  2. Any broker that has not sent a heartbeat within 120s is deleted:
public class RouteInfoManager {
    public void scanNotActiveBroker() {
        Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, BrokerLiveInfo> next = it.next();
            // The time of the last heartbeat/registration
            long last = next.getValue().getLastUpdateTimestamp();
            // If no heartbeat has arrived within 120s, the broker is removed
            if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
                // Close the channel to the broker
                RemotingUtil.closeChannel(next.getValue().getChannel());
                it.remove();
                log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
                // Remove the broker from the routing tables
                this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
            }
        }
    }
}

Namesrv, then, only removes the broker; it does not notify producers or consumers that the broker is gone. Also, namesrv may have to wait up to 120s before it concludes that the broker is in trouble. Is this a problem with the design? If so, how do you work around it?
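
One common mitigation is on the client side: producers and consumers poll namesrv for fresh routes on a fixed interval (30s by default) and can be told to route around brokers that fail. A sketch, assuming the standard ClientConfig knobs:

import org.apache.rocketmq.client.producer.DefaultMQProducer;

DefaultMQProducer producer = new DefaultMQProducer("example_producer_group");
// Refresh topic routes from namesrv more often than the default 30s
producer.setPollNameServerInterval(10 * 1000);
// Temporarily avoid brokers whose sends fail or are slow
producer.setSendLatencyFaultEnable(true);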

The broker disconnects from namesrv

Normal disconnection: if its connection to namesrv is healthy, the broker sends an unregister request to namesrv before shutting down, and namesrv removes it via RouteInfoManager#unregisterBroker.

Abnormal disconnection:

When the broker's connection to namesrv is broken abruptly, namesrv triggers an event listener, senses that the broker is offline, and removes it proactively: NettyEventExecutor#run() ends up calling RouteInfoManager#onChannelDestroy() to evict the broker.

In fact, RouteInfoManager#onChannelDestroy() is called whenever a connection to namesrv is broken, whatever the reason.

The conclusion: when the connection between a broker and namesrv breaks, namesrv is the first to know.

Routing discovery

RouteInfoManager#pickupTopicRouteData

It retrieves the topic's data from topicQueueTable, brokerAddrTable, and filterServerTable, wraps it in a TopicRouteData, and returns it.

public class TopicRouteData extends RemotingSerializable {
    private String orderTopicConf;
    private List<QueueData> queueDatas;
    private List<BrokerData> brokerDatas;
    private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
}
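
A minimal sketch (not RocketMQ's actual code, but roughly what the client does when building its publish info) of expanding this route data into writable queues; route is assumed to be the TopicRouteData returned above and topic the topic name:

List<MessageQueue> queues = new ArrayList<>();
for (QueueData qd : route.getQueueDatas()) {
    if ((qd.getPerm() & 2) == 2) { // the write bit of perm
        for (int i = 0; i < qd.getWriteQueueNums(); i++) {
            queues.add(new MessageQueue(topic, qd.getBrokerName(), i));
        }
    }
}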

Namesrv summary

  1. Every Broker in the cluster sends a heartbeat packet to every NameServer every 30s, registering its topic routing information and its own broker information. (registration | heartbeat)
  2. The NameServer runs a scheduled task that scans the broker liveness table every 10s; if a NameServer has received no heartbeat from a Broker for 120s, it judges the Broker to be offline and removes it from the routing tables. (removal)
  3. If the long connection between the NameServer and a Broker drops, the NameServer senses the Broker going offline immediately and removes it from the routing tables. (Because communication is over TCP/IP, a disconnect is known at once.)
  4. A topic can be scaled up or down by changing its read/write queue counts.
  5. In a master-slave broker group, the brokerName is the same but the brokerIds differ; the master's brokerId must be 0 and a slave's must be non-zero. Within one cluster, a brokerName (referring to a master and its slaves) must never be duplicated.
  6. Namesrv provides registration and discovery for brokers.

Code appreciation

Standard way to create a thread pool

this.defaultAsyncSenderExecutor = new ThreadPoolExecutor(
    Runtime.getRuntime().availableProcessors(),
    Runtime.getRuntime().availableProcessors(),
    1000 * 60,
    TimeUnit.MILLISECONDS,
    this.asyncSenderThreadPoolQueue,
    new ThreadFactory() {
        private AtomicInteger threadIndex = new AtomicInteger(0);

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "AsyncSenderExecutor_" + this.threadIndex.incrementAndGet());
        }
    });

Be sure to give your threads names, so you can tell them apart later in logs and thread dumps.

How RocketMQ gracefully shuts down resources when the application exits

NamesrvStartup#start

Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
    @Override
    public Void call() throws Exception {
        controller.shutdown();
        return null;
    }
}));

controller.shutdown() closes resources such as connections and thread pools. Shutting down a thread pool properly is something we all deal with in everyday development, so the code is worth posting here to see how others handle it.

NamesrvController#shutdown

public void shutdown() {
    ...
    // Call shutdown() rather than shutdownNow()
    this.scheduledExecutorService.shutdown();
    ...
}
public class ShutdownHookThread extends Thread {
    private volatile boolean hasShutdown = false;
    private AtomicInteger shutdownTimes = new AtomicInteger(0);
    private final InternalLogger log;
    private final Callable callback;

    /**
     * Create the standard hook thread, with a call back, by using {@link Callable} interface.
     *
     * @param log The log instance is used in hook thread.
     * @param callback The call back function.
     */
    public ShutdownHookThread(InternalLogger log, Callable callback) {
        super("ShutdownHook");
        this.log = log;
        this.callback = callback;
    }

    /**
     * Thread run method.
     * Invoke when the jvm shutdown.
     * 1. count the invocation times.
     * 2. execute the {@link ShutdownHookThread#callback}, and time it.
     */
    @Override
    public void run() {
        synchronized (this) {
            log.info("shutdown hook was invoked, " + this.shutdownTimes.incrementAndGet() + " times.");
            if (!this.hasShutdown) {
                this.hasShutdown = true;
                long beginTime = System.currentTimeMillis();
                try {
                    this.callback.call();
                } catch (Exception e) {
                    log.error("shutdown hook callback invoked failure.", e);
                }
                long consumingTimeTotal = System.currentTimeMillis() - beginTime;
                log.info("shutdown hook done, consuming time total(ms): "+ consumingTimeTotal); }}}}Copy the code

How do I handle concurrent writes to collections

public class RouteInfoManager {
    // Read/write lock
    private final ReadWriteLock lock = new ReentrantReadWriteLock();

    private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
    private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
    private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
    private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
    private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

    public RegisterBrokerResult registerBroker(
        final String clusterName,
        final String brokerAddr,
        final String brokerName,
        final long brokerId,
        final String haServerAddr,
        final TopicConfigSerializeWrapper topicConfigWrapper,
        final List<String> filterServerList,
        final Channel channel) {
        ...
        try {
            this.lock.writeLock().lockInterruptibly();
            ...
        } finally {
            this.lock.writeLock().unlock();
        }
        ...
    }
}
  1. The release of a lock (or any resource) must be written in a finally block, so that it is guaranteed to run.
  2. For operations that must be atomic across multiple collections, RocketMQ simply takes a read/write lock around the whole operation; a read/write lock is used (rather than a plain mutex) so that reads never wait on each other. The read side is sketched below.
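
For completeness, a simplified sketch of that read side (the method name findQueueData is hypothetical; a lookup such as pickupTopicRouteData follows the same pattern). Concurrent readers never block each other and only wait while a heartbeat holds the write lock:

public List<QueueData> findQueueData(String topic) throws InterruptedException {
    this.lock.readLock().lockInterruptibly();
    try {
        // Many readers may hold the read lock at the same time
        return this.topicQueueTable.get(topic);
    } finally {
        this.lock.readLock().unlock();
    }
}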

Possible production problems

Avoid deploying namesrv and a broker on the same physical machine

Unlocking best practices for RocketMQ cluster deployment from a year-end production failure

The broker's autoCreateTopicEnable must not be set to true in a production environment

In a production environment, why can’t autoCreateTopicEnable be set to true

Recommended books

  • Inside RocketMQ Technology, by Ding Wei
  • Ding Wei's technology blog