Skip to content

Commit

Permalink
pipeline channel
Browse files: browse the repository at this point in the history
  • Loading branch information
Colin Ho authored and Colin Ho committed Aug 27, 2024
1 parent 5357ffa commit 8dde192
Show file tree
Hide file tree
Showing 9 changed files with 34 additions and 28 deletions.
4 changes: 2 additions & 2 deletions src/daft-local-execution/src/intermediate_ops/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ impl IntermediateOperator for AggregateOperator {
fn execute(
&self,
_idx: usize,
input: PipelineResultType,
input: &PipelineResultType,
_state: Option<&mut Box<dyn IntermediateOperatorState>>,
) -> DaftResult<IntermediateOperatorResult> {
let out = input.data().agg(&self.agg_exprs, &self.group_by)?;
let out = input.as_data().agg(&self.agg_exprs, &self.group_by)?;
Ok(IntermediateOperatorResult::NeedMoreInput(Some(Arc::new(
out,
))))
Expand Down
4 changes: 2 additions & 2 deletions src/daft-local-execution/src/intermediate_ops/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ impl IntermediateOperator for FilterOperator {
fn execute(
&self,
_idx: usize,
input: PipelineResultType,
input: &PipelineResultType,
_state: Option<&mut Box<dyn IntermediateOperatorState>>,
) -> DaftResult<IntermediateOperatorResult> {
let out = input.data().filter(&[self.predicate.clone()])?;
let out = input.as_data().filter(&[self.predicate.clone()])?;
Ok(IntermediateOperatorResult::NeedMoreInput(Some(Arc::new(
out,
))))
Expand Down
12 changes: 6 additions & 6 deletions src/daft-local-execution/src/intermediate_ops/hash_join_probe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,17 @@ enum HashJoinProbeState {
}

impl HashJoinProbeState {
fn set_table(&mut self, table: Arc<ProbeTable>, tables: Arc<Vec<Table>>) {
fn set_table(&mut self, table: &Arc<ProbeTable>, tables: &Arc<Vec<Table>>) {
if let HashJoinProbeState::Building = self {
*self = HashJoinProbeState::ReadyToProbe(table, tables);
*self = HashJoinProbeState::ReadyToProbe(table.clone(), tables.clone());
} else {
panic!("HashJoinProbeState should only be in Building state when setting table")
}
}

fn probe(
&self,
input: Arc<MicroPartition>,
input: &Arc<MicroPartition>,
right_on: &[ExprRef],
pruned_right_side_columns: &[String],
) -> DaftResult<Arc<MicroPartition>> {
Expand Down Expand Up @@ -109,7 +109,7 @@ impl IntermediateOperator for HashJoinProbeOperator {
fn execute(
&self,
idx: usize,
input: PipelineResultType,
input: &PipelineResultType,
state: Option<&mut Box<dyn IntermediateOperatorState>>,
) -> DaftResult<IntermediateOperatorResult> {
println!("HashJoinProbeOperator::execute: idx: {}", idx);
Expand All @@ -120,7 +120,7 @@ impl IntermediateOperator for HashJoinProbeOperator {
.as_any_mut()
.downcast_mut::<HashJoinProbeState>()
.expect("HashJoinProbeOperator state should be HashJoinProbeState");
let (probe_table, tables) = input.probe_table();
let (probe_table, tables) = input.as_probe_table();
state.set_table(probe_table, tables);
Ok(IntermediateOperatorResult::NeedMoreInput(None))
}
Expand All @@ -130,7 +130,7 @@ impl IntermediateOperator for HashJoinProbeOperator {
.as_any_mut()
.downcast_mut::<HashJoinProbeState>()
.expect("HashJoinProbeOperator state should be HashJoinProbeState");
let input = input.data();
let input = input.as_data();
let out = state.probe(input, &self.right_on, &self.pruned_right_side_columns)?;
Ok(IntermediateOperatorResult::NeedMoreInput(Some(out)))
}
Expand Down
26 changes: 16 additions & 10 deletions src/daft-local-execution/src/intermediate_ops/intermediate_op.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pub trait IntermediateOperator: Send + Sync {
fn execute(
&self,
idx: usize,
input: PipelineResultType,
input: &PipelineResultType,
state: Option<&mut Box<dyn IntermediateOperatorState>>,
) -> DaftResult<IntermediateOperatorResult>;
fn name(&self) -> &'static str;
Expand Down Expand Up @@ -80,14 +80,20 @@ impl IntermediateNode {
let span = info_span!("IntermediateOp::execute");
let mut state = op.make_state();
while let Some((idx, morsel)) = receiver.recv().await {
let result = rt_context.in_span(&span, || op.execute(idx, morsel, state.as_mut()))?;
match result {
IntermediateOperatorResult::NeedMoreInput(Some(mp)) => {
let _ = sender.send(mp.into()).await;
}
IntermediateOperatorResult::NeedMoreInput(None) => {}
IntermediateOperatorResult::HasMoreOutput(mp) => {
let _ = sender.send(mp.into()).await;
loop {
let result =
rt_context.in_span(&span, || op.execute(idx, &morsel, state.as_mut()))?;
match result {
IntermediateOperatorResult::NeedMoreInput(Some(mp)) => {
let _ = sender.send(mp.into()).await;
break;
}
IntermediateOperatorResult::NeedMoreInput(None) => {
break;
}
IntermediateOperatorResult::HasMoreOutput(mp) => {
let _ = sender.send(mp.into()).await;
}
}
}
}
Expand Down Expand Up @@ -140,7 +146,7 @@ impl IntermediateNode {
let _ = worker_sender.send((idx, morsel.clone())).await;
}
} else {
buffer.push(morsel.data().clone());
buffer.push(morsel.as_data().clone());
if let Some(ready) = buffer.try_clear() {
let _ = send_to_next_worker(idx, ready?.into()).await;
}
Expand Down
4 changes: 2 additions & 2 deletions src/daft-local-execution/src/intermediate_ops/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ impl IntermediateOperator for ProjectOperator {
fn execute(
&self,
_idx: usize,
input: PipelineResultType,
input: &PipelineResultType,
_state: Option<&mut Box<dyn IntermediateOperatorState>>,
) -> DaftResult<IntermediateOperatorResult> {
let out = input.data().eval_expression_list(&self.projection)?;
let out = input.as_data().eval_expression_list(&self.projection)?;
Ok(IntermediateOperatorResult::NeedMoreInput(Some(Arc::new(
out,
))))
Expand Down
4 changes: 2 additions & 2 deletions src/daft-local-execution/src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,14 @@ impl From<(Arc<ProbeTable>, Arc<Vec<Table>>)> for PipelineResultType {
}

impl PipelineResultType {
pub fn data(self) -> Arc<MicroPartition> {
pub fn as_data(&self) -> &Arc<MicroPartition> {
match self {
PipelineResultType::Data(data) => data,
_ => panic!("Expected data"),
}
}

pub fn probe_table(self) -> (Arc<ProbeTable>, Arc<Vec<Table>>) {
pub fn as_probe_table(&self) -> (&Arc<ProbeTable>, &Arc<Vec<Table>>) {
match self {
PipelineResultType::ProbeTable(probe_table, tables) => (probe_table, tables),
_ => panic!("Expected probe table"),
Expand Down
2 changes: 1 addition & 1 deletion src/daft-local-execution/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ pub fn run_local(
.await?
.get_receiver();
while let Some(val) = receiver.recv().await {
let _ = tx.send(val.data()).await;
let _ = tx.send(val.as_data().clone()).await;
}

while let Some(result) = runtime_handle.join_next().await {
Expand Down
2 changes: 1 addition & 1 deletion src/daft-local-execution/src/sinks/blocking_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ impl PipelineNode for BlockingSinkNode {
let mut guard = op.lock().await;
while let Some(val) = child_results_receiver.recv().await {
if let BlockingSinkStatus::Finished =
rt_context.in_span(&span, || guard.sink(&val.data()))?
rt_context.in_span(&span, || guard.sink(val.as_data()))?
{
break;
}
Expand Down
4 changes: 2 additions & 2 deletions src/daft-local-execution/src/sinks/streaming_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,9 @@ impl PipelineNode for StreamingSinkNode {
let mut sink = op.lock().await;
let mut is_active = true;
while is_active && let Some(val) = child_results_receiver.recv().await {
let val = val.data();
let val = val.as_data();
loop {
let result = runtime_stats.in_span(&span, || sink.execute(0, &val))?;
let result = runtime_stats.in_span(&span, || sink.execute(0, val))?;
match result {
StreamSinkOutput::HasMoreOutput(mp) => {
sender.send(mp.into()).await.unwrap();
Expand Down

0 comments on commit 8dde192

Please sign in to comment.