Operating environment:

  • JDK 8+
  • Maven 3.0 +
  • Redis

Technology stack:

  • SpringBoot 2.0 +
  • Redis (Lettuce client,RedisTemplate template method)
  • Netty 4.1 +
  • MQTT 3.1.1

IDE:

  • The IDEA or the Eclipse
  • Lombok plug-in

Introduction to the

In recent years, the Internet of Things has made rapid progress. The United States has the “Industrial Internet”, Germany has the “Industry 4.0”, China also has the “Made in China 2025”, behind which are cloud computing, big data. New technologies such as cloud computing, big data and artificial intelligence could add up to $6 trillion of additional value to China’s manufacturing sector alone, according to a Report by The Boston Consulting Group.

Domestic and foreign giants have stopped in industrial Internet, such as Amazon AWS, Microsoft Azure, domestic three telecom operators, Baidu Cloud, Huawei, Jinshan Cloud, etc., among which Tencent Cloud, Ali Cloud is the most, but also brought in the traditional manufacturing masters, domestic giants have layout on the Internet of things. At the 2018 Cloud-Shenzhen Summit, Alibaba Senior Vice President, Ali Cloud President Hu Xiaoming announced that Alibaba will officially enter IoT. Hu xiaoming said IoT is a new main track for Alibaba Group after e-commerce, finance, logistics and cloud computing.

IOT Technology snooping

As a developer, the author is not an investor or a pioneer. It doesn’t really matter what the details are. What I care about is how to use technology to implement or simulate an IOT server that supports millions of links. It is not rigorous, just for your reference.

For more information on why we chose the middleware shown below or if you are not familiar with MQTT, you can read my two previous articles:

  1. IOT high-performance server implementation path
  2. Netty implements high performance IOT server (Groza) on MQTT protocol

Technical profile

Quick start

Run the test

  1. Git clone github.com/sanshengshu…

  2. cd netty-iot

  3. Run NettyIotApplication

  4. Open the http://localhost:8080/groza/v1/123456/auth, to get the password!

  1. Start Eclipse Paho and fill in your user name and password to connect.

  2. Start another Eclipse Paho and subscribe to any topic, such as Test. Another Eclipse Paho publishes the topic Test. You can receive the message.

  3. Unsubscribe from the topic and publish the message again. I don’t get a message.

With the previous 2 articles and learned the MQTT V3.1.1 protocol, said so much, the hand itch is very.

You build it, You run it!

Project Structure Introduction

Netty-iot Flag School - Authentication Flag School - Service - User Name, Password Authentication Implementation Class ├─ Util - Authentication Tool Class ├─ Common - Public Class ├─ Auth - User Name, Password Authentication Interface ├─ Message -- Protocol Storage Entity and Interface ├─ Session -- Session Storage Entity and Interface ├─ SUBSCRIBE -- Subscription Entity and Interface ├─ config -- Redis Configuration ├─ Protocol -- MQTT Protocol Implementation ├─ Server - MQTT Server ├─ Store - Redis Data Storage ├─ Cache ├─ Message ├─ Session ├─ Subscribe ├─ Web - Web Services ├─ Neo Eliot Application - Service Start classCopy the code

Redis

The installation

To experience Redis, you need to use a Linux or Mac environment, or a Virtual machine for Windows. There are four main ways:

  • Install using Docker.
  • Compiled from Github source code.
  • Install apt-get install(Ubuntu), yum install(RedHat), or brew install(Mac) directly.
  • If you are too lazy to install it, you can use the Web-based version of Web Redis.

Specific operations are as follows:

Docker way

# pull redis image
> docker pull redis
Run the Redis container
> docker run --name myredis -d -p6379:6379 redis
Redis can be operated directly from the command line
> docker exec -it myredis redis-cli...
Copy the code

Github source code compilation method

# Download source code
> git clone- the 2.8 branch - the depth 1 [email protected]: antirez/redis. Git >cd redis
# compiler
> make
> cd src
Daemonize means to run in the background
> ./redis-server --daemonize yes
Run the command line
> ./redis-cli...
Copy the code

Direct installation

# mac
> brew install redis
# ubuntu
> apt-get install redis
# redhat
> yum install redis
Run the client
> redis-cli
Copy the code

use

Spring Boot not only supports the common ORM framework, but also provides a very good package for the common middleware. With the arrival of Spring Boot2.x, the supported components are becoming richer and more mature, among which the support for Redis not only enriches its API, Also replace the underlying Jedis dependency and replace it with Lettuce, you can refer to this article to configure the project. So I use Lettuce as a client to cache messages transmitted by my MQTT protocol.

The following is how Redis works

  • OpsForValue: Corresponds to String (String)
  • OpsForZSet: corresponding to ZSet (ordered set)
  • OpsForHash: corresponding Hash
  • OpsForList: corresponds to List (List)
  • OpsForSet: corresponding Set (Set)
  • OpsForGeo: corresponding to GEO (geographical location)

I mainly use opsForValue,opsForHash and opsForZSet for strings. I recommend StringRedisTemplate.

I will briefly explain the basic operations for opsForValue and opsForHash.

Redis Hash data agency

Redis hashes allow users to store multiple key-value pairs into a single Redis key. Public interface HashOperations<H,HK,HV> HashOperations Provides a series of methods to manipulate hash:

java > template.opsForHash().put("books"."java"."think in java");
redis-cli > hset books java "think in java"  If the command line string contains Spaces, enclose it in quotes
(integer) 1
------
java > template.opsForHash().put("books"."golang"."concurrency in go");
redis-cli > hset books golang "concurrency in go"
(integer) 1
------
java > template.opsForHash().put("books"."python"."python cookbook");
redis-cli > hset books python "python cookbook"
(integer) 1
------
java > template.opsForHash().entries("books")
redis-cli > hgetall books  # entries(), key and value intervals appear
1) "java"
2) "think in java"
3) "golang"
4) "concurrency in go"
5) "python"
6) "python cookbook"
------
java > template.opsForHash().size("books")
redis-cli > hlen books
(integer) 3
------
java > template.opsForHash().get("redisHash"."age")
redi-cli > hget books java
"think in java"
------
java > 
Map<String,Object> testMap = new HashMap();
      testMap.put("java"."effective java");
      testMap.put("python"."learning python");
      testMap.put("golang"."modern golang programming");
template.opsForHash().putAll("books".testMap);
redis-cli > hmset books java "effective java" python "learning python" golang "modern golang programming"  # batch set
OK...

Copy the code

Redis Set data structure

Redis’ Set is an unordered collection of type string. Collection members are unique, which means that no duplicate data can occur in the collection. The collection in Redis is realized by hash table, so the complexity of adding, deleting and searching is O(1).

java > template.opsForSet().add("python"."java"."golang")
redis-cli > sadd books python java golang
(integer) 3
------
java > template.opsForSet().members("books")
redis-cli > smembers books  Note that the order is not the same as the inserted one, because set is unordered
1) "java"
2) "python"
3) "golang"
------
java > template.opsForSet().isMember("books"."java")
redis-cli > sismember books java  Contains (o) = contains(o)
(integer) 1
------
java > template.opsForSet().size("books")
redis-cli > scard books  # get length equal to count()
(integer) 3
------
java > template.opsForSet().pop("books")
redis-cli > spop books  # pop up a
"java".Copy the code

MQTT

MQTT is a lightweight publish/subscribe messaging protocol that was originally created around 1998 by IBM and Arcom (later part of Eurotech). The MQTT 3.1.1 specification has now been standardized by the OASIS Consortium.

Client download

For the MQTT client, I chose Eclipse Paho, a project that provides open source client implementations of MQTT and MQTT-SN messaging protocols for new, existing, and emerging Internet of Things (IoT) applications. The specific download address can be downloaded according to your own operating system.

MQTT Control packets

├─ Connect └ ─ DisConnect └ ─ PingReq └ ─ Heartbeat Request ├─ PubAck └ ─ PubComp └ ─ QoS2, the first step ├─ PubRec └ ─ PubRel └ ─ Subscribe └ ─ UnSubscribeCopy the code

Connect

Let’s implement the client Connect protocol against MQTT 3.1.1.

  1. The server can disconnect the client if the protocol name is incorrect while decoding the message. According to this specification, the server cannot continue processing the CONNECT message.

  2. The server uses the client identifier (ClientId) to identify the client. Each client connected to the server has a unique client identifier (ClientId).

    // The message decoder is abnormalif (msg.decoderResult().isFailure()) {
                Throwable cause = msg.decoderResult().cause();
                if(cause instanceof MqttUnacceptableProtocolVersionException) {/ / does not support the protocol version MqttConnAckMessage connAckMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage( new MqttFixedHeader(MqttMessageType.CONNACK,false, MqttQoS.AT_MOST_ONCE, false, 0),
                            new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION, false), null);
                    channel.writeAndFlush(connAckMessage);
                    channel.close();
                    return;
                } else if(cause instanceof MqttIdentifierRejectedException) {/ / unqualified clientId MqttConnAckMessage connAckMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage( new MqttFixedHeader(MqttMessageType.CONNACK,false, MqttQoS.AT_MOST_ONCE, false, 0),
                            new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED, false), null);
                    channel.writeAndFlush(connAckMessage);
                    channel.close();
                    return;
                }
                channel.close();
                return;
            }
    Copy the code
  3. If clientId is empty or null, the client must provide clientId, regardless of whether cleanSession is 1. There is no reference standard protocol implementation

           if (StrUtil.isBlank(msg.payload().clientIdentifier())) {
                MqttConnAckMessage connAckMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage(
                        new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
                        new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED, false), null);
                channel.writeAndFlush(connAckMessage);
                channel.close();
                return;
            }
    Copy the code
  4. User name and password authentication: The client must provide the user name and password for connection, regardless of whether the user name and password flags are set to 1. This section does not refer to standard protocol implementation

               String username = msg.payload().userName();
               String password = msg.payload().passwordInBytes() == null ? null : new String(msg.payload().passwordInBytes(), CharsetUtil.UTF_8);
               if(! grozaAuthService.checkValid(username,password)) { MqttConnAckMessage connAckMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage( new MqttFixedHeader(MqttMessageType.CONNACK,false, MqttQoS.AT_MOST_ONCE, false, 0),
                           new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD, false), null);
                   channel.writeAndFlush(connAckMessage);
                   channel.close();
                   return;
               }
    Copy the code
  5. If the clientId of this new connection is already stored in the session, the connection to the previous clientId is closed

     if (grozaSessionStoreService.containsKey(msg.payload().clientIdentifier())){
                SessionStore sessionStore = grozaSessionStoreService.get(msg.payload().clientIdentifier());
                Channel previous = sessionStore.getChannel();
                Boolean cleanSession = sessionStore.isCleanSession();
                if (cleanSession){
                    grozaSessionStoreService.remove(msg.payload().clientIdentifier());
                    grozaSubscribeStoreService.removeForClient(msg.payload().clientIdentifier());
                    grozaDupPublishMessageStoreService.removeByClient(msg.payload().clientIdentifier());
                    grozaDupPubRelMessageStoreService.removeByClient(msg.payload().clientIdentifier());
                }
                previous.close();
            }
    Copy the code
  6. Dealing with Testamentary information

    SessionStore sessionStore = new SessionStore(msg.payload().clientIdentifier(), channel, msg.variableHeader().isCleanSession(), null);
            if (msg.variableHeader().isWillFlag()){
                MqttPublishMessage willMessage = (MqttPublishMessage) MqttMessageFactory.newMessage(
                        new MqttFixedHeader(MqttMessageType.PUBLISH,false, MqttQoS.valueOf(msg.variableHeader().willQos()),msg.variableHeader().isWillRetain(),0),
                        new MqttPublishVariableHeader(msg.payload().willTopic(),0),
                        Unpooled.buffer().writeBytes(msg.payload().willMessageInBytes())
                );
                sessionStore.setWillMessage(willMessage);
            }
    Copy the code
  7. Process connected heartbeat packets

    if (msg.variableHeader().keepAliveTimeSeconds() > 0){
                if (channel.pipeline().names().contains("idle")){
                    channel.pipeline().remove("idle");
                }
                channel.pipeline().addFirst("idle",new IdleStateHandler(0, 0, math.round (msg.variableHeader().keepAlivetimeseconds () * 1.5f))); }Copy the code
  8. This stores the session message and returns the client connection to store the clientId into the channel’s map

    grozaSessionStoreService.put(msg.payload().clientIdentifier(),sessionStore);
            channel.attr(AttributeKey.valueOf("clientId")).set(msg.payload().clientIdentifier()); Boolean sessionPresent = grozaSessionStoreService.containsKey(msg.payload().clientIdentifier()) && ! msg.variableHeader().isCleanSession(); MqttConnAckMessage okResp = (MqttConnAckMessage) MqttMessageFactory.newMessage( new MqttFixedHeader(MqttMessageType.CONNACK,false,MqttQoS.AT_MOST_ONCE,false,0),
                    new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_ACCEPTED,sessionPresent),
                    null
            );
            channel.writeAndFlush(okResp);
    Copy the code
  9. If cleanSession is 0, the pending DUP messages of QoS1 and QoS2 stored in the same clientId need to be resold

     if(! msg.variableHeader().isCleanSession()){ List<DupPublishMessageStore> dupPublishMessageStoreList = grozaDupPublishMessageStoreService.get(msg.payload().clientIdentifier()); List<DupPubRelMessageStore> dupPubRelMessageStoreList = grozaDupPubRelMessageStoreService.get(msg.payload().clientIdentifier()); dupPublishMessageStoreList.forEach(dupPublishMessageStore -> { MqttPublishMessage publishMessage = (MqttPublishMessage)MqttMessageFactory.newMessage( new MqttFixedHeader(MqttMessageType.PUBLISH,true,MqttQoS.valueOf(dupPublishMessageStore.getMqttQoS()),false,0),
                            new MqttPublishVariableHeader(dupPublishMessageStore.getTopic(),dupPublishMessageStore.getMessageId()),
                            Unpooled.buffer().writeBytes(dupPublishMessageStore.getMessageBytes())
                    );
                    channel.writeAndFlush(publishMessage);
                });
                dupPubRelMessageStoreList.forEach(dupPubRelMessageStore -> {
                    MqttMessage pubRelMessage = MqttMessageFactory.newMessage(
                            new MqttFixedHeader(MqttMessageType.PUBREL,true,MqttQoS.AT_MOST_ONCE,false,0),
                            MqttMessageIdVariableHeader.from(dupPubRelMessageStore.getMessageId()),
                            null
                    );
                    channel.writeAndFlush(pubRelMessage);
                });
            }
    Copy the code

    Other MQTT messages are checked against the project and MQTT V3.1.1.

User name and password authentication

/** * @author */ @service public class AuthServiceImpl implements GrozaAuthService {private RSAPrivateKey privateKey; @Override public boolean checkValid(String username, String password) {if (StringUtils.isEmpty(username)){
            return false;
        }
        if (StringUtils.isEmpty(password)){
            return false;
        }
        RSA rsa = new RSA(privateKey,null);
        String value = rsa.encryptBcd(username, KeyType.PrivateKey);
        return value.equals(password) ? true : false;
    }

    @PostConstruct
    public void init() {
        privateKey = IoUtil.readObj(AuthServiceImpl.class.getClassLoader().getResourceAsStream("keystore/auth-private.key")); }}Copy the code

other

This is the end of Netty’s exhaustive code for implementing a high-performance IOT server (Groza).

Original is not easy, if you feel good, I hope to give a recommendation! Your support is the biggest motivation for my writing!

The following will take you through Netty’s implementation of MQTT protocol IOT server.

Copyright Notice:

Author: Mu Shuwei

Blog garden source: www.cnblogs.com/sanshengshu…

github.com/sanshengshu…

Sanshengshui.github. IO /