[CHORE] Swordfish perf + cleanup #3132
Conversation
CodSpeed Performance Report: Merging #3132 will not alter performance.
src/daft-local-execution/src/lib.rs
pub(crate) type ProbeStateBridgeRef = Arc<ProbeStateBridge>;

pub(crate) struct ProbeStateBridge {
    inner: OnceLock<Arc<ProbeState>>,
    notify: tokio::sync::Notify,
}

impl ProbeStateBridge {
    fn new() -> Arc<Self> {
        Arc::new(Self {
            inner: OnceLock::new(),
            notify: tokio::sync::Notify::new(),
        })
    }

    fn set_probe_state(&self, state: Arc<ProbeState>) {
        assert!(
            self.inner.set(state).is_ok(),
            "ProbeStateBridge should be set only once"
        );
        self.notify.notify_waiters();
    }

    async fn get_probe_state(&self) -> Arc<ProbeState> {
        loop {
            if let Some(state) = self.inner.get() {
                return state.clone();
            }
            self.notify.notified().await;
        }
    }
}
@samster25 I was rethinking how we transfer data from HashJoinBuild to HashJoinProbe, and thought that using the same channel for both data and probe tables was kinda clunky and confusing. So I came up with this mechanism instead, with some inspiration from Velox.
Essentially, HashJoinBuild and HashJoinProbe will share a copy of the ProbeStateBridge in order to share the ProbeState data. This way, PipelineNode doesn't need to be aware of any kind of data except micropartitions.
CC @kevinzwang if you have any thoughts as well, I feel like I'm staring at swordfish code way too much lol, need more opinions.
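For intuition, the bridge is a one-shot "set once, await many" cell. Here is a minimal Python analogue of that idea using `threading.Event` — the class and names are illustrative, not Daft APIs, and this is a thread-based sketch of what the Rust code does with `OnceLock` + `tokio::sync::Notify`:

```python
import threading

class OneShotBridge:
    """One producer publishes a value exactly once; any number of
    consumers block in get() until it is available."""

    def __init__(self):
        self._event = threading.Event()
        self._value = None

    def set(self, value):
        # Mirrors set_probe_state: setting twice is a programming error.
        assert not self._event.is_set(), "bridge should be set only once"
        self._value = value
        self._event.set()  # wake all waiters

    def get(self):
        # Mirrors get_probe_state: block until the producer publishes.
        self._event.wait()
        return self._value

bridge = OneShotBridge()
results = []

# Probe side: starts waiting before the build side has published.
consumer = threading.Thread(target=lambda: results.append(bridge.get()))
consumer.start()

# Build side: publish the finished probe state once.
bridge.set("probe table")
consumer.join()
print(results[0])  # -> probe table
```

The nice property is that the data channel between pipeline nodes now only ever carries micropartitions; the side-channel state travels through the bridge.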
sender: PipelineSender,
receiver: PipelineReceiver,

pub(crate) fn create_channel<T>(buffer_size: usize) -> (Sender<T>, Receiver<T>) {
    loole::bounded(buffer_size)
}
@kevinzwang made a great callout the other day that round-robin dispatching can be inefficient when ordering is not required. This can be fixed using loole channels (https://docs.rs/loole/latest/loole/), which are multi-producer multi-consumer. That essentially makes dispatch work-stealing when maintaining order is not required.
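To illustrate why an MPMC channel behaves like work stealing: with one shared queue, an idle worker pulls the next available task the moment it finishes, instead of sitting on tasks a round-robin dispatcher pre-assigned to it. A small Python sketch using `queue.Queue` (which is already multi-producer multi-consumer); this is purely illustrative — the PR itself uses the Rust `loole` crate:

```python
import queue
import threading

# One shared task queue instead of one queue per worker.
tasks = queue.Queue()
for i in range(8):
    tasks.put(i)

done = []
done_lock = threading.Lock()

def worker():
    while True:
        try:
            task = tasks.get_nowait()  # "steal" the next available task
        except queue.Empty:
            return  # no work left
        with done_lock:
            done.append(task)

workers = [threading.Thread(target=worker) for _ in range(3)]
for w in workers:
    w.start()
for w in workers:
    w.join()

# Every task is processed exactly once, by whichever worker was free.
print(sorted(done))  # -> [0, 1, 2, 3, 4, 5, 6, 7]
```

With round-robin, a worker stuck on a slow task holds up its whole assigned slice; with the shared queue, slow tasks only delay themselves.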
import time

import daft
from daft import col

daft.context.set_execution_config(enable_native_executor=True, default_morsel_size=1)

@daft.udf(return_dtype=daft.DataType.int64())
def do_work(x):
    x = x.to_pylist()
    # my laptop has 12 cores, so 12 workers will be spawned to run this udf.
    if x[0] % 12 == 0:
        print("doing a lot of work on", x)
        time.sleep(1)
    else:
        print("doing a little work on", x)
        time.sleep(0.1)
    return x

(
    daft.from_pydict({"a": [i for i in range(48)]})
    .with_column("b", do_work(col("a")))
    .agg(col("b").sum())  # agg does not require ordering
    .collect()
)
This script is 4x faster now.