[DOCS] Partitioning user guide and small doc fixes #2717
Merged
@@ -73,6 +73,7 @@ Reordering

    DataFrame.sort
    DataFrame.repartition
    DataFrame.into_partitions

Combining
*********
@@ -1,3 +1,5 @@
.. _distributed_computing:

Distributed Computing
=====================
@@ -1,6 +1,117 @@
Partitioning
============
-This guide is a Work-In-Progress!
Daft is a **distributed** dataframe. This means that internally, data is represented as partitions, which are spread out across your system.

-Please comment on this `Github issue <https://github.com/Eventual-Inc/Daft/issues/840>`_ if you wish to expedite the filling out of this guide.

Why do we need partitions?
--------------------------
When running in a distributed setting (a cluster of machines), Daft spreads your dataframe's data across these machines. This means that your
workload is able to efficiently utilize all the resources in your cluster, because each machine is able to work on its assigned partition(s) independently.

Additionally, certain global operations in a distributed setting require data to be partitioned in a specific way for the operation to be correct, because
all the data matching a certain criterion needs to be on the same machine and in the same partition. For example, in a groupby-aggregation, Daft needs to bring
together all the data for a given key into the same partition before it can perform a definitive local groupby-aggregation that is then globally correct.
Daft refers to this as a "clustering specification", and you are able to see it in the plans that Daft constructs.
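
As a minimal sketch of why this matters (the ``from_pydict`` data is purely illustrative, and the expression form passed to ``agg`` is an assumption about the current API):

.. code:: python

    import daft
    from daft import col

    df = daft.from_pydict({"key": ["a", "b", "a"], "value": [1, 2, 3]})

    # Before aggregating, Daft must shuffle rows so that all rows sharing
    # the same "key" live in the same partition; only then is the local
    # per-partition aggregation globally correct.
    result = df.groupby("key").agg(col("value").sum())
    result.show()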

.. NOTE::
    When running locally on just a single machine, Daft currently still uses partitioning. This is still useful for
    controlling parallelism and how much data is being materialized at a time.

    However, Daft's new experimental execution engine will remove the concept of partitioning entirely for local execution.
    You may enable it with ``DAFT_ENABLE_NATIVE_EXECUTOR=1``. Instead of using partitioning to control parallelism,
    this new execution engine performs streaming-based execution on small "morsels" of data, which provides much
    more stable memory utilization while improving the user experience by removing the need to worry about partitioning.
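
    As a minimal sketch (exactly when Daft reads this variable is an assumption here, so setting it before doing any dataframe work is the safe choice):

    .. code:: python

        import os

        # Opt in to the experimental streaming ("native") executor.
        os.environ["DAFT_ENABLE_NATIVE_EXECUTOR"] = "1"

        import daft

        df = daft.from_pydict({"x": [1, 2, 3]})
        df.collect()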

This user guide helps you think about how to correctly partition your data to improve performance as well as memory stability in Daft.

General rules of thumb:

1. **Have Enough Partitions**: our general recommendation for high throughput and maximal resource utilization is to have *at least* ``2 x TOTAL_NUM_CPUS`` partitions, which allows Daft to fully saturate your CPUs (see the sketch after this list).
2. **More Partitions**: if you are observing memory issues (excessive spilling or out-of-memory (OOM) errors), you may choose to increase the number of partitions. This increases the amount of overhead in your system but improves overall memory stability (since each partition will be smaller).
3. **Fewer Partitions**: if you are observing a large amount of overhead (especially during shuffle operations such as joins and sorts), you may choose to decrease the number of partitions. This decreases the amount of overhead in the system, at the cost of using more memory (since each partition will be larger).
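
As a minimal sketch of the first rule (the bucket path is a placeholder, and ``into_partitions`` is covered in more detail below):

.. code:: python

    import multiprocessing

    import daft

    # Rule of thumb: at least 2 x TOTAL_NUM_CPUS partitions.
    target = 2 * multiprocessing.cpu_count()

    df = daft.read_parquet("s3://bucket/path_to_100_parquet_files/**")

    # Split or coalesce partitions to hit the target without a full shuffle.
    df = df.into_partitions(target)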

.. seealso::
    :doc:`./memory` - a guide for dealing with memory issues when using Daft

How is my data partitioned?
---------------------------

Daft automatically uses certain heuristics to determine the number of partitions for you when you create a DataFrame. When reading data from files (e.g. Parquet, CSV or JSON),
each file is by default one partition of its own, but Daft will also perform splitting of partitions (for files that are egregiously large) and coalescing of partitions (for small files)
to improve the sizing and number of partitions in your system.

    Review comment: I think "by default" is a little misleading here, as Daft by default does scan task splitting and merging.

To interrogate the partitioning of your current DataFrame, you may use the :meth:`df.explain(show_all=True) <daft.DataFrame.explain>` method. Here is an example output from a simple
``df = daft.read_parquet(...)`` call on a fairly large number of Parquet files.

.. code:: python

    import daft

    df = daft.read_parquet("s3://bucket/path_to_100_parquet_files/**")
    df.explain(show_all=True)

.. code::

    == Unoptimized Logical Plan ==

    * GlobScanOperator
    |   Glob paths = [s3://bucket/path_to_100_parquet_files/**]
    |   ...

    ...

    == Physical Plan ==

    * TabularScan:
    |   Num Scan Tasks = 3
    |   Estimated Scan Bytes = 72000000
    |   Clustering spec = { Num partitions = 3 }
    |   ...

In the above example, the call to ``daft.read_parquet`` read 100 Parquet files, but the Physical Plan indicates that Daft will only create 3 partitions. This is because these files are quite small (in this example, totalling about 72MB of data) and Daft recognizes that it should be able to read them as just 3 partitions, each containing about 33 files!

How can I change the way my data is partitioned?
------------------------------------------------

You can change the way your data is partitioned by leveraging certain DataFrame methods (see the sketch after this list):

1. :meth:`daft.DataFrame.repartition`: repartitions your data into ``N`` partitions by performing hash-bucketing, which ensures that all data with the same values for the specified columns ends up in the same partition. This is expensive, as it requires data movement between partitions and machines.
2. :meth:`daft.DataFrame.into_partitions`: splits or coalesces adjacent partitions to meet the specified target number of total partitions. This is less expensive than a call to ``df.repartition`` because it does not require shuffling or moving data between partitions.
3. Many global dataframe operations such as :meth:`daft.DataFrame.join`, :meth:`daft.DataFrame.sort` and :meth:`daft.GroupedDataframe.agg` will change the partitioning of your data, because they require shuffling data between partitions to be globally correct.
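
As a minimal sketch contrasting the first two methods (the path and the column name ``x`` are placeholders):

.. code:: python

    import daft
    from daft import col

    df = daft.read_parquet("s3://bucket/path_to_100_parquet_files/**")

    # Cheap: splits/coalesces adjacent partitions; no shuffle of rows.
    df_local = df.into_partitions(16)

    # Expensive: full shuffle that hash-partitions rows on column "x".
    df_hashed = df.repartition(8, col("x"))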

Note that many of these methods will change both the *number of partitions* as well as the *clustering specification* of the new partitioning. For example, when calling ``df.repartition(8, col("x"))``, the resultant dataframe will have 8 partitions in total, with the additional guarantee that all rows with the same value of ``col("x")`` are in the same partition! This is called "hash partitioning".

.. code:: python

    df = df.repartition(8, daft.col("x"))
    df.explain(show_all=True)

.. code::

    == Unoptimized Logical Plan ==

    * Repartition: Scheme = Hash
    |   Num partitions = Some(8)
    |   By = col(x)
    |
    * GlobScanOperator
    |   Glob paths = [s3://bucket/path_to_100_parquet_files/**]
    |   ...

    ...

    == Physical Plan ==

    * ReduceMerge
    |
    * FanoutByHash: 8
    |   Partition by = col(x)
    |
    * TabularScan:
    |   Num Scan Tasks = 3
    |   Estimated Scan Bytes = 72000000
    |   Clustering spec = { Num partitions = 3 }
    |   ...
Review comment: Maybe add some description of how to measure overhead (rather than just noting which operations are expensive)?