From 1e67eda85843721661e280575dbb631176b0ee87 Mon Sep 17 00:00:00 2001 From: James McKinney <26463+jpmckinney@users.noreply.github.com> Date: Wed, 8 May 2024 01:12:42 -0400 Subject: [PATCH] feat: Remove lockfiles when running Exporter and Flattener tasks, allowing tasks to restart, closes #354 --- data_registry/process_manager/task/exporter.py | 10 ++++++++-- data_registry/process_manager/task/flattener.py | 4 ++++ docs/admin/siteadmin.rst | 16 +++++++++++++++- 3 files changed, 27 insertions(+), 3 deletions(-) diff --git a/data_registry/process_manager/task/exporter.py b/data_registry/process_manager/task/exporter.py index 53ce958..fc81c05 100644 --- a/data_registry/process_manager/task/exporter.py +++ b/data_registry/process_manager/task/exporter.py @@ -5,12 +5,18 @@ class Exporter(TaskManager): final_output = True + def get_export(self): + return Export(self.job.id, basename="full.jsonl.gz") + def run(self): + self.get_export().unlock() + publish({"job_id": self.job.id, "collection_id": self.job.context["process_id_pelican"]}, "exporter_init") def get_status(self): - status = Export(self.job.id, basename="full.jsonl.gz").status - return exporter_status_to_task_status(status) + export = self.get_export() + + return exporter_status_to_task_status(export.status) @skip_if_not_started def wipe(self): diff --git a/data_registry/process_manager/task/flattener.py b/data_registry/process_manager/task/flattener.py index 5ae7f7d..38e9521 100644 --- a/data_registry/process_manager/task/flattener.py +++ b/data_registry/process_manager/task/flattener.py @@ -12,6 +12,10 @@ def get_exports(self): yield Export(self.job.id, basename=f"{path.name[:-9]}.csv.tar.gz") # remove .jsonl.gz def run(self): + for export in self.get_exports(): + if export.running: + export.unlock() + publish({"job_id": self.job.id}, "flattener_init") def get_status(self): diff --git a/docs/admin/siteadmin.rst b/docs/admin/siteadmin.rst index 1da8d1e..a9a885f 100644 --- a/docs/admin/siteadmin.rst +++ b/docs/admin/siteadmin.rst @@ -109,9 +109,23 @@ A job can stall (always "running"). The only option is to `cancel ` and :ref:`Flattener` tasks. Do this only if the ``data_registry_production_exporter_init`` and ``data_registry_production_flattener_init`` queues are empty in the `RabbitMQ management interface `__. + +.. note:: + + The Flattener task publishes one message per file. You might receive a Sentry notification about a failed conversion, while other conversions are still enqueued or in-progress. + + The Exporter task publishes one message per job. This task *can* be restarted while the queue is non-empty – as long as another administrator has not restarted it independently. + +#. `Access the job `__ +#. Set only the *Exporter* and/or *Flattener* task's *Status* to *PLANNED* +#. Click *SAVE* + +Any lockfiles are deleted to allow the task to run. + .. attention:: - To properly implement this feature, see `#354 `__ (for retryable tasks) and `#350 `__ (for non-retryable tasks). + See `#350 `__. Unpublish or freeze a publication ---------------------------------