1 IoT scenario

I used MQTT in an Internet of Things (IoT) application. What it implements is similar to scenic spots that show the current number of visitors on a big screen in real time and stop admitting people once a limit is exceeded. A gate device detects each tourist entering the scenic spot and sends a message to the big screen through MQTT. The big screen then displays the number of tourists in the scenic spot in real time and replies with a message telling the gate device that the message was received (to confirm that it arrived). This article explains MQTT by simulating that real usage process: the gate publishes a message; the server (broker) receives it and forwards it to the big screen; the big screen responds after receiving it by publishing a message of its own; and the server receives the response and forwards it to the gate device.

2 About MQTT

2.1 Introduction

MQTT (Message Queuing Telemetry Transport) is a messaging protocol originally developed by IBM. It is an extremely simple and lightweight publish/subscribe protocol designed for constrained devices and for low-bandwidth, high-latency, or unreliable networks. Its design goals are to be lightweight, open, simple, standardized, and easy to implement. These characteristics make it a good choice for many scenarios, especially constrained environments such as machine-to-machine (M2M) communication and the Internet of Things. MQTT is more lightweight than XMPP and uses less bandwidth.

2.2 Characteristics

The MQTT protocol has the following characteristics:

  1. It uses the publish/subscribe messaging pattern to provide one-to-many message distribution and to decouple applications.

  2. Its message transport is agnostic to the content of the payload.

  3. It uses TCP/IP to provide network connectivity.

  4. It offers three quality of service (QoS) levels for message delivery:

    • QoS 0: “at most once”. Delivery relies entirely on the underlying TCP/IP network, so messages may be lost. This level suits data such as environmental sensor readings, where losing one reading does not matter because the next one follows shortly after.
    • QoS 1: “at least once”. Message arrival is ensured, but duplicates may occur. This level suits cases where every message must arrive and a repeated message does not affect the usage scenario.
    • QoS 2: “exactly once”. Each message is guaranteed to arrive exactly one time. This level suits cases such as billing systems, where duplicate or missing messages lead to incorrect results.
  5. It has a small transport overhead (the fixed header can be as short as 2 bytes) and minimizes protocol exchanges to reduce network traffic.

  6. It provides a mechanism, the Last Will and Testament feature, for notifying interested parties when a client disconnects abnormally.
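
To make the 2-byte overhead claim concrete, here is a minimal sketch of how the fixed header of an MQTT 3.1.1 control packet is laid out: one byte holding the packet type and flags, followed by a variable-length "remaining length" field. The class and method names (MqttFixedHeader, publishFlags, encode) are hypothetical and not part of any library; only the byte layout follows the specification.

```java
import java.io.ByteArrayOutputStream;

/** Sketch only: builds the fixed header of an MQTT 3.1.1 control packet. */
final class MqttFixedHeader {
    // Control packet type codes defined by the MQTT 3.1.1 specification.
    static final int PUBLISH = 3;
    static final int PINGREQ = 12;

    private MqttFixedHeader() {}

    /** Flags nibble of a PUBLISH packet: DUP (bit 3), QoS (bits 2-1), RETAIN (bit 0). */
    static int publishFlags(boolean dup, int qos, boolean retain) {
        if (qos < 0 || qos > 2) {
            throw new IllegalArgumentException("qos must be 0, 1 or 2");
        }
        return (dup ? 0x08 : 0) | (qos << 1) | (retain ? 0x01 : 0);
    }

    /** First byte: type in the high nibble, flags in the low nibble; then the remaining length. */
    static byte[] encode(int packetType, int flags, int remainingLength) {
        if (remainingLength < 0 || remainingLength > 268_435_455) {
            throw new IllegalArgumentException("remaining length out of range");
        }
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write((packetType << 4) | (flags & 0x0F));
        int x = remainingLength;
        do {
            int digit = x % 128;   // 7 bits of length per byte
            x /= 128;
            if (x > 0) {
                digit |= 0x80;     // continuation bit: more length bytes follow
            }
            out.write(digit);
        } while (x > 0);
        return out.toByteArray();
    }
}
```

A packet without a body, such as PINGREQ, is therefore exactly 2 bytes long; a PUBLISH with a larger body needs at most 4 length bytes after the first byte.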

2.3 MQTT architecture

The architecture diagram, drawn to match the example described at the beginning of the article, shows the three roles MQTT has in practice: publisher, broker, and subscriber. A gate device at the entrance of the scenic spot acts as the publisher. When it detects a tourist entering, it publishes a message with a topic (such as “tourist_enter”) to the MQTT broker. When the server receives a published message, it performs topic-based filtering and forwards the message to the subscribers of that topic. The big screen of the scenic spot acts as the subscriber; because it subscribes to the same topic, “tourist_enter”, it receives the message forwarded by the server and can then display the current number of visitors in real time.

The gate device and the big screen in the architecture diagram are both clients, and a client can both publish and subscribe. For example, after the big screen receives a message, it can in turn publish a message notifying the gate device that the message was received.
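
The round trip described above can be sketched with a toy in-memory broker. This is only an illustration of topic-based forwarding, not how a real MQTT broker such as Apollo is implemented; ToyBroker and ScenicSpotDemo are hypothetical names, and there is no networking, QoS, or wildcard matching.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Toy broker: forwards each published message to the subscribers of exactly that topic. */
final class ToyBroker {
    private final Map<String, List<Consumer<String>>> subscribers = new HashMap<>();

    void subscribe(String topic, Consumer<String> handler) {
        subscribers.computeIfAbsent(topic, t -> new ArrayList<>()).add(handler);
    }

    /** Returns the number of subscribers the message was forwarded to. */
    int publish(String topic, String message) {
        // Copy the handler list so a handler may publish or subscribe while we iterate.
        List<Consumer<String>> handlers =
                new ArrayList<>(subscribers.getOrDefault(topic, Collections.emptyList()));
        for (Consumer<String> handler : handlers) {
            handler.accept(message);
        }
        return handlers.size();
    }
}

final class ScenicSpotDemo {
    /** Runs gate -> broker -> big screen -> broker -> gate and returns what each side received. */
    static List<String> run() {
        ToyBroker broker = new ToyBroker();
        List<String> log = new ArrayList<>();
        // Big screen: subscribes to tourist_enter and answers on message_arrived.
        broker.subscribe("tourist_enter", msg -> {
            log.add("screen got: " + msg);
            broker.publish("message_arrived", "message arrived");
        });
        // Gate device: subscribes to the response topic.
        broker.subscribe("message_arrived", msg -> log.add("gate got: " + msg));
        // Gate device publishes when a tourist enters.
        broker.publish("tourist_enter", "tourist enter");
        return log;
    }
}
```

Topic names are compared exactly, including case, which is also true of real MQTT topics: a message published to “tourist_Enter” would not reach a subscriber of “tourist_enter”.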

3 Setting up the MQTT server

To use MQTT, you first need to set up an MQTT server (in a company this is usually done by back-end developers). To make testing easier, front-end developers can start with a server provided by a third party. The official site recommends many servers; here I chose Apollo (part of Apache ActiveMQ).

1. Download and decompress

Click the download address and choose the version that best suits your operating system. I am using Windows here, made the following selection, and decompressed the download to:

D:\apache-apollo-1.7.1

2. Create a server instance

Go to the bin directory of the decompressed folder (for example, cd D:\apache-apollo-1.7.1\bin) and run apollo create mybroker (where mybroker is a custom server name) to create a server instance. The details are as follows:

A mybroker folder is then generated in the bin directory. In it, etc\apollo.xml is the server configuration file, and etc\users.properties contains the usernames and passwords used to connect to the MQTT server. Note that only passwords can be changed in users.properties (many blogs claim, without verifying it, that both the username and the password are changed there). To change a username, edit etc\groups.properties. For example, after adding the username wildma next to the default admin in etc\groups.properties and setting a password for wildma in etc\users.properties, the passwords for the two usernames admin and wildma are password and 123456 respectively.

3. Start the server

Go to the bin directory in the mybroker folder and run apollo-broker.cmd run to start the server. If the following screen is displayed, the server has started successfully.

4. Check whether the installation is successful

Finally, open http://127.0.0.1:61680/ in a browser. If the interface opens, the installation is successful. You can log in with either of the two usernames configured above.

4 Debugging with MQTT.fx, an MQTT client

To make debugging MQTT easier, I chose MQTT.fx as the gate device client. The steps are as follows:

  1. Download

    Click the download address and choose the version that best suits your operating system, as shown below:

  2. Install

    After downloading, keep clicking Next until the software is installed. Then open the software; its interface is shown below:

  3. Configure

    Click the Settings button at the top to add a new profile. Fill in the profile name, the server address (the server runs on the local machine, so use the local IP address, which ipconfig /all shows), the port number (when the server starts it prints the address it is accepting connections on, tcp://0.0.0.0:61613, so use port 61613), the username, and the password, then click OK, as shown below:

  4. Subscribe to messages

    Select the newly added profile “gate device” and click “Connect” to connect to the server. Switch to the “Subscribe” tab, enter a topic (such as tourist_enter), and click the “Subscribe” button to the right of the topic to subscribe, as shown below:

  5. Publish a message

    Switch to the “Publish” tab, enter the topic you just subscribed to (tourist_enter), enter the content of the message you want to publish (tourist enter), and click the “Publish” button next to the topic to publish the message, as shown below:

If you return to the Subscribe tab, you will see the newly published message, as shown below:
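
The server address and port entered in the profile correspond to the tcp://host:port URI that the Android client uses later in this article. As a small sketch, the helper below checks such a URI before it is handed to a client; BrokerAddress is a hypothetical class, and it deliberately accepts only the plain tcp:// form used in this article.

```java
import java.net.URI;

/** Sketch only: validates a broker URI of the form tcp://host:port. */
final class BrokerAddress {
    final String host;
    final int port;

    private BrokerAddress(String host, int port) {
        this.host = host;
        this.port = port;
    }

    /** Throws IllegalArgumentException when the URI is malformed or incomplete. */
    static BrokerAddress parse(String serverUri) {
        URI uri = URI.create(serverUri); // rejects illegal characters such as spaces
        if (!"tcp".equals(uri.getScheme())) {
            throw new IllegalArgumentException("expected a tcp:// URI: " + serverUri);
        }
        if (uri.getHost() == null || uri.getPort() == -1) {
            throw new IllegalArgumentException("host and port are required: " + serverUri);
        }
        return new BrokerAddress(uri.getHost(), uri.getPort());
    }
}
```

Note that 0.0.0.0 in the server's startup output only means "listening on all interfaces"; a client should connect to the machine's actual IP address, for example tcp://192.168.0.102:61613.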

5 Use of MQTT on Android

Using MQTT on Android requires the Paho Android Service library, an MQTT client library written in Java. GitHub address: github.com/eclipse/pah…

5.1 Integration

  1. Add the dependencies to the module's build.gradle file:
repositories {
    maven {
        url "https://repo.eclipse.org/content/repositories/paho-snapshots/"
    }
}

dependencies {
    compile 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0'
    compile 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'
}
  2. Add the permissions to AndroidManifest.xml:
    <uses-permission android:name="android.permission.INTERNET" />
    <uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" />
    <uses-permission android:name="android.permission.WAKE_LOCK" />
  3. Register the services in AndroidManifest.xml (MyMqttService is a service you write yourself, as described below; use the fully qualified name of your own class):
    <!-- MqttService, provided by the Paho library -->
    <service android:name="org.eclipse.paho.android.service.MqttService" />
    <!-- MyMqttService -->
    <service android:name="com.wildma.mqttandroidclient.MyMqttService" />

5.2 Specific Code

5.2.1 The main MQTT methods used on Android are as follows:
  • Connect: connects to the MQTT server. Here we use the three-parameter method, as follows:
 @Override
 public IMqttToken connect(MqttConnectOptions options, Object userContext,
   IMqttActionListener callback) throws MqttException {
    //...
 }

Parameter options: carries the settings for connecting to the server, such as the username and password.
Parameter userContext: an optional object used to pass context to the callback. Generally, you can pass null.
Parameter callback: the callback that is notified whether the connection to the MQTT server succeeded.

  • Publish: publishes a message, using the four-parameter method as follows:
 @Override
 public IMqttDeliveryToken publish(String topic, byte[] payload, int qos,
   boolean retained) throws MqttException, MqttPersistenceException {
    //...
 }

Parameter topic: the topic to publish the message on.
Parameter payload: the message as a byte array.
Parameter qos: the quality of service for the message; pass 0, 1, or 2.
Parameter retained: whether the server keeps this message as the retained message for the topic, so that clients subscribing later still receive it.

  • Subscribe: subscribes to a topic, using the two-parameter method as follows:
 @Override
 public IMqttToken subscribe(String topic, int qos) throws MqttException,
   MqttSecurityException {
    //...
 }

Parameter topic: the topic to subscribe to.
Parameter qos: the quality of service for the subscription; pass 0, 1, or 2.
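
Since the payload parameter is a raw byte array and qos accepts only three values, it can help to centralize both conversions. The sketch below is one possible helper, assuming text messages encoded as UTF-8 on both the gate device and the big screen; MqttPayloads is a hypothetical class, not part of the Paho library.

```java
import java.nio.charset.StandardCharsets;

/** Sketch only: converts text messages to and from MQTT payload bytes and checks QoS values. */
final class MqttPayloads {
    private MqttPayloads() {}

    /** Encodes with an explicit charset so every client platform produces the same bytes. */
    static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    /** Decodes a payload that was produced by encode(). */
    static String decode(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    /** Returns qos unchanged if it is 0, 1 or 2; otherwise throws IllegalArgumentException. */
    static int requireValidQos(int qos) {
        if (qos < 0 || qos > 2) {
            throw new IllegalArgumentException("QoS must be 0, 1 or 2, got " + qos);
        }
        return qos;
    }
}
```

A publish call could then pass MqttPayloads.encode(message) and MqttPayloads.requireValidQos(2) as its payload and qos arguments, and a receiver could call MqttPayloads.decode(message.getPayload()).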

5.2.2 MQTT Service — MyMqttService

This service handles the MQTT operations: connecting, publishing, and subscribing.

package com.wildma.mqttandroidclient;

import android.app.Service;
import android.content.Context;
import android.content.Intent;
import android.net.ConnectivityManager;
import android.net.NetworkInfo;
import android.os.Build;
import android.os.Handler;
import android.os.IBinder;
import android.support.annotation.Nullable;
import android.support.annotation.RequiresApi;
import android.util.Log;
import android.widget.Toast;

import org.eclipse.paho.android.service.MqttAndroidClient;
import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;

/**
 * Author       wildma
 * Github       https://github.com/wildma
 * CreateDate   2018/11/08
 * Desc         MQTT service
 */

public class MyMqttService extends Service {

    public final String TAG = MyMqttService.class.getSimpleName();
    private static MqttAndroidClient  mqttAndroidClient;
    private        MqttConnectOptions mMqttConnectOptions;
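    public        String HOST           = "tcp://192.168.0.102:61613"; // Server URI (protocol + address + port)
    public        String USERNAME       = "admin"; // Username
    public        String PASSWORD       = "password"; // Password
    public static String PUBLISH_TOPIC  = "tourist_enter"; // Topic to publish on
    public static String RESPONSE_TOPIC = "message_arrived"; // Topic for responses
    // Client ID, normally a unique identifier of the client; the device serial number is used here.
    // Note: Build.getSerial() needs a phone-state permission (READ_PHONE_STATE on API 26 to 28)
    // and is restricted further on newer Android versions.
    @RequiresApi(api = 26)
    public        String CLIENTID       = Build.VERSION.SDK_INT >= Build.VERSION_CODES.O
            ? Build.getSerial() : Build.SERIAL;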

    @Override
    public int onStartCommand(Intent intent, int flags, int startId) {
        init();
        return super.onStartCommand(intent, flags, startId);
    }

    @Nullable
    @Override
    public IBinder onBind(Intent intent) {
        return null;
    }

    /**
     * Starts the service
     */
    public static void startService(Context mContext) {
        mContext.startService(new Intent(mContext, MyMqttService.class));
    }

    /**
     * Publish (simulates another client publishing a message)
     *
     * @param message the message
     */
    public static void publish(String message) {
        String topic = PUBLISH_TOPIC;
        int qos = 2;
        boolean retained = false;
        try {
            // Arguments: topic, message as a byte array, quality of service, whether the server retains the message
            mqttAndroidClient.publish(topic, message.getBytes(), qos, retained);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    /**
     * Response (after receiving a message from another client, tells the sender
     * that the message has arrived or that there is a problem with it)
     *
     * @param message the message
     */
    public void response(String message) {
        String topic = RESPONSE_TOPIC;
        int qos = 2;
        boolean retained = false;
        try {
            // Arguments: topic, message as a byte array, quality of service, whether the server retains the message
            mqttAndroidClient.publish(topic, message.getBytes(), qos, retained);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    /**
     * Initialization
     */
    private void init() {
        String serverURI = HOST; // Server URI (protocol + address + port)
        mqttAndroidClient = new MqttAndroidClient(this, serverURI, CLIENTID);
        mqttAndroidClient.setCallback(mqttCallback); // Callback for messages on subscribed topics
        mMqttConnectOptions = new MqttConnectOptions();
        mMqttConnectOptions.setCleanSession(true); // Whether to discard session state between connections
        mMqttConnectOptions.setConnectionTimeout(10); // Connection timeout, in seconds
        mMqttConnectOptions.setKeepAliveInterval(20); // Keep-alive (heartbeat) interval, in seconds
        mMqttConnectOptions.setUserName(USERNAME); // Username
        mMqttConnectOptions.setPassword(PASSWORD.toCharArray()); // Password

        // last will message
        boolean doConnect = true;
        String message = "{\"terminal_uid\":\"" + CLIENTID + "\"}";
        String topic = PUBLISH_TOPIC;
        int qos = 2;
        boolean retained = false;
        if ((!message.equals("")) || (!topic.equals(""))) {
            // Last will: published by the server if this client disconnects abnormally
            try {
                mMqttConnectOptions.setWill(topic, message.getBytes(), qos, retained);
            } catch (Exception e) {
                Log.i(TAG, "Exception occurred", e);
                doConnect = false;
                iMqttActionListener.onFailure(null, e);
            }
        }
        if (doConnect) {
            doClientConnection();
        }
    }

    /**
     * Connects to the MQTT server
     */
    private void doClientConnection() {
        if (!mqttAndroidClient.isConnected() && isConnectIsNomarl()) {
            try {
                mqttAndroidClient.connect(mMqttConnectOptions, null, iMqttActionListener);
            } catch (MqttException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Checks whether a network connection is available
     */
    private boolean isConnectIsNomarl() {
        ConnectivityManager connectivityManager = (ConnectivityManager) this.getApplicationContext().getSystemService(Context.CONNECTIVITY_SERVICE);
        NetworkInfo info = connectivityManager.getActiveNetworkInfo();
        if (info != null && info.isAvailable()) {
            String name = info.getTypeName();
            Log.i(TAG, "Current network name: " + name);
            return true;
        } else {
            Log.i(TAG, "No network available");
            /* No network available: retry the connection after a 3-second delay */
            new Handler().postDelayed(new Runnable() {
                @Override
                public void run() {
                    doClientConnection();
                }
            }, 3000);
            return false;
        }
    }

    // Listener for whether the MQTT connection succeeded
    private IMqttActionListener iMqttActionListener = new IMqttActionListener() {

        @Override
        public void onSuccess(IMqttToken arg0) {
            Log.i(TAG, "Connection succeeded");
            try {
                mqttAndroidClient.subscribe(PUBLISH_TOPIC, 2); // Subscribe; arguments: topic, quality of service
            } catch (MqttException e) {
                e.printStackTrace();
            }
        }

        @Override
        public void onFailure(IMqttToken arg0, Throwable arg1) {
            arg1.printStackTrace();
            Log.i(TAG, "Connection failed");
            doClientConnection(); // Connection failed: reconnect (shut down the server to simulate this)
        }
    };

    // Callback for the subscribed topic
    private MqttCallback mqttCallback = new MqttCallback() {

        @Override
        public void messageArrived(String topic, MqttMessage message) throws Exception {
            Log.i(TAG, "Message received: " + new String(message.getPayload()));
            // A message arrived; a Toast stands in for the display here. To update the UI, send it on via a broadcast or EventBus
            Toast.makeText(getApplicationContext(), "messageArrived: " + new String(message.getPayload()), Toast.LENGTH_LONG).show();
            // Respond to the sender to say that the message has arrived or that there is a problem with it
            response("message arrived");
        }

        @Override
        public void deliveryComplete(IMqttDeliveryToken arg0) {

        }

        @Override
        public void connectionLost(Throwable arg0) {
            Log.i(TAG, "Connection lost");
            doClientConnection(); // Connection lost: reconnect
        }
    };

    @Override
    public void onDestroy() {
        try {
            mqttAndroidClient.disconnect(); // Disconnect from the server
        } catch (MqttException e) {
            e.printStackTrace();
        }
        super.onDestroy();
    }
}

The general logic of the MyMqttService class is as follows. Starting the service calls init(), which initializes the parameters (server address, username, password, and so on) and then calls doClientConnection() to connect to the MQTT server. iMqttActionListener listens for whether the connection succeeded and, on success, subscribes to the topic. mqttCallback is the callback for the subscribed topic: when a message arrives, its messageArrived() method runs, which updates the UI and calls response() to tell the other party that the message has arrived or that there is a problem with it.
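
The service above retries immediately after a failed connection and after 3 seconds when no network is available. One possible refinement, which is not part of the original project, is to space out consecutive retries with a capped exponential backoff. The sketch below only computes the delays; ReconnectBackoff is a hypothetical helper, and wiring it into doClientConnection() (for example through Handler.postDelayed) is left as an assumption.

```java
/** Sketch only: capped exponential backoff for reconnect attempts (not thread-safe). */
final class ReconnectBackoff {
    private final long initialDelayMs;
    private final long maxDelayMs;
    private int failures = 0;

    ReconnectBackoff(long initialDelayMs, long maxDelayMs) {
        if (initialDelayMs <= 0 || maxDelayMs < initialDelayMs) {
            throw new IllegalArgumentException("need 0 < initialDelayMs <= maxDelayMs");
        }
        this.initialDelayMs = initialDelayMs;
        this.maxDelayMs = maxDelayMs;
    }

    /** Call after a failed attempt; returns how long to wait before the next one. */
    long nextDelayMs() {
        long delay = initialDelayMs;
        // Double once per previous consecutive failure, never exceeding the cap.
        for (int i = 0; i < failures && delay < maxDelayMs; i++) {
            delay = (delay > maxDelayMs / 2) ? maxDelayMs : delay * 2;
        }
        failures++;
        return delay;
    }

    /** Call after a successful connection so the next failure starts from the initial delay. */
    void reset() {
        failures = 0;
    }
}
```

With new ReconnectBackoff(3000, 60000), the first failure waits 3 seconds, later ones wait 6, 12, 24, and 48 seconds, and every failure after that waits 60 seconds until reset() is called.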

5.2.3 Enabling the Service

Start the service in MainActivity. For simplicity the UI is not updated here; a single line of code starts the service, as follows:

import android.os.Bundle;
import android.support.v7.app.AppCompatActivity;

public class MainActivity extends AppCompatActivity {

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);
        MyMqttService.startService(this); // Start the service
    }
}

6 Simulating a real scenario

Using the same example as at the beginning of this article, the MQTT.fx client now acts as the gate device, and the Android code above runs as the big screen.

  1. Connect the big screen to the server

    Run the big-screen APK on an Android TV; if you have no TV, an Android phone works instead. Remember to set the publish topic to “tourist_enter” and the response topic to “message_arrived” in the code.

  2. Connect the gate device to the server

    Select the “gate device” profile, click Connect, and set the publish topic to “tourist_enter”, as shown below:

    Switch to the Subscribe tab, set the topic to the response topic “message_arrived”, and click the Subscribe button to subscribe, as shown below:

  3. Publish a message

    Click the Publish button shown in the figure in step 2 to publish the message.

  4. The big screen receives the message and responds

    When the big screen receives the message forwarded by the server, it displays the number of visitors and responds to tell the other party that the message has arrived. For simplicity, the code only shows a Toast, so no screenshot of the display is included here.

  5. The gate device receives the response

    Switch MQTT.fx to the Subscribe tab to see the response message from the big screen, as follows:
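
The article's code only shows a Toast when a message arrives. As a rough sketch of what the big screen could do instead, the class below keeps the visitor count and refuses entries once a limit is reached. VisitorCounter and its capacity parameter are assumptions for illustration; the real project's counting logic is not shown in this article.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Sketch only: thread-safe visitor count with an upper limit. */
final class VisitorCounter {
    private final int capacity;
    private final AtomicInteger current = new AtomicInteger();

    VisitorCounter(int capacity) {
        if (capacity < 0) {
            throw new IllegalArgumentException("capacity must not be negative");
        }
        this.capacity = capacity;
    }

    /** Records one entry; returns false and leaves the count unchanged when the spot is full. */
    boolean tryEnter() {
        while (true) {
            int n = current.get();
            if (n >= capacity) {
                return false;
            }
            if (current.compareAndSet(n, n + 1)) {
                return true;
            }
        }
    }

    /** Number of visitors counted so far. */
    int current() {
        return current.get();
    }

    boolean isFull() {
        return current.get() >= capacity;
    }
}
```

In messageArrived(), the big screen could call tryEnter() for each “tourist enter” message, refresh the displayed value from current(), and include the result in its response so the gate knows whether to admit the visitor.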

The process above is a rough simulation of how I used MQTT in development. My real project is of course not this simple; it also involves various data and interactive UI display. I hope that simulating this real usage process helps you understand how to use MQTT better. Please point out any shortcomings.

Project address: MqttAndroidClient
