diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index d2a92fea311e..946c7600768e 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -1248,6 +1248,7 @@ dependencies = [ "datafusion-execution", "datafusion-expr", "datafusion-physical-plan", + "futures", "parking_lot", ] diff --git a/datafusion-cli/src/catalog.rs b/datafusion-cli/src/catalog.rs index ceb72dbc546b..81c92e42a0ed 100644 --- a/datafusion-cli/src/catalog.rs +++ b/datafusion-cli/src/catalog.rs @@ -31,6 +31,7 @@ use datafusion::execution::session_state::SessionStateBuilder; use async_trait::async_trait; use dirs::home_dir; +use futures::stream::BoxStream; use parking_lot::RwLock; /// Wraps another catalog, automatically register require object stores for the file locations @@ -49,28 +50,29 @@ impl DynamicObjectStoreCatalog { } } +#[async_trait] impl CatalogProviderList for DynamicObjectStoreCatalog { fn as_any(&self) -> &dyn Any { self } - fn register_catalog( + async fn register_catalog( &self, name: String, catalog: Arc, - ) -> Option> { - self.inner.register_catalog(name, catalog) + ) -> Result>> { + self.inner.register_catalog(name, catalog).await } - fn catalog_names(&self) -> Vec { - self.inner.catalog_names() + async fn catalog_names(&self) -> BoxStream<'static, Result> { + self.inner.catalog_names().await } - fn catalog(&self, name: &str) -> Option> { + async fn catalog(&self, name: &str) -> Result>> { let state = self.state.clone(); - self.inner.catalog(name).map(|catalog| { + Ok(self.inner.catalog(name).await?.map(|catalog| { Arc::new(DynamicObjectStoreCatalogProvider::new(catalog, state)) as _ - }) + })) } } @@ -90,28 +92,29 @@ impl DynamicObjectStoreCatalogProvider { } } +#[async_trait] impl CatalogProvider for DynamicObjectStoreCatalogProvider { fn as_any(&self) -> &dyn Any { self } - fn schema_names(&self) -> Vec { - self.inner.schema_names() + async fn schema_names(&self) -> BoxStream<'static, Result> { + self.inner.schema_names().await } - fn schema(&self, name: &str) -> Option> { + async fn schema(&self, name: &str) -> Result>> { let state = self.state.clone(); - self.inner.schema(name).map(|schema| { + Ok(self.inner.schema(name).await?.map(|schema| { Arc::new(DynamicObjectStoreSchemaProvider::new(schema, state)) as _ - }) + })) } - fn register_schema( + async fn register_schema( &self, name: &str, schema: Arc, ) -> Result>> { - self.inner.register_schema(name, schema) + self.inner.register_schema(name, schema).await } } @@ -138,16 +141,16 @@ impl SchemaProvider for DynamicObjectStoreSchemaProvider { self } - fn table_names(&self) -> Vec { - self.inner.table_names() + async fn table_names(&self) -> BoxStream<'static, Result> { + self.inner.table_names().await } - fn register_table( + async fn register_table( &self, name: String, table: Arc, ) -> Result>> { - self.inner.register_table(name, table) + self.inner.register_table(name, table).await } async fn table(&self, name: &str) -> Result>> { @@ -166,7 +169,7 @@ impl SchemaProvider for DynamicObjectStoreSchemaProvider { .ok_or_else(|| plan_datafusion_err!("locking error"))? .read() .clone(); - let mut builder = SessionStateBuilder::from(state.clone()); + let mut builder = SessionStateBuilder::new_from_existing(state.clone()).await; let optimized_name = substitute_tilde(name.to_owned()); let table_url = ListingTableUrl::parse(optimized_name.as_str())?; let scheme = table_url.scheme(); @@ -194,7 +197,7 @@ impl SchemaProvider for DynamicObjectStoreSchemaProvider { } _ => {} }; - state = builder.build(); + state = builder.build().await; let store = get_object_store( &state, table_url.scheme(), @@ -208,12 +211,15 @@ impl SchemaProvider for DynamicObjectStoreSchemaProvider { self.inner.table(name).await } - fn deregister_table(&self, name: &str) -> Result>> { - self.inner.deregister_table(name) + async fn deregister_table( + &self, + name: &str, + ) -> Result>> { + self.inner.deregister_table(name).await } - fn table_exist(&self, name: &str) -> bool { - self.inner.table_exist(name) + async fn table_exist(&self, name: &str) -> bool { + self.inner.table_exist(name).await } } @@ -234,8 +240,9 @@ mod tests { use datafusion::catalog::SchemaProvider; use datafusion::prelude::SessionContext; + use futures::TryStreamExt; - fn setup_context() -> (SessionContext, Arc) { + async fn setup_context() -> (SessionContext, Arc) { let ctx = SessionContext::new(); ctx.register_catalog_list(Arc::new(DynamicObjectStoreCatalog::new( ctx.state().catalog_list().clone(), @@ -247,10 +254,30 @@ mod tests { ctx.state_weak_ref(), ) as &dyn CatalogProviderList; let catalog = provider - .catalog(provider.catalog_names().first().unwrap()) + .catalog( + &provider + .catalog_names() + .await + .try_next() + .await + .unwrap() + .unwrap(), + ) + .await + .unwrap() .unwrap(); let schema = catalog - .schema(catalog.schema_names().first().unwrap()) + .schema( + &catalog + .schema_names() + .await + .try_next() + .await + .unwrap() + .unwrap(), + ) + .await + .unwrap() .unwrap(); (ctx, schema) } @@ -262,7 +289,7 @@ mod tests { let domain = "example.com"; let location = format!("http://{domain}/file.parquet"); - let (ctx, schema) = setup_context(); + let (ctx, schema) = setup_context().await; // That's a non registered table so expecting None here let table = schema.table(&location).await?; @@ -287,7 +314,7 @@ mod tests { let bucket = "examples3bucket"; let location = format!("s3://{bucket}/file.parquet"); - let (ctx, schema) = setup_context(); + let (ctx, schema) = setup_context().await; let table = schema.table(&location).await?; assert!(table.is_none()); @@ -309,7 +336,7 @@ mod tests { let bucket = "examplegsbucket"; let location = format!("gs://{bucket}/file.parquet"); - let (ctx, schema) = setup_context(); + let (ctx, schema) = setup_context().await; let table = schema.table(&location).await?; assert!(table.is_none()); @@ -329,7 +356,7 @@ mod tests { #[tokio::test] async fn query_invalid_location_test() { let location = "ts://file.parquet"; - let (_ctx, schema) = setup_context(); + let (_ctx, schema) = setup_context().await; assert!(schema.table(location).await.is_err()); } diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs index 4c6c352ff339..6c01b1f909ad 100644 --- a/datafusion-cli/src/main.rs +++ b/datafusion-cli/src/main.rs @@ -176,7 +176,8 @@ async fn main_inner() -> Result<()> { // enable dynamic file query let ctx = SessionContext::new_with_config_rt(session_config.clone(), Arc::new(runtime_env)) - .enable_url_table(); + .enable_url_table() + .await; ctx.refresh_catalogs().await?; // install dynamic catalog provider that can register required object stores ctx.register_catalog_list(Arc::new(DynamicObjectStoreCatalog::new( diff --git a/datafusion-examples/examples/catalog.rs b/datafusion-examples/examples/catalog.rs index cd201c7d60f0..f584a55d01e8 100644 --- a/datafusion-examples/examples/catalog.rs +++ b/datafusion-examples/examples/catalog.rs @@ -29,6 +29,7 @@ use datafusion::{ execution::context::SessionState, prelude::SessionContext, }; +use futures::{stream::BoxStream, StreamExt}; use std::sync::RwLock; use std::{any::Any, collections::HashMap, path::Path, sync::Arc}; use std::{fs::File, io::Write}; @@ -82,7 +83,9 @@ async fn main() -> Result<()> { .await?; // register our catalog in the context - ctx.register_catalog("dircat", Arc::new(catalog)).await; + ctx.register_catalog("dircat", Arc::new(catalog)) + .await + .unwrap(); { // catalog was passed down into our custom catalog list since we override the ctx's default let catalogs = cataloglist.catalogs.read().unwrap(); @@ -188,9 +191,9 @@ impl SchemaProvider for DirSchema { self } - async fn table_names(&self) -> Vec { + async fn table_names(&self) -> BoxStream<'static, Result> { let tables = self.tables.read().unwrap(); - tables.keys().cloned().collect::>() + futures::stream::iter(tables.keys().cloned().map(Ok).collect::>()).boxed() } async fn table(&self, name: &str) -> Result>> { @@ -253,19 +256,19 @@ impl CatalogProvider for DirCatalog { Ok(Some(schema)) } - async fn schema_names(&self) -> Vec { + async fn schema_names(&self) -> BoxStream<'static, Result> { let schemas = self.schemas.read().unwrap(); - schemas.keys().cloned().collect() + futures::stream::iter(schemas.keys().cloned().map(Ok).collect::>()).boxed() } - async fn schema(&self, name: &str) -> Option> { + async fn schema(&self, name: &str) -> Result>> { let schemas = self.schemas.read().unwrap(); let maybe_schema = schemas.get(name); if let Some(schema) = maybe_schema { let schema = schema.clone() as Arc; - Some(schema) + Ok(Some(schema)) } else { - None + Ok(None) } } } @@ -291,22 +294,22 @@ impl CatalogProviderList for CustomCatalogProviderList { &self, name: String, catalog: Arc, - ) -> Option> { + ) -> Result>> { let mut cats = self.catalogs.write().unwrap(); cats.insert(name, catalog.clone()); - Some(catalog) + Ok(Some(catalog)) } /// Retrieves the list of available catalog names - async fn catalog_names(&self) -> Vec { + async fn catalog_names(&self) -> BoxStream<'static, Result> { let cats = self.catalogs.read().unwrap(); - cats.keys().cloned().collect() + futures::stream::iter(cats.keys().cloned().map(Ok).collect::>()).boxed() } /// Retrieves a specific catalog by name, provided it exists. - async fn catalog(&self, name: &str) -> Option> { + async fn catalog(&self, name: &str) -> Result>> { let cats = self.catalogs.read().unwrap(); - cats.get(name).cloned() + Ok(cats.get(name).cloned()) } } diff --git a/datafusion-examples/examples/flight/flight_sql_server.rs b/datafusion-examples/examples/flight/flight_sql_server.rs index 49782a77e916..42b198eb9f83 100644 --- a/datafusion-examples/examples/flight/flight_sql_server.rs +++ b/datafusion-examples/examples/flight/flight_sql_server.rs @@ -170,11 +170,15 @@ impl FlightSqlServiceImpl { let mut schemas = vec![]; let mut names = vec![]; let mut types = vec![]; - for catalog in ctx.catalog_names().await { - let catalog_provider = ctx.catalog(&catalog).await.unwrap(); - for schema in catalog_provider.schema_names().await { - let schema_provider = catalog_provider.schema(&schema).await.unwrap(); - for table in schema_provider.table_names().await { + let mut catalog_names = ctx.catalog_names().await; + while let Some(catalog) = catalog_names.try_next().await.unwrap() { + let catalog_provider = ctx.catalog(&catalog).await.unwrap().unwrap(); + let mut schema_names = catalog_provider.schema_names().await; + while let Some(schema) = schema_names.try_next().await.unwrap() { + let schema_provider = + catalog_provider.schema(&schema).await.unwrap().unwrap(); + let mut table_names = schema_provider.table_names().await; + while let Some(table) = table_names.try_next().await.unwrap() { let table_provider = schema_provider.table(&table).await.unwrap().unwrap(); catalogs.push(catalog.clone()); diff --git a/datafusion/catalog/Cargo.toml b/datafusion/catalog/Cargo.toml index f9801352087d..41e3cedbd9cf 100644 --- a/datafusion/catalog/Cargo.toml +++ b/datafusion/catalog/Cargo.toml @@ -34,6 +34,7 @@ datafusion-common = { workspace = true } datafusion-execution = { workspace = true } datafusion-expr = { workspace = true } datafusion-physical-plan = { workspace = true } +futures = { workspace = true } parking_lot = { workspace = true } [lints] diff --git a/datafusion/catalog/src/catalog.rs b/datafusion/catalog/src/catalog.rs index 7b5a4ee68279..9da3f0c091a1 100644 --- a/datafusion/catalog/src/catalog.rs +++ b/datafusion/catalog/src/catalog.rs @@ -23,6 +23,7 @@ pub use crate::schema::SchemaProvider; use async_trait::async_trait; use datafusion_common::not_impl_err; use datafusion_common::Result; +use futures::stream::BoxStream; /// Represents a catalog, comprising a number of named schemas. /// @@ -110,10 +111,10 @@ pub trait CatalogProvider: Debug + Sync + Send { fn as_any(&self) -> &dyn Any; /// Retrieves the list of available schema names in this catalog. - async fn schema_names(&self) -> Vec; + async fn schema_names(&self) -> BoxStream<'static, Result>; /// Retrieves a specific schema from the catalog by name, provided it exists. - async fn schema(&self, name: &str) -> Option>; + async fn schema(&self, name: &str) -> Result>>; /// Adds a new schema to this catalog. /// @@ -167,11 +168,11 @@ pub trait CatalogProviderList: Debug + Sync + Send { &self, name: String, catalog: Arc, - ) -> Option>; + ) -> Result>>; /// Retrieves the list of available catalog names - async fn catalog_names(&self) -> Vec; + async fn catalog_names(&self) -> BoxStream<'static, Result>; /// Retrieves a specific catalog by name, provided it exists. - async fn catalog(&self, name: &str) -> Option>; + async fn catalog(&self, name: &str) -> Result>>; } diff --git a/datafusion/catalog/src/dynamic_file/catalog.rs b/datafusion/catalog/src/dynamic_file/catalog.rs index 1801111934a0..eba45d1c6bdb 100644 --- a/datafusion/catalog/src/dynamic_file/catalog.rs +++ b/datafusion/catalog/src/dynamic_file/catalog.rs @@ -19,6 +19,8 @@ use crate::{CatalogProvider, CatalogProviderList, SchemaProvider, TableProvider}; use async_trait::async_trait; +use datafusion_common::Result; +use futures::stream::BoxStream; use std::any::Any; use std::fmt::Debug; use std::sync::Arc; @@ -51,21 +53,21 @@ impl CatalogProviderList for DynamicFileCatalog { &self, name: String, catalog: Arc, - ) -> Option> { - self.inner.register_catalog(name, catalog).await + ) -> Result>> { + Ok(self.inner.register_catalog(name, catalog).await?) } - async fn catalog_names(&self) -> Vec { + async fn catalog_names(&self) -> BoxStream<'static, Result> { self.inner.catalog_names().await } - async fn catalog(&self, name: &str) -> Option> { - self.inner.catalog(name).await.map(|catalog| { + async fn catalog(&self, name: &str) -> Result>> { + Ok(self.inner.catalog(name).await?.map(|catalog| { Arc::new(DynamicFileCatalogProvider::new( catalog, Arc::clone(&self.factory), )) as _ - }) + })) } } @@ -93,25 +95,25 @@ impl CatalogProvider for DynamicFileCatalogProvider { self } - async fn schema_names(&self) -> Vec { + async fn schema_names(&self) -> BoxStream<'static, Result> { self.inner.schema_names().await } - async fn schema(&self, name: &str) -> Option> { - self.inner.schema(name).await.map(|schema| { + async fn schema(&self, name: &str) -> Result>> { + Ok(self.inner.schema(name).await?.map(|schema| { Arc::new(DynamicFileSchemaProvider::new( schema, Arc::clone(&self.factory), )) as _ - }) + })) } async fn register_schema( &self, name: &str, schema: Arc, - ) -> datafusion_common::Result>> { - self.inner.register_schema(name, schema).await + ) -> Result>> { + Ok(self.inner.register_schema(name, schema).await?) } } @@ -143,14 +145,11 @@ impl SchemaProvider for DynamicFileSchemaProvider { self } - async fn table_names(&self) -> Vec { + async fn table_names(&self) -> BoxStream<'static, Result> { self.inner.table_names().await } - async fn table( - &self, - name: &str, - ) -> datafusion_common::Result>> { + async fn table(&self, name: &str) -> Result>> { if let Some(table) = self.inner.table(name).await? { return Ok(Some(table)); }; @@ -162,14 +161,14 @@ impl SchemaProvider for DynamicFileSchemaProvider { &self, name: String, table: Arc, - ) -> datafusion_common::Result>> { + ) -> Result>> { self.inner.register_table(name, table).await } async fn deregister_table( &self, name: &str, - ) -> datafusion_common::Result>> { + ) -> Result>> { self.inner.deregister_table(name).await } @@ -182,8 +181,5 @@ impl SchemaProvider for DynamicFileSchemaProvider { #[async_trait] pub trait UrlTableFactory: Debug + Sync + Send { /// create a new table provider from the provided url - async fn try_new( - &self, - url: &str, - ) -> datafusion_common::Result>>; + async fn try_new(&self, url: &str) -> Result>>; } diff --git a/datafusion/catalog/src/schema.rs b/datafusion/catalog/src/schema.rs index 61d2a8a54335..8db7dd3207c9 100644 --- a/datafusion/catalog/src/schema.rs +++ b/datafusion/catalog/src/schema.rs @@ -20,6 +20,7 @@ use async_trait::async_trait; use datafusion_common::{exec_err, DataFusionError}; +use futures::stream::BoxStream; use std::any::Any; use std::fmt::Debug; use std::sync::Arc; @@ -45,7 +46,7 @@ pub trait SchemaProvider: Debug + Sync + Send { fn as_any(&self) -> &dyn Any; /// Retrieves the list of available table names in this schema. - async fn table_names(&self) -> Vec; + async fn table_names(&self) -> BoxStream<'static, Result>; /// Retrieves a specific table from the schema by name, if it exists, /// otherwise returns `None`. diff --git a/datafusion/core/src/catalog_common/information_schema.rs b/datafusion/core/src/catalog_common/information_schema.rs index cc188dac84df..0d83f8b3f587 100644 --- a/datafusion/core/src/catalog_common/information_schema.rs +++ b/datafusion/core/src/catalog_common/information_schema.rs @@ -39,6 +39,8 @@ use async_trait::async_trait; use datafusion_common::error::Result; use datafusion_common::DataFusionError; use datafusion_expr::{AggregateUDF, ScalarUDF, Signature, TypeSignature, WindowUDF}; +use futures::stream::BoxStream; +use futures::{StreamExt, TryStreamExt}; use std::collections::{HashMap, HashSet}; use std::fmt::Debug; use std::{any::Any, sync::Arc}; @@ -96,14 +98,17 @@ impl InformationSchemaConfig { ) -> Result<(), DataFusionError> { // create a mem table with the names of tables - for catalog_name in self.catalog_list.catalog_names().await { - let catalog = self.catalog_list.catalog(&catalog_name).await.unwrap(); + let mut catalog_names = self.catalog_list.catalog_names().await; + while let Some(catalog_name) = catalog_names.try_next().await? { + let catalog = self.catalog_list.catalog(&catalog_name).await?.unwrap(); - for schema_name in catalog.schema_names().await { + let mut schema_names = catalog.schema_names().await; + while let Some(schema_name) = schema_names.try_next().await? { if schema_name != INFORMATION_SCHEMA { // schema name may not exist in the catalog, so we need to check - if let Some(schema) = catalog.schema(&schema_name).await { - for table_name in schema.table_names().await { + if let Some(schema) = catalog.schema(&schema_name).await? { + let mut table_names = schema.table_names().await; + while let Some(table_name) = table_names.try_next().await? { if let Some(table) = schema.table(&table_name).await? { builder.add_table( &catalog_name, @@ -131,33 +136,42 @@ impl InformationSchemaConfig { Ok(()) } - async fn make_schemata(&self, builder: &mut InformationSchemataBuilder) { - for catalog_name in self.catalog_list.catalog_names().await { - let catalog = self.catalog_list.catalog(&catalog_name).await.unwrap(); + async fn make_schemata( + &self, + builder: &mut InformationSchemataBuilder, + ) -> Result<()> { + let mut catalog_names = self.catalog_list.catalog_names().await; + while let Some(catalog_name) = catalog_names.try_next().await? { + let catalog = self.catalog_list.catalog(&catalog_name).await?.unwrap(); - for schema_name in catalog.schema_names().await { + let mut schema_names = catalog.schema_names().await; + while let Some(schema_name) = schema_names.try_next().await? { if schema_name != INFORMATION_SCHEMA { - if let Some(schema) = catalog.schema(&schema_name).await { + if let Some(schema) = catalog.schema(&schema_name).await? { let schema_owner = schema.owner_name().await; builder.add_schemata(&catalog_name, &schema_name, schema_owner); } } } } + Ok(()) } async fn make_views( &self, builder: &mut InformationSchemaViewBuilder, ) -> Result<(), DataFusionError> { - for catalog_name in self.catalog_list.catalog_names().await { - let catalog = self.catalog_list.catalog(&catalog_name).await.unwrap(); + let mut catalog_names = self.catalog_list.catalog_names().await; + while let Some(catalog_name) = catalog_names.try_next().await? { + let catalog = self.catalog_list.catalog(&catalog_name).await?.unwrap(); - for schema_name in catalog.schema_names().await { + let mut schema_names = catalog.schema_names().await; + while let Some(schema_name) = schema_names.try_next().await? { if schema_name != INFORMATION_SCHEMA { // schema name may not exist in the catalog, so we need to check - if let Some(schema) = catalog.schema(&schema_name).await { - for table_name in schema.table_names().await { + if let Some(schema) = catalog.schema(&schema_name).await? { + let mut table_names = schema.table_names().await; + while let Some(table_name) = table_names.try_next().await? { if let Some(table) = schema.table(&table_name).await? { builder.add_view( &catalog_name, @@ -180,14 +194,17 @@ impl InformationSchemaConfig { &self, builder: &mut InformationSchemaColumnsBuilder, ) -> Result<(), DataFusionError> { - for catalog_name in self.catalog_list.catalog_names().await { - let catalog = self.catalog_list.catalog(&catalog_name).await.unwrap(); + let mut catalog_names = self.catalog_list.catalog_names().await; + while let Some(catalog_name) = catalog_names.try_next().await? { + let catalog = self.catalog_list.catalog(&catalog_name).await?.unwrap(); - for schema_name in catalog.schema_names().await { + let mut schema_names = catalog.schema_names().await; + while let Some(schema_name) = schema_names.try_next().await? { if schema_name != INFORMATION_SCHEMA { // schema name may not exist in the catalog, so we need to check - if let Some(schema) = catalog.schema(&schema_name).await { - for table_name in schema.table_names().await { + if let Some(schema) = catalog.schema(&schema_name).await? { + let mut table_names = schema.table_names().await; + while let Some(table_name) = table_names.try_next().await? { if let Some(table) = schema.table(&table_name).await? { for (field_position, field) in table.schema().fields().iter().enumerate() @@ -469,11 +486,9 @@ impl SchemaProvider for InformationSchemaProvider { self } - async fn table_names(&self) -> Vec { - INFORMATION_SCHEMA_TABLES - .iter() - .map(|t| t.to_string()) - .collect() + async fn table_names(&self) -> BoxStream<'static, Result> { + let table_names = INFORMATION_SCHEMA_TABLES.iter().map(|s| Ok(s.to_string())); + futures::stream::iter(table_names).boxed() } async fn table( @@ -994,7 +1009,7 @@ impl PartitionStream for InformationSchemata { Arc::clone(&self.schema), // TODO: Stream this futures::stream::once(async move { - config.make_schemata(&mut builder).await; + config.make_schemata(&mut builder).await?; Ok(builder.finish()) }), )) diff --git a/datafusion/core/src/catalog_common/listing_schema.rs b/datafusion/core/src/catalog_common/listing_schema.rs index 41465e36cb31..8be9177df736 100644 --- a/datafusion/core/src/catalog_common/listing_schema.rs +++ b/datafusion/core/src/catalog_common/listing_schema.rs @@ -26,12 +26,13 @@ use crate::catalog::{SchemaProvider, TableProvider, TableProviderFactory}; use crate::execution::context::SessionState; use datafusion_common::{ - Constraints, DFSchema, DataFusionError, HashMap, TableReference, + Constraints, DFSchema, DataFusionError, HashMap, Result, TableReference, }; use datafusion_expr::CreateExternalTable; use async_trait::async_trait; -use futures::TryStreamExt; +use futures::stream::BoxStream; +use futures::{StreamExt, TryStreamExt}; use itertools::Itertools; use object_store::ObjectStore; @@ -88,7 +89,7 @@ impl ListingSchemaProvider { } /// Reload table information from ObjectStore - pub async fn refresh(&self, state: &SessionState) -> datafusion_common::Result<()> { + pub async fn refresh(&self, state: &SessionState) -> Result<()> { let entries: Vec<_> = self.store.list(Some(&self.path)).try_collect().await?; let base = Path::new(self.path.as_ref()); let mut tables = HashSet::new(); @@ -163,13 +164,10 @@ impl SchemaProvider for ListingSchemaProvider { self } - async fn table_names(&self) -> Vec { - self.tables - .lock() - .expect("Can't lock tables") - .keys() - .map(|it| it.to_string()) - .collect() + async fn table_names(&self) -> BoxStream<'static, Result> { + let tables = self.tables.lock().expect("Can't lock tables"); + let tables = tables.keys().map(|k| Ok(k.clone())).collect::>(); + futures::stream::iter(tables).boxed() } async fn table( @@ -188,7 +186,7 @@ impl SchemaProvider for ListingSchemaProvider { &self, name: String, table: Arc, - ) -> datafusion_common::Result>> { + ) -> Result>> { self.tables .lock() .expect("Can't lock tables") @@ -199,7 +197,7 @@ impl SchemaProvider for ListingSchemaProvider { async fn deregister_table( &self, name: &str, - ) -> datafusion_common::Result>> { + ) -> Result>> { Ok(self.tables.lock().expect("Can't lock tables").remove(name)) } diff --git a/datafusion/core/src/catalog_common/memory.rs b/datafusion/core/src/catalog_common/memory.rs index 82ae5f88e423..c4512d364e3b 100644 --- a/datafusion/core/src/catalog_common/memory.rs +++ b/datafusion/core/src/catalog_common/memory.rs @@ -23,7 +23,9 @@ use crate::catalog::{ }; use async_trait::async_trait; use dashmap::DashMap; -use datafusion_common::{exec_err, DataFusionError}; +use datafusion_common::{exec_err, DataFusionError, Result}; +use futures::stream::BoxStream; +use futures::{StreamExt, TryStreamExt}; use std::any::Any; use std::sync::Arc; @@ -59,16 +61,21 @@ impl CatalogProviderList for MemoryCatalogProviderList { &self, name: String, catalog: Arc, - ) -> Option> { - self.catalogs.insert(name, catalog) + ) -> Result>> { + Ok(self.catalogs.insert(name, catalog)) } - async fn catalog_names(&self) -> Vec { - self.catalogs.iter().map(|c| c.key().clone()).collect() + async fn catalog_names(&self) -> BoxStream<'static, Result> { + let catalog_names = self + .catalogs + .iter() + .map(|keyval| Ok(keyval.key().clone())) + .collect::>(); + futures::stream::iter(catalog_names).boxed() } - async fn catalog(&self, name: &str) -> Option> { - self.catalogs.get(name).map(|c| Arc::clone(c.value())) + async fn catalog(&self, name: &str) -> Result>> { + Ok(self.catalogs.get(name).map(|c| Arc::clone(c.value()))) } } @@ -99,19 +106,24 @@ impl CatalogProvider for MemoryCatalogProvider { self } - async fn schema_names(&self) -> Vec { - self.schemas.iter().map(|s| s.key().clone()).collect() + async fn schema_names(&self) -> BoxStream<'static, Result> { + let schema_names = self + .schemas + .iter() + .map(|keyval| Ok(keyval.key().clone())) + .collect::>(); + futures::stream::iter(schema_names).boxed() } - async fn schema(&self, name: &str) -> Option> { - self.schemas.get(name).map(|s| Arc::clone(s.value())) + async fn schema(&self, name: &str) -> Result>> { + Ok(self.schemas.get(name).map(|s| Arc::clone(s.value()))) } async fn register_schema( &self, name: &str, schema: Arc, - ) -> datafusion_common::Result>> { + ) -> Result>> { Ok(self.schemas.insert(name.into(), schema)) } @@ -119,9 +131,9 @@ impl CatalogProvider for MemoryCatalogProvider { &self, name: &str, cascade: bool, - ) -> datafusion_common::Result>> { - if let Some(schema) = self.schema(name).await { - let table_names = schema.table_names().await; + ) -> Result>> { + if let Some(schema) = self.schema(name).await? { + let table_names = schema.table_names().await.try_collect::>().await?; match (table_names.is_empty(), cascade) { (true, _) | (false, true) => { let (_, removed) = self.schemas.remove(name).unwrap(); @@ -166,17 +178,19 @@ impl SchemaProvider for MemorySchemaProvider { self } - async fn table_names(&self) -> Vec { - self.tables + async fn table_names(&self) -> BoxStream<'static, Result> { + let table_names = self + .tables .iter() - .map(|table| table.key().clone()) - .collect() + .map(|keyval| Ok(keyval.key().clone())) + .collect::>(); + futures::stream::iter(table_names).boxed() } async fn table( &self, name: &str, - ) -> datafusion_common::Result>, DataFusionError> { + ) -> Result>, DataFusionError> { Ok(self.tables.get(name).map(|table| Arc::clone(table.value()))) } @@ -184,7 +198,7 @@ impl SchemaProvider for MemorySchemaProvider { &self, name: String, table: Arc, - ) -> datafusion_common::Result>> { + ) -> Result>> { if self.table_exist(name.as_str()).await { return exec_err!("The table {name} already exists"); } @@ -194,7 +208,7 @@ impl SchemaProvider for MemorySchemaProvider { async fn deregister_table( &self, name: &str, - ) -> datafusion_common::Result>> { + ) -> Result>> { Ok(self.tables.remove(name).map(|(_, table)| table)) } @@ -261,11 +275,14 @@ mod test { self } - async fn schema_names(&self) -> Vec { + async fn schema_names(&self) -> BoxStream<'static, Result> { unimplemented!() } - async fn schema(&self, _name: &str) -> Option> { + async fn schema( + &self, + _name: &str, + ) -> Result>> { unimplemented!() } } @@ -340,7 +357,9 @@ mod test { .register_schema("active", Arc::new(schema)) .await .unwrap(); - ctx.register_catalog("cat", Arc::new(catalog)).await; + ctx.register_catalog("cat", Arc::new(catalog)) + .await + .unwrap(); let df = ctx .sql("SELECT id, bool_col FROM cat.active.alltypes_plain") diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index 54c3e850770b..81cb8feed843 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -66,7 +66,8 @@ use datafusion_expr::{ planner::ExprPlanner, Expr, UserDefinedLogicalNode, WindowUDF, }; -use futures::FutureExt; +use futures::stream::BoxStream; +use futures::{FutureExt, TryStreamExt}; // backwards compatibility pub use crate::execution::session_state::SessionState; @@ -292,13 +293,14 @@ impl SessionContext { /// Finds any [`ListingSchemaProvider`]s and instructs them to reload tables from "disk" pub async fn refresh_catalogs(&self) -> Result<()> { - let cat_names = self.catalog_names().await.clone(); - for cat_name in cat_names.iter() { - let cat = self.catalog(cat_name.as_str()).await.ok_or_else(|| { + let mut cat_names = self.catalog_names().await; + while let Some(cat_name) = cat_names.try_next().await? { + let cat = self.catalog(cat_name.as_str()).await?.ok_or_else(|| { DataFusionError::Internal("Catalog not found!".to_string()) })?; - for schema_name in cat.schema_names().await { - let schema = cat.schema(schema_name.as_str()).await.ok_or_else(|| { + let mut schema_names = cat.schema_names().await; + while let Some(schema_name) = schema_names.try_next().await? { + let schema = cat.schema(&schema_name).await?.ok_or_else(|| { DataFusionError::Internal("Schema not found!".to_string()) })?; let lister = schema.as_any().downcast_ref::(); @@ -919,7 +921,7 @@ impl SessionContext { (name, catalog_list) }; let catalog_fut = catalog_list.catalog(&name); - let catalog = catalog_fut.await.ok_or_else(|| { + let catalog = catalog_fut.await?.ok_or_else(|| { DataFusionError::Execution(format!( "Missing default catalog '{name}'" )) @@ -928,14 +930,14 @@ impl SessionContext { } 2 => { let name = &tokens[0]; - let catalog = self.catalog(name).await.ok_or_else(|| { + let catalog = self.catalog(name).await?.ok_or_else(|| { DataFusionError::Execution(format!("Missing catalog '{name}'")) })?; (catalog, tokens[1]) } _ => return exec_err!("Unable to parse catalog from {schema_name}"), }; - let schema = catalog.schema(schema_name).await; + let schema = catalog.schema(schema_name).await?; match (if_not_exists, schema) { (true, Some(_)) => self.return_empty_dataframe(), @@ -954,7 +956,7 @@ impl SessionContext { if_not_exists, .. } = cmd; - let catalog = self.catalog(catalog_name.as_str()).await; + let catalog = self.catalog(catalog_name.as_str()).await?; match (if_not_exists, catalog) { (true, Some(_)) => self.return_empty_dataframe(), @@ -963,7 +965,7 @@ impl SessionContext { let catalog_list = self.state.write().catalog_list().clone(); catalog_list .register_catalog(catalog_name, new_catalog) - .await; + .await?; self.return_empty_dataframe() } (false, Some(_)) => exec_err!("Catalog '{catalog_name}' already exists"), @@ -1017,7 +1019,7 @@ impl SessionContext { let catalog_list = state.catalog_list().clone(); (catalog_name, catalog_list) }; - if let Some(catalog) = catalog_list.catalog(&catalog_name).await { + if let Some(catalog) = catalog_list.catalog(&catalog_name).await? { catalog } else if allow_missing { return self.return_empty_dataframe(); @@ -1085,8 +1087,8 @@ impl SessionContext { let catalog_list = state.catalog_list().clone(); (resolved, catalog_list) }; - if let Some(catalog) = catalog_list.catalog(&resolved.catalog).await { - catalog.schema(&resolved.schema).await + if let Some(catalog) = catalog_list.catalog(&resolved.catalog).await? { + catalog.schema(&resolved.schema).await? } else { None } @@ -1440,22 +1442,22 @@ impl SessionContext { &self, name: impl Into, catalog: Arc, - ) -> Option> { + ) -> Result>> { let name = name.into(); let catalog_list = self.state.read().catalog_list().clone(); catalog_list.register_catalog(name, catalog).await } /// Retrieves the list of available catalog names. - pub async fn catalog_names(&self) -> Vec { + pub async fn catalog_names(&self) -> BoxStream<'static, Result> { let catalog_list = self.state.read().catalog_list().clone(); catalog_list.catalog_names().await } /// Retrieves a [`CatalogProvider`] instance by name - pub async fn catalog(&self, name: &str) -> Option> { + pub async fn catalog(&self, name: &str) -> Result>> { let catalog_list = self.state.read().catalog_list().clone(); - catalog_list.catalog(name).await + Ok(catalog_list.catalog(name).await?) } /// Registers a [`TableProvider`] as a table that can be @@ -2097,7 +2099,9 @@ mod tests { .register_schema("my_schema", Arc::new(schema)) .await .unwrap(); - ctx.register_catalog("my_catalog", Arc::new(catalog)).await; + ctx.register_catalog("my_catalog", Arc::new(catalog)) + .await + .unwrap(); for table_ref in &["my_catalog.my_schema.test", "my_schema.test", "test"] { let result = plan_and_collect( @@ -2130,7 +2134,9 @@ mod tests { catalog_a .register_schema("schema_a", Arc::new(schema_a)) .await?; - ctx.register_catalog("catalog_a", Arc::new(catalog_a)).await; + ctx.register_catalog("catalog_a", Arc::new(catalog_a)) + .await + .unwrap(); let catalog_b = MemoryCatalogProvider::new(); let schema_b = MemorySchemaProvider::new(); @@ -2140,7 +2146,8 @@ mod tests { catalog_b .register_schema("schema_b", Arc::new(schema_b)) .await?; - ctx.register_catalog("catalog_b", Arc::new(catalog_b)).await; + ctx.register_catalog("catalog_b", Arc::new(catalog_b)) + .await?; let result = plan_and_collect( &ctx, @@ -2178,7 +2185,7 @@ mod tests { // register a single catalog let catalog = Arc::new(MemoryCatalogProvider::new()); let catalog_weak = Arc::downgrade(&catalog); - ctx.register_catalog("my_catalog", catalog).await; + ctx.register_catalog("my_catalog", catalog).await.unwrap(); let catalog_list_weak = { let state = ctx.state.read(); diff --git a/datafusion/core/src/execution/session_state.rs b/datafusion/core/src/execution/session_state.rs index 7fdd5d8b5c5b..fe6fd43d7691 100644 --- a/datafusion/core/src/execution/session_state.rs +++ b/datafusion/core/src/execution/session_state.rs @@ -323,7 +323,7 @@ impl SessionState { async move { catalog_list .catalog(&resolved_ref.catalog) - .await + .await? .ok_or_else(|| { plan_datafusion_err!( "failed to resolve catalog: {}", @@ -331,7 +331,7 @@ impl SessionState { ) })? .schema(&resolved_ref.schema) - .await + .await? .ok_or_else(|| { plan_datafusion_err!( "failed to resolve schema: {}", @@ -1042,6 +1042,7 @@ impl SessionStateBuilder { .catalog_list() .catalog(&existing.config.options().catalog.default_catalog) .await + .unwrap() .is_some(); // The new `with_create_default_catalog_and_schema` should be false if the default catalog exists let create_default_catalog_and_schema = existing @@ -1452,7 +1453,8 @@ impl SessionStateBuilder { state.config.options().catalog.default_catalog.clone(), Arc::new(default_catalog), ) - .await; + .await + .unwrap(); } if let Some(analyzer_rules) = analyzer_rules { @@ -2062,9 +2064,11 @@ mod tests { .catalog(default_catalog.as_str()) .await .unwrap() + .unwrap() .schema(default_schema.as_str()) .await .unwrap() + .unwrap() .table_exist("employee") .await; assert!(is_exist); @@ -2078,9 +2082,11 @@ mod tests { .catalog(default_catalog.as_str()) .await .unwrap() + .unwrap() .schema(default_schema.as_str()) .await .unwrap() + .unwrap() .table_exist("employee") .await ); @@ -2096,6 +2102,7 @@ mod tests { .catalog_list() .catalog(&default_catalog) .await + .unwrap() .is_none()); let new_state = SessionStateBuilder::new_from_existing(without_default_state) .await @@ -2105,6 +2112,7 @@ mod tests { .catalog_list() .catalog(&default_catalog) .await + .unwrap() .is_none()); Ok(()) } diff --git a/datafusion/core/tests/sql/create_drop.rs b/datafusion/core/tests/sql/create_drop.rs index 594f2319f02f..6bd4282196b5 100644 --- a/datafusion/core/tests/sql/create_drop.rs +++ b/datafusion/core/tests/sql/create_drop.rs @@ -34,8 +34,8 @@ async fn create_custom_table() -> Result<()> { let sql = "CREATE EXTERNAL TABLE dt STORED AS DELTATABLE LOCATION 's3://bucket/schema/table';"; ctx.sql(sql).await.unwrap(); - let cat = ctx.catalog("datafusion").await.unwrap(); - let schema = cat.schema("public").await.unwrap(); + let cat = ctx.catalog("datafusion").await.unwrap().unwrap(); + let schema = cat.schema("public").await.unwrap().unwrap(); let exists = schema.table_exist("dt").await; assert!(exists, "Table should have been created!"); @@ -56,8 +56,8 @@ async fn create_external_table_with_ddl() -> Result<()> { let sql = "CREATE EXTERNAL TABLE dt (a_id integer, a_str string, a_bool boolean) STORED AS MOCKTABLE LOCATION 'mockprotocol://path/to/table';"; ctx.sql(sql).await.unwrap(); - let cat = ctx.catalog("datafusion").await.unwrap(); - let schema = cat.schema("public").await.unwrap(); + let cat = ctx.catalog("datafusion").await.unwrap().unwrap(); + let schema = cat.schema("public").await.unwrap().unwrap(); let exists = schema.table_exist("dt").await; assert!(exists, "Table should have been created!"); diff --git a/datafusion/sqllogictest/src/test_context.rs b/datafusion/sqllogictest/src/test_context.rs index 1c72f88f0a20..3f133bb67172 100644 --- a/datafusion/sqllogictest/src/test_context.rs +++ b/datafusion/sqllogictest/src/test_context.rs @@ -253,7 +253,9 @@ pub async fn register_table_with_many_types(ctx: &SessionContext) { .register_schema("my_schema", Arc::new(schema)) .await .unwrap(); - ctx.register_catalog("my_catalog", Arc::new(catalog)).await; + ctx.register_catalog("my_catalog", Arc::new(catalog)) + .await + .unwrap(); ctx.register_table( "my_catalog.my_schema.table_with_many_types", diff --git a/datafusion/substrait/src/lib.rs b/datafusion/substrait/src/lib.rs index 1389cac75b99..60ec84c31972 100644 --- a/datafusion/substrait/src/lib.rs +++ b/datafusion/substrait/src/lib.rs @@ -59,7 +59,7 @@ //! // Create a plan that scans table 't' //! let ctx = SessionContext::new(); //! let batch = RecordBatch::try_from_iter(vec![("x", Arc::new(Int32Array::from(vec![42])) as _)])?; -//! ctx.register_batch("t", batch)?; +//! ctx.register_batch("t", batch).await?; //! let df = ctx.sql("SELECT x from t").await?; //! let plan = df.into_optimized_plan()?; //! diff --git a/docs/source/library-user-guide/using-the-dataframe-api.md b/docs/source/library-user-guide/using-the-dataframe-api.md index 7f3e28c255c6..82bc458264ee 100644 --- a/docs/source/library-user-guide/using-the-dataframe-api.md +++ b/docs/source/library-user-guide/using-the-dataframe-api.md @@ -88,7 +88,7 @@ async fn main() -> Result<()> { ("id", Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef), ("bank_account", Arc::new(Int32Array::from(vec![9000, 8000, 7000]))), ])?; - ctx.register_batch("users", data)?; + ctx.register_batch("users", data).await?; // Create a DataFrame using SQL let dataframe = ctx.sql("SELECT * FROM users;") .await?