[DOCS] Partitioning user guide and small doc fixes #2717

Merged
merged 7 commits into from
Aug 23, 2024
3 changes: 3 additions & 0 deletions daft/expressions/expressions.py
Expand Up @@ -440,7 +440,9 @@ def __lshift__(self, other: Expression) -> Expression:

def __rshift__(self, other: Expression) -> Expression:
"""Shifts the bits of an integer expression to the right (``e1 >> e2``)

.. NOTE::

For unsigned integers, this expression performs a logical right shift.
For signed integers, this expression performs an arithmetic right shift.

Expand Down Expand Up @@ -675,6 +677,7 @@ def shift_left(self, other: Expression) -> Expression:

def shift_right(self, other: Expression) -> Expression:
"""Shifts the bits of an integer expression to the right (``expr >> other``)

.. NOTE::
For unsigned integers, this expression performs a logical right shift.
For signed integers, this expression performs an arithmetic right shift.
Expand Down
1 change: 1 addition & 0 deletions docs/source/api_docs/dataframe.rst
Expand Up @@ -73,6 +73,7 @@ Reordering

DataFrame.sort
DataFrame.repartition
DataFrame.into_partitions

Combining
*********
Expand Down
22 changes: 11 additions & 11 deletions docs/source/migration_guides/coming_from_dask.rst
Expand Up @@ -32,20 +32,20 @@ Dask aims for as much feature-parity with pandas as possible, including maintain

Daft drops the need for an Index to make queries more readable and consistent. How you write a query should not change because of the state of an index or a reset_index call. In our opinion, eliminating the index makes things simpler, more explicit, more readable and therefore less error-prone. Daft achieves this by using the [Expressions API](/user_guide/basic_concepts/expressions.rst).

In Dask you would index your DataFrame to return row `b` as follows:
In Dask you would index your DataFrame to return row ``b`` as follows:

`ddf.loc[[“b”]]`
``ddf.loc[["b"]]``

In Daft, you would accomplish the same by using a `col` Expression to refer to the column that contains `b`:
In Daft, you would accomplish the same by using a ``col`` Expression to refer to the column that contains ``b``:

`df.where(daft.col(“alpha”)==”b”)`
``df.where(daft.col("alpha") == "b")``

More about Expressions in the sections below.

Daft does not try to copy the pandas syntax
-------------------------------------------

Dask is built as a parallelizable version of pandas and Dask partitions are in fact pandas DataFrames. When you call a Dask function you are often applying a pandas function on each partition. This makes Dask relatively easy to learn for people familiar with pandas, but it also causes complications when pandas logic (built for sequential processing) does not translate well to a distributed context. When reading the documentation, Dask users will often encounter this phrase `“This docstring was copied from pandas.core… Some inconsistencies with the Dask version may exist.”` It is often unclear what these inconsistencies are and how they might affect performance.
Dask is built as a parallelizable version of pandas and Dask partitions are in fact pandas DataFrames. When you call a Dask function you are often applying a pandas function on each partition. This makes Dask relatively easy to learn for people familiar with pandas, but it also causes complications when pandas logic (built for sequential processing) does not translate well to a distributed context. When reading the documentation, Dask users will often encounter this phrase ``“This docstring was copied from pandas.core… Some inconsistencies with the Dask version may exist.”`` It is often unclear what these inconsistencies are and how they might affect performance.

Daft does not try to copy the pandas syntax. Instead, we believe that efficiency is best achieved by defining logic specifically for the unique challenges of distributed computing. This means that we trade a slightly higher learning curve for pandas users against improved performance and more clarity for the developer experience.

Expand All @@ -68,13 +68,14 @@ Dask currently does not support full-featured query optimization.
Daft uses Expressions and UDFs to perform computations in parallel
------------------------------------------------------------------

Dask provides a `map_partitions` method to map computations over the partitions in your DataFrame. Since Dask partitions are pandas DataFrames, you can pass pandas functions to `map_partitions`. You can also map arbitrary Python functions over Dask partitions using `map_partitions`.
Dask provides a ``map_partitions`` method to map computations over the partitions in your DataFrame. Since Dask partitions are pandas DataFrames, you can pass pandas functions to ``map_partitions``. You can also map arbitrary Python functions over Dask partitions using ``map_partitions``.

For example:

.. code:: python

def my_function(**kwargs):
return
return ...

res = ddf.map_partitions(my_function, **kwargs)

Expand All @@ -84,7 +85,7 @@ Daft implements two APIs for mapping computations over the data in your DataFram
.. code:: python

# Add 1 to each element in column "A"
df = df.with_column("A_add_one", daft.col(“A”) + 1)
df = df.with_column("A_add_one", daft.col("A") + 1)


You can use User-Defined Functions (UDFs) to run computations over multiple rows or columns:
Expand All @@ -94,12 +95,11 @@ You can use User-Defined Functions (UDFs) to run computations over multiple rows
# apply a custom function "crop_image" to the image column
@daft.udf(...)
def crop_image(**kwargs):
return …
return ...

df = df.with_column(
"cropped",
crop_image(daft.col(image), **kwargs),
crop_image(daft.col("image"), **kwargs),
)


Expand Down
2 changes: 1 addition & 1 deletion docs/source/user_guide/integrations/delta_lake.rst
Expand Up @@ -5,7 +5,7 @@ Delta Lake

Daft currently supports:

1. **Parallel + Distributed Reads:** Daft parallelizes Delta Lake table reads over all cores of your machine, if using the default multithreading runner, or all cores + machines of your Ray cluster, if using the :ref:`distributed Ray runner <scaling_up>`.
1. **Parallel + Distributed Reads:** Daft parallelizes Delta Lake table reads over all cores of your machine, if using the default multithreading runner, or all cores + machines of your Ray cluster, if using the :ref:`distributed Ray runner <distributed_computing>`.
2. **Skipping Filtered Data:** Daft ensures that only data that matches your :meth:`df.where(...) <daft.DataFrame.where>` filter will be read, often skipping entire files/partitions.
3. **Multi-cloud Support:** Daft supports reading Delta Lake tables from AWS S3, Azure Blob Store, and GCS, as well as local files.

Expand Down
2 changes: 1 addition & 1 deletion docs/source/user_guide/integrations/hudi.rst
Expand Up @@ -5,7 +5,7 @@ Apache Hudi

Daft currently supports:

1. **Parallel + Distributed Reads:** Daft parallelizes Hudi table reads over all cores of your machine, if using the default multithreading runner, or all cores + machines of your Ray cluster, if using the :ref:`distributed Ray runner <scaling_up>`.
1. **Parallel + Distributed Reads:** Daft parallelizes Hudi table reads over all cores of your machine, if using the default multithreading runner, or all cores + machines of your Ray cluster, if using the :ref:`distributed Ray runner <distributed_computing>`.
2. **Skipping Filtered Data:** Daft ensures that only data that matches your :meth:`df.where(...) <daft.DataFrame.where>` filter will be read, often skipping entire files/partitions.
3. **Multi-cloud Support:** Daft supports reading Hudi tables from AWS S3, Azure Blob Store, and GCS, as well as local files.

Expand Down
2 changes: 1 addition & 1 deletion docs/source/user_guide/integrations/huggingface.rst
@@ -1,5 +1,5 @@
Huggingface Datasets
===========
====================

Daft is able to read datasets directly from Huggingface via the ``hf://`` protocol.

Expand Down
2 changes: 1 addition & 1 deletion docs/source/user_guide/integrations/microsoft-azure.rst
Expand Up @@ -64,7 +64,7 @@ pass a different :class:`daft.io.AzureConfig` per function call if you wish!
df2 = daft.read_csv("az://my_container/my_other_path/**/*", io_config=io_config)

Connect to Microsoft Fabric/OneLake
****************************
***********************************

If you are connecting to storage in OneLake or another Microsoft Fabric service, set the ``use_fabric_endpoint`` parameter to ``True`` in the :class:`daft.io.AzureConfig` object.

Expand Down
4 changes: 2 additions & 2 deletions docs/source/user_guide/integrations/sql.rst
Expand Up @@ -6,7 +6,7 @@ You can read the results of SQL queries from databases, data warehouses, and que
Daft currently supports:

1. **20+ SQL Dialects:** Daft supports over 20 databases, data warehouses, and query engines by using `SQLGlot <https://sqlglot.com/sqlglot.html>`_ to convert SQL queries across dialects. See the full list of supported dialects `here <https://sqlglot.com/sqlglot/dialects.html>`__.
2. **Parallel + Distributed Reads:** Daft parallelizes SQL reads by using all local machine cores with its default multithreading runner, or all cores across multiple machines if using the :ref:`distributed Ray runner <scaling_up>`.
2. **Parallel + Distributed Reads:** Daft parallelizes SQL reads by using all local machine cores with its default multithreading runner, or all cores across multiple machines if using the :ref:`distributed Ray runner <distributed_computing>`.
3. **Skipping Filtered Data:** Daft ensures that only data that matches your :meth:`df.select(...) <daft.DataFrame.select>`, :meth:`df.limit(...) <daft.DataFrame.limit>`, and :meth:`df.where(...) <daft.DataFrame.where>` expressions will be read, often skipping entire partitions/columns.

Installing Daft with SQL Support
Expand Down Expand Up @@ -77,7 +77,7 @@ You can also directly provide a SQL alchemy connection via a **connection factor
Parallel + Distributed Reads
****************************

For large datasets, Daft can parallelize SQL reads by using all local machine cores with its default multithreading runner, or all cores across multiple machines if using the :ref:`distributed Ray runner <scaling_up>`.
For large datasets, Daft can parallelize SQL reads by using all local machine cores with its default multithreading runner, or all cores across multiple machines if using the :ref:`distributed Ray runner <distributed_computing>`.

Supply the :meth:`daft.read_sql` function with a **partition column** and optionally the **number of partitions** to enable parallel reads.

Expand Down
2 changes: 2 additions & 0 deletions docs/source/user_guide/poweruser/distributed-computing.rst
@@ -1,3 +1,5 @@
.. _distributed_computing:

Distributed Computing
=====================

Expand Down
115 changes: 113 additions & 2 deletions docs/source/user_guide/poweruser/partitioning.rst
@@ -1,6 +1,117 @@
Partitioning
============

This guide is a Work-In-Progress!
Daft is a **distributed** dataframe. This means that internally, data is represented as partitions, which are then spread out across your system.

Please comment on this `Github issue <https://github.com/Eventual-Inc/Daft/issues/840>`_ if you wish to expedite the filling out of this guide.
Why do we need partitions?
--------------------------

When running in a distributed setting (a cluster of machines), Daft spreads your dataframe's data across these machines. This means that your
workload can efficiently utilize all the resources in your cluster, because each machine works on its assigned partition(s) independently.

Additionally, certain global operations in a distributed setting require data to be partitioned in a specific way for the operation to be correct, because
all the data matching a certain criterion needs to be on the same machine and in the same partition. For example, in a groupby-aggregation Daft needs to bring
together all the data for a given key into the same partition before it can perform a local groupby-aggregation whose result is also globally correct.
Daft refers to this as a "clustering specification", and you can see it in the plans that Daft constructs.
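
The groupby example above can be illustrated with a minimal pure-Python sketch. It does not use Daft and is not how Daft implements shuffles; the helper names ``hash_partition`` and ``local_sum`` are hypothetical, and Python's built-in ``hash`` stands in for whatever hash function a real engine would use. The point is only that local aggregation is globally correct once every key lives in exactly one partition.

```python
from collections import defaultdict


def hash_partition(rows, key, num_partitions):
    """Place every row with the same key value into the same partition."""
    partitions = [[] for _ in range(num_partitions)]
    for row in rows:
        partitions[hash(row[key]) % num_partitions].append(row)
    return partitions


def local_sum(partition, key, value):
    """Groupby-sum over a single partition, with no knowledge of the others."""
    totals = defaultdict(int)
    for row in partition:
        totals[row[key]] += row[value]
    return dict(totals)


rows = [
    {"k": "a", "v": 1}, {"k": "b", "v": 2},
    {"k": "a", "v": 3}, {"k": "c", "v": 4}, {"k": "b", "v": 5},
]

# Naive split by position: keys "a" and "b" land in both partitions,
# so each local result only holds a partial sum for them.
naive = [rows[:2], rows[2:]]
naive_results = [local_sum(p, "k", "v") for p in naive]

# Hash partitioning: each key lives in exactly one partition, so the
# local results can simply be combined without any further aggregation.
clustered = hash_partition(rows, "k", 2)
merged = {}
for part in clustered:
    merged.update(local_sum(part, "k", "v"))

print(naive_results)
print(merged)
```

With the naive split, the partial sums for ``"a"`` and ``"b"`` would need a second aggregation step; with the hash-partitioned layout, each local result is already final for its keys.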

.. NOTE::
    When running locally on just a single machine, Daft currently still uses partitioning. This is still useful for
    controlling parallelism and how much data is materialized at a time.

    However, Daft's new experimental execution engine will remove the concept of partitioning entirely for local execution.
    You may enable it with ``DAFT_ENABLE_NATIVE_EXECUTOR=1``. Instead of using partitioning to control parallelism,
    this new execution engine performs a streaming-based execution on small "morsels" of data, which provides much
    more stable memory utilization while improving the user experience by not having to worry about partitioning.

This user guide helps you think about how to correctly partition your data to improve performance as well as memory stability in Daft.

General rules of thumb:

1. **Have Enough Partitions**: our general recommendation for high throughput and maximal resource utilization is to have *at least* ``2 x TOTAL_NUM_CPUS`` partitions, which allows Daft to fully saturate your CPUs.
2. **More Partitions**: if you are observing memory issues (excessive spilling or out-of-memory (OOM) issues) then you may choose to increase the number of partitions. This increases the amount of overhead in your system, but improves overall memory stability (since each partition will be smaller).
3. **Fewer Partitions**: if you are observing a large amount of overhead (especially during shuffle operations such as joins and sorts), then you may choose to decrease the number of partitions. This decreases the amount of overhead in the system, at the cost of using more memory (since each partition will be larger).
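
As a rough illustration of the first rule, the sketch below computes the ``2 x TOTAL_NUM_CPUS`` lower bound. The function name ``recommended_min_partitions`` and the example cluster shape are assumptions made for illustration; note that ``os.cpu_count()`` only sees the local machine, so for a cluster the total CPU count has to be supplied explicitly.

```python
import os


def recommended_min_partitions(total_cpus=None, factor=2):
    """Return factor * total_cpus, the lower bound suggested by the guide."""
    if total_cpus is None:
        # os.cpu_count() only covers the local machine and may return None.
        total_cpus = os.cpu_count() or 1
    return factor * total_cpus


# A hypothetical cluster of 4 machines with 16 CPUs each.
print(recommended_min_partitions(total_cpus=4 * 16))  # 128
```

The resulting number is only a starting point: the second and third rules describe when to move it up or down.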
Member

Maybe some description of how to measure overhead (vs maybe just an operation that is expensive)?


.. seealso::
:doc:`./memory` - a guide for dealing with memory issues when using Daft

How is my data partitioned?
---------------------------

Daft will automatically use certain heuristics to determine the number of partitions for you when you create a DataFrame. When reading data from files (e.g. Parquet, CSV or JSON),
each file is by default one partition on its own, but Daft will also perform splitting of partitions (for files that are egregiously large) and coalescing of partitions (for small files)
Member

I think "by default" is a little misleading here as Daft by default does scan task splitting and merging.

to improve the sizing and number of partitions in your system.

To interrogate the partitioning of your current DataFrame, you may use the :meth:`df.explain(show_all=True) <daft.DataFrame.explain>` method. Here is an example output from a simple
``df = daft.read_parquet(...)`` call on a fairly large number of Parquet files.

.. code:: python

    import daft

    df = daft.read_parquet("s3://bucket/path_to_100_parquet_files/**")
    df.explain(show_all=True)

.. code::

== Unoptimized Logical Plan ==

* GlobScanOperator
| Glob paths = [s3://bucket/path_to_100_parquet_files/**]
| ...


...


== Physical Plan ==

* TabularScan:
| Num Scan Tasks = 3
| Estimated Scan Bytes = 72000000
| Clustering spec = { Num partitions = 3 }
| ...

In the above example, the call to ``daft.read_parquet`` read 100 Parquet files, but the Physical Plan indicates that Daft will only create 3 partitions. This is because these files are quite small (in this example, totalling about 72MB of data) and Daft recognizes that it should be able to read them as just 3 partitions, each covering about 33 files!
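
The numbers in the plan can be checked with a little arithmetic. This sketch assumes, purely for illustration, that files and bytes are spread evenly across the partitions; the actual grouping is decided by Daft's heuristics.

```python
num_files = 100
num_partitions = 3                  # "Num partitions = 3" in the plan
estimated_scan_bytes = 72_000_000   # "Estimated Scan Bytes" in the plan

# Assumption: an even spread of files and bytes across partitions.
files_per_partition = num_files / num_partitions
bytes_per_partition = estimated_scan_bytes / num_partitions

print(f"{files_per_partition:.1f} files per partition")
print(f"{bytes_per_partition / 1e6:.0f} MB per partition")
```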

How can I change the way my data is partitioned?
------------------------------------------------

You can change the way your data is partitioned by leveraging certain DataFrame methods:

1. :meth:`daft.DataFrame.repartition`: repartitions your data into ``N`` partitions by performing a hash-bucketing that ensures that all data with the same values for the specified columns ends up in the same partition. This is expensive because it requires moving data between partitions and machines.
2. :meth:`daft.DataFrame.into_partitions`: splits or coalesces adjacent partitions to meet the specified target number of total partitions. This is less expensive than a call to ``df.repartition`` because it does not require shuffling or moving data between partitions.
3. Many global dataframe operations such as :meth:`daft.DataFrame.join`, :meth:`daft.DataFrame.sort` and :meth:`daft.GroupedDataframe.agg` will change the partitioning of your data. This is because they require shuffling data between partitions to be globally correct.
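
To make the difference between the first two methods concrete, here is a pure-Python sketch of coalescing adjacent partitions, the cheaper of the two strategies. It is a conceptual model only, not Daft's implementation: the function ``coalesce_adjacent`` is hypothetical, partitions are modelled as plain lists, and the splitting case is left out.

```python
def coalesce_adjacent(partitions, target):
    """Merge neighbouring partitions until only `target` remain."""
    n = len(partitions)
    if not 1 <= target <= n:
        raise ValueError("target must be between 1 and the current partition count")
    merged = []
    for i in range(target):
        # Contiguous slice of input partitions assigned to output partition i.
        start, end = i * n // target, (i + 1) * n // target
        merged.append([row for part in partitions[start:end] for row in part])
    return merged


parts = [[1, 2], [3], [4, 5], [6], [7, 8, 9]]
print(coalesce_adjacent(parts, 2))  # [[1, 2, 3], [4, 5, 6, 7, 8, 9]]
```

Each output partition is built from a contiguous run of input partitions, so no row is inspected or hashed and row order is preserved. A hash-based repartition, by contrast, has to look at every row to decide where it goes.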

Note that many of these methods will change both the *number of partitions* as well as the *clustering specification* of the new partitioning. For example, when calling ``df.repartition(8, col("x"))``, the resultant dataframe will now have 8 partitions in total with the additional guarantee that all rows with the same value of ``col("x")`` are in the same partition! This is called "hash partitioning".

.. code:: python

    df = df.repartition(8, daft.col("x"))
    df.explain(show_all=True)

.. code::

== Unoptimized Logical Plan ==

* Repartition: Scheme = Hash
| Num partitions = Some(8)
| By = col(x)
|
* GlobScanOperator
| Glob paths = [s3://bucket/path_to_100_parquet_files/**]
| ...

...

== Physical Plan ==

* ReduceMerge
|
* FanoutByHash: 8
| Partition by = col(x)
|
* TabularScan:
| Num Scan Tasks = 3
| Estimated Scan Bytes = 72000000
| Clustering spec = { Num partitions = 3 }
| ...