Background

Reading good code is a pleasure, but explaining it in your own words is hard work that costs a lot of brain cells.

I wrote this source-reading note a year ago as a summary for myself. I recently reorganized it in the hope that it helps others.

As the mobile Internet enters its second half, more and more companies are shifting their attention to the Internet of Things. Bike sharing and Xiaomi's smart home products are typical IoT applications.

Companies believe that big data and AI let them extract additional value from device data and build new business models. Massive amounts of data flow to the back end through access services and generate follow-up value there. For this access layer, MQTT has become the de facto standard protocol of the Internet of Things, and cloud vendors in China and abroad offer their own broker implementations:

  • Baidu Cloud
  • Alibaba Cloud
  • Tencent Cloud
  • AWS

Features

MQTT is designed for communication among large numbers of remote sensors and control devices that have limited computing power and work over low-bandwidth, unreliable networks. Its main characteristics are:

  1. Uses the publish/subscribe messaging pattern to provide one-to-many message distribution and decouple applications
  2. Message transport that is agnostic to the content of the payload
  3. Uses TCP/IP to provide network connectivity
  4. Offers three quality-of-service (QoS) levels for message delivery
    • “At most once”: delivery relies entirely on the underlying TCP/IP network, so messages can be lost. This level suits environmental sensor data, where losing one reading does not matter because the next one follows shortly.
    • “At least once”: messages are guaranteed to arrive, but duplicates may occur.
    • “Exactly once”: each message is guaranteed to arrive exactly once. This level suits billing systems, where a duplicated or missing message leads to incorrect results.
  5. Small transport overhead (the fixed header is 2 bytes long) and minimized protocol exchanges to reduce network traffic
  6. A mechanism to notify interested parties when a client disconnects abnormally, using the Last Will and Testament feature

== This section explains how to implement the above features ==

Terminology

Client

A program or device that uses MQTT, such as an environmental monitoring sensor, a shared bike, or a shared power bank.

Server

A program or device that acts as an intermediary between clients that publish messages and clients that have subscribed to them.

Publish and subscribe process

The process by which Client A sends a “Hello” message to Client B is as follows:

  1. Client B subscribes to the topic named MSG
  2. Client A sends “Hello” to the Server and specifies MSG as the topic
  3. The Server forwards the message “Hello” to Client B

Unlike the request/response model of HTTP, Client A and Client B are not directly connected to each other. Messages between them are forwarded by the Server. The Server is also known as the MQTT Broker: the intermediary between subscribing and publishing.
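The three steps above can be sketched with a toy in-memory broker. This is only an illustration of the publish/subscribe pattern, not Moquette code: `ToyBroker`, `subscribe`, and `publish` are hypothetical names, clients are modelled as plain callbacks, and topics are compared by exact string equality (no wildcards, no network, no QoS).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical toy broker: exact-match topics only, everything in memory.
final class ToyBroker {
    // topic -> callbacks of the clients subscribed to it
    private final Map<String, List<Consumer<String>>> subscribers = new HashMap<>();

    // Step 1: a client registers interest in a topic.
    void subscribe(String topic, Consumer<String> client) {
        subscribers.computeIfAbsent(topic, t -> new ArrayList<>()).add(client);
    }

    // Steps 2 and 3: the broker receives a message and forwards it to every subscriber.
    // Returns the number of clients the message was forwarded to.
    int publish(String topic, String payload) {
        List<Consumer<String>> targets =
                subscribers.getOrDefault(topic, Collections.emptyList());
        for (Consumer<String> client : targets) {
            client.accept(payload);
        }
        return targets.size();
    }

    public static void main(String[] args) {
        ToyBroker broker = new ToyBroker();
        List<String> inboxOfB = new ArrayList<>();
        broker.subscribe("MSG", inboxOfB::add); // Client B subscribes to MSG
        broker.publish("MSG", "Hello");         // Client A publishes to MSG
        System.out.println(inboxOfB);           // prints "[Hello]"
    }
}
```

Client A never holds a reference to Client B; it only knows the topic name, which is the decoupling the text describes.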

Feature implementation analysis based on Moquette source code

Sending a “Hello” message from Client A to Client B involves the following actions:

  1. Client A and Client B connect to the Server
  2. Client B subscribes to a topic
  3. Client A publishes a message
  4. The Server forwards the message
  5. Client B receives the message

The sections below trace the source code for the connect, subscribe, and publish actions.

Connect

Basic concepts:

Session: the logical communication state between a client (identified by its ClientID) and the server. Lifetime: Session >= network connection.

ClientID: the unique identifier of the client, which the server uses to associate a Session. It may contain only upper-case letters, lower-case letters, and digits (0-9, a-z, A-Z) and is at most 23 characters long. If the ClientID stays the same across multiple TCP connections, the client and the Server retain the Session information. At any time, the Server maintains only one TCP connection per ClientID.

CleanSession: set by the client at CONNECT

  • 0: enables session reuse. After the network disconnects and reconnects, the Session information is restored. Both client and server need a Session persistence mechanism;
  • 1: disables session reuse. Every CONNECT starts a new Session, and the Session lasts only as long as the network connection.
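The two CleanSession values can be illustrated with a small in-memory registry. This is a sketch under stated assumptions, not Moquette's `SessionsRepository`: `SessionRegistry` and `Session` are hypothetical names, a session holds only a set of subscribed topic filters, and nothing is persisted to disk.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical registry that decides, on CONNECT, which session a client gets.
final class SessionRegistry {
    static final class Session {
        final Set<String> subscriptions = new HashSet<>();
    }

    // Sessions that outlive a single network connection, keyed by ClientID.
    private final Map<String, Session> stored = new HashMap<>();

    Session connect(String clientId, boolean cleanSession) {
        if (cleanSession) {
            // CleanSession = 1: discard previous state; the new session is not kept.
            stored.remove(clientId);
            return new Session();
        }
        // CleanSession = 0: resume the stored session, or create one and keep it.
        return stored.computeIfAbsent(clientId, id -> new Session());
    }

    public static void main(String[] args) {
        SessionRegistry registry = new SessionRegistry();
        registry.connect("sensor-1", false).subscriptions.add("a/b");
        // Reconnecting with CleanSession = 0 restores the subscription.
        System.out.println(registry.connect("sensor-1", false).subscriptions); // prints "[a/b]"
        // Reconnecting with CleanSession = 1 starts from scratch.
        System.out.println(registry.connect("sensor-1", true).subscriptions);  // prints "[]"
    }
}
```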

Keep Alive: keeps the long-lived connection reliable and lets both parties confirm that the other is online. The client sets the Keep Alive duration at CONNECT. If the server does not receive any packet from the client within 1.5 * Keep Alive, it must close the network connection to the client. The Keep Alive value is chosen by the application and is usually a few minutes. The maximum is 18 hours, 12 minutes, and 15 seconds.
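The numbers in this paragraph can be checked with a little arithmetic. Keep Alive is a 16-bit value in seconds, so the maximum is 65535 seconds, which is where 18 hours, 12 minutes, and 15 seconds comes from. The sketch below is only a worked example; `KeepAliveMath` and `serverTimeout` are hypothetical names, not Moquette APIs.

```java
import java.time.Duration;

// Hypothetical helper that reproduces the Keep Alive arithmetic.
final class KeepAliveMath {
    // Keep Alive is a two-byte field expressed in seconds.
    static final int MAX_KEEP_ALIVE_SECONDS = 0xFFFF; // 65535

    // Silence after which the server closes the connection: 1.5 x Keep Alive.
    // Note: in the protocol a Keep Alive of 0 switches the mechanism off;
    // this sketch does not model that case and simply returns a zero duration.
    static Duration serverTimeout(int keepAliveSeconds) {
        if (keepAliveSeconds < 0 || keepAliveSeconds > MAX_KEEP_ALIVE_SECONDS) {
            throw new IllegalArgumentException("keep alive out of range: " + keepAliveSeconds);
        }
        return Duration.ofMillis(keepAliveSeconds * 1500L);
    }

    public static void main(String[] args) {
        Duration max = Duration.ofSeconds(MAX_KEEP_ALIVE_SECONDS);
        System.out.println(max.toHours() + "h "
                + (max.toMinutes() % 60) + "m "
                + (max.getSeconds() % 60) + "s");               // prints "18h 12m 15s"
        System.out.println(serverTimeout(60).getSeconds() + "s"); // prints "90s"
    }
}
```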

Will: the Will Message is stored on the server. When the network connection is closed abnormally, the server must publish this Will Message, hence the figurative name; it can be used to notify others of an abnormal disconnection. If the client closes the connection by sending DISCONNECT, the will is invalidated and deleted. The conditions for publishing the will message include: the server detects an I/O error or network failure; the client fails to communicate within the Keep Alive period; the client closes the network connection without first sending a DISCONNECT packet. The will-related settings below are specified by the client at CONNECT.

Will Flag: master switch of the Will feature

  • 0: the Will feature is off; Will QoS and Will Retain must be 0
  • 1: the Will feature is on; Will QoS and Will Retain need to be set

Will QoS: the QoS of the will message; it can be 0, 1, or 2, with the same meaning as the message QoS

Will Retain: whether the will message is retained

  • 0: the will message is not retained; later subscribers will not receive it
  • 1: the will message is retained and stored persistently

Will Topic: the topic the will message is published to

Will Payload: the content of the will message
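CleanSession and the will settings travel in a single Connect Flags byte of the CONNECT packet. As a minimal sketch, assuming the MQTT 3.1.1 bit layout (bit 1 Clean Session, bit 2 Will Flag, bits 4-3 Will QoS, bit 5 Will Retain), the byte can be decoded as follows. `ConnectFlags` is a hypothetical class written for this note; Moquette itself relies on Netty's MQTT codec for decoding.

```java
// Hypothetical decoder for the Connect Flags byte; not part of Moquette or Netty.
final class ConnectFlags {
    final boolean cleanSession;
    final boolean willFlag;
    final int willQos;
    final boolean willRetain;

    private ConnectFlags(boolean cleanSession, boolean willFlag, int willQos, boolean willRetain) {
        this.cleanSession = cleanSession;
        this.willFlag = willFlag;
        this.willQos = willQos;
        this.willRetain = willRetain;
    }

    static ConnectFlags parse(int flags) {
        boolean cleanSession = (flags & 0x02) != 0; // bit 1
        boolean willFlag = (flags & 0x04) != 0;     // bit 2
        int willQos = (flags >> 3) & 0x03;          // bits 4-3
        boolean willRetain = (flags & 0x20) != 0;   // bit 5
        if (willQos > 2) {
            throw new IllegalArgumentException("Will QoS must be 0, 1, or 2");
        }
        // With the Will Flag off, Will QoS and Will Retain must both be 0.
        if (!willFlag && (willQos != 0 || willRetain)) {
            throw new IllegalArgumentException("Will QoS/Retain set while Will Flag is 0");
        }
        return new ConnectFlags(cleanSession, willFlag, willQos, willRetain);
    }

    public static void main(String[] args) {
        ConnectFlags f = parse(0x2E); // binary 0010 1110
        System.out.println(f.cleanSession + " " + f.willFlag + " " + f.willQos + " " + f.willRetain);
        // prints "true true 1 true"
    }
}
```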

Connection process

  1. Check the MQTT protocol version sent by the client at connect time. For any version other than 3.1 and 3.1.1, send an “unacceptable protocol version” response and close the connection once it has been sent
  2. If the client did not send a clientId, and either cleanSession=false or the server does not allow an empty clientId, send an “identifier rejected” response and close the connection once it has been sent
  3. Check whether the user name and password are valid
  4. Initialize the connection object and register a reference to it in connection management. If connection management already holds an object with the same clientId, close the previous connection and register the new connection object
  5. Set the server's heartbeat timeout for this connection from the keep-alive time sent by the client (keepAlive * 1.5f)
  6. Store the will message (it is published to the stored topic when the connection is dropped unexpectedly)
  7. Send the connection success response
  8. Create the session for the current connection
  9. If cleanSession=false, send the messages already stored in the current session
public void processConnect(Channel channel, MqttConnectMessage msg) {
    MqttConnectPayload payload = msg.payload();
    String clientId = payload.clientIdentifier();
    LOG.debug("Processing CONNECT message. CId={}, username={}", clientId, payload.userName());

    // 1. Check the MQTT protocol version sent by the client. If it is neither 3.1 nor 3.1.1,
    //    send an "unacceptable protocol version" response and close the connection afterwards
    if (msg.variableHeader().version() != MqttVersion.MQTT_3_1.protocolLevel()
            && msg.variableHeader().version() != MqttVersion.MQTT_3_1_1.protocolLevel()) {
        MqttConnAckMessage badProto = connAck(CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION);

        LOG.error("MQTT protocol version is not valid. CId={}", clientId);
        channel.writeAndFlush(badProto).addListener(FIRE_EXCEPTION_ON_FAILURE);
        channel.close().addListener(CLOSE_ON_FAILURE);
        return;
    }

    final boolean cleanSession = msg.variableHeader().isCleanSession();
    if (clientId == null || clientId.length() == 0) {
        // 2. The client sent no clientId: if cleanSession=false or the server does not allow
        //    an empty clientId, send an "identifier rejected" response and close the connection
        if (!cleanSession || !this.allowZeroByteClientId) {
            MqttConnAckMessage badId = connAck(CONNECTION_REFUSED_IDENTIFIER_REJECTED);

            channel.writeAndFlush(badId).addListener(FIRE_EXCEPTION_ON_FAILURE);
            channel.close().addListener(CLOSE_ON_FAILURE);
            LOG.error("The MQTT client ID cannot be empty. Username={}", payload.userName());
            return;
        }

        // Generating client id.
        clientId = UUID.randomUUID().toString().replace("-", "");
        LOG.info("Client has connected with a server generated identifier. CId={}, username={}", clientId,
                payload.userName());
    }

    // 3. Check whether the user name and password are valid
    if (!login(channel, msg, clientId)) {
        channel.close().addListener(CLOSE_ON_FAILURE);
        return;
    }

    // 4. Initialize the connection object and register it in connection management. If an object
    //    with the same clientId already exists, close the previous connection and register the new one
    ConnectionDescriptor descriptor = new ConnectionDescriptor(clientId, channel, cleanSession);
    final ConnectionDescriptor existing = this.connectionDescriptors.addConnection(descriptor);
    if (existing != null) {
        LOG.info("Client ID is being used in an existing connection, force to be closed. CId={}", clientId);
        existing.abort();
        //return;
        this.connectionDescriptors.removeConnection(existing);
        this.connectionDescriptors.addConnection(descriptor);
    }

    // 5. Set the server-side heartbeat timeout from the client's keep-alive time (keepAlive * 1.5f)
    initializeKeepAliveTimeout(channel, msg, clientId);
    // 6. Store the will message
    storeWillMessage(msg, clientId);
    // 7. Send the connection success response
    if (!sendAck(descriptor, msg, clientId)) {
        channel.close().addListener(CLOSE_ON_FAILURE);
        return;
    }

    m_interceptor.notifyClientConnected(msg);

    if (!descriptor.assignState(SENDACK, SESSION_CREATED)) {
        channel.close().addListener(CLOSE_ON_FAILURE);
        return;
    }
    // 8. Create the session for the current connection
    final ClientSession clientSession = this.sessionsRepository.createOrLoadClientSession(clientId, cleanSession);

    // 9. If cleanSession=false, send the messages already stored in the current session
    if (!republish(descriptor, msg, clientSession)) {
        channel.close().addListener(CLOSE_ON_FAILURE);
        return;
    }

    int flushIntervalMs = 500/* (keepAlive * 1000) / 2 */;
    setupAutoFlusher(channel, flushIntervalMs);

    final boolean success = descriptor.assignState(MESSAGES_REPUBLISHED, ESTABLISHED);
    if (!success) {
        channel.close().addListener(CLOSE_ON_FAILURE);
    }

    LOG.info("Connected client <{}> with login <{}>", clientId, payload.userName());
}

Subscribe

Basic concepts

Subscription process

  1. Verify the subscribed topics (permissions, validity of the topic path)
  2. Store the subscribed topics in the current session
  3. Store the subscription information (topic and subscriber) in a global tree structure, which is used to find the matching subscribers by topic when forwarding messages (the tree structure and lookup algorithm are described in a later section)
  4. Send the subscription response
  5. Scan the retained messages that match the newly subscribed topics and send them to this connection immediately
public void processSubscribe(Channel channel, MqttSubscribeMessage msg) {
    String clientID = NettyUtils.clientID(channel);
    int messageID = messageId(msg);
    LOG.debug("Processing SUBSCRIBE message. CId={}, messageId={}", clientID, messageID);

    RunningSubscription executionKey = new RunningSubscription(clientID, messageID);
    SubscriptionState currentStatus = subscriptionInCourse.putIfAbsent(executionKey, SubscriptionState.VERIFIED);
    if (currentStatus != null) {
        LOG.warn("Client sent another SUBSCRIBE message while this one was being processed CId={}, messageId={}",
            clientID, messageID);
        return;
    }
    String username = NettyUtils.userName(channel);
    // 1. Verify the subscribed topics
    List<MqttTopicSubscription> ackTopics = doVerify(clientID, username, msg);
    MqttSubAckMessage ackMessage = doAckMessageFromValidateFilters(ackTopics, messageID);
    if (!this.subscriptionInCourse.replace(executionKey, SubscriptionState.VERIFIED, SubscriptionState.STORED)) {
        LOG.warn("Client sent another SUBSCRIBE message while the topic filters were being verified CId={}, " +
            "messageId={}", clientID, messageID);
        return;
    }

    LOG.debug("Creating and storing subscriptions CId={}, messageId={}, topics={}", clientID, messageID, ackTopics);
    // 2. Store the subscribed topics in the current session
    List<Subscription> newSubscriptions = doStoreSubscription(ackTopics, clientID);

    // 3. Store the subscription information (topic and subscriber) in the global tree structure,
    //    used to find the matching subscribers by topic when forwarding messages
    for (Subscription subscription : newSubscriptions) {
        subscriptions.add(subscription);
    }

    LOG.debug("Sending SUBACK response CId={}, messageId={}", clientID, messageID);
    // 4. Send the subscription response
    channel.writeAndFlush(ackMessage).addListener(FIRE_EXCEPTION_ON_FAILURE);

    // fire the persisted messages in session
    // 5. Scan the retained messages that match the subscribed topics and send them to this connection immediately
    for (Subscription subscription : newSubscriptions) {
        publishRetainedMessagesInSession(subscription, username);
    }

    boolean success = this.subscriptionInCourse.remove(executionKey, SubscriptionState.STORED);
    if (!success) {
        LOG.warn("Unable to perform the final subscription state update CId={}, messageId={}", clientID, messageID);
    } else {
        LOG.info("Client <{}> subscribed to topics", clientID);
    }
}

Publish

Basic concepts

Packet Identifier: part of the variable header of a packet. It is a non-zero two-byte integer, so it ranges from 1 to 65535.

Consistent within a flow: the following packet types carry a Packet Identifier, and it stays the same throughout one exchange: PUBLISH (QoS > 0), PUBACK, PUBREC, PUBREL, PUBCOMP, SUBSCRIBE, SUBACK, UNSUBSCRIBE, UNSUBACK.

New and unused: each time the client sends a new packet of one of these types, it must assign a Packet Identifier that is not currently in use. Once the client has processed the corresponding acknowledgement, the identifier is released and can be reused.

Maintained independently: the client and the server assign Packet Identifiers independently of each other, so the two sides can take part in concurrent message exchanges that use the same identifier. It is not an error for the client and the server to generate the same Packet Identifier.
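The allocation rules above (non-zero, unused while in flight, released on acknowledgement) can be sketched as a small allocator. This is an illustrative sketch, not how Moquette assigns identifiers; `PacketIdAllocator`, `acquire`, and `release` are hypothetical names. Each side of a connection would own its own instance, which matches the independent maintenance described above.

```java
import java.util.BitSet;

// Hypothetical allocator for Packet Identifiers in the range 1..65535.
final class PacketIdAllocator {
    private static final int MAX_ID = 65535;

    private final BitSet inUse = new BitSet(MAX_ID + 1);
    private int last = 0;

    // Returns an identifier that is not currently in flight.
    synchronized int acquire() {
        for (int attempts = 0; attempts < MAX_ID; attempts++) {
            last = (last % MAX_ID) + 1; // cycles through 1..65535 and never yields 0
            if (!inUse.get(last)) {
                inUse.set(last);
                return last;
            }
        }
        throw new IllegalStateException("all packet identifiers are in flight");
    }

    // Called once the matching acknowledgement has been processed.
    synchronized void release(int packetId) {
        inUse.clear(packetId);
    }

    public static void main(String[] args) {
        PacketIdAllocator ids = new PacketIdAllocator();
        int first = ids.acquire();
        int second = ids.acquire();
        ids.release(first); // e.g. PUBACK received for the first PUBLISH
        System.out.println(first + " " + second + " " + ids.acquire()); // prints "1 2 3"
    }
}
```

The allocator keeps counting upwards after a release instead of reusing the freed value at once, a design choice that makes a late, stale acknowledgement less likely to be confused with a new exchange.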

Payload: the maximum payload size is 256 MB. A PUBLISH may carry an empty payload, which in many cases means that a retained message (or will message) should be cleared. UTF-8 encoding is used.
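The 256 MB limit follows from the Remaining Length field of the fixed header, which uses one to four bytes with seven data bits each and therefore tops out at 268,435,455 bytes for variable header plus payload. The sketch below implements that variable-length encoding as described in the MQTT specification; `RemainingLength`, `encode`, and `decode` are hypothetical names, not Moquette or Netty APIs.

```java
import java.io.ByteArrayOutputStream;

// Hypothetical codec for the fixed header's Remaining Length field.
final class RemainingLength {
    static final int MAX = 268_435_455; // 128^4 - 1, just under 256 MB

    // Encodes the length in 1 to 4 bytes: 7 data bits per byte, top bit = "more bytes follow".
    static byte[] encode(int length) {
        if (length < 0 || length > MAX) {
            throw new IllegalArgumentException("length out of range: " + length);
        }
        ByteArrayOutputStream out = new ByteArrayOutputStream(4);
        int x = length;
        do {
            int encodedByte = x % 128;
            x /= 128;
            if (x > 0) {
                encodedByte |= 0x80; // continuation bit
            }
            out.write(encodedByte);
        } while (x > 0);
        return out.toByteArray();
    }

    // Decodes a length written by encode().
    static int decode(byte[] bytes) {
        int value = 0;
        int multiplier = 1;
        for (int i = 0; i < bytes.length && i < 4; i++) {
            int b = bytes[i] & 0xFF;
            value += (b & 0x7F) * multiplier;
            if ((b & 0x80) == 0) {
                return value;
            }
            multiplier *= 128;
        }
        throw new IllegalArgumentException("malformed remaining length");
    }

    public static void main(String[] args) {
        System.out.println(encode(127).length);  // prints "1"
        System.out.println(encode(128).length);  // prints "2"
        System.out.println(encode(MAX).length);  // prints "4"
        System.out.println(decode(encode(321))); // prints "321"
    }
}
```

A packet with no variable header and no payload has a Remaining Length of 0, which is how the 2-byte minimum (one byte of type and flags plus one length byte) arises.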

Retain: retained message (sticky message)

RETAIN flag: must be specified on every PUBLISH message

  • 0: the server must not store this message, and must not remove or replace any existing retained message
  • 1: the server must store the application message and its QoS level so that it can be delivered to future subscribers

At most one retained message is kept per Topic. A client that subscribes to a Topic with a retained message receives that message immediately.

The server may choose to discard retained messages, for example when memory or storage is tight.

If a client wants to delete the retained message on a Topic, it can publish a retained message with an empty payload to that Topic.
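The retained-message rules can be condensed into a small store: one entry per topic, replaced by each new retained PUBLISH and removed by an empty payload. This is a sketch in the spirit of the `storeRetained` and `cleanRetained` calls that appear in the Moquette code below, not their actual implementation; `RetainedStore` and its methods are hypothetical names, and the QoS level is left out for brevity.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory store holding at most one retained payload per topic.
final class RetainedStore {
    private final Map<String, byte[]> retained = new ConcurrentHashMap<>();

    // Called for every PUBLISH whose RETAIN flag is 1.
    void onRetainedPublish(String topic, byte[] payload) {
        if (payload.length == 0) {
            retained.remove(topic);               // empty payload clears the retained message
        } else {
            retained.put(topic, payload.clone()); // replaces any previous retained message
        }
    }

    // Called when a client subscribes, to find the message it should receive immediately.
    Optional<byte[]> lookup(String topic) {
        return Optional.ofNullable(retained.get(topic));
    }

    public static void main(String[] args) {
        RetainedStore store = new RetainedStore();
        store.onRetainedPublish("room/temp", "21".getBytes());
        store.onRetainedPublish("room/temp", "22".getBytes());
        System.out.println(new String(store.lookup("room/temp").get())); // prints "22"
        store.onRetainedPublish("room/temp", new byte[0]);
        System.out.println(store.lookup("room/temp").isPresent());       // prints "false"
    }
}
```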

QoS: Service level (message reliability)

Publish process

public void processPublish(Channel channel, MqttPublishMessage msg) {
        final MqttQoS qos = msg.fixedHeader().qosLevel();
        final String clientId = NettyUtils.clientID(channel);
        LOG.info("Processing PUBLISH message. CId={}, topic={}, messageId={}, qos={}", clientId,
                msg.variableHeader().topicName(), msg.variableHeader().messageId(), qos);
        switch (qos) {
            case AT_MOST_ONCE:
                this.qos0PublishHandler.receivedPublishQos0(channel, msg);
                break;
            case AT_LEAST_ONCE:
                this.qos1PublishHandler.receivedPublishQos1(channel, msg);
                break;
            case EXACTLY_ONCE:
                this.qos2PublishHandler.receivedPublishQos2(channel, msg);
                break;
            default:
                LOG.error("Unknown QoS-Type:{}", qos);
                break;
        }
    }

As the switch statement above shows, the message is handled separately according to its QoS level.

QoS 0: at most once
ServerBroker->>ClientB: Send the message
  1. Check permissions
  2. Publish the message to all subscribers of the topic
  3. QoS == 0 && retain => clean old retained
void receivedPublishQos0(Channel channel, MqttPublishMessage msg) {
    // verify if topic can be write
    final Topic topic = new Topic(msg.variableHeader().topicName());
    String clientID = NettyUtils.clientID(channel);
    String username = NettyUtils.userName(channel);
    // 1. Check permissions
    if (!m_authorizator.canWrite(topic, username, clientID)) {
        LOG.error("MQTT client is not authorized to publish on topic. CId={}, topic={}", clientID, topic);
        return;
    }

    // route message to subscribers
    IMessagesStore.StoredMessage toStoreMsg = asStoredMessage(msg);
    toStoreMsg.setClientID(clientID);

    // 2. Publish the message to all subscribers of the topic
    this.publisher.publish2Subscribers(toStoreMsg, topic);

    if (msg.fixedHeader().isRetain()) {
        // 3. QoS == 0 && retain => clean old retained
        m_messagesStore.cleanRetained(topic);
    }

    m_interceptor.notifyTopicPublished(msg, clientID, username);
}
QoS 1: at least once
sequenceDiagram
    ClientA->>ServerBroker: 1. PUBLISH
    ServerBroker->>ServerBroker: 1.1 Store the message
    ServerBroker->>ClientA: 1.2 Reply with PUBACK
    ServerBroker->>ClientB: 2. Send the message
    ClientB->>ServerBroker: 2.1 Reply with PUBACK
    ServerBroker->>ServerBroker: 2.2 Delete the message

1. Send a PUBLISH message

  1. Check permissions
  2. Publish the message to all subscribers of the topic (each session stores the messages to be sent)
  3. Send the PUBACK response
  4. retain = true => store the message
void receivedPublishQos1(Channel channel, MqttPublishMessage msg) {
    // verify if topic can be write
    final Topic topic = new Topic(msg.variableHeader().topicName());
    topic.getTokens();
    if (!topic.isValid()) {
        LOG.warn("Invalid topic format, force close the connection");
        channel.close().addListener(CLOSE_ON_FAILURE);
        return;
    }
    String clientID = NettyUtils.clientID(channel);
    String username = NettyUtils.userName(channel);
    // 1. Check permissions
    if (!m_authorizator.canWrite(topic, username, clientID)) {
        LOG.error("MQTT client is not authorized to publish on topic. CId={}, topic={}", clientID, topic);
        return;
    }

    final int messageID = msg.variableHeader().messageId();

    // route message to subscribers
    IMessagesStore.StoredMessage toStoreMsg = asStoredMessage(msg);
    toStoreMsg.setClientID(clientID);

    // 2. Publish the message to all subscribers of the topic (each session stores the messages to be sent)
    this.publisher.publish2Subscribers(toStoreMsg, topic, messageID);

    // 3. Send the PUBACK response
    sendPubAck(clientID, messageID);

    // 4. retain = true => store the message
    if (msg.fixedHeader().isRetain()) {
        if (!msg.payload().isReadable()) {
            m_messagesStore.cleanRetained(topic);
        } else {
            // before wasn't stored
            m_messagesStore.storeRetained(topic, toStoreMsg);
        }
    }

    m_interceptor.notifyTopicPublished(msg, clientID, username);
}

2.1 Reply with PUBACK

Upon receiving the PUBACK message, the Server:

  1. Deletes the message stored in the session
public void processPubAck(Channel channel, MqttPubAckMessage msg) {
        String clientID = NettyUtils.clientID(channel);
        int messageID = msg.variableHeader().messageId();
        String username = NettyUtils.userName(channel);
        LOG.trace("retrieving inflight for messageID <{}>", messageID);

        ClientSession targetSession = this.sessionsRepository.sessionForClient(clientID);
        StoredMessage inflightMsg = targetSession.inFlightAcknowledged(messageID);

        String topic = inflightMsg.getTopic();
        InterceptAcknowledgedMessage wrapped = new InterceptAcknowledgedMessage(inflightMsg, topic, username,
                                                                                messageID);
        m_interceptor.notifyMessageAcknowledged(wrapped);
    }
QoS 2: exactly once
sequenceDiagram
    ClientA->>ServerBroker: 1. PUBLISH
    ServerBroker->>ServerBroker: 1.1 Store the message
    ServerBroker->>ClientA: 1.2 Reply with PUBREC
    ClientA->>ServerBroker: 2. Send PUBREL
    ServerBroker->>ServerBroker: 2.1 Delete the message
    ServerBroker->>ServerBroker: 2.2 Store the message in the send queue
    ServerBroker->>ClientB: 2.3 Send the message
    ServerBroker->>ClientA: 2.4 Reply with PUBCOMP
    ClientB->>ServerBroker: 3. Reply with PUBREC
    ServerBroker->>ServerBroker: 3.1 Delete the message stored in 2.2 (first confirmation)
    ServerBroker->>ServerBroker: 3.2 Store the message
    ServerBroker->>ClientB: 3.3 Send PUBREL
    ClientB->>ServerBroker: 3.4 Reply with PUBCOMP
    ServerBroker->>ServerBroker: 3.5 Delete the message (second confirmation)

1. Send a PUBLISH message

  1. Check permissions
  2. Store the message
  3. Send the PUBREC response
void receivedPublishQos2(Channel channel, MqttPublishMessage msg) {
    final Topic topic = new Topic(msg.variableHeader().topicName());
    // check if the topic can be wrote
    String clientID = NettyUtils.clientID(channel);
    String username = NettyUtils.userName(channel);
    // 1. Check permissions
    if (!m_authorizator.canWrite(topic, username, clientID)) {
        LOG.error("MQTT client is not authorized to publish on topic. CId={}, topic={}", clientID, topic);
        return;
    }
    final int messageID = msg.variableHeader().messageId();

    // 2. Store the message
    IMessagesStore.StoredMessage toStoreMsg = asStoredMessage(msg);
    toStoreMsg.setClientID(clientID);

    LOG.info("Sending publish message to subscribers CId={}, topic={}, messageId={}", clientID, topic, messageID);
    if (LOG.isTraceEnabled()) {
        LOG.trace("payload={}, subs Tree={}", payload2Str(toStoreMsg.getPayload()), subscriptions.dumpTree());
    }

    this.sessionsRepository.sessionForClient(clientID).markAsInboundInflight(messageID, toStoreMsg);

    // 3. Send the PUBREC response
    sendPubRec(clientID, messageID);

    // Next the client will send us a pub rel
    // NB publish to subscribers for QoS 2 happen upon PUBREL from publisher

//        if (msg.fixedHeader().isRetain()) {
//            if (msg.payload().readableBytes() == 0) {
//                m_messagesStore.cleanRetained(topic);
//            } else {
//                m_messagesStore.storeRetained(topic, toStoreMsg);
//            }
//        }
    //TODO this should happen on PUB_REL, else we notify false positive
    m_interceptor.notifyTopicPublished(msg, clientID, username);
}

2. Send PUBREL

  1. Delete the message
  2. Forward the message
  3. Send the PUBCOMP response to Client A
void processPubRel(Channel channel, MqttMessage msg) {
    String clientID = NettyUtils.clientID(channel);
    int messageID = messageId(msg);
    LOG.info("Processing PUBREL message. CId={}, messageId={}", clientID, messageID);
    ClientSession targetSession = this.sessionsRepository.sessionForClient(clientID);
    // 1. Delete the message
    IMessagesStore.StoredMessage evt = targetSession.inboundInflight(messageID);
    if (evt == null) {
        LOG.warn("Can't find inbound inflight message for CId={}, messageId={}", clientID, messageID);
        throw new IllegalArgumentException("Can't find inbound inflight message");
    }
    final Topic topic = new Topic(evt.getTopic());

    // 2. Forward the message
    this.publisher.publish2Subscribers(evt, topic, messageID);

    if (evt.isRetained()) {
        if (evt.getPayload().readableBytes() == 0) {
            m_messagesStore.cleanRetained(topic);
        } else {
            m_messagesStore.storeRetained(topic, evt);
        }
    }

    //TODO here we should notify to the listeners
    //m_interceptor.notifyTopicPublished(msg, clientID, username);

    // 3. Send the PUBCOMP response to Client A
    sendPubComp(clientID, messageID);
}

3. Reply with PUBREC

  1. Delete the message
  2. Store the message (in secondPhaseStore and outboundInflightMap, respectively)
  3. Send PUBREL
public void processPubRec(Channel channel, MqttMessage msg) {
    String clientID = NettyUtils.clientID(channel);
    int messageID = messageId(msg);
    LOG.debug("Processing PUBREC message. CId={}, messageId={}", clientID, messageID);
    ClientSession targetSession = this.sessionsRepository.sessionForClient(clientID);
    // remove from the inflight and move to the QoS2 second phase queue
    // 1. Delete the message
    StoredMessage ackedMsg = targetSession.inFlightAcknowledged(messageID);
    // 2. Store the message (in secondPhaseStore and outboundInflightMap, respectively)
    targetSession.moveInFlightToSecondPhaseAckWaiting(messageID, ackedMsg);
    // once received a PUBREC reply with a PUBREL(messageID)
    LOG.debug("Processing PUBREC message. CId={}, messageId={}", clientID, messageID);

    // 3. Send PUBREL
    MqttFixedHeader pubRelHeader = new MqttFixedHeader(MqttMessageType.PUBREL, false, AT_LEAST_ONCE, false, 0);
    MqttMessage pubRelMessage = new MqttMessage(pubRelHeader, from(messageID));
    channel.writeAndFlush(pubRelMessage).addListener(FIRE_EXCEPTION_ON_FAILURE);
}

3.4 Reply with PUBCOMP

  1. Delete the message
public void processPubComp(Channel channel, MqttMessage msg) {
    String clientID = NettyUtils.clientID(channel);
    int messageID = messageId(msg);
    LOG.debug("Processing PUBCOMP message. CId={}, messageId={}", clientID, messageID);
    // once received the PUBCOMP then remove the message from the temp memory
    ClientSession targetSession = this.sessionsRepository.sessionForClient(clientID);
    // 1. Delete the message
    StoredMessage inflightMsg = targetSession.completeReleasedPublish(messageID);
    String username = NettyUtils.userName(channel);
    String topic = inflightMsg.getTopic();
    final InterceptAcknowledgedMessage interceptAckMsg = new InterceptAcknowledgedMessage(inflightMsg, topic,
        username, messageID);
    m_interceptor.notifyMessageAcknowledged(interceptAckMsg);
}

Topic & Subscribe

Basic concepts

Topic and TopicFilter

A UTF-8 encoded string and the core mechanism of the pub-sub message model. It cannot exceed 65535 bytes, and there is no limit on the number of levels. A topic level must not contain any of the special characters (/, +, #) described below. A Topic or TopicFilter must contain at least one character, is case-sensitive, may contain spaces, and must not contain the null character (Unicode U+0000). A leading or trailing “/” produces a different Topic or TopicFilter. For example:

  • “/A” and “A” are different
  • “A” and “A/” are different

Topics or TopicFilters that contain only a slash “/” are legal

Special symbols in TopicFilter

Topic level separator /

It can appear anywhere in a Topic or TopicFilter. Adjacent topic level separators denote a zero-length topic level.

Single-level wildcard +

  • Matches exactly one topic level. For example, “a/b/+” matches “a/b/c1” and “a/b/c2”, but does not match “a/b/c/d”.
  • Can be used at any level, including the first and the last. For example, “+” is valid, as is “sport/+/player1”.
  • Can be used at more than one level and together with the multi-level wildcard. For example, “+/tennis/#” is valid.
  • Matches only the current level, not the parent level. For example, “sport/+” does not match “sport” but does match “sport/”; “/finance” matches “+/+” and “/+”, but does not match “+”.

Multi-level wildcard #

  • Matches any number of levels in a topic, including the parent level itself and its child levels. For example, “a/b/c/#” matches “a/b/c”, “a/b/c/d”, and “a/b/c/d/e”.
  • Must be the last character of the filter. For example, “sport/tennis/#/ranking” is invalid.
  • “#” on its own is valid and receives all application messages. (This TopicFilter should be disabled on the server side.)

Topics starting with $ are reserved by the server

  • A server must not match a Topic starting with a $ character against a TopicFilter starting with a wildcard character (# or +).
  • The server should prevent clients from using such Topics to exchange messages with other clients.
  • A server implementation may use topic names starting with $ for other purposes.

How subscriptions interact with topics starting with $:

  • Clients subscribed to “#” will not receive any messages published to topics starting with “$”
  • Clients subscribed to “+/A/B” will not receive any messages published to “$SYS/A/B”
  • Clients subscribed to “$SYS/#” receive messages published to topics starting with “$SYS/”
  • Clients subscribed to a matching filter under “$SYS/” receive messages published to “$SYS/A/B”

If a client wants to receive both messages from topics starting with “$SYS/” and messages from topics that do not start with “$”, it needs to subscribe to both “#” and “$SYS/#”.
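The wildcard and “$” rules above can be collected into a single matching function. This is a minimal sketch written for this note, not how Moquette matches topics (Moquette walks the subscription tree described in the next section); `TopicMatcher` and `matches` are hypothetical names, and an invalid filter such as “sport/tennis/#/ranking” is simply treated as matching nothing.

```java
// Hypothetical stand-alone matcher for MQTT topic filters.
final class TopicMatcher {
    static boolean matches(String filter, String topic) {
        // Topics starting with '$' never match a filter that starts with a wildcard.
        if (topic.startsWith("$") && (filter.startsWith("#") || filter.startsWith("+"))) {
            return false;
        }
        // A limit of -1 keeps empty levels, so "sport/" has two levels: "sport" and "".
        String[] f = filter.split("/", -1);
        String[] t = topic.split("/", -1);
        for (int i = 0; i < f.length; i++) {
            if (f[i].equals("#")) {
                // '#' must be the last level; it matches the parent and everything below it.
                return i == f.length - 1;
            }
            if (i >= t.length) {
                return false; // the filter has more levels than the topic
            }
            if (!f[i].equals("+") && !f[i].equals(t[i])) {
                return false; // literal level differs ('+' accepts any single level)
            }
        }
        return f.length == t.length;
    }

    public static void main(String[] args) {
        System.out.println(matches("a/b/+", "a/b/c1"));      // prints "true"
        System.out.println(matches("a/b/+", "a/b/c/d"));     // prints "false"
        System.out.println(matches("a/b/c/#", "a/b/c"));     // prints "true"
        System.out.println(matches("sport/+", "sport"));     // prints "false"
        System.out.println(matches("#", "$SYS/A/B"));        // prints "false"
        System.out.println(matches("$SYS/#", "$SYS/A/B"));   // prints "true"
    }
}
```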

Storage structure

  • a/b/c
  • a/a
  • a/haha
  • msg

The four topics are stored in a tree in which every node has two fields:

  1. children points to the nodes of the next level
  2. subscriptions stores all subscribers of the current topic
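A simplified version of such a tree, with one node per topic level, might look as follows. This is a single-threaded sketch for illustration only: Moquette's real structure is a concurrent trie built from `INode`, `CNode`, and `TNode` and updated with compare-and-set, as the code below shows. `TopicTrie`, `Node`, and `subscribersOf` are hypothetical names, and wildcards are not handled here.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, non-concurrent subscription tree with exact-match lookup.
final class TopicTrie {
    private static final class Node {
        final Map<String, Node> children = new HashMap<>(); // topic level -> next node
        final Set<String> subscriptions = new HashSet<>();  // client ids subscribed at this node
    }

    private final Node root = new Node();

    // Walks the tree level by level, creating missing nodes, and records the subscriber.
    void add(String topic, String clientId) {
        Node node = root;
        for (String level : topic.split("/", -1)) {
            node = node.children.computeIfAbsent(level, k -> new Node());
        }
        node.subscriptions.add(clientId);
    }

    // Returns the clients subscribed to exactly this topic.
    Set<String> subscribersOf(String topic) {
        Node node = root;
        for (String level : topic.split("/", -1)) {
            node = node.children.get(level);
            if (node == null) {
                return new HashSet<>();
            }
        }
        return new HashSet<>(node.subscriptions);
    }

    public static void main(String[] args) {
        TopicTrie trie = new TopicTrie();
        trie.add("a/b/c", "client-1");
        trie.add("a/a", "client-2");
        trie.add("a/haha", "client-3");
        trie.add("msg", "client-4");
        System.out.println(trie.subscribersOf("a/b/c")); // prints "[client-1]"
        System.out.println(trie.subscribersOf("a/b"));   // prints "[]"
    }
}
```

The three topics under “a” share the same first-level node, so common prefixes are stored only once.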

Search algorithm

Subscribe
@Override
    public void add(Subscription newSubscription) {
        Action res;
        do {
            res = insert(newSubscription.clientId, newSubscription.topicFilter, this.root, newSubscription.topicFilter);
        } while (res == Action.REPEAT);
    }

    private Action insert(String clientId, Topic topic, final INode inode, Topic fullpath) {
        Token token = topic.headToken();
        if (!topic.isEmpty() && inode.mainNode().anyChildrenMatch(token)) {
            Topic remainingTopic = topic.exceptHeadToken();
            INode nextInode = inode.mainNode().childOf(token);
            return insert(clientId, remainingTopic, nextInode, fullpath);
        } else {
            if (topic.isEmpty()) {
                return insertSubscription(clientId, fullpath, inode);
            } else {
                return createNodeAndInsertSubscription(clientId, topic, inode, fullpath);
            }
        }
    }
Unsubscribe
public void removeSubscription(Topic topic, String clientID) {
        Action res;
        do {
            res = remove(clientID, topic, this.root, NO_PARENT);
        } while (res == Action.REPEAT);
    }

    private Action remove(String clientId, Topic topic, INode inode, INode iParent) {
        Token token = topic.headToken();
        if (!topic.isEmpty() && (inode.mainNode().anyChildrenMatch(token))) {
            Topic remainingTopic = topic.exceptHeadToken();
            INode nextInode = inode.mainNode().childOf(token);
            return remove(clientId, remainingTopic, nextInode, inode);
        } else {
            final CNode cnode = inode.mainNode();
            if (cnode instanceof TNode) {
                // this inode is a tomb, has no clients and should be cleaned up
                // Because we implemented cleanTomb below, this should be rare, but possible
                // Consider calling cleanTomb here too
                return Action.OK;
            }
            if (cnode.containsOnly(clientId) && topic.isEmpty() && cnode.allChildren().isEmpty()) {
                // last client to leave this node, AND there are no downstream children, remove via TNode tomb
                if (inode == this.root) {
                    return inode.compareAndSet(cnode, inode.mainNode().copy()) ? Action.OK : Action.REPEAT;
                }
                TNode tnode = new TNode();
                return inode.compareAndSet(cnode, tnode) ? cleanTomb(inode, iParent) : Action.REPEAT;
            } else if (cnode.contains(clientId) && topic.isEmpty()) {
                CNode updatedCnode = cnode.copy();
                updatedCnode.removeSubscriptionsFor(clientId);
                return inode.compareAndSet(cnode, updatedCnode) ? Action.OK : Action.REPEAT;
            } else {
                //someone else already removed
                return Action.OK;
            }
        }
    }
Lookup
Set<Subscription> recursiveMatch(Topic topic, INode inode) {
        CNode cnode = inode.mainNode();
        if (Token.MULTI.equals(cnode.token)) {
            return cnode.subscriptions;
        }
        if (topic.isEmpty()) {
            return Collections.emptySet();
        }
        if (cnode instanceof TNode) {
            return Collections.emptySet();
        }
        final Token token = topic.headToken();
        if (!(Token.SINGLE.equals(cnode.token) || cnode.token.equals(token) || ROOT.equals(cnode.token))) {
            return Collections.emptySet();
        }
        Topic remainingTopic = (ROOT.equals(cnode.token)) ? topic : topic.exceptHeadToken();
        Set<Subscription> subscriptions = new HashSet<>();
        if (remainingTopic.isEmpty()) {
            subscriptions.addAll(cnode.subscriptions);
        }
        for (INode subInode : cnode.allChildren()) {
            subscriptions.addAll(recursiveMatch(remainingTopic, subInode));
        }
        return subscriptions;
    }

Closing

References

MQTT protocol in general