Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Added support for PyTorch Lightning in the DDP backend. #162

Open
wants to merge 3 commits into
base: master
Choose a base branch
from

Conversation

gopaljigaur
Copy link
Collaborator

This pull request includes several changes to improve the handling of distributed data parallel (DDP) setups and trial evaluation in the neps runtime. The changes focus on adding support for evaluating trials in a DDP context and ensuring proper state management.

DDP and Trial Evaluation Enhancements:

  • Added a new function _is_ddp_and_not_rank_zero to check if the current process is part of a DDP setup and is not the rank zero process. (neps/runtime.py, neps/runtime.pyR49-R66)
  • Introduced the _launch_ddp_runtime function to handle the evaluation of trials in a DDP setup. This function ensures that only the rank zero process launches a new worker. (neps/runtime.py, neps/runtime.pyR512-R531)
  • Modified the _launch_runtime function to use _launch_ddp_runtime when in a DDP setup and not rank zero. This prevents non-rank-zero processes from launching new workers. (neps/runtime.py, neps/runtime.pyR550-R556)

State Management Improvements:

  • Added the evaluating method to the FileBasedTrialStore class to retrieve all evaluating trials. (neps/state/filebased.py, neps/state/filebased.pyR212-R220)
  • Added the get_current_evaluating_trial method to the NepsState class to get the current trial being evaluated. (neps/state/neps_state.py, neps/state/neps_state.pyR217-R222)
  • Defined the evaluating method in the TrialStore protocol to standardize the retrieval of evaluating trials across different implementations. (neps/state/protocols.py, neps/state/protocols.pyR141-R144)

These changes collectively enhance the neps runtime's ability to manage and evaluate trials efficiently, especially in distributed computing environments.

@gopaljigaur gopaljigaur requested review from eddiebergman and DaStoll and removed request for eddiebergman December 8, 2024 17:44
@gopaljigaur gopaljigaur changed the title Added support for PyTorch Lightning in the DDP backend. feat: Added support for PyTorch Lightning in the DDP backend. Dec 12, 2024
@gopaljigaur
Copy link
Collaborator Author

@eddiebergman @DaStoll

Copy link
Contributor

@eddiebergman eddiebergman left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems legit, just a question from my own lack of knowing about DDP.

How is the user expected to actually launch a DDP run? Seems everything here is set up with the expectation that they lauch NePS several times, where the rank0 run will pick up the trials and the rest...?

Given this, can you actually run two DDP runs in tandem?

@gopaljigaur
Copy link
Collaborator Author

@eddiebergman In the current approach, neps.run API is called multiple times by pytorch-lightning itself. The way DDP in lightning works is that it launches the main function of the script that started the pipeline. Since it's usually neps.run in the main block of the script, it gets launched multiple times by pytorch-lightning.

Without any modifications, for each NePS launch, a new worker would be created and registered for the same pipeline, which is undesirable. Instead, we try to register a worker only for the NePS launch that is on rank0 and for the higher rank launches of neps.run, instead of letting the pipeline go towards creating a new worker, we simply poll for the current evaluating config. And, when we find the current evaluating config, we run the pipeline function with that config. This now resembles the default DDP launching method.

However, currently I believe this method is not compatible with multiple DDP runs in tandem. In that case:

  1. User Launch 1 of neps: launches the following:
  • pytorch launch: rank0 (registered worker)
  • pytorch launch: rank1 (no worker)
  • pytorch launch: rank2 (no worker)
  • so on...

Here, rank1 and rank2 are launched by pytorch, not by user. These rank1 and rank2 launches should poll for the current evaluating config from rank0 launch which is registered as a worker.

  1. User Launch 2 of neps: launches the following:
  • pytorch launch: rank0 (registered worker)
  • pytorch launch: rank1 (no worker)
  • pytorch launch: rank2 (no worker)
  • so on...

Now in this case, if we poll the current evaluating config from rank1 and rank2 launches, we might get the config being evaluated in the User Launch 1 (i.e. the config sampled by other rank0 launch). Hence, we need to implement a way for sub-processes of this process to only obtain the config sampled by this rank0 worker and not from other rank0 workers.

@eddiebergman
Copy link
Contributor

eddiebergman commented Dec 16, 2024

There are some filesystem/worker changes come about from #161 but in the meantime, I think the most manageable approach would be that if neps.run() needs to be called from the main-script, then perhaps it would be better to add some extra metadata to a given trial to indicate that it needs rank1+ workers on it, i.e. tag the trial as this is the one that the spawned workers should pick up.

I'm not sure how the DDP sub-process spawning is handled by lightning but if it's possible to pass ENV vars to the spawned workers, then this could be the easiest way to make this work. For example, all workers spawned by rank0 through lightning have something like NEPS_DDP_TRIAL=config_4 set, to indicate that the rank0 worker has sampled and evaluating config_4 with the new processes checking

if (config_id := os.getenv("NEPS_DDP_PROCESS_TRIAL")) is not None:
    ... # do what you are doing for ddp workers
else:
    ... # Regular flow

Do you think this could work? It's definitely possible with the default ProcessPoolExecutor and ProcessPool utilities of Python but I know torch re-invented the wheel there in-terms of their own multi-processing

@gopaljigaur
Copy link
Collaborator Author

@eddiebergman, the approach you mention may work but, do you believe it would be possible for DDP spawned workers to pick up the config from their parent worker only, and not from the other parent worker of another DDP run?

I have made a new commit (3af8969) that now allows the non-rank0 processes to pick up the config only from the same parent worker at all times. However, this approach might fail if one of the spawned workers picks up the config from a wrong parent in the beginning. That seems to be the only failure case of the approach I implemented. Please let me know any suggestions.

@eddiebergman
Copy link
Contributor

eddiebergman commented Dec 18, 2024

Assuming you can pass env vars, I don't see why it would work and doesn't risk any might pick up the wrong config, i.e. "hey, I am rank0 process and I have sampled config_14, and you are my spawned DDP worker with your env var NEPS_DDP_TRIAL_ID=config_14, go load in config_14 and do what needs to be done."

Seems much easier than having to read the entire state, figure out which is evaluating, make sure to hopefully pick up the correct one. Might be missing something here though but this eliminates all guesswork?

@gopaljigaur
Copy link
Collaborator Author

@eddiebergman, I tried to implement the approach as you described but the issue is with the way DDP process groups get initialized with pytorch-lightning. The DDP process group is created only after NePS launches the pipeline. But the config sampling is done before NePS actually launches the pipeline. So, we cannot set any process group environment variables until the pipeline is launched and after the pipeline has been launched, it is already too late to set environment variables.

@eddiebergman
Copy link
Contributor

eddiebergman commented Dec 22, 2024

I get your point, makes sense. I would need to see how the ddp workers get spawned. But if the spawned workers share the exact same ENV, then we can do it directly set os.environ after sampling, before run_pipeline() is called.

If they don't share the same ENV, then it's still possible as well, there is some information sent from the main to the spawned. Just need to go code digging to find where that is and find the best hack to do it.


But at any rate, we need to transfer information for the main worker to the spawned workers, that is unique to those spawned workers i.e. imagine workers main1 and main2 spawn 8 sub workers each.

They sub workers need some information to know which is their parent to do the right thing. Guessing which config to pick up is just going to lead to more issues later on, and even worse, it might be a silent issue that we would never know about it as there's no explicit error that would be raised

For example, in the case above, we could have 14/16 sub workers picked up config1 from main1 and only 2/16 subworkers pick up config2 from main2. You might start concluding somehow the hyperparameters are effecting runtime, where really it's just a silent bug on our part.

@gopaljigaur
Copy link
Collaborator Author

@eddiebergman, I have combined both your approach and mine to make the config sharing among workers free of any risks. It's easy to share the Environment variable in the beginning (before process group initialization) because the children processes inherit the parent environment. However, for the subsequently sampled Trials, we have to manually change the environment variables in the children's environments with the id of the newly sampled Trial, which requires Tensor broadcasting or something similar, making the process even complex.

Here's how the current approach works:

  1. First the neps.run is called and the parent process sets environment variable containing sampled config id.
  2. The child workers pick up the config id from the inherited environment variable and store the evaluating (parent) worker id in a variable.
  3. For the subsequent sampled Trials, the child workers get the currently evaluating Trials and simply take the Trial from their own parent worker id.

I have tested this approach for multiple DDP runs and it seems to work each time. Please let me know what you think. If this looks fine, I can make another commit with a NePS example for pytorch-lightning with DDP.

@gopaljigaur gopaljigaur marked this pull request as ready for review January 3, 2025 08:30
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
Status: No status
Development

Successfully merging this pull request may close these issues.

2 participants