Skip to content

Commit

Permalink
Remove redundant get_table_provider method
Browse files Browse the repository at this point in the history
  • Loading branch information
gruuya committed Nov 28, 2023
1 parent d14e889 commit c87301f
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 41 deletions.
1 change: 0 additions & 1 deletion src/config/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,6 @@ pub async fn build_context(cfg: &schema::SeafowlConfig) -> Result<SeafowlContext

#[cfg(test)]
mod tests {
use crate::context::SeafowlContext;
use sqlx::sqlite::SqliteJournalMode;

use super::*;
Expand Down
4 changes: 2 additions & 2 deletions src/context/logical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,11 +177,11 @@ impl SeafowlContext {
))
};

if self.get_table_provider(old_table_name.to_owned()).await.is_err() {
if self.try_get_delta_table(old_table_name.to_owned()).await.is_err() {
return Err(Error::Plan(
format!("Source table {old_table_name:?} doesn't exist")
))
} else if self.get_table_provider(new_table_name.to_owned()).await.is_ok() {
} else if self.try_get_delta_table(new_table_name.to_owned()).await.is_ok() {
return Err(Error::Plan(
format!("Target table {new_table_name:?} already exists")
))
Expand Down
33 changes: 0 additions & 33 deletions src/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use crate::{
};

use base64::{engine::general_purpose::STANDARD, Engine};
use datafusion::datasource::TableProvider;
pub use datafusion::error::{DataFusionError as Error, Result};
use datafusion::{error::DataFusionError, prelude::SessionContext, sql::TableReference};
use datafusion_common::OwnedTableReference;
Expand Down Expand Up @@ -141,38 +140,6 @@ impl SeafowlContext {
Ok(TableReference::from(resolved_reference).to_owned_reference())
}

/// Get a provider for a given table, return Err if it doesn't exist
async fn get_table_provider(
&self,
table_name: impl Into<String>,
) -> Result<Arc<dyn TableProvider>> {
let table_name = table_name.into();
let table_ref = TableReference::from(table_name.as_str());

let resolved_ref = table_ref.resolve(&self.database, DEFAULT_SCHEMA);

self.inner
.catalog(&resolved_ref.catalog)
.ok_or_else(|| {
Error::Plan(format!(
"failed to resolve catalog: {}",
resolved_ref.catalog
))
})?
.schema(&resolved_ref.schema)
.ok_or_else(|| {
Error::Plan(format!("failed to resolve schema: {}", resolved_ref.schema))
})?
.table(&resolved_ref.table)
.await
.ok_or_else(|| {
Error::Plan(format!(
"'{}.{}.{}' not found",
resolved_ref.catalog, resolved_ref.schema, resolved_ref.table
))
})
}

/// Resolve a table reference into a Delta table
pub async fn try_get_delta_table<'a>(
&self,
Expand Down
14 changes: 9 additions & 5 deletions src/context/physical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -722,9 +722,11 @@ impl SeafowlContext {
Ok(row_count) => {
info!("Deleted {} old table versions", row_count);
}
Err(error) => return Err(Error::Internal(format!(
Err(error) => {
return Err(Error::Internal(format!(
"Failed to delete old table versions: {error:?}"
))),
)))
}
}
}

Expand Down Expand Up @@ -831,9 +833,11 @@ impl SeafowlContext {
{
Some(_) => {
// Schema exists, check if existing table's schema matches the new one
match self.get_table_provider(&table_name).await {
Ok(table) => {
table_schema = Some(table.schema());
match self.try_get_delta_table(&table_name).await {
Ok(mut table) => {
// Update table state to pick up the most recent schema
table.update().await?;
table_schema = Some(TableProvider::schema(&table));
true
}
Err(_) => false,
Expand Down

0 comments on commit c87301f

Please sign in to comment.