[BUG] Azure and Iceberg read and write fixes (#2349)
In this PR:
- `pyarrow.dataset.write_dataset` does not properly write Parquet metadata in version 12.0.0, so the requirement is now set to `pyarrow>=12.0.1`
- The Azure fsspec filesystem is now initialized with `IOConfig` values
- Azure URIs of the form `PROTOCOL://account.dfs.core.windows.net/container/path-part/file` are now parsed properly; URI parsing has also been cleaned up and unified
- Fixed small discrepancies for `AzureConfig` in `daft.pyi`
- Added a public test Iceberg table on Azure, a SQLite catalog that points to the table, and a test that reads from it.
  - More tests should be written - #2348

Should resolve #2005
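
As a rough illustration of how these fixes surface to users, here is a minimal sketch of reading from Azure with an explicit `IOConfig` (the container and file path are hypothetical; `dafttestdata` is the public test storage account used by the new integration fixtures):

```python
import daft
from daft.io import AzureConfig, IOConfig

# Anonymous access to a public storage account; container/path are placeholders.
io_config = IOConfig(azure=AzureConfig(storage_account="dafttestdata", anonymous=True))

# Both URI styles touched by this PR should resolve to the same object:
df = daft.read_parquet("az://container/path-part/file.parquet", io_config=io_config)
df = daft.read_parquet(
    "abfss://dafttestdata.dfs.core.windows.net/container/path-part/file.parquet",
    io_config=io_config,
)
```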
kevinzwang authored Jun 7, 2024
1 parent 87f6706 commit f13554f
Showing 10 changed files with 123 additions and 57 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/python-package.yml
@@ -22,7 +22,7 @@ jobs:
matrix:
python-version: ['3.8', '3.10']
daft-runner: [py, ray]
pyarrow-version: [7.0.0, 12.0]
pyarrow-version: [7.0.0, 12.0.1]
enable-aqe: [0, 1]
os: [ubuntu-20.04, windows-latest]
exclude:
2 changes: 1 addition & 1 deletion benchmarking/parquet/benchmark-requirements.txt
@@ -1,5 +1,5 @@
pytest==7.4.0
pytest-benchmark==4.0.0
pytest-memray==1.4.1
pyarrow==12.0.0
pyarrow==12.0.1
boto3==1.28.3
6 changes: 3 additions & 3 deletions daft/daft.pyi
@@ -519,7 +519,7 @@ class AzureConfig:
tenant_id: str | None
client_id: str | None
client_secret: str | None
anonymous: str | None
anonymous: bool | None
endpoint_url: str | None = None
use_ssl: bool | None = None

@@ -531,7 +531,7 @@ class AzureConfig:
tenant_id: str | None = None,
client_id: str | None = None,
client_secret: str | None = None,
anonymous: str | None = None,
anonymous: bool | None = None,
endpoint_url: str | None = None,
use_ssl: bool | None = None,
): ...
@@ -543,7 +543,7 @@ class AzureConfig:
tenant_id: str | None = None,
client_id: str | None = None,
client_secret: str | None = None,
anonymous: str | None = None,
anonymous: bool | None = None,
endpoint_url: str | None = None,
use_ssl: bool | None = None,
) -> AzureConfig:
6 changes: 4 additions & 2 deletions daft/dataframe/dataframe.py
@@ -485,8 +485,10 @@ def write_iceberg(self, table: "pyiceberg.table.Table", mode: str = "append") ->
if parse(pyiceberg.__version__) < parse("0.6.0"):
raise ValueError(f"Write Iceberg is only supported on pyiceberg>=0.6.0, found {pyiceberg.__version__}")

if parse(pa.__version__) < parse("8.0.0"):
raise ValueError(f"Write Iceberg is only supported on pyarrow>=8.0.0, found {pa.__version__}")
if parse(pa.__version__) < parse("12.0.1"):
raise ValueError(
f"Write Iceberg is only supported on pyarrow>=12.0.1, found {pa.__version__}. See this issue for more information: https://github.com/apache/arrow/issues/37054#issuecomment-1668644887"
)

from pyiceberg.table import _MergingSnapshotProducer
from pyiceberg.table.snapshots import Operation
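
For reference, a hedged usage sketch under the new version floor (the catalog name and table identifier below are hypothetical; requires `pyiceberg>=0.6.0` and `pyarrow>=12.0.1`):

```python
import daft
from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")  # assumes a pyiceberg catalog named "default" is configured
table = catalog.load_table("default.my_table")  # hypothetical table identifier

df = daft.from_pydict({"a": [1, 2, 3]})
df.write_iceberg(table, mode="append")  # raises if pyarrow < 12.0.1 after this change
```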
15 changes: 14 additions & 1 deletion daft/filesystem.py
@@ -271,7 +271,20 @@ def _set_if_not_none(kwargs: dict[str, Any], key: str, val: Any | None):
###
elif protocol in {"az", "abfs", "abfss"}:
fsspec_fs_cls = get_filesystem_class(protocol)
fsspec_fs = fsspec_fs_cls()

if io_config is not None:
# TODO: look into support for other AzureConfig parameters
fsspec_fs = fsspec_fs_cls(
account_name=io_config.azure.storage_account,
account_key=io_config.azure.access_key,
sas_token=io_config.azure.sas_token,
tenant_id=io_config.azure.tenant_id,
client_id=io_config.azure.client_id,
client_secret=io_config.azure.client_secret,
anon=io_config.azure.anonymous,
)
else:
fsspec_fs = fsspec_fs_cls()
resolved_filesystem, resolved_path = pafs_resolve_filesystem_and_path(path, fsspec_fs)
resolved_path = resolved_filesystem.normalize_path(_unwrap_protocol(resolved_path))
return resolved_path, resolved_filesystem
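
Illustratively, when an `io_config` is supplied, the patched branch above builds the fsspec filesystem roughly like this sketch (assumes `adlfs` is installed; the account name is the public test account and the container is hypothetical):

```python
import fsspec

fs_cls = fsspec.get_filesystem_class("abfs")  # resolves to adlfs.AzureBlobFileSystem
fs = fs_cls(
    account_name="dafttestdata",  # io_config.azure.storage_account
    anon=True,                    # io_config.azure.anonymous
)
print(fs.ls("some-container/"))  # hypothetical public container
```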
2 changes: 1 addition & 1 deletion requirements-dev.txt
@@ -34,7 +34,7 @@ Pillow==9.5.0
opencv-python==4.8.1.78

# Pyarrow
pyarrow==12
pyarrow==12.0.1
# Ray
ray[data, client]==2.7.1; python_version < '3.8'
ray[data, client]==2.10.0; python_version >= '3.8'
111 changes: 63 additions & 48 deletions src/daft-io/src/azure_blob.rs
@@ -22,6 +22,7 @@ use common_io_config::AzureConfig;
const AZURE_DELIMITER: &str = "/";
const DEFAULT_GLOB_FANOUT_LIMIT: usize = 1024;
const AZURE_STORAGE_RESOURCE: &str = "https://storage.azure.com";
const AZURE_STORE_SUFFIX: &str = ".dfs.core.windows.net";

#[derive(Debug, Snafu)]
enum Error {
@@ -64,6 +65,44 @@ enum Error {
NotAFile { path: String },
}

/// Parse an Azure URI into its components.
/// Returns `(protocol, Some((container, key)))`, or `(protocol, None)` if no container is present.
fn parse_azure_uri(uri: &str) -> super::Result<(String, Option<(String, String)>)> {
let uri = url::Url::parse(uri).with_context(|_| InvalidUrlSnafu { path: uri })?;

// "Container" is Azure's name for Bucket.
//
// fsspec supports two URI formats; for compatibility, we will support both as well.
// PROTOCOL://container/path-part/file
// PROTOCOL://container@account.dfs.core.windows.net/path-part/file
// See https://github.com/fsspec/adlfs/ for more details
//
// It also supports PROTOCOL://account.dfs.core.windows.net/container/path-part/file
// but it is not documented
// https://github.com/fsspec/adlfs/blob/5c24b2e886fc8e068a313819ce3db9b7077c27e3/adlfs/spec.py#L364
let username = uri.username();
let container_and_key = if username.is_empty() {
match uri.host_str() {
Some(host) if host.ends_with(AZURE_STORE_SUFFIX) => uri
.path()
.split_once('/')
.map(|(c, k)| (c.into(), k.into())),
Some(host) => Some((host.into(), uri.path().into())),
None => None,
}
} else {
Some((username.into(), uri.path().into()))
};

// fsspec supports multiple URI protocol strings for Azure: az:// and abfs://.
// NB: It's unclear if there is a semantic difference between the protocols
// or if there is a standard for the behaviour either;
// here, we will treat them both the same, but persist whichever protocol string was used.
let protocol = uri.scheme();

Ok((protocol.into(), container_and_key))
}

impl From<Error> for super::Error {
fn from(error: Error) -> Self {
use Error::*;
@@ -443,15 +482,11 @@ impl ObjectSource for AzureBlobSource {
range: Option<Range<usize>>,
io_stats: Option<IOStatsRef>,
) -> super::Result<GetResult> {
let parsed = url::Url::parse(uri).with_context(|_| InvalidUrlSnafu { path: uri })?;
let container = match parsed.host_str() {
Some(s) => Ok(s),
None => Err(Error::InvalidUrl {
path: uri.into(),
source: url::ParseError::EmptyHost,
}),
}?;
let key = parsed.path();
let (_, container_and_key) = parse_azure_uri(uri)?;
let (container, key) = container_and_key.ok_or_else(|| Error::InvalidUrl {
path: uri.into(),
source: url::ParseError::EmptyHost,
})?;

if key.is_empty() {
return Err(Error::NotAFile { path: uri.into() }.into());
@@ -489,15 +524,11 @@ impl ObjectSource for AzureBlobSource {
}

async fn get_size(&self, uri: &str, io_stats: Option<IOStatsRef>) -> super::Result<usize> {
let parsed = url::Url::parse(uri).with_context(|_| InvalidUrlSnafu { path: uri })?;
let container = match parsed.host_str() {
Some(s) => Ok(s),
None => Err(Error::InvalidUrl {
path: uri.into(),
source: url::ParseError::EmptyHost,
}),
}?;
let key = parsed.path();
let (_, container_and_key) = parse_azure_uri(uri)?;
let (container, key) = container_and_key.ok_or_else(|| Error::InvalidUrl {
path: uri.into(),
source: url::ParseError::EmptyHost,
})?;

if key.is_empty() {
return Err(Error::NotAFile { path: uri.into() }.into());
@@ -547,39 +578,23 @@ impl ObjectSource for AzureBlobSource {
_page_size: Option<i32>,
io_stats: Option<IOStatsRef>,
) -> super::Result<BoxStream<super::Result<FileMetadata>>> {
let uri = url::Url::parse(uri).with_context(|_| InvalidUrlSnafu { path: uri })?;

// path can be root (buckets) or path prefix within a bucket.
let container = {
// "Container" is Azure's name for Bucket.
//
// fsspec supports two URI formats; for compatibility, we will support both as well.
// PROTOCOL://container/path-part/file
// PROTOCOL://container@account.dfs.core.windows.net/path-part/file
// See https://github.com/fsspec/adlfs/ for more details
let username = uri.username();
match username {
"" => uri.host_str(),
_ => Some(username),
}
};

// fsspec supports multiple URI protocol strings for Azure: az:// and abfs://.
// NB: It's unclear if there is a semantic difference between the protocols
// or if there is a standard for the behaviour either;
// here, we will treat them both the same, but persist whichever protocol string was used.
let protocol = uri.scheme();
let (protocol, container_and_key) = parse_azure_uri(uri)?;

match container {
match container_and_key {
// List containers.
None => Ok(self.list_containers_stream(protocol, io_stats).await),
None => Ok(self
.list_containers_stream(protocol.as_str(), io_stats)
.await),
// List a path within a container.
Some(container_name) => {
let prefix = uri.path();
Ok(self
.list_directory_stream(protocol, container_name, prefix, posix, io_stats)
.await)
}
Some((container_name, key)) => Ok(self
.list_directory_stream(
protocol.as_str(),
container_name.as_str(),
key.as_str(),
posix,
io_stats,
)
.await),
}
}

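
For clarity, a Python sketch of the URI-parsing behaviour that the new `parse_azure_uri` implements (this is not the shipped Rust code; keys are normalized without a leading slash for brevity):

```python
from urllib.parse import urlparse

AZURE_STORE_SUFFIX = ".dfs.core.windows.net"

def parse_azure_uri_sketch(uri: str):
    """Return (protocol, (container, key)) or (protocol, None) if no container is present."""
    parsed = urlparse(uri)
    key = parsed.path.lstrip("/")
    if parsed.username:
        # PROTOCOL://container@account.dfs.core.windows.net/path-part/file
        return parsed.scheme, (parsed.username, key)
    if parsed.hostname is None:
        return parsed.scheme, None
    if parsed.hostname.endswith(AZURE_STORE_SUFFIX):
        # PROTOCOL://account.dfs.core.windows.net/container/path-part/file
        container, _, key = key.partition("/")
        return parsed.scheme, (container, key)
    # PROTOCOL://container/path-part/file
    return parsed.scheme, (parsed.hostname, key)

for uri in (
    "az://container/path-part/file",
    "abfss://container@account.dfs.core.windows.net/path-part/file",
    "abfss://account.dfs.core.windows.net/container/path-part/file",
):
    print(parse_azure_uri_sketch(uri))  # each yields (protocol, ('container', 'path-part/file'))
```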
Binary file added tests/assets/pyiceberg_catalog.db
22 changes: 22 additions & 0 deletions tests/integration/iceberg/conftest.py
@@ -39,6 +39,12 @@
]


cloud_tables_names = [
"azure.test",
# TODO(Kevin): Add more tables from more cloud providers
]


@tenacity.retry(
stop=tenacity.stop_after_delay(60),
retry=tenacity.retry_if_exception_type(pyiceberg.exceptions.NoSuchTableError),
@@ -73,3 +79,19 @@ def local_iceberg_tables(request, local_iceberg_catalog) -> Table:
NAMESPACE = "default"
table_name = request.param
return local_iceberg_catalog.load_table(f"{NAMESPACE}.{table_name}")


@pytest.fixture(scope="session")
def cloud_iceberg_catalog() -> Catalog:
return load_catalog(
"default",
**{
"uri": "sqlite:///tests/assets/pyiceberg_catalog.db",
"adlfs.account-name": "dafttestdata",
},
)


@pytest.fixture(scope="session", params=cloud_tables_names)
def cloud_iceberg_table(request, cloud_iceberg_catalog) -> Table:
return cloud_iceberg_catalog.load_table(request.param)
14 changes: 14 additions & 0 deletions tests/integration/iceberg/test_cloud_load.py
@@ -0,0 +1,14 @@
import pytest

pyiceberg = pytest.importorskip("pyiceberg")

import daft
from tests.conftest import assert_df_equals


@pytest.mark.integration
def test_daft_iceberg_cloud_table_load(cloud_iceberg_table):
df = daft.read_iceberg(cloud_iceberg_table)
daft_pandas = df.to_pandas()
iceberg_pandas = cloud_iceberg_table.scan().to_arrow().to_pandas()
assert_df_equals(daft_pandas, iceberg_pandas, sort_key=[])
