Automatic reconnection can only be configured through the asynchronous API

  1. Only the asynchronous interface, MQTTAsync, offers this feature, and it is not particularly convenient to use
  2. Connection options to configure
/* Enable automatic reconnection on the connect options. */
conn_opts.automaticReconnect = 1;
/* Shortest retry interval; the complete listing annotates this value as seconds. */
conn_opts.minRetryInterval = 2; 
/* Longest retry interval: 365 days, assuming the same unit (seconds) as minRetryInterval. */
conn_opts.maxRetryInterval = 365*24*60*60;
Copy the code
  3. Register the callback that is invoked once a connection is established
/* Register onReconnected as the "connected" callback; the client handle doubles as its context. */
if ((rc = MQTTAsync_setConnected(client, client, onReconnected)) != MQTTASYNC_SUCCESS)
{
  printf("Failed to MQTTAsync_setConnected, return code %d\n", rc);
  rc = EXIT_FAILURE;
  goto destroy_exit;
}
Copy the code
  4. Implement the callback that runs after a successful (re)connection
void onReconnected(void* context, char* cause)
{
 MQTTAsync client = (MQTTAsync)context;
 MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
 int rc;
 printf("Successful reconnection\n");
 printf("Subscribing to topic %s\nfor client %s using QoS%d\n\n"
           "Press Q<Enter> to quit\n\n", TOPIC, CLIENTID, QOS);
 opts.onSuccess = onSubscribe;
 opts.onFailure = onSubscribeFailure;
 opts.context = client;
 if((rc = MQTTAsync_subscribe(client, TOPIC, QOS, &opts)) ! = MQTTASYNC_SUCCESS) {printf("Failed to start subscribe, return code %d\n", rc);
  finished = 1;
 }
Copy the code

Part two: the complete implementation

/*******************************************************************************
 * Copyright (c) 2012, 2020 IBM Corp.
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Public License v2.0
 * and Eclipse Distribution License v1.0 which accompany this distribution.
 *
 * The Eclipse Public License is available at
 *   https://www.eclipse.org/legal/epl-2.0/
 * and the Eclipse Distribution License is available at
 *   http://www.eclipse.org/org/documents/edl-v10.php.
 *
 * Contributors:
 *    Ian Craggs - initial contribution
 *******************************************************************************/
 
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "MQTTAsync.h"
 
#if! defined(_WIN32)
#include <unistd.h>
#else
#include <windows.h>
#endif
 
#if defined(_WRS_KERNEL)
#include <OsWrapper.h>
#endif
 
/* Broker URI handed to MQTTAsync_create().
 * NOTE(review): the public mqtt.eclipse.org sandbox may have been retired in
 * favour of another host - confirm before relying on it. */
#define ADDRESS     "tcp://mqtt.eclipse.org:1883"
/* Client identifier handed to MQTTAsync_create(). */
#define CLIENTID    "ExampleClientSub"
/* Topic subscribed to by onConnect()/onReconnected(). */
#define TOPIC       "MQTT Examples"
/* Not referenced by this listing. */
#define PAYLOAD     "Hello World!"
/* Quality-of-service level requested for the subscription. */
#define QOS         1
/* Not referenced by this listing. */
#define TIMEOUT     10000L
 
/* Progress flags written by the callbacks and polled by main().
 * NOTE(review): plain ints presumably shared with the library's callback
 * thread - consider atomics; confirm the threading model. */
/* Set to 1 by onDisconnect()/onDisconnectFailure(). */
int disc_finished = 0;
/* Set to 1 by onSubscribe(). */
int subscribed = 0;
/* Set to 1 by the failure paths; makes main() stop waiting. */
int finished = 0;
 
/**
 * Connection-lost callback registered through MQTTAsync_setCallbacks().
 *
 * Only reports the loss. main() connects with automaticReconnect enabled, so
 * the library retries by itself. Starting another MQTTAsync_connect() here
 * with default options would replace the connect options given in main()
 * (including automaticReconnect and the retry intervals) and would carry no
 * success/failure callbacks.
 *
 * @param context  client handle passed to MQTTAsync_setCallbacks() (unused)
 * @param cause    optional reason string; may be NULL
 */
void connlost(void *context, char *cause)
{
    (void)context;

    printf("\nConnection lost\n");
    if (cause)
        printf(" cause: %s\n", cause);

    printf("Reconnecting\n");
}

/**
 * Message-arrived callback: prints topic and payload, then releases both.
 *
 * @param context    client handle passed to MQTTAsync_setCallbacks() (unused)
 * @param topicName  topic of the message; freed here with MQTTAsync_free()
 * @param topicLen   unused
 * @param message    received message; freed here with MQTTAsync_freeMessage()
 * @return always 1
 */
int msgarrvd(void *context, char *topicName, int topicLen, MQTTAsync_message *message)
{
    (void)context;
    (void)topicLen;

    printf("Message arrived\n");
    printf(" topic: %s\n", topicName);
    /* The payload is length-delimited, hence the explicit precision. */
    printf(" message: %.*s\n", message->payloadlen, (char*)message->payload);
    MQTTAsync_freeMessage(&message);
    MQTTAsync_free(topicName);
    return 1;
}
 
/**
 * Failure callback for MQTTAsync_disconnect(): reports the error code and
 * raises disc_finished so that main() stops waiting.
 *
 * @param context   unused
 * @param response  failure details; tolerated as NULL (reported as rc 0),
 *                  matching the guarded callbacks of the other listings
 */
void onDisconnectFailure(void* context, MQTTAsync_failureData* response)
{
    (void)context;

    printf("Disconnect failed, rc %d\n", response ? response->code : 0);
    disc_finished = 1;
}
 
/**
 * Success callback for MQTTAsync_disconnect().
 * Announces the disconnection, then raises disc_finished for main().
 */
void onDisconnect(void* context, MQTTAsync_successData* response)
{
    (void)context;
    (void)response;

    fputs("Successful disconnection\n", stdout);
    disc_finished = 1;
}
 
/**
 * Success callback for MQTTAsync_subscribe().
 * Announces the subscription, then raises the `subscribed` flag for main().
 */
void onSubscribe(void* context, MQTTAsync_successData* response)
{
    (void)context;
    (void)response;

    fputs("Subscribe succeeded\n", stdout);
    subscribed = 1;
}
 
/**
 * Failure callback for MQTTAsync_subscribe(): reports the error code and
 * raises `finished`.
 *
 * @param context   unused
 * @param response  failure details; tolerated as NULL (reported as rc 0)
 */
void onSubscribeFailure(void* context, MQTTAsync_failureData* response)
{
    (void)context;

    printf("Subscribe failed, rc %d\n", response ? response->code : 0);
    finished = 1;
}
 
 
/**
 * Failure callback for MQTTAsync_connect(): reports the error code and
 * raises `finished`.
 *
 * @param context   unused
 * @param response  failure details; tolerated as NULL (reported as rc 0)
 */
void onConnectFailure(void* context, MQTTAsync_failureData* response)
{
    (void)context;

    printf("Connect failed, rc %d\n", response ? response->code : 0);
    finished = 1;
}
 
 
void onConnect(void* context, MQTTAsync_successData* response)
{
    MQTTAsync client = (MQTTAsync)context;
    MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
    int rc;
 
    printf("Successful connection\n");
 
    printf("Subscribing to topic %s\nfor client %s using QoS%d\n\n"
           "Press Q<Enter> to quit\n\n", TOPIC, CLIENTID, QOS);
    opts.onSuccess = onSubscribe;
    opts.onFailure = onSubscribeFailure;
    opts.context = client;
    if((rc = MQTTAsync_subscribe(client, TOPIC, QOS, &opts)) ! = MQTTASYNC_SUCCESS) {printf("Failed to start subscribe, return code %d\n", rc);
        finished = 1; }}void onReconnected(void* context, char* cause)
{
    MQTTAsync client = (MQTTAsync)context;
    MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
    int rc;
 
    printf("Successful reconnection\n");
 
    printf("Subscribing to topic %s\nfor client %s using QoS%d\n\n"
           "Press Q<Enter> to quit\n\n", TOPIC, CLIENTID, QOS);
    opts.onSuccess = onSubscribe;
    opts.onFailure = onSubscribeFailure;
    opts.context = client;
    if((rc = MQTTAsync_subscribe(client, TOPIC, QOS, &opts)) ! = MQTTASYNC_SUCCESS) {printf("Failed to start subscribe, return code %d\n", rc);
        finished = 1; }}int main(int argc, char* argv[])
{
    MQTTAsync client;
    MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
    MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer;
    int rc;
    int ch;
 
    if ((rc = MQTTAsync_create(&client, ADDRESS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL))
            != MQTTASYNC_SUCCESS)
    {
        printf("Failed to create client, return code %d\n", rc);
        rc = EXIT_FAILURE;
        goto exit;
    }
 
    if ((rc = MQTTAsync_setCallbacks(client, client, connlost, msgarrvd, NULL)) != MQTTASYNC_SUCCESS)
    {
        printf("Failed to set callbacks, return code %d\n", rc);
        rc = EXIT_FAILURE;
        goto destroy_exit;
    }
    if((rc = MQTTAsync_setConnected(client, client, onReconnected)) ! = MQTTASYNC_SUCCESS) {printf("Failed to MQTTAsync_setConnected, return code %d\n", rc);
        rc = EXIT_FAILURE;
        goto destroy_exit;
    }
     
    conn_opts.keepAliveInterval = 20;
    conn_opts.cleansession = 1;
    conn_opts.onSuccess = onConnect;
    conn_opts.onFailure = onConnectFailure;
    conn_opts.context = client;
    conn_opts.automaticReconnect = 1;
    conn_opts.minRetryInterval = 2; //seconds
    conn_opts.maxRetryInterval = 365*24*60*60;
    if((rc = MQTTAsync_connect(client, &conn_opts)) ! = MQTTASYNC_SUCCESS) {printf("Failed to start connect, return code %d\n", rc);
        rc = EXIT_FAILURE;
        goto destroy_exit;
    }
 
    while(! subscribed && ! finished)#if defined(_WIN32)
            Sleep(100);
        #else
            usleep(10000L);
        #endif
 
    if (finished)
        goto exit;
 
    do
    {
        ch = getchar();
    } while(ch! ='Q'&& ch ! ='q');
 
    disc_opts.onSuccess = onDisconnect;
    disc_opts.onFailure = onDisconnectFailure;
    if((rc = MQTTAsync_disconnect(client, &disc_opts)) ! = MQTTASYNC_SUCCESS) {printf("Failed to start disconnect, return code %d\n", rc);
        rc = EXIT_FAILURE;
        goto destroy_exit;
    }
    while(! disc_finished) {#if defined(_WIN32)
            Sleep(100);
        #else
            usleep(10000L);
        #endif
    }
 
destroy_exit:
    MQTTAsync_destroy(&client);
exit:
    return rc;
}
Copy the code

Configuring automatic reconnection in paho.mqtt.c, the MQTT C client library

Asynchronous publish and subscribe implementation

1.Pub

#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#include "MQTTAsync.h"
 
/* Broker URI handed to MQTTAsync_create(). */
#define ADDRESS     "tcp://localhost:1883"
/* Client identifier handed to MQTTAsync_create(). */
#define CLIENTID    "ClientPub"
/* Topic every message is published to. */
#define TOPIC       "MQTT"
/* Message body published by main(). */
#define PAYLOAD     "Hello World!"
/* Quality-of-service level of the published messages. */
#define QOS         1
/* Not referenced by this listing. */
#define TIMEOUT     10000L
 
/* Reset to 0 before each publish in main(); never read in this listing. */
volatile MQTTAsync_token deliveredtoken;
 
/* Set to 1 by connlost() (when the reconnect cannot be started),
 * onDisconnect() and onConnectFailure(). */
int finished = 0;
/* Set to true by onConnect(); main() polls it before it starts publishing. */
bool is_connect = false;
void connlost(void *context, char *cause)
{
	MQTTAsync client = (MQTTAsync)context;
	MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
	int rc;
 
	printf("\nConnection lost\n");
	printf(" cause: %s\n", cause);
 
	printf("Reconnecting\n");
	conn_opts.keepAliveInterval = 20;
	conn_opts.cleansession = 1;
	if((rc = MQTTAsync_connect(client, &conn_opts)) ! = MQTTASYNC_SUCCESS) {printf("Failed to start connect, return code %d\n", rc);
 		finished = 1; }}void onDisconnect(void* context, MQTTAsync_successData* response)
{
	printf("Successful disconnection\n");
	finished = 1;
}
 
 
/**
 * Success callback for MQTTAsync_sendMessage(): logs the token of the
 * message whose delivery was confirmed.
 *
 * @param context   unused
 * @param response  success details; its `token` field is printed
 */
void onSend(void* context, MQTTAsync_successData* response)
{
	(void)context;

	printf("Message with token value %d delivery confirmed\n", response->token);
}
 
 
/**
 * Failure callback for MQTTAsync_connect().
 * Prints the failure code (0 when no failure data is supplied) and raises
 * `finished`.
 */
void onConnectFailure(void* context, MQTTAsync_failureData* response)
{
	int code = 0;

	(void)context;

	if (response != NULL)
		code = response->code;

	printf("Connect failed, rc %d\n", code);
	finished = 1;
}
 
 
/**
 * Success callback for MQTTAsync_connect(): reports the connection and sets
 * `is_connect`, which main() polls before it starts publishing.
 *
 * @param context   unused
 * @param response  unused
 */
void onConnect(void* context, MQTTAsync_successData* response)
{
	(void)context;
	(void)response;

	printf("Successful connection\n");
	is_connect = true;
}
 
 
int main(int argc, char* argv[])
{
	MQTTAsync client;
	MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
	int rc;
 
	MQTTAsync_create(&client, ADDRESS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL);
 
	MQTTAsync_setCallbacks(client, NULL, connlost, NULL.NULL);
 
	conn_opts.keepAliveInterval = 20;
	conn_opts.cleansession = 1;
	conn_opts.onSuccess = onConnect;
	conn_opts.onFailure = onConnectFailure;
	conn_opts.context = client;
        printf("before\n");
	if((rc = MQTTAsync_connect(client, &conn_opts)) ! = MQTTASYNC_SUCCESS) {printf("Failed to start connect, return code %d\n", rc);
		exit(EXIT_FAILURE);
	}
        printf("after\n");
	printf("Waiting for publication of %s\n"
         "on topic %s for client with ClientID: %s\n",
         PAYLOAD, TOPIC, CLIENTID);
        while(! is_connect) {printf("sleep\n");
            usleep(10000L);
	}
        
	MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
	   
        opts.onSuccess = onSend;
	opts.context = client;
 
        while(1)
	{
	   
	   int rc;
           printf("while1\n");
           //#define MQTTAsync_message_initializer 
	   MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
	   pubmsg.payload = (void*)PAYLOAD;
	   pubmsg.payloadlen = (int)strlen(PAYLOAD);
	   pubmsg.qos = QOS;
	   pubmsg.retained = 0;
	   deliveredtoken = 0;
 
	   if((rc = MQTTAsync_sendMessage(client, TOPIC, &pubmsg, &opts)) ! = MQTTASYNC_SUCCESS) {printf("Failed to start sendMessage, return code %d\n", rc);
		exit(EXIT_FAILURE);
	   }
	   usleep(100000L);
	}
 
	while(! finished)#if defined(WIN32)
			Sleep(100);
		#else
			usleep(10000L);
		#endif
 
	MQTTAsync_destroy(&client);
 	return rc;
}
  
Copy the code

2.Sub

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "MQTTAsync.h"
#include <unistd.h>
 
 
/* Broker URI handed to MQTTAsync_create(). */
#define ADDRESS     "tcp://localhost:1883"
/* Client identifier handed to MQTTAsync_create(). */
#define CLIENTID    "ClientSub"
/* Topic subscribed to by onConnect(). */
#define TOPIC       "MQTT"
/* Not referenced by this listing. */
#define PAYLOAD     "Hello World!"
/* Quality-of-service level requested for the subscription. */
#define QOS         1
/* Not referenced by this listing. */
#define TIMEOUT     10000L
 
/* Reset to 0 in onConnect(); never read in this listing. */
volatile MQTTAsync_token deliveredtoken;
 
/* Progress flags written by the callbacks and polled by main(). */
/* Set to 1 by onDisconnect(). */
int disc_finished = 0;
/* Set to 1 by onSubscribe(). */
int subscribed = 0;
/* Set to 1 by the failure paths. */
int finished = 0;
 
void connlost(void *context, char *cause)
{
	MQTTAsync client = (MQTTAsync)context;
	MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
	int rc;
 
	printf("\nConnection lost\n");
	if (cause)
		printf(" cause: %s\n", cause);
 
	printf("Reconnecting\n");
	conn_opts.keepAliveInterval = 20;
	conn_opts.cleansession = 1;
	if((rc = MQTTAsync_connect(client, &conn_opts)) ! = MQTTASYNC_SUCCESS) {printf("Failed to start connect, return code %d\n", rc);
		finished = 1; }}int msgarrvd(void *context, char *topicName, int topicLen, MQTTAsync_message *message)
{
    int i;
    char* payloadptr;
 
    printf("Message arrived\n");
    printf(" topic: %s\n", topicName);
    printf(" message: ");
 
    payloadptr = (char*)message->payload;
    for(i=0; i<message->payloadlen; i++)
    {
        putchar(*payloadptr++);
    }
    putchar('\n');
    MQTTAsync_freeMessage(&message);
    MQTTAsync_free(topicName);
    return 1;
}
 
 
/**
 * Success callback for MQTTAsync_disconnect().
 * Announces the disconnection, then raises disc_finished for main().
 */
void onDisconnect(void* context, MQTTAsync_successData* response)
{
	(void)context;
	(void)response;

	fputs("Successful disconnection\n", stdout);
	disc_finished = 1;
}
 
 
/**
 * Success callback for MQTTAsync_subscribe().
 * Announces the subscription, then raises the `subscribed` flag for main().
 */
void onSubscribe(void* context, MQTTAsync_successData* response)
{
	(void)context;
	(void)response;

	fputs("Subscribe succeeded\n", stdout);
	subscribed = 1;
}
 
/**
 * Failure callback for MQTTAsync_subscribe().
 * Prints the failure code (0 when no failure data is supplied) and raises
 * `finished`.
 */
void onSubscribeFailure(void* context, MQTTAsync_failureData* response)
{
	int code = 0;

	(void)context;

	if (response != NULL)
		code = response->code;

	printf("Subscribe failed, rc %d\n", code);
	finished = 1;
}
 
 
/**
 * Failure callback for MQTTAsync_connect().
 * Prints the failure code (0 when no failure data is supplied) and raises
 * `finished`.
 */
void onConnectFailure(void* context, MQTTAsync_failureData* response)
{
	int code = 0;

	(void)context;

	if (response != NULL)
		code = response->code;

	printf("Connect failed, rc %d\n", code);
	finished = 1;
}
 
 
void onConnect(void* context, MQTTAsync_successData* response)
{
	MQTTAsync client = (MQTTAsync)context;
	MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
	int rc;
 
	printf("Successful connection\n");
 
	printf("Subscribing to topic %s\nfor client %s using QoS%d\n\n"
           "Press Q<Enter> to quit\n\n", TOPIC, CLIENTID, QOS);
	opts.onSuccess = onSubscribe;
	opts.onFailure = onSubscribeFailure;
	opts.context = client;
 
	deliveredtoken = 0;
 
	if((rc = MQTTAsync_subscribe(client, TOPIC, QOS, &opts)) ! = MQTTASYNC_SUCCESS) {printf("Failed to start subscribe, return code %d\n", rc);
		exit(EXIT_FAILURE); }}int main(int argc, char* argv[])
{
	MQTTAsync client;
	MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
	MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer;
	int rc;
	int ch;
 
	MQTTAsync_create(&client, ADDRESS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL);
 
	MQTTAsync_setCallbacks(client, client, connlost, msgarrvd, NULL);
 
	conn_opts.keepAliveInterval = 20;
	conn_opts.cleansession = 1;
	conn_opts.onSuccess = onConnect;
	conn_opts.onFailure = onConnectFailure;
	conn_opts.context = client;
	if((rc = MQTTAsync_connect(client, &conn_opts)) ! = MQTTASYNC_SUCCESS) {printf("Failed to start connect, return code %d\n", rc);
		exit(EXIT_FAILURE);
	}
 
	while(! subscribed)#if defined(WIN32)
			Sleep(100);
		#else
			usleep(10000L);
		#endif
 
	if (finished)
		goto exit;
 
	do 
	{
		ch = getchar();
	} while(ch! ='Q'&& ch ! ='q');
 
	disc_opts.onSuccess = onDisconnect;
	if((rc = MQTTAsync_disconnect(client, &disc_opts)) ! = MQTTASYNC_SUCCESS) {printf("Failed to start disconnect, return code %d\n", rc);
		exit(EXIT_FAILURE);
	}
 	while(! disc_finished)#if defined(WIN32)
			Sleep(100);
		#else
			usleep(10000L);
		#endif
 
exit:
	MQTTAsync_destroy(&client);
 	return rc;
}
Copy the code

Reference: "MQTT: publishing messages from one end and receiving them on the other (asynchronous)" (an article on using MQTT asynchronously)