This document introduces the basic structure and interpretation of query plans, and provides examples and explanations for both single-node and parallel (distributed) query plans.
A query plan (also known as an execution plan or query execution plan) is a critical component in the lifecycle of a database query. Generated by the query optimizer, it provides a detailed description of how a SQL statement will be executed within the database.
In centralized databases (such as PostgreSQL and other single-node systems), all data resides on a single node instance, and the entire query process occurs locally. A single-node query plan suffices to fulfill the query requirements.
However, in distributed databases (such as Greenplum and YMatrix), data is spread across multiple nodes. Users must define a distribution strategy and distribution key when creating tables. Data is then distributed across multiple MXSegment instances based on this key, with each MXSegment holding only a portion of the total dataset.
As a result, traditional single-node query plans are insufficient. A new type of parallel query plan (or distributed query plan) must be implemented. This plan leverages data distribution to enable efficient parallel computation and ensures that processing occurs as close to the data as possible—ideally on the MXSegment instances themselves, minimizing computation on the MXMaster. This approach maximizes query performance.
In YMatrix, after loading data into a table, you should first run the ANALYZE command to collect statistics. Periodic full-database ANALYZE operations are also recommended. Accurate statistics are essential for the optimizer to make informed decisions about which query plan to choose, thereby improving query performance.
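For example, statistics can be collected for a single table or, with no arguments, for every table in the current database (a minimal sketch using the sale table defined later in this document):

-- Collect statistics for one table
=# ANALYZE sale;
-- Collect statistics for every table in the current database
=# ANALYZE;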
Once statistics are properly collected, use the EXPLAIN command to view the query plan that the optimizer generates for a given SQL statement. The optimizer typically evaluates multiple candidate plans and selects the most efficient one. Analyzing and understanding these plans helps developers and database administrators (DBAs) optimize query performance and identify potential bottlenecks.
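For instance, a plan can be inspected without running the statement, and the standard EXPLAIN options can be used to adjust the output. This is a minimal sketch against a hypothetical table t; VERBOSE and COSTS are standard PostgreSQL EXPLAIN options:

-- Show the estimated plan only; the statement is not executed
=# EXPLAIN SELECT count(*) FROM t;
-- Show per-node output columns and hide cost estimates for easier reading
=# EXPLAIN (VERBOSE, COSTS OFF) SELECT count(*) FROM t;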
A query plan has a tree structure composed of multiple plan nodes, where each node represents a specific operation such as table scan, join, or aggregation, and passes its results up to the parent node for further processing.
Each node contains detailed execution information, including execution order, access method, concurrency level, and estimated cost. The query plan tree starts at leaf nodes and ends at the root node. Traversing the tree from bottom to top reveals the complete execution path of the query.
In distributed query plans, common plan node types and their associated operators are listed below:
| Plan Node Type | Operation Description | Scalar Operator | Vectorized Operator |
|---|---|---|---|
| Scan Plan Nodes | Responsible for scanning tables or indexes | SeqScan: Sequentially scans all rows in a table<br>IndexScan: Traverses an index to fetch rows from the table<br>BitmapHeapScan & BitmapIndexScan: Often used together. The bitmap index scan reads qualifying index entries and builds a bitmap (granularity depends on index size); the bitmap heap scan then uses this bitmap to access the table. Note: an index scan retrieves one index entry at a time and immediately checks the table row, alternating between index and table, which suits precise lookups. A bitmap scan separates the two steps: it first collects all matching index entries into memory and sorts them, then accesses the table, which suits broad-range queries<br>DynamicSeqScan: Uses a partition selection function to determine which partitions to scan<br>ParallelSeq: When parallel query is enabled, setting the number of parallel workers equal to the number of CPU cores gives the best efficiency | MxVScan: Vectorized sequential scan for MARS2/MARS3/AOCO tables |
| Join Plan Nodes | Combines two or more tables | HashJoin: Builds a hash table from the smaller table using the join column as the hash key, then scans the larger table, computes hash keys, and probes the hash table for matches. HashCond shows the joined columns<br>NestedLoopJoin: Iterates over the rows of one dataset and scans the other dataset for each row. Requires broadcasting one table unless both tables share the same distribution key, in which case the join runs locally without a broadcast<br>MergeJoin: Sorts both datasets and merges them | MxVHashJoin: Vectorized hash join for MARS2/MARS3/AOCO tables |
| Other Plan Nodes | Auxiliary, aggregation, and data-movement operations | Materialize: Caches subquery results; stores output tuples during the first execution for reuse by upper nodes<br>Sort: Sorts tuples returned from child nodes<br>Group: Groups sorted tuples from lower nodes<br>Agg: Performs aggregate functions<br>Unique: Removes duplicate rows<br>Hash: Computes hash values; auxiliary node for HashJoin<br>Limit: Handles LIMIT/OFFSET clauses<br>WindowAgg: Processes window functions<br>LockRows: Locks rows selected by FOR SHARE/FOR UPDATE<br>SetOp: Combines multiple datasets via union (UNION), intersection (INTERSECT), or difference (EXCEPT/MINUS)<br>SubqueryScan: Evaluates subqueries<br>Append: Concatenates multiple result sets<br>Result: Handles expressions computed only once, or simple INSERT ... VALUES statements containing only a VALUES clause<br>GroupAgg: First sorts data by the grouping columns so that identical groups are adjacent, then aggregates while scanning<br>HashAgg: Groups and aggregates using a hash table, with no sort required<br>Broadcast Motion: Each MXSegment sends its rows to all other segments, resulting in a full local copy on every segment<br>Redistribute Motion: Each MXSegment recomputes the distribution of its rows based on a redistribution key and sends each row to the appropriate target segment<br>Gather Motion: Results from all MXSegments are assembled into a single stream. For most queries, this is the final step | MxVMotion: Vectorized Gather Motion for MARS2/MARS3/AOCO tables<br>MxVSort: Vectorized sort for MARS2/MARS3/AOCO tables<br>MxVResult: Vectorized Result operation<br>MxVSubqueryScan: Vectorized subquery scan<br>MxVLimit: Vectorized Limit operation<br>MxVHashAgg: Vectorized HashAgg operation<br>MxVGroupAgg: Vectorized GroupAgg operation<br>MxVAppend: Vectorized Append operation |
These nodes can be combined as needed to form a complete query plan.
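As an illustration of how the scan operators differ, the planner's choice between a sequential scan and an index or bitmap scan can be compared by toggling the standard planner setting enable_seqscan. This is a sketch for experimentation only, assuming a hypothetical table t with an indexed column id:

=# CREATE INDEX t_id_idx ON t (id);
-- Discourage sequential scans for the current session
=# SET enable_seqscan = off;
-- The plan now typically shows an Index Scan or a BitmapIndexScan/BitmapHeapScan pair
=# EXPLAIN SELECT * FROM t WHERE id = 42;
-- Restore the default planner behavior
=# RESET enable_seqscan;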
Due to its nested tree structure, a query plan must be read from bottom to top.
Below are examples of a single-node query plan and a parallel query plan.
First, create two tables. (Example tables created in PostgreSQL 9.2)
=# CREATE TABLE sale (
cid int,
pid int,
pnum int,
qty float,
date timestamp
);
=# CREATE TABLE customer (
cid int,
cname text,
cwechat text
);
Insert test data.
=# INSERT INTO sale (cid, pid, pnum, qty, date) VALUES
(1, 1, 1, 10.0, '2023-07-20 08:37:06'),
(2, 3, 6, 35.0, '2023-07-15 18:22:00'),
(3, 2, 1, 3.0, '2023-07-17 11:37:00'),
(4, 1, 2, 10.0, '2023-07-20 10:37:09'),
(5, 3, 2, 35.0, '2023-07-17 08:12:06'),
(6, 1, 1, 10.0, '2023-07-02 12:10:23'),
(7, 3, 1, 35.0, '2023-07-07 08:07:00'),
(8, 5, 3, 99.0, '2023-07-20 10:37:06'),
(9, 3, 1, 35.0, '2023-07-20 15:30:00'),
(10, 3, 1, 35.0, '2023-07-20 09:00:00');
=# INSERT INTO customer (cid, cname, cwechat) VALUES
(1, 'kepler', 'kepler1997'),
(2, 'amiee', 'amiee1995'),
(3, 'lila', 'lila2002'),
(4, 'cici', 'cici1982'),
(5, 'damien', 'damien1983'),
(6, 'ariana', 'ariana1990'),
(7, 'kanye', 'kanye1960'),
(8, 'taylor', 'taylor1996'),
(9, 'michael', 'mike2005'),
(10, 'ray', 'ray1957');
Now compute the total sales amount per customer by joining the sale and customer tables on their common cid key, and analyze the query plan.
First, collect statistics before generating the plan.
=# ANALYZE sale;
=# ANALYZE customer;
Generate the query plan.
=# EXPLAIN SELECT c.cname, sum(s.pnum * s.qty) AS amount FROM sale s, customer c WHERE s.cid = c.cid GROUP BY c.cname;
QUERY PLAN
------------------------------------------------------------------------------
HashAggregate (cost=2.56..2.66 rows=10 width=14)
Group Key: c.cname
-> Hash Join (cost=1.23..2.46 rows=10 width=18)
Hash Cond: (s.cid = c.cid)
-> Seq Scan on sale s (cost=0.00..1.10 rows=10 width=16)
-> Hash (cost=1.10..1.10 rows=10 width=10)
-> Seq Scan on customer c (cost=0.00..1.10 rows=10 width=10)
(7 rows)
This query plan has two main nodes: Hash Join and HashAggregate. Each node includes three cost estimates: cost, row count (rows), and row width (width). These help estimate execution time, result set size, and complexity of data processing and transmission.
The query plan tree diagram is shown below:
To interpret a query plan, understand its nesting. Each node represents a sub-operation, and data flows upward from child to parent. The optimizer begins with a sequential scan of the customer table, costing 1.10. It then builds a hash table in memory. Simultaneously, it scans the sale table, computes hash values, and compares them against the hash table using the cid field as the join key. At this point, startup and total costs are 1.23 and 2.46, respectively. Finally, data is grouped and aggregated by the cname field using the hash table, with a total query cost of 2.66.
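To check how well these estimates match reality, the same statement can be run with EXPLAIN ANALYZE, which actually executes the query and prints measured row counts and timings next to the estimates (actual figures vary by environment, so none are shown here):

=# EXPLAIN ANALYZE SELECT c.cname, sum(s.pnum * s.qty) AS amount
   FROM sale s, customer c
   WHERE s.cid = c.cid
   GROUP BY c.cname;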
Detailed explanation of components:
As before, create two tables. (Example tables created in YMatrix 5.1 using the MARS3 storage engine)
Before creating MARS3 tables, install the matrixts extension for time-series functionality.
=# CREATE EXTENSION matrixts;
=# CREATE TABLE sale (
cid int,
pid int,
pnum int,
qty float,
date timestamp
) USING MARS3
DISTRIBUTED BY (cid,date)
ORDER BY (cid,date);
=# CREATE TABLE customer (
cid int,
cname text,
cwechat text
) USING MARS3
DISTRIBUTED BY (cid)
ORDER BY (cid);
Insert the same test data and collect statistics.
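For reference, the statistics collection from the single-node example applies unchanged, and the resulting row placement can be checked per segment. The sketch below assumes the Greenplum-style gp_segment_id system column is available in YMatrix:

=# ANALYZE sale;
=# ANALYZE customer;
-- Show how many rows of sale each MXSegment holds under the (cid, date) distribution key
=# SELECT gp_segment_id, count(*) FROM sale GROUP BY 1 ORDER BY 1;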
Now generate the query plan using the same SQL. Since vectorization is enabled by default in YMatrix, many vectorized operators appear, enhancing performance.
=# EXPLAIN SELECT c.cname, sum(s.pnum * s.qty) AS amount FROM sale s, customer c WHERE s.cid = c.cid GROUP BY c.cname;
QUERY PLAN
------------------------------------------------------------------------------------------------------------------------------------------
Gather Motion 4:1 (slice1; segments: 4) (cost=72.26..72.44 rows=10 width=14)
-> GroupAggregate (cost=72.26..72.32 rows=2 width=14)
Group Key: c.cname
-> Sort (cost=72.26..72.27 rows=2 width=18)
Sort Key: c.cname
-> Redistribute Motion 4:4 (slice2; segments: 4) (cost=0.00..72.25 rows=2 width=18)
Hash Key: c.cname
-> Nested Loop (cost=0.00..72.20 rows=2 width=18)
Join Filter: (s.cid = c.cid)
-> Custom Scan (MxVMotion) Redistribute Motion 4:4 (slice3; segments: 4) (cost=0.00..36.07 rows=2 width=16)
Hash Key: s.cid
-> Custom Scan (MxVScan) on sale s (cost=0.00..36.02 rows=2 width=16)
-> Materialize (cost=0.00..36.04 rows=2 width=10)
-> Custom Scan (MxVScan) on customer c (cost=0.00..36.02 rows=2 width=10)
Optimizer: Postgres query optimizer
(15 rows)
This plan has six main nodes: Materialize, Redistribute Motion, Nested Loop, Sort, GroupAggregate, and Gather Motion. Each includes cost, row count, and width estimates.
Query plan tree diagram:
Reading from the bottom, the optimizer scans the customer table and materializes the result, at a cost of roughly 36. Since the customer table is distributed by cid and the sale table by (cid, date), the sale table must be redistributed by cid so that matching rows land on the same MXSegment and can be joined locally. After redistribution, a nested loop join applies the s.cid = c.cid filter; by this stage the cumulative cost reaches about 72, indicating significant resource usage. Because the query groups by cname, a second redistribution on cname follows, and the rows are then sorted on cname and aggregated by group. Finally, the Gather Motion node assembles the results from all MXSegments and returns them to the MXMaster, bringing the total plan cost to 72.44.
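Because the extra Redistribute Motion on sale comes from the mismatch between the two distribution keys, one way to experiment is to distribute both tables by cid alone so that matching rows are co-located and the join can run locally. The table name sale_by_cid and the single-column distribution key below are illustrative assumptions; whether they suit a real workload depends on data skew and query patterns:

=# CREATE TABLE sale_by_cid (
       cid int,
       pid int,
       pnum int,
       qty float,
       date timestamp
   ) USING MARS3
   DISTRIBUTED BY (cid)
   ORDER BY (cid,date);

With sale_by_cid and customer both distributed by cid, the planner can usually join them on each MXSegment without redistributing the sale data first, removing one Motion node from the plan.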
Component details:
An exception is CREATE TABLE x AS SELECT ..., which writes its results directly into the new table instead of sending them to the MXMaster.
Slice layout for Segment 1 and Segment 2: