1. Overview

Internet of Things devices (such as machine tools, boilers, elevators, water meters, gas meters, etc.) generate massive amounts of device-state data and business message data around the clock, and collecting, computing, and analyzing these data often involves detecting abnormal values.

DolphinDB is a high-performance distributed time-series database with a built-in streaming framework that can process and analyze IoT data in real time as well as compute and analyze historical data. DolphinDB's built-in stream processing framework supports publishing, subscribing to, and preprocessing streaming data, real-time in-memory computation, and rolling-window calculation of complex metrics. It is an efficient and easy-to-use framework; for details, see the DolphinDB streaming tutorial.

Based on this streaming framework, DolphinDB provides an anomaly detection engine. Given user-specified anomaly metrics, the engine detects abnormal data in real time.

2. Anomaly detection engine framework

DolphinDB’s anomaly detection engine is built on the publish-subscribe model of its streaming framework. In the example below, the createAnomalyDetectionEngine function creates an anomaly detection engine, and the subscribeTable function subscribes to a stream table; each time new data arrives, append!{engine} is triggered, continuously feeding the stream data into the anomaly detection engine. The engine checks in real time whether the data meets the user-defined alarm metric temp > 65, and outputs any abnormal data to the table outputTable.

share streamTable(1000:0, `time`device`temp, [TIMESTAMP, SYMBOL, DOUBLE]) as sensor
share streamTable(1000:0, `time`device`anomalyType`anomalyString, [TIMESTAMP, SYMBOL, INT, SYMBOL]) as outputTable
engine = createAnomalyDetectionEngine("engine1", <[temp > 65]>, sensor, outputTable, `time, `device, 10, 1)
subscribeTable(, "sensor", "sensorAnomalyDetection", 0, append!{engine}, true)

Here is a brief introduction to some of the concepts involved in the anomaly detection engine:

  • Stream table: a stream table is a specific kind of table object used to publish streaming data. Other nodes or applications can subscribe to and consume the streaming data through the subscribeTable function.
  • Anomaly detection engine data source: the channel that supplies “raw material” to the anomaly detection engine. The createAnomalyDetectionEngine function returns an abstract table; writing data to this table means the data enters the anomaly detection engine for calculation.
  • Anomaly metrics: a set of Boolean expressions provided in meta-code form for processing the stream data. They may include aggregate functions to support complex scenarios.
  • Data window: the length of the stream-data window intercepted at each calculation. The data window is meaningful only when the metrics include an aggregate function.
  • Output table: the first column of the anomaly detection engine’s output table must be of a time type and stores the timestamp at which an anomaly was detected. If a grouping column is specified, the second column is the grouping column. The following two columns, of type INT and STRING/SYMBOL, record the type of the anomaly (the subscript of its expression in metrics) and the content of the anomaly.

3. Anomaly metrics

All metrics in the anomaly detection engine must return Boolean values. A metric is usually a function or an expression. When a metric contains an aggregate function, the window length and calculation interval must be specified, and the engine then calculates the metric at fixed intervals over a moving window of fixed length. Anomaly metrics generally fall into the following three types:

  • Metrics containing only column names or non-aggregate functions, such as qty > 10 or lt(qty, prev(qty)). For such metrics, the anomaly detection engine evaluates each arriving record, judges whether it meets the metric, and decides whether to output it.
  • Metrics in which all column names appear as arguments of aggregate functions, such as avg(qty - price) > 10, percentile(qty, 90) < 100, or sum(qty) > prev(sum(qty)). For such metrics, the anomaly detection engine aggregates the data only when the window moves, similar to the time-series aggregation engine.
  • Metrics in which column names appear both inside and outside aggregate-function arguments, such as avg(qty) > qty or le(med(qty), price). For such metrics, the engine evaluates the aggregate columns as the window moves and evaluates each record as it arrives, with the aggregate function’s return value taken from the most recently calculated window.
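The different evaluation cadences of these metric types can be illustrated with a minimal Python sketch (not DolphinDB code; the toy `qty > 10` and `avg(qty) > 10` metrics, the row layout, and the window bookkeeping below are all assumptions for illustration):

```python
# Illustrative sketch: a type-1 metric fires on every arriving row, while a
# type-2 (aggregate) metric fires only when a window calculation is triggered.

def detect(rows, window_size, step):
    """rows: list of (time_ms, qty). Returns (per_row_hits, per_window_hits)."""
    per_row_hits = []      # type-1 metric: qty > 10, checked on every row
    per_window_hits = []   # type-2 metric: avg(qty) > 10, checked per window
    for t, qty in rows:
        if qty > 10:                       # type-1: evaluated immediately
            per_row_hits.append(t)
    # type-2: evaluate over fixed windows [start, start + window_size)
    start = 0
    end_time = rows[-1][0]
    while start + window_size <= end_time + 1:
        window = [q for t, q in rows if start <= t < start + window_size]
        if window and sum(window) / len(window) > 10:
            per_window_hits.append(start + window_size)
        start += step
    return per_row_hits, per_window_hits

rows = [(0, 8), (1, 12), (2, 9), (3, 15), (4, 7), (5, 11)]
print(detect(rows, window_size=4, step=2))   # ([1, 3, 5], [4, 6])
```

Here the per-row metric fires three times (for every record above 10), while the aggregate metric fires only at the two window boundaries.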

4. Data window

When an anomaly metric contains an aggregate function, the user must specify the data window. Streaming aggregation is performed at fixed intervals over moving windows of fixed length. The window length is set by the parameter windowSize; the calculation interval is set by the parameter step.

With multiple groups of data, if each group’s window boundaries were constructed from the time its first record entered the system, the calculation results of different groups could generally not be compared over the same window. To address this, the system determines an integer normalization scale, alignmentSize, based on the value of the parameter step, and uses it to normalize the left boundary of each group’s first data window.

(1) When the data’s time type is MONTH, January of the year of the first record is used as the left boundary of the window.

(2) When the data’s time type is DATE, the boundary of the first data window is not normalized.

(3) When the time precision of the data is seconds or minutes, e.g. MINUTE, DATETIME, or SECOND, alignmentSize is chosen according to the following table:

step     alignmentSize
0~2      2
3~5      5
6~10     10
11~15    15
16~20    20
21~30    30
31~60    60

(4) When the time precision of the data is milliseconds, e.g. TIMESTAMP or TIME, alignmentSize is chosen according to the following table:

step      alignmentSize
0~2       2
3~5       5
6~10      10
11~20     20
21~25     25
26~50     50
51~100    100
101~200   200
201~250   250
251~500   500
501~1000  1000

Assume the value of the first record’s time at its minimum precision is x. The corresponding part of the left edge of the first data window is then normalized to x/alignmentSize*alignmentSize, where / denotes integer division. For example, if the time of the first record is 2018.10.08T01:01:01.365, then x=365. If step=100, the table above gives alignmentSize=100, so the millisecond part of the left boundary of the first normalized window is 365/100*100=300. The first window after normalization therefore ranges from 2018.10.08T01:01:01.300 to 2018.10.08T01:01:01.400.
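The alignment rule can be sketched in a few lines of Python (not DolphinDB code; the lookup table below simply transcribes the millisecond-precision table above):

```python
# Sketch of the window-boundary normalization for millisecond-precision
# timestamps: pick alignmentSize from the table, then truncate x to a
# multiple of it with integer division.

MS_ALIGNMENT = [(2, 2), (5, 5), (10, 10), (20, 20), (25, 25),
                (50, 50), (100, 100), (200, 200), (250, 250),
                (500, 500), (1000, 1000)]

def alignment_size(step):
    """Pick alignmentSize for a given step (millisecond precision)."""
    for upper, size in MS_ALIGNMENT:
        if step <= upper:
            return size
    raise ValueError("step out of table range")

def align_left_boundary(x, step):
    """Normalize the millisecond part x of the first timestamp:
    x / alignmentSize * alignmentSize, with / as integer division."""
    a = alignment_size(step)
    return x // a * a

# Example from the text: first record at ...01.365 with step = 100
print(align_left_boundary(365, 100))   # 300 -> window ...01.300 to ...01.400
```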

5. Example

5.1 Application Scenarios

Now simulate a sensor device collecting temperature readings. Assume the window length is 6 ms, the window moves every 3 ms, and the temperature is collected every 1 ms (matching the windowSize and step used in the code below). Specify the following anomaly metrics:

  • a single temperature reading exceeds 65;
  • a single temperature reading exceeds the 75th percentile of readings in the previous window;
  • the relative error between the average temperature in a window and that in the previous window is greater than 1%.

5.2 System Design

The collected data are stored in a stream table. The anomaly detection engine obtains real-time data by subscribing to this table and performs anomaly detection; data meeting any anomaly metric are output to another table.

5.3 Implementation Procedure

(1) Define the stream table sensor to store the collected data:

share streamTable(1000:0, `time`temp, [TIMESTAMP, DOUBLE]) as sensor

(2) Define the anomaly detection engine and the output table outputTable, which is also a stream table:

share streamTable(1000:0, `time`anomalyType`anomalyString, [TIMESTAMP, INT, SYMBOL]) as outputTable
engine = createAnomalyDetectionEngine("engine1", <[temp > 65, temp > percentile(temp, 75), abs((avg(temp) - prev(avg(temp))) / avg(temp)) > 0.01]>, sensor, outputTable, `time, , 6, 3)

(3) Subscribe the anomaly detection engine engine to the stream table sensor:

subscribeTable(, "sensor", "sensorAnomalyDetection", 0, append!{engine}, true)

(4) Write 10 records into the stream table sensor to simulate temperature collection:

timev = 2018.10.08T01:01:01.001 + 1..10
tempv = 59 66 57 60 63 51 53 52 56 55
insert into sensor values(timev, tempv)

View the contents of the stream table sensor:

time                     temp
2018.10.08T01:01:01.002  59
2018.10.08T01:01:01.003  66
2018.10.08T01:01:01.004  57
2018.10.08T01:01:01.005  60
2018.10.08T01:01:01.006  63
2018.10.08T01:01:01.007  51
2018.10.08T01:01:01.008  53
2018.10.08T01:01:01.009  52
2018.10.08T01:01:01.010  56
2018.10.08T01:01:01.011  55

Check the result table outputTable:

time                     anomalyType  anomalyString
2018.10.08T01:01:01.003  0            temp > 65
2018.10.08T01:01:01.003  1            temp > percentile(temp, 75)
2018.10.08T01:01:01.005  1            temp > percentile(temp, 75)
2018.10.08T01:01:01.006  2            abs((avg(temp) - prev(avg(temp))) / avg(temp)) > 0.01
2018.10.08T01:01:01.006  1            temp > percentile(temp, 75)
2018.10.08T01:01:01.009  2            abs((avg(temp) - prev(avg(temp))) / avg(temp)) > 0.01

The following describes the calculation process of the anomaly detection engine in detail. For ease of reading, the common part 2018.10.08T01:01:01 is omitted from the times below, and only the millisecond part is given.

(1) The metric temp > 65 contains only the column temp outside any aggregate function, so it is evaluated as each record arrives. In the simulated data, only the temperature at 003 meets this anomaly metric.

(2) In temp > percentile(temp, 75), the column temp appears both as an argument of the aggregate function percentile and on its own. Therefore, as each record arrives, its temp value is compared with percentile(temp, 75) computed over the previous window. The first window is aligned based on the time 002 of the first record; after alignment its left boundary is 000. The first window spans 000 to 002 and contains only the record at 002, for which percentile(temp, 75) evaluates to 59; records 003 to 005 are compared with this value, and 003 and 005 meet the condition. The second window spans 002 to 005, and percentile(temp, 75) evaluates to 60; records 006 to 008 are compared with this value, and 006 meets the condition. The third window spans 003 to 008, and percentile(temp, 75) evaluates to 63; records 009 to 011 are compared with this value, and none meets the condition. The last record, 011, has not yet triggered a new window calculation.
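The three window percentiles quoted above can be re-derived with a short Python sketch. Note the assumption: a nearest-rank style percentile reproduces the tutorial’s numbers here, but DolphinDB’s own percentile interpolation mode may differ.

```python
# Re-computation of the percentile(temp, 75) values for the three windows.
# Assumption: nearest-rank interpolation matches the figures in the text.

def percentile_nearest(values, pct):
    s = sorted(values)
    # fractional index into the sorted list, rounded to the nearest rank
    idx = round(pct / 100 * (len(s) - 1))
    return s[idx]

w1 = [59]                       # window 000-002
w2 = [59, 66, 57, 60]           # window 002-005
w3 = [66, 57, 60, 63, 51, 53]   # window 003-008

print(percentile_nearest(w1, 75),
      percentile_nearest(w2, 75),
      percentile_nearest(w3, 75))   # 59 60 63, matching the text
```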

(3) In abs((avg(temp) - prev(avg(temp))) / avg(temp)) > 0.01, temp appears only as an argument of the aggregate function avg, so the metric is checked only at each window calculation. As in the analysis of the previous metric, avg(temp) over the first three windows evaluates to 59, 60.5, and 58.33 respectively, so abs((avg(temp) - prev(avg(temp))) / avg(temp)) > 0.01 is satisfied at the second and third window calculations, i.e. at times 006 and 009.
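The arithmetic behind this metric can be checked with a small Python sketch (illustrative only; the window contents are taken from the walkthrough above):

```python
# Re-computation of the avg(temp) windows and the relative-error metric
# abs((avg - prev_avg) / avg) > 0.01 described above.

windows = {
    3: [59],                        # window 000-002, computed at time 003
    6: [59, 66, 57, 60],            # window 002-005, computed at time 006
    9: [66, 57, 60, 63, 51, 53],    # window 003-008, computed at time 009
}

prev_avg = None
alerts = []
for t in sorted(windows):
    cur_avg = sum(windows[t]) / len(windows[t])   # 59, 60.5, 58.33...
    if prev_avg is not None and abs((cur_avg - prev_avg) / cur_avg) > 0.01:
        alerts.append(t)
    prev_avg = cur_avg

print(alerts)   # [6, 9]: the metric fires at times 006 and 009
```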

5.4 Monitoring the status of the anomaly detection engine

getAggregatorStat().AnomalDetectionAggregator

name    user  status lastErrMsg numGroups numRows numMetrics metrics
------- ----- ------ ---------- --------- ------- ---------- --------------------
engine1 guest OK                0         10      3          temp > 65, temp > percentile(temp, 75), abs((avg(temp) - prev(avg(temp))) / avg(temp)) > 0.01

5.5 Deleting the anomaly detection engine

removeAggregator("engine1")

6. createAnomalyDetectionEngine

Syntax

createAnomalyDetectionEngine(name, metrics, dummyTable, outputTable, timeColumn, [keyColumn], [windowSize], [step], [garbageSize]) 

Return value

The createAnomalyDetectionEngine function returns a table object. Writing data to this table means that the data enters the anomaly detection engine for calculation.

Parameters

  • name: a string, the name and unique identifier of the anomaly detection engine. It may contain letters, digits, and underscores, but must begin with a letter.
  • metrics: meta code whose return values must be of Boolean type. It can be a function or an expression, such as <[qty > 5, eq(qty, price)]>, and may use built-in or user-defined aggregate functions (defined with the defg keyword), such as <[sum(qty) > 5, lt(avg(price), price)]>. See the metaprogramming tutorial for details.
  • dummyTable: a table object. It may contain no data, but its structure must be the same as that of the subscribed stream table.
  • outputTable: a table object that holds the calculation results. Its first column must be of a time type, stores the timestamp at which an anomaly was detected, and must have the same data type as the time column of dummyTable. If the keyColumn parameter is not empty, the second column of outputTable is keyColumn. The following two columns, of type INT and STRING/SYMBOL, record the type of the anomaly (its subscript in metrics) and the content of the anomaly.
  • timeColumn: a string scalar, the name of the time column of the input stream table.
  • keyColumn: a string scalar, the name of the grouping column. The anomaly detection engine groups the input data by keyColumn and performs aggregate calculations within each group. It is optional.
  • windowSize: a positive integer. When metrics contains aggregate functions, windowSize must be specified; it denotes the length of the data window used for aggregate calculations. This parameter has no effect if metrics contains no aggregate function.
  • step: a positive integer. When metrics contains aggregate functions, step must be specified; it denotes the interval between calculations. windowSize must be an integer multiple of step, otherwise an exception is thrown. This parameter has no effect if metrics contains no aggregate function.
  • garbageSize: a positive integer, optional, with default value 50,000. If keyColumn is not specified and the number of historical records held in memory exceeds garbageSize, the system clears the historical data that is no longer needed. If keyColumn is specified, memory is cleaned independently for each group: when the number of historical records of a group exceeds garbageSize, the historical data no longer needed by that group is cleared; if a group’s record count does not exceed garbageSize, its data is not cleaned. This parameter has no effect if metrics contains no aggregate function.
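The per-group cleanup behaviour of garbageSize can be sketched as follows (illustrative Python only; the engine's real bookkeeping is internal, and the rule that only the most recent window's worth of rows must be retained is an assumption for the sketch):

```python
# Sketch of garbageSize-style history trimming for one group: once the
# buffered rows exceed garbage_size, drop rows that can no longer fall
# into any future window, keeping only the newest window_size rows.

def trim_history(history, garbage_size, window_size):
    if len(history) > garbage_size:
        return history[-window_size:]
    return history

h = list(range(12))                 # 12 buffered rows for one group
print(trim_history(h, 10, 4))       # [8, 9, 10, 11]
print(trim_history([1, 2, 3], 10, 4))   # below the threshold: unchanged
```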

7. Summary

DolphinDB’s anomaly detection engine is a lightweight, easy-to-use streaming engine that works with stream tables to detect anomalies in streaming data in real time, meeting real-time monitoring and alerting needs in the Internet of Things.