Introduction

Hello, this is Anyin.

Some time ago, I worked on an MQTT integration because my job involved communicating with hardware devices. Here is a brief write-up of what I did.

Basic concepts

Before we do any development work, we need to understand what MQTT is.

MQTT is a lightweight publish/subscribe messaging protocol designed specifically for IoT applications running on low-bandwidth, unstable networks.

  • MQTT website
  • MQTT V3.1.1 protocol specification

The MQTT protocol has the following characteristics:

  • An open messaging protocol that is easy to implement
  • Publish/subscribe model with one-to-many message distribution
  • Runs over TCP/IP network connections
  • Compact packet structure: the fixed header can be as small as 2 bytes, and a heartbeat (PINGREQ) packet is 2 bytes in total
  • Message QoS levels to guarantee reliable delivery
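
To make the compact packet structure concrete, here is a minimal sketch of how an MQTT 3.1.1 fixed header is built: one control byte followed by a variable-length "remaining length" field of 1 to 4 bytes. The class and method names are my own for illustration and are not part of any library.

```java
import java.io.ByteArrayOutputStream;

/** Sketch of the MQTT 3.1.1 fixed header: one control byte plus a 1-4 byte "remaining length". */
final class MqttFixedHeader {

    /** Encodes the remaining length with the spec's variable-length scheme (7 data bits per byte). */
    static byte[] encodeRemainingLength(int length) {
        if (length < 0 || length > 268_435_455) {
            throw new IllegalArgumentException("remaining length out of range: " + length);
        }
        ByteArrayOutputStream out = new ByteArrayOutputStream(4);
        do {
            int digit = length % 128;
            length /= 128;
            if (length > 0) {
                digit |= 0x80; // continuation bit: more length bytes follow
            }
            out.write(digit);
        } while (length > 0);
        return out.toByteArray();
    }

    /** Builds a fixed header from the packet type (high nibble), flags (low nibble) and remaining length. */
    static byte[] fixedHeader(int packetType, int flags, int remainingLength) {
        byte[] len = encodeRemainingLength(remainingLength);
        byte[] header = new byte[1 + len.length];
        header[0] = (byte) ((packetType << 4) | (flags & 0x0F));
        System.arraycopy(len, 0, header, 1, len.length);
        return header;
    }

    public static void main(String[] args) {
        // PINGREQ is packet type 12 with no payload, so the whole heartbeat is 2 bytes.
        byte[] ping = fixedHeader(12, 0, 0);
        System.out.printf("PINGREQ: %02X %02X%n", ping[0] & 0xFF, ping[1] & 0xFF);
        // A remaining length of 321 needs two length bytes.
        byte[] len = encodeRemainingLength(321);
        System.out.printf("321 -> %02X %02X%n", len[0] & 0xFF, len[1] & 0xFF);
    }
}
```

Payloads of up to 127 bytes need only a single length byte, which is why small sensor messages carry so little protocol overhead.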

Main application scenarios of MQTT:

  • Internet of Things M2M communication, Internet of Things big data collection
  • Android notification push, WEB notification push
  • Mobile instant messaging, such as Facebook Messenger
  • Smart hardware, smart homes, smart appliances
  • Internet of Vehicles communication, data collection from EV charging stations and charging piles
  • Smart city, telemedicine, distance education
  • Power, oil and energy industries

More details can be found on the official website. I won’t repeat it here.

For the installation of the MQTT server, we use EMQX, whose official website is www.emqx.io/zh

Implement an MQTT client

Once the EMQX server is installed, we can write our MQTT client to receive messages from and send messages to the device side, all asynchronously.

  1. Add the dependency to pom.xml
        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>${mqtt.version}</version>
        </dependency>
  2. Encapsulate the MqttClient instance

In step 1 we added a third-party MQTT library. To make it easier to swap in a different library later, we wrap its MqttClient in a simple class of our own.

Add an MQTTClient class.

    /**
     * Instantiate the MQTT client.
     *
     * @param properties      configuration information
     * @param factory         extension point factory
     * @param subscribeTopics topics to subscribe to
     */
    public MQTTClient(MQTTProperties properties, IExtensionHandlerFactory factory,
                      List<SubscribeTopic> subscribeTopics) throws MqttException {
        if (CollectionUtils.isEmpty(subscribeTopics)) {
            throw new CommonBusinessException("1", "Subscription list cannot be empty");
        }
        this.subscribeTopics = subscribeTopics;
        this.properties = properties;
        this.factory = factory;
        // Random suffix so that each server instance gets its own client ID
        this.clientId = "SERVER_" + RandomUtil.randomString(8);
        this.init();
    }
  • MQTTProperties: the MQTT-related configuration properties
  • IExtensionHandlerFactory: the extension point factory, used to dispatch business processing according to the instruction in a received message
  • List<SubscribeTopic>: the list of device-side topics the client needs to subscribe to

Next, let’s look at the init method.

    /**
     * Initialize the MQTT client instance.
     */
    public void init() throws MqttException {
        String broker = "tcp://" + properties.getHost() + ":" + properties.getPort();
        MemoryPersistence persistence = new MemoryPersistence();
        try {
            if(client == null){
                client = new MqttClient(broker, clientId, persistence);
            }
            MqttConnectOptions connOpts = this.getOptions();
            if (client.isConnected()) {
                client.disconnect();
            }
            client.connect(connOpts);
            client.setCallback(new MQTTSubscribe(this, factory, subscribeTopics));
            // Subscribe path
            client.subscribe(this.getSubscribeTopicList());
        }catch (MqttException ex) {
            log.error("MQTT service initialization failed: {}", ex.getMessage(), ex);
            throw ex;
        }
        log.info("MQTT service connected successfully");
    }

This method handles connecting the client instance to the server: building the connection options (connOpts), registering the callback that receives messages (setCallback), and subscribing to the device-side topics (subscribe).

One point needs special attention. Before connecting, check client.isConnected(); if the client is already connected, disconnect it manually with client.disconnect(). This ensures the client is fully disconnected before it reconnects.

Besides the connection logic, we also need to handle sending messages; a simple wrapper is enough.

    /**
     * Send a message.
     *
     * @param path     topic path
     * @param deviceId device ID
     * @param content  content to send
     */
    public void publish(String path, String deviceId, byte[] content) {
        try {
            MqttMessage message = new MqttMessage(content);
            message.setQos(properties.getQos());
            // The full topic is the path followed by the device ID
            String topic = path + deviceId;
            client.publish(topic, message);
        } catch (Exception ex) {
            log.error("MQTT service failed to send message: deviceId: {} {}", deviceId, ex.getMessage(), ex);
        }
    }
  3. Process subscribed messages

Now that the basic client instantiation is in place, we need to handle uplink messages (that is, the messages we subscribe to).

Business messages differ between vendors: the MQTT payload may carry business data as a JSON string, or it may carry a proprietary binary protocol.

To abstract uplink messages, we define two types: one for the whole packet of an uplink message and one for its instruction. The whole packet is the complete byte[] payload delivered by the subscription callback. The instruction is the field inside that packet that identifies which business operation the message belongs to, such as a heartbeat or a status report.

Add an MQTTProtocol class and a Cmd interface.
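
To illustrate the split between "whole packet" and "instruction", here is a small sketch of decoding a binary frame. The frame layout (1-byte command code, 2-byte big-endian serial, body, 1-byte XOR checksum) is entirely hypothetical and invented for this example; a real vendor protocol will define its own fields.

```java
import java.nio.ByteBuffer;

/** Hypothetical uplink frame: [cmd code: 1 byte][serial: 2 bytes, big-endian][body: N bytes][XOR checksum: 1 byte]. */
final class UplinkFrame {
    final int cmdCode;  // identifies the business instruction, e.g. heartbeat or status
    final int serial;   // message sequence number
    final byte[] body;  // business data

    private UplinkFrame(int cmdCode, int serial, byte[] body) {
        this.cmdCode = cmdCode;
        this.serial = serial;
        this.body = body;
    }

    /** XOR of data[from..to), used as a simple checksum. */
    static byte xor(byte[] data, int from, int to) {
        byte sum = 0;
        for (int i = from; i < to; i++) {
            sum ^= data[i];
        }
        return sum;
    }

    /** Returns the decoded frame, or null if the payload is too short or the checksum does not match. */
    static UplinkFrame decode(byte[] payload) {
        if (payload == null || payload.length < 4) {
            return null;
        }
        int last = payload.length - 1;
        if (xor(payload, 0, last) != payload[last]) {
            return null;
        }
        ByteBuffer buf = ByteBuffer.wrap(payload); // ByteBuffer is big-endian by default
        int cmdCode = buf.get() & 0xFF;
        int serial = buf.getShort() & 0xFFFF;
        byte[] body = new byte[last - 3];
        buf.get(body);
        return new UplinkFrame(cmdCode, serial, body);
    }

    /** Builds a frame in the same hypothetical layout, appending the checksum. */
    static byte[] encode(int cmdCode, int serial, byte[] body) {
        ByteBuffer buf = ByteBuffer.allocate(4 + body.length);
        buf.put((byte) cmdCode).putShort((short) serial).put(body);
        byte[] out = buf.array();
        out[out.length - 1] = xor(out, 0, out.length - 1);
        return out;
    }

    public static void main(String[] args) {
        byte[] raw = encode(0x01, 513, new byte[] {0x0A, 0x0B});
        UplinkFrame frame = decode(raw);
        System.out.println("cmd=" + frame.cmdCode + " serial=" + frame.serial + " bodyLength=" + frame.body.length);
    }
}
```

A JSON-based vendor would implement the same two steps (parse the packet, then read the instruction field) with a JSON parser instead of a ByteBuffer.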

@Data
public abstract class MQTTProtocol {
    /** Device ID */
    private String deviceId;

    /** Unique number of the message */
    private String msgId;

    /** The specific business instruction */
    private Cmd cmd;
}

public interface Cmd {
    /** Instruction type: uplink or downlink */
    CmdTypeEnum getCmdType();

    /** The topic the instruction is sent to */
    Topic getTopic();
}

Next, we add a handler interface for the protocol: MQTTProtocolHandler

public interface MQTTProtocolHandler<T extends MQTTProtocol> extends IExtensionHandler<BusinessType> {

    /** Extract the device ID from the topic or the payload. */
    String getDeviceId(String topic, byte[] payload);

    /**
     * Decode.
     * @param payload raw data
     * @return the decoded protocol object
     */
    T decode(byte[] payload);

    /**
     * Validate.
     * @param protocol the decoded protocol object
     * @param payload  raw data
     * @return true if the message is valid, false otherwise
     */
    boolean check(T protocol, byte[] payload);

    /**
     * Encode.
     * @param protocol the protocol object
     * @param data     business data
     * @return the encoded data
     */
    byte[] encode(T protocol, byte[] data);

    /** Business processing for the given protocol object. */
    byte[] handle(T protocol);

    /** Build the error response for the given protocol object. */
    byte[] error(T protocol);
}

This interface breaks message processing into five steps: decoding, validation, encoding, business processing, and error response. The interface is an extension point whose enum is BusinessType, which represents the business type, because different businesses may have different codecs and processing rules. For example, JSON data and a proprietary binary protocol are encoded and decoded differently.
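
The extension point idea can be sketched as a registry keyed by business type. The code below is only an illustration under my own assumptions: the ProtocolHandler interface, the HandlerRegistry class, and the METERING constant are stand-ins invented for this example, not the actual IExtensionHandlerFactory API.

```java
import java.nio.charset.StandardCharsets;
import java.util.EnumMap;
import java.util.Map;

/** Stand-in for the article's BusinessType enum; METERING is a made-up second value. */
enum BusinessType { CHARGING, METERING }

/** Simplified stand-in for MQTTProtocolHandler: each business type brings its own codec. */
interface ProtocolHandler {
    BusinessType businessType();
    String decode(byte[] payload);
}

/** Treats the payload as UTF-8 text, as a JSON-based vendor would. */
final class JsonTextHandler implements ProtocolHandler {
    public BusinessType businessType() { return BusinessType.METERING; }
    public String decode(byte[] payload) { return new String(payload, StandardCharsets.UTF_8); }
}

/** Renders the payload as hex, standing in for a proprietary binary decoder. */
final class BinaryHexHandler implements ProtocolHandler {
    public BusinessType businessType() { return BusinessType.CHARGING; }
    public String decode(byte[] payload) {
        StringBuilder sb = new StringBuilder();
        for (byte b : payload) {
            sb.append(String.format("%02X", b & 0xFF));
        }
        return sb.toString();
    }
}

/** Minimal registry: one handler per business type, looked up when a message arrives. */
final class HandlerRegistry {
    private final Map<BusinessType, ProtocolHandler> handlers = new EnumMap<>(BusinessType.class);

    void register(ProtocolHandler handler) {
        handlers.put(handler.businessType(), handler);
    }

    ProtocolHandler get(BusinessType type) {
        ProtocolHandler handler = handlers.get(type);
        if (handler == null) {
            throw new IllegalStateException("No handler registered for " + type);
        }
        return handler;
    }

    public static void main(String[] args) {
        HandlerRegistry registry = new HandlerRegistry();
        registry.register(new JsonTextHandler());
        registry.register(new BinaryHexHandler());
        System.out.println(registry.get(BusinessType.CHARGING).decode(new byte[] {0x0A, (byte) 0xFF}));
        System.out.println(registry.get(BusinessType.METERING).decode("{\"v\":1}".getBytes(StandardCharsets.UTF_8)));
    }
}
```

The caller only needs the business type; which codec runs is decided by whatever handler was registered for it, so supporting a new vendor means adding a handler rather than editing the dispatch code.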

Then we’ll look at how we can use this extension point for business logic processing when a message is received.

@Override
    public void messageArrived(String subscribeTopic, MqttMessage message) throws Exception {
        try {
            // Resolve the business type from the topic
            BusinessType businessType = this.matchBusinessTypeBySubscribeTopic(subscribeTopic);
            // Get the protocol handler for that business type
            MQTTProtocolHandler protocolHandler = extensionHandlerFactory.getExtensionHandler(businessType, MQTTProtocolHandler.class);
            // Obtain the device ID
            String deviceId = protocolHandler.getDeviceId(subscribeTopic, message.getPayload());
            // Decode the whole packet
            MQTTProtocol protocol = protocolHandler.decode(message.getPayload());
            if (protocol == null) {
                log.error("Unable to reply due to protocol parsing exception");
                return;
            }
            // Instruction
            Cmd cmd = protocol.getCmd();
            if (cmd == null) {
                log.error("Parsed instruction is empty and cannot answer.");
                return;
            }
            // Set basic information
            protocol.setMsgId(String.valueOf(message.getId()));
            protocol.setDeviceId(deviceId);

            // Validate
            boolean success = protocolHandler.check(protocol, message.getPayload());
            if (!success) {
                this.errorHandle(protocolHandler, protocol, cmd.getTopic());
                return;
            }

            try {
                // Business processing
                byte[] result = protocolHandler.handle(protocol);

                // Reply
                if (CmdTypeEnum.DOWN == cmd.getCmdType()) {
                    log.info("Downlink message, no response.");
                    return;
                }
                Topic topic = cmd.getTopic();
                if (topic == null) {
                    log.error("The publish topic for the uplink message is empty, so no reply is sent");
                    return;
                }
                // Reply after encoding
                byte[] content = protocolHandler.encode(protocol, result);
                client.publish(topic.getTopic(), deviceId, content);
            } catch (Exception ex) {
                log.error("Business logic processing exception: {}, raw data: {}", ex.getMessage(), ByteUtil.byte2Str(message.getPayload()), ex);
                this.errorHandle(protocolHandler, protocol, cmd.getTopic());
            }
        } catch (Exception ex) {
            log.error("Unknown error: {}, raw data: {}", ex.getMessage(), ByteUtil.byte2Str(message.getPayload()), ex);
        }
    }
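
The matchBusinessTypeBySubscribeTopic helper used in the callback is not shown in this article. As a rough sketch of what such a lookup could do, the code below matches an incoming topic against MQTT topic filters ('+' for one level, '#' for all remaining levels). The class name, the example topics, and the filter-to-business-type map are hypothetical, and the special handling of topics starting with '$' is left out for brevity.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Sketch of MQTT topic filter matching, usable to map an incoming topic to a business type. */
final class TopicMatcher {

    /** Returns true if the topic matches the filter: '+' matches exactly one level, '#' matches the rest. */
    static boolean matches(String filter, String topic) {
        String[] f = filter.split("/", -1);
        String[] t = topic.split("/", -1);
        for (int i = 0; i < f.length; i++) {
            if (f[i].equals("#")) {
                return true; // '#' is the last level of a filter and also matches the parent level
            }
            if (i >= t.length) {
                return false; // the topic has fewer levels than the filter
            }
            if (!f[i].equals("+") && !f[i].equals(t[i])) {
                return false; // a literal level differs
            }
        }
        return f.length == t.length; // no extra topic levels left unmatched
    }

    /** Returns the business type name of the first matching filter, or null if none matches. */
    static String resolve(Map<String, String> filterToBusinessType, String topic) {
        for (Map.Entry<String, String> entry : filterToBusinessType.entrySet()) {
            if (matches(entry.getKey(), topic)) {
                return entry.getValue();
            }
        }
        return null;
    }

    public static void main(String[] args) {
        Map<String, String> filters = new LinkedHashMap<>();
        filters.put("charging/+/up", "CHARGING"); // hypothetical topic layout
        System.out.println(resolve(filters, "charging/dev001/up"));
        System.out.println(resolve(filters, "metering/dev001/up"));
    }
}
```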
  4. Process messages that need to be sent

In step 3 we handled uplink messages, which involves decoding, business processing, encoding, and replying. Next we need to handle the messages we send, that is, downlink messages.

Downlink processing is relatively simple: given the MQTTClient instance and the matching protocol handler, encode the message and send it.

@Slf4j
public class MQTTPublish {
    private MQTTClient client;
    private MQTTProtocolHandler protocolHandler;

    public MQTTPublish(MQTTClient client, MQTTProtocolHandler protocolHandler) {
        this.client = client;
        this.protocolHandler = protocolHandler;
    }

    public void publish(MQTTProtocol protocol, byte[] data) {
        // Encode the business data, then publish it to the topic of the instruction
        byte[] content = protocolHandler.encode(protocol, data);
        String deviceId = protocol.getDeviceId();
        String topic = protocol.getCmd().getTopic().getTopic();
        client.publish(topic, deviceId, content);
    }
}

The code above only supports fire-and-forget (asynchronous) downlink messages. In some cases, however, the sender needs to wait for the device's response, and this code cannot meet that requirement.

So we need to wrap this code once more. We design another extension point so that different business types can have their own sending logic.

public interface MQTTPublishHandler extends IExtensionHandler<BusinessType> {
    <T extends BaseCmd, C extends BaseCmd> T handle(C cmd, Class<T> clazz);
}

Next comes the implementation class.

@Override
    public <T extends BaseCmd, C extends BaseCmd> T handle(C cmd, Class<T> clazz) {
        CmdEnum cmdEnum = CmdEnum.get(cmd.getCmd());
        // Encode the command
        EncodeCmdHandler<C, T> handler = factory.getExtensionHandler(cmdEnum, EncodeCmdHandler.class);
        byte[] data = handler.encode(cmd);

        // Get the protocol handler for the business type
        MQTTProtocolHandler protocolHandler = factory.getExtensionHandler(BusinessType.CHARGING, MQTTProtocolHandler.class);
        MQTTPublish publish = new MQTTPublish(client, protocolHandler);
        Long serial = this.getSerial(cmd.getDeviceId());

        // TODO: this is a concrete protocol class; each business needs its own implementation
        ChargingMQTTProtocol protocol = new ChargingMQTTProtocol();
        protocol.setSerial(serial.shortValue());
        protocol.setDeviceId(cmd.getDeviceId());
        protocol.setVersion("10");
        protocol.setMac(cmd.getDeviceId());
        protocol.setCode(cmd.getCmd());
        protocol.setCmd(cmd);

        // Register the Redis listener before publishing, so that a fast reply
        // cannot arrive before anyone is listening
        RedisMessageTask task = new RedisMessageTask();
        RedisMessageListener listener = new RedisMessageListener(task);
        String key = MQTTRedisKeyUtil.callbackKey(cmd.getTopic().getTopic(), cmd.getDeviceId(), serial);
        ChannelTopic topic = new ChannelTopic(key);
        redisMessageListenerContainer.addMessageListener(listener, topic);
        try {
            publish.publish(protocol, data);
            // Block until the reply arrives or the 60-second timeout expires
            Message message = (Message) task.getFuture().get(60000, TimeUnit.MILLISECONDS);
            return JsonUtil.fromJson(message.toString(), clazz);
        } catch (Exception ex) {
            log.error("Message fetch failure: {}", ex.getMessage(), ex);
            throw new CommonBusinessException("1", "Redis failed to reply:" + ex.getMessage());
        } finally {
            redisMessageListenerContainer.removeMessageListener(listener);
        }
    }

The blocking wait is implemented with Java's CompletableFuture class. In addition, we use Redis publish/subscribe: a listener is registered in Redis, and when an uplink message arrives as the response to a downlink message, it is published with StringRedisTemplate#convertAndSend. Once the listener receives the message, it completes the CompletableFuture, which delivers the reply to the waiting caller.

RedisMessageTask holds a CompletableFuture instance and a reference to RedisMessageListener. It looks like this:

@Data
public class RedisMessageTask<T>{
    private CompletableFuture<T> future = new CompletableFuture<>();
    // Redis listener
    private RedisMessageListener listener;

}

RedisMessageListener holds a reference to the RedisMessageTask. When it receives a message, it completes the CompletableFuture with that message, and the thread blocked on the future gets the reply.

public class RedisMessageListener implements MessageListener {
    private RedisMessageTask task;

    public RedisMessageListener(RedisMessageTask task) {
        this.task = task;
    }

    @Override
    public void onMessage(Message message, byte[] bytes) {
        // Completing the future wakes up the thread that is waiting for the reply
        task.getFuture().complete(message);
    }
}

Finally, send a message when the uplink replies.

stringRedisTemplate.convertAndSend(key, JsonUtil.toJson(data));
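
The request/reply pattern described above can be tried out without Redis. The sketch below replaces the Redis channel with an in-memory map of pending futures, purely for illustration: ReplyWaiter and its methods are hypothetical names, and the key format in main is made up.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** In-memory stand-in for the Redis channel: one pending future per correlation key. */
final class ReplyWaiter {
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    /** Called by the uplink side when a reply arrives; returns false if nobody is waiting for this key. */
    boolean reply(String key, String json) {
        CompletableFuture<String> future = pending.remove(key);
        return future != null && future.complete(json);
    }

    /** Registers the waiter, runs the send action, then blocks until the reply or the timeout. */
    String sendAndWait(String key, Runnable send, long timeoutMillis)
            throws InterruptedException, ExecutionException, TimeoutException {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(key, future); // register before sending so a fast reply is not lost
        try {
            send.run();
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } finally {
            pending.remove(key); // always clean up, like removeMessageListener in the article
        }
    }

    public static void main(String[] args) throws Exception {
        ReplyWaiter waiter = new ReplyWaiter();
        String key = "charging/down:dev001:42"; // hypothetical topic:deviceId:serial key
        // The "device" answers from another thread shortly after the request is sent.
        String reply = waiter.sendAndWait(
                key,
                () -> new Thread(() -> waiter.reply(key, "{\"ok\":true}")).start(),
                1000);
        System.out.println(reply);
    }
}
```

Redis publish/subscribe plays the same role as the map here, with the added benefit that the node receiving the uplink reply does not have to be the node that sent the downlink command.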

Final words

OK, those are my notes from working with MQTT a while ago. Because the code was written without much detailed design or decoupling, parts of it are still coupled to business logic. I do plan to tidy it up into a lib package for the Anyin Cloud project, so stay tuned.