This article was first published on 2016-11-21 09:43:07

Architecture

GreenPlum uses a shared-nothing architecture and makes good use of cheap commodity PCs. As a result, I/O is no longer the bottleneck of the DW (data warehouse); instead, the network comes under much greater pressure. GreenPlum's query optimization strategy therefore avoids moving data across the network as much as possible. To anyone encountering it for the first time, GreenPlum is certainly a breath of fresh air.

Query optimizer

The master node of GreenPlum is responsible for parsing SQL and generating execution plans. Specifically, the query optimizer turns the SQL into a physical execution plan for each node (segment) to execute.

GreenPlum uses a cost-based optimization strategy: if there are multiple execution paths, it evaluates the cost of each and picks the least expensive, most efficient one.

Unlike a traditional query optimizer, GreenPlum's optimizer must consider the cluster as a whole, taking into account the cost of moving data between nodes in each candidate execution plan. For example, if a join can be performed separately on each node (each node joining only its own data), the query is fast because no data has to move.
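As a rough illustration (the tables and columns below are hypothetical, not from this article), if both sides of a join are distributed on the join column, every segment already holds the matching rows and can join them locally:

-- Hypothetical tables; DISTRIBUTED BY is Greenplum's distribution clause.
create table orders_demo (
    order_id    bigint,
    customer_id bigint,
    amount      numeric
) distributed by (customer_id);

create table customers_demo (
    customer_id bigint,
    name        text
) distributed by (customer_id);

-- Both tables hash on customer_id, so each segment joins only its own
-- slice of the data; the only movement is the final gather to the master.
explain
select o.order_id, c.name
from orders_demo o
join customers_demo c on o.customer_id = c.customer_id;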

Query plans include traditional operations such as scan, join, sort, aggregate, and so on.

There are three movements of data in GreenPlum:

  • Broadcast Motion (N:N): broadcast data. Each node broadcasts its own data to all the other nodes.
  • Redistribute Motion (N:N): redistribute data. Each node rehashes its (filtered) data on the join column and sends each row to the segment that owns the corresponding hash value.
  • Gather Motion (N:1): gather data. Every node sends its (joined) result to a single node, usually the master.
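A minimal sketch of when data movement is needed (again with hypothetical tables; the actual choice is cost-based and may differ on your data):

-- fact_demo is distributed on id, dim_demo on dim_id.
create table fact_demo (
    id     bigint,
    dim_id bigint,
    val    numeric
) distributed by (id);

create table dim_demo (
    dim_id bigint,
    label  text
) distributed by (dim_id);

-- fact_demo is not distributed on the join column dim_id, so the planner
-- has to move data: it either redistributes fact_demo by dim_id
-- (Redistribute Motion N:N) or, if dim_demo is small, broadcasts dim_demo
-- to every segment (Broadcast Motion N:N). The final result is collected
-- on the master by a Gather Motion (N:1).
explain
select f.id, d.label
from fact_demo f
join dim_demo d on f.dim_id = d.dim_id;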

The sample

Example 1

explain select d.*,j.customer_id from data d join jd1 j on d.partner_id=j.partner_id where j.gmt_modified > current_date - 80;
                                       QUERY PLAN
----------------------------------------------------------------------------------------
 Gather Motion 88:1  (slice2)  (cost=3.01..939.49 rows=2717 width=59)
   ->  Hash Join  (cost=3.01..939.49 rows=2717 width=59)
         Hash Cond: d.partner_id::text = j.partner_id::text
         ->  Seq Scan on data d  (cost=0.00..260.74 rows=20374 width=50)
         ->  Hash  (cost=1.91..1.91 rows=88 width=26)
               ->  Broadcast Motion 88:88  (slice1)  (cost=0.00..1.91 rows=88 width=26)
                     ->  Seq Scan on jd1 j  (cost=0.00..1.02 rows=1 width=26)
                           Filter: gmt_modified > ('now'::text::date - 80)

The execution plan is read bottom-up:

  1. Each node scans its own jd1 table data and applies the filter condition, producing a result set (call it rs).
  2. Each node sends its rs to every other node (Broadcast Motion (N:N)).
  3. Each node joins its own data table rows with the rs it has received, so native data is joined only with data already on that node.
  4. Each node sends its join result to the master (Gather Motion (N:1)).

As the plan above shows, GreenPlum sends a copy of rs to every node that holds rows of the data table.

Q: What if rs is large, or there is no filter at all? What problem does that cause, and how is it handled?

For example, the number of rows in the jd1 and data tables is as follows:

=> select count(*) from jd1;   
 count    
-------   
    20   
(1 row)  
=> select count(*) from data;   
 count     
--------   
 113367  

If rs is large, the network becomes the bottleneck when the data is broadcast. GreenPlum's optimizer is clever here: it broadcasts the small table across the segments, greatly reducing network overhead. This also shows how important statistics are for generating a good query plan.
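As a quick sketch with the two tables from this example (ANALYZE and EXPLAIN are standard commands; the exact plan depends on your data and statistics):

-- Refresh planner statistics so Greenplum knows which side is the small one.
analyze jd1;
analyze data;

-- Re-check the plan: with accurate row estimates, the small table (jd1)
-- should be the one under the Broadcast Motion node, as in the plan above.
explain
select d.*, j.customer_id
from data d
join jd1 j on d.partner_id = j.partner_id
where j.gmt_modified > current_date - 80;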

Example 2

Here’s a more complicated example:

select
    c_custkey, c_name,
    sum(l_extendedprice * (1 - l_discount)) as revenue,
    c_acctbal, n_name, c_address, c_phone, c_comment
from
    customer, orders, lineitem, nation
where
    c_custkey = o_custkey
and l_orderkey = o_orderkey
and o_orderdate >= date '1994-08-01'
and o_orderdate < date '1994-08-01'
                  + interval '3 month'
and l_returnflag = 'R'
and c_nationkey = n_nationkey
group by
    c_custkey, c_name, c_acctbal,
    c_phone, n_name, c_address, c_comment
order by
    revenue desc

The execution plan works as follows:

  1. Each node scans its own nation table data and broadcasts it to the other nodes (Broadcast Motion (N:N)).
  2. Each node scans its own customer table data and joins it with the received nation data, producing RS-CN.
  3. Each segment scans its own orders table data and filters it, producing RS-O.
  4. Each segment scans its own lineitem table data and filters it, producing RS-L.
  5. Each segment independently joins RS-O and RS-L to produce RS-OL. Note that this step needs no Redistribute Motion (N:N): both orders and lineitem are distributed on the order key, which guarantees that the rows to be joined are already on the same machine, so the N nodes join in parallel (see the DDL sketch after this list).
  6. Each node takes the RS-OL it produced in step 5 and redistributes it across all nodes by custkey (Redistribute Motion (N:N); data can be redistributed between nodes by hash or by range, hash by default), so that each node ends up with its share of RS-OL.
  7. Each node joins the RS-CN it produced in step 2 with the RS-OL now on its own node; again, native data is joined only with native data.
  8. Aggregate, sort, and send the result to the master.
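Here is a sketch of how the distribution keys assumed in steps 5 and 6 might be declared (column lists abbreviated; the real TPC-H tables have more columns):

-- Both fact tables hash-distributed on the order key, so the
-- orders/lineitem join in step 5 is co-located and needs no motion.
create table orders (
    o_orderkey  bigint,
    o_custkey   bigint,
    o_orderdate date
) distributed by (o_orderkey);

create table lineitem (
    l_orderkey      bigint,
    l_extendedprice numeric,
    l_discount      numeric,
    l_returnflag    char(1)
) distributed by (l_orderkey);

-- The later join with customer is on custkey, which is why step 6 has to
-- redistribute RS-OL by custkey (Redistribute Motion N:N) first.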

Conclusion

How does Greenplum handle and optimize joins for large and small tables?

Greenplum chooses to broadcast data from a small table rather than a large table.

For example:

Table A has 1 billion rows (empno <PK>, deptno, ename); table B has 500 rows (deptno <PK>, dname, loc).

Table A joins table B on deptno.

The cluster has 11 nodes: 1 master and 10 segments

With normal hash distribution on the primary key columns, each segment node holds only 1/10 of table A and 1/10 of table B.

GreenPlum then has every node send the other nodes the 1/10 of small table B that it owns, so that each of the 10 segments ends up with a complete copy of B. At that point, the 1/10 of A on each node only needs to join the B on its own node. This is where GreenPlum's parallel processing power comes from.

Eventually, all nodes send the join results to the master node.
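A sketch of the same example in DDL (the table and column names come from the example above; the types are assumptions):

create table a (
    empno  bigint,   -- primary key column, used as the distribution key
    deptno int,
    ename  text
) distributed by (empno);

create table b (
    deptno int,      -- primary key column, used as the distribution key
    dname  text,
    loc    text
) distributed by (deptno);

-- a is distributed on empno, not on the join key deptno, so its rows would
-- otherwise have to move; instead the planner broadcasts the 500-row b to
-- every segment, each segment joins its 1/10 of a locally, and the results
-- are gathered on the master.
explain
select a.empno, a.ename, b.dname
from a
join b on a.deptno = b.deptno;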

This example again shows that statistics matter: GreenPlum uses them to decide which table to broadcast (Broadcast Motion (N:N)).

In practice, column values may also be skewed. Suppose A is not hash-distributed on its primary key but is instead distributed by deptno, and 80% of A's rows belong to the Sales department (deptno=10): then one of the 10 nodes holds 80% of the 1 billion rows. Broadcasting table B to the other nodes does not help, because the computation is still concentrated on one machine. You must therefore choose an appropriate column for hash distribution.
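A quick way to check for this kind of skew on the example table A is to count rows per segment (gp_segment_id is a Greenplum system column; ALTER TABLE ... SET DISTRIBUTED BY is the usual way to change the distribution key, though syntax details may vary by version):

-- How many rows does each segment hold?
select gp_segment_id, count(*) as rows_on_segment
from a
group by gp_segment_id
order by rows_on_segment desc;

-- If one segment holds a disproportionate share (e.g. ~80% of the rows),
-- redistribute the table on a more evenly spread, higher-cardinality column:
alter table a set distributed by (empno);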


Welcome to follow my WeChat official account [database kernel], where I share technology related to mainstream open-source databases and storage engines.

Title          URL
GitHub         dbkernel.github.io
Zhihu          www.zhihu.com/people/dbke…
SegmentFault   segmentfault.com/u/dbkernel
Juejin         juejin.im/user/5e9d3e…
OSChina        my.oschina.net/dbkernel
CNBlogs        www.cnblogs.com/dbkernel