Greenplum is the most mature open-source distributed analytical database (Greenplum 6, released in August 2019, significantly improved OLTP performance, making it a true HTAP database; benchmark data will be published soon). Gartner's latest 2019 review ranks Greenplum 3rd globally in classical data analytics and tied for 4th globally in real-time data analytics; it is the only open-source database product in the top 10 of both categories. In other words, if you want a product based on open source, there is no other choice in the top ten. See the Gartner report for details.

So how did the Greenplum distributed database come about? As is well known, Greenplum is based on PostgreSQL, the most advanced single-node database, whose kernel is covered by many documents and papers. Relatively little material exists, however, on how to turn single-node PostgreSQL into a distributed database. This article introduces, from six aspects, the main work involved in evolving the single-node PostgreSQL database into a distributed MPP database. Of course, this is only a brief overview: the enterprise product is the crystallization of more than ten years of R&D investment by a team of roughly one hundred top database engineers, at a cost of hundreds of millions of dollars. Although not required, some basic knowledge of the PostgreSQL kernel helps in understanding the details in this article; Bruce Momjian's slides are an excellent introduction (momjian.us/main/presen…).

1. Overview of the Greenplum cluster

PostgreSQL is the most advanced single-node open-source database in the world. Greenplum, based on PostgreSQL, is the most advanced open-source MPP database in the world. (Please visit the Greenplum Chinese community for more information.) From the user's perspective, Greenplum is a full-fledged relational database management system (RDBMS). At the physical level, it contains multiple PostgreSQL instances that can be accessed individually. To present users with a single logical database, Greenplum coordinates many independent PostgreSQL instances through a division of labor at different levels: data storage, computation, communication, and management. Although Greenplum is a cluster, it encapsulates all of the distributed details and presents the user with a single logical database. This encapsulation greatly liberates developers and operations personnel.

This article mainly covers six aspects: data distribution, query plan parallelization, execution parallelization, distributed transactions, data shuffling, and management parallelization.

Greenplum adds a host of other features on top of PostgreSQL, for example: append-optimized tables, column-oriented tables, external tables, multi-level partitioned tables, a fine-grained resource manager, the ORCA query optimizer, backup and recovery, high availability, fault detection and recovery, cluster data migration, capacity expansion, the MADlib machine learning algorithm library, containerized UDF execution, the PostGIS extension, the GPText suite, monitoring and management tools, Kubernetes integration, and more.

The following figure shows a bird’s eye view of a Greenplum cluster with one master node, two segment nodes, and four segment instances deployed on each segment node to improve resource utilization. Each instance, whether master or segment, is a physically independent PostgreSQL database.
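If you have a running cluster at hand, this physical layout can be inspected from the master with the gp_segment_configuration catalog table (a quick sketch; the column list is trimmed to the essentials):

SELECT content, role, port, hostname
FROM gp_segment_configuration
ORDER BY content, role;
-- content = -1 identifies the master (and standby, if configured);
-- content >= 0 identifies the segment instances.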

2. Distributed data storage

Distribution of data storage is the first problem a distributed database has to solve. The basic principles of distributed data storage are relatively simple and easy to implement; many database middleware products can also achieve basic distributed data storage. Greenplum goes well beyond that, offering more advanced and flexible features such as multi-level partitioning and polymorphic storage. Greenplum 6 further enhances this area with consistent hashing and replicated tables, and lets users influence the data distribution method based on the needs of the application. As shown in the figure below, the user sees a single logical database with system tables (for example pg_class and pg_proc under pg_catalog) and user tables (in this example, the SALES and CUSTOMERS tables). At the physical level, this logical database consists of many independent databases, each with its own system tables and user tables. The master database contains only metadata and does not hold user data. User tables do exist on the master, but they are empty; the optimizer needs these empty tables for query optimization and plan generation. Most system tables on a segment database (except for a few, such as the statistics tables) have the same contents as the system tables on the master. Each segment holds a portion of the user's data.

In Greenplum, user data is distributed among the segment instances on different nodes according to some policy. Each instance has its own separate data directory and stores its user data as disk files. Standard INSERT statements can be used, and data is automatically distributed to the appropriate segments according to the user-defined policy; however, INSERT throughput is low and is suitable only for small amounts of data. Greenplum provides dedicated parallel data loading tools for efficient data import; see the official documentation for gpfdist and gpload. Greenplum also supports parallel COPY, which is the fastest way to load data that is already stored on each segment host. The following figure shows the user's SALES table distributed across different segment instances.
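For example (a sketch; the host name, port, and file pattern are placeholders, and a distributed table sales(id, date, amt) is assumed to exist), a gpfdist-backed external table lets all segments pull data in parallel, after which a plain INSERT ... SELECT loads it into the distributed table:

-- gpfdist serves the files from an ETL host; every segment reads from it in parallel.
CREATE EXTERNAL TABLE sales_ext (id int, date date, amt decimal(10,2))
LOCATION ('gpfdist://etl-host:8081/sales_*.csv')
FORMAT 'CSV';

INSERT INTO sales SELECT * FROM sales_ext;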

In addition to distributing data horizontally across nodes, Greenplum supports partitioning a table on each node according to various criteria, including multi-level partitioning. The partitioning methods Greenplum supports are:

  • Range partitioning: partitioning data according to the time range or value range of a column. For example, the following SQL creates a partitioned table that is partitioned by day, from 2016-01-01 to 2017-01-01, into 366 partitions:
CREATE TABLE sales (id int, date date, amt decimal(10,2))
DISTRIBUTED BY (id)
PARTITION BY RANGE (date)
( START (date '2016-01-01') INCLUSIVE
  END (date '2017-01-01') EXCLUSIVE
  EVERY (INTERVAL '1 day') );
  • List partitioning: partitioning data into different partitions based on a list of values in a column. For example, the following creates a table with three partitions based on gender: one partition for women, one for men, and a default partition for other values such as NULL.
CREATE TABLE rank (id int, rank int, year int, gender char(1), count int)
DISTRIBUTED BY (id)
PARTITION BY LIST (gender)
( PARTITION girls VALUES ('F'),
  PARTITION boys VALUES ('M'),
  DEFAULT PARTITION other );

The following figure shows that the user's SALES table is first distributed across two nodes, and then on each node it is further partitioned according to some criterion. The primary purpose of partitioning is partition pruning, which improves performance by reducing the amount of data accessed. Partition pruning means the optimizer automatically skips, based on the query conditions, partitions that cannot contain matching rows, reducing the amount of data scanned during query execution. PostgreSQL supports static partition pruning based on query conditions, while Greenplum's ORCA optimizer additionally implements dynamic partition pruning. Dynamic partition pruning can improve performance by a factor of ten to hundreds.
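As a simple illustration (the exact EXPLAIN output differs between the legacy planner and ORCA), a filter on the partition key of the sales table created above lets the planner scan only one of the 366 daily partitions:

-- Only the partition containing 2016-03-01 needs to be scanned;
-- EXPLAIN should show the remaining partitions being eliminated.
EXPLAIN SELECT sum(amt) FROM sales WHERE date = date '2016-03-01';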

Greenplum supports polymorphic storage: within a single user table, different partitions can use different storage formats, chosen according to how the data is accessed. Data of different ages usually has different access patterns, and different access patterns favor different optimizations. Polymorphic storage transparently selects the best storage format for each portion of the data to deliver the best performance. Greenplum provides the following storage options:

  • Heap table: the default storage format in Greenplum, and the same format PostgreSQL uses. It supports efficient update and delete operations and fast access to many columns of a row, and is typically used for OLTP-style queries.

  • Append-optimized (AO) table: a storage format optimized for appending data, typically used for fact tables in a data warehouse. It is not suitable for frequent update operations.

  • Append-optimized, column-oriented (AOCO) table: stores data by column, achieves good compression ratios, and supports multiple compression algorithms. It is well suited to queries that access only a small subset of a table's columns.

  • External table: Data for external tables is stored externally (data is not managed by Greenplum), and Greenplum only contains metadata information for external tables. Greenplum supports many external data sources such as S3, HDFS, files, Gemfire, various relational databases, and various data formats such as Text, CSV, Avro, Parquet, etc.
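The storage format of a table is chosen with the WITH clause at creation time; the sketch below uses the option names as they appear in Greenplum 5/6 (table names are illustrative):

-- Append-optimized, row-oriented fact table
CREATE TABLE facts_ao (id int, payload text)
WITH (appendonly=true)
DISTRIBUTED BY (id);

-- Append-optimized, column-oriented table compressed with zlib
CREATE TABLE facts_aoco (id int, payload text)
WITH (appendonly=true, orientation=column, compresstype=zlib, compresslevel=5)
DISTRIBUTED BY (id);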

As shown in the figure below, suppose the SALES table mentioned earlier is partitioned by month; different storage strategies can then be used for data of different ages. For example, data from the last three months can be kept in heap storage, older data can be stored column-oriented, and data more than a year old can be kept in S3 or HDFS through external tables.
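In DDL this can look roughly like the following (partition boundaries and options are illustrative): recent partitions stay heap, while an older partition is created append-optimized and column-oriented; year-old partitions can later be swapped for an external table with ALTER TABLE ... EXCHANGE PARTITION.

CREATE TABLE sales_poly (id int, date date, amt decimal(10,2))
DISTRIBUTED BY (id)
PARTITION BY RANGE (date)
( PARTITION older  START (date '2016-01-01') END (date '2016-10-01')
      WITH (appendonly=true, orientation=column, compresstype=zlib),
  PARTITION recent START (date '2016-10-01') END (date '2017-01-01') );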

Data distribution is the foundation of any MPP database and one of the keys to its efficiency. Spreading massive amounts of data across multiple nodes greatly reduces the amount of data any single node must process, and it lays the foundation for parallel processing; together, these can improve the performance of the whole system dramatically. For example, in a cluster of 100 nodes, each node stores only one percent of the total data, and with all 100 nodes processing in parallel, performance can be dozens of times that of a single, more powerful node. If the data is distributed unevenly and data skew occurs, the performance of the whole system is limited by the slowest node. Therefore, whether the data distribution is reasonable has a large influence on Greenplum's overall performance.
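A quick way to check whether a table is evenly distributed (a simple diagnostic, not an official tool) is to count rows per segment using the system column gp_segment_id:

-- Roughly equal counts per segment indicate a good distribution key;
-- one segment holding far more rows than the others indicates skew.
SELECT gp_segment_id, count(*) FROM sales GROUP BY gp_segment_id ORDER BY 2 DESC;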

Greenplum 6 provides the following data distribution strategies:

  • Hash distribution

  • Random distribution

  • Replicated Table

Hash distribution

Hash distribution is Greenplum's most common way of distributing data: a hash value is computed from a predefined distribution key of each row, and that hash value is then mapped to a segment. The distribution key can contain multiple columns. Choosing the distribution key is a major factor in Greenplum performance: a good distribution key spreads data evenly across segments and avoids data skew.
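For example (table and column names are illustrative), the distribution key is declared when the table is created; rows with equal key values always hash to the same segment, which is what makes the segment-local joins shown later possible:

CREATE TABLE customers (customer_id int, name text)
DISTRIBUTED BY (customer_id);

CREATE TABLE orders (order_id int, customer_id int, amount numeric)
DISTRIBUTED BY (customer_id);
-- Both tables hash on customer_id, so a join on customer_id can be
-- evaluated entirely locally on each segment, without a Motion below the join.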

The code Greenplum uses to compute the hash value of the distribution key is in cdbhash.c. The structure CdbHash is the primary data structure for distribution-key hashing. The logic for computing the hash value of the distribution key is:

  • Create a CdbHash structure using makeCdbHash(int segnum)

  • Then perform the following operations on each tuple to compute its hash value and determine which segment the tuple should be distributed to:

    • cdbhashinit(): performs initialization

    • cdbhash(): calls hashDatum() to preprocess each column according to its type, and addToCdbHash() to fold the value into the hash

    • cdbhashreduce(): maps the hash value to a segment

CdbHash structure:

typedef struct CdbHash
{
    uint32         hash;       /* hash result */
    int            numsegs;    /* number of segments */
    CdbHashReduce  reducealg;  /* algorithm used to reduce buckets */
    uint32         rrindex;    /* round-robin index */
} CdbHash;

The main functions are:

  • makeCdbHash(int numsegs): creates a CdbHash structure, which maintains the following information:

    • The number of segments

    • The reduction method: if the number of segments is a power of two, REDUCE_BITMASK is used, otherwise REDUCE_LAZYMOD.

  • The hash value inside the structure is re-initialized for each tuple, which happens in cdbhashinit().

  • void cdbhashinit(CdbHash *h): resets the hash value to the initial offset basis:

h->hash = FNV1_32_INIT;

  • void cdbhash(CdbHash *h, Datum datum, Oid type): adds one attribute to the hash computation, i.e. one more column to be considered in the hash. It calls hashDatum() to preprocess the column and passes a pointer to the function addToCdbHash().

  • void addToCdbHash(void *cdbHash, void *buf, size_t len): implements datumHashFunction; it folds the attribute's bytes into the running hash:

h->hash = fnv1_32_buf(buf, len, h->hash); /* 32-bit FNV-1 hash over the buffer */

  The call chain when hashing a column is evalHashKey -> cdbhash -> hashDatum -> addToCdbHash.

  • unsigned int cdbhashreduce(CdbHash *h): maps the hash value to a segment. The main logic is a modulo operation, as follows:

switch (h->reducealg)  
{  
 case REDUCE_BITMASK:  
    result = FASTMOD(h->hash, (uint32) h->numsegs);       /* fast mod (bitmask) */  
    break;  
  
 case REDUCE_LAZYMOD:  
    result = (h->hash) % (h->numsegs); /* simple mod */  
    break;  
}  

For each tuple the following flow is executed:

  • void cdbhashinit(CdbHash *h)

  • void cdbhash(CdbHash *h, Datum datum, Oid type)

  • void addToCdbHash(void *cdbHash, void *buf, size_t len)

  • unsigned int cdbhashreduce(CdbHash *h)

Random distribution

Random distribution can be used if no suitable hash distribution key can be determined for a table, or if no reasonable distribution key exists that avoids data skew. Random distribution spreads the rows of a single INSERT across the nodes in a round-robin fashion. The randomness applies only within a single SQL statement, not across statements. For example, if rows are inserted into a randomly distributed table one INSERT statement at a time, the data may all end up on the same segment, as the example below shows:

test=# create table t1 (id int) DISTRIBUTED RANDOMLY;  
CREATE TABLE  
test=# INSERT INTO t1 VALUES (1);  
INSERT 0 1  
test=# INSERT INTO t1 VALUES (2);  
INSERT 0 1  
test=# INSERT INTO t1 VALUES (3);  
INSERT 0 1  
test=# SELECT gp_segment_id, * from t1;  
gp_segment_id | id  
---------------+----  
            1 |  1  
            1 |  2  
            1 |  3  

Some management tools take advantage of random distribution, for example gpexpand, the cluster expansion tool that redistributes data after new nodes are added. In earlier versions, gpexpand marked all tables as randomly distributed during initialization and then redistributed them, so that the redistribution did not interfere with the normal operation of the business. Greenplum 6 redesigned gpexpand so that changing the distribution policy to random is no longer necessary.

Replicated Table

Greenplum 6 supports a new distribution strategy: replicated tables, where every node holds a complete copy of the table.

test=# CREATE TABLE t2 (id int) DISTRIBUTED REPLICATED;  
CREATE TABLE  
test=# INSERT INTO t2 VALUES (1), (2), (3);  
INSERT 0 3  
test=# SELECT * FROM t2;  
id  
----  
 1  
 2  
 3  
(3 rows)  
  
test=# SELECT gp_segment_id, * from t2;  
gp_segment_id | id  
---------------+----  
            0 |  1  
            0 |  2  
            0 |  3  

Replicated tables solve two problems:

  • UDFs executed on segments cannot access tables. Because of the MPP architecture, each segment holds only part of the data, so a UDF running on a segment cannot access ordinary tables; otherwise the result would be computed incorrectly.
yydzero=# CREATE FUNCTION c() RETURNS bigint AS $$
yydzero$#  SELECT count(*) from t1 AS result;
yydzero$# $$ LANGUAGE SQL;
CREATE FUNCTION
yydzero=# SELECT c();
 c
---
 6
(1 row)
yydzero=# select c() from t2;
ERROR:  function cannot execute on a QE slice because it accesses relation "public.t1"  (seg0 slice1 192.168.1.107:25435 pid=76589)

This problem does not exist if t1 above is changed to the replicated table.

Replicated tables are useful in many scenarios, for example spatial_ref_sys in PostGIS (a large number of PostGIS UDFs need to access this table) and plr_modules in PL/R. Before this feature was supported, Greenplum could only handle tables such as spatial_ref_sys through workarounds.

  • Avoiding distributed query plans: if a table's data is replicated on every segment, a local join plan can be generated, avoiding data movement between nodes in the cluster. Using replicated tables for tables with relatively small amounts of data, such as thousands of rows, can significantly improve performance. Replicated tables are not suitable for tables with large data volumes.
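For example (a sketch; whether the planner keeps the join fully local still depends on costs and statistics), replicating a small dimension table lets it be joined against a large hash-distributed fact table without broadcasting or redistributing data at run time:

CREATE TABLE dim_region (region_id int, region_name text)
DISTRIBUTED REPLICATED;    -- full copy on every segment

CREATE TABLE fact_sales (id int, region_id int, amt numeric)
DISTRIBUTED BY (id);

-- Every segment already holds all of dim_region, so the join can be evaluated
-- locally; EXPLAIN should show no Redistribute or Broadcast Motion below the join.
EXPLAIN SELECT f.id, d.region_name
FROM fact_sales f JOIN dim_region d USING (region_id);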

3. Parallelize query plans

PostgreSQL generates query plans that can only be executed on a single node, and Greenplum needs to parallelize query plans to take full advantage of clustering.

Greenplum introduces the Motion operator to parallelize query plans. A Motion operator transfers data between nodes; it hides the difference between the MPP architecture and a single machine from the other operators, so that most operators do not care whether they run on a cluster or on a single node. Every Motion operator has a sender side and a receiver side. Greenplum also implements distributed optimizations for certain operators, for example aggregation. (This section assumes basic knowledge of the PostgreSQL optimizer; see src/backend/optimizer/README.)

Optimization examples

Before getting into the technical details, let’s look at a few examples.

The following example creates two tables t1 and t2, each with two columns c1 and c2, and each using c1 as its distribution key.

CREATE table t1 AS SELECT g c1, g + 1 as c2 FROM generate_series(1, 10) g DISTRIBUTED BY (c1);  
CREATE table t2 AS SELECT g c1, g + 1 as c2 FROM generate_series(5, 15) g DISTRIBUTED BY (c1);  
  
SQL1:  
  
SELECT * from t1, t2 where t1.c1 = t2.c1;  
c1 | c2 | c1 | c2  
----+----+----+----  
 5 |  6 |  5 |  6  
 6 |  7 |  6 |  7  
 7 |  8 |  7 |  8  
 8 |  9 |  8 |  9  
 9 | 10 |  9 | 10  
10 | 11 | 10 | 11  
(6 rows)  

The query plan of SQL1 is shown below. Since the join key is the distribution key of both tables, the join can be performed locally: no data needs to be moved within the subtree of the HashJoin operator, and a Gather Motion collects the results on the master.

                                  QUERY PLAN
------------------------------------------------------------------------------
 Gather Motion 3:1  (slice1; segments: 3)  (cost=3.23..6.48 rows=10 width=16)
   ->  Hash Join  (cost=3.23..6.48 rows=4 width=16)
         Hash Cond: t2.c1 = t1.c1
         ->  Seq Scan on t2  (cost=0.00..3.11 rows=4 width=8)
         ->  Hash  (cost=3.10..3.10 rows=4 width=8)
               ->  Seq Scan on t1  (cost=0.00..3.10 rows=4 width=8)
 Optimizer: legacy query optimizer

SQL2:

SELECT * from t1, t2 where t1.c1 = t2.c2;  
c1 | c2 | c1 | c2  
----+----+----+----  
 9 | 10 |  8 |  9  
10 | 11 |  9 | 10  
 8 |  9 |  7 |  8  
 6 |  7 |  5 |  6  
 7 |  8 |  6 |  7  
(5 rows)  

Table t1's join key c1 is also its distribution key, but table t2's join key c2 is not its distribution key, so the data of t2 needs to be redistributed by t2.c2 so that all rows satisfying t1.c1 = t2.c2 are joined on the same segment.

                                        QUERY PLAN
------------------------------------------------------------------------------------
 Gather Motion 3:1  (slice2; segments: 3)  (cost=3.23..6.70 rows=10 width=16)
   ->  Hash Join  (cost=3.23..6.70 rows=4 width=16)
         Hash Cond: t2.c2 = t1.c1
         ->  Redistribute Motion 3:3  (slice1; segments: 3)  (cost=0.00..3.33 rows=4 width=8)
               Hash Key: t2.c2
               ->  Seq Scan on t2  (cost=0.00..3.11 rows=4 width=8)
         ->  Hash  (cost=3.10..3.10 rows=4 width=8)
               ->  Seq Scan on t1  (cost=0.00..3.10 rows=4 width=8)
 Optimizer: legacy query optimizer

SQL3:

SELECT * from t1, t2 where t1.c2 = t2.c2;  
c1 | c2 | c1 | c2  
----+----+----+----  
 8 |  9 |  8 |  9  
 9 | 10 |  9 | 10  
10 | 11 | 10 | 11  
 5 |  6 |  5 |  6  
 6 |  7 |  6 |  7  
 7 |  8 |  7 |  8  
(6 rows)

The query plan of SQL3 is shown below. Neither t1's join key c2 nor t2's join key c2 is a distribution key, so a Broadcast Motion is used to broadcast the data of one table to all nodes to guarantee a correct join. (The plan generated by the latest master branch for this query chooses to redistribute both tables instead; why it does so is left as a question for the reader :).)

                                        QUERY PLAN
------------------------------------------------------------------------------------
 Gather Motion 3:1  (slice2; segments: 3)  (cost=3.25..6.96 rows=10 width=16)
   ->  Hash Join  (cost=3.25..6.96 rows=4 width=16)
         Hash Cond: t1.c2 = t2.c2
         ->  Broadcast Motion 3:3  (slice1; segments: 3)  (cost=0.00..3.50 rows=10 width=8)
               ->  Seq Scan on t1  (cost=0.00..3.10 rows=4 width=8)
         ->  Hash  (cost=3.11..3.11 rows=4 width=8)
               ->  Seq Scan on t2  (cost=0.00..3.11 rows=4 width=8)
 Optimizer: legacy query optimizer

SQL4:

SELECT * from t1 LEFT JOIN t2 on t1.c2 = t2.c2 ;  
c1 | c2 | c1 | c2  
----+----+----+----  
 1 |  2 |    |  
 2 |  3 |    |  
 3 |  4 |    |  
 4 |  5 |    |  
 5 |  6 |  5 |  6  
 6 |  7 |  6 |  7  
 7 |  8 |  7 |  8  
 8 |  9 |  8 |  9  
 9 | 10 |  9 | 10  
10 | 11 | 10 | 11  
(10 rows)  

The query plan of SQL4 is shown below. Although the join key is the same as in SQL3, the LEFT JOIN means t1 cannot simply be broadcast (otherwise rows would be duplicated in the result), so the plan redistributes both tables. Depending on the path costs, the optimizer may instead choose to broadcast t2 for SQL4. (For tables of comparable size, broadcasting one table costs more than redistributing both: with two-table redistribution, every tuple of each of the two tables is transmitted once, whereas with broadcast, every tuple of the broadcast table is transmitted nSegments times.)

                                        QUERY PLAN
------------------------------------------------------------------------------------
 Gather Motion 3:1  (slice3; segments: 3)  (cost=3.47..6.91 rows=10 width=16)
   ->  Hash Left Join  (cost=3.47..6.91 rows=4 width=16)
         Hash Cond: t1.c2 = t2.c2
         ->  Redistribute Motion 3:3  (slice1; segments: 3)  (cost=0.00..3.30 rows=4 width=8)
               Hash Key: t1.c2
               ->  Seq Scan on t1  (cost=0.00..3.10 rows=4 width=8)
         ->  Hash  (cost=3.33..3.33 rows=4 width=8)
               ->  Redistribute Motion 3:3  (slice2; segments: 3)  (cost=0.00..3.33 ...)
                     Hash Key: t2.c2
                     ->  Seq Scan on t2  (cost=0.00..3.11 rows=4 width=8)
 Optimizer: legacy query optimizer

SQL5:

SELECT  c2, count(1) from t1 group by c2;  
c2 | count  
----+-------  
 5 |     1  
 6 |     1  
 7 |     1  
 4 |     1  
 3 |     1  
10 |     1  
11 |     1  
 8 |     1  
 9 |     1  
 2 |     1  
(10 rows)  

The four SQL statements above show how different join conditions affect the type of data movement (the Motion type). SQL5 demonstrates Greenplum's optimization for aggregation: two-stage aggregation. The first aggregation stage runs on each segment's local data; the results are then redistributed by the grouping key and aggregated again in a second stage, and the final result is collected on the master through a Gather Motion. For queries such as DISTINCT aggregates with GROUP BY, Greenplum uses three-stage aggregation (see the example after the plan below).

                                        QUERY PLAN
------------------------------------------------------------------------------------
 Gather Motion 3:1  (slice2; segments: 3)  (cost=3.55..3.70 rows=10 width=12)
   ->  HashAggregate  (cost=3.55..3.70 rows=4 width=12)
         Group Key: t1.c2
         ->  Redistribute Motion 3:3  (slice1; segments: 3)  (cost=3.17..3.38 rows=4 width=12)
               Hash Key: t1.c2
               ->  HashAggregate  (cost=3.17..3.17 rows=4 width=12)
                     Group Key: t1.c2
                     ->  Seq Scan on t1  (cost=0.00..3.10 rows=4 width=4)
 Optimizer: legacy query optimizer
(9 rows)
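For instance, a query that combines a DISTINCT aggregate with GROUP BY, such as the one below, is a candidate for the three-stage aggregation mentioned above (whether three stages are actually used depends on the optimizer and statistics):

-- Count distinct c1 values per c2 group; the planner may split this into
-- three aggregation stages separated by Motion operators.
SELECT c2, count(DISTINCT c1) FROM t1 GROUP BY c2;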

Greenplum introduces new data structures and concepts for query optimization

The previous few intuitive examples show the different distributed query plans that Greenplum generates for different SQL. The main internal mechanisms are described below.

Greenplum introduces some new enhancements to PostgreSQL’s Node, Path, and Plan structures in order to turn standalone query plans into parallel plans:

  • A new Node type, Flow, is added

  • New Path type: CdbMotionPath

  • A new query plan operator, Motion, is added (the first field of Motion is a Plan, and the first field of the Plan structure is of type NodeTag). The first field of Flow is also a NodeTag, just like RangeVar, IntoClause, Expr, and RangeTblRef.

  • The CdbPathLocus locus field was added to the Path structure to indicate the redistribution strategy of the result tuple under this Path

  • Add the Flow field to the Plan structure to represent the tuple Flow of the operator;

New Node type: Flow

The new node type Flow describes the Flow of tuples in a parallel plan. Each query Plan node (Plan structure) has a Flow field that represents the direction of the output tuple of the current node. Flow is a new node type, but not a query plan node. In addition, the Flow structure includes member fields for plan parallelization.

Flow has three main fields:

  • FlowType: indicates the type of the Flow

    • UNDEFINED: undefined Flow

    • SINGLETON: corresponds to Gather Motion

    • REPLICATED: corresponds to Broadcast Motion

    • PARTITIONED: corresponds to Redistribute Motion

  • Movement: determines what kind of Motion should be applied to the output of the current plan node. It is mainly used to adapt subquery plans to the distributed environment.

    • None: no Motion is needed

    • FOCUS: focus onto a single segment, equivalent to Gather Motion

    • BROADCAST: Broadcast Motion

    • REPARTITION: hash redistribution

    • EXPLICIT: move tuples explicitly to the segments indicated by their segid field

  • CdbLocusType: the type of the locus. The optimizer uses this information to choose the most appropriate nodes for the most appropriate data-flow handling and to determine the appropriate Motion.

    • CdbLocusType_Null: no locus

    • CdbLocusType_Entry: a single backend process on the entry db (the master); it can be the Query Dispatcher (QD) or a Query Executor (QE) on the entry db

    • CdbLocusType_SingleQE: a single backend process on any node; it can be the QD or any QE process

    • CdbLocusType_General: compatible with any locus

    • CdbLocusType_Replicated: replicated on all QEs

    • CdbLocusType_Hashed: hash-distributed across all QEs

    • CdbLocusType_Strewn: data is spread across the segments, but the distribution key is unknown

New Path type: CdbMotionPath

A Path represents one possible way of computing a result (such as a sequential scan or a hash join); more complex paths inherit from the Path structure and record additional information for optimization. Greenplum adds a CdbPathLocus locus field to the Path structure to indicate the redistribution and execution strategy of the result tuples under the current Path.

In Greenplum, a table's distribution key determines how tuples are distributed when they are stored, i.e. on which segment's disk a tuple resides. CdbPathLocus determines how tuples are redistributed between different processes (the QEs of different segments) at execution time, that is, which process should handle a given tuple. Tuples can come from tables or from functions.

Greenplum also introduces a new path, CdbMotionPath, to indicate how the result of a subpath is transmitted from the sender process to the receiver process.

New Plan operator: Motion

As mentioned above, a Motion is a query plan tree node that shuffles data so that its parent can get the data it needs from its children. There are three types of Motion:

  • MOTIONTYPE_HASH: Redistributes data based on the redistribute key using a hash algorithm, sending each tuple that passes through the operator to the target segment, which is determined by the hash value of the redistribute key.

  • MOTIONTYPE_FIXED: send a tuple to a set of segments. Either broadcast Motion (sent to all segments) or Gather Motion (sent to a fixed segment)

  • MOTIONTYPE_EXPLICIT: Sends a tuple to the segments specified in its segid field, corresponding to the explicit redistribution Motion. The difference with MOTIONTYPE_HASH is that no hash is computed.

As mentioned earlier, Greenplum adds a Flow *flow field to the Plan structure to indicate the direction of the result tuples. The Plan structure also gains several other optimization- and execution-related fields, such as a DispatchMethod dispatch field indicating whether MPP dispatch is required, a directDispatch field indicating whether direct dispatch is possible (sending a slice directly to the single segment that holds the data), a sliceTable that facilitates executing the distributed plan in MPP fashion, and a motionNode field recording the parent Motion node of the current plan node.
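For example (illustrative; whether direct dispatch actually kicks in depends on the Greenplum version and settings), a query whose filter pins the distribution key to a single value only needs to touch the one segment that stores those rows, so its plan is a candidate for direct dispatch:

-- t1 is distributed by c1, so all rows with c1 = 5 live on exactly one segment.
EXPLAIN SELECT * FROM t1 WHERE c1 = 5;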

Generate a distributed query plan

The following figure shows the optimization flow for the traditional optimizer (ORCA optimizer is different) in Greenplum. This section highlights the differences from PostgreSQL stand-alone optimizer.

standard_planner() is PostgreSQL's default planner entry point; it mainly calls subquery_planner() and set_plan_references(). In Greenplum, set_plan_references() is followed by a call to cdbparallelize(), which performs the final parallelization of the plan tree.

subquery_planner(), as its name suggests, optimizes one (sub)query and generates a query plan tree. It has two main stages:

  • Optimization of the basic query features (also known as SPJ: Select/Projection/Join), implemented by query_planner()

  • Optimization of advanced query features (non-SPJ) such as aggregation, implemented by grouping_planner(), which first calls query_planner() for the basic optimization and then handles the advanced features.

Greenplum turns the single-node plan into a distributed plan in two main places:

  • Single subquery: the subquery plan tree returned by Greenplum's subquery_planner() has already undergone some distributed processing, such as adding a Motion operator for a HashJoin, converting aggregation to two stages, and so on.

  • Multiple subqueries: Greenplum needs to establish the appropriate data flow between subqueries so that the result of a subquery can be used by the upper-level query tree. This is implemented by the function cdbparallelize().

Parallelization of a single subquery

The Greenplum process for optimizing a single subquery is similar to that of PostgreSQL; the main differences are:

  • Joins: based on the data distribution of the left and right subtrees of the join operator, decide whether a Motion node needs to be added and, if so, which type of Motion.

  • Advanced operations such as aggregation are optimized differently, for example with the two-stage aggregation mentioned earlier.

The following is a brief introduction to the main process:

First, build_simple_rel() builds the basic information about each table, such as how many tuples it contains and how many pages it occupies. One important piece of information is the data distribution: GpPolicy describes the distribution type and distribution key of the base table.

Then set_base_rel_pathlists() sets the access paths for the base tables; it calls different functions depending on the table type:

  • RTE_FUNCTION: create_functionscan_path()

  • RTE_RELATION: create_external_path()/create_aocs_path()/create_seqscan_path()/create_index_paths()

  • RTE_VALUES: create_valuesscan_path()

These functions determine the locus type of the path node, which describes a characteristic related to data distribution. This information is important for parallelizing the subquery: it is later used to determine the Flow type of the plan node when a path is converted into a plan, and the Flow type in turn determines what type of Gang the executor uses to execute it.

How to determine locus?

For an ordinary heap table, the sequential-scan path function create_seqscan_path() determines the locus of the path as follows:

  • If the table is hash distributed, the Locus type is CdbLocusType_Hashed

  • If the distribution is random, locus type is CdbLocusType_Strewn

  • If it is a system table, the locus type is CdbLocusType_Entry

For functions, create_functionscan_path() determines the locus of the path as follows:

  • If the function is immutable, use: CdbLocusType_General

  • If the function is mutable, use: CdbLocusType_Entry

  • If the function needs to be executed on the master, use: CdbLocusType_Entry

  • CdbLocusType_Strewn is used if the function needs to be executed over all segments

If the SQL statement contains joins, make_rel_from_joinlist() generates the access paths for the join tree; the corresponding functions are create_nestloop_path(), create_mergejoin_path(), and create_hashjoin_path(). The most important part of this process is deciding whether Motion nodes need to be added and of what type. For example, SQL1's join key is the distribution key of both tables t1 and t2, so no Motion is needed; SQL2 needs to redistribute t2 so that, for any tuple of t1, all tuples of t2 that satisfy the join condition (t1.c1 = t2.c2) are on the same segment.

If the SQL contains advanced features such as aggregation or window functions, cdb_grouping_planner() is called to optimize them, for example converting an aggregate into the two-stage or three-stage form.

The last step is to select the cheapest of all the candidate paths and call create_plan() to convert the optimal path tree into the optimal query plan tree.

At this stage, the locus of a Path influences the Flow type of the generated Plan node. Flow corresponds to the concept of a Gang in the executor. Thanks to Flow, the executor does not need to care how the data is distributed or what the distribution key is, only whether the data lives on multiple segments or on a single segment. The correspondence between locus and Flow is:

  • FLOW_SINGLETON: Locus_Entry/Locus_SingleQE/Locus_General

  • FLOW_PARTITIONED: Locus_Hash/Locus_Strewn/Locus_Replicated

Parallelization of multiple subqueries

The main purpose of cdbparallelize() is to resolve the data flow between multiple subqueries and generate the final parallel query plan. It has two main steps: prescan and apply_motion.

  • Prescan serves two purposes. The first is to mark certain types of plan nodes (such as their Flow) for later processing by apply_motion. The second is to mark or transform SubPlans. A SubPlan is not actually a query plan node but an expression node that contains a plan node and its range table; it corresponds to a SubLink (an SQL subquery expression) in the query tree and may appear inside an expression. Prescan does the following with the plan tree contained in a SubPlan:

    • If the SubPlan is an InitPlan, apply_motion is called on the root of its query tree to add a Motion node there.

    • If the SubPlan is an uncorrelated multi-row subquery, a Gather or Broadcast operation is applied to the subquery based on the Flow information in its plan node, and a Materialize node is added on top to prevent rescanning the SubPlan. This improves efficiency because the subquery does not have to be re-executed every time.

    • If the SubPlan is a correlated subquery, it is converted into an executable form: the plan is scanned recursively until a leaf scan node is encountered, and that scan node is then replaced with the form below. After this transformation, the query tree can be executed in parallel, because the correlated subquery has become part of a Result node and lives in the same slice as the outer query nodes.

Result  
          \  
           \_Material  
             \  
              \_Broadcast (or Gather)  
                \  
                 \_SeqScan  
  • apply_motion: adds Motion nodes to the top-level plan tree according to the Flow information recorded in the plan, adding different Motion nodes depending on the SubPlan type (InitPlan, uncorrelated multi-row subquery, correlated subquery).

For example, for SELECT * FROM tbl WHERE id = 1, prescan() marks the root of the query tree, and apply_motion() then adds a Gather Motion at that root.

This article has covered the Greenplum cluster overview, distributed data storage, and distributed query optimization. The next article will continue with distributed query execution, distributed transactions, data shuffling, and cluster management.