Skip to content

Commit

Permalink
Fix datafusion-cli. Change async Vec<String> to BoxStream<Result<String>>. Add Result to a few APIs
Browse files Browse the repository at this point in the history
  • Loading branch information
westonpace committed Nov 27, 2024
1 parent 5332f0d commit 840e4ed
Show file tree
Hide file tree
Showing 18 changed files with 260 additions and 176 deletions.
1 change: 1 addition & 0 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

91 changes: 59 additions & 32 deletions datafusion-cli/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use datafusion::execution::session_state::SessionStateBuilder;

use async_trait::async_trait;
use dirs::home_dir;
use futures::stream::BoxStream;
use parking_lot::RwLock;

/// Wraps another catalog, automatically registering the required object stores for the file locations
Expand All @@ -49,28 +50,29 @@ impl DynamicObjectStoreCatalog {
}
}

#[async_trait]
impl CatalogProviderList for DynamicObjectStoreCatalog {
/// Exposes this catalog list as a type-erased `&dyn Any` reference.
fn as_any(&self) -> &dyn Any {
self
}

fn register_catalog(
async fn register_catalog(
&self,
name: String,
catalog: Arc<dyn CatalogProvider>,
) -> Option<Arc<dyn CatalogProvider>> {
self.inner.register_catalog(name, catalog)
) -> Result<Option<Arc<dyn CatalogProvider>>> {
self.inner.register_catalog(name, catalog).await
}

fn catalog_names(&self) -> Vec<String> {
self.inner.catalog_names()
async fn catalog_names(&self) -> BoxStream<'static, Result<String>> {
self.inner.catalog_names().await
}

fn catalog(&self, name: &str) -> Option<Arc<dyn CatalogProvider>> {
async fn catalog(&self, name: &str) -> Result<Option<Arc<dyn CatalogProvider>>> {
let state = self.state.clone();
self.inner.catalog(name).map(|catalog| {
Ok(self.inner.catalog(name).await?.map(|catalog| {
Arc::new(DynamicObjectStoreCatalogProvider::new(catalog, state)) as _
})
}))
}
}

Expand All @@ -90,28 +92,29 @@ impl DynamicObjectStoreCatalogProvider {
}
}

#[async_trait]
impl CatalogProvider for DynamicObjectStoreCatalogProvider {
/// Exposes this catalog provider as a type-erased `&dyn Any` reference.
fn as_any(&self) -> &dyn Any {
self
}

fn schema_names(&self) -> Vec<String> {
self.inner.schema_names()
async fn schema_names(&self) -> BoxStream<'static, Result<String>> {
self.inner.schema_names().await
}

fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
async fn schema(&self, name: &str) -> Result<Option<Arc<dyn SchemaProvider>>> {
let state = self.state.clone();
self.inner.schema(name).map(|schema| {
Ok(self.inner.schema(name).await?.map(|schema| {
Arc::new(DynamicObjectStoreSchemaProvider::new(schema, state)) as _
})
}))
}

fn register_schema(
async fn register_schema(
&self,
name: &str,
schema: Arc<dyn SchemaProvider>,
) -> Result<Option<Arc<dyn SchemaProvider>>> {
self.inner.register_schema(name, schema)
self.inner.register_schema(name, schema).await
}
}

Expand All @@ -138,16 +141,16 @@ impl SchemaProvider for DynamicObjectStoreSchemaProvider {
self
}

fn table_names(&self) -> Vec<String> {
self.inner.table_names()
async fn table_names(&self) -> BoxStream<'static, Result<String>> {
self.inner.table_names().await
}

fn register_table(
async fn register_table(
&self,
name: String,
table: Arc<dyn TableProvider>,
) -> Result<Option<Arc<dyn TableProvider>>> {
self.inner.register_table(name, table)
self.inner.register_table(name, table).await
}

async fn table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
Expand All @@ -166,7 +169,7 @@ impl SchemaProvider for DynamicObjectStoreSchemaProvider {
.ok_or_else(|| plan_datafusion_err!("locking error"))?
.read()
.clone();
let mut builder = SessionStateBuilder::from(state.clone());
let mut builder = SessionStateBuilder::new_from_existing(state.clone()).await;
let optimized_name = substitute_tilde(name.to_owned());
let table_url = ListingTableUrl::parse(optimized_name.as_str())?;
let scheme = table_url.scheme();
Expand Down Expand Up @@ -194,7 +197,7 @@ impl SchemaProvider for DynamicObjectStoreSchemaProvider {
}
_ => {}
};
state = builder.build();
state = builder.build().await;
let store = get_object_store(
&state,
table_url.scheme(),
Expand All @@ -208,12 +211,15 @@ impl SchemaProvider for DynamicObjectStoreSchemaProvider {
self.inner.table(name).await
}

fn deregister_table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
self.inner.deregister_table(name)
async fn deregister_table(
&self,
name: &str,
) -> Result<Option<Arc<dyn TableProvider>>> {
self.inner.deregister_table(name).await
}

fn table_exist(&self, name: &str) -> bool {
self.inner.table_exist(name)
async fn table_exist(&self, name: &str) -> bool {
self.inner.table_exist(name).await
}
}

Expand All @@ -234,8 +240,9 @@ mod tests {

use datafusion::catalog::SchemaProvider;
use datafusion::prelude::SessionContext;
use futures::TryStreamExt;

fn setup_context() -> (SessionContext, Arc<dyn SchemaProvider>) {
async fn setup_context() -> (SessionContext, Arc<dyn SchemaProvider>) {
let ctx = SessionContext::new();
ctx.register_catalog_list(Arc::new(DynamicObjectStoreCatalog::new(
ctx.state().catalog_list().clone(),
Expand All @@ -247,10 +254,30 @@ mod tests {
ctx.state_weak_ref(),
) as &dyn CatalogProviderList;
let catalog = provider
.catalog(provider.catalog_names().first().unwrap())
.catalog(
&provider
.catalog_names()
.await
.try_next()
.await
.unwrap()
.unwrap(),
)
.await
.unwrap()
.unwrap();
let schema = catalog
.schema(catalog.schema_names().first().unwrap())
.schema(
&catalog
.schema_names()
.await
.try_next()
.await
.unwrap()
.unwrap(),
)
.await
.unwrap()
.unwrap();
(ctx, schema)
}
Expand All @@ -262,7 +289,7 @@ mod tests {
let domain = "example.com";
let location = format!("http://{domain}/file.parquet");

let (ctx, schema) = setup_context();
let (ctx, schema) = setup_context().await;

// That's a non-registered table, so we expect None here
let table = schema.table(&location).await?;
Expand All @@ -287,7 +314,7 @@ mod tests {
let bucket = "examples3bucket";
let location = format!("s3://{bucket}/file.parquet");

let (ctx, schema) = setup_context();
let (ctx, schema) = setup_context().await;

let table = schema.table(&location).await?;
assert!(table.is_none());
Expand All @@ -309,7 +336,7 @@ mod tests {
let bucket = "examplegsbucket";
let location = format!("gs://{bucket}/file.parquet");

let (ctx, schema) = setup_context();
let (ctx, schema) = setup_context().await;

let table = schema.table(&location).await?;
assert!(table.is_none());
Expand All @@ -329,7 +356,7 @@ mod tests {
#[tokio::test]
async fn query_invalid_location_test() {
let location = "ts://file.parquet";
let (_ctx, schema) = setup_context();
let (_ctx, schema) = setup_context().await;

assert!(schema.table(location).await.is_err());
}
Expand Down
3 changes: 2 additions & 1 deletion datafusion-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,8 @@ async fn main_inner() -> Result<()> {
// enable dynamic file query
let ctx =
SessionContext::new_with_config_rt(session_config.clone(), Arc::new(runtime_env))
.enable_url_table();
.enable_url_table()
.await;
ctx.refresh_catalogs().await?;
// install dynamic catalog provider that can register required object stores
ctx.register_catalog_list(Arc::new(DynamicObjectStoreCatalog::new(
Expand Down
31 changes: 17 additions & 14 deletions datafusion-examples/examples/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use datafusion::{
execution::context::SessionState,
prelude::SessionContext,
};
use futures::{stream::BoxStream, StreamExt};
use std::sync::RwLock;
use std::{any::Any, collections::HashMap, path::Path, sync::Arc};
use std::{fs::File, io::Write};
Expand Down Expand Up @@ -82,7 +83,9 @@ async fn main() -> Result<()> {
.await?;

// register our catalog in the context
ctx.register_catalog("dircat", Arc::new(catalog)).await;
ctx.register_catalog("dircat", Arc::new(catalog))
.await
.unwrap();
{
// catalog was passed down into our custom catalog list since we override the ctx's default
let catalogs = cataloglist.catalogs.read().unwrap();
Expand Down Expand Up @@ -188,9 +191,9 @@ impl SchemaProvider for DirSchema {
self
}

async fn table_names(&self) -> Vec<String> {
async fn table_names(&self) -> BoxStream<'static, Result<String>> {
let tables = self.tables.read().unwrap();
tables.keys().cloned().collect::<Vec<_>>()
futures::stream::iter(tables.keys().cloned().map(Ok).collect::<Vec<_>>()).boxed()
}

async fn table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
Expand Down Expand Up @@ -253,19 +256,19 @@ impl CatalogProvider for DirCatalog {
Ok(Some(schema))
}

async fn schema_names(&self) -> Vec<String> {
async fn schema_names(&self) -> BoxStream<'static, Result<String>> {
let schemas = self.schemas.read().unwrap();
schemas.keys().cloned().collect()
futures::stream::iter(schemas.keys().cloned().map(Ok).collect::<Vec<_>>()).boxed()
}

async fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
async fn schema(&self, name: &str) -> Result<Option<Arc<dyn SchemaProvider>>> {
let schemas = self.schemas.read().unwrap();
let maybe_schema = schemas.get(name);
if let Some(schema) = maybe_schema {
let schema = schema.clone() as Arc<dyn SchemaProvider>;
Some(schema)
Ok(Some(schema))
} else {
None
Ok(None)
}
}
}
Expand All @@ -291,22 +294,22 @@ impl CatalogProviderList for CustomCatalogProviderList {
&self,
name: String,
catalog: Arc<dyn CatalogProvider>,
) -> Option<Arc<dyn CatalogProvider>> {
) -> Result<Option<Arc<dyn CatalogProvider>>> {
let mut cats = self.catalogs.write().unwrap();
cats.insert(name, catalog.clone());
Some(catalog)
Ok(Some(catalog))
}

/// Retrieves the list of available catalog names
async fn catalog_names(&self) -> Vec<String> {
async fn catalog_names(&self) -> BoxStream<'static, Result<String>> {
let cats = self.catalogs.read().unwrap();
cats.keys().cloned().collect()
futures::stream::iter(cats.keys().cloned().map(Ok).collect::<Vec<_>>()).boxed()
}

/// Retrieves a specific catalog by name, provided it exists.
async fn catalog(&self, name: &str) -> Option<Arc<dyn CatalogProvider>> {
async fn catalog(&self, name: &str) -> Result<Option<Arc<dyn CatalogProvider>>> {
let cats = self.catalogs.read().unwrap();
cats.get(name).cloned()
Ok(cats.get(name).cloned())
}
}

Expand Down
14 changes: 9 additions & 5 deletions datafusion-examples/examples/flight/flight_sql_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,11 +170,15 @@ impl FlightSqlServiceImpl {
let mut schemas = vec![];
let mut names = vec![];
let mut types = vec![];
for catalog in ctx.catalog_names().await {
let catalog_provider = ctx.catalog(&catalog).await.unwrap();
for schema in catalog_provider.schema_names().await {
let schema_provider = catalog_provider.schema(&schema).await.unwrap();
for table in schema_provider.table_names().await {
let mut catalog_names = ctx.catalog_names().await;
while let Some(catalog) = catalog_names.try_next().await.unwrap() {
let catalog_provider = ctx.catalog(&catalog).await.unwrap().unwrap();
let mut schema_names = catalog_provider.schema_names().await;
while let Some(schema) = schema_names.try_next().await.unwrap() {
let schema_provider =
catalog_provider.schema(&schema).await.unwrap().unwrap();
let mut table_names = schema_provider.table_names().await;
while let Some(table) = table_names.try_next().await.unwrap() {
let table_provider =
schema_provider.table(&table).await.unwrap().unwrap();
catalogs.push(catalog.clone());
Expand Down
1 change: 1 addition & 0 deletions datafusion/catalog/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ datafusion-common = { workspace = true }
datafusion-execution = { workspace = true }
datafusion-expr = { workspace = true }
datafusion-physical-plan = { workspace = true }
futures = { workspace = true }
parking_lot = { workspace = true }

[lints]
Expand Down
11 changes: 6 additions & 5 deletions datafusion/catalog/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ pub use crate::schema::SchemaProvider;
use async_trait::async_trait;
use datafusion_common::not_impl_err;
use datafusion_common::Result;
use futures::stream::BoxStream;

/// Represents a catalog, comprising a number of named schemas.
///
Expand Down Expand Up @@ -110,10 +111,10 @@ pub trait CatalogProvider: Debug + Sync + Send {
fn as_any(&self) -> &dyn Any;

/// Retrieves the list of available schema names in this catalog.
async fn schema_names(&self) -> Vec<String>;
async fn schema_names(&self) -> BoxStream<'static, Result<String>>;

/// Retrieves a specific schema from the catalog by name, provided it exists.
async fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>>;
async fn schema(&self, name: &str) -> Result<Option<Arc<dyn SchemaProvider>>>;

/// Adds a new schema to this catalog.
///
Expand Down Expand Up @@ -167,11 +168,11 @@ pub trait CatalogProviderList: Debug + Sync + Send {
&self,
name: String,
catalog: Arc<dyn CatalogProvider>,
) -> Option<Arc<dyn CatalogProvider>>;
) -> Result<Option<Arc<dyn CatalogProvider>>>;

/// Retrieves the list of available catalog names
async fn catalog_names(&self) -> Vec<String>;
async fn catalog_names(&self) -> BoxStream<'static, Result<String>>;

/// Retrieves a specific catalog by name, provided it exists.
async fn catalog(&self, name: &str) -> Option<Arc<dyn CatalogProvider>>;
async fn catalog(&self, name: &str) -> Result<Option<Arc<dyn CatalogProvider>>>;
}
Loading

0 comments on commit 840e4ed

Please sign in to comment.