[Author introduction]: Da Luo, architect at Haige Zhizao, mainly engaged in cloud-native and big-data system development; participated in the construction of a national demonstration-level Industrial Internet system, among other projects.

In Industrial Internet and IoT systems, the most basic requirement is to display data curves, such as a device's power curve, similar to a stock's intraday chart. Usually we take the last power value the device reported within each minute as that minute's value; if the device reported nothing during a minute, we carry the previous minute's value forward, and so on. For instance (hypothetical values): if a device reports 5.1 kW at 18:01:20 and 5.4 kW at 18:01:45, minute 18:01 takes 5.4; if nothing arrives during 18:02, minute 18:02 repeats 5.4. The result is a minute-by-minute curve.

Usually we first write the data reported by devices to Apache Kafka. In an offline-computing scenario, you might write the data to Hive, periodically read Hive with Spark SQL, and write the results to HBase. In a real-time scenario, Apache Flink consumes the Kafka data and writes the results to HBase; in that case out-of-order and late-arriving data also have to be handled in the computation.

Moreover, an architecture based on the traditional Hadoop big-data stack needs ZooKeeper and HDFS first, then Hive and HBase on top, so the whole system is costly to maintain. In addition, HBase stores time-series data as key-value pairs, and repeating the same key design for every value wastes a great deal of space.

The above are some of the pain points of computing attribute curves for IoT devices. Beyond them, data growth, data verification, and disaster recovery also need to be considered.

Because the author's company provides customers with integrated solutions based on 3D printing technology, it naturally needs to track the operating state of its equipment and store the equipment's operating data. That is when we found TDengine, the open-source IoT big-data platform (https://github.com/taosdata/TDengine).

If the data is complete, the problem above can be solved with a single SQL statement:

select last(val) a from super_table_xx where ts >= '2021-06-07 18:10:00' and ts <= '2021-06-07 18:20:00' interval(60s) fill(value, 0);
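Incidentally, fill(value, 0) pads minutes that have no data with a constant 0. If what you want is the carry-forward behavior described at the start of this article, where an empty minute repeats the previous minute's value, the fill clause supports that too; a sketch against the same hypothetical super_table_xx:

select last(val) a from super_table_xx where ts >= '2021-06-07 18:10:00' and ts <= '2021-06-07 18:20:00' interval(60s) fill(prev);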

Why can TDengine execute SQL like this so efficiently?

This is due to its super-table and sub-table design. TDengine stores the data of a single device continuously in time order. In fact, when business systems consume IoT data, whether for real-time queries or offline analysis, they typically read a continuous time span of a single device's data, which is exactly the access pattern this layout favors.

Assuming that we want to store the temperature and humidity of the device, we can design the super table as follows:

create stable if not exists s_device (ts TIMESTAMP,
  temperature double,
  humidity double
) TAGS (device_sn BINARY(1000));

In practice, inserting data for devices 'd1' and 'd2' looks like this:

insert into s_device_d1 (ts, temperature, humidity) USING s_device (device_sn) TAGS ('d1') values (1623157875000, 35.34, 80.24);
insert into s_device_d2 (ts, temperature, humidity) USING s_device (device_sn) TAGS ('d2') values (1623157891000, 29.63, 79.48);

To query device 'd1' over a time range:

select * from s_device where device_sn = 'd1' and ts > 1623157871000 and ts < 1623157890000 ;

To compute the average temperature curve of the past 7 days, one data point per hour:

select avg(temperature) temperature from s_device where  device_sn = #{deviceSn} and ts >= #{startTime} and ts < #{endTime}  interval(1h)

TDengine also provides many aggregate functions, such as the last function and the fill clause used above to produce the continuous 1-minute curve, as well as other commonly used functions like sum and max.

To integrate TDengine with the application, we chose the flexible and easy-to-use MyBatis-Plus ORM framework. For the super table s_device above, we first define the entity:

import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.sql.Timestamp;
/**
 * @author: DaLuo
 * @date: 2021/06/25
 * @description:
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
@TableName(value = "s_device")
public class TestSuperDeviceEntity {
    private Timestamp ts;
    private Float temperature;
    private Float humidity;
    @TableField(value = "device_sn")
    private String deviceSn;
}

Then we define the mapper:

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.hg.device.kafka.tdengine.entity.TestSuperDeviceEntity;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;

import java.sql.Timestamp;
import java.util.List;

/**
 * @author: DaLuo
 * @date: 2021/06/25
 * @description:
 */
@Mapper
public interface TestSuperDeviceMapper extends BaseMapper<TestSuperDeviceEntity> {

    /**
     * Insert one row, auto-creating the sub-table from the device SN.
     * @param entity
     * @return affected rows
     */
    @Insert({
        "INSERT INTO 's_device_${entity.deviceSn}' (ts, temperature, humidity) ",
        "USING s_device (device_sn) TAGS (#{entity.deviceSn}) ",
        "VALUES (#{entity.ts}, #{entity.temperature}, #{entity.humidity})"
    })
    int insertOne(@Param(value = "entity") TestSuperDeviceEntity entity);

    /**
     * Batch insert, one USING clause per row.
     * @param entities
     * @return affected rows
     */
    @Insert({
        "<script>",
        "INSERT INTO ",
        "<foreach collection='list' item='item' separator=' '>",
        "'s_device_${item.deviceSn}' (ts, temperature, humidity) USING s_device (device_sn) TAGS (#{item.deviceSn}) ",
        "VALUES (#{item.ts}, #{item.temperature}, #{item.humidity})",
        "</foreach>",
        "</script>"
    })
    int batchInsert(@Param("list") List<TestSuperDeviceEntity> entities);

    /**
     * Select the average temperature over a time range, one data point per hour.
     * @param deviceSn
     * @param startTime inclusive
     * @param endTime exclusive
     * @return hourly averages
     */
    @Select("select avg(temperature) temperature from s_device where device_sn = #{deviceSn} and ts >= #{startTime} and ts < #{endTime} interval(1h)")
    List<TempSevenDaysTemperature> selectSevenDaysTemperature(
        @Param(value = "deviceSn") String deviceSn,
        @Param(value = "startTime") long startTime,
        @Param(value = "endTime") long endTime);

    @AllArgsConstructor
    @NoArgsConstructor
    @Data
    @Builder
    class TempSevenDaysTemperature {
        private Timestamp ts;
        private float temperature;
    }
}
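For reference, here is a minimal usage sketch of the mapper (assuming it has been injected as a Spring bean named testSuperDeviceMapper; the values are hypothetical):

TestSuperDeviceEntity row = TestSuperDeviceEntity.builder()
        .ts(new Timestamp(System.currentTimeMillis()))
        .temperature(35.34f)
        .humidity(80.24f)
        .deviceSn("d1")
        .build();
// The first write auto-creates sub-table s_device_d1 under super table s_device.
testSuperDeviceMapper.insertOne(row);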

TDengine has a clever design here: sub-tables do not need to be created in advance. The INSERT ... USING ... TAGS syntax creates a sub-table automatically on first write, which lets us simply embed the tag value in the sub-table's name, as in s_device_d1 above.

Note: because our product must work across time zones, all of our time storage and query interactions use timestamps rather than the "YYYY-MM-DD HH:MM:SS" string format. Storing a time involves the application's time zone, the connection string's time zone, and the TDengine server's time zone; the string format can easily produce inaccurate times, whereas a long-integer timestamp avoids the problem entirely.
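A small standalone illustration of why the long value is safer (this demo is ours, not from the original system; it reuses the epoch-millisecond value from the insert example above):

import java.time.Instant;
import java.time.ZoneId;

public class TimestampZoneDemo {
    public static void main(String[] args) {
        long ts = 1623157875000L; // epoch milliseconds: unambiguous everywhere
        // The same instant renders as different wall-clock strings per zone;
        // only the long value is stable across app, driver, and server zones.
        System.out.println(Instant.ofEpochMilli(ts).atZone(ZoneId.of("Asia/Shanghai"))); // 2021-06-08T21:11:15+08:00[Asia/Shanghai]
        System.out.println(Instant.ofEpochMilli(ts).atZone(ZoneId.of("UTC")));           // 2021-06-08T13:11:15Z[UTC]
    }
}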

There are currently two ways for Java applications to use TDengine's JDBC driver: JDBC-JNI and JDBC-RESTful. The former has the advantage in write performance, but it requires the TDengine client driver to be installed on the machine where the application runs.
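For reference, the two flavors are selected by the JDBC URL; a minimal sketch with placeholder host and database names and the default credentials (our assumptions, not from the original article):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

public class ConnectionDemo {
    public static void main(String[] args) throws SQLException {
        // JDBC-JNI: better write performance, needs the native TDengine
        // client installed on this machine.
        Connection jni = DriverManager.getConnection(
                "jdbc:TAOS://tdengine-host:6030/iot_db?user=root&password=taosdata");

        // JDBC-RESTful: pure Java over HTTP, no native client required.
        Connection rest = DriverManager.getConnection(
                "jdbc:TAOS-RS://tdengine-host:6041/iot_db?user=root&password=taosdata");

        jni.close();
        rest.close();
    }
}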

Our application runs in Docker on a Kubernetes cluster, so we built a base image suited to running it. The base image's Dockerfile is shown below:

FROM openjdk:8-jdk-oraclelinux7
COPY TDengine-client-2.0.16.0-Linux-x64.tar.gz /
RUN tar -xzvf /TDengine-client-2.0.16.0-Linux-x64.tar.gz && cd /TDengine-client-2.0.16.0 && pwd && ls && ./install_client.sh

Build:

docker build -t tdengine-openjdk-8-runtime:2.0.16.0 -f Dockerfile .

The application image's Dockerfile then builds on it:

FROM tdengine-openjdk-8-runtime:2.0.16.0
ENV JAVA_OPTS="-Duser.timezone=Asia/Shanghai -Djava.security.egd=file:/dev/./urandom"
COPY app.jar /app.jar
ENTRYPOINT ["java","-jar","/app.jar"]

This allows our application to be scheduled on any K8S node.

In addition, our program involves automatic task scheduling, which requires frequent MQTT interaction with the devices' lower-level controllers. For example, the cloud sends instruction 1000 ("start task A") and the controller responds with instruction 2000 ("task A received"). We treat the instruction stream like a device and the instruction's kind and content as its attributes, so this data is also a natural fit for the TDengine time-series database.
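A super table for this instruction stream might look as follows; this is our sketch inferred from the query output below, and the table name and column sizes are assumptions:

create stable if not exists s_instruction (ts TIMESTAMP,
  msg NCHAR(1024),
  kind INT
) TAGS (device_sn BINARY(1000));

Stored this way, two instructions queried in the taos shell (vertical display) look like the following: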

*************************** 1.row ***************************
       ts: 2021-06-23 16:10:30.000
      msg: {"task_id":"7b40ed4edc1149f1837179c77d8c3c1f","action":"start"}
device_sn: deviceA
     kind: 1000
*************************** 2.row ***************************
       ts: 2021-06-23 16:10:31.000
      msg: {"task_id":"7b40ed4edc1149f1837179c77d8c3c1f","action":"received"}
device_sn: deviceA
     kind: 2000

While integrating with devices, our cloud side frequently needs to check whether a given message was actually sent, so persisting the instructions became urgent. We therefore created a dedicated thread in the application that subscribes to the instruction messages and writes them to TDengine in batches.
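A minimal sketch of such a batching writer thread (the class name, queue size, and batch size are our assumptions; in the real application the MQTT callback would call enqueue() and the writer would delegate to a MyBatis mapper's batch insert):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class BatchWriter<T> implements Runnable {
    private static final int MAX_BATCH = 500;
    private final BlockingQueue<T> queue = new LinkedBlockingQueue<>(10_000);
    private final Consumer<List<T>> writer; // e.g. rows -> instructionMapper.batchInsert(rows)

    public BatchWriter(Consumer<List<T>> writer) {
        this.writer = writer;
    }

    // Called from the MQTT message callback.
    public void enqueue(T row) {
        queue.offer(row); // drops on overflow; use put() if loss is unacceptable
    }

    @Override
    public void run() {
        List<T> batch = new ArrayList<>(MAX_BATCH);
        while (!Thread.currentThread().isInterrupted()) {
            try {
                // Wait up to 1s for the first row so partial batches still flush.
                T first = queue.poll(1, TimeUnit.SECONDS);
                if (first == null) {
                    continue;
                }
                batch.add(first);
                queue.drainTo(batch, MAX_BATCH - 1);
                writer.accept(batch);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                batch.clear();
            }
        }
    }
}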

Finally, TDengine has a built-in super table, log.dn, which records memory, CPU, and other usage information, so we can use Grafana to display this data and provide a reliable operational reference for monitoring!
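For example, a Grafana panel could be fed by a query along these lines; this is only a sketch, and the metric column names differ across TDengine versions, so run describe log.dn first and substitute the actual column:

select avg(cpu_taosd) from log.dn where ts > now - 1h interval(1m);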