Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

High memory usage when dispatching many targets #1220

Closed
wlandau opened this issue Jan 30, 2024 · 9 comments
Closed

High memory usage when dispatching many targets #1220

wlandau opened this issue Jan 30, 2024 · 9 comments
Assignees

Comments

@wlandau
Copy link
Member

wlandau commented Jan 30, 2024

I am running a pipeline which dispatched a couple thousand targets with "worker" storage/retrieval, and memory usage for the crew dispatcher is still in the GB range.

As of the last couple versions, targets dispatches all the targets it can in the moment, regardless of the saturation level of the crew controller. I think we might want to go back to withholding tasks from saturated controllers, but in a more efficient way than before.

And maybe there could be an option to compress the data with qs::qserialize() (if the user has qs installed). qserialize() with default settings looks to be slightly more compact than a custom fit-for-purpose serialization method might be:

x <- targets::tar_target(x, 1)
lobstr::obj_size(x)
#> 30.42 kB
lobstr::obj_size(capture.output(print(x)))
#> 2.69 kB
lobstr::obj_size(capture.output(print(x)))
#> 2.02 kB

Of course there is a speed penalty,

microbenchmark(
  print = capture.output(print(x)),
  hash = qs::qserialize(x),
  no_hash = qs::qserialize(x, check_hash = FALSE)
)
#> Unit: microseconds
#>    expr     min       lq     mean   median      uq      max neval cld
#>   print  95.735 102.2540 114.4868 108.6910 117.998  284.950   100  a 
#>    hash 209.551 252.8880 311.1847 276.9345 298.398 3802.750   100   b
#>  nohash 202.458 235.8115 304.7739 262.3385 285.319 4583.185   100   b

but that might be offset if there the data to ship to workers is lighter.

@wlandau wlandau self-assigned this Jan 30, 2024
@wlandau
Copy link
Member Author

wlandau commented Jan 30, 2024

For the first approach, each crew controller could keep a backlog. controller$push_backlog() could append the task name to a special backlog if the controller is saturated. Then controller$pop_backlog() could pop the first n tasks from the backlog, where n is the number of workers minus the number of unresolved tasks.

@shikokuchuo
Copy link
Contributor

Just chanced across this, seems timely! Would like to confirm - this memory usage is at dispatcher? i.e. the messages have been received by this process and are sitting in a buffer somewhere - and building up as they haven't been processed yet.

@shikokuchuo
Copy link
Contributor

I'm asking as if you keep a backlog at the controller, you are still making a copy of the data, just not sending it to dispatcher yet. Dispatcher should just manage memory through R garbage collection (all external pointers have finalizers attached), so usage may seem high but should not cause problems. So perhaps the solution should indeed be targets withholding tasks from crew.

@wlandau
Copy link
Member Author

wlandau commented Jan 30, 2024

#1221 seems to have reduced memory consumption almost by a factor of 10 in my case.

@wlandau
Copy link
Member Author

wlandau commented Jan 30, 2024

Just chanced across this, seems timely! Would like to confirm - this memory usage is at dispatcher? i.e. the messages have been received by this process and are sitting in a buffer somewhere - and building up as they haven't been processed yet.

I remember thinking so yesterday after examining ps::ps(), but now I am not 100% sure.

I'm asking as if you keep a backlog at the controller, you are still making a copy of the data, just not sending it to dispatcher yet.

I thought about doing it that way, but your point is a good one and I realized it early on. Which is why development crew's backlog mechanism is low level and uses task names. Essentially, if you call controller$saturated() and find out all workers are busy, then you can add just the name of the task to a backlog. Then controller$backlog_pop() returns a character vector of task names from the backlog, and the length of the vector is the number of available workers. This allows targets to fiddle around with task names and mirai daemon connections until it knows there is a worker available. Only then is the data for the task created.

@wlandau
Copy link
Member Author

wlandau commented Jan 30, 2024

A couple more thoughts on this:

  1. Early on, we discussed clustermq's notion of "common data", which might help reduce the size of each task's payload. Since then, you implemented mirai::everywhere(), which is appealing for this purpose. But in crew, this might be difficult because (1) workers auto-scale in and out, so everywhere() might need to be called a second time, and (2) the setup task from everywhere() would trigger an auto-scaling step which launches all the workers. Is there another way?
  2. mirai::serialization(list(qs::qserialize, qs::qdeserialize)) looks super slick for the second part of this issue. In fact, I may not even need to do anything fancy in the code base of either crew or targets! Does it apply to all compute profiles?

@wlandau
Copy link
Member Author

wlandau commented Jan 30, 2024

Hmm... mirai::serialization(list(qs::qserialize, qs::qdeserialize)) does not seem to work in targets. This simple pipeline hangs:

# _targets.R file:
library(targets)
mirai::serialization(list(qs::qserialize, qs::qdeserialize))
tar_option_set(controller = crew::crew_controller_local())
list(tar_target(x, 1))

I think the reason is that mirai::serialization() relies on mirai::everywhere() to register the refhooks. everywhere() creates task objects that are not managed by targets or crew, and this confuses those packages because the condition variables still increment when those tasks are completed.

Is there a way to register refhooks without submitting any tasks? If there is, and if I could manually supply the refhooks separately for daemons() and daemon(), then I could interface this through crew.

@shikokuchuo
Copy link
Contributor

I'm asking as if you keep a backlog at the controller, you are still making a copy of the data, just not sending it to dispatcher yet.

I thought about doing it that way, but your point is a good one and I realized it early on. Which is why development crew's backlog mechanism is low level and uses task names. Essentially, if you call controller$saturated() and find out all workers are busy, then you can add just the name of the task to a backlog. Then controller$backlog_pop() returns a character vector of task names from the backlog, and the length of the vector is the number of available workers. This allows targets to fiddle around with task names and mirai daemon connections until it knows there is a worker available. Only then is the data for the task created.

Right, I think something like that would work for targets as it has the DAG and so can ensure the correct data is created just as it's needed. In the more general case there's no guarantee that objects won't be modified by any subsequent evaluation. I'll have to take a look at strategies to reduce memory usage at some point. Not a straightforward one.

  1. Early on, we discussed clustermq's notion of "common data", which might help reduce the size of each task's payload. Since then, you implemented mirai::everywhere(), which is appealing for this purpose. But in crew, this might be difficult because (1) workers auto-scale in and out, so everywhere() might need to be called a second time, and (2) the setup task from everywhere() would trigger an auto-scaling step which launches all the workers. Is there another way?

Works for persistent workers, can't think of a good way for auto-scaling. You'd seemingly need some sort of handshake when new instances connect, which is not ideal.

  1. mirai::serialization(list(qs::qserialize, qs::qdeserialize)) looks super slick for the second part of this issue. In fact, I may not even need to do anything fancy in the code base of either crew or targets! Does it apply to all compute profiles?

May be an idea if I enable it for environments. Custom serialization is limited to handling external pointer objects at the moment to keep the implementation simple.

@wlandau
Copy link
Member Author

wlandau commented Feb 2, 2024

Closing in favor of shikokuchuo/mirai#97.

@wlandau wlandau closed this as completed Feb 2, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

2 participants