IoT Push system

What is the IoT

IoT is short for the Internet of Things. For background, please refer to: What is IoT? What is AIoT?

Design of IoT push system

Suppose you have some smart devices and want to send a command to a device, such as downloading or playing music, from an app or a WeChat mini program. What is needed to accomplish this? First, a push server is needed. This server is only responsible for distributing messages and does not process business messages. The device connects to the push server, the app sends the instruction to the push server, and the push server forwards the instruction to the corresponding device.

However, as more and more people buy devices, the load on the push server keeps growing, and at some point the push server has to be clustered: if one server is not enough, deploy ten. This raises a new problem: with many push servers, how does a device find the server it should connect to? A registry solves this. Each push server registers itself with the registry; the device asks the registry for the address of a push server and then establishes a connection with that server.
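The registry lookup described above can be sketched in a few lines of Java. This is only an in-memory illustration, not a real registry: the class name `PushRegistry`, the addresses, and the "least-loaded server" selection rule are all assumptions made for the example.

```java
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

/** In-memory stand-in for a registry; every name here is hypothetical. */
class PushRegistry {
    // push server address -> number of devices handed to it so far
    private final Map<String, Integer> servers = new TreeMap<>();

    /** Called by a push server when it starts up. */
    synchronized void register(String address) {
        servers.putIfAbsent(address, 0);
    }

    /** Called by a device over a short-lived request; picks the least-loaded server. */
    synchronized Optional<String> lookup() {
        String best = null;
        int bestLoad = Integer.MAX_VALUE;
        for (Map.Entry<String, Integer> e : servers.entrySet()) {
            if (e.getValue() < bestLoad) {   // ties go to the first address in sort order
                best = e.getKey();
                bestLoad = e.getValue();
            }
        }
        if (best != null) {
            servers.merge(best, 1, Integer::sum);
        }
        return Optional.ofNullable(best);
    }

    public static void main(String[] args) {
        PushRegistry registry = new PushRegistry();
        registry.register("10.0.0.1:8088");
        registry.register("10.0.0.2:8088");
        for (int device = 1; device <= 4; device++) {
            System.out.println("device-" + device + " -> " + registry.lookup().orElse("none"));
        }
    }
}
```

A production registry would also need to remove servers that go offline and track real connection counts; the sketch leaves both out.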

There is also a Redis cluster that records the subscribed topics and the device information. When the app sends an instruction to a device, it actually sends a string of data. A push API exposes interfaces through which this data is sent. The push API is not connected directly to the push servers; an MQ cluster sits in between, mainly to store messages. The push API publishes messages to MQ, and the push servers subscribe to messages from MQ. This is the design of a simple IoT push system.

Let's look at the structure diagram below.

Note: The connection between the device and the registry is short-lived, while the connection between the device and the push server is long-lived.
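To make the message flow concrete, here is a minimal in-memory sketch of the path push API -> MQ -> push server -> device. A `BlockingQueue` stands in for the MQ cluster and plain maps stand in for Redis and for the device connections; all class, method, and topic names are hypothetical and chosen only for this illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** In-memory model of the push flow; every name here is hypothetical. */
class PushFlowSketch {
    /** A message as the push API would publish it: a topic plus an opaque payload. */
    static final class Message {
        final String topic;
        final String payload;
        Message(String topic, String payload) {
            this.topic = topic;
            this.payload = payload;
        }
    }

    // Stand-in for the MQ cluster
    final BlockingQueue<Message> mq = new LinkedBlockingQueue<>();
    // Stand-in for Redis: topic -> IDs of the devices subscribed to it
    final Map<String, Set<String>> subscriptions = new HashMap<>();
    // Stand-in for the long-lived connections: device ID -> payloads delivered to it
    final Map<String, List<String>> delivered = new HashMap<>();

    void subscribe(String deviceId, String topic) {
        subscriptions.computeIfAbsent(topic, t -> new HashSet<>()).add(deviceId);
        delivered.computeIfAbsent(deviceId, d -> new ArrayList<>());
    }

    /** Push API: only enqueues; it never talks to the push server directly. */
    void pushApiSend(String topic, String payload) {
        mq.add(new Message(topic, payload));
    }

    /** Push server: drains the queue and forwards each payload unchanged. */
    int pushServerDrain() {
        int forwarded = 0;
        Message m;
        while ((m = mq.poll()) != null) {
            for (String deviceId : subscriptions.getOrDefault(m.topic, Collections.<String>emptySet())) {
                delivered.get(deviceId).add(m.payload);
                forwarded++;
            }
        }
        return forwarded;
    }

    public static void main(String[] args) {
        PushFlowSketch flow = new PushFlowSketch();
        flow.subscribe("speaker-1", "music");
        flow.pushApiSend("music", "play:song-42");
        System.out.println("forwarded: " + flow.pushServerDrain());
        System.out.println("speaker-1 received: " + flow.delivered.get("speaker-1"));
    }
}
```

Note that the push server only forwards the payload; it does not interpret it, which matches the "distribution only, no business processing" role described above.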

Heartbeat detection Mechanism

Brief Introduction to heartbeat Detection

Heartbeat detection determines whether the peer is still alive. Typically, a small packet is sent periodically; if no response is received from the peer within a specified period of time, the peer is considered dead.
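Independent of any framework, the rule "no response for too long means the peer is dead" can be modelled as a counter of missed heartbeats. The sketch below is an illustration only; the class name `HeartbeatMonitor` and the reset-on-reply policy are assumptions, not part of the original code.

```java
/** Tracks one peer's liveness from missed heartbeats; names are hypothetical. */
class HeartbeatMonitor {
    private final int maxMissed;
    private int missed;

    HeartbeatMonitor(int maxMissed) {
        this.maxMissed = maxMissed;
    }

    /** Call when a heartbeat period ends without a reply; returns true while the peer still counts as alive. */
    boolean onIntervalWithoutReply() {
        missed++;
        return missed < maxMissed;
    }

    /** Call when the peer answers; any reply clears the missed count. */
    void onReply() {
        missed = 0;
    }

    public static void main(String[] args) {
        HeartbeatMonitor monitor = new HeartbeatMonitor(3);
        System.out.println(monitor.onIntervalWithoutReply()); // true
        System.out.println(monitor.onIntervalWithoutReply()); // true
        System.out.println(monitor.onIntervalWithoutReply()); // false
    }
}
```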

Netty provides the IdleStateHandler class to implement heartbeat, which is simple to use as follows:

pipeline.addFirst(new IdleStateHandler(0, 0, 1, TimeUnit.SECONDS));

Here’s the constructor for IdleStateHandler:

public IdleStateHandler(
            long readerIdleTime, long writerIdleTime, long allIdleTime,
            TimeUnit unit) {
        this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);
}

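The four parameters are described as follows:

1. readerIdleTime: the read idle timeout. An IdleStateEvent with state READER_IDLE is triggered when nothing has been read for this long; 0 disables it.
2. writerIdleTime: the write idle timeout. An IdleStateEvent with state WRITER_IDLE is triggered when nothing has been written for this long; 0 disables it.
3. allIdleTime: the timeout for all events. An IdleStateEvent with state ALL_IDLE is triggered when nothing has been read or written for this long; 0 disables it.
4. unit: the TimeUnit of the three timeout values.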
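The way the three timeouts interact can be shown with a small model. This is not Netty's implementation, only a plain-Java sketch of the rules described above; the class `IdleModel` and its method are hypothetical, and the event names are returned as strings purely for readability.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of the three idle timeouts; not Netty code, names are hypothetical. */
class IdleModel {
    /**
     * Returns the idle states that would be due, given how long ago the last read
     * and the last write happened. All values use the same time unit; 0 disables a timeout.
     */
    static List<String> idleStates(long readerIdleTime, long writerIdleTime, long allIdleTime,
                                   long sinceLastRead, long sinceLastWrite) {
        List<String> states = new ArrayList<>();
        if (readerIdleTime > 0 && sinceLastRead >= readerIdleTime) {
            states.add("READER_IDLE");
        }
        if (writerIdleTime > 0 && sinceLastWrite >= writerIdleTime) {
            states.add("WRITER_IDLE");
        }
        // "All idle" means neither a read nor a write happened during the period
        if (allIdleTime > 0 && Math.min(sinceLastRead, sinceLastWrite) >= allIdleTime) {
            states.add("ALL_IDLE");
        }
        return states;
    }

    public static void main(String[] args) {
        // Same settings as IdleStateHandler(0, 0, 1, TimeUnit.SECONDS)
        System.out.println(idleStates(0, 0, 1, 5, 0)); // []  (a write just happened)
        System.out.println(idleStates(0, 0, 1, 5, 2)); // [ALL_IDLE]
    }
}
```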

Example code for the heartbeat detection mechanism

Simple example: Server:

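import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.IdleStateHandler;

import java.util.concurrent.TimeUnit;

public class PingServer {
    static final int BEGIN_PORT = 8088;
    static final int N_PORT = 100;

    public static void main(String[] args) {
        new PingServer().start(BEGIN_PORT, N_PORT);
    }

    public void start(int beginPort, int nPort) {
        System.out.println("Start the service....");

        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.handler(new LoggingHandler(LogLevel.INFO));
        bootstrap.group(bossGroup, workerGroup);
        bootstrap.channel(NioServerSocketChannel.class);
        bootstrap.childOption(ChannelOption.SO_REUSEADDR, true);

        bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();
                // Fire an ALL_IDLE event after 1 second without any read or write
                pipeline.addFirst(new IdleStateHandler(0, 0, 1, TimeUnit.SECONDS));
                pipeline.addLast(new PingHandler());
                // Each connection has a ConnectionCountHandler that increments the number of connections
                pipeline.addLast(new ConnectionCountHandler());
            }
        });

        bootstrap.bind(beginPort).addListener((ChannelFutureListener) future -> {
            if (future.isSuccess()) {
                System.out.println("Port binding succeeded:" + beginPort);
            } else {
                System.out.println("Port binding failed:" + beginPort);
            }
        });
        System.out.println("Service started!");
    }
}
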
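PingHandler class:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleUserEventChannelHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;

import java.nio.charset.StandardCharsets;

public class PingHandler extends SimpleUserEventChannelHandler<IdleStateEvent> {
    // Shared "ping" payload; duplicate() gives every write its own reader/writer indices
    private static final ByteBuf PING_BUF =
            Unpooled.unreleasableBuffer(Unpooled.wrappedBuffer("ping".getBytes(StandardCharsets.UTF_8)));

    // Number of pings sent that have not been answered by a pong yet
    private int count;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        byte[] data = new byte[buf.readableBytes()];
        // Copy without moving the reader index so later handlers still see the full message
        buf.getBytes(buf.readerIndex(), data);
        String str = new String(data, StandardCharsets.UTF_8);
        if ("pong".equals(str)) {
            System.out.println(ctx + "--" + str);
            count--;
        }
        ctx.fireChannelRead(msg);
    }

    @Override
    protected void eventReceived(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
        if (evt.state() == IdleState.ALL_IDLE) {
            // Three unanswered pings: treat the client as dead and close the connection
            if (count >= 3) {
                System.out.println("Client connection unresponsive detected, disconnected:" + ctx.channel());
                ctx.close();
                return;
            }

            count++;
            System.out.println(ctx.channel() + " ---- ping");
            ctx.writeAndFlush(PING_BUF.duplicate());
        }
        ctx.fireUserEventTriggered(evt);
    }
}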

Client:

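import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.util.concurrent.ExecutionException;

public class PoneClient {
    // IP address of the server
    private static final String SERVER_HOST = "localhost";

    static final int BEGIN_PORT = 8088;
    static final int N_PORT = 100;

    public static void main(String[] args) {
        new PoneClient().start(BEGIN_PORT, N_PORT);
    }

    public void start(final int beginPort, int nPort) {
        System.out.println("Client start....");
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        final Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(eventLoopGroup);
        bootstrap.channel(NioSocketChannel.class);
        bootstrap.option(ChannelOption.SO_REUSEADDR, true);
        bootstrap.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) {
                // Answer every "ping" from the server with a "pong"
                ch.pipeline().addLast(new PongHandler());
            }
        });

        // The server address can be overridden with -Dserver.host=...
        String serverHost = System.getProperty("server.host", SERVER_HOST);
        ChannelFuture channelFuture = bootstrap.connect(serverHost, beginPort);
        channelFuture.addListener((ChannelFutureListener) future -> {
            if (!future.isSuccess()) {
                System.out.println("Connection failed, exit!");
                System.exit(0);
            }
        });
        try {
            // Block until the connection attempt has completed
            channelFuture.get();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
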
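PongHandler class:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

import java.nio.charset.StandardCharsets;

public class PongHandler extends SimpleChannelInboundHandler<ByteBuf> {
    // Shared "pong" payload; duplicate() gives every write its own reader/writer indices
    private static final ByteBuf PONG_BUF =
            Unpooled.unreleasableBuffer(Unpooled.wrappedBuffer("pong".getBytes(StandardCharsets.UTF_8)));

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        byte[] data = new byte[msg.readableBytes()];
        msg.readBytes(data);
        String str = new String(data, StandardCharsets.UTF_8);
        // Reply to each heartbeat so the server knows this client is alive
        if ("ping".equals(str)) {
            ctx.writeAndFlush(PONG_BUF.duplicate());
        }
    }
}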

Server output result:

Optimizing for millions of long-lived connections

Connection optimization code example

Server:

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class Server {
    static final int BEGIN_PORT = 11000;
    static final int N_PORT = 100;

    public static void main(String[] args) {
        new Server().start(BEGIN_PORT, N_PORT);
    }

    public void start(int beginPort, int nPort) {
        System.out.println("Start the service....");

        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(bossGroup, workerGroup);
        bootstrap.channel(NioServerSocketChannel.class);
        bootstrap.childOption(ChannelOption.SO_REUSEADDR, true);

        bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();
                // Each connection has a ConnectionCountHandler that increments the number of connections
                pipeline.addLast(new ConnectionCountHandler());
            }
        });

        // Enable 100 ports, from 11000 to 11099
        for (int i = 0; i < nPort; i++) {
            int port = beginPort + i;
            bootstrap.bind(port).addListener((ChannelFutureListener) future -> {
                System.out.println("Port binding succeeded:" + port);
            });
        }
        System.out.println("Service started!");
    }
}

Client:

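import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class Client {
    // IP address of the server
    private static final String SERVER_HOST = "192.168.231.129";

    static final int BEGIN_PORT = 11000;
    static final int N_PORT = 100;

    public static void main(String[] args) {
        new Client().start(BEGIN_PORT, N_PORT);
    }

    public void start(final int beginPort, int nPort) {
        System.out.println("Client start....");
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        final Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(eventLoopGroup);
        bootstrap.channel(NioSocketChannel.class);
        bootstrap.option(ChannelOption.SO_REUSEADDR, true);
        // Bootstrap.connect() requires a handler; the original snippet omits it,
        // so an empty initializer is assumed here (the client only holds connections open)
        bootstrap.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) {
            }
        });

        int index = 0;
        int port;

        String serverHost = System.getProperty("server.host", SERVER_HOST);
        // Start with port 11000 and connect in ascending order, wrapping around after the last port
        while (!Thread.interrupted()) {
            port = beginPort + index;
            try {
                ChannelFuture channelFuture = bootstrap.connect(serverHost, port);
                channelFuture.addListener((ChannelFutureListener) future -> {
                    if (!future.isSuccess()) {
                        System.out.println("Connection failed, exit!");
                        System.exit(0);
                    }
                });
                channelFuture.get();
            } catch (Exception e) {
                // Ignored: a failed connect is already handled by the listener above
            }

            if (++index == nPort) {
                index = 0;
            }
        }
    }
}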

ConnectionCountHandler class:

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ConnectionCountHandler extends ChannelInboundHandlerAdapter {

    // This is used to count the number of connections and output to the console every two seconds
    private static final AtomicInteger nConnection = new AtomicInteger();

    static {
        Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {
            System.out.println("Connection number:" + nConnection.get());
        }, 0, 2, TimeUnit.SECONDS);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        nConnection.incrementAndGet();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        nConnection.decrementAndGet();
    }
}

The code above is packaged as a JAR and run on Linux. For this test, no optimization is done on the application side for the time being; instead, the operating system is tuned to support millions of connections.

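TCP connection four-tuple

TCP connection four-tuple: server IP + server port + client IP + client port

Ports range from 1 to 65535, so a single client IP can open at most about 65,000 connections to any one server IP and port. This is why the server above listens on 100 ports.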
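The four-tuple also sets the ceiling on how many connections one client machine can open. The arithmetic below is a rough sketch: it assumes the client may use local ports 1024 to 65535 (the range configured later in this article) and ignores ports already in use; the class and method names are hypothetical.

```java
/** Upper bound on connections from one client IP to one server IP; names are hypothetical. */
class FourTupleCapacity {
    static long maxConnections(int serverPorts, int clientPortLow, int clientPortHigh) {
        // Each (client port, server port) pair is a distinct four-tuple
        long clientPorts = (long) clientPortHigh - clientPortLow + 1;
        return serverPorts * clientPorts;
    }

    public static void main(String[] args) {
        System.out.println(maxConnections(1, 1024, 65535));   // 64512
        System.out.println(maxConnections(100, 1024, 65535)); // 6451200
    }
}
```

With a single server port the client tops out well below one million connections, while 100 server ports leave plenty of room.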

Configuration optimization

Now install two Linux systems on virtual machines and configure them as follows:

address          CPU          memory  JDK  role
192.168.15.130   VM, 4 cores  8G      1.8  Client
192.168.15.128   VM, 4 cores  8G      1.8  Server

Start the server:

java -Xmx4g -Xms4g -cp network-study-1.0-SNAPSHOT-jar-with-dependencies.jar com.dongnaoedu.network.netty.million.Server > out.log 2>&1 &

Start the client:

java -Xmx4g -Xms4g -Dserver.host=192.168.15.128 -cp network-study-1.0-SNAPSHOT-jar-with-dependencies.jar com.dongnaoedu.network.netty.million.Client

After the server is started, you can run tail -f out.log to view the log. After the client starts, if the following error is reported, you need to raise both the system-wide maximum number of file handles and the per-process maximum:

Caused by: java.io.IOException: Too many open files
        at sun.nio.ch.FileDispatcherImpl.init(Native Method)
        at sun.nio.ch.FileDispatcherImpl.<clinit>(FileDispatcherImpl.java:35)
        ... 8 more

Optimizing the system-wide maximum number of file handles: check whether the current maximum (cat /proc/sys/fs/file-max) meets the requirements. If it does not, run vim /etc/sysctl.conf and add the following configuration, then apply it with sysctl -p:

fs.file-max = 1000000
  1. Set the maximum number of file handles that a single process can open. Run ulimit -a to check whether the current setting meets the requirements:
[root@test-server2 download]# ulimit -a | grep "open files"
open files                      (-n) 1024

When the number of concurrent TCP connections exceeds this limit, the error "Too many open files" is reported and all new clients fail to connect. Run vim /etc/security/limits.conf and modify the configuration parameters:

* soft nofile 1000000
* hard nofile 1000000

After the configuration parameters are modified, log out and log back in for them to take effect.
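Since every connection consumes one file descriptor, it can be handy to confirm from inside the JVM that the new limit is actually in effect. The sketch below relies on `com.sun.management.UnixOperatingSystemMXBean`, which is available on typical Unix JDKs but is not part of the standard Java SE API; the class name `FdHeadroom` is hypothetical.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;

/** Prints how many file descriptors this JVM process may still open; names are hypothetical. */
class FdHeadroom {
    /** Descriptors still available to the process. */
    static long headroom(long maxFds, long openFds) {
        return maxFds - openFds;
    }

    public static void main(String[] args) {
        OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
        if (os instanceof com.sun.management.UnixOperatingSystemMXBean) {
            com.sun.management.UnixOperatingSystemMXBean unix =
                    (com.sun.management.UnixOperatingSystemMXBean) os;
            long max = unix.getMaxFileDescriptorCount();
            long open = unix.getOpenFileDescriptorCount();
            System.out.println("max=" + max + " open=" + open + " headroom=" + headroom(max, open));
        } else {
            System.out.println("File descriptor counts are not exposed on this platform");
        }
    }
}
```

If the reported maximum is still 1024 after editing limits.conf, the process was most likely started from a session that predates the change.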

  • If the program is interrupted, or the following exception is reported:
java.io.IOException: No space left on device
	at sun.nio.ch.EPollArrayWrapper.epollCtl(Native Method)
	at sun.nio.ch.EPollArrayWrapper.updateRegistrations(EPollArrayWrapper.java:299)
	at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:268)
	at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
	at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
	at sun.nio.ch.SelectorImpl.selectNow(SelectorImpl.java:105)
	at io.netty.channel.nio.SelectedSelectionKeySetSelector.selectNow(SelectedSelectionKeySetSelector.java:56)
	at io.netty.channel.nio.NioEventLoop.selectNow(NioEventLoop.java:750)
	at io.netty.channel.nio.NioEventLoop$1.get(NioEventLoop.java:71)
	at io.netty.channel.DefaultSelectStrategy.calculateStrategy(DefaultSelectStrategy.java:30)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:426)
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:905)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:748)

  • In this case, you can view the operating system log with more /var/log/messages, or monitor it while the program starts with tail -f /var/log/messages. If the following information is displayed in the log, the TCP/IP parameters need to be optimized:
Jun  4 16:55:01 localserver kernel: TCP: too many orphaned sockets
Jun  4 16:55:01 localserver kernel: TCP: too many orphaned sockets
Jun  4 16:55:01 localserver kernel: TCP: too many orphaned sockets
Jun  4 16:55:01 localserver kernel: TCP: too many orphaned sockets
Jun  4 16:55:01 localserver kernel: TCP: too many orphaned sockets
Jun  4 16:55:01 localserver kernel: TCP: too many orphaned sockets
Jun  4 16:55:01 localserver kernel: TCP: too many orphaned sockets

== Optimizing TCP/IP parameters: ==

  • View the limit on the client port range
cat /proc/sys/net/ipv4/ip_local_port_range
  • Run vim /etc/sysctl.conf to modify network parameters

  • On the client, change the limit of the port range

net.ipv4.ip_local_port_range = 1024 65535
  • Tuning TCP parameters
net.ipv4.tcp_mem = 786432 2097152 3145728
net.ipv4.tcp_wmem = 4096 4096 16777216
net.ipv4.tcp_rmem = 4096 4096 16777216
net.ipv4.tcp_keepalive_time = 1800
net.ipv4.tcp_keepalive_intvl = 20
net.ipv4.tcp_keepalive_probes = 5
net.ipv4.tcp_tw_reuse = 1
net.ipv4.tcp_tw_recycle = 1
net.ipv4.tcp_fin_timeout = 30

== Parameter description: ==

net.ipv4.tcp_mem: the memory available to TCP connections, in pages (a page is usually 4 KB, which can be checked with the getconf PAGESIZE command). The three values are the minimum, the pressure threshold, and the maximum. For example, with the maximum of 3145728 in the configuration above, the maximum memory available to TCP is 3145728 × 4 / 1024 / 1024 = 12 GB. A TCP connection takes approximately 7.5 KB, so one million connections need roughly 7.5 × 1000000 / 4 = 1875000 pages; 3145728 is therefore sufficient to meet the test requirements.
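The page arithmetic in the tcp_mem description is easy to get wrong, so here it is as a small, checkable sketch. The 4 KB page size and the 7.5 KB per connection are the figures quoted in the text, not measured values; the class and method names are hypothetical.

```java
/** Re-computes the tcp_mem figures; the 4 KB page and 7.5 KB per connection are the article's assumptions. */
class TcpMemMath {
    static final long PAGE_KB = 4;

    /** Converts a tcp_mem value (in pages) to gigabytes. */
    static double pagesToGb(long pages) {
        return pages * PAGE_KB / 1024.0 / 1024.0;
    }

    /** Pages needed for the given number of connections at kbPerConnection each. */
    static long pagesNeeded(long connections, double kbPerConnection) {
        return (long) Math.ceil(connections * kbPerConnection / PAGE_KB);
    }

    public static void main(String[] args) {
        System.out.println(pagesToGb(3145728));          // 12.0
        System.out.println(pagesNeeded(1_000_000, 7.5)); // 1875000
        System.out.println(pagesNeeded(1_000_000, 7.5) <= 3145728); // true
    }
}
```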

net.ipv4.tcp_wmem: the write buffer memory size allocated for each TCP connection, in bytes. The three values are minimum, default, and maximum.

net.ipv4.tcp_rmem: the read buffer memory size allocated for each TCP connection, in bytes. The three values are minimum, default, and maximum.

net.ipv4.tcp_keepalive_time: the interval, in seconds, between the last data packet and the first keepalive probe, which is used to check whether the TCP connection is still valid.

net.ipv4.tcp_keepalive_intvl: the interval, in seconds, between probes when the previous probe got no response.

net.ipv4.tcp_keepalive_probes: the number of consecutive unanswered probes after which the TCP connection is considered failed.

net.ipv4.tcp_tw_reuse: whether a socket in the TIME_WAIT state may be reused for a new TCP connection. The default value is 0, which means disabled.

net.ipv4.tcp_tw_recycle: whether to enable fast recycling of TIME_WAIT sockets. The default value is 0, which means disabled. Note that this option was removed in Linux 4.12, so it only applies to older kernels.

net.ipv4.tcp_fin_timeout: the time, in seconds, that a socket remains in the FIN_WAIT_2 state when it is closed. The default value is 60.
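The three keepalive settings combine into a single number: how long the kernel needs to notice a dead peer on an idle connection. The sketch below uses the commonly cited formula (idle time plus one interval per unanswered probe) with the values from the configuration above; treat it as an estimate, and note that the class and method names are hypothetical.

```java
/** Estimates how long kernel keepalive takes to declare an idle peer dead; names are hypothetical. */
class KeepaliveMath {
    static long secondsToDetectDeadPeer(long keepaliveTime, long keepaliveIntvl, long keepaliveProbes) {
        // Idle period before the first probe, then one interval per unanswered probe
        return keepaliveTime + keepaliveIntvl * keepaliveProbes;
    }

    public static void main(String[] args) {
        // tcp_keepalive_time = 1800, tcp_keepalive_intvl = 20, tcp_keepalive_probes = 5
        System.out.println(secondsToDetectDeadPeer(1800, 20, 5)); // 1900
    }
}
```

At roughly half an hour, kernel keepalive is far slower than the one-second application-level heartbeat shown earlier, which is one reason push systems usually implement their own heartbeat.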