Skip to content

Commit

Permalink
simplify a lot of redundant logic
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewgazelka committed Nov 18, 2024
1 parent 1632678 commit fd179b8
Show file tree
Hide file tree
Showing 7 changed files with 87 additions and 206 deletions.
22 changes: 4 additions & 18 deletions src/daft-connect/src/op/execute.rs
Original file line number Diff line number Diff line change
@@ -1,31 +1,17 @@
use std::{
collections::{HashMap, HashSet},
future::ready,
path::PathBuf,
pin::Pin,
};
use std::pin::Pin;

use arrow2::io::ipc::write::StreamWriter;
use common_daft_config::DaftExecutionConfig;
use common_file_formats::FileFormat;
use daft_scan::builder::{parquet_scan, ParquetScanBuilder};
use daft_table::Table;
use eyre::Context;
use futures::{stream, Stream, StreamExt, TryStreamExt};
use futures::{Stream, StreamExt, TryStreamExt};
use spark_connect::{
execute_plan_response::{ArrowBatch, ResponseType, ResultComplete},
spark_connect_service_server::SparkConnectService,
write_operation::{SaveMode, SaveType},
ExecutePlanResponse, Relation, WriteOperation,
ExecutePlanResponse,
};
use tonic::Status;
use tracing::{error, warn};
use uuid::Uuid;

use crate::{
invalid_argument_err, not_found_err, translation::relation_to_stream, DaftSparkConnectService,
Session,
};
use crate::{DaftSparkConnectService, Session};

mod root;
mod write;
Expand Down
81 changes: 75 additions & 6 deletions src/daft-connect/src/op/execute/root.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
use std::future::ready;

use common_daft_config::DaftExecutionConfig;
use futures::stream;
use spark_connect::Relation;
use tonic::Status;
use spark_connect::{ExecutePlanResponse, Relation};
use tonic::{codegen::tokio_stream::wrappers::UnboundedReceiverStream, Status};

use crate::{
op::execute::{ExecuteStream, PlanIds},
session::Session,
translation::relation_to_stream,
translation,
};

impl Session {
Expand All @@ -21,13 +22,81 @@ impl Session {
let context = PlanIds {
session: self.client_side_session_id().to_string(),
server_side_session: self.server_side_session_id().to_string(),
operation: operation_id.clone(),
operation: operation_id,
};

let finished = context.finished();

let stream = relation_to_stream(command, context)
.map_err(|e| Status::internal(e.to_string()))?
let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<eyre::Result<ExecutePlanResponse>>();

// Execute the plan on a dedicated OS thread and forward each result (or the
// first error) to the receiving stream through `tx`.
std::thread::spawn(move || {
    // All fallible steps live in one closure so `?` replaces the repeated
    // "match, send error, return" blocks. It returns `Ok(())` when every
    // response was sent or when the receiver went away, and `Err` on the
    // first failure.
    let run = || -> eyre::Result<()> {
        let plan = translation::to_logical_plan(command).map_err(|e| eyre::eyre!(e))?;

        let logical_plan = plan.logical_plan.build();
        let physical_plan =
            daft_local_plan::translate(&logical_plan).map_err(|e| eyre::eyre!(e))?;

        let cfg = DaftExecutionConfig::default();
        let results =
            daft_local_execution::run_local(&physical_plan, plan.partition, cfg.into(), None)
                .map_err(|e| eyre::eyre!(e))?;

        for result in results {
            let result = result.map_err(|e| eyre::eyre!(e))?;
            let tables = result.get_tables().map_err(|e| eyre::eyre!(e))?;

            for table in tables.as_slice() {
                let response = context.gen_response(table).map_err(|e| eyre::eyre!(e))?;

                // `send` only fails once the receiver has been dropped (for
                // example the consumer of the stream went away). Nobody is
                // listening any more, so stop instead of panicking.
                if tx.send(Ok(response)).is_err() {
                    return Ok(());
                }
            }
        }

        Ok(())
    };

    if let Err(e) = run() {
        // Best effort: if the receiver is already gone there is no one left
        // to report the error to, so a failed send is deliberately ignored.
        let _ = tx.send(Err(e));
    }
});

let stream = UnboundedReceiverStream::new(rx);

let stream = stream
.map_err(|e| Status::internal(e.to_string()))
.chain(stream::once(ready(Ok(finished))));

Expand Down
10 changes: 7 additions & 3 deletions src/daft-connect/src/op/execute/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,13 @@ impl Session {
println!("physical plan: {physical_plan:#?}");

let cfg = DaftExecutionConfig::default();
let results =
daft_local_execution::run_local(&physical_plan, plan_builder.partition, cfg.into(), None)
.unwrap();
let results = daft_local_execution::run_local(
&physical_plan,
plan_builder.partition,
cfg.into(),
None,
)
.unwrap();

// todo: remove
std::thread::scope(|s| {
Expand Down
2 changes: 0 additions & 2 deletions src/daft-connect/src/translation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@
mod logical_plan;
mod schema;
mod stream;

pub use logical_plan::to_logical_plan;
pub use schema::relation_to_schema;
pub use stream::relation_to_stream;
130 changes: 0 additions & 130 deletions src/daft-connect/src/translation/stream.rs

This file was deleted.

47 changes: 0 additions & 47 deletions src/daft-connect/src/translation/stream/range.rs

This file was deleted.

1 change: 1 addition & 0 deletions src/daft-logical-plan/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ impl LogicalPlanBuilder {
size_bytes: usize,
num_rows: usize,
) -> Self {
use crate::InMemoryInfo;
let source_info = SourceInfo::InMemory(InMemoryInfo::new_not_python(
schema.clone(),
partition_key.into(),
Expand Down

0 comments on commit fd179b8

Please sign in to comment.