When processing real-time streaming data, you often need more than longitudinal aggregation over time, which the time-series aggregation engine handles. You also need horizontal, cross-sectional comparison and calculation over the latest data. Examples include the percentile of each stock's latest quote among all stocks in finance, or the average temperature across a batch of devices in the Industrial Internet of Things. DolphinDB provides a cross-sectional aggregation engine that aggregates the latest record of every group.

The cross-sectional engine consists of two parts: the cross-sectional data table and the calculation engine.

- The cross-sectional data table is the engine's internal table. It holds the latest record for every group.
- The calculation engine is a set of aggregate expressions plus a trigger. The trigger starts the aggregation in the specified manner, and the results are written to a separate output table.
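To make the two parts concrete, the following is a minimal Python model of the idea, not DolphinDB code. In this model:

- a dictionary keyed by group plays the role of the cross-sectional data table;
- a list of functions plays the role of the metrics;
- `compute()` stands in for a trigger firing.

The class and method names are hypothetical.

```python
from typing import Callable, Dict, List, Tuple

# A row is a plain dict such as {"sym": "A", "price": 102.1, "qty": 60}.
Row = Dict[str, object]
Metric = Callable[[List[Row]], float]


class CrossSection:
    """Toy model of a cross-sectional engine: latest row per key plus metrics."""

    def __init__(self, key_column: str, metrics: List[Metric]):
        self.key_column = key_column
        self.metrics = metrics
        self.latest: Dict[object, Row] = {}         # the "cross-sectional data table"
        self.output: List[Tuple[float, ...]] = []   # stand-in for outputTable

    def upsert(self, row: Row) -> None:
        # A new row for an existing key overwrites the previous row for that key.
        self.latest[row[self.key_column]] = row

    def compute(self) -> None:
        # Evaluate every metric over the current cross-section (one row per key).
        rows = list(self.latest.values())
        self.output.append(tuple(m(rows) for m in self.metrics))


engine = CrossSection("sym", [lambda rows: sum(r["qty"] for r in rows)])
engine.upsert({"sym": "A", "price": 10.0, "qty": 5})
engine.upsert({"sym": "B", "price": 20.0, "qty": 7})
engine.upsert({"sym": "A", "price": 11.0, "qty": 3})  # replaces A's earlier row
engine.compute()
print(len(engine.latest), engine.output)  # 2 [(10,)]
```

Only the latest row of A (qty 3) and B (qty 7) take part in the sum, which is the defining property of a cross-section.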

1. Basic usage

In DolphinDB, you create a cross-sectional aggregation engine with the createCrossSectionalAggregator function. The function returns a cross-sectional data table that holds the latest record for every group. Writing data to this table feeds the data into the engine for calculation. The syntax is as follows:

createCrossSectionalAggregator(name, [metrics], dummyTable, [outputTable], keyColumn, [triggeringPattern="perBatch"], [triggeringInterval=1000])
  • name is a string that names the cross-sectional aggregation engine and uniquely identifies it. It can contain letters, digits, and underscores, and must begin with a letter.

  • metrics is metacode that specifies the aggregations to compute. It accepts three forms:
    - built-in or user-defined functions, such as <[sum(qty), avg(price)]>;
    - expressions over aggregate results, such as <[avg(price1)-avg(price2)]>;
    - aggregates over computed columns, such as <[std(price1-price2)]>.
    See metaprogramming for details.

  • dummyTable is a table object that need not contain any data, but it must have the same schema as the subscribed stream table.

  • outputTable is a table object that stores the calculation results. It has one more column than there are metrics. The first column is of type TIMESTAMP and stores the time at which each calculation was performed. The remaining columns must have the same data types as the results returned by metrics.

  • keyColumn is a string naming the column of dummyTable that serves as the engine's key. Each distinct value in this column corresponds to exactly one row in the cross-sectional table.

  • triggeringPattern is a string that specifies how calculations are triggered. It can take the following values:

  • "perRow": a calculation is triggered every time a row is inserted.

  • "perBatch": a calculation is triggered every time a batch of data is inserted. This is the default.

  • "interval": calculations are triggered at a fixed time interval.

  • triggeringInterval is an integer that takes effect only when triggeringPattern is "interval". It specifies the interval between calculations in milliseconds. The default is 1000 milliseconds.
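The difference between perRow and perBatch comes down to how many calculations a sequence of inserts produces. The Python function below is a hypothetical model inferred from the descriptions above, not a DolphinDB API. Interval mode is left out because it depends on wall-clock time rather than on inserts.

```python
from typing import List


def count_calculations(batch_sizes: List[int], pattern: str) -> int:
    """Model how many times a perRow or perBatch engine would compute.

    Each entry in batch_sizes is the number of rows written by one insert.
    """
    if pattern == "perRow":
        return sum(batch_sizes)   # one calculation per inserted row
    if pattern == "perBatch":
        return len(batch_sizes)   # one calculation per insert (batch)
    raise ValueError(f"unsupported pattern: {pattern}")


# Three inserts of four rows each, like calling writeData(4) three times.
print(count_calculations([4, 4, 4], "perRow"))    # 12
print(count_calculations([4, 4, 4], "perBatch"))  # 3
```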

2. Example

The following example shows how to use the cross-sectional aggregation engine. In financial trading, you often need to know three things in real time:

- the average of the latest prices of all stocks;
- the sum of their latest trade quantities;
- the total value of their latest trades.

DolphinDB's cross-sectional aggregation engine, combined with streaming data subscription, makes this easy.

(1) Create a real-time trading table

A real-time trades table for stocks contains the following main fields:

  • time: trade time

  • sym: stock code

  • price: trade price

  • qty: trade quantity

Whenever a trade occurs, the record is written to the trades table in real time. The script that creates the trades table is as follows:

share streamTable(10:0,`time`sym`price`qty,[TIMESTAMP,SYMBOL,DOUBLE,INT]) as trades

(2) Create a cross-sectional aggregation engine

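// The output table: a TIMESTAMP column followed by one column per metric
outputTable = table(1:0, `time`avgPrice`sumqty`Total, [TIMESTAMP,DOUBLE,INT,DOUBLE])
tradesCrossAggregator=createCrossSectionalAggregator("CrossSectionalDemo", <[avg(price), sum(qty), sum(price*qty)]>, trades, outputTable, `sym, `perRow)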

tradesCrossAggregator is a cross-sectional data table keyed by stock symbol, with only one row per stock. The trigger pattern is perRow, so every row written to the table triggers a calculation. Each calculation computes avg(price), sum(qty) and sum(price*qty) over the latest rows of all stocks.

(3) Subscribe the cross-sectional data table to the real-time trades table

subscribeTable(,"trades","tradesCrossAggregator",-1,append!{tradesCrossAggregator},true)

Through this streaming data subscription, real-time data is written to the cross-sectional data table.
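The handler append!{tradesCrossAggregator} is a partial application. It is append! with its first argument fixed to the engine's table, so each batch the subscription delivers is appended to that table.

The following Python sketch models this wiring with functools.partial. StreamTable and append are hypothetical stand-ins, not DolphinDB APIs.

```python
from functools import partial
from typing import Callable, Dict, List

Row = Dict[str, object]


class StreamTable:
    """Hypothetical stand-in for a shared stream table with subscribers."""

    def __init__(self) -> None:
        self.rows: List[Row] = []
        self.handlers: List[Callable[[List[Row]], None]] = []

    def subscribe(self, handler: Callable[[List[Row]], None]) -> None:
        self.handlers.append(handler)

    def insert(self, batch: List[Row]) -> None:
        self.rows.extend(batch)
        for handler in self.handlers:  # push each new batch to every subscriber
            handler(batch)


def append(target: Dict[object, Row], batch: List[Row]) -> None:
    """Like append!: write a batch into a keyed table, keeping the latest row per sym."""
    for row in batch:
        target[row["sym"]] = row


trades = StreamTable()
cross_section: Dict[object, Row] = {}
# partial(append, cross_section) plays the role of append!{tradesCrossAggregator}.
trades.subscribe(partial(append, cross_section))
trades.insert([{"sym": "A", "price": 102.1}, {"sym": "B", "price": 33.4}])
trades.insert([{"sym": "A", "price": 73.6}])
print(len(trades.rows), cross_section["A"]["price"])  # 3 73.6
```

The stream table keeps every row. The subscriber's keyed table keeps only one row per symbol.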

(4) Generate simulated data

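def writeData(n){
    // n consecutive timestamps, 1 ms apart, starting at 2000.10.08T01:01:01.002
    timev = 2000.10.08T01:01:01.001 + timestamp(1..n)
    // Cycle through the sample symbols, prices and quantities to length n
    symv = take(`A`B, n)
    pricev = take(102.1 33.4 73.6 223, n)
    qtyv = take(60 74 82 59, n)
    insert into trades values(timev, symv, pricev, qtyv)
}
writeData(4);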
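For readers less familiar with DolphinDB's take function and temporal arithmetic, the following Python sketch reproduces the rows that writeData(4) generates.

- The take helper mimics DolphinDB's take, which repeats a vector cyclically to length n.
- write_data is a hypothetical Python counterpart, not part of DolphinDB.

```python
from datetime import datetime, timedelta
from itertools import cycle, islice


def take(values, n):
    """Mimic DolphinDB take(): repeat values cyclically until there are n items."""
    return list(islice(cycle(values), n))


def write_data(n):
    """Build the same (time, sym, price, qty) rows as the writeData script."""
    base = datetime(2000, 10, 8, 1, 1, 1, 1000)  # 2000.10.08T01:01:01.001
    times = [base + timedelta(milliseconds=i) for i in range(1, n + 1)]
    return list(zip(times,
                    take(["A", "B"], n),
                    take([102.1, 33.4, 73.6, 223.0], n),
                    take([60, 74, 82, 59], n)))


for row in write_data(4):
    # first line printed: 01:01:01.002 A 102.1 60
    print(row[0].strftime("%H:%M:%S.%f")[:-3], *row[1:])
```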

View the real-time trades table, which contains four records.

select * from trades
time                    sym price qty
----------------------- --- ----- ---
2000.10.08T01:01:01.002 A   102.1 60
2000.10.08T01:01:01.003 B   33.4  74
2000.10.08T01:01:01.004 A   73.6  82
2000.10.08T01:01:01.005 B   223   59

Check the cross-sectional data table. It keeps only the most recent trade of each of stocks A and B.

select * from tradesCrossAggregator
time                    sym price qty
----------------------- --- ----- ---
2000.10.08T01:01:01.004 A   73.6  82
2000.10.08T01:01:01.005 B   223   59
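The cross-sectional table behaves like an upsert keyed on sym. The following minimal Python sketch of that rule, using the four trades above, reproduces the two rows shown. It is a model, not the engine's implementation.

```python
trades = [
    ("2000.10.08T01:01:01.002", "A", 102.1, 60),
    ("2000.10.08T01:01:01.003", "B", 33.4, 74),
    ("2000.10.08T01:01:01.004", "A", 73.6, 82),
    ("2000.10.08T01:01:01.005", "B", 223.0, 59),
]

# Keep only the most recent row for each symbol: later rows overwrite earlier ones.
latest = {}
for row in trades:
    latest[row[1]] = row

for row in latest.values():
    print(*row)
```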

Look at the output table of the cross-sectional engine. The engine uses the perRow trigger, so it performs one calculation for every row written to the cross-sectional table. The output table therefore contains four records. The time column records when each calculation ran, not the trade time.

select * from outputTable
time                    avgPrice sumqty Total
----------------------- -------- ------ -------
2019.07.08T10:04:41.731 102.1    60     6126
2019.07.08T10:04:41.732 67.75    134    8597.6
2019.07.08T10:04:41.732 53.5     156    8506.8
2019.07.08T10:04:41.732 148.3    141    19192.2
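You can check the four output rows by hand. The Python sketch below models the semantics, not the engine itself. It works as follows:

- it replays the trades one row at a time, mirroring perRow;
- it upserts each trade into a dictionary keyed by sym;
- it evaluates the three metrics over the current cross-section.

```python
trades = [("A", 102.1, 60), ("B", 33.4, 74), ("A", 73.6, 82), ("B", 223.0, 59)]

latest = {}   # cross-section: sym -> (price, qty)
outputs = []  # one entry per calculation, like the rows of outputTable
for sym, price, qty in trades:
    latest[sym] = (price, qty)                           # upsert the new row
    rows = list(latest.values())
    avg_price = sum(p for p, _ in rows) / len(rows)      # avg(price)
    sum_qty = sum(q for _, q in rows)                    # sum(qty)
    total = sum(p * q for p, q in rows)                  # sum(price*qty)
    outputs.append((round(avg_price, 4), sum_qty, round(total, 4)))

for o in outputs:
    print(*o)
```

For example, the third row comes from A = (73.6, 82) and B = (33.4, 74): (73.6 + 33.4) / 2 = 53.5, 82 + 74 = 156, and 73.6 × 82 + 33.4 × 74 = 6035.2 + 2471.6 = 8506.8.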

View the state of the cross-sectional engine with the getAggregatorStat function.

getAggregatorStat().CrossSectionalAggregator
name               user  status lastErrMsg numRows numMetrics metrics            triggeringPattern triggeringInterval
------------------ ----- ------ ---------- ------- ---------- ------------------ ----------------- ------------------
CrossSectionalDemo guest OK                2       3          [ avg(price), su... perRow            1000

Remove the cross-sectional engine with the removeAggregator function.

removeAggregator("CrossSectionalDemo")

3. Ways to trigger calculations

The cross-sectional engine can trigger calculations in three ways: perRow, perBatch, and interval. The example above triggers a calculation every time a row is inserted. The other two modes are described below.

  • perBatch

In perBatch mode, a calculation is triggered each time a batch of data is appended. In the following example, the engine is created in perBatch mode. The script generates 12 records and writes them in three batches.

// Assumes a fresh session, or that the trades table, subscription and engine from the previous example have been removed
share streamTable(10:0,`time`sym`price`qty,[TIMESTAMP,SYMBOL,DOUBLE,INT]) as trades
outputTable = table(1:0, `time`avgPrice`sumqty`Total, [TIMESTAMP,DOUBLE,INT,DOUBLE])
tradesCrossAggregator=createCrossSectionalAggregator("CrossSectionalDemo", <[avg(price), sum(qty), sum(price*qty)]>, trades, outputTable, `sym, `perBatch)
subscribeTable(,"trades","tradesCrossAggregator",-1,append!{tradesCrossAggregator},true)
def writeData(n){
    timev = 2000.10.08T01:01:01.001 + timestamp(1..n)
    symv = take(`A`B, n)
    pricev = take(102.1 33.4 73.6 223, n)
    qtyv = take(60 74 82 59, n)
    insert into trades values(timev, symv, pricev, qtyv)
}
// Write three batches of data; this is expected to trigger three calculations and output three aggregate results.
writeData(4);
writeData(4);
writeData(4);

View the cross-sectional data table.

select * from tradesCrossAggregator
time                    sym price qty
----------------------- --- ----- ---
2000.10.08T01:01:01.004 A   73.6  82
2000.10.08T01:01:01.005 B   223   59

View the output table. Three batches of data have been inserted, so there are three records in the output table.

select * from outputTable
time                    avgPrice sumqty Total  
----------------------- -------- ------ -------
2019.07.08T10:14:54.446 148.3    141    19192.2
2019.07.08T10:14:54.446 148.3    141    19192.2
2019.07.08T10:14:54.446 148.3    141    19192.2
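All three rows are identical because, within each batch, both A and B are overwritten before the calculation runs. Every batch therefore ends in the same cross-section.

The Python sketch below replays this. It assumes the engine applies the whole batch and then computes once, as the perBatch description suggests. It is a model, not DolphinDB code.

```python
def write_batch():
    """The four (sym, price, qty) rows produced by one writeData(4) call."""
    return [("A", 102.1, 60), ("B", 33.4, 74), ("A", 73.6, 82), ("B", 223.0, 59)]


latest = {}
outputs = []
for _ in range(3):                    # writeData(4) called three times
    for sym, price, qty in write_batch():
        latest[sym] = (price, qty)    # apply every row of the batch first
    rows = list(latest.values())      # then compute once for the batch
    outputs.append((
        round(sum(p for p, _ in rows) / len(rows), 4),  # avg(price)
        sum(q for _, q in rows),                        # sum(qty)
        round(sum(p * q for p, q in rows), 4),          # sum(price*qty)
    ))

# Three identical tuples: avgPrice 148.3, sumqty 141, Total 19192.2
print(outputs)
```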
  • interval

In interval mode, a calculation is triggered every triggeringInterval milliseconds. In the following example, 12 records are written in six batches of two records, 500 milliseconds apart. The engine triggers a calculation every 1000 milliseconds, so the output table is expected to contain about 3 records.

// Assumes a fresh session, or that the trades table, subscription and engine from the previous example have been removed
share streamTable(10:0,`time`sym`price`qty,[TIMESTAMP,SYMBOL,DOUBLE,INT]) as trades
outputTable = table(1:0, `time`avgPrice`sumqty`Total, [TIMESTAMP,DOUBLE,INT,DOUBLE])
tradesCrossAggregator=createCrossSectionalAggregator("CrossSectionalDemo", <[avg(price), sum(qty), sum(price*qty)]>, trades, outputTable, `sym, `interval, 1000)
subscribeTable(,"trades","tradesCrossAggregator",-1,append!{tradesCrossAggregator},true)
def writeData(n){
    timev = 2000.10.08T01:01:01.001 + timestamp(1..n)
    symv = take(`A`B, n)
    pricev = take(102.1 33.4 73.6 223, n)
    qtyv = take(60 74 82 59, n)
    insert into trades values(timev, symv, pricev, qtyv)
}
a = now()
// Six batches of two rows, 500 ms apart: about 3 seconds in total
writeData(2); sleep(500)
writeData(2); sleep(500)
writeData(2); sleep(500)
writeData(2); sleep(500)
writeData(2); sleep(500)
writeData(2); sleep(500)
b = now()
select count(*) from outputTable
3

If you run select count(*) from outputTable again later, the number of records keeps growing. In interval mode, calculations are triggered by elapsed time, regardless of whether new data has arrived.
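Interval mode can be modeled roughly as follows: calculations fire on a timer, and each one sees whatever data has arrived by then. This Python simulation is hypothetical and idealized:

- write times are exact;
- ticks are assumed to start when the engine is created and to be perfectly regular.

Real counts may therefore differ slightly.

```python
def simulate_interval(write_times_ms, end_ms, interval_ms=1000):
    """Return (tick_time, rows_arrived) for each timer-driven calculation.

    Ticks fire at interval_ms, 2*interval_ms, ... up to end_ms, whether or not
    any new data has arrived since the previous tick.
    """
    ticks = []
    t = interval_ms
    while t <= end_ms:
        arrived = sum(1 for w in write_times_ms if w <= t)
        ticks.append((t, arrived))
        t += interval_ms
    return ticks


writes = [0, 500, 1000, 1500, 2000, 2500]  # writeData(2) then sleep(500), six times
print(simulate_interval(writes, 3000))       # [(1000, 3), (2000, 5), (3000, 6)]
print(len(simulate_interval(writes, 6000)))  # 6: ticks keep firing with no new data
```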

4. Using the cross-sectional table on its own

The cross-sectional table is an intermediate table used for aggregation, but it is also useful on its own in many situations.

Suppose you need to periodically refresh the latest trade price of a stock. The conventional approach is to filter the real-time trades table by stock code and take the last record. However, the trades table grows rapidly over time. Running such a query frequently is therefore inefficient, both in system resource consumption and in query speed.

The cross-sectional table, by contrast, always stores only the latest trade of each stock. Its size stays stable, which makes it well suited to this kind of periodic polling.
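The trade-off can be illustrated in Python with synthetic data. The sketch compares scanning a growing trade log backwards with reading a dictionary that keeps one entry per symbol. The data and function names are hypothetical. The point is that the keyed table stays the size of the symbol universe while the log keeps growing.

```python
import random

random.seed(0)  # deterministic synthetic data
# Hypothetical trade log of (sym, price) tuples that grows without bound in practice.
trade_log = [(random.choice("ABC"), round(random.uniform(10, 100), 2))
             for _ in range(10_000)]


def latest_price_by_scan(log, sym):
    """Conventional approach: scan the whole trade log backwards for the symbol."""
    for s, price in reversed(log):
        if s == sym:
            return price
    return None


# Cross-sectional approach: one entry per symbol, overwritten on every trade.
cross_section = {}
for s, price in trade_log:
    cross_section[s] = price

# Both give the same answer, but the keyed table holds only one row per symbol.
print(latest_price_by_scan(trade_log, "A") == cross_section["A"])  # True
print(len(trade_log), len(cross_section))                          # 10000 3
```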

To use the cross-sectional table on its own, leave metrics and outputTable empty when creating the engine:

tradesCrossAggregator=createCrossSectionalAggregator("CrossSectionalDemo", , trades,, `sym, `perRow)
