From 814f8d3c9f89e381d1b3e744a83f727b589467a8 Mon Sep 17 00:00:00 2001 From: Sammy Sidhu Date: Thu, 15 Feb 2024 17:54:29 -0800 Subject: [PATCH 01/10] perform head operation instead of list when given a file --- src/daft-io/src/object_store_glob.rs | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/src/daft-io/src/object_store_glob.rs b/src/daft-io/src/object_store_glob.rs index ba61ca3283..5764746fec 100644 --- a/src/daft-io/src/object_store_glob.rs +++ b/src/daft-io/src/object_store_glob.rs @@ -346,7 +346,18 @@ pub(crate) async fn glob( if !full_fragment.has_special_character() { let mut remaining_results = limit; let glob = full_fragment.escaped_str().to_string(); + return Ok(stream! { + if !glob.ends_with(GLOB_DELIMITER) { + // If doesn't have a glob character and doesn't end with a delimiter, assume its a file first. + let maybe_size = source.get_size(&glob, io_stats.clone()).await; + match maybe_size { + Ok(size_bytes) => yield Ok(FileMetadata{filepath: glob.clone(), size: Some(size_bytes as u64), filetype: FileType::File }), + Err(crate::Error::NotFound {..}) => {}, + Err(err) => yield Err(err), + } + } + let mut results = source.iter_dir(glob.as_str(), true, page_size, io_stats).await?; while let Some(result) = results.next().await && remaining_results.map(|rr| rr > 0).unwrap_or(true) { match result { From 6f79229a6aac616d3c26d939cef296300a15344a Mon Sep 17 00:00:00 2001 From: Sammy Sidhu Date: Thu, 15 Feb 2024 18:22:04 -0800 Subject: [PATCH 02/10] perform handling for local case --- src/daft-io/src/lib.rs | 2 +- src/daft-io/src/local.rs | 33 +++++++++++++++++++++++++++++---- 2 files changed, 30 insertions(+), 5 deletions(-) diff --git a/src/daft-io/src/lib.rs b/src/daft-io/src/lib.rs index 70e3f7b1fd..42bf0c46a2 100644 --- a/src/daft-io/src/lib.rs +++ b/src/daft-io/src/lib.rs @@ -1,6 +1,6 @@ #![feature(async_closure)] #![feature(let_chains)] - +#![feature(io_error_more)] mod azure_blob; mod google_cloud; mod http; diff --git a/src/daft-io/src/local.rs b/src/daft-io/src/local.rs index 1f7afdc26d..1487c4b71c 100644 --- a/src/daft-io/src/local.rs +++ b/src/daft-io/src/local.rs @@ -65,15 +65,16 @@ enum Error { #[snafu(display("Unable to convert URL \"{}\" to local file path", path))] InvalidFilePath { path: String }, + + #[snafu(display("Attempted to Read Directory as File {}", path))] + IsADirectory { path: String }, } impl From for super::Error { fn from(error: Error) -> Self { use Error::*; match error { - UnableToOpenFile { path, source } - | UnableToFetchFileMetadata { path, source } - | UnableToFetchDirectoryEntries { path, source } => { + UnableToOpenFile { path, source } | UnableToFetchDirectoryEntries { path, source } => { use std::io::ErrorKind::*; match source.kind() { NotFound => super::Error::NotFound { @@ -86,6 +87,20 @@ impl From for super::Error { }, } } + UnableToFetchFileMetadata { path, source } => { + use std::io::ErrorKind::*; + println!("{source:?}"); + match source.kind() { + NotFound | IsADirectory => super::Error::NotFound { + path, + source: source.into(), + }, + _ => super::Error::UnableToOpenFile { + path, + source: source.into(), + }, + } + } UnableToReadBytes { path, source } => super::Error::UnableToReadBytes { path, source }, InvalidUrl { url, source } => super::Error::InvalidUrl { path: url.to_string_lossy().into_owned(), @@ -139,7 +154,17 @@ impl ObjectSource for LocalSource { .context(UnableToFetchFileMetadataSnafu { path: uri.to_string(), })?; - Ok(meta.len() as usize) + + if meta.is_dir() { + Err(super::Error::NotFound { + path: uri.to_owned(), + source: Box::new(Error::IsADirectory { + path: uri.to_owned(), + }), + }) + } else { + Ok(meta.len() as usize) + } } async fn glob( From e505e47db998cc7fdae61e2d79889132f51100a0 Mon Sep 17 00:00:00 2001 From: Sammy Sidhu Date: Thu, 15 Feb 2024 18:40:57 -0800 Subject: [PATCH 03/10] terminate if head is successful --- src/daft-io/src/local.rs | 1 - src/daft-io/src/object_store_glob.rs | 27 +++++++++++++++------------ 2 files changed, 15 insertions(+), 13 deletions(-) diff --git a/src/daft-io/src/local.rs b/src/daft-io/src/local.rs index 1487c4b71c..3e83f6ab40 100644 --- a/src/daft-io/src/local.rs +++ b/src/daft-io/src/local.rs @@ -89,7 +89,6 @@ impl From for super::Error { } UnableToFetchFileMetadata { path, source } => { use std::io::ErrorKind::*; - println!("{source:?}"); match source.kind() { NotFound | IsADirectory => super::Error::NotFound { path, diff --git a/src/daft-io/src/object_store_glob.rs b/src/daft-io/src/object_store_glob.rs index 5764746fec..220dd5e992 100644 --- a/src/daft-io/src/object_store_glob.rs +++ b/src/daft-io/src/object_store_glob.rs @@ -348,26 +348,29 @@ pub(crate) async fn glob( let glob = full_fragment.escaped_str().to_string(); return Ok(stream! { + let mut attempt_as_dir = true; if !glob.ends_with(GLOB_DELIMITER) { + attempt_as_dir = false; // If doesn't have a glob character and doesn't end with a delimiter, assume its a file first. let maybe_size = source.get_size(&glob, io_stats.clone()).await; match maybe_size { Ok(size_bytes) => yield Ok(FileMetadata{filepath: glob.clone(), size: Some(size_bytes as u64), filetype: FileType::File }), - Err(crate::Error::NotFound {..}) => {}, + Err(crate::Error::NotFound {..}) => {attempt_as_dir = true;}, Err(err) => yield Err(err), } } - - let mut results = source.iter_dir(glob.as_str(), true, page_size, io_stats).await?; - while let Some(result) = results.next().await && remaining_results.map(|rr| rr > 0).unwrap_or(true) { - match result { - Ok(fm) => { - if _should_return(&fm) { - remaining_results = remaining_results.map(|rr| rr - 1); - yield Ok(fm) - } - }, - Err(e) => yield Err(e), + if attempt_as_dir { + let mut results = source.iter_dir(glob.as_str(), true, page_size, io_stats).await?; + while let Some(result) = results.next().await && remaining_results.map(|rr| rr > 0).unwrap_or(true) { + match result { + Ok(fm) => { + if _should_return(&fm) { + remaining_results = remaining_results.map(|rr| rr - 1); + yield Ok(fm) + } + }, + Err(e) => yield Err(e), + } } } } From 209047e086d021c992c61912108d9dfd6543bf32 Mon Sep 17 00:00:00 2001 From: Sammy Sidhu Date: Sun, 18 Feb 2024 15:10:37 -0800 Subject: [PATCH 04/10] Pass through on NotAFile --- src/daft-io/src/azure_blob.rs | 11 +++++++++++ src/daft-io/src/google_cloud.rs | 11 ++++++++++- src/daft-io/src/local.rs | 5 +---- src/daft-io/src/object_store_glob.rs | 2 +- 4 files changed, 23 insertions(+), 6 deletions(-) diff --git a/src/daft-io/src/azure_blob.rs b/src/daft-io/src/azure_blob.rs index 24f1547c57..fe9726e1a5 100644 --- a/src/daft-io/src/azure_blob.rs +++ b/src/daft-io/src/azure_blob.rs @@ -71,6 +71,8 @@ enum Error { path: String, source: azure_storage::Error, }, + #[snafu(display("Not a File: \"{}\"", path))] + NotAFile { path: String }, } impl From for super::Error { @@ -98,6 +100,7 @@ impl From for super::Error { path: path.into(), source: error.into(), }, + NotAFile { path } => super::Error::NotAFile { path }, _ => super::Error::Generic { store: super::SourceType::AzureBlob, source: error.into(), @@ -414,6 +417,10 @@ impl ObjectSource for AzureBlobSource { }?; let key = parsed.path(); + if key.is_empty() { + return Err(Error::NotAFile { path: uri.into() }.into()); + } + let container_client = self.blob_client.container_client(container); let blob_client = container_client.blob_client(key); let request_builder = blob_client.get(); @@ -455,6 +462,10 @@ impl ObjectSource for AzureBlobSource { }?; let key = parsed.path(); + if key.is_empty() { + return Err(Error::NotAFile { path: uri.into() }.into()); + } + let container_client = self.blob_client.container_client(container); let blob_client = container_client.blob_client(key); let metadata = blob_client diff --git a/src/daft-io/src/google_cloud.rs b/src/daft-io/src/google_cloud.rs index 34cdde825f..f2ad4e67e4 100644 --- a/src/daft-io/src/google_cloud.rs +++ b/src/daft-io/src/google_cloud.rs @@ -48,7 +48,8 @@ enum Error { UnableToLoadCredentials { source: google_cloud_storage::client::google_cloud_auth::error::Error, }, - + #[snafu(display("Not a File: \"{}\"", path))] + NotAFile { path: String }, #[snafu(display("Not a File: \"{}\"", path))] NotFound { path: String }, } @@ -104,6 +105,7 @@ impl From for super::Error { store: super::SourceType::GCS, source: source.into(), }, + NotAFile { path } => super::Error::NotAFile { path }, } } } @@ -132,6 +134,10 @@ impl GCSClientWrapper { ) -> super::Result { let uri = url::Url::parse(uri).with_context(|_| InvalidUrlSnafu { path: uri })?; let (bucket, key) = parse_uri(&uri)?; + if key.is_empty() { + return Err(Error::NotAFile { path: uri.into() }.into()); + } + let client = &self.0; let req = GetObjectRequest { bucket: bucket.into(), @@ -174,6 +180,9 @@ impl GCSClientWrapper { async fn get_size(&self, uri: &str, io_stats: Option) -> super::Result { let uri = url::Url::parse(uri).with_context(|_| InvalidUrlSnafu { path: uri })?; let (bucket, key) = parse_uri(&uri)?; + if key.is_empty() { + return Err(Error::NotAFile { path: uri.into() }.into()); + } let client = &self.0; let req = GetObjectRequest { bucket: bucket.into(), diff --git a/src/daft-io/src/local.rs b/src/daft-io/src/local.rs index 3e83f6ab40..5538b1cab8 100644 --- a/src/daft-io/src/local.rs +++ b/src/daft-io/src/local.rs @@ -155,11 +155,8 @@ impl ObjectSource for LocalSource { })?; if meta.is_dir() { - Err(super::Error::NotFound { + Err(super::Error::NotAFile { path: uri.to_owned(), - source: Box::new(Error::IsADirectory { - path: uri.to_owned(), - }), }) } else { Ok(meta.len() as usize) diff --git a/src/daft-io/src/object_store_glob.rs b/src/daft-io/src/object_store_glob.rs index 220dd5e992..665f5129bb 100644 --- a/src/daft-io/src/object_store_glob.rs +++ b/src/daft-io/src/object_store_glob.rs @@ -355,7 +355,7 @@ pub(crate) async fn glob( let maybe_size = source.get_size(&glob, io_stats.clone()).await; match maybe_size { Ok(size_bytes) => yield Ok(FileMetadata{filepath: glob.clone(), size: Some(size_bytes as u64), filetype: FileType::File }), - Err(crate::Error::NotFound {..}) => {attempt_as_dir = true;}, + Err(crate::Error::NotAFile {..} | crate::Error::NotFound { .. }) => {attempt_as_dir = true;}, Err(err) => yield Err(err), } } From 39d6ace59eae84282e90dfd2f89a4c31cba0a3c6 Mon Sep 17 00:00:00 2001 From: Sammy Sidhu Date: Sun, 18 Feb 2024 22:31:44 -0800 Subject: [PATCH 05/10] par glob --- src/daft-scan/src/glob.rs | 74 +++++++++++++++++++++++++++++---------- src/daft-scan/src/lib.rs | 1 + 2 files changed, 56 insertions(+), 19 deletions(-) diff --git a/src/daft-scan/src/glob.rs b/src/daft-scan/src/glob.rs index 4600310cab..d4ef9312ff 100644 --- a/src/daft-scan/src/glob.rs +++ b/src/daft-scan/src/glob.rs @@ -1,11 +1,11 @@ -use std::sync::Arc; +use std::{sync::Arc, vec}; use common_error::{DaftError, DaftResult}; use daft_core::schema::SchemaRef; use daft_csv::CsvParseOptions; use daft_io::{parse_url, FileMetadata, IOClient, IOStatsContext, IOStatsRef}; use daft_parquet::read::ParquetSchemaInferenceOptions; -use futures::{stream::BoxStream, StreamExt}; +use futures::{stream::BoxStream, SinkExt, StreamExt, TryStreamExt}; use snafu::Snafu; use crate::{ @@ -81,6 +81,57 @@ fn run_glob( Ok(Box::new(iterator)) } + +fn run_glob_parallel( + glob_paths: Vec, + io_client: Arc, + runtime: Arc, + io_stats: Option, +// ) -> impl Iterator> { + +) -> DaftResult { + + let num_parallel_tasks = 64; + + let owned_runtime = runtime.clone(); + let boxstream = futures::stream::iter(glob_paths.into_iter().map(move |path| { + let (_, parsed_glob_path) = parse_url(&path).unwrap(); + let glob_input = parsed_glob_path.as_ref().to_string(); + let io_client = io_client.clone(); + let io_stats = io_stats.clone(); + + runtime.spawn(async move { + let result = io_client + .glob(glob_input, None, None, None, io_stats).await.unwrap(); + futures::stream::iter(result.collect::>().await) + }) + })) + .buffered(num_parallel_tasks).try_flatten(); + + // owned_runtime.block_on(async move { + // let vecs = boxstream.try_collect::>().await?; + + // Ok(vecs.with_flat_map(|v| )) + // }) + + // let boxstream = if let Some(limit) = limit { + // boxstream.take(limit).boxed() + // } else { + // boxstream.boxed() + // }; + let boxstream = Box::pin(boxstream); + + // Construct a static-lifetime BoxStreamIterator + let iterator = BoxStreamIterator { + boxstream, + runtime_handle: owned_runtime.handle().clone(), + }; + let iterator = iterator.map(|fm| Ok(fm?)); + Ok(Box::new(iterator)) +} + + + impl GlobScanOperator { pub fn try_new( glob_paths: &[&str], @@ -220,23 +271,8 @@ impl ScanOperator for GlobScanOperator { self.glob_paths )); - // Run [`run_glob`] on each path and mux them into the same iterator - let files = self - .glob_paths - .clone() - .into_iter() - .flat_map(move |glob_path| { - match run_glob( - glob_path.as_str(), - None, - io_client.clone(), - io_runtime.clone(), - Some(io_stats.clone()), - ) { - Ok(paths) => paths, - Err(err) => Box::new(vec![Err(err)].into_iter()), - } - }); + + let files = run_glob_parallel(self.glob_paths.clone(), io_client.clone(), io_runtime.clone(), Some(io_stats.clone()))?; let file_format_config = self.file_format_config.clone(); let schema = self.schema.clone(); diff --git a/src/daft-scan/src/lib.rs b/src/daft-scan/src/lib.rs index fab6b271f7..afbbc11de4 100644 --- a/src/daft-scan/src/lib.rs +++ b/src/daft-scan/src/lib.rs @@ -1,5 +1,6 @@ #![feature(if_let_guard)] #![feature(let_chains)] +#![feature(async_closure)] use std::{ fmt::{Debug, Display}, hash::{Hash, Hasher}, From 89ac4a6b52df1897bc1e45a72bb732833fcabce9 Mon Sep 17 00:00:00 2001 From: Sammy Sidhu Date: Sun, 18 Feb 2024 22:54:29 -0800 Subject: [PATCH 06/10] try peek --- src/daft-scan/src/glob.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/daft-scan/src/glob.rs b/src/daft-scan/src/glob.rs index d4ef9312ff..902fd8ce37 100644 --- a/src/daft-scan/src/glob.rs +++ b/src/daft-scan/src/glob.rs @@ -102,8 +102,12 @@ fn run_glob_parallel( runtime.spawn(async move { let result = io_client - .glob(glob_input, None, None, None, io_stats).await.unwrap(); - futures::stream::iter(result.collect::>().await) + .glob(glob_input, None, None, None, io_stats).await.unwrap().peekable(); + let mut result = Box::pin(result); + + let _val = result.as_mut().peek().await; + result + // futures::stream::iter(result.collect::>().await) }) })) .buffered(num_parallel_tasks).try_flatten(); From bc10bf594a485acddc862757d77d217a927328b3 Mon Sep 17 00:00:00 2001 From: Sammy Sidhu Date: Sun, 18 Feb 2024 23:18:57 -0800 Subject: [PATCH 07/10] clean up --- src/daft-scan/src/glob.rs | 31 +++++++++---------------------- 1 file changed, 9 insertions(+), 22 deletions(-) diff --git a/src/daft-scan/src/glob.rs b/src/daft-scan/src/glob.rs index 902fd8ce37..0fa68bca8b 100644 --- a/src/daft-scan/src/glob.rs +++ b/src/daft-scan/src/glob.rs @@ -5,8 +5,8 @@ use daft_core::schema::SchemaRef; use daft_csv::CsvParseOptions; use daft_io::{parse_url, FileMetadata, IOClient, IOStatsContext, IOStatsRef}; use daft_parquet::read::ParquetSchemaInferenceOptions; -use futures::{stream::BoxStream, SinkExt, StreamExt, TryStreamExt}; -use snafu::Snafu; +use futures::{stream::BoxStream, SinkExt, StreamExt, TryFutureExt, TryStreamExt}; +use snafu::{ResultExt, Snafu}; use crate::{ file_format::{CsvSourceConfig, FileFormatConfig, ParquetSourceConfig}, @@ -101,29 +101,16 @@ fn run_glob_parallel( let io_stats = io_stats.clone(); runtime.spawn(async move { - let result = io_client - .glob(glob_input, None, None, None, io_stats).await.unwrap().peekable(); - let mut result = Box::pin(result); + let stream = io_client + .glob(glob_input, None, None, None, io_stats).await?; + let results = stream.collect::>().await; + Result::<_, daft_io::Error>::Ok(futures::stream::iter(results)) - let _val = result.as_mut().peek().await; - result - // futures::stream::iter(result.collect::>().await) }) })) - .buffered(num_parallel_tasks).try_flatten(); - - // owned_runtime.block_on(async move { - // let vecs = boxstream.try_collect::>().await?; - - // Ok(vecs.with_flat_map(|v| )) - // }) - - // let boxstream = if let Some(limit) = limit { - // boxstream.take(limit).boxed() - // } else { - // boxstream.boxed() - // }; - let boxstream = Box::pin(boxstream); + .buffered(num_parallel_tasks).map(|v| { + v.map_err(|e| daft_io::Error::JoinError { source: e })? + }).try_flatten().boxed(); // Construct a static-lifetime BoxStreamIterator let iterator = BoxStreamIterator { From 2fadb9a3dd3aca82f11959651966b4ec179da347 Mon Sep 17 00:00:00 2001 From: Sammy Sidhu Date: Sun, 18 Feb 2024 23:28:36 -0800 Subject: [PATCH 08/10] clean up --- src/daft-scan/src/glob.rs | 52 +++++++++++++++++++-------------------- 1 file changed, 26 insertions(+), 26 deletions(-) diff --git a/src/daft-scan/src/glob.rs b/src/daft-scan/src/glob.rs index 0fa68bca8b..54551eb2e2 100644 --- a/src/daft-scan/src/glob.rs +++ b/src/daft-scan/src/glob.rs @@ -5,8 +5,8 @@ use daft_core::schema::SchemaRef; use daft_csv::CsvParseOptions; use daft_io::{parse_url, FileMetadata, IOClient, IOStatsContext, IOStatsRef}; use daft_parquet::read::ParquetSchemaInferenceOptions; -use futures::{stream::BoxStream, SinkExt, StreamExt, TryFutureExt, TryStreamExt}; -use snafu::{ResultExt, Snafu}; +use futures::{stream::BoxStream, StreamExt, TryStreamExt}; +use snafu::Snafu; use crate::{ file_format::{CsvSourceConfig, FileFormatConfig, ParquetSourceConfig}, @@ -81,36 +81,34 @@ fn run_glob( Ok(Box::new(iterator)) } - fn run_glob_parallel( glob_paths: Vec, io_client: Arc, runtime: Arc, io_stats: Option, -// ) -> impl Iterator> { - + // ) -> impl Iterator> { ) -> DaftResult { - let num_parallel_tasks = 64; let owned_runtime = runtime.clone(); let boxstream = futures::stream::iter(glob_paths.into_iter().map(move |path| { - let (_, parsed_glob_path) = parse_url(&path).unwrap(); - let glob_input = parsed_glob_path.as_ref().to_string(); - let io_client = io_client.clone(); - let io_stats = io_stats.clone(); - - runtime.spawn(async move { - let stream = io_client - .glob(glob_input, None, None, None, io_stats).await?; - let results = stream.collect::>().await; - Result::<_, daft_io::Error>::Ok(futures::stream::iter(results)) - - }) - })) - .buffered(num_parallel_tasks).map(|v| { - v.map_err(|e| daft_io::Error::JoinError { source: e })? - }).try_flatten().boxed(); + let (_, parsed_glob_path) = parse_url(&path).unwrap(); + let glob_input = parsed_glob_path.as_ref().to_string(); + let io_client = io_client.clone(); + let io_stats = io_stats.clone(); + + runtime.spawn(async move { + let stream = io_client + .glob(glob_input, None, None, None, io_stats) + .await?; + let results = stream.collect::>().await; + Result::<_, daft_io::Error>::Ok(futures::stream::iter(results)) + }) + })) + .buffered(num_parallel_tasks) + .map(|v| v.map_err(|e| daft_io::Error::JoinError { source: e })?) + .try_flatten() + .boxed(); // Construct a static-lifetime BoxStreamIterator let iterator = BoxStreamIterator { @@ -121,8 +119,6 @@ fn run_glob_parallel( Ok(Box::new(iterator)) } - - impl GlobScanOperator { pub fn try_new( glob_paths: &[&str], @@ -262,8 +258,12 @@ impl ScanOperator for GlobScanOperator { self.glob_paths )); - - let files = run_glob_parallel(self.glob_paths.clone(), io_client.clone(), io_runtime.clone(), Some(io_stats.clone()))?; + let files = run_glob_parallel( + self.glob_paths.clone(), + io_client.clone(), + io_runtime.clone(), + Some(io_stats.clone()), + )?; let file_format_config = self.file_format_config.clone(); let schema = self.schema.clone(); From 552a97302b2159d0164ae4c3ac041598dd7a7ef5 Mon Sep 17 00:00:00 2001 From: Sammy Sidhu Date: Mon, 19 Feb 2024 01:41:34 -0800 Subject: [PATCH 09/10] clean up code --- src/daft-scan/src/glob.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/daft-scan/src/glob.rs b/src/daft-scan/src/glob.rs index 54551eb2e2..2ce839eb33 100644 --- a/src/daft-scan/src/glob.rs +++ b/src/daft-scan/src/glob.rs @@ -86,8 +86,7 @@ fn run_glob_parallel( io_client: Arc, runtime: Arc, io_stats: Option, - // ) -> impl Iterator> { -) -> DaftResult { +) -> DaftResult>> { let num_parallel_tasks = 64; let owned_runtime = runtime.clone(); @@ -108,6 +107,7 @@ fn run_glob_parallel( .buffered(num_parallel_tasks) .map(|v| v.map_err(|e| daft_io::Error::JoinError { source: e })?) .try_flatten() + .map(|v| Ok(v?)) .boxed(); // Construct a static-lifetime BoxStreamIterator @@ -115,8 +115,7 @@ fn run_glob_parallel( boxstream, runtime_handle: owned_runtime.handle().clone(), }; - let iterator = iterator.map(|fm| Ok(fm?)); - Ok(Box::new(iterator)) + Ok(iterator) } impl GlobScanOperator { From 73bc43d92d314c00e9ddc75017edb0563fd5fd75 Mon Sep 17 00:00:00 2001 From: Sammy Sidhu Date: Mon, 19 Feb 2024 18:38:08 -0800 Subject: [PATCH 10/10] drop async closure feature --- src/daft-scan/src/lib.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/daft-scan/src/lib.rs b/src/daft-scan/src/lib.rs index afbbc11de4..fab6b271f7 100644 --- a/src/daft-scan/src/lib.rs +++ b/src/daft-scan/src/lib.rs @@ -1,6 +1,5 @@ #![feature(if_let_guard)] #![feature(let_chains)] -#![feature(async_closure)] use std::{ fmt::{Debug, Display}, hash::{Hash, Hasher},