
Pytorch DDP - Timestamp must be non-decreasing for series attribute [Neptune fairseq integration] #1327

Closed
Blaizzy opened this issue Apr 17, 2023 · 13 comments


Blaizzy commented Apr 17, 2023

Hi @Blaizzy, I am trying to use the Neptune logger in pure PyTorch through Fairseq, i.e. integrating a Neptune logger into the Fairseq package, which already supports many of your competitors such as WandB, AzureML, Aim, etc.

I am also facing the same issue. I ran the job on only one GPU rank and am still getting the errors stated above (reproduced below).

2023-04-16 18:04:48 | ERROR | neptune.internal.operation_processors.async_operation_processor | Error occurred during asynchronous operation processing: Timestamp must be non-decreasing for series attribute: monitoring/bb14c23a/stdout. Invalid point: 2023-04-16T12:34:46.659Z
2023-04-16 18:04:48 | ERROR | neptune.internal.operation_processors.async_operation_processor | Error occurred during asynchronous operation processing: Timestamp must be non-decreasing for series attribute: monitoring/bb14c23a/stdout. Invalid point: 2023-04-16T12:34:46.985Z
2023-04-16 18:04:48 | ERROR | neptune.internal.operation_processors.async_operation_processor | Error occurred during asynchronous operation processing: Timestamp must be non-decreasing for series attribute: monitoring/bb14c23a/stdout. Invalid point: 2023-04-16T12:34:46.987Z
2023-04-16 18:04:48 | ERROR | neptune.internal.operation_processors.async_operation_processor | Error occurred during asynchronous operation processing: Timestamp must be non-decreasing for series attribute: monitoring/bb14c23a/stdout. Invalid point: 2023-04-16T12:34:47.370Z
2023-04-16 18:04:48 | ERROR | neptune.internal.operation_processors.async_operation_processor | Error occurred during asynchronous operation processing: Timestamp must be non-decreasing for series attribute: monitoring/bb14c23a/stdout. Invalid point: 2023-04-16T12:34:47.371Z

Where exactly is this an issue? That is, is there some way I can make changes to prevent it from occurring, perhaps some sync/wait primitives available in Neptune that I am not calling? Any direction would help me better handle the logger implementation for Fairseq.

Originally posted by @harishankar-gopalan in #733 (comment)


Blaizzy commented Apr 17, 2023

@harishankar-gopalan could you share a minimal reproducible example of your code?

@Blaizzy Blaizzy changed the title Timestamp must be non-decreasing for series attribute [Neptune fairseq integration] Pytorch DDP - Timestamp must be non-decreasing for series attribute [Neptune fairseq integration] Apr 17, 2023
@Blaizzy Blaizzy self-assigned this Apr 17, 2023
@harishankar-gopalan

OK, so I am integrating a progress_bar implementation for Neptune into Fairseq.

All metrics logged from within the application are getting logged fine. Only the monitoring parameters are throwing an error, as seen below:

2023-04-17 15:45:26 | ERROR | neptune.internal.operation_processors.async_operation_processor | Error occurred during asynchronous operation processing: Timestamp must be non-decreasing for series attribute: monitoring/4fb0ddcb/memory. Invalid point: 2023-04-17T10:15:15.149Z

I am attaching my implementation of the progress_bar below:

import logging
from numbers import Number

import torch

from fairseq.logging.meters import AverageMeter, StopwatchMeter, TimeMeter
from fairseq.logging.progress_bar import BaseProgressBar

try:
    import neptune
except ImportError:
    neptune = None

logger = logging.getLogger(__name__)


class NeptuneProgressBarWrapper(BaseProgressBar):
    # Cache the run ID across instantiations so later epochs append to the same run.
    cached_run_id = None

    def __init__(self, wrapped_bar, project_name, run_name, run_id, run_tags):
        self.wrapped_bar = wrapped_bar
        if neptune is None:
            logger.warning("neptune logger not found, pip install neptune")
            self.run = None
            return

        if NeptuneProgressBarWrapper.cached_run_id and not run_id:
            run_id = NeptuneProgressBarWrapper.cached_run_id

        if not run_id:
            self.run = neptune.init_run(
                project=project_name,
                name=run_name,
                tags=run_tags.split(",") if run_tags else None,
            )
            NeptuneProgressBarWrapper.cached_run_id = self.run["sys/id"].fetch()
        else:
            self.run = neptune.init_run(
                project=project_name,
                name=run_name,
                with_id=run_id,
            )
            logger.info(f"appending to existing run_id: {run_id}")
        logger.info(
            f"initialized Neptune logger with workspace={self.run._api_object.workspace}, backend_class={type(self.run._backend)}"
        )

    def __iter__(self):
        return iter(self.wrapped_bar)

    def __exit__(self, *exc):
        if self.run is not None:
            self.run.stop()
        return False

    def log(self, stats, tag=None, step=None):
        self._log_to_neptune(stats, tag, step)
        self.wrapped_bar.log(stats, tag, step)

    def print(self, stats, tag=None, step=None):
        self._log_to_neptune(stats, tag, step)
        self.wrapped_bar.print(stats, tag, step)

    def update_config(self, config):
        if self.run:
            self.run["parameters"] = self._format_stats(config)
        self.wrapped_bar.update_config(config)

    def _format_stat(self, stat):
        if isinstance(stat, tuple):
            stat = list(stat)
        if isinstance(stat, Number):
            stat = round(stat, 5)
        elif isinstance(stat, AverageMeter):
            stat = round(stat.avg, 5)
        elif isinstance(stat, TimeMeter):
            stat = round(stat.avg, 5)
        elif isinstance(stat, StopwatchMeter):
            stat = round(stat.sum, 5)
        elif torch.is_tensor(stat):
            stat = stat.tolist()
        return stat

    def _log_to_neptune(self, stats, tag=None, step=None):
        if self.run is None:
            return

        if step is None:
            # Fairseq includes num_updates in stats; use it as the default step.
            step = stats["num_updates"]

        prefix = "" if tag is None else tag + "/"

        # Log every stat except num_updates as a series point at this step.
        for key in stats.keys() - {"num_updates"}:
            name = prefix + key
            self.run[name].append(value=self._format_stat(stats[key]), step=step)

I also have minor changes in the Fairseq configs and in the Fairseq train CLI to accommodate the required command-line params and wire them up to initialize the Neptune run.


Blaizzy commented Apr 17, 2023

Hi @harishankar-gopalan

For tracking DDP jobs with Neptune, you can refer to this guide:
https://docs.neptune.ai/tutorials/running_distributed_training/

The main idea is to create the Neptune run and log only from rank 0.

Alternatively, if you want to log from all ranks, make sure you use a separate hardware-monitoring namespace for each rank.
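A minimal sketch of both patterns (illustrative only; the create_run helper, project/run names, and rank lookup are my assumptions, not code from this thread), assuming the rank is available from torch.distributed or the RANK environment variable:

import os

import torch.distributed as dist

import neptune


def create_run(project_name, run_name):
    # Resolve the DDP rank; fall back to the RANK env var if the process group
    # is not initialized yet.
    rank = dist.get_rank() if dist.is_initialized() else int(os.environ.get("RANK", 0))

    # Pattern 1: create the run and log from rank 0 only.
    if rank != 0:
        return None
    return neptune.init_run(project=project_name, name=run_name)

    # Pattern 2 (alternative): log from every rank, but give each rank its own
    # hardware-monitoring namespace so the stdout/memory series do not collide.
    # return neptune.init_run(
    #     project=project_name,
    #     name=run_name,
    #     monitoring_namespace=f"monitoring/rank_{rank}",
    # )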


Blaizzy commented Apr 17, 2023

Let me know if this helps you!


Blaizzy commented Apr 19, 2023

Hi @harishankar-gopalan

Just checking in to see if you still need help with this :)

@harishankar-gopalan

@Blaizzy Apologies for the delay, I haven't had a look at the resource you shared yet. Give me this week to go through it. I will update this thread on whether I need further help.


Blaizzy commented Apr 21, 2023

No problem, looking forward to it.

@harishankar-gopalan

Hi @Blaizzy, I am already using the recommended method of logging from the rank 0 (master) process. Also, for multiple epochs I am caching the run ID and reusing it so that we log to the same run, as recommended.
Still, I am getting the above-mentioned warning. It does not affect training, but I am not sure what logs I am losing by not addressing it. Any assistance in getting to the bottom of this would be really helpful.

I am attaching the relevant code where I instantiate the Neptune run only for the master process, similar to how it is already done for other logging vendors like WandB and AzureML, as can be seen in the fairseq repository.


Blaizzy commented Apr 24, 2023

Could you share the error you are getting now?

The error you were getting before was due to initializing the run in all processes. In the current case, I believe you might be instantiating the run multiple times in the master process.

This error occurs when multiple processes (or multiple run instances) write to the same series field at the same time, which causes a race condition.
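As an illustration only (a hypothetical snippet with a made-up project name, not your exact code), this is the kind of pattern that can trigger it: two live run objects in the same process, both attached to the same run and both streaming stdout and hardware metrics into the monitoring namespace:

import neptune

# First handle: creates the run and starts streaming stdout/memory monitoring.
run_a = neptune.init_run(project="my-workspace/my-project")  # hypothetical project
run_id = run_a["sys/id"].fetch()

# Second handle: re-attaches to the same run with with_id while run_a is still live.
# Both handles now stream monitoring data for the same run, and their interleaved
# timestamps are consistent with the "non-decreasing timestamp" errors above.
run_b = neptune.init_run(project="my-workspace/my-project", with_id=run_id)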


@harishankar-gopalan
Copy link

Will check. As far as I know, the run init is called once, and then called again with the "with_id" parameter set to the previously initialized run, because I want to log the details of all the epochs to the same run. Apart from that, init_run is only called once. I will check once more whether there are any other loose ends where init_run gets called without the with_id parameter set.


Blaizzy commented Apr 25, 2023

When you call it again with with_id, you are re-instantiating the run. That could be the culprit.

Could you send me a minimal reproducible example? It can be just how you set up and use Neptune in your code.
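For reference, a minimal sketch of the pattern I mean (the helper name and module-level cache are illustrative, not part of your code): call init_run at most once per process and reuse the live run object across epochs, using with_id only when resuming from a genuinely new process.

import neptune

# Illustrative module-level cache: keep the live run object, not just its ID.
_NEPTUNE_RUN = None


def get_run(project_name, run_name, run_id=None):
    # Return a single Neptune run per process: init_run is called at most once,
    # and later epochs reuse the same object instead of re-attaching via with_id.
    global _NEPTUNE_RUN
    if _NEPTUNE_RUN is not None:
        return _NEPTUNE_RUN

    if run_id:
        # Resuming a previous run from a fresh process.
        _NEPTUNE_RUN = neptune.init_run(project=project_name, with_id=run_id)
    else:
        _NEPTUNE_RUN = neptune.init_run(project=project_name, name=run_name)
    return _NEPTUNE_RUN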


Blaizzy commented May 9, 2023

Closing this issue as it's stale.
