Skip to content
This repository has been archived by the owner on Jun 6, 2024. It is now read-only.

Commit

Permalink
design document and json file updated
Browse files Browse the repository at this point in the history
  • Loading branch information
hyoungjook committed May 1, 2024
1 parent 0719ff4 commit 6554a81
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 26 deletions.
21 changes: 21 additions & 0 deletions docs/15721-project-ee2.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
{
"info": {
"title": "Push-based Vectorized Execution Engine compatible with Apache Datafusion",
"github": "https://github.com/cmu-db/15721-s24-ee2",
"description": "This project is an execution engine for OLAP queries based on Apache Datafusion and Arrow. It uses push-based vectorized model and custom hashing strategy for aggregates and equi-joins.",
"students": [
{
"name": "Christos Laspias",
"url": "https://www.linkedin.com/in/christos-laspias-1402a121b/"
},
{
"name": "Hyoungjoo Kim",
"url": "https://hyoungjook.github.io"
},
{
"name": "Yash Kothari",
"url": "https://CHANGE_ME"
}
]
}
}
83 changes: 57 additions & 26 deletions docs/DesignDocument.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,64 +7,95 @@
* Yash Kothari (yashkoth)

## Overview
>What is the goal of this project? What will this component achieve?

The goal of this project is to implement a fully functional query execution engine using (some) state-of-the-art techniques in rust. We are going to take inspiration from existing execution engines like Velox and Apache Arrow Datafusion. Moreover, DuckDB will be referenced as well. The main priority is to build an execution engine that works and has good code quality so that it can be easily extended in next semesters. Moreover, we want to be able to be decoupled as much as possible from other components and not make assumptions that will prove to be dead ends in the future. Optimizations are a second priority.
This project aims to implement a fully functional OLAP query execution engine using state-of-the-art techniques such as a push-based vectorized execution model.
The execution engine is responsible for executing physical query plan fragments given by the other component.
The internal abstraction of the engine is inspired by existing engines such as Apache Datafusion, DuckDB, and Velox.

## API Specification

### Overview

From the scheduler, we will receive Datafusion representation of plan graphs.
Because we process plan fragments processed from the scheduler, some operators such as HashJoinBuild should be represented as a custom plan node.
With the I/O service, we will request data of specific columns at given table, offset, and size.
The execution engine interacts with two external components: the scheduler and the I/O service.
From the scheduler, we receive plan fragments represented by the Datafusion ExecutionPlan format.
Because the Datafusion format represents the whole query instead of plan fragments, the incompatible nodes such as HashJoinBuild and Probe are represented as a custom node.
When the leaf node requires fetching files from the remote storage, the engine requests it from the I/O service.

### Encoding
The plan graphs will be encoded in Datafusion, binary protobuf format.
The data from I/O service will be given as Apache Arrow format.
The plan fragments are encoded in Datafusion ExecutionPlan data structures.
The current implementation does not support the complete interface for the physical plan.
However, in the future, it will receive the binary protobuf format and decode it to Datafusion ExecutionPlan structures.

The data from the I/O service should be given as Apache Arrow format.

### Errors
We will handle 4 possible errors.
1. Invalid Input error occurs when the input substrait file is invalid. For example, there can be parsing errors, incorrect table name included, or consists of unsupported nodes.
2. Out of Resource error occurs when execution engine requires more memory or local disk than hardware provides.
3. IO error occurs when the I/O service component returns any error.
4. Unknown error handles all other possible errors.
The engine will handle 4 possible errors.
1. **Invalid Input error** occurs when the input Datafusion plan is invalid.
2. **Out-of-Resource error** occurs when the engine requires more memory or local disk storage than the hardware provides.
3. **IO error** occurs when the I/O service component returns any error.
4. **Unknown error** handles all other possible errors.

## Architectural Design
>Explain the input and output of the component, describe interactions and breakdown the smaller components if any. Include diagrams if appropriate.
The input given to the execution engine will be a plan fragment in Datafusion format. We will deserialize the plan and convert it into our own representation of pipeline. The pipeline consists of a source, a sink, and a vector of intermediate operators. We are aiming to implement the basic operators including filter, projection, hash join, hash aggregation, sort, and limit.

The output of the query execution engine will be stored into in-memory UUID store. The scheduler team can instruct us to store the result with the specified UUID and later reference it with the same ID. The result can be stored to the remote storage, local disk, or in memory.
The input given to the execution engine is a plan fragment in Datafusion ExecutionPlan format.
We traverse the plan graph and convert it into our pipeline representation.
The pipeline consists of a source, a sink, and a vector of intermediate operators.
We currently support parquet scan, filter, projection, hash aggregation, hash join, sort, and limit operators.

We are going to implement a push based execution engine. Apache Arrow Datafusion is implementing the pull based execution model (just like Volcano), which is simpler and cleaner to implement but slower. Moreover, we are going to implement vectorized execution passing vectors between operators. The query will be interpreted and not compiled.
The output of the engine is stored in the in-memory UUID store.
When the scheduler sends the plan fragment, it also sends the UUID to which the result of the plan fragment should be stored.
When the scheduler sends a plan fragment that uses the intermediate result stored in the UUID store, it specifies the UUID value so that the engine can look up the UUID store and use the stored data to process the fragment.
The UUID store can store either the vector of RecordBatches or the HashMap.

We are going to receive data from the I/O service and implement a buffer pool in order to place data in main memory. Another 100% goal is to spill to the local disk if there is no available memory for intermediate results.
The engine operates as a push-based vectorized model.
Each operator consumes and/or produces in-memory vectorized PAX-layout data represented by Apache Arrow RecordBatch.
Although the pipeline is a linear chain of operators, its execution forms a tree because each operator can consume and produce multiple RecordBatches.
Hence, the push-based pipeline executor uses a DFS algorithm to traverse the abstract execution graph, execute the operators, and store the result in the UUID store.
It is faster than the pull-based model of Apache Datafusion.
Queries are not compiled, but we use pre-compiled vectorized kernel functions provided by the Datafusion library for some operators.

For late/early materialization there is no clear answer yet.
The hash aggregation and hash join operators are implemented using the custom hashmap structure.
They both build the hash map that maps a set of column values to the list of row offsets.
If the input has multiple RecordBatches, instead of concatenating them to create a large RecordBatch, the engine stores them separately without copying and maintains the list of pairs of (batch id, row offset).
Later, the operator traverses the list and perform appropriate operations.

We will also send the required statistics to the catalog.
The current implementation does not support buffer pools or memory spills.
In the future, the data from the I/O service is cached in the in-memory buffer pool and the excess memory is spilled to the local disk.

## Design Rationale
>Explain the goals of this design and how the design achieves these goals. Present alternatives considered and document why they are not chosen.
The techniques mentioned above are the state of the art for the execution engine and we aim to implement those. As a backup plan, we can switch to the classic pull based execution model (Volcano style) if we are out of time.
Our design aims to achieve the performance of state-of-the-art execution engines without excessive engineering costs.

The push-based vectorized model is chosen because it is faster and more efficient than the pull-based model.
However, the query compilation is not adopted as it requires excessive effort on implementation and maintenance.

## Testing Plan
>How should the component be tested?
Each operator operates on the in-memory data represented in PAX format.
The row-oriented layout is not chosen because it is not suitable for vectorized execution of OLAP queries.

The list of row offsets in the custom hashmap is stored as the pair of (batch id, row offset).
This format stores the row index information without excessive memory overhead.
Additionally, this is better than concatenating the RecordBatches because the internal contents do not have to be copied into consecutive memory.

## Testing

### Correctness Test

We will verify the correctness of our execution engine by feeding arbitrary physical execution plans into both our engine and Apache Datafusion execution engine and compare their results. We can generate datafusion physical plan using the datafusion frontend.
The unit tests are implemented for each operator, and the correctness of the result is verified manually.

We also implmented the integrated executor that executes arbitrary Datafusion ExecutionPlan.
The result of our engine is compared with the result from the default execution engine of Datafusion.

### Performance Test

We will also measure the performance of our engine by executing some queries in TPC-H. The execution runtime will be compared with the one from executing the same query in Datafusion. In addition, similar to the correctness test, the result will be verified by comparing with the results from another query engine/database system.
Using the integrated executor, we tested all TPC-H queries.
For the queries that produced the correct result, we measured the runtime of our engine executing each query and compared it with the runtime from the Datafusion engine.

## Trade-offs and Potential Problems
>Write down any conscious trade-off you made that can be problematic in the future, or any problems discovered during the design process that remain unaddressed (technical debts).
We decided to implement operators in our own, so some metadata in datafusion representation might not be properly treated. As a result, we will incrementally update our engine by supporting more and more hand-crafted datafusion plans.
We decided to implement operators ourselves, so some subtle metadata in the Datafusion ExecutionPlan might not be properly treated.
As a result, we will incrementally update our engine by supporting more and more hand-crafted physical plans.

## Glossary (Optional)
>If you are introducing new concepts or giving unintuitive names to components, write them down here.

0 comments on commit 6554a81

Please sign in to comment.