From f3c2cb6057224cd2c42023b85b4cbd60819d3aaa Mon Sep 17 00:00:00 2001 From: Xiayue Charles Lin Date: Wed, 20 Sep 2023 16:48:14 -0700 Subject: [PATCH 01/12] wip --- src/daft-io/src/azure_blob.rs | 114 ++++++++++++++++++++++++++++++++-- 1 file changed, 109 insertions(+), 5 deletions(-) diff --git a/src/daft-io/src/azure_blob.rs b/src/daft-io/src/azure_blob.rs index ee62da1525..6386cae925 100644 --- a/src/daft-io/src/azure_blob.rs +++ b/src/daft-io/src/azure_blob.rs @@ -1,12 +1,15 @@ use async_trait::async_trait; use azure_storage::prelude::*; -use azure_storage_blobs::prelude::*; +use azure_storage_blobs::{ + container::{operations::BlobItem, Container}, + prelude::*, +}; use futures::{StreamExt, TryStreamExt}; use snafu::{IntoError, ResultExt, Snafu}; use std::{num::ParseIntError, ops::Range, string::FromUtf8Error, sync::Arc}; use crate::{ - object_io::{LSResult, ObjectSource}, + object_io::{FileMetadata, FileType, LSResult, ObjectSource}, GetResult, }; use common_io_config::AzureConfig; @@ -86,6 +89,25 @@ impl From for super::Error { } } +fn container_to_file_metadata(container: &Container) -> FileMetadata { + FileMetadata { + filepath: format!("https://{}", container.name), + size: None, + filetype: FileType::Directory, + } +} + +fn blob_item_to_file_metadata(prefix: &str, blob_item: &BlobItem) -> FileMetadata { + match blob_item { + BlobItem::Blob(blob) => { + todo!() + } + BlobItem::BlobPrefix(prefix) => { + todo!() + } + } +} + pub(crate) struct AzureBlobSource { blob_client: Arc, } @@ -117,6 +139,76 @@ impl AzureBlobSource { } .into()) } + + async fn _list_containers(&self) -> super::Result { + let responses_stream = self + .blob_client + .clone() + .list_containers() + .include_metadata(true) + .into_stream(); + + // It looks like the azure rust library API + // does not currently allow using the continuation token: + // https://docs.rs/azure_storage_blobs/0.15.0/azure_storage_blobs/service/operations/struct.ListContainersBuilder.html + // https://docs.rs/azure_core/0.15.0/azure_core/struct.Pageable.html + // For now, collect the entire result. + let responses = responses_stream.try_collect::>().await; + + match responses { + Ok(responses) => { + let containers = responses + .iter() + .flat_map(|resp| &resp.containers) + .map(container_to_file_metadata) + .collect::>(); + + let result = LSResult { + files: containers, + continuation_token: None, + }; + + Ok(result) + } + Err(e) => todo!(), + } + } + + async fn _list_directory( + &self, + container_name: &str, + delimiter: &str, + prefix: &str, + ) -> super::Result { + let responses_stream = self + .blob_client + .container_client(container_name) + .list_blobs() + .delimiter(delimiter.to_string()) + .into_stream(); + + // It looks like the azure rust library API + // does not currently allow using the continuation token: + // https://docs.rs/azure_storage_blobs/0.15.0/azure_storage_blobs/container/operations/list_blobs/struct.ListBlobsBuilder.html + // https://docs.rs/azure_core/0.15.0/azure_core/struct.Pageable.html + // For now, collect the entire result. + let responses = responses_stream.try_collect::>().await; + + match responses { + Ok(responses) => { + let blob_items = responses + .iter() + .flat_map(|resp| &resp.blobs.items) + .map(|blob_item| blob_item_to_file_metadata(prefix, blob_item)) + .collect::>(); + + todo!() + } + Err(e) => { + todo!() + } + } + } } #[async_trait] @@ -175,12 +267,24 @@ impl ObjectSource for AzureBlobSource { Ok(metadata.blob.properties.content_length as usize) } + // path can be root (buckets) or path within a bucket. async fn ls( &self, - _path: &str, - _delimiter: Option<&str>, + path: &str, + delimiter: Option<&str>, _continuation_token: Option<&str>, ) -> super::Result { - unimplemented!("azure ls"); + let parsed = url::Url::parse(path).with_context(|_| InvalidUrlSnafu { path })?; + let delimiter = delimiter.unwrap_or("/"); + + // "Container" is Azure's name for Bucket. + let container = parsed.host_str(); + + match container { + // List containers. + None => self._list_containers().await, + // List a path within a container. + Some(s) => todo!(), + } } } From 2611b7bc31a89d5d9353c02899010357817b1bb5 Mon Sep 17 00:00:00 2001 From: Xiayue Charles Lin Date: Wed, 20 Sep 2023 17:57:20 -0700 Subject: [PATCH 02/12] wip --- src/daft-io/src/azure_blob.rs | 60 ++++++++++++++++++++++------------- 1 file changed, 38 insertions(+), 22 deletions(-) diff --git a/src/daft-io/src/azure_blob.rs b/src/daft-io/src/azure_blob.rs index 6386cae925..ab4a5c93a8 100644 --- a/src/daft-io/src/azure_blob.rs +++ b/src/daft-io/src/azure_blob.rs @@ -89,25 +89,6 @@ impl From for super::Error { } } -fn container_to_file_metadata(container: &Container) -> FileMetadata { - FileMetadata { - filepath: format!("https://{}", container.name), - size: None, - filetype: FileType::Directory, - } -} - -fn blob_item_to_file_metadata(prefix: &str, blob_item: &BlobItem) -> FileMetadata { - match blob_item { - BlobItem::Blob(blob) => { - todo!() - } - BlobItem::BlobPrefix(prefix) => { - todo!() - } - } -} - pub(crate) struct AzureBlobSource { blob_client: Arc, } @@ -160,7 +141,7 @@ impl AzureBlobSource { let containers = responses .iter() .flat_map(|resp| &resp.containers) - .map(container_to_file_metadata) + .map(|container| self.container_to_file_metadata(container)) .collect::>(); let result = LSResult { @@ -199,7 +180,9 @@ impl AzureBlobSource { let blob_items = responses .iter() .flat_map(|resp| &resp.blobs.items) - .map(|blob_item| blob_item_to_file_metadata(prefix, blob_item)) + .map(|blob_item| { + self.blob_item_to_file_metadata(container_name, prefix, blob_item) + }) .collect::>(); todo!() @@ -209,6 +192,39 @@ impl AzureBlobSource { } } } + fn _container_to_file_metadata(&self, container: &Container) -> super::Result { + Ok(FileMetadata { + filepath: self + .blob_client + .container_client(container.name) + .url()? + .to_string(), + size: None, + filetype: FileType::Directory, + }) + } + + fn _blob_item_to_file_metadata( + &self, + container_client: &ContainerClient, + blob_item: &BlobItem, + ) -> super::Result { + match blob_item { + BlobItem::Blob(blob) => Ok(FileMetadata { + filepath: container_client.blob_client(&blob.name).url()?.to_string(), + size: Some(blob.properties.content_length), + filetype: FileType::File, + }), + BlobItem::BlobPrefix(prefix) => { + let container_url = container_client.url()?.to_string(); + Ok(FileMetadata { + filepath: format!("{}/{}", container_url, &prefix.name), + size: None, + filetype: FileType::Directory, + }) + } + } + } } #[async_trait] @@ -267,7 +283,7 @@ impl ObjectSource for AzureBlobSource { Ok(metadata.blob.properties.content_length as usize) } - // path can be root (buckets) or path within a bucket. + // path can be root (buckets) or path prefix within a bucket. async fn ls( &self, path: &str, From 153b9fffb7955e8ee5d4fa72756f40f714a5e19c Mon Sep 17 00:00:00 2001 From: Xiayue Charles Lin Date: Thu, 21 Sep 2023 12:45:37 -0700 Subject: [PATCH 03/12] Fix error handling --- src/daft-io/src/azure_blob.rs | 53 ++++++++++++++++++++++++----------- 1 file changed, 36 insertions(+), 17 deletions(-) diff --git a/src/daft-io/src/azure_blob.rs b/src/daft-io/src/azure_blob.rs index ab4a5c93a8..5bd552de70 100644 --- a/src/daft-io/src/azure_blob.rs +++ b/src/daft-io/src/azure_blob.rs @@ -1,7 +1,7 @@ use async_trait::async_trait; use azure_storage::prelude::*; use azure_storage_blobs::{ - container::{operations::BlobItem, Container}, + container::{self, operations::BlobItem, Container}, prelude::*, }; use futures::{StreamExt, TryStreamExt}; @@ -37,8 +37,8 @@ enum Error { source: azure_storage::Error, }, - #[snafu(display("Unable to create Http Client {}", source))] - UnableToCreateClient { source: reqwest::Error }, + #[snafu(display("Unable to create Azure Client {}", source))] + UnableToCreateClient { source: azure_storage::Error }, #[snafu(display("Unable to parse URL: \"{}\"", path))] InvalidUrl { @@ -46,6 +46,12 @@ enum Error { source: url::ParseError, }, + #[snafu(display("Unable to produce Azure URL for: \"{}\"", object))] + AzureUrlError { + object: String, + source: azure_storage::Error, + }, + #[snafu(display("Azure Storage Account not set and is required.\n Set either `AzureConfig.storage_account` or the `AZURE_STORAGE_ACCOUNT` environment variable."))] StorageAccountNotSet, @@ -141,8 +147,8 @@ impl AzureBlobSource { let containers = responses .iter() .flat_map(|resp| &resp.containers) - .map(|container| self.container_to_file_metadata(container)) - .collect::>(); + .map(|container| self._container_to_file_metadata(container)) + .collect::, _>>()?; let result = LSResult { files: containers, @@ -161,9 +167,9 @@ impl AzureBlobSource { delimiter: &str, prefix: &str, ) -> super::Result { - let responses_stream = self - .blob_client - .container_client(container_name) + let container_client = self.blob_client.container_client(container_name); + + let responses_stream = container_client .list_blobs() .delimiter(delimiter.to_string()) .into_stream(); @@ -180,10 +186,8 @@ impl AzureBlobSource { let blob_items = responses .iter() .flat_map(|resp| &resp.blobs.items) - .map(|blob_item| { - self.blob_item_to_file_metadata(container_name, prefix, blob_item) - }) - .collect::>(); + .map(|blob_item| self._blob_item_to_file_metadata(&container_client, blob_item)) + .collect::, _>>()?; todo!() } @@ -196,8 +200,11 @@ impl AzureBlobSource { Ok(FileMetadata { filepath: self .blob_client - .container_client(container.name) - .url()? + .container_client(&container.name) + .url() + .context(AzureUrlSnafu { + object: &container.name, + })? .to_string(), size: None, filetype: FileType::Directory, @@ -211,12 +218,21 @@ impl AzureBlobSource { ) -> super::Result { match blob_item { BlobItem::Blob(blob) => Ok(FileMetadata { - filepath: container_client.blob_client(&blob.name).url()?.to_string(), + filepath: container_client + .blob_client(&blob.name) + .url() + .context(AzureUrlSnafu { object: &blob.name })? + .to_string(), size: Some(blob.properties.content_length), filetype: FileType::File, }), BlobItem::BlobPrefix(prefix) => { - let container_url = container_client.url()?.to_string(); + let container_url = container_client + .url() + .context(AzureUrlSnafu { + object: &prefix.name, + })? + .to_string(); Ok(FileMetadata { filepath: format!("{}/{}", container_url, &prefix.name), size: None, @@ -300,7 +316,10 @@ impl ObjectSource for AzureBlobSource { // List containers. None => self._list_containers().await, // List a path within a container. - Some(s) => todo!(), + Some(container_name) => { + self._list_directory(container_name, delimiter, path); + todo!() + } } } } From 3b08d15641f6f4b2c4e3897e4e6eedcdf2dc2268 Mon Sep 17 00:00:00 2001 From: Xiayue Charles Lin Date: Mon, 25 Sep 2023 13:19:11 -0700 Subject: [PATCH 04/12] wip --- src/daft-io/src/azure_blob.rs | 105 ++++++++++++++++++++-------------- 1 file changed, 61 insertions(+), 44 deletions(-) diff --git a/src/daft-io/src/azure_blob.rs b/src/daft-io/src/azure_blob.rs index 5bd552de70..e94fe71434 100644 --- a/src/daft-io/src/azure_blob.rs +++ b/src/daft-io/src/azure_blob.rs @@ -127,7 +127,7 @@ impl AzureBlobSource { .into()) } - async fn _list_containers(&self) -> super::Result { + async fn _list_containers(&self, protocol: &str) -> super::Result { let responses_stream = self .blob_client .clone() @@ -147,8 +147,8 @@ impl AzureBlobSource { let containers = responses .iter() .flat_map(|resp| &resp.containers) - .map(|container| self._container_to_file_metadata(container)) - .collect::, _>>()?; + .map(|container| self._container_to_file_metadata(protocol, container)) + .collect::>(); let result = LSResult { files: containers, @@ -163,15 +163,17 @@ impl AzureBlobSource { async fn _list_directory( &self, + protocol: &str, container_name: &str, - delimiter: &str, prefix: &str, + delimiter: &str, ) -> super::Result { let container_client = self.blob_client.container_client(container_name); let responses_stream = container_client .list_blobs() .delimiter(delimiter.to_string()) + .prefix(prefix.to_string()) .into_stream(); // It looks like the azure rust library API @@ -186,58 +188,47 @@ impl AzureBlobSource { let blob_items = responses .iter() .flat_map(|resp| &resp.blobs.items) - .map(|blob_item| self._blob_item_to_file_metadata(&container_client, blob_item)) - .collect::, _>>()?; + .map(|blob_item| self._blob_item_to_file_metadata(protocol, container_name, blob_item)) + .collect::>(); - todo!() + let result = LSResult { + files: blob_items, + continuation_token: None, + }; + Ok(result) } Err(e) => { todo!() } } } - fn _container_to_file_metadata(&self, container: &Container) -> super::Result { - Ok(FileMetadata { - filepath: self - .blob_client - .container_client(&container.name) - .url() - .context(AzureUrlSnafu { - object: &container.name, - })? - .to_string(), + + fn _container_to_file_metadata(&self, protocol: &str, container: &Container) -> FileMetadata { + // NB: Cannot pass through to Azure client's .url() methods here + // because they return URIs of the form https://.../container/path. + FileMetadata { + filepath: format!("{protocol}://{}", &container.name), size: None, filetype: FileType::Directory, - }) + } } fn _blob_item_to_file_metadata( &self, - container_client: &ContainerClient, + protocol: &str, + container_name: &str, blob_item: &BlobItem, - ) -> super::Result { + ) -> FileMetadata { match blob_item { - BlobItem::Blob(blob) => Ok(FileMetadata { - filepath: container_client - .blob_client(&blob.name) - .url() - .context(AzureUrlSnafu { object: &blob.name })? - .to_string(), + BlobItem::Blob(blob) => FileMetadata { + filepath: format!("{protocol}://{}/{}", container_name, &blob.name), size: Some(blob.properties.content_length), filetype: FileType::File, - }), - BlobItem::BlobPrefix(prefix) => { - let container_url = container_client - .url() - .context(AzureUrlSnafu { - object: &prefix.name, - })? - .to_string(); - Ok(FileMetadata { - filepath: format!("{}/{}", container_url, &prefix.name), - size: None, - filetype: FileType::Directory, - }) + }, + BlobItem::BlobPrefix(prefix) => FileMetadata { + filepath: format!("{protocol}://{}/{}", container_name, &prefix.name), + size: None, + filetype: FileType::Directory, } } } @@ -304,21 +295,47 @@ impl ObjectSource for AzureBlobSource { &self, path: &str, delimiter: Option<&str>, - _continuation_token: Option<&str>, + continuation_token: Option<&str>, ) -> super::Result { let parsed = url::Url::parse(path).with_context(|_| InvalidUrlSnafu { path })?; let delimiter = delimiter.unwrap_or("/"); + + // It looks like the azure rust library API + // does not currently allow using the continuation token: + // https://docs.rs/azure_storage_blobs/0.15.0/azure_storage_blobs/container/operations/list_blobs/struct.ListBlobsBuilder.html + // https://docs.rs/azure_storage_blobs/0.15.0/azure_storage_blobs/service/operations/struct.ListContainersBuilder.html + // https://docs.rs/azure_core/0.15.0/azure_core/struct.Pageable.html + if continuation_token.is_some() { + todo!() + } + // "Container" is Azure's name for Bucket. - let container = parsed.host_str(); + let container = { + // fsspec supports two URI formats are supported; for compatibility, we will support both as well. + // PROTOCOL://container/path-part/file + // PROTOCOL://container@account.dfs.core.windows.net/path-part/file + // See https://github.com/fsspec/adlfs/ for more details + let username = parsed.username(); + match username { + "" => parsed.host_str(), + _ => Some(username), + } + }; + + // fsspec supports multiple URI protocol strings for Azure: az:// and abfs://. + // NB: It's unclear if there is a semantic difference between the protocols + // or if there is a standard for the behaviour either; + // here, we will treat them both the same, but persist whichever protocol string was used. + let protocol = parsed.scheme(); match container { // List containers. - None => self._list_containers().await, + None => self._list_containers(protocol).await, // List a path within a container. Some(container_name) => { - self._list_directory(container_name, delimiter, path); - todo!() + let prefix = parsed.path(); + self._list_directory(protocol, container_name, prefix, delimiter).await } } } From d901363976b1796fe7523c7470486e14ee6bd89d Mon Sep 17 00:00:00 2001 From: Xiayue Charles Lin Date: Mon, 25 Sep 2023 16:43:34 -0700 Subject: [PATCH 05/12] most things working now --- src/daft-io/src/azure_blob.rs | 232 ++++++++++++++++++++++------------ 1 file changed, 153 insertions(+), 79 deletions(-) diff --git a/src/daft-io/src/azure_blob.rs b/src/daft-io/src/azure_blob.rs index e94fe71434..0bc44416bb 100644 --- a/src/daft-io/src/azure_blob.rs +++ b/src/daft-io/src/azure_blob.rs @@ -1,12 +1,12 @@ use async_trait::async_trait; use azure_storage::prelude::*; use azure_storage_blobs::{ - container::{self, operations::BlobItem, Container}, + container::{operations::BlobItem, Container}, prelude::*, }; use futures::{StreamExt, TryStreamExt}; use snafu::{IntoError, ResultExt, Snafu}; -use std::{num::ParseIntError, ops::Range, string::FromUtf8Error, sync::Arc}; +use std::{collections::HashSet, num::ParseIntError, ops::Range, string::FromUtf8Error, sync::Arc}; use crate::{ object_io::{FileMetadata, FileType, LSResult, ObjectSource}, @@ -16,6 +16,31 @@ use common_io_config::AzureConfig; #[derive(Debug, Snafu)] enum Error { + // Input parsing errors. + #[snafu(display("Unable to parse URL: \"{}\"", path))] + InvalidUrl { + path: String, + source: url::ParseError, + }, + #[snafu(display( + "Unable to parse data as Utf8 while reading header for file: {path}. {source}" + ))] + UnableToParseUtf8 { path: String, source: FromUtf8Error }, + + #[snafu(display( + "Unable to parse data as Integer while reading header for file: {path}. {source}" + ))] + UnableToParseInteger { path: String, source: ParseIntError }, + + // Generic client errors. + #[snafu(display("Azure Storage Account not set and is required.\n Set either `AzureConfig.storage_account` or the `AZURE_STORAGE_ACCOUNT` environment variable."))] + StorageAccountNotSet, + #[snafu(display("Unable to create Azure Client {}", source))] + UnableToCreateClient { source: azure_storage::Error }, + #[snafu(display("Azure client generic error: {}", source))] + AzureGenericError { source: azure_storage::Error }, + + // Parameterized client errors.\ #[snafu(display("Unable to connect to {}: {}", path, source))] UnableToConnect { path: String, @@ -28,42 +53,26 @@ enum Error { source: azure_storage::Error, }, - #[snafu(display("Unable to determine size of {}", path))] - UnableToDetermineSize { path: String }, - #[snafu(display("Unable to read data from {}: {}", path, source))] UnableToReadBytes { path: String, source: azure_storage::Error, }, - #[snafu(display("Unable to create Azure Client {}", source))] - UnableToCreateClient { source: azure_storage::Error }, - - #[snafu(display("Unable to parse URL: \"{}\"", path))] - InvalidUrl { + #[snafu(display("Unable to read metadata about {}: {}", path, source))] + RequestFailedForPath { path: String, - source: url::ParseError, - }, - - #[snafu(display("Unable to produce Azure URL for: \"{}\"", object))] - AzureUrlError { - object: String, source: azure_storage::Error, }, - #[snafu(display("Azure Storage Account not set and is required.\n Set either `AzureConfig.storage_account` or the `AZURE_STORAGE_ACCOUNT` environment variable."))] - StorageAccountNotSet, + #[snafu(display("Not Found: \"{}\"", path))] + NotFound { path: String }, - #[snafu(display( - "Unable to parse data as Utf8 while reading header for file: {path}. {source}" - ))] - UnableToParseUtf8 { path: String, source: FromUtf8Error }, - - #[snafu(display( - "Unable to parse data as Integer while reading header for file: {path}. {source}" - ))] - UnableToParseInteger { path: String, source: ParseIntError }, + #[snafu(display("Unable to access container: {}: {}", path, source))] + ContainerAccessError { + path: String, + source: azure_storage::Error, + }, } impl From for super::Error { @@ -87,6 +96,10 @@ impl From for super::Error { }, } } + NotFound { ref path } => super::Error::NotFound { + path: path.into(), + source: error.into(), + }, _ => super::Error::Generic { store: super::SourceType::AzureBlob, source: error.into(), @@ -140,25 +153,19 @@ impl AzureBlobSource { // https://docs.rs/azure_storage_blobs/0.15.0/azure_storage_blobs/service/operations/struct.ListContainersBuilder.html // https://docs.rs/azure_core/0.15.0/azure_core/struct.Pageable.html // For now, collect the entire result. - let responses = responses_stream.try_collect::>().await; - - match responses { - Ok(responses) => { - let containers = responses - .iter() - .flat_map(|resp| &resp.containers) - .map(|container| self._container_to_file_metadata(protocol, container)) - .collect::>(); - - let result = LSResult { - files: containers, - continuation_token: None, - }; - - Ok(result) - } - Err(e) => todo!(), - } + let containers = responses_stream + .try_collect::>() + .await + .with_context(|_| AzureGenericSnafu {})? + .iter() + .flat_map(|resp| &resp.containers) + .map(|container| self._container_to_file_metadata(protocol, container)) + .collect::>(); + + Ok(LSResult { + files: containers, + continuation_token: None, + }) } async fn _list_directory( @@ -170,36 +177,101 @@ impl AzureBlobSource { ) -> super::Result { let container_client = self.blob_client.container_client(container_name); - let responses_stream = container_client + // Blob stores expose listing by prefix and delimiter, + // but this is not the exact same as a unix-like LS behaviour + // (e.g. /somef is a prefix of /somefile, but you cannot ls /somef) + // To use prefix listing as LS, we need to ensure the path given is exactly a directory or a file, not a prefix. + + // It turns out Azure list_blobs("path/") will match both a file at "path" and a folder at "path/", which is exactly what we need. + let prefix_with_delimiter = format!("{}{delimiter}", prefix.trim_end_matches(delimiter)); + let full_path = format!("{}://{}{}", protocol, container_name, prefix); + let full_path_with_trailing_delimiter = format!( + "{}://{}{}", + protocol, container_name, &prefix_with_delimiter + ); + + let results = container_client .list_blobs() .delimiter(delimiter.to_string()) - .prefix(prefix.to_string()) - .into_stream(); - - // It looks like the azure rust library API - // does not currently allow using the continuation token: - // https://docs.rs/azure_storage_blobs/0.15.0/azure_storage_blobs/container/operations/list_blobs/struct.ListBlobsBuilder.html - // https://docs.rs/azure_core/0.15.0/azure_core/struct.Pageable.html - // For now, collect the entire result. - let responses = responses_stream.try_collect::>().await; - - match responses { - Ok(responses) => { - let blob_items = responses - .iter() - .flat_map(|resp| &resp.blobs.items) - .map(|blob_item| self._blob_item_to_file_metadata(protocol, container_name, blob_item)) - .collect::>(); - - let result = LSResult { - files: blob_items, - continuation_token: None, + .prefix(prefix_with_delimiter.clone()) + .into_stream() + .try_collect::>() + .await + .with_context(|_| RequestFailedForPathSnafu { + path: &full_path_with_trailing_delimiter, + })? + .iter() + .flat_map(|resp| &resp.blobs.items) + .map(|blob_item| self._blob_item_to_file_metadata(protocol, container_name, blob_item)) + .collect::>(); + + match &results[..] { + [] => { + // If an empty list is returned, we need to check whether the prefix actually exists and has nothing after it + // or if it is a nonexistent prefix. + // (Azure does not return marker files for empty directories.) + + let prefix_exists = match prefix { + "" | "/" => true, + _ => { + // To check whether the prefix actually exists, check whether it exists as a result one directory above. + let upper_dir = prefix // "/upper/blah/" + .trim_end_matches(delimiter) // "/upper/blah" + .trim_end_matches(|c: char| c.to_string() != delimiter); // "/upper/" + + let upper_results = container_client + .list_blobs() + .delimiter(delimiter.to_string()) + .prefix(upper_dir.to_string()) + .into_stream() + .try_collect::>() + .await + .with_context(|_| RequestFailedForPathSnafu { + path: format!("{}://{}{}", protocol, container_name, upper_dir), + })? + .iter() + .flat_map(|resp| &resp.blobs.items) + .map(|blob_item| { + self._blob_item_to_file_metadata( + protocol, + container_name, + blob_item, + ) + .filepath + }) + .collect::>(); + upper_results.contains(&full_path_with_trailing_delimiter) + } }; - Ok(result) + + if prefix_exists { + Ok(LSResult { + files: vec![], + continuation_token: None, + }) + } else { + Err(Error::NotFound { path: full_path }.into()) + } } - Err(e) => { - todo!() + [result] => { + // Azure prefixing does not differentiate between directories and files even if the trailing slash is provided. + // This returns incorrect results when we asked for a directory and got a file. + // (The other way around is okay - we can ask for a path without a trailing slash and get a directory with a trailing slash.) + // If we get a single result, we need to check whether we asked for a directory but got a file, in which case the requested directory does not exist. + + if full_path.len() > result.filepath.len() { + Err(Error::NotFound { path: full_path }.into()) + } else { + Ok(LSResult { + files: results, + continuation_token: None, + }) + } } + _ => Ok(LSResult { + files: results, + continuation_token: None, + }), } } @@ -229,7 +301,7 @@ impl AzureBlobSource { filepath: format!("{protocol}://{}/{}", container_name, &prefix.name), size: None, filetype: FileType::Directory, - } + }, } } } @@ -300,16 +372,17 @@ impl ObjectSource for AzureBlobSource { let parsed = url::Url::parse(path).with_context(|_| InvalidUrlSnafu { path })?; let delimiter = delimiter.unwrap_or("/"); - // It looks like the azure rust library API // does not currently allow using the continuation token: // https://docs.rs/azure_storage_blobs/0.15.0/azure_storage_blobs/container/operations/list_blobs/struct.ListBlobsBuilder.html // https://docs.rs/azure_storage_blobs/0.15.0/azure_storage_blobs/service/operations/struct.ListContainersBuilder.html // https://docs.rs/azure_core/0.15.0/azure_core/struct.Pageable.html - if continuation_token.is_some() { - todo!() - } - + assert!( + continuation_token.is_none(), + "unexpected Azure continuation_token {:?} received", + continuation_token + ); + // "Container" is Azure's name for Bucket. let container = { // fsspec supports two URI formats are supported; for compatibility, we will support both as well. @@ -325,7 +398,7 @@ impl ObjectSource for AzureBlobSource { // fsspec supports multiple URI protocol strings for Azure: az:// and abfs://. // NB: It's unclear if there is a semantic difference between the protocols - // or if there is a standard for the behaviour either; + // or if there is a standard for the behaviour either; // here, we will treat them both the same, but persist whichever protocol string was used. let protocol = parsed.scheme(); @@ -335,7 +408,8 @@ impl ObjectSource for AzureBlobSource { // List a path within a container. Some(container_name) => { let prefix = parsed.path(); - self._list_directory(protocol, container_name, prefix, delimiter).await + self._list_directory(protocol, container_name, prefix, delimiter) + .await } } } From d399bc58f1edb4ec1e618810f821ada2b18d0b22 Mon Sep 17 00:00:00 2001 From: Xiayue Charles Lin Date: Tue, 26 Sep 2023 17:10:49 -0700 Subject: [PATCH 06/12] async stream rewrite --- src/daft-io/src/azure_blob.rs | 335 +++++++++++++++++++++++++++++----- 1 file changed, 287 insertions(+), 48 deletions(-) diff --git a/src/daft-io/src/azure_blob.rs b/src/daft-io/src/azure_blob.rs index 0bc44416bb..a9307349f5 100644 --- a/src/daft-io/src/azure_blob.rs +++ b/src/daft-io/src/azure_blob.rs @@ -4,7 +4,7 @@ use azure_storage_blobs::{ container::{operations::BlobItem, Container}, prelude::*, }; -use futures::{StreamExt, TryStreamExt}; +use futures::{pin_mut, stream::BoxStream, StreamExt, TryStreamExt}; use snafu::{IntoError, ResultExt, Snafu}; use std::{collections::HashSet, num::ParseIntError, ops::Range, string::FromUtf8Error, sync::Arc}; @@ -40,7 +40,7 @@ enum Error { #[snafu(display("Azure client generic error: {}", source))] AzureGenericError { source: azure_storage::Error }, - // Parameterized client errors.\ + // Parameterized client errors. #[snafu(display("Unable to connect to {}: {}", path, source))] UnableToConnect { path: String, @@ -140,7 +140,11 @@ impl AzureBlobSource { .into()) } - async fn _list_containers(&self, protocol: &str) -> super::Result { + async fn list_containers_stream( + &self, + protocol: String, + ) -> BoxStream> { + // Paginated stream of results from Azure API call. let responses_stream = self .blob_client .clone() @@ -148,24 +152,197 @@ impl AzureBlobSource { .include_metadata(true) .into_stream(); - // It looks like the azure rust library API - // does not currently allow using the continuation token: - // https://docs.rs/azure_storage_blobs/0.15.0/azure_storage_blobs/service/operations/struct.ListContainersBuilder.html - // https://docs.rs/azure_core/0.15.0/azure_core/struct.Pageable.html - // For now, collect the entire result. - let containers = responses_stream - .try_collect::>() - .await - .with_context(|_| AzureGenericSnafu {})? - .iter() - .flat_map(|resp| &resp.containers) - .map(|container| self._container_to_file_metadata(protocol, container)) - .collect::>(); + // Flatmap each page of results to a single stream of our standardized FileMetadata. + responses_stream + .flat_map(move |response| match response { + Ok(response) => { + let containers = response + .containers + .iter() + .map(|container| { + Ok(self._container_to_file_metadata(protocol.clone(), container)) + }) + .collect::>(); + futures::stream::iter(containers) + } + Err(error) => { + let error = Err(Error::AzureGenericError { source: error }.into()); + futures::stream::iter(vec![error]) + } + }) + .boxed() + } + + async fn _list_directory_delimiter_stream( + &self, + container_client: &ContainerClient, + protocol: &str, + container_name: &str, + prefix: &str, + delimiter: &str, + ) -> BoxStream> { + // Calls Azure list_blobs with the prefix + // and returns the result flattened and standardized into FileMetadata. + + // Clone and own some references that we need for the lifetime of the stream. + let protocol = protocol.to_string(); + let container_name = container_name.to_string(); + let prefix = prefix.to_string(); + + // Paginated response stream from Azure API. + let responses_stream = container_client + .list_blobs() + .delimiter(delimiter.to_string()) + .prefix(prefix.clone()) + .into_stream(); + + // Map each page of results to a page of standardized FileMetadata. + responses_stream + .flat_map(move |response| match response { + Ok(response) => { + let paths_data = response + .blobs + .items + .iter() + .map(|blob_item| { + Ok(self._blob_item_to_file_metadata( + &protocol, + &container_name, + blob_item, + )) + }) + .collect::>(); + futures::stream::iter(paths_data) + } + Err(error) => { + let error = Err(Error::RequestFailedForPath { + path: format!("{}://{}{}", &protocol, &container_name, &prefix), + source: error, + } + .into()); + futures::stream::iter(vec![error]) + } + }) + .boxed() + } + + async fn list_directory_stream( + &self, + protocol: &str, + container_name: &str, + prefix: &str, + delimiter: &str, + ) -> BoxStream> { + let container_client = self.blob_client.container_client(container_name); + + // Clone and own some references that we need for the lifetime of the stream. + let protocol = protocol.to_string(); + let container_name = container_name.to_string(); + let prefix = prefix.to_string(); + let delimiter = delimiter.to_string(); + + // Blob stores expose listing by prefix and delimiter, + // but this is not the exact same as a unix-like LS behaviour + // (e.g. /somef is a prefix of /somefile, but you cannot ls /somef) + // To use prefix listing as LS, we need to ensure the path given is exactly a directory or a file, not a prefix. + + // It turns out Azure list_blobs("path/") will match both a file at "path" and a folder at "path/", which is exactly what we need. + let prefix_with_delimiter = format!("{}{delimiter}", prefix.trim_end_matches(&delimiter)); + let full_path = format!("{}://{}{}", protocol, container_name, prefix); + let full_path_with_trailing_delimiter = format!( + "{}://{}{}", + protocol, container_name, &prefix_with_delimiter + ); - Ok(LSResult { - files: containers, - continuation_token: None, - }) + let mut unchecked_results = self + ._list_directory_delimiter_stream( + &container_client, + &protocol, + &container_name, + &prefix_with_delimiter, + &delimiter, + ) + .await; + + // The Azure API call result is almost identical to the desired list directory result + // with a couple of exceptions: + // 1. Nonexistent paths return an empty list instead of 404. + // 2. A file "path" can be returned for a lookup "path/", which is not the desired behaviour. + // + // To check for and deal with these cases, + // manually process the first two items of the stream. + + let maybe_first_two_items = vec![ + unchecked_results.next().await, + unchecked_results.next().await, + ] + .into_iter() + .flatten() + .collect::>(); + + match &maybe_first_two_items[..] { + [] => { + // Check whether the path actually exists. + let s = async_stream::stream! { + let prefix_exists = match prefix.as_str() { + "" | "/" => true, + _ => { + // To check whether the prefix actually exists, check whether it exists as a result one directory above. + // (Azure does not return marker files for empty directories.) + let upper_dir = prefix // "/upper/blah/" + .trim_end_matches(&delimiter) // "/upper/blah" + .trim_end_matches(|c: char| c.to_string() != delimiter); // "/upper/" + + let upper_results = container_client + .list_blobs() + .delimiter(delimiter.to_string()) + .prefix(upper_dir.to_string()) + .into_stream() + .try_collect::>() + .await + .with_context(|_| RequestFailedForPathSnafu { + path: format!("{}://{}{}", protocol, container_name, upper_dir), + })? + .iter() + .flat_map(|resp| &resp.blobs.items) + .map(|blob_item| { + self._blob_item_to_file_metadata( + &protocol, + &container_name, + blob_item, + ) + .filepath + }) + .collect::>(); + + upper_results.contains(&full_path_with_trailing_delimiter) + } + }; + // If the prefix does not exist, the stream needs to yield a single NotFound error. + // Otherwise, it is a truly empty directory and we return an empty stream. + if !prefix_exists { + yield Err(Error::NotFound { path: full_path }.into()) + } + }; + s.boxed() + } + [Ok(item)] => { + // If we get a single result, we need to check whether we asked for a directory but got a file, + // in which case the requested directory does not actually exist. + + if full_path.len() > item.filepath.len() { + let error = Err(Error::NotFound { path: full_path }.into()); + futures::stream::iter(vec![error]).boxed() + } else { + futures::stream::iter(maybe_first_two_items) + .chain(unchecked_results) + .boxed() + } + } + _ => futures::stream::iter(maybe_first_two_items) + .chain(unchecked_results) + .boxed(), + } } async fn _list_directory( @@ -275,9 +452,9 @@ impl AzureBlobSource { } } - fn _container_to_file_metadata(&self, protocol: &str, container: &Container) -> FileMetadata { + fn _container_to_file_metadata(&self, protocol: String, container: &Container) -> FileMetadata { // NB: Cannot pass through to Azure client's .url() methods here - // because they return URIs of the form https://.../container/path. + // because they return URIs of a very different format (https://.../container/path). FileMetadata { filepath: format!("{protocol}://{}", &container.name), size: None, @@ -362,36 +539,25 @@ impl ObjectSource for AzureBlobSource { Ok(metadata.blob.properties.content_length as usize) } - // path can be root (buckets) or path prefix within a bucket. - async fn ls( + async fn iter_dir( &self, - path: &str, + uri: &str, delimiter: Option<&str>, - continuation_token: Option<&str>, - ) -> super::Result { - let parsed = url::Url::parse(path).with_context(|_| InvalidUrlSnafu { path })?; + _limit: Option, + ) -> super::Result>> { + let uri = url::Url::parse(uri).with_context(|_| InvalidUrlSnafu { path: uri })?; let delimiter = delimiter.unwrap_or("/"); - // It looks like the azure rust library API - // does not currently allow using the continuation token: - // https://docs.rs/azure_storage_blobs/0.15.0/azure_storage_blobs/container/operations/list_blobs/struct.ListBlobsBuilder.html - // https://docs.rs/azure_storage_blobs/0.15.0/azure_storage_blobs/service/operations/struct.ListContainersBuilder.html - // https://docs.rs/azure_core/0.15.0/azure_core/struct.Pageable.html - assert!( - continuation_token.is_none(), - "unexpected Azure continuation_token {:?} received", - continuation_token - ); - - // "Container" is Azure's name for Bucket. let container = { - // fsspec supports two URI formats are supported; for compatibility, we will support both as well. + // "Container" is Azure's name for Bucket. + // + // fsspec supports two URI formats; for compatibility, we will support both as well. // PROTOCOL://container/path-part/file // PROTOCOL://container@account.dfs.core.windows.net/path-part/file // See https://github.com/fsspec/adlfs/ for more details - let username = parsed.username(); + let username = uri.username(); match username { - "" => parsed.host_str(), + "" => uri.host_str(), _ => Some(username), } }; @@ -400,17 +566,90 @@ impl ObjectSource for AzureBlobSource { // NB: It's unclear if there is a semantic difference between the protocols // or if there is a standard for the behaviour either; // here, we will treat them both the same, but persist whichever protocol string was used. - let protocol = parsed.scheme(); + let protocol = uri.scheme(); match container { // List containers. - None => self._list_containers(protocol).await, + None => Ok(self.list_containers_stream(protocol.to_string()).await), // List a path within a container. Some(container_name) => { - let prefix = parsed.path(); - self._list_directory(protocol, container_name, prefix, delimiter) - .await + let prefix = uri.path(); + Ok(self + .list_directory_stream(protocol, container_name, prefix, delimiter) + .await) } } + + // let uri = uri.to_string(); + // let delimiter = delimiter.map(String::from); + // let s = stream! { + // let lsr = self.ls(&uri, delimiter.as_deref(), None).await?; + // let mut continuation_token = lsr.continuation_token.clone(); + // for file in lsr.files { + // yield Ok(file); + // } + + // while continuation_token.is_some() { + // let lsr = self.ls(&uri, delimiter.as_deref(), continuation_token.as_deref()).await?; + // continuation_token = lsr.continuation_token.clone(); + // for file in lsr.files { + // yield Ok(file); + // } + // } + // }; + // Ok(s.boxed()) + } + + // path can be root (buckets) or path prefix within a bucket. + async fn ls( + &self, + path: &str, + delimiter: Option<&str>, + continuation_token: Option<&str>, + ) -> super::Result { + todo!() + // let parsed = url::Url::parse(path).with_context(|_| InvalidUrlSnafu { path })?; + // let delimiter = delimiter.unwrap_or("/"); + + // // It looks like the azure rust library API + // // does not currently allow using the continuation token: + // // https://docs.rs/azure_storage_blobs/0.15.0/azure_storage_blobs/container/operations/list_blobs/struct.ListBlobsBuilder.html + // // https://docs.rs/azure_storage_blobs/0.15.0/azure_storage_blobs/service/operations/struct.ListContainersBuilder.html + // // https://docs.rs/azure_core/0.15.0/azure_core/struct.Pageable.html + // assert!( + // continuation_token.is_none(), + // "Azure continuation_token {:?} received, which we cannot use", + // continuation_token + // ); + + // // "Container" is Azure's name for Bucket. + // let container = { + // // fsspec supports two URI formats are supported; for compatibility, we will support both as well. + // // PROTOCOL://container/path-part/file + // // PROTOCOL://container@account.dfs.core.windows.net/path-part/file + // // See https://github.com/fsspec/adlfs/ for more details + // let username = parsed.username(); + // match username { + // "" => parsed.host_str(), + // _ => Some(username), + // } + // }; + + // // fsspec supports multiple URI protocol strings for Azure: az:// and abfs://. + // // NB: It's unclear if there is a semantic difference between the protocols + // // or if there is a standard for the behaviour either; + // // here, we will treat them both the same, but persist whichever protocol string was used. + // let protocol = parsed.scheme(); + + // match container { + // // List containers. + // None => self._list_containers(protocol).await, + // // List a path within a container. + // Some(container_name) => { + // let prefix = parsed.path(); + // self._list_directory(protocol, container_name, prefix, delimiter) + // .await + // } + // } } } From 5246822e44e1fe133ff5564cde114215629eaf90 Mon Sep 17 00:00:00 2001 From: Xiayue Charles Lin Date: Wed, 27 Sep 2023 12:02:23 -0700 Subject: [PATCH 07/12] finally --- src/daft-io/src/azure_blob.rs | 298 +++++++++------------------------- 1 file changed, 80 insertions(+), 218 deletions(-) diff --git a/src/daft-io/src/azure_blob.rs b/src/daft-io/src/azure_blob.rs index a9307349f5..bfdb1a6322 100644 --- a/src/daft-io/src/azure_blob.rs +++ b/src/daft-io/src/azure_blob.rs @@ -4,7 +4,7 @@ use azure_storage_blobs::{ container::{operations::BlobItem, Container}, prelude::*, }; -use futures::{pin_mut, stream::BoxStream, StreamExt, TryStreamExt}; +use futures::{stream::BoxStream, StreamExt, TryStreamExt}; use snafu::{IntoError, ResultExt, Snafu}; use std::{collections::HashSet, num::ParseIntError, ops::Range, string::FromUtf8Error, sync::Arc}; @@ -173,59 +173,6 @@ impl AzureBlobSource { .boxed() } - async fn _list_directory_delimiter_stream( - &self, - container_client: &ContainerClient, - protocol: &str, - container_name: &str, - prefix: &str, - delimiter: &str, - ) -> BoxStream> { - // Calls Azure list_blobs with the prefix - // and returns the result flattened and standardized into FileMetadata. - - // Clone and own some references that we need for the lifetime of the stream. - let protocol = protocol.to_string(); - let container_name = container_name.to_string(); - let prefix = prefix.to_string(); - - // Paginated response stream from Azure API. - let responses_stream = container_client - .list_blobs() - .delimiter(delimiter.to_string()) - .prefix(prefix.clone()) - .into_stream(); - - // Map each page of results to a page of standardized FileMetadata. - responses_stream - .flat_map(move |response| match response { - Ok(response) => { - let paths_data = response - .blobs - .items - .iter() - .map(|blob_item| { - Ok(self._blob_item_to_file_metadata( - &protocol, - &container_name, - blob_item, - )) - }) - .collect::>(); - futures::stream::iter(paths_data) - } - Err(error) => { - let error = Err(Error::RequestFailedForPath { - path: format!("{}://{}{}", &protocol, &container_name, &prefix), - source: error, - } - .into()); - futures::stream::iter(vec![error]) - } - }) - .boxed() - } - async fn list_directory_stream( &self, protocol: &str, @@ -272,13 +219,24 @@ impl AzureBlobSource { // To check for and deal with these cases, // manually process the first two items of the stream. - let maybe_first_two_items = vec![ - unchecked_results.next().await, - unchecked_results.next().await, - ] - .into_iter() - .flatten() - .collect::>(); + let mut maybe_first_two_items = vec![]; + let mut stream_exhausted = false; + for _ in 0..2 { + let item = unchecked_results.next().await; + if let Some(item) = item { + maybe_first_two_items.push(item); + } else { + stream_exhausted = true; + break; + } + } + + // Make sure the stream is pollable even if empty, since we will chain it later. + let unchecked_results = if !stream_exhausted { + unchecked_results + } else { + futures::stream::iter(vec![]).boxed() + }; match &maybe_first_two_items[..] { [] => { @@ -345,111 +303,57 @@ impl AzureBlobSource { } } - async fn _list_directory( + async fn _list_directory_delimiter_stream( &self, + container_client: &ContainerClient, protocol: &str, container_name: &str, prefix: &str, delimiter: &str, - ) -> super::Result { - let container_client = self.blob_client.container_client(container_name); - - // Blob stores expose listing by prefix and delimiter, - // but this is not the exact same as a unix-like LS behaviour - // (e.g. /somef is a prefix of /somefile, but you cannot ls /somef) - // To use prefix listing as LS, we need to ensure the path given is exactly a directory or a file, not a prefix. + ) -> BoxStream> { + // Calls Azure list_blobs with the prefix + // and returns the result flattened and standardized into FileMetadata. - // It turns out Azure list_blobs("path/") will match both a file at "path" and a folder at "path/", which is exactly what we need. - let prefix_with_delimiter = format!("{}{delimiter}", prefix.trim_end_matches(delimiter)); - let full_path = format!("{}://{}{}", protocol, container_name, prefix); - let full_path_with_trailing_delimiter = format!( - "{}://{}{}", - protocol, container_name, &prefix_with_delimiter - ); + // Clone and own some references that we need for the lifetime of the stream. + let protocol = protocol.to_string(); + let container_name = container_name.to_string(); + let prefix = prefix.to_string(); - let results = container_client + // Paginated response stream from Azure API. + let responses_stream = container_client .list_blobs() .delimiter(delimiter.to_string()) - .prefix(prefix_with_delimiter.clone()) - .into_stream() - .try_collect::>() - .await - .with_context(|_| RequestFailedForPathSnafu { - path: &full_path_with_trailing_delimiter, - })? - .iter() - .flat_map(|resp| &resp.blobs.items) - .map(|blob_item| self._blob_item_to_file_metadata(protocol, container_name, blob_item)) - .collect::>(); - - match &results[..] { - [] => { - // If an empty list is returned, we need to check whether the prefix actually exists and has nothing after it - // or if it is a nonexistent prefix. - // (Azure does not return marker files for empty directories.) - - let prefix_exists = match prefix { - "" | "/" => true, - _ => { - // To check whether the prefix actually exists, check whether it exists as a result one directory above. - let upper_dir = prefix // "/upper/blah/" - .trim_end_matches(delimiter) // "/upper/blah" - .trim_end_matches(|c: char| c.to_string() != delimiter); // "/upper/" - - let upper_results = container_client - .list_blobs() - .delimiter(delimiter.to_string()) - .prefix(upper_dir.to_string()) - .into_stream() - .try_collect::>() - .await - .with_context(|_| RequestFailedForPathSnafu { - path: format!("{}://{}{}", protocol, container_name, upper_dir), - })? - .iter() - .flat_map(|resp| &resp.blobs.items) - .map(|blob_item| { - self._blob_item_to_file_metadata( - protocol, - container_name, - blob_item, - ) - .filepath - }) - .collect::>(); - upper_results.contains(&full_path_with_trailing_delimiter) - } - }; + .prefix(prefix.clone()) + .into_stream(); - if prefix_exists { - Ok(LSResult { - files: vec![], - continuation_token: None, - }) - } else { - Err(Error::NotFound { path: full_path }.into()) + // Map each page of results to a page of standardized FileMetadata. + responses_stream + .flat_map(move |response| match response { + Ok(response) => { + let paths_data = response + .blobs + .items + .iter() + .map(|blob_item| { + Ok(self._blob_item_to_file_metadata( + &protocol, + &container_name, + blob_item, + )) + }) + .collect::>(); + futures::stream::iter(paths_data) } - } - [result] => { - // Azure prefixing does not differentiate between directories and files even if the trailing slash is provided. - // This returns incorrect results when we asked for a directory and got a file. - // (The other way around is okay - we can ask for a path without a trailing slash and get a directory with a trailing slash.) - // If we get a single result, we need to check whether we asked for a directory but got a file, in which case the requested directory does not exist. - - if full_path.len() > result.filepath.len() { - Err(Error::NotFound { path: full_path }.into()) - } else { - Ok(LSResult { - files: results, - continuation_token: None, - }) + Err(error) => { + let error = Err(Error::RequestFailedForPath { + path: format!("{}://{}{}", &protocol, &container_name, &prefix), + source: error, + } + .into()); + futures::stream::iter(vec![error]) } - } - _ => Ok(LSResult { - files: results, - continuation_token: None, - }), - } + }) + .boxed() } fn _container_to_file_metadata(&self, protocol: String, container: &Container) -> FileMetadata { @@ -548,6 +452,7 @@ impl ObjectSource for AzureBlobSource { let uri = url::Url::parse(uri).with_context(|_| InvalidUrlSnafu { path: uri })?; let delimiter = delimiter.unwrap_or("/"); + // path can be root (buckets) or path prefix within a bucket. let container = { // "Container" is Azure's name for Bucket. // @@ -579,77 +484,34 @@ impl ObjectSource for AzureBlobSource { .await) } } - - // let uri = uri.to_string(); - // let delimiter = delimiter.map(String::from); - // let s = stream! { - // let lsr = self.ls(&uri, delimiter.as_deref(), None).await?; - // let mut continuation_token = lsr.continuation_token.clone(); - // for file in lsr.files { - // yield Ok(file); - // } - - // while continuation_token.is_some() { - // let lsr = self.ls(&uri, delimiter.as_deref(), continuation_token.as_deref()).await?; - // continuation_token = lsr.continuation_token.clone(); - // for file in lsr.files { - // yield Ok(file); - // } - // } - // }; - // Ok(s.boxed()) } - // path can be root (buckets) or path prefix within a bucket. async fn ls( &self, path: &str, delimiter: Option<&str>, continuation_token: Option<&str>, ) -> super::Result { - todo!() - // let parsed = url::Url::parse(path).with_context(|_| InvalidUrlSnafu { path })?; - // let delimiter = delimiter.unwrap_or("/"); - - // // It looks like the azure rust library API - // // does not currently allow using the continuation token: - // // https://docs.rs/azure_storage_blobs/0.15.0/azure_storage_blobs/container/operations/list_blobs/struct.ListBlobsBuilder.html - // // https://docs.rs/azure_storage_blobs/0.15.0/azure_storage_blobs/service/operations/struct.ListContainersBuilder.html - // // https://docs.rs/azure_core/0.15.0/azure_core/struct.Pageable.html - // assert!( - // continuation_token.is_none(), - // "Azure continuation_token {:?} received, which we cannot use", - // continuation_token - // ); - - // // "Container" is Azure's name for Bucket. - // let container = { - // // fsspec supports two URI formats are supported; for compatibility, we will support both as well. - // // PROTOCOL://container/path-part/file - // // PROTOCOL://container@account.dfs.core.windows.net/path-part/file - // // See https://github.com/fsspec/adlfs/ for more details - // let username = parsed.username(); - // match username { - // "" => parsed.host_str(), - // _ => Some(username), - // } - // }; - - // // fsspec supports multiple URI protocol strings for Azure: az:// and abfs://. - // // NB: It's unclear if there is a semantic difference between the protocols - // // or if there is a standard for the behaviour either; - // // here, we will treat them both the same, but persist whichever protocol string was used. - // let protocol = parsed.scheme(); - - // match container { - // // List containers. - // None => self._list_containers(protocol).await, - // // List a path within a container. - // Some(container_name) => { - // let prefix = parsed.path(); - // self._list_directory(protocol, container_name, prefix, delimiter) - // .await - // } - // } + // It looks like the azure rust library API + // does not currently allow using the continuation token: + // https://docs.rs/azure_storage_blobs/0.15.0/azure_storage_blobs/container/operations/list_blobs/struct.ListBlobsBuilder.html + // https://docs.rs/azure_storage_blobs/0.15.0/azure_storage_blobs/service/operations/struct.ListContainersBuilder.html + // https://docs.rs/azure_core/0.15.0/azure_core/struct.Pageable.html + assert!( + continuation_token.is_none(), + "Azure continuation_token {:?} received, which we cannot use", + continuation_token + ); + + let files = self + .iter_dir(path, delimiter, None) + .await? + .try_collect::>() + .await?; + + Ok(LSResult { + files, + continuation_token: None, + }) } } From c4a03134753d3b048cb64c077b5513e82e9d255e Mon Sep 17 00:00:00 2001 From: Xiayue Charles Lin Date: Wed, 27 Sep 2023 12:03:40 -0700 Subject: [PATCH 08/12] comment --- src/daft-io/src/azure_blob.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/daft-io/src/azure_blob.rs b/src/daft-io/src/azure_blob.rs index bfdb1a6322..f3e61c4bef 100644 --- a/src/daft-io/src/azure_blob.rs +++ b/src/daft-io/src/azure_blob.rs @@ -231,7 +231,8 @@ impl AzureBlobSource { } } - // Make sure the stream is pollable even if empty, since we will chain it later. + // Make sure the stream is pollable even if empty, + // since we will chain it later with the two items we already popped. let unchecked_results = if !stream_exhausted { unchecked_results } else { From a497280c3d9d269d741dfaf30711f9c0ec72297b Mon Sep 17 00:00:00 2001 From: Xiayue Charles Lin Date: Wed, 27 Sep 2023 12:12:50 -0700 Subject: [PATCH 09/12] Add trailing slash for container --- src/daft-io/src/azure_blob.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/daft-io/src/azure_blob.rs b/src/daft-io/src/azure_blob.rs index f3e61c4bef..1274ea49b4 100644 --- a/src/daft-io/src/azure_blob.rs +++ b/src/daft-io/src/azure_blob.rs @@ -361,7 +361,7 @@ impl AzureBlobSource { // NB: Cannot pass through to Azure client's .url() methods here // because they return URIs of a very different format (https://.../container/path). FileMetadata { - filepath: format!("{protocol}://{}", &container.name), + filepath: format!("{protocol}://{}/", &container.name), size: None, filetype: FileType::Directory, } From 79523c9eee8658960b9a40d9f9d947b8ced71b0d Mon Sep 17 00:00:00 2001 From: Xiayue Charles Lin Date: Thu, 28 Sep 2023 11:19:02 -0700 Subject: [PATCH 10/12] Refactor to redundant collect --- src/daft-io/src/azure_blob.rs | 25 ++++++++++++------------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/src/daft-io/src/azure_blob.rs b/src/daft-io/src/azure_blob.rs index 1274ea49b4..74c16a053c 100644 --- a/src/daft-io/src/azure_blob.rs +++ b/src/daft-io/src/azure_blob.rs @@ -142,8 +142,10 @@ impl AzureBlobSource { async fn list_containers_stream( &self, - protocol: String, + protocol: &str, ) -> BoxStream> { + let protocol = protocol.to_string(); + // Paginated stream of results from Azure API call. let responses_stream = self .blob_client @@ -154,20 +156,17 @@ impl AzureBlobSource { // Flatmap each page of results to a single stream of our standardized FileMetadata. responses_stream - .flat_map(move |response| match response { + .map(move |response| (response, protocol.clone())) + .flat_map(move |(response, protocol)| match response { Ok(response) => { - let containers = response - .containers - .iter() - .map(|container| { - Ok(self._container_to_file_metadata(protocol.clone(), container)) - }) - .collect::>(); - futures::stream::iter(containers) + let containers = response.containers.into_iter().map(move |container| { + Ok(self._container_to_file_metadata(protocol.as_str(), &container)) + }); + futures::stream::iter(containers).boxed() } Err(error) => { let error = Err(Error::AzureGenericError { source: error }.into()); - futures::stream::iter(vec![error]) + futures::stream::iter(vec![error]).boxed() } }) .boxed() @@ -357,7 +356,7 @@ impl AzureBlobSource { .boxed() } - fn _container_to_file_metadata(&self, protocol: String, container: &Container) -> FileMetadata { + fn _container_to_file_metadata(&self, protocol: &str, container: &Container) -> FileMetadata { // NB: Cannot pass through to Azure client's .url() methods here // because they return URIs of a very different format (https://.../container/path). FileMetadata { @@ -476,7 +475,7 @@ impl ObjectSource for AzureBlobSource { match container { // List containers. - None => Ok(self.list_containers_stream(protocol.to_string()).await), + None => Ok(self.list_containers_stream(protocol).await), // List a path within a container. Some(container_name) => { let prefix = uri.path(); From 3e0743367b70087e1eba588eae5076decd066deb Mon Sep 17 00:00:00 2001 From: Xiayue Charles Lin Date: Thu, 28 Sep 2023 15:36:54 -0700 Subject: [PATCH 11/12] everything is a stream --- src/daft-io/src/azure_blob.rs | 92 +++++++++++++++-------------------- 1 file changed, 39 insertions(+), 53 deletions(-) diff --git a/src/daft-io/src/azure_blob.rs b/src/daft-io/src/azure_blob.rs index 74c16a053c..7661a87138 100644 --- a/src/daft-io/src/azure_blob.rs +++ b/src/daft-io/src/azure_blob.rs @@ -6,7 +6,7 @@ use azure_storage_blobs::{ }; use futures::{stream::BoxStream, StreamExt, TryStreamExt}; use snafu::{IntoError, ResultExt, Snafu}; -use std::{collections::HashSet, num::ParseIntError, ops::Range, string::FromUtf8Error, sync::Arc}; +use std::{ops::Range, sync::Arc}; use crate::{ object_io::{FileMetadata, FileType, LSResult, ObjectSource}, @@ -16,21 +16,14 @@ use common_io_config::AzureConfig; #[derive(Debug, Snafu)] enum Error { - // Input parsing errors. + // Input errors. #[snafu(display("Unable to parse URL: \"{}\"", path))] InvalidUrl { path: String, source: url::ParseError, }, - #[snafu(display( - "Unable to parse data as Utf8 while reading header for file: {path}. {source}" - ))] - UnableToParseUtf8 { path: String, source: FromUtf8Error }, - - #[snafu(display( - "Unable to parse data as Integer while reading header for file: {path}. {source}" - ))] - UnableToParseInteger { path: String, source: ParseIntError }, + #[snafu(display("Continuation tokens are not supported: \"{}\"", token))] + ContinuationToken { token: String }, // Generic client errors. #[snafu(display("Azure Storage Account not set and is required.\n Set either `AzureConfig.storage_account` or the `AZURE_STORAGE_ACCOUNT` environment variable."))] @@ -38,7 +31,7 @@ enum Error { #[snafu(display("Unable to create Azure Client {}", source))] UnableToCreateClient { source: azure_storage::Error }, #[snafu(display("Azure client generic error: {}", source))] - AzureGenericError { source: azure_storage::Error }, + AzureGeneric { source: azure_storage::Error }, // Parameterized client errors. #[snafu(display("Unable to connect to {}: {}", path, source))] @@ -69,7 +62,7 @@ enum Error { NotFound { path: String }, #[snafu(display("Unable to access container: {}: {}", path, source))] - ContainerAccessError { + ContainerAccess { path: String, source: azure_storage::Error, }, @@ -165,7 +158,7 @@ impl AzureBlobSource { futures::stream::iter(containers).boxed() } Err(error) => { - let error = Err(Error::AzureGenericError { source: error }.into()); + let error = Err(Error::AzureGeneric { source: error }.into()); futures::stream::iter(vec![error]).boxed() } }) @@ -251,29 +244,24 @@ impl AzureBlobSource { .trim_end_matches(&delimiter) // "/upper/blah" .trim_end_matches(|c: char| c.to_string() != delimiter); // "/upper/" - let upper_results = container_client - .list_blobs() - .delimiter(delimiter.to_string()) - .prefix(upper_dir.to_string()) - .into_stream() - .try_collect::>() - .await - .with_context(|_| RequestFailedForPathSnafu { - path: format!("{}://{}{}", protocol, container_name, upper_dir), - })? - .iter() - .flat_map(|resp| &resp.blobs.items) - .map(|blob_item| { - self._blob_item_to_file_metadata( - &protocol, - &container_name, - blob_item, - ) - .filepath - }) - .collect::>(); - - upper_results.contains(&full_path_with_trailing_delimiter) + let upper_results_stream = self._list_directory_delimiter_stream( + &container_client, + &protocol, + &container_name, + upper_dir, + &delimiter, + ).await; + + // At this point, we have a stream of Result. + // We would like to stop as soon as there is a file match, + // or if there is an error. + upper_results_stream + .map_ok(|file_info| (file_info.filepath == full_path_with_trailing_delimiter)) + .try_skip_while(|is_match| futures::future::ready(Ok(!is_match))) + .boxed() + .try_next() + .await? + .is_some() } }; // If the prefix does not exist, the stream needs to yield a single NotFound error. @@ -328,21 +316,18 @@ impl AzureBlobSource { // Map each page of results to a page of standardized FileMetadata. responses_stream - .flat_map(move |response| match response { + .map(move |response| (response, protocol.clone(), container_name.clone())) + .flat_map(move |(response, protocol, container_name)| match response { Ok(response) => { - let paths_data = response - .blobs - .items - .iter() - .map(|blob_item| { + let paths_data = + response.blobs.items.into_iter().map(move |blob_item| { Ok(self._blob_item_to_file_metadata( &protocol, &container_name, - blob_item, + &blob_item, )) - }) - .collect::>(); - futures::stream::iter(paths_data) + }); + futures::stream::iter(paths_data).boxed() } Err(error) => { let error = Err(Error::RequestFailedForPath { @@ -350,7 +335,7 @@ impl AzureBlobSource { source: error, } .into()); - futures::stream::iter(vec![error]) + futures::stream::iter(vec![error]).boxed() } }) .boxed() @@ -497,11 +482,12 @@ impl ObjectSource for AzureBlobSource { // https://docs.rs/azure_storage_blobs/0.15.0/azure_storage_blobs/container/operations/list_blobs/struct.ListBlobsBuilder.html // https://docs.rs/azure_storage_blobs/0.15.0/azure_storage_blobs/service/operations/struct.ListContainersBuilder.html // https://docs.rs/azure_core/0.15.0/azure_core/struct.Pageable.html - assert!( - continuation_token.is_none(), - "Azure continuation_token {:?} received, which we cannot use", - continuation_token - ); + match continuation_token { + None => Ok(()), + Some(token) => Err(Error::ContinuationToken { + token: token.to_string(), + }), + }?; let files = self .iter_dir(path, delimiter, None) From 2e91525d3cb920ba985f29ddcb99d44892175e68 Mon Sep 17 00:00:00 2001 From: Xiayue Charles Lin Date: Fri, 29 Sep 2023 11:47:35 -0700 Subject: [PATCH 12/12] Remove unnecessary box --- src/daft-io/src/azure_blob.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/daft-io/src/azure_blob.rs b/src/daft-io/src/azure_blob.rs index 7661a87138..bf868dffae 100644 --- a/src/daft-io/src/azure_blob.rs +++ b/src/daft-io/src/azure_blob.rs @@ -258,7 +258,6 @@ impl AzureBlobSource { upper_results_stream .map_ok(|file_info| (file_info.filepath == full_path_with_trailing_delimiter)) .try_skip_while(|is_match| futures::future::ready(Ok(!is_match))) - .boxed() .try_next() .await? .is_some()