DolphinDB supports dynamically loading external plug-ins to extend the system’s functionality. Plug-ins are written in C++ and must be compiled into “.so” or “.dll” shared library files. This article describes the methods and precautions of plug-in development, and walks through the development process in detail for the following scenarios:

  • How to develop plug-in functions that support time series data processing
  • How to develop aggregate functions for handling distributed SQL
  • How to develop plug-in functions that support new distributed algorithms
  • How to develop plug-in functions that support streaming data processing
  • How to develop plug-in functions that support external data sources

1. How to develop plug-ins

1.1 Basic Concepts

A DolphinDB plug-in implements functions that can be called in scripts. A plug-in function is either an operator function or a system function. The difference is that an operator function takes at most two arguments, while a system function accepts any number of arguments and can access the session.

To develop an operator function, write a C++ function whose prototype is ConstantSP (const ConstantSP& a, const ConstantSP& b). When the function takes two parameters, a and b are the first and second parameters of the plug-in function respectively. When it takes one parameter, b is a placeholder with no practical use. When it takes no parameters, both a and b are placeholders.

To develop a system function, write a C++ function whose prototype is ConstantSP (Heap* heap, vector<ConstantSP>& args). The arguments passed in when a user calls the plug-in function are stored, in order, in the C++ vector args. The heap parameter does not need to be passed in by the user.

ConstantSP in function prototypes can represent most DolphinDB objects (scalars, vectors, matrices, tables, and so on). Other commonly used variable types derived from it are VectorSP (vector), TableSP (table), and so on.

1.2 Creating variables

To create a scalar, use the new statement directly to create an object of a type declared in the header file ScalarImp.h and assign it to a ConstantSP. ConstantSP is an encapsulated smart pointer that automatically frees memory when a variable’s reference count drops to zero, so users do not need to manually delete variables they have created:

ConstantSP i = new Int(1);                 // equivalent to 1i
ConstantSP d = new Date(2019, 3, 14);      // equivalent to 2019.03.14
ConstantSP s = new String("DolphinDB");    // equivalent to "DolphinDB"
ConstantSP voidConstant = new Void();      // create a void variable, often used to represent empty function arguments
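The reference-counting behaviour described above can be illustrated outside DolphinDB with std::shared_ptr. This is only an analogy under stated assumptions: ConstantSP is DolphinDB's own smart pointer class, and the Scalar struct and both function names below are hypothetical stand-ins, not part of the plug-in headers.

```cpp
#include <memory>

// Hypothetical stand-in for a DolphinDB scalar; not part of the plug-in SDK.
struct Scalar {
    int value;
    explicit Scalar(int v) : value(v) {}
};

// Returns the reference count observed while a second handle is alive.
long sharedCountWithCopy() {
    std::shared_ptr<Scalar> a = std::make_shared<Scalar>(1);
    std::shared_ptr<Scalar> b = a;   // both handles share one object
    return a.use_count();            // counted while b is still alive
}

// Returns the reference count after the second handle has gone out of scope.
long sharedCountAfterScope() {
    std::shared_ptr<Scalar> a = std::make_shared<Scalar>(1);
    {
        std::shared_ptr<Scalar> b = a;
    }                                // b is destroyed here; no delete is written
    return a.use_count();
}
```

The object is released when the last handle disappears, which is why plug-in code can return a freshly created object without ever deleting it.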

The header file Util.h declares a series of functions for quickly creating variables of a given type and form:

VectorSP v = Util::createVector(DT_INT, 10);     // create an int vector with initial length 10
v->setInt(0, 60);                                // equivalent to v[0] = 60

VectorSP t = Util::createVector(DT_ANY, 0);      // create a vector of type any (a tuple) with initial length 0
t->append(new Int(3));                           // equivalent to t.append!(3)
t->get(0)->setInt(4);                            // equivalent to t[0] = 4
// t->setInt(0, 4) cannot be used here because t is a tuple; setInt(0, 4) is valid only for vectors of type int

ConstantSP seq = Util::createIndexVector(5, 10); // equivalent to 5..14
int seq0 = seq->getInt(0);                       // equivalent to seq[0]

ConstantSP mat = Util::createDoubleMatrix(5, 10);// create a double matrix with 10 rows and 5 columns
mat->setColumn(3, seq);                          // equivalent to mat[3] = seq

1.3 Exception handling and Parameter verification

1.3.1 Exception Handling

Exceptions are thrown and handled in plug-in development as in general C++ development: the throw keyword throws an exception and a try block handles it. DolphinDB declares its exception types in the header file Exceptions.h.

Plug-in functions typically throw a RuntimeException when they encounter a runtime error.

During plug-in development, it is common to validate function arguments and throw an IllegalArgumentException if the arguments do not meet the requirements. Commonly used parameter verification functions are:

  • ConstantSP->getType(): Returns the type of the variable (int, char, date, etc.). DolphinDB types are defined in the header file Types.h.
  • ConstantSP->getCategory(): Returns the category of the variable. Common categories are INTEGRAL (integer types such as int, char, short, and long), FLOATING (floating-point types such as float and double), TEMPORAL (temporal types such as date and datetime), and LITERAL (string types such as string and symbol). Categories are defined in the header file Types.h.
  • ConstantSP->getForm(): Returns the form of the variable (scalar, vector, table, etc.). DolphinDB forms are defined in the header file Types.h.
  • ConstantSP->isVector(): Determines whether a variable is a vector.
  • ConstantSP->isScalar(): Determines whether a variable is a scalar.
  • ConstantSP->isTable(): Determines whether the variable is a table.
  • ConstantSP->isNumber(): Checks whether the variable is numeric.
  • ConstantSP->isNull(): Checks whether the variable is null.
  • ConstantSP->getInt(): Gets the integer value corresponding to the variable, often used to determine the boundary.
  • ConstantSP->getString(): Gets the string corresponding to the variable.
  • ConstantSP->size(): Gets the length of the variable.

More functions for parameter verification are generally found among the methods of the Constant class in the header file CoreConcept.h.

1.3.2 Example of parameter verification

This section develops a plug-in function to take the factorial of a non-negative integer and return a variable of type long.

The maximum value of DolphinDB’s long type is 2^63 - 1, and the largest factorial that fits in it is 20! (21! already exceeds 2^63 - 1). Therefore, only arguments in the range 0 to 20 are valid.

#include "CoreConcept.h"
#include "Exceptions.h"
#include "ScalarImp.h"

ConstantSP factorial(const ConstantSP &n, const ConstantSP &placeholder) {
    string syntax = "Usage: factorial(n). ";
    if (!n->isScalar() || n->getCategory() != INTEGRAL)
        throw IllegalArgumentException("factorial", syntax + "n must be an integral scalar.");
    int nValue = n->getInt();
    if (nValue < 0 || nValue > 20)    // 21! would overflow a 64-bit long
        throw IllegalArgumentException("factorial", syntax + "n must be a non-negative integer less than 21.");

    long long fact = 1;
    for (int i = nValue; i > 0; i--)
        fact *= i;
    return new Long(fact);
}
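The upper bound of the valid range can be checked mechanically instead of being taken on trust. The following standalone sketch (the function name maxFactorialArgument is made up for illustration and is not part of DolphinDB) finds the largest n whose factorial still fits in a signed 64-bit integer, without ever overflowing during the search.

```cpp
#include <cstdint>
#include <limits>

// Largest n for which n! is representable as a signed 64-bit integer.
int maxFactorialArgument() {
    const std::int64_t limit = std::numeric_limits<std::int64_t>::max();
    std::int64_t fact = 1;   // invariant: fact == n!
    int n = 0;
    // (n + 1)! = fact * (n + 1) fits exactly when fact <= limit / (n + 1),
    // so the multiplication below is only performed when it cannot overflow.
    while (fact <= limit / (n + 1)) {
        ++n;
        fact *= n;
    }
    return n;
}
```

Dividing the limit instead of multiplying the running product is the usual way to test for overflow before it happens, since signed overflow in C++ is undefined behaviour.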

1.4 Calling DolphinDB built-in functions

Plug-in code sometimes needs to call DolphinDB’s built-in functions to process data. Some classes already expose common built-in functions as methods:

VectorSP v = Util::createIndexVector(1, 100);
ConstantSP avg = v->avg();     // equivalent to avg(v)
ConstantSP sum2 = v->sum2();   // equivalent to sum2(v)
v->sort(false);                // equivalent to sort(v, false)

To call other built-in functions, the plug-in function must be a system function. Obtain the built-in function with heap->currentSession()->getFunctionDef and invoke it with its call method. If the built-in function is an operator function, use call(Heap, const ConstantSP&, const ConstantSP&); if it is a system function, use call(Heap, vector<ConstantSP>&). Here is an example of calling the built-in function cumsum:

ConstantSP v = Util::createIndexVector(1, 100);
v->setTemporary(false);    // the value of v may be modified during a built-in function call; call setTemporary(false) if it must not be modified
FunctionDefSP cumsum = heap->currentSession()->getFunctionDef("cumsum");
ConstantSP result = cumsum->call(heap, v, new Void());    // equivalent to cumsum(v); new Void() is a placeholder with no practical use

2. How to develop plug-in functions supporting time series data processing

One of DolphinDB’s distinctive features is its good support for time series.

This chapter uses a plug-in implementation of the msum function as an example to show how to develop plug-in functions that support time series data processing.

A time series processing function usually takes a vector as an argument and computes each element in the vector. In this case, the msum function takes two arguments: a vector and a window size. Its prototype is:

ConstantSP msum(const ConstantSP &X, const ConstantSP &window);

The return value of msum is a vector of the same length as the input vector. For simplicity, this example assumes that the return value is a vector of type double. The return value can be allocated space in advance using the Util::createVector function:

int size = X->size();
int windowSize = window->getInt();
ConstantSP result = Util::createVector(DT_DOUBLE, size);

When a plug-in processes a vector, it can repeatedly call functions such as getDoubleConst and getIntConst to read a fixed-length batch of read-only data into a buffer of the corresponding type, and then compute on the data in the buffer. This is more efficient than calling getDouble, getInt, etc. in a loop for each element. For simplicity, this example always uses getDoubleConst and fetches Util::BUF_SIZE elements at a time. The function returns a const double* that points to the start of the buffer:

double buf[Util::BUF_SIZE];

INDEX start = 0;
while (start < size) {
    int len = std::min(Util::BUF_SIZE, size - start);
    const double *p = X->getDoubleConst(start, len, buf);
    for (int i = 0; i < len; i++) {
        double val = p[i];
        // ...
    }
    start += len;
}

In this example, msum computes the sum of all the data in each window of length windowSize in X. A temporary variable tmpSum records the sum of the current window. Each time the window moves, add the value entering at the tail of the new window to tmpSum and subtract the value leaving at the head of the old window; the result is the sum of the current window. To write the computed values to result, call result->getDoubleBuffer in a loop to obtain a writable buffer, fill it, and then write the buffer back with result->setDouble. The setDouble function checks whether the given buffer address is the same as the underlying storage address of the variable; if it is, no data is copied. In most cases, the buffer obtained with getDoubleBuffer is the actual storage area of the variable, which reduces data copying and improves performance.

Note that DolphinDB uses the minimum value of a double (defined as the macro DBL_NMIN) to represent the NULL value of a double.

The first windowSize - 1 elements of the return value are NULL. The first windowSize elements of X and the elements after them can be handled in two loops: the first loop only accumulates the sum, and the second adds and subtracts. The final implementation is as follows:

ConstantSP msum(const ConstantSP &X, const ConstantSP &window) {
    // For simplicity, arguments are not validated; windowSize is assumed to be in [1, size]
    INDEX size = X->size();
    int windowSize = window->getInt();

    ConstantSP result = Util::createVector(DT_DOUBLE, size);

    double buf[Util::BUF_SIZE];
    double windowHeadBuf[Util::BUF_SIZE];
    double resultBuf[Util::BUF_SIZE];
    double tmpSum = 0.0;

    // First loop: accumulate the first window; the results so far are NULL
    INDEX start = 0;
    while (start < windowSize) {
        int len = std::min(Util::BUF_SIZE, windowSize - start);
        const double *p = X->getDoubleConst(start, len, buf);
        double *r = result->getDoubleBuffer(start, len, resultBuf);
        for (int i = 0; i < len; i++) {
            if (p[i] != DBL_NMIN)    // p[i] is not NULL
                tmpSum += p[i];
            r[i] = DBL_NMIN;
        }
        result->setDouble(start, len, r);
        start += len;
    }

    result->setDouble(windowSize - 1, tmpSum);    // the previous loop set one NULL too many; overwrite it with tmpSum

    // Second loop: add the value entering the window, subtract the value leaving it
    while (start < size) {
        int len = std::min(Util::BUF_SIZE, size - start);
        const double *p = X->getDoubleConst(start, len, buf);
        const double *q = X->getDoubleConst(start - windowSize, len, windowHeadBuf);
        double *r = result->getDoubleBuffer(start, len, resultBuf);
        for (int i = 0; i < len; i++) {
            if (p[i] != DBL_NMIN)
                tmpSum += p[i];
            if (q[i] != DBL_NMIN)
                tmpSum -= q[i];
            r[i] = tmpSum;
        }
        result->setDouble(start, len, r);
        start += len;
    }

    return result;
}
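The two-loop sliding window can be tried without the DolphinDB headers. The sketch below is a minimal standalone version over std::vector. It assumes that NULL is encoded as the lowest finite double (mirroring the role of DBL_NMIN); the names kNull and movingSum are illustrative and do not come from the plug-in API.

```cpp
#include <cstddef>
#include <limits>
#include <vector>

// Assumption: NULL is encoded as the lowest finite double, like DBL_NMIN.
const double kNull = std::numeric_limits<double>::lowest();

// Moving sum over a window of `window` elements; NULL inputs are skipped.
std::vector<double> movingSum(const std::vector<double> &x, std::size_t window) {
    std::vector<double> result(x.size(), kNull);
    if (window == 0 || window > x.size())
        return result;                       // no complete window: all NULL

    double tmpSum = 0.0;
    // First loop: accumulate the first window, nothing to subtract yet.
    for (std::size_t i = 0; i < window; i++)
        if (x[i] != kNull)
            tmpSum += x[i];
    result[window - 1] = tmpSum;

    // Second loop: add the element entering the window, subtract the one leaving.
    for (std::size_t i = window; i < x.size(); i++) {
        if (x[i] != kNull)
            tmpSum += x[i];
        if (x[i - window] != kNull)
            tmpSum -= x[i - window];
        result[i] = tmpSum;
    }
    return result;
}
```

Each element is touched at most twice, so the cost is linear in the input length regardless of the window size. Unlike the plug-in version, this sketch does not read or write in buffered batches.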

3. How to develop aggregate functions for handling distributed SQL

In DolphinDB, SQL aggregate functions usually take one or more vectors as arguments and eventually return a scalar. When developing plug-ins for aggregate functions, you need to know how to access elements in vectors.

DolphinDB vectors are stored in one of two ways. One is a regular array, where the data is stored contiguously in memory. The other is a large array, where the data is stored in blocks.

This chapter shows how to develop aggregate functions, focusing on how to access the elements of an array, using a function that computes the geometric mean as an example.

3.1 Aggregate function example

The geometricMean function accepts a vector as its parameter. To prevent overflow, it is generally computed in logarithmic form, that is:

geometricMean([x1, x2, ..., xn])
    = exp((log(x1) + log(x2) + log(x3) + ... + log(xn))/n)

To implement the distributed version of this function, first develop an aggregate function plug-in, logSum, that computes the sum of logarithms of the data in one partition; then define a reduce function with the defg keyword and a MapReduce function with the mapr keyword.
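The decomposition into a per-partition log sum and a final reduce step can be sketched in plain C++ before any plug-in code is written. The example below is an illustration under assumptions: partitions are modeled as std::vector<double>, all values are strictly positive, there is at least one value in total, and NULL handling is omitted. The function names echo the ones used in this chapter but are standalone stand-ins.

```cpp
#include <cmath>
#include <cstddef>
#include <utility>
#include <vector>

// Map step: per-partition (sum of logs, element count).
std::pair<double, std::size_t> logSumMap(const std::vector<double> &partition) {
    double logSum = 0.0;
    for (double v : partition)
        logSum += std::log(v);
    return {logSum, partition.size()};
}

// Reduce step: combine the partial results and apply exp(sum / n).
double geometricMeanReduce(const std::vector<std::pair<double, std::size_t>> &parts) {
    double logSum = 0.0;
    std::size_t count = 0;
    for (const auto &p : parts) {
        logSum += p.first;
        count += p.second;
    }
    return std::exp(logSum / static_cast<double>(count));
}

// Geometric mean of data that is split across several partitions.
double geometricMean(const std::vector<std::vector<double>> &partitions) {
    std::vector<std::pair<double, std::size_t>> parts;
    for (const auto &partition : partitions)
        parts.push_back(logSumMap(partition));
    return geometricMeanReduce(parts);
}
```

Only two numbers per partition travel to the reduce step, which is what makes the logarithmic form suitable for distributed execution.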

In DolphinDB plug-in development, an array is usually handled differently depending on whether it is a regular array or a large array. The isFastMode function tells the two apart:

ConstantSP logSum(const ConstantSP &x, const ConstantSP &placeholder) {
    if (((VectorSP) x)->isFastMode()) {
        // ...
    }
    else {
        // ...
    }
}

If an array is a regular array, it is stored consecutively in memory. You can use the getDataArray function to get a pointer to its data. Suppose the data is stored as a double:

if (((VectorSP) x)->isFastMode()) {
    int size = x->size();
    double *data = (double *) x->getDataArray();

    double logSum = 0;
    for (int i = 0; i < size; i++) {
        if (data[i] != DBL_NMIN)    // is not NULL
            logSum += std::log(data[i]);
    }
    return new Double(logSum);
}

If the data is a large array, it is stored in blocks in memory. Use getSegmentSize to get the size of each block and getDataSegment to get the address of the first block. getDataSegment returns a pointer to a pointer: it points to an array of pointers, each of which points to the data array of one block:

// ...
else {
    int size = x->size();
    int segmentSize = x->getSegmentSize();
    double **segments = (double **) x->getDataSegment();
    INDEX start = 0;
    int segmentId = 0;
    double logSum = 0;

    while (start < size) {
        double *block = segments[segmentId];
        int blockSize = std::min(segmentSize, size - start);
        for (int i = 0; i < blockSize; i++) {
            if (block[i] != DBL_NMIN)    // is not NULL
                logSum += std::log(block[i]);
        }
        start += blockSize;
        segmentId++;
    }
    return new Double(logSum);
}

In real development, the underlying storage of an array is not necessarily double, so the specific type has to be taken into account. The full example uses generic programming to handle different types uniformly; see the attachment for the code.

3.2 Calling DolphinDB functions

It is often necessary to implement both a non-distributed and a distributed version of an aggregate function; the system calls whichever version is more efficient.

Define the non-distributed geometricMean function in DolphinDB:

def geometricMean(x) {
	return exp(logSum::logSum(x) \ count(x))
}

Then define the map and reduce functions, and finally use mapr to define the distributed version:

def geometricMeanMap(x) {
	return logSum::logSum(x)
}

defg geometricMeanReduce(myLogSum, myCount) {
    return exp(sum(myLogSum) \ sum(myCount))
}

mapr geometricMean(x) { geometricMeanMap(x), count(x) -> geometricMeanReduce }

This implements the geometricMean function.

If you execute this function in a single-machine environment, you only need to load the plug-in on the node where it is executed. If there is data on the remote node, the plug-in needs to be loaded on each of the remote nodes. You can manually execute the loadPlugin function on each node, or you can quickly load the plug-in on each node with the following script:

each(rpc{, loadPlugin, pathToPlugin}, getDataNodes())

Create a partitioned table with the following script and verify the function:

db = database("", VALUE, 1 2 3 4)
t = table(take(1..4, 100) as id, rand(1.0, 100) as val)
t0 = db.createPartitionedTable(t, `tb, `id)
t0.append!(t)
select geometricMean(val) from t0 group by id

3.3 Random access to large arrays

Large arrays support random access, but the subscript has to be computed. Use the getSegmentSizeInBit function to get the number of binary bits of the block size; bit operations then give the block index and the offset within the block:

int segmentSizeInBit = x->getSegmentSizeInBit();
int segmentMask = (1 << segmentSizeInBit) - 1;
double **segments = (double **) x->getDataSegment();

int index = 3000000;
double result = segments[index >> segmentSizeInBit][index & segmentMask];
//                       ^ block index              ^ offset within the block
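The shift-and-mask arithmetic can be exercised on a small model of a large array. The struct below is a hypothetical stand-in written for this illustration (it is not a DolphinDB class); it assumes, as the snippet above implies, that every block holds a power-of-two number of elements, 1 << segmentSizeInBit.

```cpp
#include <cstddef>
#include <vector>

// Hypothetical stand-in for a "large array": data lives in equally sized
// segments whose length is a power of two (1 << segmentSizeInBit).
struct SegmentedArray {
    int segmentSizeInBit;
    std::vector<std::vector<double>> segments;

    SegmentedArray(std::size_t size, int sizeInBit) : segmentSizeInBit(sizeInBit) {
        const std::size_t segmentSize = std::size_t(1) << sizeInBit;
        const std::size_t segmentCount = (size + segmentSize - 1) / segmentSize;
        segments.assign(segmentCount, std::vector<double>(segmentSize, 0.0));
    }

    // Random access: high bits select the segment, low bits the offset in it.
    double &at(std::size_t index) {
        const std::size_t segmentMask = (std::size_t(1) << segmentSizeInBit) - 1;
        return segments[index >> segmentSizeInBit][index & segmentMask];
    }
};
```

Because the block size is a power of two, the shift replaces a division and the mask replaces a modulo, so a random access costs two cheap bit operations and two pointer dereferences.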

3.4 Which method should be selected to access the vector

The previous chapter introduced two ways of accessing a vector: obtaining a read-only buffer through methods of the getDoubleConst, getIntConst family, and obtaining a read-write buffer through methods of the getDoubleBuffer, getIntBuffer family. This chapter introduced direct access to the underlying storage of a vector through the getDataArray and getDataSegment methods. In real development, the former approach is more general and should usually be chosen. In some special cases (for example, when you know for certain that the data is stored in a large array and you know its data type), the second approach can be used.

4. How to develop plug-in functions that support new distributed algorithms

In DolphinDB, MapReduce is a common computing framework for executing distributed algorithms. DolphinDB provides the mr and imr functions so that users can implement distributed algorithms in scripts. The same two functions are used when writing plug-ins for distributed algorithms. This chapter describes how to write custom map, reduce, and other functions in C++ and call mr and imr to perform distributed computing.

4.1 Distributed algorithm example

This chapter uses mr to implement a function that computes the average of all values in the specified columns of a distributed table. It describes the overall process of writing a DolphinDB distributed algorithm plug-in and the technical details that need attention.

In plug-in development, user-defined map, reduce, final, and term functions can be operator functions or system functions.

In this example, the map function processes the columns named in colNames within one partition of the table and returns a tuple of length 2 containing the sum of the data and the number of non-null elements. The implementation is as follows:

ConstantSP columnAvgMap(Heap *heap, vector<ConstantSP> &args) {
    TableSP table = args[0];
    ConstantSP colNames = args[1];
    double sum = 0.0;
    int count = 0;
    
    for (int i = 0; i < colNames->size(); i++) {
        string colName = colNames->getString(i);
        VectorSP col = table->getColumn(colName);
        sum += col->sum()->getDouble();
        count += col->count();
    }

    ConstantSP result = Util::createVector(DT_ANY, 2);
    result->set(0, new Double(sum));
    result->set(1, new Int(count));
    return result;
}

In this example, the reduce function adds up the map results. DolphinDB’s built-in add function does exactly this, and it can be obtained with heap->currentSession()->getFunctionDef("add"):

FunctionDefSP reduceFunc = heap->currentSession()->getFunctionDef("add");

The final function in this example divides the data sum by the count of non-null elements in the reduce result, which gives the average of the corresponding columns over all partitions. The implementation is as follows:

ConstantSP columnAvgFinal(const ConstantSP &result, const ConstantSP &placeholder) {
    double sum = result->get(0)->getDouble();
    int count = result->get(1)->getInt();
    
    return new Double(sum / count);
}
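How the map, reduce, and final steps fit together can be seen in a standalone sketch that models each partition as a std::vector<double>. This is an illustration under assumptions, not the plug-in code: NULL handling is omitted, the data is assumed to be non-empty, and the driver loop in columnAvg merely imitates what mr does across partitions.

```cpp
#include <cstddef>
#include <utility>
#include <vector>

using Partial = std::pair<double, long long>;   // (sum, element count)

// Map: reduce one partition to its sum and element count.
Partial columnAvgMap(const std::vector<double> &partition) {
    double sum = 0.0;
    for (double v : partition)
        sum += v;
    return {sum, static_cast<long long>(partition.size())};
}

// Reduce: element-wise addition of two partial results (the role "add" plays).
Partial columnAvgReduce(const Partial &a, const Partial &b) {
    return {a.first + b.first, a.second + b.second};
}

// Final: turn the combined (sum, count) into an average.
double columnAvgFinal(const Partial &total) {
    return total.first / static_cast<double>(total.second);
}

// Drives the three steps over all partitions, imitating the role of mr.
double columnAvg(const std::vector<std::vector<double>> &partitions) {
    Partial total{0.0, 0};
    for (const auto &partition : partitions)
        total = columnAvgReduce(total, columnAvgMap(partition));
    return columnAvgFinal(total);
}
```

Averaging the per-partition averages would give a wrong answer when partitions differ in size, which is why the map step returns the sum and the count separately and the division happens only once, in the final step.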

Once the map, reduce, final, and other functions are defined, export them as plug-in functions (add extern "C" before their declarations in the header file, and list them in the text file used to load the plug-in). They can then be retrieved with heap->currentSession()->getFunctionDef and passed to the mr function as arguments. For example:

FunctionDefSP mapFunc = heap->currentSession()->getFunctionDef("columnAvg::columnAvgMap");

In this example, the map function takes two parameters, table and colNames, but mr only accepts a map function with one parameter. The map function therefore has to be called in partially applied form; Util::createPartialFunction wraps it as a partial application. The implementation is as follows:

vector<ConstantSP> mapWithColNamesArgs {new Void(), colNames};
FunctionDefSP mapWithColNames = Util::createPartialFunction(mapFunc, mapWithColNamesArgs);
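Partial application itself is a general technique: fix some arguments of a function now and supply the rest later. The plain C++ sketch below shows the same idea with a lambda. The functions countMatches and bindName are invented for this illustration and have nothing to do with the DolphinDB API; they only mirror the shape of f{, name}, where the first argument stays open and the second is fixed.

```cpp
#include <functional>
#include <string>
#include <vector>

// A two-argument function: counts how often `name` occurs in `rows`.
int countMatches(const std::vector<std::string> &rows, const std::string &name) {
    int count = 0;
    for (const auto &row : rows)
        if (row == name)
            count++;
    return count;
}

// Fix the second argument and leave the first one open, like f{, name}.
std::function<int(const std::vector<std::string> &)>
bindName(const std::string &name) {
    // The lambda captures `name` by value, so it stays valid after bindName returns.
    return [name](const std::vector<std::string> &rows) {
        return countMatches(rows, name);
    };
}
```

The returned object takes a single argument, so it can be handed to any framework that expects a one-parameter callback, which is the situation mr creates for the map function.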

Use heap->currentSession()->getFunctionDef("mr") to get the built-in mr function. Calling mr->call is equivalent to calling mr in a DolphinDB script. The columnAvg function is defined as follows:

ConstantSP columnAvg(Heap *heap, vector<ConstantSP> &args) {
    ConstantSP ds = args[0];
    ConstantSP colNames = args[1];

    FunctionDefSP mapFunc = heap->currentSession()->getFunctionDef("columnAvg::columnAvgMap");
    vector<ConstantSP> mapWithColNamesArgs = {new Void(), colNames};
    FunctionDefSP mapWithColNames = Util::createPartialFunction(mapFunc, mapWithColNamesArgs);    // columnAvgMap{, colNames}
    FunctionDefSP reduceFunc = heap->currentSession()->getFunctionDef("add");
    FunctionDefSP finalFunc = heap->currentSession()->getFunctionDef("columnAvg::columnAvgFinal");

    FunctionDefSP mr = heap->currentSession()->getFunctionDef("mr");    // mr(ds, columnAvgMap{, colNames}, add, columnAvgFinal)
    vector<ConstantSP> mrArgs = {ds, mapWithColNames, reduceFunc, finalFunc};
    return mr->call(heap, mrArgs);
}

4.2 Calling DolphinDB functions

If you execute this function in a single-machine environment, you only need to load the plug-in on the node where it is executed. But if you have data on the remote node, you need to load the plug-in on each of the remote nodes. You can manually execute the loadPlugin function on each node, or you can quickly load the plug-in on each node with the following script:

each(rpc{, loadPlugin, pathToPlugin}, getDataNodes())

After loading the plug-in, generate the data source with the sqlDS function and call the function:

n = 100
db = database("dfs://testColumnAvg", VALUE, 1..4)
t = db.createPartitionedTable(table(10:0, `id`v1`v2, [INT,DOUBLE,DOUBLE]), `t, `id)
t.append!(table(take(1..4, n) as id, rand(10.0, n) as v1, rand(100.0, n) as v2))

ds = sqlDS(<select * from t>)
columnAvg::columnAvg(ds, `v1`v2)

5. How to develop plug-in functions that support streaming data processing

In DolphinDB, a stream data subscriber can process incoming data through a handler function. The subscribed data can be a table or a tuple, as determined by the msgAsTable parameter of the subscribeTable function. A handler function is typically used to filter data, insert it into another table, and so on.

This chapter writes a handler function whose message type is a tuple. It accepts two additional parameters: indices, an int scalar or vector giving the subscripts of elements in the tuple, and table, a table. It inserts the tuple elements at those subscripts into the table as columns.

Data can be added to a table through the append method, whose prototype is bool append(vector<ConstantSP>& values, INDEX& insertedRows, string& errMsg). If the return value is true, the insert succeeded and the number of inserted rows is written to insertedRows. Otherwise, the method returns false and writes an error message to errMsg. The implementation of the plug-in is as follows:

ConstantSP handler(Heap *heap, vector<ConstantSP> &args) {
    ConstantSP indices = args[0];
    TableSP table = args[1];
    ConstantSP msg = args[2];

    vector<ConstantSP> msgToAppend;
    for (int i = 0; i < indices->size(); i++) {
        int index = indices->getInt(i);
        msgToAppend.push_back(msg->get(index));
    }

    INDEX insertedRows;
    string errMsg;
    table->append(msgToAppend, insertedRows, errMsg);
    return new Void();
}

In a practical application, you might need to know why an insert failed. Include the header file Logger.h to write the error message to the log. Note that the macro definition -DLOGGING_LEVEL_2 must be added when compiling the plug-in:

// ...
bool success = table->append(msgToAppend, insertedRows, errMsg);
if (!success)
    LOG_ERR("Failed to append to table: ", errMsg);

You can use the following script to simulate a stream data write and validate the handler function:

loadPlugin("/path/to/PluginHandler.txt")

share streamTable(10:0, `id`sym`timestamp, [INT,SYMBOL,TIMESTAMP]) as t0
t1 = table(10:0, `sym`timestamp, [SYMBOL,TIMESTAMP])
subscribeTable(, `t0, , , handler::handler{[1,2], t1})

t0.append!(table(1..100 as id, take(`a`b`c`d, 100) as symbol, now() + 1..100 as timestamp))

select * from t1

6. How to develop plug-in functions that support external data sources

There are several concerns when designing extensible interface plug-ins for third-party data:

  1. Data source. A data source is a special data object that contains a meta-description of a data entity. Executing a data source yields the data entity, which may be a table, matrix, vector, and so on. Users can pass data sources to distributed computing functions such as olsEx and randomForestClassifier, or to mr, imr, or the lower-level computing models defined in ComputingModel.h. DolphinDB’s built-in function sqlDS obtains data sources from an SQL expression. When designing a third-party data interface, it is often necessary to implement a function that obtains data sources: it divides a large file into several parts, each representing a subset of the data, and returns a tuple of data sources. A data source is usually represented by a Code object; it is a function call that takes metadata as parameters and returns a table.
  2. Schema. The structure of a table describes the number of columns in the table, the name of each column, and its data type. Third-party interfaces typically need to implement a function that quickly retrieves the table structure of the data so that users can adjust column names and column data types based on that structure.
  3. IO. In a multi-core, multi-CPU environment, IO can become a bottleneck. DolphinDB provides the abstract IO interfaces DataInputStream and DataOutputStream, which encapsulate details such as data compression, endianness, and IO type (network, disk, buffer, etc.) to simplify development. In addition, BlockFileInputStream and BlockFileOutputStream are implemented specifically for multithreaded IO. This implementation has two advantages:
  • Computation and IO run in parallel. While thread A is processing data, a background thread asynchronously prefetches the data that thread A will need next.
  • Multithreaded disk contention is avoided. As the number of threads increases, performance deteriorates dramatically if the threads read and write the same disk concurrently. This implementation serializes reads and writes to the same disk to improve throughput.

This chapter introduces several functions that are typically implemented to provide a simple example for designing third-party data interfaces.

6.1 Data Format Description

Assume that the data in this example is stored in a flat file database, row by row in binary format, with the data starting directly at the beginning of the file. Each row has four columns: id (a signed 64-bit long integer, 8 bytes), symbol (a C string, 8 bytes), date (BCD encoded, 8 bytes), and value (an IEEE 754 double-precision floating point number, 8 bytes). Each row therefore takes 32 bytes. Here’s an example of a row:

The hexadecimal representation of this row is:

0x 00 00 00 00 00 00 00 05
0x 49 42 4D 00 00 00 00 00
0x 02 00 01 09 00 03 01 03
0x 40 24 33 33 33 33 33 33
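Decoding such a row by hand makes the format concrete. The sketch below is a standalone illustration, not the attachment's code. It rests on two assumptions read off the hex dump rather than stated in the text: multi-byte fields are big-endian as printed, and the BCD date stores one decimal digit per byte in YYYYMMDD order. It further assumes the host uses 64-bit IEEE 754 doubles. The names MyDataRow, readBigEndian64, and decodeRow are made up for this example.

```cpp
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <string>

struct MyDataRow {
    std::int64_t id;
    std::string symbol;
    int year, month, day;
    double value;
};

// Assumption: multi-byte fields are big-endian, as the hex dump is printed.
static std::uint64_t readBigEndian64(const unsigned char *p) {
    std::uint64_t v = 0;
    for (int i = 0; i < 8; i++)
        v = (v << 8) | p[i];
    return v;
}

// Decodes one 32-byte row: id, symbol, BCD date (one digit per byte), value.
MyDataRow decodeRow(const unsigned char *row) {
    MyDataRow r;
    r.id = static_cast<std::int64_t>(readBigEndian64(row));

    // symbol: NUL-terminated C string inside an 8-byte field
    const char *sym = reinterpret_cast<const char *>(row + 8);
    std::size_t len = 0;
    while (len < 8 && sym[len] != '\0')
        len++;
    r.symbol.assign(sym, len);

    const unsigned char *d = row + 16;   // digits in YYYYMMDD order
    r.year = d[0] * 1000 + d[1] * 100 + d[2] * 10 + d[3];
    r.month = d[4] * 10 + d[5];
    r.day = d[6] * 10 + d[7];

    const std::uint64_t bits = readBigEndian64(row + 24);
    std::memcpy(&r.value, &bits, sizeof r.value);   // reinterpret the IEEE 754 bits
    return r;
}
```

Under these assumptions, the 32 bytes shown above decode to id 5, symbol IBM, the date 2019.03.13, and a value of approximately 10.1.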

6.2 extractMyDataSchema function

This function extracts the table structure of the data file. In this example, the table structure is fixed, so the file does not actually need to be read. The function serves as an example of how to generate a table structure: it creates the structure table with the Util::createTable function:

ConstantSP extractMyDataSchema(const ConstantSP &placeholderA, const ConstantSP &placeholderB) {
    ConstantSP colNames = Util::createVector(DT_STRING, 4);
    ConstantSP colTypes = Util::createVector(DT_STRING, 4);
    string names[] = {"id", "symbol", "date", "value"};
    string types[] = {"LONG", "SYMBOL", "DATE", "DOUBLE"};
    colNames->setString(0, 4, names);
    colTypes->setString(0, 4, types);

    vector<ConstantSP> schema = {colNames, colTypes};
    vector<string> header = {"name", "type"};

    return Util::createTable(header, schema);
}

In real development, you might want to get the table structure by reading files and so on. How to read a file is explained later.

6.3 loadMyData function

The loadMyData function reads the file and outputs a DolphinDB table. Given a file path, create an input stream with Util::createBlockFileInputStream. After that, call the stream’s readBytes function to read a given number of bytes, readBool to read the next bool value, readInt to read the next int, and so on. In this example, the syntax of loadMyData is loadMyData(path, [start], [length]). In addition to the file path, it accepts two int parameters, start and length, which give the row at which to start reading and the total number of rows to read, respectively. The parameters of createBlockFileInputStream determine the byte offset at which reading starts and the total number of bytes to read:

ConstantSP loadMyData(Heap *heap, vector<ConstantSP> &args) {
    ConstantSP path = args[0];
    long long fileLength = Util::getFileLength(path->getString());
    size_t bytesPerRow = 32;

    int start = args.size() >= 2 ? args[1]->getInt() : 0;
    int length = args.size() >= 3 ? args[2]->getInt() : fileLength / bytesPerRow - start;

    DataInputStreamSP inputStream = Util::createBlockFileInputStream(path->getString(), 0, fileLength, Util::BUF_SIZE, start * bytesPerRow, length * bytesPerRow);
    char buf[Util::BUF_SIZE];
    size_t actualLength;

    while (true) {
        inputStream->readBytes(buf, Util::BUF_SIZE, actualLength);
        if (actualLength <= 0)
            break;
        // ...
    }
}

When reading data, the data is usually cached in an array first and inserted in a batch once the buffer is full. For example, suppose you want to read a binary file whose contents are all char bytes and write it to a DolphinDB vector vec of type char, and finally return a table with vec as its only column:

char buf[Util::BUF_SIZE];
VectorSP vec = Util::createVector(DT_CHAR, 0);
size_t actualLength;

while (true) {
    inputStream->readBytes(buf, Util::BUF_SIZE, actualLength);
    if (actualLength <= 0)
        break;
    vec->appendChar(buf, actualLength);
}

vector<ConstantSP> cols = {vec};
vector<string> colNames = {"col0"};

return Util::createTable(colNames, cols);

Please refer to the attached code for the complete code of this section. In real development, the function that loads the data might also accept a table structure parameter, schema, and change the data types it reads as needed.

6.4 loadMyDataEx function

The loadMyData function always loads the data into memory, which can easily become a bottleneck when the data file is very large. The loadMyDataEx function is designed to solve this problem. It imports and saves at the same time, streaming a static binary file smoothly into a DolphinDB distributed table, instead of importing everything into memory first and then saving it as a partitioned table.

The parameters of loadMyDataEx can be modeled on DolphinDB’s built-in loadTextEx function. Its syntax is loadMyDataEx(dbHandle, tableName, partitionColumns, path, [start], [length]). If the table already exists in the database, the imported data is appended to the existing table, result. If the table does not exist, a table result is created and the data is appended to it. Finally, the table is returned:

string dbPath = ((SystemHandleSP) db)->getDatabaseDir();
vector<ConstantSP> existsTableArgs = {new String(dbPath), tableName};
bool existsTable = heap->currentSession()->getFunctionDef("existsTable")->call(heap, existsTableArgs)->getBool();    // equivalent to existsTable(dbPath, tableName)
ConstantSP result;

if (existsTable) {    // the table exists: load it
    vector<ConstantSP> loadTableArgs = {db, tableName};
    result = heap->currentSession()->getFunctionDef("loadTable")->call(heap, loadTableArgs);    // equivalent to loadTable(db, tableName)
}
else {    // the table does not exist: create it
    TableSP schema = extractMyDataSchema(new Void(), new Void());
    ConstantSP dummyTable = DBFileIO::createEmptyTableFromSchema(schema);
    vector<ConstantSP> createTableArgs = {db, dummyTable, tableName, partitionColumns};
    result = heap->currentSession()->getFunctionDef("createPartitionedTable")->call(heap, createTableArgs);    // equivalent to createPartitionedTable(db, dummyTable, tableName, partitionColumns)
}

The code that reads the data and appends it to the table uses the Pipeline framework. Its initial tasks are a series of loadMyData function calls with different start parameters. The follower function of the pipeline is the partial application append!{result}. In effect, the whole read job is split into several parts: each part is read in a block by a call to loadMyData, and the resulting data is inserted into the table with append!. The core code is as follows:

int sizePerPartition = 16 * 1024 * 1024;
int partitionNum = fileLength / sizePerPartition;
vector<DistributedCallSP> tasks;
FunctionDefSP func = Util::createSystemFunction("loadMyData", loadMyData, 1, 3, false);
int partitionStart = start;
int partitionLength = length / partitionNum;
for (int i = 0; i < partitionNum; i++) {
    if (i == partitionNum - 1)
        partitionLength = length - partitionLength * i;
    vector<ConstantSP> partitionArgs = {path, new Int(partitionStart), new Int(partitionLength)};
    ObjectSP call = Util::createRegularFunctionCall(func, partitionArgs);    // will be called as loadMyData(path, partitionStart, partitionLength)
    tasks.push_back(new DistributedCall(call, true));
    partitionStart += partitionLength;
}
vector<ConstantSP> appendToResultArgs = {result};
FunctionDefSP appendToResult = Util::createPartialFunction(heap->currentSession()->getFunctionDef("append!"), appendToResultArgs);    // equivalent to append!{result}
vector<FunctionDefSP> functors = {appendToResult};
PipelineStageExecutor executor(functors, false);
executor.execute(heap, tasks);
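The chunk arithmetic in the code above (equal-sized parts, with the last part absorbing the remainder) can be checked in isolation. The following is a minimal sketch in plain C++; splitRows is a hypothetical helper, not a DolphinDB function, and the clamping of partitionNum is an added assumption that the tutorial code does not include. It guards against a partition count of zero, which the original computation would produce for a file smaller than sizePerPartition.

```cpp
#include <cassert>
#include <utility>
#include <vector>

// Hypothetical helper: splits the row range [start, start + length) into
// contiguous chunks. Each entry is (chunkStart, chunkLength).
// Assumption not in the tutorial code: partitionNum is clamped to [1, length]
// so that a zero partition count cannot cause a division by zero.
std::vector<std::pair<int, int>> splitRows(int start, int length, int partitionNum) {
    assert(start >= 0 && length >= 0);
    std::vector<std::pair<int, int>> chunks;
    if (length == 0)
        return chunks;                       // nothing to load
    if (partitionNum < 1)
        partitionNum = 1;
    if (partitionNum > length)
        partitionNum = length;

    int base = length / partitionNum;        // size of every chunk except the last
    int chunkStart = start;
    for (int i = 0; i < partitionNum; i++) {
        // The last chunk takes whatever remains after the equal-sized ones.
        int chunkLength = (i == partitionNum - 1) ? length - base * i : base;
        chunks.emplace_back(chunkStart, chunkLength);
        chunkStart += chunkLength;
    }
    return chunks;
}
```

For example, splitting 10 rows into 3 parts gives chunks of 3, 3 and 4 rows starting at rows 0, 3 and 6. Each (chunkStart, chunkLength) pair corresponds to the start and length arguments of one loadMyData call in the pipeline.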

Please refer to the attached code for the complete code of this section. Using the Pipeline framework to import data in blocks is only one approach. In actual development, you can also use the StaticStageExecutor declared in computingModel.h, or the thread model Thread declared in concurrent.h. There are many ways to implement this, and the choice depends on the actual scenario.

6.5 myDataDS function

The myDataDS function returns a tuple of data sources. Each data source is a Code object that represents a function call and can be generated with Util::createRegularFunctionCall. Executing this object retrieves the corresponding data. Here is an example of generating data sources based on the loadMyData function:

ConstantSP myDataDS(Heap *heap, vector<ConstantSP> &args) {
    ConstantSP path = args[0];
    long long fileLength = Util::getFileLength(path->getString());
    size_t bytesPerRow = 32;

    int start = args.size() >= 2 ? args[1]->getInt() : 0;
    int length = args.size() >= 3 ? args[2]->getInt() : fileLength / bytesPerRow - start;

    int sizePerPartition = 16 * 1024 * 1024;
    int partitionNum = fileLength / sizePerPartition;

    int partitionStart = start;
    int partitionLength = length / partitionNum;

    FunctionDefSP func = Util::createSystemFunction("loadMyData", loadMyData, 1, 3, false);
    ConstantSP dataSources = Util::createVector(DT_ANY, partitionNum);
    for (int i = 0; i < partitionNum; i++) {
        if (i == partitionNum - 1)
            partitionLength = length - partitionLength * i;
        vector<ConstantSP> partitionArgs = {path, new Int(partitionStart), new Int(partitionLength)};
        ObjectSP code = Util::createRegularFunctionCall(func, partitionArgs);    // will be called as loadMyData(path, partitionStart, partitionLength)
        dataSources->set(i, new DataSource(code));
        partitionStart += partitionLength;    // advance to the next block, as in loadMyDataEx
    }
    return dataSources;
}
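The idea of a data source as a stored function call that produces data only when it is executed can be illustrated without the DolphinDB types. The following is a minimal sketch, assuming std::function closures in place of Code objects; DataSourceFn, loadRows and makeDataSources are hypothetical names used only for illustration, and loadRows returns row indices rather than real file contents.

```cpp
#include <cassert>
#include <functional>
#include <numeric>
#include <vector>

// Hypothetical stand-in for a data source: a deferred call that yields one chunk.
using DataSourceFn = std::function<std::vector<int>()>;

// Hypothetical loader: "loads" rows [start, start + length) by returning their indices.
std::vector<int> loadRows(int start, int length) {
    assert(start >= 0 && length >= 0);
    std::vector<int> rows(length);
    std::iota(rows.begin(), rows.end(), start);
    return rows;
}

// Builds one deferred call per chunk. Nothing is loaded until a call is executed.
std::vector<DataSourceFn> makeDataSources(int start, int length, int partitionNum) {
    assert(partitionNum > 0);
    std::vector<DataSourceFn> sources;
    int base = length / partitionNum;
    int chunkStart = start;
    for (int i = 0; i < partitionNum; i++) {
        // The last chunk absorbs the remainder, as in myDataDS.
        int chunkLength = (i == partitionNum - 1) ? length - base * i : base;
        // Capture the chunk bounds by value; the call itself is deferred.
        sources.push_back([chunkStart, chunkLength] {
            return loadRows(chunkStart, chunkLength);
        });
        chunkStart += chunkLength;
    }
    return sources;
}
```

Building the sources for 10 rows in 3 parts returns three closures. Executing the third one yields rows 6 through 9, and the other chunks are never materialized unless their closures are called, which is the property that lets a caller process one data source at a time.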

The complete code for the tutorial can be found at github.com/dolphindb/T…