JAVA development MQTT summary

MQTT is introduced

  • It is a lightweight message transfer protocol commonly used for machine-to-machine (M2M) and Internet of Things (IoT) communication between machines
  • Applicable to scenarios with low network bandwidth
  • Contains publish and subscribe mode. Through a broker, any client can subscribe or publish messages on a topic, and the subscribed clients will receive the messages

MQTT is still a demo written before the company has requirements, which is recorded here for easy reference when people use it. It does not involve the specific explanation of MQTT, but only sticks the code and operation process.

For a primer on MQTT, as well as a description of its features, protocols, and structure, see the following article

www.runoob.com/w3cnote/mqt…

What is MQTT, what does it do, and where are its application scenarios? Please refer to the article below

www.ibm.com/developerwo…

Refer to the following article for the download configuration setup process of the MQTT server apache-Apollo used in this article

Blog.csdn.net/qq_29350001…

To start creating the broker,

RaindeMacBook-Pro:bin rain$ ./apollo create mybroker Creating apollo instance at: mybroker Generating ssl keystore... Warning: THE JKS keystore is in a proprietary format. It is recommended to use"keytool -importkeystore -srckeystore keystore -destkeystore keystore -deststoretype pkcs12"Migrate to the industry standard format PKCS12. You can now start the broker by executing:"/ Users/rain/Documents/Soft/apache - Apollo - 1.7.1 / bin/mybroker/bin/Apollo - broker" run

Or you can run the broker in the background using:

   "/ Users/rain/Documents/Soft/apache - Apollo - 1.7.1 / bin/mybroker/bin/Apollo - broker - service" start


Copy the code

Enter the newly generated broker

RaindeMacBook-Pro:bin rain$ ls
apollo		apollo.cmd	mybroker	testbroker
RaindeMacBook-Pro:bin rain$ cd mybroker/
RaindeMacBook-Pro:mybroker rain$ ls
bin	data	etc	log	tmp
RaindeMacBook-Pro:mybroker rain$ cd bin
RaindeMacBook-Pro:bin rain$ ls
apollo-broker		apollo-broker-service
Copy the code

You can see that there are two files that start Apollo-broker

After successful startup, you can access it in the browser. The default user name and password are admin and password

Just going in, the Topics TAB is empty. I took the screenshot after RUNNING the application, so there is a list of Topics

Configure Maven

Add the following configuration to pom.xml

<dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> The < version > 1.2.0 < / version > < / dependency >Copy the code

Create the following class

MqttServer

import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; Public class MqttServer2 {/** * proxy server IP address */ public static final String MQTT_BROKER_HOST ="TCP: / / 127.0.0.1:61613"; /** * public static final String MQTT_TOPIC ="test2";

    private static String userName = "admin";
    private static String password = "password"; /** * public static final String MQTT_CLIENT_ID ="android_server_xiasuhuei32"; private static MqttTopic topic; private static MqttClient client; public static void main(String... Args) {// push message MqttMessage message = new MqttMessage(); try { client = new MqttClient(MQTT_BROKER_HOST, MQTT_CLIENT_ID, new MemoryPersistence()); MqttConnectOptions options = new MqttConnectOptions(); options.setCleanSession(true);
            options.setUserName(userName);
            options.setPassword(password.toCharArray());
            options.setConnectionTimeout(10);
            options.setKeepAliveInterval(20);

            topic = client.getTopic(MQTT_TOPIC);

            message.setQos(1);
            message.setRetained(false);
            message.setPayload("message from server222222".getBytes());
            client.connect(options);

            while (true) {
                MqttDeliveryToken token = topic.publish(message);
                token.waitForCompletion();
                System.out.println("222 has been sent"); Thread.sleep(10000); } } catch (Exception e) { e.printStackTrace(); }}}Copy the code

MqttClient

import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; Public class MyMqttClient {/** * proxy server IP address */ public static final String MQTT_BROKER_HOST ="TCP: / / 127.0.0.1:61613"; /** * public static final String MQTT_CLIENT_ID ="android_xiasuhuei321"; /** * public static final String MQTT_TOPIC ="xiasuhuei321";

    /**
     *
     */
    public static final String USERNAME = "admin"; /** * PASSWORD */ public static final String PASSWORD ="password";
    public static final String TOPIC_FILTER = "test2"; private volatile static MqttClient mqttClient; private static MqttConnectOptions options; public static void main(String... Args) {try {// host is the host name, clientid is the clientid connected to MQTT, usually represented by the unique identifier of the client, // MemoryPersistence sets the preservation form of clientid, Default is to save mqttClient in memory = new mqttClient (MQTT_BROKER_HOST, MQTT_CLIENT_ID, new MemoryPersistence()); Options = new MqttConnectOptions(); // Set whether to clear the sessionfalseIndicates that the server keeps a record of the client's connection, // set totrueOptions. setCleanSession(true); // set the USERNAME options.setUserName(USERNAME); // Set the PASSWORD options.setPassword(password.tochararray ()); / / set the options for seconds. Overtime setConnectionTimeout (10); / / set the session for seconds heartbeat Server will be every 1.5 * 20 seconds to the client sends a message to determine whether a client online, but this method doesn't options. The mechanism of reconnection setKeepAliveInterval (20); / / connect the mqttClient. Connect (options); / / subscribe mqttClient. Subscribe (TOPIC_FILTER); // Set the callback mqttClient.setCallback(new)MqttCallback() {
                @Override
                public void connectionLost(Throwable throwable) {
                    System.out.println("connectionLost");
                }

                @Override
                public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
                    System.out.println("Topic: " + s + " Message: "+ mqttMessage.toString()); } @Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { } }); } catch (Exception e) { e.printStackTrace(); }}}Copy the code

PublishSample

import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; Public class PublishSample {public static void main(String[] args) {String topic ="test2";
        String content = "Hello, ha ha";
        int qos = 1;
        String broker = "TCP: / / 127.0.0.1:61613";
        String userName = "admin";
        String password = "password";
        String clientId = "pubClient"; // MemoryPersistence = new MemoryPersistence(); MqttClient sampleClient = new MqttClient(Broker, clientId, persistence); MqttConnectOptions connOpts = new MqttConnectOptions(); // Remember the state connOpts. SetCleanSession (false); // set the connection userName connOpts. SetUserName (userName); connOpts.setPassword(password.toCharArray()); // Establish a connection sampleclient.connect (connOpts); MqttMessage = new MqttMessage(content.getBytes()); Message.setqos (qos); message.setqos (qos); // Publish message sampleclient.publish (topic, message); // Disconnect sampleclient.disconnect (); // Close the client sampleclient.close (); } catch (MqttException me) { System.out.println("reason " + me.getReasonCode());
            System.out.println("msg " + me.getMessage());
            System.out.println("loc " + me.getLocalizedMessage());
            System.out.println("cause " + me.getCause());
            System.out.println("excep "+ me); me.printStackTrace(); }}}Copy the code

SubscribeSample

import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; /** * subscribes */ public class SubscribeSample {public static void main(String[] args) throws MqttException {String HOST ="TCP: / / 127.0.0.1:61613";
        String TOPIC = "test2";
        int qos = 1;
        String clientid = "subClient";
        String userName = "admin";
        String passWord = "password"; Try {// host is the host name,testFor clientid, the clientid connected to MQTT, generally represented by a unique identifier of the client, MemoryPersistence sets the preservation form of clientid, MqttClient = new MqttClient(HOST, clientid, new MemoryPersistence()); MqttConnectOptions options = new MqttConnectOptions(); // Set whether to clear the sessionfalseIndicates that the server keeps client connection recordstrueOptions. setCleanSession(true); // set the connection userName options.setUserName(userName); // Set the connection passWord options.setPassword(password.tochararray ()); / / set the options for seconds. Overtime setConnectionTimeout (10); / / set the session for seconds heartbeat Server will be every 1.5 * 20 seconds to the client sends a message to determine whether a client online, but this method doesn't options. The mechanism of reconnection setKeepAliveInterval (20); // Set the callback function client.setcallback (new)MqttCallback() {

                public void connectionLost(Throwable cause) {
                    System.out.println("connectionLost");
                }

                public void messageArrived(String topic, MqttMessage message) throws Exception {
                    System.out.println("topic:"+topic);
                    System.out.println("Qos:"+message.getQos());
                    System.out.println("message content:"+new String(message.getPayload()));

                }

                public void deliveryComplete(IMqttDeliveryToken token) {
                    System.out.println("deliveryComplete---------"+ token.isComplete()); }}); client.connect(options); // Subscribe message client.subscribe(TOPIC, qos); } catch (Exception e) { e.printStackTrace(); }}}Copy the code

Start the program

1. After MqttServer2 is started, messages are sent in a loop.

2. Start MyMqttClient to receive messages.

At this point, the whole program is basically running.

3. Start the PublishSample, publish a message, and then start the SubscribeSample to subscribe to the published message.

4. Published messages are also displayed in MyMqttClient