It has been a long time since I last wrote an article. I'll start by writing up a requirement from a while ago, and then follow that thread on to the Netty framework. I have also been studying RabbitMQ recently, so an article on RabbitMQ will follow.

start

Requirement: a PLC controller transmits data over TCP. The project needs to record that data in the database, and it also needs to send data back to the PLC controller to change values on the controller.

entity

  1. PLC controller
  2. PLC controller point (used to control corresponding equipment, such as lights – on or off)
  3. PLC controller data

The database table

(Only the most core fields are written)

PLC controller:

| Field name | Type | Description |
| --- | --- | --- |
| id | bigint | Primary key ID |
| cloud_name | varchar | Unique identifier of the controller (the object cloud name), for example C1710479131 |
| version | int | Controller version |
| token | varchar | Controller authentication token |

PLC controller points:

| Field name | Type | Description |
| --- | --- | --- |
| id | bigint | Primary key ID |
| controller_id | bigint | Controller ID |
| point_name | varchar | Unique name of the point |
| value | decimal | Value |
| state | tinyint | Status (not sure what it's for) |

PLC controller data:

| Field name | Type | Description |
| --- | --- | --- |
| id | bigint | Primary key ID |
| controller_cloud_name | varchar | Controller cloud name |
| point_name | varchar | Point name |
| value | decimal | Value |
| state | tinyint | State |
| timestamp | timestamp | Timestamp |

The PLC data table references the controller and the point by name rather than by ID, because some data only needs to be recorded as numeric values, with no switching involved.

The data transmitted by the controller in this project falls into two types: BO (Binary Output) and AI (Analog Input). A BO point only takes the value 1 or 0, for on or off. An AI point can be any floating-point number, such as a temperature or a humidity reading.
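To make the difference between the two point types concrete, here is a minimal sketch of how an incoming value could be checked before it is stored. The names `PointType` and `PointValues` are made up for this illustration and are not part of the project; the use of `BigDecimal` only mirrors the decimal column in the tables above.

```java
import java.math.BigDecimal;

/** Hypothetical point types, named after the BO / AI distinction in the text. */
enum PointType { BO, AI }

/** Hypothetical helper that checks a reported value against its point type. */
final class PointValues {
    private PointValues() {}

    /** Parses a raw value; BO accepts only 0 or 1, AI accepts any decimal number. */
    static BigDecimal parse(PointType type, String raw) {
        // Throws NumberFormatException if the text is not a number at all.
        BigDecimal value = new BigDecimal(raw.trim());
        if (type == PointType.BO
                && value.compareTo(BigDecimal.ZERO) != 0
                && value.compareTo(BigDecimal.ONE) != 0) {
            throw new IllegalArgumentException("BO value must be 0 or 1, got " + raw);
        }
        return value;
    }

    /** Convenience for BO points: true means "on". */
    static boolean isOn(BigDecimal boValue) {
        return boValue.compareTo(BigDecimal.ONE) == 0;
    }
}
```

With this, `PointValues.parse(PointType.AI, "23.5")` is accepted as a temperature-like reading, while `PointValues.parse(PointType.BO, "2")` is rejected.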

plan

I had two plans in mind:

  1. Integrated into the project: when the project initializes, allocate a thread that opens the TCP connection and receives the data.
  2. Standalone deployment: a separate service that uses MQ to transfer the data.

In the end I chose the first plan. The project itself is a monolith, and maintaining extra connections just for this one requirement is not cost-effective.

implementation

Since we are going with the first plan, let's start by looking at how to run initialization code in a Spring Boot project.

Initializing system resources in Spring Boot

Method one: implement the ApplicationRunner interface

Create an xxxxxRunner class that implements the ApplicationRunner interface.

Add the @Component annotation and the @Order annotation. The smaller the value in @Order, the earlier the runner is executed at startup.

Implement the run method of the interface and put the required initialization logic in it.

Method two: implement the CommandLineRunner interface

This works the same way as method one: implement the interface, add the @Order annotation (the smaller the value, the higher the priority), and implement the run method. I won't go into more detail here.

There are of course many other ways to run code at startup, if you are interested in learning about them.

Allocate a thread from a pool for the ServerSocket

First we need a thread in which the ServerSocket listens for connections from the PLC controllers. Where does that thread come from? A thread pool, of course, and a custom one at that.

For details, see Alibaba's "Java Development Manual".
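As a rough illustration of what a custom pool can look like, here is a minimal sketch built directly on `ThreadPoolExecutor`, with a bounded queue and named threads. The class name `PlcThreadPools`, the sizes and the name prefix are all assumptions for the example, not the project's real configuration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical factory for the pools used by the socket code. */
final class PlcThreadPools {
    private PlcThreadPools() {}

    /** Builds a bounded pool whose threads carry a recognisable name prefix. */
    static ThreadPoolExecutor newPool(String namePrefix, int coreSize, int maxSize, int queueCapacity) {
        AtomicInteger counter = new AtomicInteger(1);
        ThreadFactory factory = task -> {
            Thread thread = new Thread(task, namePrefix + "-" + counter.getAndIncrement());
            // Daemon threads do not keep the JVM alive on shutdown (a choice made for this sketch).
            thread.setDaemon(true);
            return thread;
        };
        return new ThreadPoolExecutor(
                coreSize, maxSize,
                60L, TimeUnit.SECONDS,                    // idle time before extra threads are released
                new ArrayBlockingQueue<>(queueCapacity),  // bounded queue, so the backlog cannot grow without limit
                factory,
                new ThreadPoolExecutor.AbortPolicy());    // reject loudly when pool and queue are both full
    }
}
```

Named threads pay off later, because every log line in the socket code prints `Thread.currentThread().getName()`.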

Once we have the thread pool, we need a task method that starts the controller server.

    // Requires java.io.IOException, java.net.ServerSocket and java.net.Socket
    /**
     * Builds the task that starts the controller server.
     *
     * @return Runnable
     */
    private Runnable getSocketServerCommand() {
        return () -> {
            ServerSocket serverSocket = null;
            Socket accept = null;
            try {
                // Keep listening for connections
                while (true) {
                    if (serverSocket == null || serverSocket.isClosed()) {
                        logger.info("{}, starting server, port :{}", Thread.currentThread().getName(), serverPort);
                        serverSocket = new ServerSocket(serverPort);
                    }
                    // Establish a connection
                    accept = serverSocket.accept();
                    synchronized (accept) {
                        // Set up a loop to listen for messages
                        recvMsg(accept);
                    }
                }
            } catch (IOException e) {
                logger.error("{}, controller server error! {}", Thread.currentThread().getName(), e);
            } finally {
                logger.info("{}, closing controller server!", Thread.currentThread().getName());
                try {
                    if (accept != null) {
                        accept.close();
                    }
                    if (serverSocket != null) {
                        serverSocket.close();
                    }
                } catch (IOException ioException) {
                    logger.error("{}, error closing server controller! {}", Thread.currentThread().getName(), ioException);
                }
            }
        };
    }

The loop checks whether the ServerSocket needs to be restarted and keeps listening for new connections. I added the lock because I was not sure whether a socket could be overwritten if a large number of connections arrived at the same time (which is unlikely). I read through the source of the accept method but did not find the key point, or did not understand it. Note that each call to accept() returns a new Socket object, so a lock on that object does not guard against anything. The socket is then passed to the recvMsg method.
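To check the worry about sockets being overwritten, here is a small self-contained sketch of an accept loop that hands every accepted connection to a worker pool. It is not the project's `getSocketServerCommand()`; the class `AcceptLoop`, the handler parameter and the loopback demo are assumptions made for this illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

/** Hypothetical accept loop, written only to illustrate the point above. */
final class AcceptLoop {
    private AcceptLoop() {}

    /** Accepts connections until the server socket is closed; the handler owns each socket. */
    static void run(ServerSocket server, ExecutorService workers, Consumer<Socket> handler) {
        while (!server.isClosed()) {
            try {
                Socket connection = server.accept(); // a new Socket object for every connection
                workers.execute(() -> handler.accept(connection));
            } catch (IOException e) {
                if (!server.isClosed()) {
                    throw new UncheckedIOException(e);
                }
                // The server was closed on purpose, so the loop simply ends.
            }
        }
    }

    /** Connects two loopback clients and returns how many distinct sockets were accepted. */
    static int demo() {
        List<Socket> accepted = new CopyOnWriteArrayList<>();
        ExecutorService workers = Executors.newFixedThreadPool(2);
        try (ServerSocket server = new ServerSocket(0, 50, InetAddress.getLoopbackAddress())) {
            Thread acceptor = new Thread(() -> run(server, workers, accepted::add));
            acceptor.setDaemon(true);
            acceptor.start();
            try (Socket first = new Socket(server.getInetAddress(), server.getLocalPort());
                 Socket second = new Socket(server.getInetAddress(), server.getLocalPort())) {
                long deadline = System.currentTimeMillis() + 5_000;
                while (accepted.size() < 2 && System.currentTimeMillis() < deadline) {
                    Thread.sleep(10);
                }
            }
            int distinct = (int) accepted.stream().distinct().count();
            for (Socket socket : accepted) {
                socket.close();
            }
            return distinct;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            workers.shutdownNow();
        }
    }
}
```

In the demo, two clients connect one after the other and the loop ends up with two separate `Socket` objects, one per connection, without any locking.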

Allocate a pool thread to each Socket connection

So that every controller can connect, a thread pool is used to assign a task to each socket, which keeps a long-lived connection open.

As before, we need a task method to handle the socket.

First, we obtain the InputStream from the socket and read the PLC controller's data from it.

    /**
     * Builds the task that receives messages from one connection.
     *
     * @param socket the socket connection
     */
    private Runnable getRecvMsgCommand(Socket socket) {
        return () -> {
            InputStream myInputStream = null;
            logger.info("{}, connecting {}", Thread.currentThread().getName(), socket.getInetAddress().getHostAddress());
            try {
                myInputStream = socket.getInputStream();
                // A fixed-size buffer is used because available() can return 0,
                // and read() on an empty buffer returns immediately without blocking
                byte[] buffer = new byte[1024];
                // Keep reading data in a loop
                while (true) {
                    if (socket.isClosed()) {
                        break;
                    }
                    // Get data: read() blocks until data arrives and returns -1 once the peer has closed
                    int size = myInputStream.read(buffer);
                    if (size == -1) {
                        break;
                    }
                    String res = new String(buffer, 0, size, StandardCharsets.UTF_8);
                    // Parse the data
                    if (!StrUtil.isBlankIfStr(res)) {
                        // Exclude the first heartbeat
                        if (!NumberUtil.isNumber(res)) {
                            // Perform data storage
                            logger.info("{}, receive data :{}", Thread.currentThread().getName(), res);
                        }
                    }
                    Thread.sleep(100);
                }
            } catch (IOException | InterruptedException e) {
                logger.error("{},socket connection error! {}", Thread.currentThread().getName(), e);
            } finally {
                logger.info("{}, closing Socket connection!", Thread.currentThread().getName());
                try {
                    if (myInputStream != null) {
                        myInputStream.close();
                    }
                    if (!socket.isClosed()) {
                        socket.close();
                    }
                } catch (IOException ioException) {
                    logger.error("{}, failed to close the socket connection! {}", Thread.currentThread().getName(), ioException);
                }
            }
        };
    }

At this point we are receiving the data fine. But a long-lived connection brings another problem: the heartbeat. When the PLC controller has been idle for longer than its configured time, it drops the TCP connection, and a large amount of blank data keeps arriving (very strange). So what we need is to keep the connection to the PLC alive and avoid the idle disconnect.

A side note: try-with-resources automatically closes Closeable resources, so you don't need to close them manually in finally, but the resource must implement the Closeable or AutoCloseable interface. If you're interested, look it up.
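For anyone who wants to see it, here is a minimal sketch of a receive loop written with try-with-resources. It assumes that messages are newline-terminated, which I have not confirmed for the PLC protocol; `LineReceiver` is a made-up name, and the sketch reads from any `InputStream` so that it can be tried without a controller.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical receiver that relies on try-with-resources instead of a finally block. */
final class LineReceiver {
    private LineReceiver() {}

    /** Reads newline-terminated messages until end of stream and skips blank ones. */
    static List<String> readAll(InputStream source) {
        List<String> messages = new ArrayList<>();
        // The reader, and with it the underlying stream, is closed automatically when the
        // block exits, whether it exits normally or through an exception.
        try (BufferedReader reader =
                     new BufferedReader(new InputStreamReader(source, StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) { // null means the peer closed the stream
                if (!line.trim().isEmpty()) {
                    messages.add(line);
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return messages;
    }
}
```

With a real connection the argument would be `socket.getInputStream()`. Keep in mind that closing that stream also closes the socket, so this shape only fits a loop that owns the connection until the end.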

Send a heartbeat

My first idea for sending the heartbeat was to measure the elapsed time, like this:

                myInputStream = socket.getInputStream();
                byte[] buffer = new byte[1024];
                long timeA = System.currentTimeMillis();
                while (true) {
                    if (socket.isClosed()) {
                        break;
                    }
                    // Send a heartbeat once more than 60 seconds have passed, to keep the connection alive
                    if (System.currentTimeMillis() - timeA > 60 * 1000) {
                        // Get the output stream and send the heartbeat through it.
                        // Do not close this stream here: closing it also closes the socket.
                        OutputStream myOutputStream = socket.getOutputStream();
                        timeA = System.currentTimeMillis();
                    }
                    // read() returns -1 once the peer has closed the connection
                    int size = myInputStream.read(buffer);
                    if (size == -1) {
                        break;
                    }
                    String res = new String(buffer, 0, size, StandardCharsets.UTF_8);
                    // Parse the data
                    if (!StrUtil.isBlankIfStr(res)) {
                        // Exclude the first heartbeat
                        if (!NumberUtil.isNumber(res)) {
                            // Perform data storage
                            logger.info("{}, receive data :{}", Thread.currentThread().getName(), res);
                        }
                    }
                    Thread.sleep(100);
                }

But it quickly became clear that this does not work. I found that the loop does not spin continuously; its body only runs once each time a message arrives. I looked through the source code and found the cause.

The answer is in myInputStream.read(buffer). Let's look at the Javadoc comment on read.

The last sentence of the first paragraph says:

This method blocks until input data is available, end of file is detected, or an exception is thrown.

So the whole loop body is blocked by the read() method and only runs once per incoming message.
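One way to stop read() from blocking forever, which is not what I ended up doing, is a read timeout set with `Socket.setSoTimeout`. The sketch below shows the idea on a loopback connection; `ReadTimeoutDemo` and the timeout values are assumptions for the example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.nio.charset.StandardCharsets;

/** Hypothetical demo of a blocking read that gives up after a timeout. */
final class ReadTimeoutDemo {
    private ReadTimeoutDemo() {}

    /**
     * Returns the number of bytes read, 0 if nothing arrived within timeoutMillis,
     * or -1 at end of stream. The buffer must not be empty.
     */
    static int readOrTimeout(Socket socket, byte[] buffer, int timeoutMillis) throws IOException {
        socket.setSoTimeout(timeoutMillis);
        try {
            return socket.getInputStream().read(buffer);
        } catch (SocketTimeoutException e) {
            return 0; // the socket is still usable after a timeout
        }
    }

    /** True if a silent peer causes a timeout and a talking peer causes a normal read. */
    static boolean demo() {
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress());
             Socket client = new Socket(server.getInetAddress(), server.getLocalPort());
             Socket accepted = server.accept()) {
            byte[] buffer = new byte[64];
            int whileSilent = readOrTimeout(accepted, buffer, 200);
            client.getOutputStream().write("hi".getBytes(StandardCharsets.UTF_8));
            client.getOutputStream().flush();
            int afterWrite = readOrTimeout(accepted, buffer, 2_000);
            return whileSilent == 0 && afterWrite > 0;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

With a timeout in place, the single loop from the first attempt would wake up regularly and could check the clock for the heartbeat. The price is that every iteration has to handle the "nothing arrived" case.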

So I had no choice but to start another thread that sends a heartbeat every 60 seconds.

    /**
     * Builds the task that sends heartbeat packets.
     */
    public static Runnable getHeartbeatCommand(Socket socket) {
        return () -> {
            OutputStream myOutputStream = null;
            try {
                myOutputStream = socket.getOutputStream();
                logger.info("{}, start sending heartbeat!", Thread.currentThread().getName());
                while (true) {
                    if (socket.isClosed()) {
                        break;
                    }
                    myOutputStream.write("[]\n".getBytes(StandardCharsets.UTF_8));
                    myOutputStream.flush();
                    Thread.sleep(60 * 1000);
                }
            } catch (IOException | InterruptedException e) {
                logger.error("{}, error sending heartbeat! {}", Thread.currentThread().getName(), e);
                e.printStackTrace();
            } finally {
                logger.info("{}, shutting down heartbeat!", Thread.currentThread().getName());
                try {
                    if (myOutputStream != null) {
                        myOutputStream.close();
                    }
                } catch (IOException e) {
                    logger.error("{}, failed to shut down heartbeat! {}", Thread.currentThread().getName(), e);
                }
            }
        };
    }
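A variation I did not use, but which avoids parking one sleeping thread per connection, is to let a `ScheduledExecutorService` fire the heartbeat. The sketch below writes the same `"[]\n"` payload; `HeartbeatScheduler` and its parameters are assumptions for the example.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical scheduler-based heartbeat, an alternative to a sleeping thread. */
final class HeartbeatScheduler {
    private static final byte[] HEARTBEAT = "[]\n".getBytes(StandardCharsets.UTF_8);

    private HeartbeatScheduler() {}

    /** Sends a heartbeat immediately and then once per period until the future is cancelled. */
    static ScheduledFuture<?> start(ScheduledExecutorService scheduler, OutputStream out,
                                    long period, TimeUnit unit) {
        return scheduler.scheduleAtFixedRate(() -> {
            try {
                // Lock on the stream so a heartbeat never interleaves with another
                // writer that takes the same lock.
                synchronized (out) {
                    out.write(HEARTBEAT);
                    out.flush();
                }
            } catch (IOException e) {
                // An exception thrown from a scheduled task suppresses all later runs,
                // so the heartbeat stops by itself once the connection is broken.
                throw new UncheckedIOException(e);
            }
        }, 0, period, unit);
    }
}
```

For a real connection this would be called as `HeartbeatScheduler.start(scheduler, socket.getOutputStream(), 60, TimeUnit.SECONDS)`, and the returned future would be cancelled when the receiving loop ends.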

So far we have successfully received the data and kept the long-lived connection to the PLC alive.

Next, we need to send specific data to the controller in order to control the equipment attached to the PLC.

Send data

Recap: in the code above we allocated a thread to the ServerSocket during initialization, and then allocated to each connection one thread to receive data and one thread to send heartbeats.

So far, each receiving thread corresponds to a connection to one controller. To send data we first have to get hold of the right connection, so how do we find the connection for a given controller?

The PLC controller sends one piece of data at the start of every new connection, and we can configure what that data is.

If we set it to the controller's unique object cloud name, we can use it as the key for storing the socket connection, which means our code will look like this:

                myInputStream = socket.getInputStream();
                byte[] buffer = new byte[1024];
                while (true) {
                    if (socket.isClosed()) {
                        break;
                    }
                    // read() blocks until data arrives and returns -1 once the peer has closed
                    int size = myInputStream.read(buffer);
                    if (size == -1) {
                        break;
                    }
                    String res = new String(buffer, 0, size, StandardCharsets.UTF_8);
                    Thread.sleep(100);
                    // Parse the data
                    if (!StrUtil.isBlankIfStr(res)) {
                        // The first message is the unique object cloud name, used as the key for storing the socket
                        if (!res.startsWith(IotControllerUtils.BRACKET_START)) {
                            logger.info("{}, controllerName:{}", Thread.currentThread().getName(), res);
                            // Store the socket in a map
                            IotControllerUtils.putSocketMapValue(res, socket);
                            continue;
                        }
                        // Perform data storage
                        logger.info("{}, receive data :{}", Thread.currentThread().getName(), res);
                    }
                }

We store each connected socket in a map, with the unique object cloud name as the key. That way we can look up the connection and send the message.

Attention! This map is shared by all the connection threads, which means it is used in a thread-unsafe environment. Even though all threads work on the same map, each of them puts its value under its own unique key. Does that make it thread-safe?
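As far as I can tell, unique keys alone are not enough: a plain `HashMap` can still be corrupted when several threads put at the same time, for example while it resizes. A `ConcurrentHashMap` avoids that. Here is a minimal sketch of such a registry; `SocketRegistry` and its method names are made up for the example and are not the `IotControllerUtils` used in the project.

```java
import java.net.Socket;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical registry of live connections, keyed by the controller's cloud name. */
final class SocketRegistry {
    private final ConcurrentMap<String, Socket> sockets = new ConcurrentHashMap<>();

    /** Stores the socket and returns the one it replaced (or null), so the caller can close it. */
    Socket register(String cloudName, Socket socket) {
        return sockets.put(cloudName, socket);
    }

    /** Returns the current socket for this controller, or null if it is not connected. */
    Socket find(String cloudName) {
        return sockets.get(cloudName);
    }

    /** Removes the entry only if it still points at this socket; a reconnect may have replaced it. */
    boolean unregister(String cloudName, Socket socket) {
        return sockets.remove(cloudName, socket);
    }

    int size() {
        return sockets.size();
    }
}
```

The two-argument `remove` matters when a controller reconnects: the old connection's cleanup then cannot delete the entry that already belongs to the new connection.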

To send the data we need (data that comes from the PLC controller configuration in the database), we only have to get the socket's OutputStream.
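A minimal sketch of that last step, tried out on a loopback connection. The real command format of the PLC is not shown in this article, so the message here is an arbitrary newline-terminated string, and `ControllerSender` is a made-up name.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

/** Hypothetical sender that writes one message to an already connected controller. */
final class ControllerSender {
    private ControllerSender() {}

    /** Writes one newline-terminated message and leaves the connection open. */
    static void send(Socket socket, String message) throws IOException {
        byte[] bytes = (message + "\n").getBytes(StandardCharsets.UTF_8);
        // Lock on the socket so that two senders cannot interleave their bytes.
        synchronized (socket) {
            OutputStream out = socket.getOutputStream();
            out.write(bytes);
            out.flush();
            // The stream is deliberately not closed: closing it would close the socket.
        }
    }

    /** Sends a message over a loopback connection and returns what the other side read. */
    static String demo(String message) {
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress());
             Socket controller = new Socket(server.getInetAddress(), server.getLocalPort());
             Socket accepted = server.accept()) {
            send(accepted, message);
            BufferedReader reader = new BufferedReader(
                    new InputStreamReader(controller.getInputStream(), StandardCharsets.UTF_8));
            return reader.readLine();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

In the project, the socket passed to `send` would be the one looked up in the shared map by cloud name. A heartbeat writer on the same connection would have to take the same lock for the synchronization to mean anything.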

conclusion

  1. Initialization at startup can be done with either the ApplicationRunner interface or the CommandLineRunner interface.
  2. Each socket connection is given threads from a thread pool, and the inputStream.read() method blocks.
  3. To get hold of a connection for sending data, remember to store each socket connection under a unique key.

Since Netty is basically the tool of choice for this kind of server connection, I plan to keep learning Netty. The next article should be about RabbitMQ.

Thank you for your attention. The senior engineers at my company have all only recently switched to Java, so I am basically teaching myself with no one to guide me. I often read Nuggets and GitHub, usually keep a study plan, and try to stick to one LeetCode problem a day.

So I would like to change jobs soon. I graduated in 2021 and am based in Shenzhen. Does anyone have an internal referral?