Skip to content

Commit

Permalink
fix sammy requests
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewgazelka committed Nov 19, 2024
1 parent cc9752c commit c8688f1
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 62 deletions.
87 changes: 26 additions & 61 deletions src/daft-connect/src/op/execute/root.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,70 +29,35 @@ impl Session {
let finished = context.finished();

let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<eyre::Result<ExecutePlanResponse>>();

std::thread::spawn(move || {
let plan = match translation::to_logical_plan(command) {
Ok(plan) => plan,
Err(e) => {
tx.send(Err(eyre::eyre!(e))).unwrap();
return;
}
};

let logical_plan = plan.build();
let physical_plan = match daft_local_plan::translate(&logical_plan) {
Ok(plan) => plan,
Err(e) => {
tx.send(Err(eyre::eyre!(e))).unwrap();
return;
}
};

let cfg = DaftExecutionConfig::default();
let result = match daft_local_execution::run_local(
&physical_plan,
HashMap::new(),
cfg.into(),
None,
CancellationToken::new(), // todo: maybe implement cancelling
) {
Ok(result) => result,
Err(e) => {
tx.send(Err(eyre::eyre!(e))).unwrap();
return;
}
};

for result in result {
let result = match result {
Ok(result) => result,
Err(e) => {
tx.send(Err(eyre::eyre!(e))).unwrap();
return;
let result = (|| -> eyre::Result<()> {
let plan = translation::to_logical_plan(command)?;
let logical_plan = plan.build();
let physical_plan = daft_local_plan::translate(&logical_plan)?;

let cfg = DaftExecutionConfig::default();
let results = daft_local_execution::run_local(
&physical_plan,
HashMap::new(),
cfg.into(),
None,
CancellationToken::new(), // todo: maybe implement cancelling
)?;

for result in results {
let result = result?;
let tables = result.get_tables()?;

for table in tables.as_slice() {
let response = context.gen_response(table)?;
tx.send(Ok(response)).unwrap();
}
};

let tables = match result.get_tables() {
Ok(tables) => tables,
Err(e) => {
tx.send(Err(eyre::eyre!(e))).unwrap();
return;
}
};

for table in tables.as_slice() {
let response = context.gen_response(table);

let response = match response {
Ok(response) => response,
Err(e) => {
tx.send(Err(eyre::eyre!(e))).unwrap();
return;
}
};

tx.send(Ok(response)).unwrap();
}
Ok(())
})();

if let Err(e) = result {
tx.send(Err(e)).unwrap();
}
});

Expand Down
1 change: 0 additions & 1 deletion tests/connect/test_range_simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ def spark_session():
# Cleanup
server.shutdown()
session.stop()
# time.sleep(2) # Allow time for session cleanup


def test_range_operation(spark_session):
Expand Down

0 comments on commit c8688f1

Please sign in to comment.