Streaming data is dynamic data that grows continuously over time. Operational data from the Internet and sensor data from the Internet of Things both fall into this category. Because a streaming data set changes dynamically, traditional computing engines designed for static tables cannot handle analysis and calculation over it, so streaming scenarios require a dedicated computing engine.

DolphinDB provides a flexible stream-oriented aggregation engine. The createStreamAggregator function creates an aggregation engine that continuously performs aggregate calculations over incoming stream data and outputs the results to a specified table.

1. Aggregation engine application framework

The stream aggregation engine is an independent computing engine: simply writing data to it triggers a calculation whose results are output to the target table. For streaming data, the subscribeTable function makes it convenient to feed stream data into the aggregation engine continuously. Here is an example:

```
tradesAggregator = createStreamAggregator(5, 5, <[sum(qty)]>, trades, outputTable, `time)
subscribeTable(, "trades", "tradesAggregator", 0, append!{tradesAggregator}, true)
```

The _subscribeTable_ function subscribes to the stream data table; every time new data arrives, append!{tradesAggregator} is triggered according to the specified rule, continuously feeding the stream data into the aggregation engine.

The aggregation engine mainly involves the following concepts:

  • Stream data table: DolphinDB provides a dedicated table object, streamTable, for stream data. It publishes stream data that other nodes or applications can subscribe to and consume via the _subscribeTable_ function.
  • Aggregation engine data source: _createStreamAggregator_ returns an abstract table; writing data to this table means the data enters the aggregation engine for calculation.
  • Aggregate expression: a set of aggregate functions written as metacode, such as <[sum(qty)]> or <[sum(qty), max(qty), avg(price)]>. The aggregation engine supports all aggregate functions in the system, as well as expressions over aggregate results for more complex scenarios, such as <[avg(price1)-avg(price2)]> or <[std(price1-price2)]>.
  • Data window (windowSize): the length of the stream data window intercepted for each calculation.
  • Calculation period (step): the interval between two calculations.

2. Data window

Each time an aggregate calculation runs over the stream, a segment of the data must be intercepted. The intercepted segment is called the data window; its length is determined by the parameter windowSize, and the interval between calculations by the parameter step.

Both the data window length and the calculation interval are measured in units determined by the parameter useSystemTime. There are two notions of time in streaming aggregation scenarios. The first is the generation time of the data, usually recorded in the data as a timestamp, at a precision of day, minute, second, millisecond, or nanosecond. The second is the time at which the data enters the aggregation engine, also called system time; this is a timestamp the aggregation engine itself stamps on the data, taken from the system clock of the server hosting the engine, with millisecond precision. When useSystemTime=true, the window length and calculation interval are measured in system time (milliseconds); otherwise they are measured in the precision of the data's own generation time.
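As a minimal sketch of the two modes (reusing the trades and outputTable objects from the example in section 1; whether timeColumn is ignored when useSystemTime=true is an assumption here):

```
// event-time driven (default): windowSize=5 and step=5 are measured in the
// precision of the `time column (milliseconds for a TIMESTAMP column)
evtAggregator = createStreamAggregator(5, 5, <[sum(qty)]>, trades, outputTable, `time, false)

// system-time driven: windowSize and step are milliseconds of the server clock,
// which the engine stamps on each arriving record
sysAggregator = createStreamAggregator(5, 5, <[sum(qty)]>, trades, outputTable, `time, true)
```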

If the boundary of the data window were set by the time the first record enters the system, it would generally fall at an irregular time. Worse, if the data contains many groups and each group's window boundary were set by its own first record, the groups could not be compared within the same window. Therefore, the system aligns (normalizes) the boundary of the first data window according to the value of step, using an integer alignment scale, alignmentSize. The alignment rule depends on the time precision and on step:

When the time precision of the data is second, as with the DATETIME or SECOND types, alignmentSize takes the following values:

step     alignmentSize
0~2      2
3~5      5
6~10     10
11~15    15
16~20    20
21~30    30
31~60    60

When the time precision of the data is millisecond, as with the TIMESTAMP or TIME types, alignmentSize takes the following values:

step       alignmentSize
0~2        2
3~5        5
6~10       10
11~20      20
21~25      25
26~50      50
51~100     100
101~200    200
201~250    250
251~500    500
501~1000   1000

Assume the minimal-precision value of the first record's time is firstDataTime. The left border of the first data window, expressed at that precision, is then firstDataTime\alignmentSize*alignmentSize, where \ denotes integer division. For example, if the time of the first record is 2018.10.08T01:01:01.365, then firstDataTime=365. With step=100, the table above gives alignmentSize=100, so the minimal-precision value of the first window's left border is 365\100*100=300, and the aligned left border of the first data window is 2018.10.08T01:01:01.300.
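The alignment arithmetic can be sketched directly in DolphinDB (a toy check only; alignmentSize is looked up manually from the table above):

```
firstDataTime = 365     // millisecond part of 2018.10.08T01:01:01.365
alignmentSize = 100     // from the millisecond table above, for step=100
// \ is DolphinDB's integer-division operator, so the result is the aligned left border: 300
leftBorder = firstDataTime \ alignmentSize * alignmentSize
```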

Let's use an example to illustrate how the system performs streaming calculations. The input stream table contains two columns, time and qty, with time at millisecond precision; a running sum(qty) is computed over the stream according to the configured windows. For ease of observation, the simulated input arrives at one record per millisecond. The following code creates the stream table trades, sets the aggregation parameters, and defines a function writeData that writes simulated data to trades.

```
share streamTable(1000:0, `time`qty, [TIMESTAMP, INT]) as trades
outputTable = table(10000:0, `time`sumQty, [TIMESTAMP, INT])
tradesAggregator = createStreamAggregator(5, 5, <[sum(qty)]>, trades, outputTable, `time)
subscribeTable(, "trades", "tradesAggregator", 0, append!{tradesAggregator}, true)
def writeData(n){
    timev = 2018.10.08T01:01:01.001 + timestamp(1..n)
    qtyv = take(1, n)
    insert into trades values(timev, qtyv)
}
```

First operation: write five records to the stream table trades.

```
writeData(5)
```

View the stream data table:

```
select * from trades

time                    qty
2018.10.08T01:01:01.002 1
2018.10.08T01:01:01.003 1
2018.10.08T01:01:01.004 1
2018.10.08T01:01:01.005 1
2018.10.08T01:01:01.006 1
```

View the output table:

```
select * from outputTable

time                    sumQty
2018.10.08T01:01:01.000 3
```

The calculation was triggered at 2018.10.08T01:01:01.000, which shows that the system aligned the time of the first record, 2018.10.08T01:01:01.002. With windowSize=5, the window covers 000 up to (but not including) 005 and thus contains the three records at 002, 003 and 004, so sum(qty) is 3.

Second operation: clear the tables, set windowSize=6 and step=3, and write 10 simulated records:

```
share streamTable(1000:0, `time`qty, [TIMESTAMP, INT]) as trades
outputTable = table(10000:0, `time`sumQty, [TIMESTAMP, INT])
tradesAggregator = createStreamAggregator(6, 3, <[sum(qty)]>, trades, outputTable, `time)
subscribeTable(, "trades", "tradesAggregator", 0, append!{tradesAggregator}, true)
def writeData(n){
    timev = 2018.10.08T01:01:01.001 + timestamp(1..n)
    qtyv = take(1, n)
    insert into trades values(timev, qtyv)
}
writeData(10)
```

View the stream data table:

```
select * from trades

time                    qty
2018.10.08T01:01:01.002 1
2018.10.08T01:01:01.003 1
2018.10.08T01:01:01.004 1
2018.10.08T01:01:01.005 1
2018.10.08T01:01:01.006 1
2018.10.08T01:01:01.007 1
2018.10.08T01:01:01.008 1
2018.10.08T01:01:01.009 1
2018.10.08T01:01:01.010 1
2018.10.08T01:01:01.011 1
```

View the output table:

```
select * from outputTable

time                    sumQty
2018.10.08T01:01:00.997 1
2018.10.08T01:01:01.000 4
2018.10.08T01:01:01.003 6
```

From this result we can deduce the aggregation engine's window rules: the start time of the first window is obtained by aligning the time of the first record, each window is windowSize long, and the windows advance by step.

The following explains how the aggregation engine determines the window data for the three calculations above. For readability, the common prefix 2018.10.08T01:01:01 is omitted and only the millisecond part is shown. The window start is aligned on the first record's time 002, which aligns down to 000; with windowSize=6 and step=3, the windows therefore cover 997 (of the previous second) up to 003, 000 up to 006, and 003 up to 009, and each output row is stamped with the left border of its window. The first window, 997 to 002, contains only the record at 002, so sum(qty) is 1. The second window, 000 to 005, contains the four records from 002 to 005, giving 4. Similarly, the third window, 003 to 008, contains six records, giving 6.
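The second window can be checked by hand with a plain SQL query over trades (a sketch; boundaries follow the [left, left+windowSize) convention, including the lower bound and excluding the upper):

```
// second window [01:01:01.000, 01:01:01.006): expect sum(qty)=4
select sum(qty) from trades where time >= 2018.10.08T01:01:01.000 and time < 2018.10.08T01:01:01.006
```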

3. Aggregate expressions

In practical applications, complex aggregate calculations over stream data are often required, which demands great flexibility from the aggregation engine's expressions. The DolphinDB aggregation engine supports real-time calculation with complex expressions.

  • Vertical aggregation (aggregation along the time series):

    tradesAggregator = createStreamAggregator(6, 3, <sum(ofr)>, trades, outputTable, `time)

  • Horizontal aggregation (aggregation across dimensions):

    tradesAggregator = createStreamAggregator(6, 3, <max(ofr)-min(ofr)>, trades, outputTable, `time)

    tradesAggregator = createStreamAggregator(6, 3, <max(ofr-bid)>, trades, outputTable, `time)

  • Output multiple aggregation results:

    tradesAggregator = createStreamAggregator(6, 3, <[max((ofr-bid)/(ofr+bid)*2), min((ofr-bid)/(ofr+bid)*2)]>, trades, outputTable, `time)

  • Multi-argument aggregate function call:

Some aggregate functions take multiple arguments, such as corr and percentile.

    tradesAggregator = createStreamAggregator(6, 3, <corr(ofr,bid)>, trades, outputTable, `time)

    tradesAggregator = createStreamAggregator(6, 3, <percentile(ofr-bid,99)/sum(ofr)>, trades, outputTable, `time)
  • Calling custom functions:

    def spread(x,y){
        return abs(x-y)/(x+y)*2
    }
    tradesAggregator = createStreamAggregator(6, 3, <spread(ofr, bid)>, trades, outputTable, `time)

Note that DolphinDB does not support nested aggregate function calls: an expression such as sum(spread(ofr,bid)) is rejected by the streaming aggregation engine. The usual workaround is sketched below.
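A minimal sketch of the workaround, folding the nested call into a single user-defined aggregate function (the name sumSpread is illustrative, not part of the original example):

```
// wrap the inner expression and the outer aggregate in one custom function
def sumSpread(x, y){
    return sum(abs(x-y)/(x+y)*2)
}
// the engine then sees a single custom aggregate call
tradesAggregator = createStreamAggregator(6, 3, <sumSpread(ofr, bid)>, trades, outputTable, `time)
```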

4. Stream data source

DolphinDB's aggregation engine uses a stream table (streamTable) as its input data source. A stream table publishes streaming data; subscribing to it with the _subscribeTable_ function triggers data processing, and the aggregation engine is one of the ways the subscribed data can be processed.

A streamTable serving as the aggregation engine's data source need not simply feed raw data into the engine: the handler registered with subscribeTable can perform preliminary cleaning before the data enters the engine. The following example shows how to pre-filter stream data.

Sensors collect voltage and current data and upload them in real time as the stream data source, but records with voltage<=0.02 or electric==NULL must be filtered out before entering the aggregation engine.

```
share streamTable(1000:0, `time`voltage`electric, [TIMESTAMP, DOUBLE, INT]) as trades
outputTable = table(10000:0, `time`avgElectric, [TIMESTAMP, DOUBLE])

def writeData(blockNumber){
    timev = 2018.10.08T01:01:01.001 + timestamp(1..blockNumber)
    vt = 1..blockNumber * 0.01
    bidv = take([1, NULL, 2], blockNumber)
    insert into trades values(timev, vt, bidv)
}

// custom handler: clean the data before it reaches the aggregation engine
def dataPreHandle(aggrTable, msg){
    t = select * from msg where voltage > 0.02, not electric == NULL
    if(size(t) > 0){
        insert into aggrTable values(t.time, t.voltage, t.electric)
    }
}

tradesAggregator = createStreamAggregator(6, 3, <[avg(electric)]>, trades, outputTable, `time, false, , 2000)
// subscribe with the cleaning handler instead of plain append!
subscribeTable(, "trades", "tradesAggregator", 0, dataPreHandle{tradesAggregator}, true)
writeData(10)
```

View the stream data table. The simulated data deliberately contains records with voltage<=0.02 or electric==NULL:

```
select * from trades

time                    voltage electric
2018.10.08T01:01:01.002 0.01    1
2018.10.08T01:01:01.003 0.02
2018.10.08T01:01:01.004 0.03    2
2018.10.08T01:01:01.005 0.04    1
2018.10.08T01:01:01.006 0.05
2018.10.08T01:01:01.007 0.06    2
2018.10.08T01:01:01.008 0.07    1
2018.10.08T01:01:01.009 0.08
2018.10.08T01:01:01.010 0.09    2
2018.10.08T01:01:01.011 0.1     1
```

View the output table:

```
select * from outputTable

time                    avgElectric
2018.10.08T01:01:01.000 1.5
2018.10.08T01:01:01.003 1.5
```

As the result shows, the records with voltage<=0.02 or electric==NULL have been filtered out. The first calculation window contained no remaining data, so no aggregate result was produced for it.

5. Aggregate engine output

The aggregation results can be output to a new or existing in-memory table or to a stream table. In-memory tables allow flexible data operations such as update and delete; data output to a stream table cannot be modified, but the aggregated results can be published again through the stream table. The following example shows how to use one engine's result table as the data source of another aggregation engine.

In this example, the engine tradesAggregator computes a moving average over the initial stream table trades and outputs the results to the stream table aggrOutput. A second engine, SecondAggregator, subscribes to aggrOutput and computes the moving maximum of the first engine's results.

```
share streamTable(10000:0, `time`voltage`electric, [TIMESTAMP, DOUBLE, INT]) as trades
// the output table is defined as a stream table, so it can be subscribed to again
share streamTable(1000:0, `time`avgElectric, [TIMESTAMP, DOUBLE]) as aggrOutput

def writeData(blockNumber){
    timev = 2018.10.08T01:01:01.001 + timestamp(1..blockNumber)
    vt = 1..blockNumber * 0.01
    bidv = take([1, 2], blockNumber)
    insert into trades values(timev, vt, bidv)
}

tradesAggregator = createStreamAggregator(6, 3, <[avg(electric)]>, trades, aggrOutput, `time, false, , 2000)
subscribeTable(, "trades", "tradesAggregator", 0, append!{tradesAggregator}, true)

// the second engine consumes the first engine's output
outputTable2 = table(1000:0, `time`maxAggrElec, [TIMESTAMP, DOUBLE])
SecondAggregator = createStreamAggregator(6, 3, <[max(avgElectric)]>, aggrOutput, outputTable2, `time, false, , 2000)
subscribeTable(, "aggrOutput", "SecondAggregator", 0, append!{SecondAggregator}, true)

writeData(10)
```

View the output table:

```
select * from outputTable2

time                    maxAggrElec
2018.10.08T01:01:00.992 1
2018.10.08T01:01:00.995 1.5
```

6. createStreamAggregator function description and syntax

The createStreamAggregator function associates three main pieces of information about a streaming data aggregation application:

  • Input data source

The input data source is a stream table; subscribing to the stream table associates the data source with the aggregation engine.

  • Aggregate expression

Defines the logic of the aggregate calculation; complex expressions are supported. The aggregation engine evaluates the stream data according to the aggregate expression and outputs the results to the target table.

  • Output table

The aggregation results can be output to a new or existing memory table or stream data table. Memory tables are more flexible in data operation and can be updated or deleted. Data output to stream tables cannot be changed. However, data aggregation results can be published again through stream tables to meet more application scenarios.

6.1 Syntax

createStreamAggregator(windowSize, step, aggregators, dummyTable, outputTable, timeColumn[, useSystemTime, keyColumn, garbageSize])

6.2 Return object

Returns an abstract table object that serves as the entry point to the aggregation engine: writing data to this table means the data enters the aggregation engine for calculation.
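Since the returned object behaves like a table, data can also be written to the engine directly, bypassing subscribeTable. A minimal sketch, assuming the trades schema (time, qty) from section 2:

```
// build a one-row batch and append it straight to the engine's entry table
t = table([2018.10.08T01:01:01.002] as time, [1] as qty)
tradesAggregator.append!(t)
```

This is exactly what the subscription handler append!{tradesAggregator} does for each incoming message.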

6.3 Parameters

  • useSystemTime: a Boolean value indicating the driving mode of the aggregation engine. When true, the engine is time-driven: whenever a predetermined point in time is reached, it is activated and the current data window is intercepted for calculation. In this mode the unit is the millisecond, and the system stamps each incoming record with a millisecond timestamp that serves as the basis for windowing. When false, the engine is activated only when new data enters the system, and the timeColumn of the data serves as the basis for windowing. Optional; defaults to false.
  • windowSize: a positive integer indicating the size of the data window. A window includes its lower boundary but not its upper boundary.
  • step: a positive integer indicating the frequency of aggregation, i.e., the time interval that triggers a calculation.

windowSize and step share the same unit, which depends on useSystemTime: when useSystemTime=true they are in milliseconds, and when useSystemTime=false they are in the unit of timeColumn, the time field in the data.

To make results easy to observe and compare, the system aligns the window start times uniformly. For the specific rules, see **2. Data window**.

  • aggregators: metacode specifying the aggregate functions. All aggregate functions in the system are supported, such as <[sum(qty)]> or <[sum(qty), max(qty)]>; expressions over aggregate results are also supported for more complex scenarios, such as <[avg(price1)-avg(price2)]> or <[std(price1-price2)]>.

To improve streaming aggregation performance, DolphinDB has optimized some of its aggregate functions so that they reuse the results of previous calculations and minimize repeated computation.

The optimized aggregate functions are:

```
corr        correlation
covar       covariance
first       first element
last        last element
max         maximum
med         median
min         minimum
percentile  percentile
std         standard deviation
sum         sum
sum2        sum of squares
var         variance
wavg        weighted average
wsum        weighted sum
```
  • dummyTable: a table object that provides the sample schema. It does not need to contain data, but its structure must be identical to that of the input stream table.

  • outputTable: the table to which the aggregation results are output. The first column must be of a time type and stores the time at which each calculation took place. If keyColumn is not empty, the second column is the grouping column and the aggregate results start from the third column; otherwise the results start from the second column. The final structure of the output table is as follows:

    time column | grouping column (when keyColumn is set) | aggregate result column 1 | aggregate result column 2 | ...
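For example, the grouped engine in section 6.4.2 below writes into an output table shaped like this (shown here only to illustrate the layout):

```
// time column, grouping column, then one column per aggregate expression
outputTable = table(10000:0, `time`sym`sumQty, [TIMESTAMP, SYMBOL, INT])
```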

  • timeColumn: the time column of the input stream table.

  • keyColumn: the grouping column for the aggregation. The input stream data is grouped by keyColumn. Optional.

  • garbageSize: a positive integer. When the number of historical records cached in memory exceeds garbageSize, the system clears the cache.

When the streaming data aggregation engine is running, each calculation needs to load new window data into memory for calculation. As the calculation process continues, more and more data will be cached in memory. In this case, a mechanism is needed to clean up the historical data that is no longer needed. Memory cleanup occurs when the number of historical rows in memory exceeds the garbageSize value.

When group calculation is required, the number of historical data records in each group is counted separately, so the action of memory clearing is carried out independently by each group. Memory cleanup is triggered when the number of historical data records for each group exceeds garbageSize.
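In the grouped example of section 6.4.2 below, garbageSize appears as the last argument; a sketch of that call with the grouping-related arguments annotated:

```
// `sym is keyColumn; 50 is garbageSize, applied to each sym group's cache independently
tradesAggregator = createStreamAggregator(3, 3, <[sum(qty)]>, trades, outputTable, `time, false, `sym, 50)
```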

6.4 Examples

6.4.1 dummyTable example

This example shows dummyTable in action. A modelTable object with exactly the same structure as trades is created and passed as the dummyTable argument, while the actual data is still written to trades.

```
share streamTable(1000:0, `time`qty, [TIMESTAMP, INT]) as trades
modelTable = table(1000:0, `time`qty, [TIMESTAMP, INT])
outputTable = table(10000:0, `time`sumQty, [TIMESTAMP, INT])
tradesAggregator = createStreamAggregator(5, 5, <[sum(qty)]>, modelTable, outputTable, `time)
subscribeTable(, "trades", "tradesAggregator", 0, append!{tradesAggregator}, true)
def writeData(n){
    timev = 2018.10.08T01:01:01.001 + timestamp(1..n)
    qtyv = take(1, n)
    insert into trades values(timev, qtyv)
}
writeData(6)
```

The results are still produced, which shows that the dummyTable argument serves only as a sample schema: whether it contains data does not affect the outcome.

6.4.2 Group Aggregation Example

A grouping column sym is added to the input stream table, and keyColumn is set to sym for the aggregation.

```
share streamTable(1000:0, `time`sym`qty, [TIMESTAMP, SYMBOL, INT]) as trades
outputTable = table(10000:0, `time`sym`sumQty, [TIMESTAMP, SYMBOL, INT])
tradesAggregator = createStreamAggregator(3, 3, <[sum(qty)]>, trades, outputTable, `time, false, `sym, 50)
subscribeTable(, "trades", "tradesAggregator", 0, append!{tradesAggregator}, true)
def writeData(n){
    timev = 2018.10.08T01:01:01.001 + timestamp(1..n)
    symv = take(`A`B, n)
    qtyv = take(1, n)
    insert into trades values(timev, symv, qtyv)
}
writeData(6)
```

To facilitate observation, sort the execution result by the sym column:

```
select * from trades order by sym

time                    sym qty
2018.10.08T01:01:01.002 A   1
2018.10.08T01:01:01.004 A   1
2018.10.08T01:01:01.006 A   1
2018.10.08T01:01:01.003 B   1
2018.10.08T01:01:01.005 B   1
2018.10.08T01:01:01.007 B   1
```

The outputTable shows the results of the calculation grouped by the contents of the sym column.

```
select * from outputTable

time                    sym sumQty
2018.10.08T01:01:01.000 A   1
2018.10.08T01:01:01.003 A   1
2018.10.08T01:01:01.003 B   2
```

After time alignment, each group's windows start from the time point 000. With windowSize=3 and step=3, each group's windows are divided at 000-003-006, and calculations are triggered for the windows starting at 000 and 003. Note that if a window contains no data, the system neither calculates nor produces a result, which is why group B's first window has no output; this can be verified by hand with the queries below.
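The per-group windows can be reproduced with plain SQL (a sketch using the [left, left+windowSize) window convention from section 2):

```
// group A, first window [01:01:01.000, 01:01:01.003): expect sum(qty)=1
select sum(qty) from trades where sym == `A and time >= 2018.10.08T01:01:01.000 and time < 2018.10.08T01:01:01.003
// group B, same window: empty, so the engine emitted no row for it
select sum(qty) from trades where sym == `B and time >= 2018.10.08T01:01:01.000 and time < 2018.10.08T01:01:01.003
```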

7. Summary

The DolphinDB database provides a lightweight, easy-to-use streaming aggregation engine that works together with stream tables (streamTable) to perform real-time calculation over streaming data. It supports vertical and horizontal aggregation and their combination, custom aggregate functions, grouped aggregation, pre-cleaning of invalid data, multi-level (cascaded) calculation, and more, meeting a wide range of real-time streaming computation requirements.