Implementing message notifications on Android with MQTT

I. Preface

While integrating with an IoT platform on a recent project, we connected our devices over MQTT, and this post records that work. The official documentation is more detailed; here I start from the implementation details and sort out the concrete requirements and the supporting technical solution for reference.

II. IoT and MQTT

Any discussion of IoT (Internet of Things) and IIoT (Industrial IoT) has to mention MQTT, a messaging protocol widely used in both. The smart devices we use every day, such as mobile phones, computers, and tablets, generally have plenty of computing power and rely on a good network environment. Typical IoT hardware, by contrast, has limited performance and works over unstable networks. MQTT was designed specifically for constrained hardware and unreliable networks, so it has a natural advantage there.

III. What is MQTT

MQTT is among the most commonly used messaging protocols for the Internet of Things (IoT). The name stands for MQ Telemetry Transport. The protocol is a set of rules that define how IoT devices publish and subscribe to data over the Internet. It is used for messaging and data exchange between IoT and Industrial IoT (IIoT) devices such as embedded devices, sensors, and industrial PLCs. The protocol is event-driven and connects devices using a publish/subscribe (pub/sub) pattern. Publishers and receivers (subscribers) communicate through topics and are decoupled from each other. The connection between them is handled by the MQTT broker, which filters all incoming messages and distributes them to the right subscribers.

Differences from traditional HTTP
  • MQTT is data-centric. It runs on top of a TCP connection, works directly with lightweight binary data, and its packets are small (the smallest control packet is just two bytes). Because of this, it is less demanding on the network than HTTP, which is one of the reasons it is so widely used on IoT devices.
  • MQTT is based on the publish/subscribe model, unlike HTTP's request/response model. This means the same device can both send (publish) and receive (subscribe to) messages. In the publish/subscribe model a published message can reach one-to-N receivers (N >= 0), while an HTTP exchange is one-to-one.
  • MQTT needs a long-lived, ordered, reliable connection to the broker, so it cannot run directly over connectionless UDP, whereas HTTP can run over TCP or, as of HTTP/3, over UDP-based QUIC.
  • Message size differs as well: MQTT packets are small, while HTTP messages are generally larger.

IV. MQTT components
1. Publish & Subscribe

MQTT decouples publishing from subscribing along three dimensions:
  • Space decoupling: publishers and subscribers do not need to know each other (for example, they never exchange IP addresses and ports).
  • Time decoupling: publishers and subscribers do not need to be running at the same time.
  • Synchronization decoupling: operations on either side are not blocked while publishing or receiving.
For details, see the official MQTT documentation.
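
To make the decoupling concrete, here is a minimal in-memory sketch of the broker's role. TinyBroker and its methods are hypothetical names, not part of any MQTT library, and the sketch assumes exact topic matching only, with no network, QoS, or persistence (so it illustrates space decoupling and one-to-N delivery, not time decoupling).

```kotlin
// Hypothetical in-memory stand-in for a broker: exact topic match only, no wildcards, no QoS.
class TinyBroker {
    // topic -> callbacks of everyone subscribed to that topic
    private val subscribers = mutableMapOf<String, MutableList<(String) -> Unit>>()

    // A subscriber only names a topic; it never learns who publishes to it.
    fun subscribe(topic: String, onMessage: (String) -> Unit) {
        subscribers.getOrPut(topic) { mutableListOf() }.add(onMessage)
    }

    // Delivers the payload to every subscriber of the topic
    // and returns how many received it (N >= 0).
    fun publish(topic: String, payload: String): Int {
        val targets = subscribers[topic].orEmpty()
        targets.forEach { it(payload) }
        return targets.size
    }
}
```

Publishing to a topic nobody subscribed to simply returns 0, which is the N = 0 case of one-to-N delivery.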

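2. Client and Broker

A client is any device that connects to the broker in order to publish or subscribe. The broker receives all messages, filters them, and forwards them to the subscribed clients. For details, see the official MQTT documentation.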

3. Topics & Best Practices

Pay attention to the topic matching rules, which support a single-level and a multi-level wildcard. The single-level wildcard is +: in this/is/+/single, the + stands for exactly one topic level (levels are separated by /). The multi-level wildcard is #, which is only allowed at the end of a filter: this/is/multi/# matches any number of levels below this/is/multi.

For details, see the official MQTT documentation.
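
The wildcard rules can be sketched as a small matcher. topicMatches is a hypothetical helper written for illustration, not a Paho API. It assumes a well-formed filter and ignores the special treatment of topics that start with $.

```kotlin
// Returns true when `topic` matches the MQTT topic `filter`.
// '+' matches exactly one level; '#' matches the remaining levels and must come last.
fun topicMatches(filter: String, topic: String): Boolean {
    val filterLevels = filter.split("/")
    val topicLevels = topic.split("/")
    for (i in filterLevels.indices) {
        when (filterLevels[i]) {
            // '#' is only valid as the last level; it also matches the parent level itself
            "#" -> return i == filterLevels.lastIndex
            // '+' needs a level to be present at this position
            "+" -> if (i >= topicLevels.size) return false
            // a literal level must be present and equal
            else -> if (i >= topicLevels.size || filterLevels[i] != topicLevels[i]) return false
        }
    }
    // without '#', the number of levels must be identical
    return filterLevels.size == topicLevels.size
}
```

With this sketch, this/is/+/single matches this/is/a/single but not this/is/a/b/single, while this/is/multi/# matches both this/is/multi/a and this/is/multi/a/b.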

4. Keep Alive

Keep Alive and the other fields are explained in great detail in the official documentation, which remains the best source for a serious understanding of the protocol. This section only gives a basic picture of what MQTT is. Its coverage of implementation details and the specification is incomplete and could be misleading, so refer to the MQTT documentation.

V. Using MQTT in a real project
1. Requirements

Taking our project as an example, the following needs to be implemented:

  • The server sends messages to the IoT device. Messages are classified by type, and each type is handled differently.
  • Depending on the message, the device performs voice playback, number calling, a local data update, or applies configuration delivered by the server.

The functionality is fairly simple. In short, the server pushes a message and the device reacts to it.

2. Implementation

Add the dependencies

implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0'
implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'

The implementation consists of four pieces: a. the main client class, CMqttClient; b. the data callback, IDataCallback; c. the connection callback, IConnectionCallback; d. the per-message handling strategy, IHandler. Each of them is implemented step by step below.

3. Data callback interface IDataCallback
interface IDataCallback {
  // Called when an established connection is lost
  fun connectionError(cause: Throwable?)
  // Called with the payload of each incoming message
  fun dataMessage(dataMessage: String)
}
4. Connection status callback IConnectionCallback
interface IConnectionCallback {
  fun connectSuccess()
  fun connectFail(reason: Throwable?)
}
5. CMqttClient connection class

Before the implementation, here are the key parameters, all of which are configured through MqttConnectOptions:

val options = MqttConnectOptions()
// Defaults to true: the session is not persistent. No state survives a client or server restart, and messages published in the meantime are not delivered afterwards.
// Set to false for a persistent session: subscriptions survive a restart or reconnect of either side, and pending messages can be delivered.
options.isCleanSession = false
// User name for the connection (account name)
options.userName = user
// Password for the connection (account password)
options.password = password.toCharArray()
// Connection timeout in seconds, default 30. 0 disables the timeout: the client waits until the network connection succeeds or fails.
options.connectionTimeout = connectTimeout
// Keep-alive interval in seconds, default 60. 0 disables keep-alive. If no message is exchanged within the interval, the client sends a ping to check that the connection is still alive.
options.keepAliveInterval = keepAliveInterval
// Whether to reconnect automatically. The first retry waits 1 s; after each failure the delay doubles, up to a maximum of 2 minutes.
options.isAutomaticReconnect = true
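
The keep-alive behaviour described in the comments above can be sketched as two small timing checks. Both function names are hypothetical, and this is a simplified model rather than Paho's actual ping scheduling. The 1.5x factor is the grace period the MQTT specification gives the broker before it may drop a silent client.

```kotlin
// True when the client should send a PINGREQ: keep-alive is enabled
// and nothing has been sent for at least one full interval.
fun shouldSendPing(lastSentMs: Long, nowMs: Long, keepAliveSeconds: Int): Boolean {
    if (keepAliveSeconds == 0) return false // 0 disables keep-alive
    return nowMs - lastSentMs >= keepAliveSeconds * 1000L
}

// Time after which the broker may consider the client gone:
// one and a half keep-alive intervals after the last packet it received.
fun brokerDeadlineMs(lastReceivedMs: Long, keepAliveSeconds: Int): Long =
    lastReceivedMs + keepAliveSeconds * 1500L
```

A shorter interval detects dead connections sooner but costs more traffic and battery, which matters on constrained devices.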

Automatic reconnection has three preconditions: cleanSession must be false, isAutomaticReconnect must be true, and the client must already have connected successfully once. In other words, the library only retries automatically after an established connection is lost. If the very first connection attempt fails, for example because of the network, nothing retries it for us, so we have to implement that retry ourselves to make sure the first connection to the server succeeds. The relevant source code and comment:

// Reconnect. Only appropriate if cleanSession is false and we were connected.
// Declared synchronized so that concurrent calls to this method do not send connect more than once.
synchronized void reconnect() {
  //....
}
// Preconditions: a connection was established before, cleanSession = false, and isAutomaticReconnect = true.
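
Since the first connection attempt is not retried automatically, here is a minimal sketch of a hand-written retry with the same doubling delay. backoffDelayMs, connectWithRetry, and the connectOnce and sleep hooks are hypothetical names. The sketch assumes a blocking connectOnce that reports success as a Boolean, whereas the real Paho connect is asynchronous, so in practice the next attempt would be scheduled from the failure callback instead.

```kotlin
// Delay before retry number `attempt` (0-based): starts at initialMs,
// doubles after each failure, and is capped at maxMs.
fun backoffDelayMs(attempt: Int, initialMs: Long = 1_000, maxMs: Long = 120_000): Long {
    var delay = initialMs
    repeat(attempt) {
        delay = minOf(delay * 2, maxMs)
    }
    return delay
}

// Calls `connectOnce` until it reports success or `maxAttempts` is reached.
// `sleep` is injectable so the waiting strategy can be replaced (for example in tests).
fun connectWithRetry(
    maxAttempts: Int,
    sleep: (Long) -> Unit = { Thread.sleep(it) },
    connectOnce: () -> Boolean
): Boolean {
    for (attempt in 0 until maxAttempts) {
        if (connectOnce()) return true
        // no point in waiting after the final failed attempt
        if (attempt < maxAttempts - 1) sleep(backoffDelayMs(attempt))
    }
    return false
}
```

Once the first connection has succeeded, this loop is no longer needed and the library's own automatic reconnection takes over.
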
import android.content.Context
import android.util.Log
import org.eclipse.paho.android.service.MqttAndroidClient
import org.eclipse.paho.client.mqttv3.IMqttActionListener
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken
import org.eclipse.paho.client.mqttv3.IMqttToken
import org.eclipse.paho.client.mqttv3.MqttCallback
import org.eclipse.paho.client.mqttv3.MqttConnectOptions
import org.eclipse.paho.client.mqttv3.MqttMessage

private const val TAG = "CMqttClient" // placeholder log tag

class CMqttClient {
  // True while a connection attempt is in progress
  private var connecting = false
  private var mqttAndroidClient: MqttAndroidClient? = null

  // Connect to the server
  fun connectToServer(
    context: Context,
    host: String,
    clientId: String,
    accountName: String,
    accountPsw: String,
    connectTimeout: Int = 20,
    keepAliveTime: Int = 60,
    connectionCallback: IConnectionCallback? = null,
    dataCallback: IDataCallback? = null
  ) {
    if (null == mqttAndroidClient) {
      mqttAndroidClient = MqttAndroidClient(context, host, clientId)
      mqttAndroidClient?.setCallback(object : MqttCallback {
        override fun connectionLost(cause: Throwable?) {
          dataCallback?.connectionError(cause)
        }
        override fun messageArrived(topic: String?, message: MqttMessage?) {
          message?.let {
            val payLoad = String(it.payload)
            Log.d(TAG, "messageArrived: topic=$topic")
            dataCallback?.dataMessage(payLoad)
          }
        }
        override fun deliveryComplete(token: IMqttDeliveryToken?) {
          // do something
        }
      })
    }
    connecting = true
    val options = MqttConnectOptions()
    options.isCleanSession = false
    options.userName = accountName
    options.password = accountPsw.toCharArray()
    options.connectionTimeout = connectTimeout
    options.keepAliveInterval = keepAliveTime
    options.isAutomaticReconnect = true
    mqttAndroidClient?.connect(options, null, object : IMqttActionListener {
      override fun onSuccess(asyncActionToken: IMqttToken?) {
        connecting = false
        connectionCallback?.connectSuccess()
      }
      override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {
        connecting = false
        connectionCallback?.connectFail(exception)
      }
    })
  }

  // Whether a connection has been established
  fun hasConnected(): Boolean {
    return try {
      mqttAndroidClient?.isConnected == true
    } catch (e: Exception) {
      false
    }
  }

  // Disconnect from the server
  fun disConnectFromServer() {
    if (mqttAndroidClient?.isConnected == true) {
      mqttAndroidClient?.disconnect()
    }
    connecting = false
  }

  // Release resources
  fun releaseClient() {
    mqttAndroidClient?.close()
    mqttAndroidClient = null
    connecting = false
  }

  // Subscribe to topics; returns true once the subscription completes within timeOut (ms)
  fun subscribe(topics: Array<String>, qos: IntArray, timeOut: Long = 2000): Boolean {
    return mqttAndroidClient?.let {
      try {
        val mqttToken = it.subscribe(topics, qos)
        mqttToken.waitForCompletion(timeOut)
        mqttToken.isComplete
      } catch (e: Exception) {
        Log.d(TAG, "subscribe failed", e)
        false
      }
    } ?: false
  }

  // Unsubscribe
  fun unSubscribe(): Boolean {
    TODO("omitted")
  }
}

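Note that the clientId must be unique. For IoT devices we use the deviceId as the clientId. If the userId were used instead, then as soon as the same user is logged in on several devices, those devices would share one clientId. The broker allows only one connection per clientId, so each new connection would kick the previous one off, and the retry mechanism would keep that cycle going. The resulting behavior is unexpected and hard to troubleshoot.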

6. Message handling interface IHandler
interface IHandler {
  fun handlerMessage(message: String)
}

The message body carries a type. Each type gets its own handler implementation, and handlers are declared through an annotation for flexibility.

7. Annotation template
@Target(AnnotationTarget.CLASS)
@Retention(AnnotationRetention.SOURCE)
annotation class MQTTHandler(
  val groupName: String,
  val type: String
)

The corresponding IHandler implementation class is loaded by reflection. The core code:

interface IHandlerProvide {
  fun provideHandler(handlerType: String): IHandler?
  fun release()
}

class XXXHandlerProvider : IHandlerProvide {
  override fun provideHandler(handlerType: String): IHandler? {
    // find the IHandler implementation
  }
  override fun release() {
    // release resources
  }
}

// Core of provideHandler: look up the handler class by group and type
override fun provideHandler(handlerType: String): IHandler? {
  val handlerClass = Maneger.instance.findIHandlersByGroup(GroupName)?.get(handlerType)
}

To use it, just add the annotation to the handler class:

@MQTTHandler(groupName = groupxxxx, type = typeNamexxxx)
class TestHandler : IHandler {
  override fun handlerMessage(message: String) {
    // do whatever you need here
  }
}

That covers the main parts of the flow. The core idea is to find the matching handler for each message type. Most of this is wired up through annotations, and the handler lookup is done by reflection.
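
The lookup-and-dispatch idea can be sketched without the annotation machinery. This version swaps in a plain map from message type to handler class in place of the annotation-generated table, and instantiates handlers by reflection on first use. HandlerRegistry and VoiceHandler are hypothetical names for illustration, and IHandler is repeated only to keep the sketch self-contained.

```kotlin
interface IHandler {
    fun handlerMessage(message: String)
}

// Hypothetical example handler; it records messages so the effect is observable.
class VoiceHandler : IHandler {
    val received = mutableListOf<String>()
    override fun handlerMessage(message: String) {
        received.add(message)
    }
}

// Hypothetical registry: maps a message type to the handler class that processes it.
class HandlerRegistry(private val handlerClasses: Map<String, Class<out IHandler>>) {
    private val cache = mutableMapOf<String, IHandler>()

    // Creates the handler for `type` by reflection on first use, then reuses it.
    // Handler classes need a public no-argument constructor.
    fun provideHandler(type: String): IHandler? {
        cache[type]?.let { return it }
        val clazz = handlerClasses[type] ?: return null
        val handler = clazz.getDeclaredConstructor().newInstance()
        cache[type] = handler
        return handler
    }

    // Routes a message to its handler; returns false when no handler is registered for the type.
    fun dispatch(type: String, message: String): Boolean {
        val handler = provideHandler(type) ?: return false
        handler.handlerMessage(message)
        return true
    }

    fun release() = cache.clear()
}
```

A registry built with mapOf("voice" to VoiceHandler::class.java) routes every "voice" message to a single VoiceHandler instance, and unknown types are reported to the caller instead of throwing.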

VI. References

MQTT website