[FEAT] Delta lake allow unsafe rename for local writes (#2824)
I don't know a good way to test this, since we believe the issue only arises when you mount a Fabric table onto the local filesystem. However, it is a pretty small change that should not affect any existing behavior. I will verify that it works with our users once it is released.
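
For context, a minimal usage sketch of the new flag (the mount path and sample data are hypothetical, not from this commit):

    import daft

    # Hypothetical example: a Delta table on a locally mounted filesystem (e.g. a Fabric/OneLake mount).
    # Passing allow_unsafe_rename=True opts into delta-rs's unsafe rename so commits can complete on
    # filesystems where an atomic rename is not available.
    df = daft.from_pydict({"id": [1, 2, 3], "value": ["a", "b", "c"]})
    df.write_deltalake(
        "/mnt/fabric/my_table",    # hypothetical mount point
        allow_unsafe_rename=True,  # new flag added in this commit; defaults to False
    )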
kevinzwang authored Sep 12, 2024
1 parent 2e35d5b commit 805fbce
Showing 1 changed file with 10 additions and 3 deletions.
13 changes: 10 additions & 3 deletions daft/dataframe/dataframe.py
@@ -28,7 +28,6 @@
     TypeVar,
     Union,
 )
-from urllib.parse import urlparse
 
 from daft.api_annotations import DataframePublicAPI
 from daft.context import get_context
@@ -761,6 +760,7 @@ def write_deltalake(
         configuration: Optional[Mapping[str, Optional[str]]] = None,
         custom_metadata: Optional[Dict[str, str]] = None,
         dynamo_table_name: Optional[str] = None,
+        allow_unsafe_rename: bool = False,
         io_config: Optional[IOConfig] = None,
     ) -> "DataFrame":
         """Writes the DataFrame to a `Delta Lake <https://docs.delta.io/latest/index.html>`__ table, returning a new DataFrame with the operations that occurred.
@@ -777,6 +777,7 @@ def write_deltalake(
             configuration (Mapping[str, Optional[str]], optional): A map containing configuration options for the metadata action.
             custom_metadata (Dict[str, str], optional): Custom metadata to add to the commit info.
             dynamo_table_name (str, optional): Name of the DynamoDB table to be used as the locking provider if writing to S3.
+            allow_unsafe_rename (bool, optional): Whether to allow unsafe rename when writing to S3 or local disk. Defaults to False.
             io_config (IOConfig, optional): configurations to use when interacting with remote storage.
 
         Returns:
@@ -795,6 +796,7 @@ def write_deltalake(
         from packaging.version import parse
 
         from daft import from_pydict
+        from daft.filesystem import get_protocol_from_path
         from daft.io import DataCatalogTable
         from daft.io._deltalake import large_dtypes_kwargs
         from daft.io.object_store_options import io_config_to_storage_options
@@ -826,14 +828,19 @@ def write_deltalake(
             raise ValueError(f"Expected table to be a path or a DeltaTable, received: {type(table)}")
 
         # see: https://delta-io.github.io/delta-rs/usage/writing/writing-to-s3-with-locking-provider/
-        scheme = urlparse(table_uri).scheme
+        scheme = get_protocol_from_path(table_uri)
         if scheme == "s3" or scheme == "s3a":
             if dynamo_table_name is not None:
                 storage_options["AWS_S3_LOCKING_PROVIDER"] = "dynamodb"
                 storage_options["DELTA_DYNAMO_TABLE_NAME"] = dynamo_table_name
             else:
                 storage_options["AWS_S3_ALLOW_UNSAFE_RENAME"] = "true"
-                warnings.warn("No DynamoDB table specified for Delta Lake locking. Defaulting to unsafe writes.")
+
+                if not allow_unsafe_rename:
+                    warnings.warn("No DynamoDB table specified for Delta Lake locking. Defaulting to unsafe writes.")
+        elif scheme == "file":
+            if allow_unsafe_rename:
+                storage_options["MOUNT_ALLOW_UNSAFE_RENAME"] = "true"
 
         pyarrow_schema = pa.schema((f.name, f.dtype.to_arrow_dtype()) for f in self.schema())
 
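For comparison, a hedged sketch of the S3 branch that the diff above leaves in place: with a DynamoDB lock table, delta-rs uses it as the locking provider and the unsafe-rename warning is not emitted (the bucket, lock table, and region names below are hypothetical):

    from daft.io import IOConfig, S3Config

    # Hypothetical S3 example: use DynamoDB as the Delta Lake locking provider
    # (see the delta-rs docs linked in the diff) instead of relying on unsafe renames.
    io_config = IOConfig(s3=S3Config(region_name="us-west-2"))  # hypothetical region
    df.write_deltalake(
        "s3://my-bucket/my_table",           # hypothetical bucket/path
        dynamo_table_name="delta_log_lock",  # hypothetical DynamoDB table name
        io_config=io_config,
    )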
