feat(swordfish): Memory manager #3599

Open
wants to merge 7 commits into
base: main

Contributor

@colin-ho colin-ho commented Dec 18, 2024

A simple global memory manager that lets UDFs with memory requests request bytes before their task is spawned.

@github-actions github-actions bot added the feat label Dec 18, 2024

codecov bot commented Dec 18, 2024

Codecov Report

Attention: Patch coverage is 72.80702% with 31 lines in your changes missing coverage. Please review.

Project coverage is 77.84%. Comparing base (063de4d) to head (6c0b158).
Report is 5 commits behind head on main.

Files with missing lines                          Patch %   Lines
src/daft-local-execution/src/resource_manager.rs  58.33%    30 Missing ⚠️
src/common/system-info/src/lib.rs                 90.00%    1 Missing ⚠️
Additional details and impacted files

@@            Coverage Diff             @@
##             main    #3599      +/-   ##
==========================================
+ Coverage   77.80%   77.84%   +0.03%     
==========================================
  Files         718      719       +1     
  Lines       88250    88359     +109     
==========================================
+ Hits        68666    68785     +119     
+ Misses      19584    19574      -10     
Files with missing lines Coverage Δ
...ecution/src/intermediate_ops/actor_pool_project.rs 87.59% <100.00%> (+0.60%) ⬆️
.../src/intermediate_ops/anti_semi_hash_join_probe.rs 100.00% <ø> (+1.29%) ⬆️
...local-execution/src/intermediate_ops/cross_join.rs 100.00% <ø> (ø)
...ft-local-execution/src/intermediate_ops/explode.rs 100.00% <ø> (ø)
...aft-local-execution/src/intermediate_ops/filter.rs 100.00% <ø> (ø)
...tion/src/intermediate_ops/inner_hash_join_probe.rs 100.00% <ø> (ø)
...-execution/src/intermediate_ops/intermediate_op.rs 83.54% <100.00%> (+0.87%) ⬆️
...ft-local-execution/src/intermediate_ops/project.rs 89.47% <100.00%> (+1.97%) ⬆️
...aft-local-execution/src/intermediate_ops/sample.rs 100.00% <ø> (ø)
...ft-local-execution/src/intermediate_ops/unpivot.rs 100.00% <ø> (ø)
... and 4 more

... and 11 files with indirect coverage changes

@colin-ho colin-ho marked this pull request as ready for review December 19, 2024 07:34

codspeed-hq bot commented Dec 19, 2024

CodSpeed Performance Report

Merging #3599 will degrade performances by 34.14%

Comparing colin/memory-manager (6c0b158) with main (ae74c10)

Summary

❌ 1 regressions
✅ 26 untouched benchmarks

⚠️ Please fix the performance issues or acknowledge them on CodSpeed.

Benchmarks breakdown

Benchmark                                  main      colin/memory-manager  Change
test_iter_rows_first_row[100 Small Files]  131.6 ms  199.9 ms              -34.14%

info: sysinfo::System,
}

impl Default for SystemInfo {
impl Default for SystemInfoInternal {
Member

why add this extra nesting?

Contributor Author

SystemInfo is feature-flagged for Python, and I didn't want to propagate that flag into the local executor.

Member

It looks like it uses cfg_attr rather than gating the whole type behind the feature flag. That means only the second part, pyclass(module = "daft.daft", frozen), is conditional on the feature being enabled.

You should be able to use SystemInfo without the python feature.

Contributor Author

#[cfg(feature = "python")]
#[pymethods]
impl SystemInfo {
    #[new]
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    #[must_use]
    pub fn cpu_count(&self) -> Option<u64> {
        self.info.physical_core_count().map(|x| x as u64)
    }

    #[must_use]
    pub fn total_memory(&self) -> u64 {
        if let Some(cgroup) = self.info.cgroup_limits() {
            cgroup.total_memory
        } else {
            self.info.total_memory()
        }
    }
}

It's the methods that are flagged. I moved the internal logic to methods that are not feature-flagged.
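The distinction discussed in this thread can be sketched as follows. This is a minimal illustration, not the PR's code: the field name `total_memory_bytes` and the method `total_memory_py` are hypothetical, and `derive(Clone)` stands in for the `pyclass(...)` attribute so the example builds without pyo3.

```rust
// `cfg_attr` only makes the extra attribute conditional; the struct itself
// is always compiled, with or without the `python` feature.
// Assumption: derive(Clone) is a stand-in for pyclass(module = "daft.daft", frozen).
#[cfg_attr(feature = "python", derive(Clone))]
#[derive(Debug, Default)]
pub struct SystemInfo {
    total_memory_bytes: u64, // hypothetical field
}

// `cfg` on the impl block removes the whole block when the feature is off,
// so these methods do not exist in a build without `python`.
#[cfg(feature = "python")]
impl SystemInfo {
    pub fn total_memory_py(&self) -> u64 {
        self.total_memory()
    }
}

// Ungated impl block: callable from any crate regardless of enabled features.
impl SystemInfo {
    pub fn total_memory(&self) -> u64 {
        self.total_memory_bytes
    }
}

fn main() {
    let info = SystemInfo { total_memory_bytes: 16 * 1024 * 1024 * 1024 };
    println!("total memory: {} bytes", info.total_memory());
}
```

Under this layout the type is usable everywhere, and only the gated methods disappear when the feature is disabled, which is why the shared logic has to live in an ungated impl block.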

.await??;
let result = {
// MemoryPermit will be automatically dropped at the end of this scope
let _permit = if let Some(mr) = memory_request {
Member

So an issue I see here is that we do the following:

  1. Grab a permit for the resources we're trying to allocate
  2. Schedule the task
  3. Task starts
  4. Task completes
  5. Permit is dropped

But instead I think you want the following:

  1. Schedule the task
  2. Grab a permit for the resources we're trying to allocate
  3. Task starts
  4. Permit is dropped
  5. Task completes

Member

In the current case we are requesting resources for the right to submit rather than the right to run.

Member

So ideally you actually ask for the resource in the spawn of the tokio task!
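The ordering proposed in this thread (schedule first, reserve memory inside the task) can be sketched roughly as below. To keep the example dependency-free, `std::thread::spawn` and `Condvar` stand in for tokio tasks and `tokio::sync::Notify`; the `MemoryManager` shape and the `run_tasks` helper are hypothetical and not taken from the PR.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// Hypothetical blocking manager; std's Condvar stands in for tokio's Notify.
struct MemoryManager {
    available: Mutex<u64>,
    freed: Condvar,
}

impl MemoryManager {
    fn new(total: u64) -> Self {
        Self { available: Mutex::new(total), freed: Condvar::new() }
    }

    /// Blocks until `bytes` can be reserved (waits forever if `bytes` exceeds the total).
    fn acquire(&self, bytes: u64) {
        let mut available = self.available.lock().unwrap();
        while *available < bytes {
            available = self.freed.wait(available).unwrap();
        }
        *available -= bytes;
    }

    /// Returns `bytes` to the pool and wakes any waiting tasks.
    fn release(&self, bytes: u64) {
        *self.available.lock().unwrap() += bytes;
        self.freed.notify_all();
    }
}

/// Runs `n_tasks` tasks that each need `bytes`; returns the peak bytes reserved at once.
fn run_tasks(total: u64, bytes: u64, n_tasks: usize) -> u64 {
    let manager = Arc::new(MemoryManager::new(total));
    let peak = Arc::new(Mutex::new(0u64));
    // 1. Schedule every task up front; submission never waits on memory.
    let handles: Vec<_> = (0..n_tasks)
        .map(|_| {
            let manager = Arc::clone(&manager);
            let peak = Arc::clone(&peak);
            thread::spawn(move || {
                // 2. Reserve memory inside the task: the right to run, not to submit.
                manager.acquire(bytes);
                // 3. The "work": record how many bytes are reserved right now.
                let in_use = total - *manager.available.lock().unwrap();
                {
                    let mut slot = peak.lock().unwrap();
                    *slot = (*slot).max(in_use);
                }
                // 4. Release the reservation just before the task completes.
                manager.release(bytes);
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    let observed = *peak.lock().unwrap();
    observed
}

fn main() {
    let peak = run_tasks(100, 40, 8);
    println!("peak reserved bytes: {peak}");
}
```

With a 100-byte budget and 40-byte requests, all eight tasks are submitted immediately, but at most two hold a reservation at any moment.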

use lazy_static::lazy_static;
use tokio::sync::Notify;

lazy_static! {
Member

Favor a OnceLock over lazy_static here.
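A minimal sketch of what this suggestion could look like with `std::sync::OnceLock` (stable since Rust 1.70). The `total_bytes` field and the 1 GiB default are placeholders invented for illustration; the PR's actual initialization logic is not shown in this thread.

```rust
use std::sync::{Arc, OnceLock};

pub struct MemoryManager {
    total_bytes: u64, // hypothetical field
}

// Initialized lazily on first access, without the lazy_static macro.
static MEMORY_MANAGER: OnceLock<Arc<MemoryManager>> = OnceLock::new();

pub fn get_memory_manager() -> Arc<MemoryManager> {
    MEMORY_MANAGER
        // Assumption: a fixed 1 GiB budget; real code would query the system.
        .get_or_init(|| Arc::new(MemoryManager { total_bytes: 1 << 30 }))
        .clone()
}

fn main() {
    let a = get_memory_manager();
    let b = get_memory_manager();
    // Both handles point at the same lazily created instance.
    assert!(Arc::ptr_eq(&a, &b));
    println!("budget: {} bytes", a.total_bytes);
}
```

This keeps the same call signature as the function under review while dropping the external crate dependency.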

None
}

pub fn get_memory_manager() -> Arc<MemoryManager> {
Member

I would instead return a reference to the Arc<MemoryManager>; it's up to the consumer to clone it if they need to.
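One reading of this suggestion, assuming it means handing back a borrowed handle rather than a fresh clone on every call, is sketched below. The `OnceLock` static and the unit-struct `MemoryManager` are assumptions made to keep the example self-contained.

```rust
use std::sync::{Arc, OnceLock};

pub struct MemoryManager; // placeholder type for illustration

static MEMORY_MANAGER: OnceLock<Arc<MemoryManager>> = OnceLock::new();

// Returns a borrowed handle; no reference count is touched on the hot path.
pub fn get_memory_manager() -> &'static Arc<MemoryManager> {
    MEMORY_MANAGER.get_or_init(|| Arc::new(MemoryManager))
}

fn main() {
    let borrowed = get_memory_manager();
    let before = Arc::strong_count(borrowed);
    // Callers that need ownership (e.g. to move into a task) clone explicitly.
    let owned = Arc::clone(borrowed);
    assert_eq!(Arc::strong_count(borrowed), before + 1);
    drop(owned);
    assert_eq!(Arc::strong_count(borrowed), before);
}
```

Callers that only need to call a method on the manager then pay no atomic increment, and those that need ownership opt in with `Arc::clone`.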

MEMORY_MANAGER.clone()
}

pub struct MemoryPermit {
Member

You can name this OwnedMemoryPermit

and then in many cases you can also use:

struct MemoryPermit<'a> {
    bytes: u64,
    manager: &'a MemoryManager,
}

since if you are making the memory request inside the tokio spawn, you don't need something with a 'static lifetime.
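A borrowed permit along these lines might look like the sketch below. It is synchronous and uses a plain `Mutex` for brevity; the non-blocking `try_request_bytes` method and the `available` helper are hypothetical names, not the PR's API.

```rust
use std::sync::Mutex;

pub struct MemoryManager {
    available_bytes: Mutex<u64>,
}

/// Borrowed permit: tied to the manager's lifetime, so no Arc clone is needed.
pub struct MemoryPermit<'a> {
    bytes: u64,
    manager: &'a MemoryManager,
}

impl MemoryManager {
    pub fn new(total_bytes: u64) -> Self {
        Self { available_bytes: Mutex::new(total_bytes) }
    }

    pub fn available(&self) -> u64 {
        *self.available_bytes.lock().unwrap()
    }

    /// Hypothetical non-blocking request: returns None if the budget is exhausted.
    pub fn try_request_bytes(&self, bytes: u64) -> Option<MemoryPermit<'_>> {
        let mut available = self.available_bytes.lock().unwrap();
        if *available < bytes {
            return None;
        }
        *available -= bytes;
        Some(MemoryPermit { bytes, manager: self })
    }
}

impl Drop for MemoryPermit<'_> {
    // Returning the bytes on drop means the permit cannot be leaked by an early return.
    fn drop(&mut self) {
        *self.manager.available_bytes.lock().unwrap() += self.bytes;
    }
}

fn main() {
    let manager = MemoryManager::new(100);
    {
        let _permit = manager.try_request_bytes(60).expect("budget available");
        println!("while held: {} bytes free", manager.available());
    }
    println!("after drop: {} bytes free", manager.available());
}
```

The lifetime parameter lets the compiler check that a permit never outlives its manager, which is enough when the permit is acquired and dropped within a single task body.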

available_bytes: u64,
}

pub struct MemoryManager {
Member

you can limit visibility of much of this to pub(crate)

}

pub async fn request_bytes(self: &Arc<Self>, bytes: u64) -> DaftResult<MemoryPermit> {
if bytes == 0 {
Member

Why can't we?

}

loop {
let can_allocate = {
Member

I think it would be cleaner to put this code in

fn try_request_bytes(self: &Arc<Self>, bytes: u64) -> Option<MemoryPermit> {

}

which attempts to get a permit and otherwise returns None.

Then you can loop over that method in here.
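The split suggested here could be sketched as follows. This is a blocking approximation, not the PR's async code: `Condvar` stands in for `tokio::sync::Notify`, `request_bytes` returns the permit directly instead of a `DaftResult`, and the field names are assumptions.

```rust
use std::sync::{Arc, Condvar, Mutex};

pub struct MemoryManager {
    available_bytes: Mutex<u64>,
    released: Condvar, // stand-in for tokio::sync::Notify
}

pub struct MemoryPermit {
    bytes: u64,
    manager: Arc<MemoryManager>,
}

impl MemoryManager {
    pub fn new(total_bytes: u64) -> Arc<Self> {
        Arc::new(Self { available_bytes: Mutex::new(total_bytes), released: Condvar::new() })
    }

    pub fn available(&self) -> u64 {
        *self.available_bytes.lock().unwrap()
    }

    /// Single attempt: reserve `bytes` if they fit, otherwise return None.
    pub fn try_request_bytes(self: &Arc<Self>, bytes: u64) -> Option<MemoryPermit> {
        let mut available = self.available_bytes.lock().unwrap();
        if *available < bytes {
            return None;
        }
        *available -= bytes;
        Some(MemoryPermit { bytes, manager: Arc::clone(self) })
    }

    /// Loops over `try_request_bytes`, sleeping until a release makes room.
    pub fn request_bytes(self: &Arc<Self>, bytes: u64) -> MemoryPermit {
        loop {
            if let Some(permit) = self.try_request_bytes(bytes) {
                return permit;
            }
            // wait_while checks the predicate under the lock before sleeping,
            // so a release between the failed attempt and this wait is not missed.
            let guard = self.available_bytes.lock().unwrap();
            drop(self.released.wait_while(guard, |available| *available < bytes).unwrap());
        }
    }
}

impl Drop for MemoryPermit {
    fn drop(&mut self) {
        *self.manager.available_bytes.lock().unwrap() += self.bytes;
        self.manager.released.notify_all();
    }
}

fn main() {
    let manager = MemoryManager::new(10);
    let first = manager.request_bytes(8);
    let worker_manager = Arc::clone(&manager);
    // This request cannot fit until `first` is dropped.
    let waiter = std::thread::spawn(move || {
        let _second = worker_manager.request_bytes(5);
    });
    drop(first);
    waiter.join().unwrap();
    println!("free after both permits dropped: {}", manager.available());
}
```

Keeping the single-attempt logic in its own method leaves the loop body to do nothing but retry and wait, and gives callers a non-blocking option for free.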
