RaySQL is a distributed SQL query engine that is powered by DataFusion.
DataFusion provides a high-performance query engine that is already partition-aware, with partitions being executed in parallel in separate threads. RaySQL provides a distributed query planner that translates a DataFusion physical plan into a distributed plan.
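Before looking at the distributed plan, here is a minimal sketch of the single-process starting point using the DataFusion Python bindings (the `datafusion` package). The table paths are placeholders and the exact API may vary slightly between versions:

```python
from datafusion import SessionContext

# Create a DataFusion session; partitions are executed in parallel
# threads within this single process.
ctx = SessionContext()

# Register the TPC-H style tables (paths are placeholders).
ctx.register_parquet("customer", "/path/to/customer.parquet")
ctx.register_parquet("orders", "/path/to/orders.parquet")
ctx.register_parquet("lineitem", "/path/to/lineitem.parquet")

# Run a query and collect the results as Arrow record batches.
df = ctx.sql("SELECT count(*) FROM lineitem")
batches = df.collect()
```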
Let's walk through an example to see how that works. We'll use SQLBench-H query 3, which is an aggregate query with a three-way join.
SQLBench-H Query 3
-- SQLBench-H query 3 derived from TPC-H query 3 under the terms of the TPC Fair Use Policy.
-- TPC-H queries are Copyright 1993-2022 Transaction Processing Performance Council.
select
l_orderkey,
sum(l_extendedprice * (1 - l_discount)) as revenue,
o_orderdate,
o_shippriority
from
customer,
orders,
lineitem
where
c_mktsegment = 'BUILDING'
and c_custkey = o_custkey
and l_orderkey = o_orderkey
and o_orderdate < date '1995-03-15'
and l_shipdate > date '1995-03-15'
group by
l_orderkey,
o_orderdate,
o_shippriority
order by
revenue desc,
o_orderdate
limit 10;
DataFusion produces the following optimized logical query plan. Note that this plan does not have the concept of partitions yet.
Limit: skip=0, fetch=10
Sort: revenue DESC NULLS FIRST, orders.o_orderdate ASC NULLS LAST, fetch=10
Projection: lineitem.l_orderkey, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) AS revenue, orders.o_orderdate, orders.o_shippriority
Aggregate: groupBy=[[lineitem.l_orderkey, orders.o_orderdate, orders.o_shippriority]], aggr=[[SUM(CAST(lineitem.l_extendedprice AS Decimal128(35, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(35, 4))) AS SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]]
Inner Join: orders.o_orderkey = lineitem.l_orderkey
Inner Join: customer.c_custkey = orders.o_custkey
Filter: customer.c_mktsegment = Utf8("BUILDING")
TableScan: customer projection=[c_custkey, c_mktsegment], partial_filters=[customer.c_mktsegment = Utf8("BUILDING")]
Filter: orders.o_orderdate < Date32("9204")
TableScan: orders projection=[o_orderkey, o_custkey, o_orderdate, o_shippriority], partial_filters=[orders.o_orderdate < Date32("9204")]
Filter: lineitem.l_shipdate > Date32("9204")
TableScan: lineitem projection=[l_orderkey, l_extendedprice, l_discount, l_shipdate], partial_filters=[lineitem.l_shipdate > Date32("9204")]
DataFusion's physical plan lists all the files to be queried, organized into partitions to allow for parallel execution within a single process. In this example, the level of concurrency was configured to be four, so we see `partitions={4 groups: [[ ... ]]}` in the leaf `ParquetExec` nodes, with the filenames listed in four groups.
DataFusion will soon support parallel execution within single Parquet files, but for now the parallelism is based on splitting the available files into separate groups, so RaySQL will not yet scale well for single-file inputs.
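As a rough illustration of the idea (not necessarily DataFusion's actual implementation), splitting a list of files into a fixed number of partition groups could look like this:

```python
def split_into_groups(files: list[str], num_groups: int) -> list[list[str]]:
    """Distribute files round-robin into num_groups partition groups."""
    groups: list[list[str]] = [[] for _ in range(num_groups)]
    for i, f in enumerate(files):
        groups[i % num_groups].append(f)
    return groups

# With 24 files and 4 groups, each group gets 6 files, matching the
# four groups of six files shown in the ParquetExec nodes below.
files = [f"part-{i}.parquet" for i in range(24)]
groups = split_into_groups(files, 4)
assert all(len(g) == 6 for g in groups)
```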
Here is the full physical plan for query 3.
GlobalLimitExec: skip=0, fetch=10
SortPreservingMergeExec: [revenue@1 DESC,o_orderdate@2 ASC NULLS LAST]
SortExec: [revenue@1 DESC,o_orderdate@2 ASC NULLS LAST]
ProjectionExec: expr=[l_orderkey@0 as l_orderkey, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)@3 as revenue, o_orderdate@1 as o_orderdate, o_shippriority@2 as o_shippriority]
AggregateExec: mode=FinalPartitioned, gby=[l_orderkey@0 as l_orderkey, o_orderdate@1 as o_orderdate, o_shippriority@2 as o_shippriority], aggr=[SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]
CoalesceBatchesExec: target_batch_size=8192
RepartitionExec: partitioning=Hash([Column { name: "l_orderkey", index: 0 }, Column { name: "o_orderdate", index: 1 }, Column { name: "o_shippriority", index: 2 }], 4), input_partitions=4
AggregateExec: mode=Partial, gby=[l_orderkey@6 as l_orderkey, o_orderdate@4 as o_orderdate, o_shippriority@5 as o_shippriority], aggr=[SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]
CoalesceBatchesExec: target_batch_size=8192
HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "o_orderkey", index: 2 }, Column { name: "l_orderkey", index: 0 })]
CoalesceBatchesExec: target_batch_size=8192
RepartitionExec: partitioning=Hash([Column { name: "o_orderkey", index: 2 }], 4), input_partitions=4
CoalesceBatchesExec: target_batch_size=8192
HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "c_custkey", index: 0 }, Column { name: "o_custkey", index: 1 })]
CoalesceBatchesExec: target_batch_size=8192
RepartitionExec: partitioning=Hash([Column { name: "c_custkey", index: 0 }], 4), input_partitions=4
CoalesceBatchesExec: target_batch_size=8192
FilterExec: c_mktsegment@1 = BUILDING
ParquetExec: limit=None, partitions={4 groups: [[mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-5.parquet, mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-10.parquet, mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-11.parquet, mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-13.parquet, mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-6.parquet, mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-14.parquet], [mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-20.parquet, mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-2.parquet, mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-22.parquet, mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-19.parquet, mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-0.parquet, mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-16.parquet], [mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-21.parquet, mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-23.parquet, mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-4.parquet, mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-17.parquet, mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-9.parquet, mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-1.parquet], [mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-18.parquet, mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-8.parquet, mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-12.parquet, mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-15.parquet, mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-7.parquet, mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-3.parquet]]}, predicate=c_mktsegment = Utf8("BUILDING"), pruning_predicate=c_mktsegment_min@0 <= BUILDING AND BUILDING <= c_mktsegment_max@1, projection=[c_custkey, c_mktsegment]
CoalesceBatchesExec: target_batch_size=8192
RepartitionExec: partitioning=Hash([Column { name: "o_custkey", index: 1 }], 4), input_partitions=4
CoalesceBatchesExec: target_batch_size=8192
FilterExec: o_orderdate@2 < 9204
ParquetExec: limit=None, partitions={4 groups: [[mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-5.parquet, mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-10.parquet, mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-11.parquet, mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-13.parquet, mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-6.parquet, mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-14.parquet], [mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-20.parquet, mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-2.parquet, mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-22.parquet, mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-19.parquet, mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-0.parquet, mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-16.parquet], [mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-21.parquet, mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-23.parquet, mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-4.parquet, mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-17.parquet, mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-9.parquet, mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-1.parquet], [mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-18.parquet, mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-8.parquet, mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-12.parquet, mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-15.parquet, mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-7.parquet, mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-3.parquet]]}, predicate=o_orderdate < Date32("9204"), pruning_predicate=o_orderdate_min@0 < 9204, projection=[o_orderkey, o_custkey, o_orderdate, o_shippriority]
CoalesceBatchesExec: target_batch_size=8192
RepartitionExec: partitioning=Hash([Column { name: "l_orderkey", index: 0 }], 4), input_partitions=4
CoalesceBatchesExec: target_batch_size=8192
FilterExec: l_shipdate@3 > 9204
ParquetExec: limit=None, partitions={4 groups: [[mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-5.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-10.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-11.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-13.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-6.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-14.parquet], [mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-20.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-2.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-22.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-19.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-0.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-16.parquet], [mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-21.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-23.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-4.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-17.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-9.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-1.parquet], [mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-18.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-8.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-12.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-15.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-7.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-3.parquet]]}, predicate=l_shipdate > Date32("9204"), pruning_predicate=l_shipdate_max@0 > 9204, projection=[l_orderkey, l_extendedprice, l_discount, l_shipdate]
The partitioning scheme changes throughout the plan, and this is the most important concept to grasp in order to understand RaySQL's design. Changes in partitioning are implemented by the `RepartitionExec` operator in DataFusion and happen in the following scenarios.
The first join to happen is between `customer` and `orders`. The join condition is `customer.c_custkey = orders.o_custkey`. To perform this join in parallel, we first need to repartition the data on the join keys so that the data for customers with `c_custkey = 1` and the data for orders with `o_custkey = 1` are sent to the same thread or node for processing. This allows the join to happen in parallel. This is known as a hash-partitioned join and is implemented by the `HashJoinExec` operator in DataFusion. We can see that DataFusion has inserted `RepartitionExec` operators around both inputs to the join.
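The idea behind a hash-partitioned join can be illustrated in a few lines of Python: both inputs are repartitioned with the same hash function on their join keys, so matching keys always land in the same partition and each partition can be joined independently. This is only a sketch of the concept, not DataFusion's implementation:

```python
NUM_PARTITIONS = 4

def partition_of(key: int) -> int:
    # Both sides of the join must use the same hash function and the
    # same number of partitions.
    return hash(key) % NUM_PARTITIONS

def repartition(rows: list[tuple], key_index: int) -> list[list[tuple]]:
    """Split rows into NUM_PARTITIONS buckets based on the join key."""
    parts: list[list[tuple]] = [[] for _ in range(NUM_PARTITIONS)]
    for row in rows:
        parts[partition_of(row[key_index])].append(row)
    return parts

customers = [(1, "BUILDING"), (2, "BUILDING")]            # (c_custkey, c_mktsegment)
orders = [(10, 1, "1995-02-01"), (11, 2, "1995-03-01")]   # (o_orderkey, o_custkey, o_orderdate)

cust_parts = repartition(customers, key_index=0)
order_parts = repartition(orders, key_index=1)

# Each partition can now be joined independently, in parallel.
for p in range(NUM_PARTITIONS):
    build = {c[0]: c for c in cust_parts[p]}
    for o in order_parts[p]:
        if o[1] in build:
            print(build[o[1]] + o)  # joined row
```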
There are multiple approaches to distributed aggregate queries. Two popular ones are:
- Perform partial aggregates in parallel on each partition (the results for different partitions may contain duplicate grouping keys), then perform a final aggregate on the intermediate results to combine the duplicates.
- Partition the input data by the grouping keys so that the aggregates from each partition can simply be merged to produce the final result.
For this example query, the data is already partitioned on the aggregate's grouping key so the latter approach is used.
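Here is a minimal sketch of the first approach: partial aggregates per partition followed by a final aggregate over the intermediate results. When the input is already hash-partitioned on the grouping keys (the second approach), each partition's result is already final and the results can simply be concatenated:

```python
from collections import defaultdict

def partial_aggregate(rows: list[tuple]) -> dict:
    """SUM(revenue) grouped by key, for one partition."""
    acc: dict = defaultdict(float)
    for key, revenue in rows:
        acc[key] += revenue
    return acc

def final_aggregate(partials: list[dict]) -> dict:
    """Merge partial aggregates; needed when a group may span partitions."""
    acc: dict = defaultdict(float)
    for partial in partials:
        for key, revenue in partial.items():
            acc[key] += revenue
    return acc

partitions = [
    [("a", 1.0), ("b", 2.0)],
    [("a", 3.0), ("c", 4.0)],  # group "a" spans two partitions
]
partials = [partial_aggregate(p) for p in partitions]
print(dict(final_aggregate(partials)))  # {'a': 4.0, 'b': 2.0, 'c': 4.0}
```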
Sort also has multiple approaches.
- The input partitions can be collapsed down to a single partition and then sorted
- Partitions can be sorted in parallel and then merged using a sort-preserving merge
DataFusion and RaySQL currently use the first approach, but there is a DataFusion PR open for implementing the second.
Similarly, there is more than one way to apply the LIMIT:
- The input partitions can be collapsed down to a single partition and then have the limit applied
- The limit can be pushed down to each partition as well
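A small sketch of the per-partition approaches to sorting and limiting: each partition is sorted independently (keeping only its top `LIMIT` rows), the already-sorted streams are combined with a sort-preserving merge, and the limit is applied once more to the merged output. This only illustrates the concept, not DataFusion's `SortExec`/`SortPreservingMergeExec` implementation:

```python
import heapq
from itertools import islice

LIMIT = 10

partitions = [
    [42.0, 17.5, 3.2],
    [99.9, 50.0, 1.1],
    [88.8, 60.1, 2.2],
]

# Sort each partition independently (in a real engine, in parallel) and
# push the limit down: each partition only needs its top LIMIT rows.
sorted_parts = [sorted(p, reverse=True)[:LIMIT] for p in partitions]

# Sort-preserving merge of the already-sorted partitions, then the
# final (global) limit.
merged = heapq.merge(*sorted_parts, reverse=True)
top = list(islice(merged, LIMIT))
print(top)
```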
The first two query stages to be executed will read the `customer` and `orders` parquet files and repartition them by the join keys `c_custkey` and `o_custkey`.
Query Stage #0:
ShuffleWriterExec(stage_id=0, output_partitioning=Hash([Column { name: "c_custkey", index: 0 }], 4))
CoalesceBatchesExec: target_batch_size=8192
FilterExec: c_mktsegment@1 = BUILDING
ParquetExec: limit=None, partitions={4 groups: [[mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-5.parquet, mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-10.parquet, mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-11.parquet, mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-13.parquet, mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-6.parquet, mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-14.parquet], [mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-20.parquet, mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-2.parquet, mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-22.parquet, mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-19.parquet, mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-0.parquet, mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-16.parquet], [mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-21.parquet, mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-23.parquet, mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-4.parquet, mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-17.parquet, mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-9.parquet, mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-1.parquet], [mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-18.parquet, mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-8.parquet, mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-12.parquet, mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-15.parquet, mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-7.parquet, mnt/bigdata/tpch/sf10-parquet/customer.parquet/part-3.parquet]]}, predicate=c_mktsegment = Utf8("BUILDING"), pruning_predicate=c_mktsegment_min@0 <= BUILDING AND BUILDING <= c_mktsegment_max@1, projection=[c_custkey, c_mktsegment]
Query Stage #1:
ShuffleWriterExec(stage_id=1, output_partitioning=Hash([Column { name: "o_custkey", index: 1 }], 4))
CoalesceBatchesExec: target_batch_size=8192
FilterExec: o_orderdate@2 < 9204
ParquetExec: limit=None, partitions={4 groups: [[mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-5.parquet, mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-10.parquet, mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-11.parquet, mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-13.parquet, mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-6.parquet, mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-14.parquet], [mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-20.parquet, mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-2.parquet, mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-22.parquet, mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-19.parquet, mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-0.parquet, mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-16.parquet], [mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-21.parquet, mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-23.parquet, mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-4.parquet, mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-17.parquet, mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-9.parquet, mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-1.parquet], [mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-18.parquet, mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-8.parquet, mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-12.parquet, mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-15.parquet, mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-7.parquet, mnt/bigdata/tpch/sf10-parquet/orders.parquet/part-3.parquet]]}, predicate=o_orderdate < Date32("9204"), pruning_predicate=o_orderdate_min@0 < 9204, projection=[o_orderkey, o_custkey, o_orderdate, o_shippriority]
Now that these inputs are partitioned by the join key, we can execute the join itself. The two inputs to the join are instances of `ShuffleReaderExec` that read the shuffle files produced by the previous stages. The result of the join is repartitioned by `o_orderkey` in preparation for the next join.
Query Stage #2:
ShuffleWriterExec(stage_id=2, output_partitioning=Hash([Column { name: "o_orderkey", index: 2 }], 4))
CoalesceBatchesExec: target_batch_size=8192
HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "c_custkey", index: 0 }, Column { name: "o_custkey", index: 1 })]
CoalesceBatchesExec: target_batch_size=8192
ShuffleReaderExec(stage_id=0, input_partitions=4)
CoalesceBatchesExec: target_batch_size=8192
ShuffleReaderExec(stage_id=1, input_partitions=4)
We continue to prepare for the next join with `lineitem` by repartitioning the parquet files by the join key.
Query Stage #3:
ShuffleWriterExec(stage_id=3, output_partitioning=Hash([Column { name: "l_orderkey", index: 0 }], 4))
CoalesceBatchesExec: target_batch_size=8192
FilterExec: l_shipdate@3 > 9204
ParquetExec: limit=None, partitions={4 groups: [[mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-5.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-10.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-11.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-13.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-6.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-14.parquet], [mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-20.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-2.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-22.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-19.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-0.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-16.parquet], [mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-21.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-23.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-4.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-17.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-9.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-1.parquet], [mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-18.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-8.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-12.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-15.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-7.parquet, mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/part-3.parquet]]}, predicate=l_shipdate > Date32("9204"), pruning_predicate=l_shipdate_max@0 > 9204, projection=[l_orderkey, l_extendedprice, l_discount, l_shipdate]
With the inputs repartitioned by the join keys, we can execute a query stage to perform the join. However, in this case we also perform an aggregate in the same stage without any additional repartitioning. This is possible because the aggregate grouping includes `l_orderkey`, which we already partitioned on, so all rows belonging to any given group land in the same partition. This means that we can perform this aggregate in parallel and guarantee that there will be no duplicate grouping keys across the outputs of the partial aggregates. Pretty neat!

The output of this stage's shuffle is partitioned by `l_orderkey`, `o_orderdate`, and `o_shippriority` in preparation for the final aggregate and the `ORDER BY` part of the query.
Query Stage #4:
ShuffleWriterExec(stage_id=4, output_partitioning=Hash([Column { name: "l_orderkey", index: 0 }, Column { name: "o_orderdate", index: 1 }, Column { name: "o_shippriority", index: 2 }], 4))
AggregateExec: mode=Partial, gby=[l_orderkey@6 as l_orderkey, o_orderdate@4 as o_orderdate, o_shippriority@5 as o_shippriority], aggr=[SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]
CoalesceBatchesExec: target_batch_size=8192
HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "o_orderkey", index: 2 }, Column { name: "l_orderkey", index: 0 })]
CoalesceBatchesExec: target_batch_size=8192
ShuffleReaderExec(stage_id=2, input_partitions=4)
CoalesceBatchesExec: target_batch_size=8192
ShuffleReaderExec(stage_id=3, input_partitions=4)
Now we perform a final aggregate (which is perhaps redundant here, since the partial aggregates could not have produced duplicate groups across partitions) and then sort the results in parallel across the partitions.
Query Stage #5:
ShuffleWriterExec(stage_id=5, output_partitioning=Hash([Column { name: "l_orderkey", index: 0 }, Column { name: "o_orderdate", index: 2 }, Column { name: "o_shippriority", index: 3 }], 4))
SortExec: [revenue@1 DESC,o_orderdate@2 ASC NULLS LAST]
ProjectionExec: expr=[l_orderkey@0 as l_orderkey, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)@3 as revenue, o_orderdate@1 as o_orderdate, o_shippriority@2 as o_shippriority]
AggregateExec: mode=FinalPartitioned, gby=[l_orderkey@0 as l_orderkey, o_orderdate@1 as o_orderdate, o_shippriority@2 as o_shippriority], aggr=[SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]
CoalesceBatchesExec: target_batch_size=8192
ShuffleReaderExec(stage_id=4, input_partitions=4)
Finally, we have a query stage that reads the sorted results, merges them into a single partition (preserving the sort order), and applies the `LIMIT` clause.
Query Stage #6:
GlobalLimitExec: skip=0, fetch=10
SortPreservingMergeExec: [revenue@1 DESC,o_orderdate@2 ASC NULLS LAST]
ShuffleReaderExec(stage_id=5, input_partitions=4)
In the previous section we walked through the plan as if we were executing one query stage at a time. However, some of these query stages could execute in parallel (assuming enough resources are available). For example, we could execute query stage 3 to repartition the `lineitem` input without waiting for query stages 0, 1, and 2 to finish.
More generally, we can typically execute all remaining leaf nodes of the plan concurrently.
The `execute_query_stage` method in `context.py` is a remote method that recursively walks the query plan and executes child plans, building up a DAG of futures.
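Here is a heavily simplified sketch of that pattern with Ray. The real `execute_query_stage` in `context.py` operates on DataFusion plans and shuffle metadata; the function below only shows how recursing over child stages and submitting them as remote tasks builds a DAG of futures, with independent subtrees running concurrently:

```python
import ray

@ray.remote
def execute_query_stage(stage_id: int, child_stages: list) -> str:
    # Submit the child stages first. Ray returns object refs (futures)
    # immediately, so independent subtrees execute in parallel.
    child_refs = [
        execute_query_stage.remote(child["id"], child["children"])
        for child in child_stages
    ]
    # Wait for the children this stage depends on, then "execute" it.
    child_results = ray.get(child_refs)
    return f"stage {stage_id} done after {child_results}"

# A tiny stage tree: stage 2 depends on stages 0 and 1.
plan = {"id": 2, "children": [
    {"id": 0, "children": []},
    {"id": 1, "children": []},
]}

ray.init(ignore_reinit_error=True)
print(ray.get(execute_query_stage.remote(plan["id"], plan["children"])))
```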
The output of each query stage needs to be persisted somewhere so that the next query stage can read it. Currently, RaySQL is just writing the output to disk in Arrow IPC format, and this means that RaySQL is not truly distributed yet because it requires a shared file system. It would be better to use the Ray object store instead, as proposed here.
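As a rough sketch of what the proposed alternative could look like, a stage could place its output batches in the Ray object store with `ray.put` and hand the resulting object references to the next stage, rather than writing files to a shared file system. This illustrates the proposal, not current RaySQL behaviour:

```python
import pyarrow as pa
import ray

ray.init(ignore_reinit_error=True)

# A partition of query-stage output as an Arrow record batch.
batch = pa.RecordBatch.from_pydict(
    {"l_orderkey": [1, 2, 3], "revenue": [10.0, 20.0, 30.0]}
)

# Writer side: put the batch into the Ray object store and keep the ref.
ref = ray.put(batch)

# Reader side (possibly on another node): fetch the batch by reference.
restored = ray.get(ref)
assert restored.num_rows == 3
```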
DataFusion's `RepartitionExec` uses threads and channels within a single process and is not suitable for a distributed query engine, so RaySQL rewrites the physical plan and replaces the `RepartitionExec` with a pair of operators that perform a "shuffle": the `ShuffleWriterExec` and the `ShuffleReaderExec`.
`ShuffleWriterExec` reads input partitions and repartitions them, using the same `BatchPartitioner` that DataFusion uses, then writes the output to disk in Arrow IPC format. `ShuffleReaderExec` reads the shuffle files written by the `ShuffleWriterExec`.
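A simplified sketch of the shuffle mechanism described above, using Arrow IPC files as the shuffle format. The file naming scheme and the row-by-row bucketing are illustrative only; as noted, the real `ShuffleWriterExec` uses DataFusion's `BatchPartitioner`:

```python
import pyarrow as pa
import pyarrow.ipc as ipc

NUM_OUTPUT_PARTITIONS = 4

def shuffle_write(stage_id: int, input_partition: int,
                  batch: pa.RecordBatch, key: str) -> None:
    """Hash-repartition one input batch and write one IPC file per output partition."""
    key_values = batch.column(batch.schema.get_field_index(key)).to_pylist()
    # Bucket row indices by hash of the partitioning key.
    buckets = [[] for _ in range(NUM_OUTPUT_PARTITIONS)]
    for row, value in enumerate(key_values):
        buckets[hash(value) % NUM_OUTPUT_PARTITIONS].append(row)
    for out_part, rows in enumerate(buckets):
        path = f"/tmp/shuffle-{stage_id}-{input_partition}-{out_part}.arrow"
        with ipc.new_file(path, batch.schema) as writer:
            writer.write_batch(batch.take(rows))

def shuffle_read(stage_id: int, input_partition: int, out_part: int) -> pa.Table:
    """Read back one shuffle file written by shuffle_write."""
    path = f"/tmp/shuffle-{stage_id}-{input_partition}-{out_part}.arrow"
    return ipc.open_file(path).read_all()

batch = pa.RecordBatch.from_pydict(
    {"o_orderkey": [10, 11, 12, 13], "o_custkey": [1, 2, 3, 4]}
)
shuffle_write(stage_id=1, input_partition=0, batch=batch, key="o_custkey")
print(shuffle_read(stage_id=1, input_partition=0, out_part=0))
```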