DolphinDB’s built-in stream processing framework supports publishing, subscribing, preprocessing, real-time in-memory computation, and rolling-window calculation of complex indicators. It is an efficient and easy-to-use framework for stream data processing.

This tutorial focuses on how to deploy automatic subscription to stream computing jobs at node startup, after the business code has been developed on the DolphinDB stream processing framework.

1. Business code development

Business description

Taking real-time Level2 stock snapshot data from financial trading as an example, the following business requirements are implemented:

(1) Write the Level2 stock snapshot data into the database in real time for persistence;

(2) Compute intermediate values for each snapshot record, such as Return, Volume, and Amount, from the LastPx, TotalVolumeTrade, and TotalValueTrade fields of the raw data;

(3) Generate minute-level indicators from the raw data and the computed intermediate values.

The processing flow in DolphinDB is shown below:

Processing flow chart description:

(1) snapshotStream, snapshotStreamProcess, and snapshotAggr1min are all shared, asynchronously persisted stream tables. snapshotStream stores the raw Level2 stock snapshot data; snapshotStreamProcess stores the result data with the intermediate values computed by the reactive state engine; snapshotAggr1min stores the minute-level indicator data computed by the time series engine. The purpose of sharing an in-memory table is to make it visible to all other sessions on the current node, since API writes arrive in sessions different from the one in which the table was defined. Persisting the stream tables serves two purposes. First, it bounds each table’s memory footprint: the cacheSize parameter of the enableTableShareAndPersistence function controls the maximum number of records kept in memory, and therefore the table’s maximum footprint. Second, if a node shuts down abnormally, data that was written to a stream table but not yet consumed can be recovered from the persistence directory, guaranteeing that stream data is consumed at least once. Stream table persistence is asynchronous, which effectively improves write throughput. Only stream tables can be consumed by subscriptions, so these tables must be defined as stream tables.
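The following is a minimal sketch of creating such a table: a stream table is defined, shared across sessions, and asynchronously persisted with a bounded in-memory cache. The column names and types are simplified placeholders, not the full Level2 schema from the attached business code, and persistenceDir is assumed to be configured as in the configuration file below.

// Assumption: simplified placeholder schema; the real snapshot schema is in the attachment
colNames = `SecurityID`Date`Time`LastPx`TotalVolumeTrade`TotalValueTrade
colTypes = [SYMBOL, DATE, TIME, DOUBLE, LONG, DOUBLE]
snapshotTemp = streamTable(1000000:0, colNames, colTypes)
// share the table across sessions and persist it asynchronously, keeping at most 5,000,000 rows in memory
enableTableShareAndPersistence(table=snapshotTemp, tableName="snapshotStream", cacheSize=5000000)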

(2) subExecutors: the subExecutors configuration parameter specifies the maximum number of stream data processing threads available on a node. The processing thread that consumes a given subscription topic is chosen by setting the hash parameter of the subscribeTable function. For example, if subExecutors is set to n, hash can take values from 0 to n-1, corresponding to stream data processing threads 1 to n.
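As a self-contained illustration (the table names, schema, and action names here are placeholders, not part of the business code), the following shows how the hash parameter pins two subscriptions of the same stream table to two different processing threads when subExecutors is at least 2:

// two subscriptions of the same stream table, bound to different processing threads via hash
share streamTable(10000:0, `id`val, [INT, DOUBLE]) as demoStream
share table(10000:0, `id`val, [INT, DOUBLE]) as demoOut1
share table(10000:0, `id`val, [INT, DOUBLE]) as demoOut2
subscribeTable(tableName="demoStream", actionName="toDemoOut1", offset=-1, handler=tableInsert{demoOut1}, msgAsTable=true, hash=0)
subscribeTable(tableName="demoStream", actionName="toDemoOut2", offset=-1, handler=tableInsert{demoOut2}, msgAsTable=true, hash=1)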

(3) The reactive state engine and the time series engine are DolphinDB’s built-in stream computing engines, and both perform incremental computation. The reactive state engine computes the intermediate values required above, and the time series engine computes and generates the minute-level indicators.
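The sketch below illustrates how the two engines could be defined. The schemas and metric expressions are simplified assumptions for illustration and differ from the attached business code; for example, Return is approximated here as the ratio of consecutive LastPx values minus 1.

// Assumption: simplified schemas; the real schemas and metrics are in the attached business code
srcSchema = table(1:0, `SecurityID`DateTime`LastPx`TotalVolumeTrade`TotalValueTrade, [SYMBOL, TIMESTAMP, DOUBLE, LONG, DOUBLE])
procOut = table(1:0, `SecurityID`DateTime`Return`Volume`Amount, [SYMBOL, TIMESTAMP, DOUBLE, LONG, DOUBLE])
// reactive state engine: incremental per-snapshot intermediate values, keyed by stock
rse = createReactiveStateEngine(name="snapshotProcessingDemo", metrics=<[DateTime, ratios(LastPx)-1, deltas(TotalVolumeTrade), deltas(TotalValueTrade)]>, dummyTable=srcSchema, outputTable=procOut, keyColumn="SecurityID")
// time series engine: 1-minute tumbling windows over the intermediate values, keyed by stock
aggrOut = table(1:0, `DateTime`SecurityID`MaxReturn`MinReturn`Volume`Amount, [TIMESTAMP, SYMBOL, DOUBLE, DOUBLE, LONG, DOUBLE])
tse = createTimeSeriesEngine(name="snapshotAggr1minDemo", windowSize=60000, step=60000, metrics=<[max(Return), min(Return), sum(Volume), sum(Amount)]>, dummyTable=procOut, outputTable=aggrOut, timeColumn="DateTime", useSystemTime=false, keyColumn="SecurityID")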

(4) loadTable("dfs://snapshot", "snapshot") is the partitioned DFS table used to store and persist the raw data.
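A hedged sketch of the persistence subscription follows; the batch parameters are illustrative and may differ from the attached code. Each message batch from snapshotStream is appended to the partitioned table in dfs://snapshot.

// Assumption: dfs://snapshot and its table "snapshot" already exist with the snapshotStream schema
subscribeTable(tableName="snapshotStream", actionName="snapshotToDatabase", offset=-1, handler=tableInsert{loadTable("dfs://snapshot", "snapshot")}, msgAsTable=true, batchSize=20000, throttle=1, hash=2, reconnect=true)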

Business code

The business code in this tutorial is based on version 1.30.15; all 1.30 versions can run the sample code. See the attachment for the complete code.

2. DolphinDB startup process

The DolphinDB startup process is shown below:

  • System initialization script (dolphindb.dos): this script is required. By default, the dolphindb.dos in the release directory of the current version is loaded.

  • User startup script (startup.dos): the user startup script is executed only after system initialization. The startup parameter is configured in dolphindb.cfg in single-node mode and in cluster.cfg in cluster mode. If a relative path is specified, or no directory is specified, the system searches the local node’s home directory, working directory, and the directory containing the executable, in that order. For example:

    startup=/opt/DolphinDB/server/startup.dos

Add the business code above to the startup.dos file in the /opt/DolphinDB/server directory and set the startup parameter in the corresponding configuration file to deploy the automatic stream computing subscriptions at node startup.
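A minimal skeleton of such a startup script is sketched below, assuming the business code from the attachment fills in the omitted parts. The writeLog calls and the go statement are the debugging aids mentioned in the summary at the end of this tutorial.

// startup.dos skeleton (sketch only; the full business code is in the attachment)
writeLog("startup.dos: begin to deploy stream computing subscriptions")
// ... create dfs://snapshot, define the shared persisted stream tables, engines, and subscriptions here ...
go    // execute the preceding block before the following statements are parsed
writeLog("startup.dos: stream computing subscriptions deployed")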

  • Scheduled task script (poststart.dos): scheduled jobs defined with the scheduleJob function in DolphinDB are persistent. When a node restarts, the system initializes the scheduled job module after executing the user startup script, and only then runs the scheduled task script, in which you can call scheduleJob to define new scheduled jobs. Versions 1.30.15 and 2.00.3 and later support executing poststart.dos automatically at node startup. This feature is not used in this tutorial, so the configuration item does not need to be enabled.
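Although not used in this tutorial, a hedged example of what poststart.dos could contain is shown below; the job id, schedule time, and function are illustrative assumptions.

// poststart.dos sketch: define a daily job that logs the persisted row count
def logSnapshotCount() {
    writeLog("dfs://snapshot row count: " + string(exec count(*) from loadTable("dfs://snapshot", "snapshot")))
}
scheduleJob(jobId="dailySnapshotCount", jobDesc="log snapshot row count", jobFunc=logSnapshotCount, scheduleTime=17:00m, startDate=2021.12.01, endDate=2022.12.01, frequency='D')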

3. Deploying stream computing automatic subscriptions in single-node mode

Configuration file dolphindb.cfg

localSite=localhost:8848:local8848
mode=single
maxMemSize=128
maxConnections=512
workerNum=8
localExecutors=7
maxConnectionPerSite=15
newValuePartitionPolicy=add
webWorkerNum=8
dataSync=1
chunkCacheEngineMemSize=8
persistenceDir=/opt/DolphinDB/server/local8848/persistenceDir
startup=/opt/DolphinDB/server/startup.dos
maxPubConnections=64
subExecutors=7
subPort=8849
subThrottle=1
persistenceWorkerNum=1
lanCluster=0

Start the DolphinDB server and run the following command to follow the recent dolphindb.log output:

tail -1000f /opt/DolphinDB/server/dolphindb.log

The following log entries are produced by the startup script:

2021-12-01 00:23:56.314159 <INFO> :---------------------------------------------------------------------
2021-12-01 00:23:56.314172 <INFO> :dfs://snapshot created successfully !
2021-12-01 00:23:56.314178 <INFO> :---------------------------------------------------------------------
2021-12-01 00:23:56.315084 <INFO> :Prepare to share a stream table: tableName=snapshotStream raftGroup=-1
2021-12-01 00:23:56.315132 <INFO> :enableTablePersistence tableName=snapshotStream hashValue=0 offset=0 cacheSize=5000000
2021-12-01 00:23:56.315163 <INFO> :---------------------------------------------------------------------
2021-12-01 00:23:56.315174 <INFO> :sharedTable1:snapshotStream created  successfully !
2021-12-01 00:23:56.315182 <INFO> :---------------------------------------------------------------------
2021-12-01 00:23:56.315512 <INFO> :Prepare to share a stream table: tableName=snapshotStreamProcess raftGroup=-1
2021-12-01 00:23:56.315534 <INFO> :enableTablePersistence tableName=snapshotStreamProcess hashValue=1 offset=0 cacheSize=5000000
2021-12-01 00:23:56.315549 <INFO> :---------------------------------------------------------------------
2021-12-01 00:23:56.315562 <INFO> :sharedTable2:snapshotStreamProcess created successfully !
2021-12-01 00:23:56.315569 <INFO> :---------------------------------------------------------------------
2021-12-01 00:23:56.315783 <INFO> :Prepare to share a stream table: tableName=snapshotAggr1min raftGroup=-1
2021-12-01 00:23:56.315806 <INFO> :enableTablePersistence tableName=snapshotAggr1min hashValue=2 offset=0 cacheSize=2000000
2021-12-01 00:23:56.315821 <INFO> :---------------------------------------------------------------------
2021-12-01 00:23:56.315833 <INFO> :sharedTable3:snapshotAggr1min created successfully !
2021-12-01 00:23:56.315840 <INFO> :---------------------------------------------------------------------
2021-12-01 00:23:56.316775 <INFO> :---------------------------------------------------------------------
2021-12-01 00:23:56.316793 <INFO> :ReactiveStateEngine:snapshotProcessing created successfully !
2021-12-01 00:23:56.316800 <INFO> :---------------------------------------------------------------------
2021-12-01 00:23:56.316852 <INFO> :Begin to subscription topic=localhost:24110:local24110/snapshotStream/snapshotProcessing
2021-12-01 00:23:56.316888 <INFO> :Enable reconnection for topic=localhost:24110:local24110/snapshotStream/snapshotProcessing site=local24110:1
2021-12-01 00:23:56.316915 <INFO> :[subscribeTable] #attempt=0 topic=localhost:24110:local24110/snapshotStream/snapshotProcessing conn=
2021-12-01 00:23:56.316940 <INFO> :Received a request to publish table [snapshotStream] to site localhost:24111.Offset=-1
2021-12-01 00:23:56.317229 <INFO> :Subscription topic=localhost:24110:local24110/snapshotStream/snapshotProcessing hashValue=0
2021-12-01 00:23:56.317252 <INFO> :---------------------------------------------------------------------
2021-12-01 00:23:56.317259 <INFO> :subscribe1:snapshotStream subscribed successfully !
2021-12-01 00:23:56.317264 <INFO> :---------------------------------------------------------------------
2021-12-01 00:23:56.318486 <INFO> :Begin to subscription topic=localhost:24110:local24110/snapshotStreamProcess/snapshotAggr1min
2021-12-01 00:23:56.318531 <INFO> :Enable reconnection for topic=localhost:24110:local24110/snapshotStreamProcess/snapshotAggr1min site=local24110:1
2021-12-01 00:23:56.318555 <INFO> :[subscribeTable] #attempt=0 topic=localhost:24110:local24110/snapshotStreamProcess/snapshotAggr1min conn=
2021-12-01 00:23:56.318574 <INFO> :Received a request to publish table [snapshotStreamProcess] to site localhost:24111.Offset=-1
2021-12-01 00:23:56.318844 <INFO> :Subscription topic=localhost:24110:local24110/snapshotStreamProcess/snapshotAggr1min hashValue=1
2021-12-01 00:23:56.318871 <INFO> :---------------------------------------------------------------------
2021-12-01 00:23:56.318883 <INFO> :subscribe2:snapshotStreamProcess subscribed successfully !
2021-12-01 00:23:56.318891 <INFO> :---------------------------------------------------------------------
2021-12-01 00:23:56.318942 <INFO> :Begin to subscription topic=localhost:24110:local24110/snapshotStream/snapshotToDatabase
2021-12-01 00:23:56.318968 <INFO> :Enable reconnection for topic=localhost:24110:local24110/snapshotStream/snapshotToDatabase site=local24110:1
2021-12-01 00:23:56.318996 <INFO> :[subscribeTable] #attempt=0 topic=localhost:24110:local24110/snapshotStream/snapshotToDatabase conn=
2021-12-01 00:23:56.319011 <INFO> :Received a request to publish table [snapshotStream] to site localhost:24111.Offset=-1
2021-12-01 00:23:56.319042 <INFO> :Subscription topic=localhost:24110:local24110/snapshotStream/snapshotToDatabase hashValue=2
2021-12-01 00:23:56.319058 <INFO> :---------------------------------------------------------------------
2021-12-01 00:23:56.319065 <INFO> :subscribe3:snapshotStream subscribed successfully !
2021-12-01 00:23:56.319071 <INFO> :---------------------------------------------------------------------

If no error message appears and the last line of the script executes successfully, the log prints subscribe3:snapshotStream subscribed successfully !, indicating that the startup script ran to completion and the automatic stream computing subscriptions have been deployed.

Connect to the corresponding data node through the GUI and execute the login code:

login("admin", "123456")

The tables snapshotStream, snapshotStreamProcess, and snapshotAggr1min have been successfully defined and loaded.

Execute the function that queries the subscription information of the stream publishing tables:

getStreamingStat().pubTables

All subscription information is returned:

Execute the reactive state engine information query function:

getStreamEngineStat().ReactiveStreamEngine

The following information is returned:

Execute the time series engine information query function:

getStreamEngineStat().TimeSeriesEngine

The following information is returned:

At this point, you only need to write the real-time Level2 stock snapshot data into the stream table snapshotStream through an API, and the data will be processed in real time according to the business logic above.

4. DolphinDB data playback tool

DolphinDB provides a convenient historical data playback tool based on the replay function.

The object replayed in this tutorial is an in-memory table containing one day of Level2 historical snapshot data for 5 stocks. The CSV file (see attachment) is stored at /data/snapshot/stockData.csv. The playback code is as follows, accelerating playback 1000x along the time dimension:

//replay
filePath = "/data/snapshot/stockData.csv"
schema = table(loadTable("dfs://snapshot", "snapshot").schema().colDefs.name as name, loadTable("dfs://snapshot", "snapshot").schema().colDefs.typeString as type)
snapshot = loadText(filename=filePath, schema=schema)
replay(inputTables=snapshot, outputTables=snapshotStream, dateColumn=`Date, timeColumn=`Time, replayRate=1000, absoluteRate=true)

After about 26 seconds, once the one-day historical data of the 5 stocks has been replayed, execute the following code to inspect the minute-level indicators of one stock:

select * from snapshotAggr1min where SecurityID="000001.SZ"

The query result is returned to the client:

You can also execute the following code to query the number of records persisted to the database:

select count(*) from loadTable("dfs://snapshot", "snapshot")

The query result is returned to the client:

The figure shows that the number of records in the snapshot in-memory table is 24,909, indicating that all data written into the snapshotStream stream table has been written to the database, completing persistence with no data loss.
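As an optional cross-check (a hedged suggestion, not part of the original steps), you can compare that count with the number of rows received by the stream table:

select count(*) from snapshotStream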

5. Summary

This tutorial uses a real-time Level2 stock snapshot stream computing case from financial trading to describe, together with the business logic code, the steps for deploying automatic stream computing subscriptions at node startup, as well as common debugging techniques used during development and deployment: using the writeLog function to print debug logs, using the go statement appropriately to split the script into separately executed blocks, checking the node run logs to verify the execution of the user startup script, and using the streaming status query functions to confirm subscription information and the definitions of the stream computing engines. The goal is to improve development efficiency and reduce the difficulty of developing stream computing scenarios with DolphinDB’s built-in stream processing framework, and to help automate the deployment and operation of DolphinDB code in complex real-time stream computing scenarios.

Attachments

Business code

CSV text file