Follow my WeChat official account [Old Zhou Talks Architecture] for principles and source-code analysis of the mainstream Java back-end technology stack, architecture, and solutions for high concurrency, high performance, and high availability on the Internet.

One, Foreword

The previous article, on the basic characteristics of the MQTT protocol for the Internet of Things, introduced the MQTT packet format and related features. This article is hands-on: we combine theory with practice to understand MQTT better.

I have mentioned Mosquitto to readers before. It is an open-source message broker that is lightweight and supports the publish/subscribe messaging model, which makes short-message communication between devices simple. It is widely used with low-power sensors, mobile devices such as phones, embedded computers, and microcontrollers.

Here, Old Zhou walks you through building a Mosquitto server on CentOS.

Two, Setup Preparation

Mosquitto version installed: mosquitto-1.4.14

Mosquitto download address for each version: mosquitto.org/files/sourc…

MQTT protocol reference website: MQTT 3.1.1

Libwebsockets download: github.com/warmcat/lib…

CentOS version: CentOS 7.8.2003

2.1 Obtaining Software

Obtain the installation package from the official website:

wget http://mosquitto.org/files/source/mosquitto-1.4.14.tar.gz

2.2 Installation

tar -zxvf mosquitto-1.4.14.tar.gz
cd mosquitto-1.4.14

2.3 Modifying the Configuration File

config.mk contains several options that can be turned on or off as needed. Before enabling an option, install the module it depends on.

vim config.mk

Option | Description | make error when the dependency is missing
WITH_SRV | Enables support for the c-ares library, which provides asynchronous DNS lookups (see c-ares.haxx.se) | missing ares.h
WITH_UUID | Enables libuuid support, used to generate a unique UUID for each connected client | missing uuid.h
WITH_WEBSOCKETS | Enables WebSocket support for applications that use the WebSocket protocol; requires libwebsockets | missing libwebsockets.h

Enable the options you need:

WITH_SRV:=yes
WITH_UUID:=yes
WITH_WEBSOCKETS:=yes

2.3.1 Installing c-ares

yum install c-ares-devel -y

2.3.2 Installing libuuid

yum install uuid-devel -y
yum install libuuid-devel -y

2.3.3 Installing libwebsockets

cd ~
wget https://github.com/warmcat/libwebsockets/archive/v3.2.1.tar.gz
tar -zxvf v3.2.1.tar.gz
cd libwebsockets-3.2.1
mkdir build
cd build
cmake .. -DLIB_SUFFIX=64
make install
ldconfig

# Return to the Mosquitto source directory (adjust the path to where you extracted it)
cd mosquitto-1.4.14
yum install openssl-devel -y

2.4 Compilation and Installation

make && make install

If make fails with an error related to libwebsockets, change WITH_WEBSOCKETS from yes to no in config.mk and compile again:

# before
WITH_WEBSOCKETS:=yes
# after
WITH_WEBSOCKETS:=no

If your application does not need the WebSocket protocol, you can leave this option disabled by setting it to no.

If the build and installation finish without errors, congratulations: Mosquitto is installed.

2.5 Notes

Program files are installed in the following locations by default:

Path | Program files
/usr/local/sbin | mosquitto server
/etc/mosquitto | configuration files
/usr/local/bin | utility commands

Fixing the link library path

Depending on the operating system version and architecture, the shared libraries often cannot be found after installation. For example, libmosquitto.so.1 may not be found when starting a mosquitto client, so you need to add the library paths:

vim /etc/ld.so.conf.d/liblocal.conf

Add the following to the file:

/usr/local/lib64
/usr/local/lib

Then refresh the linker cache:

ldconfig

Three, Starting and Testing the Mosquitto Server

3.1 Starting

3.1.1 Mosquitto Runs as the User mosquitto by Default

This can be changed in the configuration file (the user option). To use the default, add the user and group:

groupadd mosquitto
useradd -g mosquitto mosquitto

3.1.2 Modifying a Configuration File

mv /etc/mosquitto/mosquitto.conf.example /etc/mosquitto/mosquitto.conf

The annotated configuration file is as follows:

# =================================================================
# General configuration
# =================================================================
# Seconds to wait before resending a QoS 1 or QoS 2 message that was not acknowledged
#retry_interval 20
# Interval, in seconds, between updates of the $SYS system status topics
#sys_interval 10
# Interval, in seconds, between cleanups of unreferenced messages in the internal store. 0 means clean up as soon as possible
#store_clean_interval 10
# File to which the PID of the service process is written
#pid_file /var/run/mosquitto.pid
# System user that the server process runs as
#user mosquitto
# Maximum number of QoS 1 and QoS 2 messages in flight per client at one time
#max_inflight_messages 10
# Maximum number of QoS 1 and QoS 2 messages queued per client beyond those in flight
#max_queued_messages 100
# Expiration time for persistent clients that stay disconnected. By default, they never expire
#persistent_client_expiration
# =================================================================
# Default listener
# =================================================================
# IP address that the service binds to
#bind_address
# Port number that the service binds to
#port 1883
# Maximum number of connections allowed. -1 means no limit
#max_connections -1
# cafile: CA certificate file
# capath: CA certificate directory
# certfile: PEM certificate file
# keyfile: PEM key file
#cafile
#capath
#certfile
#keyfile
# Whether clients must provide a certificate, to ensure data security
#require_certificate false
# If require_certificate is true, use_identity_as_username can be set to true
# to use the CN of the client certificate as the username
#use_identity_as_username false
# Enable PSK (pre-shared-key) support
#psk_hint
# SSL/TLS ciphers to allow; the available ciphers can be listed with
# the "openssl ciphers" command
#ciphers
# =================================================================
# Persistence
# =================================================================
# Interval, in seconds, between automatic saves of the in-memory database
#autosave_interval 1800
# If true, autosave_interval is treated as a number of changes rather than a time
#autosave_on_changes false
# Enable the persistence function
persistence true
# Persist DB files
persistence_file mosquitto.db
# Persistent DB file directory
persistence_location /var/lib/mosquitto/
# =================================================================
# Logging
# =================================================================
# 4 log destinations: stdout, stderr, syslog, and topic
# none disables logging, which can improve performance slightly
log_dest none
# Log types to record (multiple can be set)
#log_type error
#log_type warning
#log_type notice
#log_type information
# Whether to log client connection information
#connection_messages true
# Whether to add a timestamp to log entries
#log_timestamp true
# =================================================================
# Security
# =================================================================
# Client ID prefix restriction, which can be used for security
#clientid_prefixes
# Allow anonymous users
#allow_anonymous true
# User/password file, default format: username:password
#password_file
# PSK password file, default format: identity:key
#psk_file
# ACL permission configuration file. Common formats:
# User restriction:    user <username>
# Topic restriction:   topic [read|write] <topic>
# Pattern restriction: pattern write sensor/%u/data
#acl_file
# =================================================================
# Bridges
# =================================================================
# Allow "bridging" between brokers (usable for distributed deployments)
#connection <name>
#address <host>[:<port>]
#topic <topic> [[[out | in | both] qos-level] local-prefix remote-prefix]
# Client ID of the bridge
#clientid
# Whether to clear the bridge's messages on the remote broker when the bridge disconnects
#cleansession false
# Whether to publish bridge status information
#notifications true
# Topic to which bridge status messages are published; the default is
# $SYS/broker/connection/<clientid>/state
#notification_topic
# Keepalive interval of the bridge
#keepalive_interval 60
# Bridge start types: automatic, lazy, and once
#start_type automatic
# Restart timeout for start type automatic
#restart_timeout 30
# Idle timeout for start type lazy
#idle_timeout 60
# Bridge client user name
#username
# Bridge client password
#password
# bridge_cafile: CA certificate file of the bridge client
# bridge_capath: CA certificate directory of the bridge client
# bridge_certfile: PEM certificate file of the bridge client
# bridge_keyfile: PEM key file of the bridge client
#bridge_cafile
#bridge_capath
#bridge_certfile
#bridge_keyfile

For detailed configuration, see: mosquitto.org/man/mosquit…
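
Because nearly every option in the sample file is commented out, it can be hard to see at a glance which settings are actually in effect. The following is a minimal sketch, not part of Mosquitto: `ActiveConf` is a hypothetical helper that keeps only the uncommented `option value` lines. It assumes one option per line and, for options that may repeat (such as log_type), it keeps only the last value.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Mosquitto: lists the options that are
// actually set (uncommented) in mosquitto.conf-style text.
final class ActiveConf {

    // Returns option -> value for every line that is neither blank nor a comment.
    static Map<String, String> parse(List<String> lines) {
        Map<String, String> active = new LinkedHashMap<>();
        for (String raw : lines) {
            String line = raw.trim();
            if (line.isEmpty() || line.startsWith("#")) {
                continue; // skip blank lines and commented-out options
            }
            // Split into the option name and the rest of the line.
            String[] parts = line.split("\\s+", 2);
            active.put(parts[0], parts.length > 1 ? parts[1].trim() : "");
        }
        return active;
    }

    public static void main(String[] args) throws IOException {
        // Default path taken from the steps above; pass another path as the first argument.
        String path = args.length > 0 ? args[0] : "/etc/mosquitto/mosquitto.conf";
        parse(Files.readAllLines(Paths.get(path)))
                .forEach((option, value) -> System.out.println(option + " = " + value));
    }
}
```

Run against the sample file above, this should list only persistence, persistence_file, persistence_location, and log_dest, since those are the only uncommented options.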

3.1.3 Setting the User Name and Password

In the configuration file, uncomment #allow_anonymous true and set it to false, then uncomment #password_file and set it to the location of the password file:

allow_anonymous false
password_file /etc/mosquitto/pwfile.example

Then create the password file and the users:

# Create the password file with the first user; you are prompted to enter the password twice
mosquitto_passwd -c /etc/mosquitto/pwfile.example <username>
# Add another user, passing the password on the command line
mosquitto_passwd -b /etc/mosquitto/pwfile.example <username> <password>

If you add a user without -b, you are again prompted to enter the password twice. Do not pass -c when creating a second user: -c creates a new file, so the user created the first time is overwritten.

3.1.4 Starting Mosquitto

mosquitto -c /etc/mosquitto/mosquitto.conf -d

On success, the broker starts and listens on port 1883.
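
Since -d puts the broker in the background, it can be useful to confirm that something is really accepting connections on port 1883. The following is a minimal sketch using only the JDK; `PortCheck` is a hypothetical helper and the host name is an assumption. A successful TCP connection only shows that a process is listening on the port, not that it speaks MQTT.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper: checks whether something accepts TCP connections on a port.
final class PortCheck {

    // Returns true if a TCP connection to host:port succeeds within timeoutMs.
    static boolean isOpen(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false; // refused, unreachable, or timed out
        }
    }

    public static void main(String[] args) {
        // 1883 is the port used in this article; "localhost" is an assumption.
        System.out.println("1883 reachable: " + isOpen("localhost", 1883, 2000));
    }
}
```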

3.2 Testing

Open two new shell windows, A and B.

In A, subscribe to a topic:

mosquitto_sub -t <topic> -h <host IP> -u <username> -P <password>
# For example:
mosquitto_sub -t topic-riemann -h localhost -u mosquitto -P mosquitto

In B, publish a message:

mosquitto_pub -t <topic> -h <host IP> -m "<message>" -u <username> -P <password>
# For example:
mosquitto_pub -t topic-riemann -h localhost -m "Hello, MQTT" -u mosquitto -P mosquitto

3.3 Possible Problems

If you get this error:

mosquitto_sub: error while loading shared libraries: libmosquitto.so.1: cannot open shared object file: No such file or directory

Solution:

After compiling Mosquitto, go to the lib directory of the source tree and copy libmosquitto.so.1 to /usr/local/lib:

cp libmosquitto.so.1 /usr/local/lib

Then run:

sudo ln -s /usr/local/lib/libmosquitto.so.1 /usr/lib/libmosquitto.so.1
ldconfig

3.4 Test Results

Four, Implementing a Mosquitto Client in Java

4.1 Project Structure Diagram

4.2 Adding Dependencies to pom.xml

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.1.6.RELEASE</version>
</parent>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-devtools</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-mqtt</artifactId>
        <version>5.2.5.RELEASE</version>
    </dependency>
</dependencies>

4.3 application.yml

mqtt:
  host: tcp://<server IP>:1883
  clientId: client_${random.value}
  topic: test/system/module/biz
  qoslevel: 1
  username: mosquitto
  password: mosquitto
  timeout: 10000
  keepalive: 20

server:
  port: 8888

4.4 MqttConfig

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

/**
 * @author WeChat official account [Old Zhou Talks Architecture]
 */
@Slf4j
@Configuration
@IntegrationComponentScan
public class MqttConfig {
    @Value("${mqtt.username}")
    private String username;

    @Value("${mqtt.password}")
    private String password;

    @Value("${mqtt.host}")
    private String hostUrl;

    @Value("${mqtt.clientId}")
    private String clientId;

    @Value("${mqtt.topic}")
    private String defaultTopic;

    // Completion timeout of the inbound adapter, in milliseconds
    @Value("${mqtt.timeout}")
    private int completionTimeout;

    // Connection options shared by the inbound and outbound clients
    @Bean
    public MqttConnectOptions getMqttConnectOptions() {
        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        mqttConnectOptions.setCleanSession(true);
        // Connection timeout in seconds
        mqttConnectOptions.setConnectionTimeout(10);
        mqttConnectOptions.setAutomaticReconnect(true);
        mqttConnectOptions.setUserName(username);
        mqttConnectOptions.setPassword(password.toCharArray());
        mqttConnectOptions.setServerURIs(new String[]{hostUrl});
        // Keep-alive interval in seconds
        mqttConnectOptions.setKeepAliveInterval(2);
        return mqttConnectOptions;
    }

    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(getMqttConnectOptions());
        return factory;
    }

    // Publishes every message sent to mqttOutboundChannel to the broker
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId, mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic(defaultTopic);
        return messageHandler;
    }

    // Send channel
    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    // Receive channel
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    // Configure the topic that the client listens to
    @Bean
    public MessageProducer inbound() {
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(clientId + "_inbound",
                mqttClientFactory(), "test/#");
        adapter.setCompletionTimeout(completionTimeout);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }

    // Get data through the channel
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return message -> {
            String topic = (String) message.getHeaders().get("mqtt_receivedTopic");
            log.info("Topic: {}, message received: {}", topic, message.getPayload());
        };
    }
}
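
The inbound adapter subscribes to the filter test/#, which is why it also receives messages published to the default topic test/system/module/biz. As a rough illustration of how MQTT 3.1.1 topic filters match, here is a simplified sketch. `TopicFilter` is a hypothetical class written for this article, not the broker's implementation: it does not validate the filter (for example, that # is the last level) and ignores the special handling of topics that start with $.

```java
// Hypothetical, simplified matcher for MQTT topic filters.
final class TopicFilter {

    // Returns true if the topic name matches the filter.
    // "+" matches exactly one level; "#" matches all remaining levels, including none.
    static boolean matches(String filter, String topic) {
        String[] f = filter.split("/", -1);
        String[] t = topic.split("/", -1);
        for (int i = 0; i < f.length; i++) {
            if (f[i].equals("#")) {
                return true; // multi-level wildcard covers the rest of the topic
            }
            if (i >= t.length) {
                return false; // the topic has fewer levels than the filter
            }
            if (!f[i].equals("+") && !f[i].equals(t[i])) {
                return false; // literal level differs
            }
        }
        return f.length == t.length; // no unmatched trailing topic levels
    }

    public static void main(String[] args) {
        System.out.println(matches("test/#", "test/system/module/biz"));
        System.out.println(matches("test/#", "topic-riemann"));
    }
}
```

With this filter, the application would not see messages published to topic-riemann from the shell test in 3.2, because that topic does not start with the test level.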

4.5 MqttGateWay

import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;

/**
 * @author WeChat official account [Old Zhou Talks Architecture]
 */
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateWay {
    // Send to the default topic
    void sendToMqtt(String payload);

    // Send to the specified topic
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);

    // Send to the specified topic with the specified QoS
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
}

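4.6 MqttController Controller Class

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestHeader;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author WeChat official account [Old Zhou Talks Architecture]
 */
@Slf4j
@RestController
@RequestMapping("/api")
public class MqttController {
    @Autowired
    MqttGateWay mqttGateWay;

    // The topic is read from the "toplic" request header, the message from a request parameter
    @PostMapping("/publish")
    public String publish(@RequestHeader(value = "toplic") String toplic, String message) {
        log.info("topic: {}, message: {}", toplic, message);
        mqttGateWay.sendToMqtt(toplic, message);
        return "success";
    }
}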

4.7 MqttApplication startup class

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * @author WeChat official account [Old Zhou Talks Architecture]
 */
@SpringBootApplication
public class MqttApplication {
    public static void main(String[] args) {
        SpringApplication.run(MqttApplication.class, args);
    }
}

4.8 Starting the Mosquitto Server

mosquitto -c /etc/mosquitto/mosquitto.conf -d

4.9 Using the HTTP Client of IDEA to Simulate HTTP Requests
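
As an alternative to the IDEA HTTP client, a request in the shape that MqttController expects can be sketched with the JDK's java.net.http client. This assumes Java 11 or later and that the application is running locally on port 8888, as configured in application.yml. `PublishRequest` is a hypothetical class name. The header is deliberately spelled toplic, because that is the name the controller reads, and the message is sent as a form parameter.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

// Hypothetical helper that builds and sends the publish request.
final class PublishRequest {

    // Builds POST <baseUrl>/api/publish with the topic in the "toplic" header
    // (spelled as in MqttController) and the message as a form parameter.
    static HttpRequest build(String baseUrl, String topic, String message) {
        String form = "message=" + URLEncoder.encode(message, StandardCharsets.UTF_8);
        return HttpRequest.newBuilder(URI.create(baseUrl + "/api/publish"))
                .header("toplic", topic)
                .header("Content-Type", "application/x-www-form-urlencoded")
                .POST(HttpRequest.BodyPublishers.ofString(form))
                .build();
    }

    public static void main(String[] args) throws Exception {
        // Assumption: the Spring Boot application is running on localhost:8888.
        HttpRequest request = build("http://localhost:8888", "test/system/module/biz", "Hello, MQTT");
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```

Publishing to a topic under test/ means the application's own inbound adapter also receives the message, so it should show up in the application log as well as in any shell subscribed to that topic.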

4.10 Test results

The IDEA console receives the message for this topic, and the shell terminal subscribed to the topic receives it as well.