Preface

MQTT is a lightweight publish/subscribe messaging protocol designed for IoT applications in low-bandwidth, unstable network environments. It runs on top of the TCP/IP protocol family. The protocol is lightweight, simple, open, and easy to implement, which makes it suitable for a wide range of applications.

MQTT uses a client-server communication model, in which the server is called the MQTT Broker. Many MQTT brokers are available today. This article uses EMQ X, the most popular MQTT message server in the open source community, as an example: a simple client connects to the public Broker provided by EMQ, then publishes and processes messages. Along the way, the article summarizes how to use MQTT client libraries in different programming languages and on different platforms, with examples.

The selected client libraries are as follows:

  • Eclipse Paho C and Eclipse Paho Embedded C
  • Eclipse Paho Java Client
  • Eclipse Paho MQTT Go client
  • Emqtt: Erlang MQTT client library provided by EMQ
  • MQTT.js: MQTT client for the browser and Node.js
  • Eclipse Paho Python

The MQTT community lists many more MQTT client libraries, which readers can consult as needed.

Example application description

The behavior of an MQTT client over its lifecycle can be summarized as: establish a connection, subscribe to a topic, receive messages and process them, publish messages to a specified topic, unsubscribe, and disconnect.

Standard client libraries expose a method for each of these steps, and different libraries take essentially the same parameters for the same step. Which parameters to set and which features to enable depends on the user's understanding of the MQTT protocol features and on the actual application scenario.
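
To make the connection parameters more concrete, the following Python sketch shows how a keepalive interval, the Clean Session flag, credentials, and a will message could be encoded into an MQTT 3.1.1 CONNECT packet. It illustrates the wire format only and is not how any of the client libraries in this article is implemented; the helper names (encode_string, encode_remaining_length, build_connect) and the will topic testtopic/status are made up for this example.

```python
import struct


def encode_string(text: str) -> bytes:
    """MQTT UTF-8 string: 2-byte big-endian length followed by the bytes."""
    data = text.encode("utf-8")
    return struct.pack("!H", len(data)) + data


def encode_remaining_length(length: int) -> bytes:
    """Variable-length integer of the MQTT fixed header (7 bits per byte)."""
    out = bytearray()
    while True:
        digit, length = length % 128, length // 128
        if length > 0:
            digit |= 0x80  # continuation bit: more bytes follow
        out.append(digit)
        if length == 0:
            return bytes(out)


def build_connect(client_id, keepalive=60, clean_session=True,
                  username=None, password=None, will=None):
    """Build an MQTT 3.1.1 CONNECT packet; will is (topic, payload, qos, retain)."""
    if password is not None and username is None:
        raise ValueError("MQTT 3.1.1 requires a username when a password is set")
    flags = 0x02 if clean_session else 0x00
    payload = encode_string(client_id)
    if will is not None:
        topic, message, qos, retain = will
        flags |= 0x04 | (qos << 3) | (0x20 if retain else 0x00)
        payload += encode_string(topic) + struct.pack("!H", len(message)) + message
    if username is not None:
        flags |= 0x80
        payload += encode_string(username)
    if password is not None:
        flags |= 0x40
        payload += struct.pack("!H", len(password)) + password
    # Variable header: protocol name, protocol level 4 (3.1.1), flags, keepalive
    header = encode_string("MQTT") + bytes([4, flags]) + struct.pack("!H", keepalive)
    body = header + payload
    return bytes([0x10]) + encode_remaining_length(len(body)) + body


packet = build_connect("emqx_test", keepalive=20, clean_session=True,
                       username="emqx_test", password=b"emqx_test_password",
                       will=("testtopic/status", b"offline", 1, False))
print(packet.hex())
```

In practice the client library builds this packet itself; the sketch is only meant to show why these options have to be decided before the connection is established.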

This article uses a client that connects, publishes, and processes messages as an example. The parameters typically needed at each step are:

  • Establish a connection:
    • Specify the access address and port of the MQTT Broker
    • Specify whether the transport is TCP or MQTT over WebSocket
    • If TLS is enabled, select the protocol version and provide the corresponding certificates
    • If authentication is enabled on the Broker, the client must provide an MQTT username and password
    • Configure client parameters such as the keepalive interval, the Clean Session flag, the MQTT protocol version, and the will message (LWT)
  • Subscribe to a topic: once the connection is established, the client can subscribe by specifying a topic filter and a QoS level
    • The topic filter may contain the topic wildcards + and #
    • Note that some brokers and cloud service providers do not support every QoS level. For example, AWS IoT, Alibaba Cloud IoT suite, and Azure IoT Hub do not support QoS 2 messages
    • A subscription may fail because of network problems or ACL rules on the Broker side
  • Receive messages and process them:
    • The handler function is typically specified at connection time; the details differ slightly depending on the client library and the network programming model of the platform
  • Publish a message: publish a message to a specified topic
    • Specify the target topic. The topic must not contain the wildcards + or #; a wildcard in the topic may cause the publish to fail, the client to be disconnected, and so on (depending on how the Broker and the client library are implemented)
    • Specify the QoS level of the message. Brokers and platforms also differ in the QoS levels they support. For example, publishing a QoS 2 message to Azure IoT Hub disconnects the client
    • Specify the message payload, which must not exceed the maximum message size set by the Broker
    • Specify the Retain flag of the message
  • Unsubscribe:
    • Specify the target topic
  • Disconnect:
    • The client disconnects actively. After a normal disconnect the Broker discards the will message (LWT); it publishes the will only when the connection is lost abnormally
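
The wildcard rules mentioned above can be sketched in a few lines of Python. This is a simplified illustration of how a subscription filter with + and # is matched against a topic name, and of the check that a publish topic contains no wildcards. The function names topic_matches and is_valid_publish_topic are hypothetical and do not come from any of the client libraries discussed here; real brokers apply additional validation.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if a topic name matches a subscription filter with + and #."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    # Filters that start with a wildcard must not match topics beginning with $
    if topic.startswith("$") and filter_levels[0] in ("+", "#"):
        return False
    for i, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: only valid as the last level;
            # it matches the parent level and everything below it
            return i == len(filter_levels) - 1
        if i >= len(topic_levels):
            return False
        # Single-level wildcard matches exactly one level, whatever its value
        if level != "+" and level != topic_levels[i]:
            return False
    return len(filter_levels) == len(topic_levels)


def is_valid_publish_topic(topic: str) -> bool:
    """A topic name used for publishing must be non-empty and wildcard-free."""
    return bool(topic) and "+" not in topic and "#" not in topic


for topic_filter, topic in [
    ("testtopic/#", "testtopic/1"),
    ("testtopic/+", "testtopic/1/2"),
    ("+/1", "testtopic/1"),
]:
    print(topic_filter, topic, topic_matches(topic_filter, topic))
```

Checking a publish topic with a helper like is_valid_publish_topic before sending avoids relying on Broker-specific behavior when a wildcard slips into the topic.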

Eclipse Paho C and Eclipse Paho Embedded C

Eclipse Paho C and Eclipse Paho Embedded C are both client libraries under the Eclipse Paho project, and both are full-featured MQTT clients written in ANSI C. Eclipse Paho Embedded C can be used on desktop operating systems, but it mainly targets embedded environments such as mbed, Arduino, and FreeRTOS.

The client provides both a synchronous and an asynchronous API, whose functions start with MQTTClient and MQTTAsync respectively:

  • The synchronous API is designed to be simpler and easier to use; some calls block until the operation completes, which makes programming against it more straightforward;
  • The asynchronous API has only one blocking call, waitForCompletion. Results are reported through callbacks, which makes it better suited to environments where the application is not the main thread.
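
The difference between the two styles can be illustrated without any MQTT library. The Python sketch below is not the Paho API: deliver is a hypothetical stand-in for a network round trip to the Broker, and publish_sync and publish_async are made-up names that only show a blocking call next to a callback-based one.

```python
import threading
import time


def deliver(payload: str) -> str:
    """Hypothetical stand-in for a network round trip to the Broker."""
    time.sleep(0.05)
    return "delivered: " + payload


def publish_sync(payload: str) -> str:
    """Synchronous style: the caller blocks until the operation completes."""
    return deliver(payload)


def publish_async(payload: str, on_success) -> threading.Thread:
    """Asynchronous style: return immediately, report the result via callback."""
    worker = threading.Thread(target=lambda: on_success(deliver(payload)))
    worker.start()
    return worker


results = []

# Blocks for the whole round trip before the next line runs
print(publish_sync("Hello World!"))

# Returns at once; the callback appends the result later on another thread
worker = publish_async("Hello World!", results.append)
print("publish_async returned, result pending")

# Joining is only needed here so that the demo can print the result
worker.join()
print(results[0])
```

The synchronous form is easier to follow in a simple program, while the callback form keeps the calling thread free, which is the trade-off described in the two bullets above.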

For details on downloading and using the two libraries, see the project home page. This article uses Eclipse Paho C; the sample code is as follows:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include "MQTTClient.h"

#define ADDRESS     "tcp://broker.emqx.io:1883"
#define CLIENTID    "emqx_test"
#define TOPIC       "testtopic/1"
#define PAYLOAD     "Hello World!"
#define QOS         1
#define TIMEOUT     10000L

int main(int argc, char* argv[])
{
    MQTTClient client;
    MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
    MQTTClient_message pubmsg = MQTTClient_message_initializer;
    MQTTClient_deliveryToken token;
    int rc;

    MQTTClient_create(&client, ADDRESS, CLIENTID,
        MQTTCLIENT_PERSISTENCE_NONE, NULL);
  
    // Connection parameters
    conn_opts.keepAliveInterval = 20;
    conn_opts.cleansession = 1;

    if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS) {
        printf("Failed to connect, return code %d\n", rc);
        exit(-1);
    }
  
    // Publish message
    pubmsg.payload = PAYLOAD;
    pubmsg.payloadlen = strlen(PAYLOAD);
    pubmsg.qos = QOS;
    pubmsg.retained = 0;
    MQTTClient_publishMessage(client, TOPIC, &pubmsg, &token);
    printf("Waiting for up to %d seconds for publication of %s\n"
            "on topic %s for client with ClientID: %s\n",
            (int)(TIMEOUT/1000), PAYLOAD, TOPIC, CLIENTID);
    rc = MQTTClient_waitForCompletion(client, token, TIMEOUT);
    printf("Message with delivery token %d delivered\n", token);
  
    // Disconnect
    MQTTClient_disconnect(client, 10000);
    MQTTClient_destroy(&client);
    return rc;
}

Eclipse Paho Java Client

The Eclipse Paho Java Client is an MQTT Client library written in Java that can be used on the JVM or other Java-compatible platforms, such as Android.

The Eclipse Paho Java Client provides two APIs: the asynchronous MqttAsyncClient and the synchronous MqttClient.

Install via Maven:

<dependency>
  <groupId>org.eclipse.paho</groupId>
  <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
  <version>1.2.2</version>
</dependency>

The following is the sample connection code:

App.java

package io.emqx;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;


public class App {
    public static void main(String[] args) {
        String subTopic = "testtopic/#";
        String pubTopic = "testtopic/1";
        String content = "Hello World";
        int qos = 2;
        String broker = "tcp://broker.emqx.io:1883";
        String clientId = "emqx_test";
        MemoryPersistence persistence = new MemoryPersistence();

        try {
            MqttClient client = new MqttClient(broker, clientId, persistence);

            // Connection options
            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setUserName("emqx_test");
            connOpts.setPassword("emqx_test_password".toCharArray());
            // Start a clean session (no session state is kept between connections)
            connOpts.setCleanSession(true);

            // Set callback
            client.setCallback(new OnMessageCallback());

            // Setup connection
            System.out.println("Connecting to broker: " + broker);
            client.connect(connOpts);

            System.out.println("Connected");
            System.out.println("Publishing message: " + content);

            // Subscribe
            client.subscribe(subTopic);

            // Required parameters for publishing message
            MqttMessage message = new MqttMessage(content.getBytes());
            message.setQos(qos);
            client.publish(pubTopic, message);
            System.out.println("Message published");

            client.disconnect();
            System.out.println("Disconnected");
            client.close();
            System.exit(0);
        } catch (MqttException me) {
            System.out.println("reason " + me.getReasonCode());
            System.out.println("msg " + me.getMessage());
            System.out.println("loc " + me.getLocalizedMessage());
            System.out.println("cause " + me.getCause());
            System.out.println("excep " + me);
            me.printStackTrace();
        }
    }
}

Callback message handling class OnMessageCallback.java

package io.emqx;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;

public class OnMessageCallback implements MqttCallback {
    public void connectionLost(Throwable cause) {
        // Reconnect after lost connection.
        System.out.println("Connection lost, and re-connect here.");
    }

    public void messageArrived(String topic, MqttMessage message) throws Exception {
        // Message handler after receiving message
        System.out.println("Topic:" + topic);
        System.out.println("QoS:" + message.getQos());
        System.out.println("Payload:" + new String(message.getPayload()));
    }

    public void deliveryComplete(IMqttDeliveryToken token) {
        System.out.println("deliveryComplete---------" + token.isComplete());
    }
}

Eclipse Paho MQTT Go client

Eclipse Paho MQTT Go Client is the Go client library under the Eclipse Paho project. It can connect to an MQTT Broker to publish messages, subscribe to topics, and receive published messages, and it supports a fully asynchronous mode of operation.

The client depends on Google's proxy package and the WebSockets package, and is installed with the following command:

go get github.com/eclipse/paho.mqtt.golang

The following is the sample connection code:

package main

import (
	"fmt"
	"log"
	"os"
	"time"

	"github.com/eclipse/paho.mqtt.golang"
)

var f mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
	fmt.Printf("TOPIC: %s\n", msg.Topic())
	fmt.Printf("MSG: %s\n", msg.Payload())
}

func main() {
	mqtt.DEBUG = log.New(os.Stdout, "", 0)
	mqtt.ERROR = log.New(os.Stdout, "", 0)
	opts := mqtt.NewClientOptions().AddBroker("tcp://broker.emqx.io:1883").SetClientID("emqx_test_client")

	opts.SetKeepAlive(60 * time.Second)
	// Message callback handler
	opts.SetDefaultPublishHandler(f)
	opts.SetPingTimeout(1 * time.Second)

	c := mqtt.NewClient(opts)
	if token := c.Connect(); token.Wait() && token.Error() != nil {
		panic(token.Error())
	}

	// Subscription
	if token := c.Subscribe("testtopic/#", 0, nil); token.Wait() && token.Error() != nil {
		fmt.Println(token.Error())
		os.Exit(1)
	}

	// Publish message
	token := c.Publish("testtopic/1", 0, false, "Hello World")
	token.Wait()

	time.Sleep(6 * time.Second)

	// Cancel subscription
	if token := c.Unsubscribe("testtopic/#"); token.Wait() && token.Error() != nil {
		fmt.Println(token.Error())
		os.Exit(1)
	}

	// Disconnect
	c.Disconnect(250)
	time.Sleep(1 * time.Second)
}

Emqtt: Erlang MQTT client library provided by EMQ

emqtt is the Erlang client library provided by EMQ, the team behind the open source MQTT Broker EMQ X.

The Erlang ecosystem has several MQTT Broker implementations, such as RabbitMQ (which supports MQTT through a plugin), VerneMQ, and EMQ X. There are few MQTT client libraries to choose from, however, and emqtt is the best choice among the Erlang client libraries listed by the MQTT community.

emqtt is implemented entirely in Erlang. It fully supports MQTT v3.1.1 and MQTT v5.0, as well as one-way and two-way SSL authentication and WebSocket connections. The MQTT benchmarking tool emqtt_bench is built on top of this client library.

emqtt can be used as follows:

ClientId = <<"test">>.
{ok, ConnPid} = emqtt:start_link([{clientid, ClientId}]).
{ok, _Props} = emqtt:connect(ConnPid).
Topic = <<"guide/#">>.
QoS = 1.
{ok, _Props, _ReasonCodes} = emqtt:subscribe(ConnPid, {Topic, QoS}).
{ok, _PktId} = emqtt:publish(ConnPid, <<"guide/1">>, <<"Hello World!">>, QoS).
%% If the QoS of the publish packet is 0, `publish` does not return a packet id.
ok = emqtt:publish(ConnPid, <<"guide/2">>, <<"Hello World!">>, 0).

%% Recursively get messages from mail box.
Y = fun (Proc) -> ((fun (F) -> F(F) end) ((fun(ProcGen) -> Proc(fun() -> (ProcGen(ProcGen))() end) end))) end.
Rec = fun(Receive) -> fun() -> receive {publish, Msg} -> io:format("Msg: ~p~n", [Msg]), Receive(); _Other -> Receive() after 5 -> ok end end end.
(Y(Rec))().

%% If you don't like y combinator, you can also try named function to recursively get messages in erlang shell.
Receive = fun Rec() -> receive {publish, Msg} -> io:format("Msg: ~p~n", [Msg]), Rec(); _Other -> Rec() after 5 -> ok end end.
Receive().

{ok, _Props, _ReasonCode} = emqtt:unsubscribe(ConnPid, <<"guide/#">>).

ok = emqtt:disconnect(ConnPid).

MQTT.js: MQTT client for the browser and Node.js

MQTT.js is a module written in JavaScript that implements an MQTT client and can be used in both Node.js and browser environments. In Node.js, it can be installed globally with -g and used from the command line, or integrated into a project and called from code.

Because JavaScript is single-threaded, MQTT.js is a fully asynchronous MQTT client. It supports MQTT and MQTT over WebSocket, with the following support in different runtime environments:

  • Browser environment: MQTT over WebSocket (including customized browser environments such as WeChat Mini Programs and Alipay Mini Programs)
  • Node.js environment: MQTT, MQTT over WebSocket

Except for a few connection parameters, the API is the same across different environments.

Install using npm:

npm i mqtt

Install using a CDN (browser):

<script src="https://unpkg.com/mqtt/dist/mqtt.min.js"></script>
<script>
    // The script above defines a global mqtt variable
    console.log(mqtt)
</script>

Example code:

// const mqtt = require('mqtt')
import mqtt from 'mqtt'

// Connection options
const options = {
  clean: true, // Clean session: do not keep session state between connections
  connectTimeout: 4000, // Timeout in milliseconds
  // Authentication
  clientId: 'emqx_test',
  username: 'emqx_test',
  password: 'emqx_test',
}

// Connection string
// ws: unsecured WebSocket
// wss: secured WebSocket connection
// mqtt: unsecured TCP connection
// mqtts: secured TCP connection
const connectUrl = 'wss://broker.emqx.io:8084/mqtt'
const client = mqtt.connect(connectUrl, options)

client.on('reconnect', () => {
  console.log('reconnecting...')
})

client.on('error', (error) => {
  console.log('connection error:', error)
})

client.on('message', (topic, message) => {
  console.log('the message:', topic, message.toString())
})
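
The four URL schemes listed in the comments above each imply a transport and a TLS setting. As a rough illustration, the Python sketch below splits such a connection string into its parts. It is not part of MQTT.js; the function name parse_connect_url and the default ports in the table (1883, 8883, 80, 443) are assumptions based on common convention, and a given Broker may listen on different ports.

```python
from urllib.parse import urlparse

# Scheme -> (transport, TLS enabled, assumed conventional default port)
SCHEMES = {
    "mqtt": ("tcp", False, 1883),
    "mqtts": ("tcp", True, 8883),
    "ws": ("websocket", False, 80),
    "wss": ("websocket", True, 443),
}


def parse_connect_url(url: str) -> dict:
    """Split an MQTT connection URL into host, port, transport, and TLS flag."""
    parsed = urlparse(url)
    if parsed.scheme not in SCHEMES:
        raise ValueError("unsupported scheme: " + parsed.scheme)
    transport, tls, default_port = SCHEMES[parsed.scheme]
    return {
        "host": parsed.hostname,
        "port": parsed.port or default_port,  # fall back to the assumed default
        "transport": transport,
        "tls": tls,
        "path": parsed.path,  # only meaningful for WebSocket connections
    }


print(parse_connect_url("wss://broker.emqx.io:8084/mqtt"))
print(parse_connect_url("mqtt://broker.emqx.io"))
```

For the URL used in the example above, this yields a TLS-secured WebSocket connection to broker.emqx.io on port 8084 with the path /mqtt.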

Eclipse Paho Python

Eclipse Paho Python is the Python language version of the client library under the Eclipse Paho project, which is able to connect to an MQTT Broker to publish messages, subscribe to topics, and receive published messages.

Install from PyPI using pip:

pip install paho-mqtt

Example code:

import paho.mqtt.client as mqtt


# Successful Connection Callback
def on_connect(client, userdata, flags, rc):
    print('Connected with result code '+str(rc))
    client.subscribe('testtopic/#')

# Message delivery callback
def on_message(client, userdata, msg):
    print(msg.topic + " " + str(msg.payload))

client = mqtt.Client()

# Set callback handler
client.on_connect = on_connect
client.on_message = on_message

# Set up connection
client.connect('broker.emqx.io', 1883, 60)
# Publish message
client.publish('emqtt',payload='Hello World',qos=0)

client.loop_forever()

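
One detail of the on_message callback above is that msg.payload is a bytes object, so str(msg.payload) prints the bytes representation rather than the text itself. The short sketch below shows the difference and one way to decode the payload. The msg object here is a hypothetical stand-in built with SimpleNamespace so that the snippet runs without a Broker, and format_message is a made-up helper name; it also assumes the payload is UTF-8 text.

```python
from types import SimpleNamespace

# Hypothetical stand-in for the message object passed to on_message
msg = SimpleNamespace(topic="testtopic/1", payload=b"Hello World", qos=0)


def format_message(msg) -> str:
    """Decode the bytes payload as UTF-8 text, replacing undecodable bytes."""
    text = msg.payload.decode("utf-8", errors="replace")
    return f"{msg.topic} {text}"


print(str(msg.payload))     # b'Hello World'
print(format_message(msg))  # testtopic/1 Hello World
```

Binary payloads (for example sensor readings packed as raw bytes) should of course be left as bytes and parsed according to their own format.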

Conclusion

This concludes the introduction to the MQTT protocol, the typical workflow of an MQTT client library, and commonly used MQTT clients. Readers are welcome to use EMQ X for learning MQTT and for project development.


For more information, visit our official website, emqx.io, our open source project, github.com/emqx/emqx, or our official documentation.