Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PERF] Native globbing early stopping #1452

Merged
merged 16 commits into from
Oct 3, 2023
Merged
15 changes: 11 additions & 4 deletions src/daft-io/src/azure_blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -430,11 +430,16 @@ impl ObjectSource for AzureBlobSource {
async fn iter_dir(
&self,
uri: &str,
delimiter: Option<&str>,
delimiter: &str,
posix: bool,
_page_size: Option<i32>,
_limit: Option<usize>,
) -> super::Result<BoxStream<super::Result<FileMetadata>>> {
let uri = url::Url::parse(uri).with_context(|_| InvalidUrlSnafu { path: uri })?;
let delimiter = delimiter.unwrap_or("/");

if !posix {
todo!("Prefix-listing is not yet implemented for Azure");
}

// path can be root (buckets) or path prefix within a bucket.
let container = {
Expand Down Expand Up @@ -473,8 +478,10 @@ impl ObjectSource for AzureBlobSource {
async fn ls(
&self,
path: &str,
delimiter: Option<&str>,
delimiter: &str,
posix: bool,
continuation_token: Option<&str>,
page_size: Option<i32>,
) -> super::Result<LSResult> {
// It looks like the azure rust library API
// does not currently allow using the continuation token:
Expand All @@ -489,7 +496,7 @@ impl ObjectSource for AzureBlobSource {
}?;

let files = self
.iter_dir(path, delimiter, None)
.iter_dir(path, delimiter, posix, page_size, None)
.await?
.try_collect::<Vec<_>>()
.await?;
Expand Down
10 changes: 8 additions & 2 deletions src/daft-io/src/glob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,29 @@ pub(crate) struct GlobState {
pub current_path: String,
pub current_fragment_idx: usize,

// How large of a fanout this level of iteration is currently experiencing
pub current_fanout: usize,

// Whether we have encountered wildcards yet in the process of parsing
pub wildcard_mode: bool,

// Carry along expensive data as Arcs to avoid recomputation
pub glob_fragments: Arc<Vec<GlobFragment>>,
pub full_glob_matcher: Arc<GlobMatcher>,
pub fanout_limit: usize,
pub page_size: Option<i32>,
}

impl GlobState {
pub fn current_glob_fragment(&self) -> &GlobFragment {
&self.glob_fragments[self.current_fragment_idx]
}

pub fn advance(self, path: String, idx: usize) -> Self {
pub fn advance(self, path: String, idx: usize, fanout_factor: usize) -> Self {
GlobState {
current_path: path,
current_fragment_idx: idx,
current_fanout: self.current_fanout * fanout_factor,
..self.clone()
}
}
Expand Down Expand Up @@ -87,7 +93,7 @@ impl GlobFragment {
let mut ptr = 0;
while ptr < data.len() {
let remaining = &data[ptr..];
match remaining.find("\\\\") {
match remaining.find(r"\\") {
Some(backslash_idx) => {
escaped_data.push_str(&remaining[..backslash_idx].replace('\\', ""));
escaped_data.extend(std::iter::once('\\'));
Expand Down
34 changes: 27 additions & 7 deletions src/daft-io/src/google_cloud.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ impl GCSClientWrapper {
key: &str,
delimiter: &str,
continuation_token: Option<&str>,
page_size: Option<i32>,
) -> super::Result<LSResult> {
let req = ListObjectsRequest {
bucket: bucket.to_string(),
Expand All @@ -205,7 +206,7 @@ impl GCSClientWrapper {
start_offset: None,
page_token: continuation_token.map(|s| s.to_string()),
delimiter: Some(delimiter.to_string()), // returns results in "directory mode"
max_results: Some(1000), // Recommended value from API docs
max_results: page_size,
include_trailing_delimiter: Some(false), // This will not populate "directories" in the response's .item[]
projection: None,
versions: None,
Expand Down Expand Up @@ -237,14 +238,19 @@ impl GCSClientWrapper {
async fn ls(
&self,
path: &str,
delimiter: Option<&str>,
delimiter: &str,
posix: bool,
continuation_token: Option<&str>,
page_size: Option<i32>,
) -> super::Result<LSResult> {
let uri = url::Url::parse(path).with_context(|_| InvalidUrlSnafu { path })?;
let (bucket, key) = parse_uri(&uri)?;
let delimiter = delimiter.unwrap_or("/");
match self {
GCSClientWrapper::Native(client) => {
if !posix {
todo!("Prefix-listing is not yet implemented for GCS");
}

// Attempt to forcefully ls the key as a directory (by ensuring a "/" suffix)
let forced_directory_key =
format!("{}{delimiter}", key.trim_end_matches(delimiter));
Expand All @@ -255,14 +261,22 @@ impl GCSClientWrapper {
forced_directory_key.as_str(),
delimiter,
continuation_token,
page_size,
)
.await?;

// If no items were obtained, then this is actually a file and we perform a second ls to obtain just the file's
// details as the one-and-only-one entry
if forced_directory_ls_result.files.is_empty() {
let file_result = self
._ls_impl(client, bucket, key, delimiter, continuation_token)
._ls_impl(
client,
bucket,
key,
delimiter,
continuation_token,
page_size,
)
.await?;

// Not dir and not file, so it is missing
Expand All @@ -279,7 +293,9 @@ impl GCSClientWrapper {
}
}
GCSClientWrapper::S3Compat(client) => {
client.ls(path, Some(delimiter), continuation_token).await
client
.ls(path, delimiter, posix, continuation_token, page_size)
.await
}
}
}
Expand Down Expand Up @@ -340,9 +356,13 @@ impl ObjectSource for GCSSource {
async fn ls(
&self,
path: &str,
delimiter: Option<&str>,
delimiter: &str,
posix: bool,
continuation_token: Option<&str>,
page_size: Option<i32>,
) -> super::Result<LSResult> {
self.client.ls(path, delimiter, continuation_token).await
self.client
.ls(path, delimiter, posix, continuation_token, page_size)
.await
}
}
8 changes: 7 additions & 1 deletion src/daft-io/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,9 +225,15 @@ impl ObjectSource for HttpSource {
async fn ls(
&self,
path: &str,
_delimiter: Option<&str>,
_delimiter: &str,
posix: bool,
_continuation_token: Option<&str>,
_page_size: Option<i32>,
) -> super::Result<LSResult> {
if !posix {
todo!("Prefix-listing is not implemented for HTTP listing");
}

let request = self.client.get(path);
let response = request
.send()
Expand Down
18 changes: 14 additions & 4 deletions src/daft-io/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,14 @@ impl ObjectSource for LocalSource {
async fn ls(
&self,
path: &str,
_delimiter: Option<&str>,
delimiter: &str,
posix: bool,
_continuation_token: Option<&str>,
page_size: Option<i32>,
) -> super::Result<LSResult> {
let s = self.iter_dir(path, None, None).await?;
let s = self
.iter_dir(path, delimiter, posix, page_size, None)
.await?;
let files = s.try_collect::<Vec<_>>().await?;
Ok(LSResult {
files,
Expand All @@ -146,9 +150,15 @@ impl ObjectSource for LocalSource {
async fn iter_dir(
&self,
uri: &str,
_delimiter: Option<&str>,
_delimiter: &str,
posix: bool,
_page_size: Option<i32>,
_limit: Option<usize>,
) -> super::Result<BoxStream<super::Result<FileMetadata>>> {
if !posix {
todo!("Prefix-listing is not implemented for local");
}

const LOCAL_PROTOCOL: &str = "file://";
let Some(uri) = uri.strip_prefix(LOCAL_PROTOCOL) else {
return Err(Error::InvalidFilePath { path: uri.into() }.into());
Expand Down Expand Up @@ -324,7 +334,7 @@ mod tests {
let dir_path = format!("file://{}", dir.path().to_string_lossy());
let client = LocalSource::get_client().await?;

let ls_result = client.ls(dir_path.as_ref(), None, None).await?;
let ls_result = client.ls(dir_path.as_ref(), "/", true, None, None).await?;
let mut files = ls_result.files.clone();
// Ensure stable sort ordering of file paths before comparing with expected payload.
files.sort_by(|a, b| a.filepath.cmp(&b.filepath));
Expand Down
Loading