Skip to content

Commit

Permalink
Replace unnecessary custom logic with DataFusion (DF) provided functions
Browse files Browse the repository at this point in the history
  • Loading branch information
gruuya committed Nov 29, 2023
1 parent c87301f commit e40c47f
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 33 deletions.
4 changes: 2 additions & 2 deletions src/context/logical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,11 +177,11 @@ impl SeafowlContext {
))
};

if self.try_get_delta_table(old_table_name.to_owned()).await.is_err() {
if self.inner.table_provider(old_table_name.to_owned()).await.is_err() {
return Err(Error::Plan(
format!("Source table {old_table_name:?} doesn't exist")
))
} else if self.try_get_delta_table(new_table_name.to_owned()).await.is_ok() {
} else if self.inner.table_provider(new_table_name.to_owned()).await.is_ok() {
return Err(Error::Plan(
format!("Target table {new_table_name:?} already exists")
))
Expand Down
39 changes: 8 additions & 31 deletions src/context/physical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,13 @@ use datafusion::physical_optimizer::pruning::PruningPredicate;
use datafusion::physical_plan::empty::EmptyExec;
use datafusion::physical_plan::filter::FilterExec;
use datafusion::physical_plan::projection::ProjectionExec;
use datafusion::physical_plan::{collect, execute_stream};
use datafusion::{
arrow::{datatypes::SchemaRef, record_batch::RecordBatch},
datasource::file_format::{parquet::ParquetFormat, FileFormat},
error::DataFusionError,
execution::context::TaskContext,
physical_plan::{
coalesce_partitions::CoalescePartitionsExec, EmptyRecordBatchStream,
ExecutionPlan, SendableRecordBatchStream,
},
physical_plan::{ExecutionPlan, SendableRecordBatchStream},
sql::TableReference,
};
use datafusion_expr::logical_plan::{
Expand All @@ -50,7 +48,6 @@ use deltalake::operations::transaction::commit;
use deltalake::operations::vacuum::VacuumBuilder;
use deltalake::protocol::{DeltaOperation, SaveMode};
use deltalake::DeltaTable;
use futures::TryStreamExt;
use log::info;
use std::borrow::Cow;
use std::ops::Deref;
Expand Down Expand Up @@ -777,34 +774,16 @@ impl SeafowlContext {
&self,
physical_plan: Arc<dyn ExecutionPlan>,
) -> Result<Vec<RecordBatch>> {
let stream = self.execute_stream(physical_plan).await?;
stream.err_into().try_collect().await
let task_context = Arc::new(TaskContext::from(self.inner()));
collect(physical_plan, task_context).await
}

pub async fn execute_stream(
&self,
physical_plan: Arc<dyn ExecutionPlan>,
) -> Result<SendableRecordBatchStream> {
match physical_plan.output_partitioning().partition_count() {
0 => Ok(Box::pin(EmptyRecordBatchStream::new(
physical_plan.schema(),
))),
1 => self.execute_stream_partitioned(&physical_plan, 0).await,
_ => {
let plan: Arc<dyn ExecutionPlan> =
Arc::new(CoalescePartitionsExec::new(physical_plan));
self.execute_stream_partitioned(&plan, 0).await
}
}
}

async fn execute_stream_partitioned(
&self,
physical_plan: &Arc<dyn ExecutionPlan>,
partition: usize,
) -> Result<SendableRecordBatchStream> {
let task_context = Arc::new(TaskContext::from(self.inner()));
physical_plan.execute(partition, task_context)
execute_stream(physical_plan, task_context)
}

/// Append data from the provided file, creating a new schema/table if absent
Expand Down Expand Up @@ -833,11 +812,9 @@ impl SeafowlContext {
{
Some(_) => {
// Schema exists, check if existing table's schema matches the new one
match self.try_get_delta_table(&table_name).await {
Ok(mut table) => {
// Update table state to pick up the most recent schema
table.update().await?;
table_schema = Some(TableProvider::schema(&table));
match self.inner.table_provider(&table_name).await {
Ok(table) => {
table_schema = Some(table.schema());
true
}
Err(_) => false,
Expand Down

0 comments on commit e40c47f

Please sign in to comment.