From da3814ace379ddf430ca6f46b3e35c44dbd452ca Mon Sep 17 00:00:00 2001 From: Michael Demoret <42954918+mdemoret-nv@users.noreply.github.com> Date: Thu, 30 Nov 2023 11:12:31 -0700 Subject: [PATCH] Fix the SID Viz workflow shutdown process with the new pipeline shutdown process (#1392) Due to the changes in #1233, the pipeline shutdown process has been slightly altered. This updates the viz demo to shutdown correctly without error. Also added support for exiting on Ctrl+C Fixes #1391 Authors: - Michael Demoret (https://github.com/mdemoret-nv) Approvers: - Eli Fajardo (https://github.com/efajardo-nv) - David Gardner (https://github.com/dagardner-nv) URL: https://github.com/nv-morpheus/Morpheus/pull/1392 --- morpheus/pipeline/pipeline.py | 83 +++++++++++-------- .../postprocess/generate_viz_frames_stage.py | 23 ++++- 2 files changed, 68 insertions(+), 38 deletions(-) diff --git a/morpheus/pipeline/pipeline.py b/morpheus/pipeline/pipeline.py index f3365b5ba4..568c848df4 100644 --- a/morpheus/pipeline/pipeline.py +++ b/morpheus/pipeline/pipeline.py @@ -79,6 +79,8 @@ def __init__(self, config: Config): self._mrc_executor: mrc.Executor = None + self._loop: asyncio.AbstractEventLoop = None + @property def is_built(self) -> bool: return self._is_built @@ -317,7 +319,8 @@ def inner_build(builder: mrc.Builder, segment_id: str): for port in typing.cast(StageBase, stage).input_ports: port.link_node(builder=builder) - asyncio.run(self._async_start(segment_graph.nodes())) + # Call the start method for the stages in this segment. Must run on the loop and wait for the result + asyncio.run_coroutine_threadsafe(self._async_start(segment_graph.nodes()), self._loop).result() logger.info("====Building Segment Complete!====") @@ -339,9 +342,47 @@ def inner_build(builder: mrc.Builder, segment_id: str): logger.info("====Registering Pipeline Complete!====") - def _start(self): + async def _start(self): assert self._is_built, "Pipeline must be built before starting" + # Only execute this once + if (self._is_started): + return + + # Stop from running this twice + self._is_started = True + + # Save off the current loop so we can use it in async_start + self._loop = asyncio.get_running_loop() + + # Setup error handling and cancellation of the pipeline + def error_handler(_, context: dict): + + msg = f"Unhandled exception in async loop! Exception: \n{context['message']}" + exception = context.get("exception", Exception()) + + logger.critical(msg, exc_info=exception) + + self._loop.set_exception_handler(error_handler) + + exit_count = 0 + + # Handles Ctrl+C for graceful shutdown + def term_signal(): + + nonlocal exit_count + exit_count = exit_count + 1 + + if (exit_count == 1): + tqdm.write("Stopping pipeline. Please wait... Press Ctrl+C again to kill.") + self.stop() + else: + tqdm.write("Killing") + sys.exit(1) + + for sig in [signal.SIGINT, signal.SIGTERM]: + self._loop.add_signal_handler(sig, term_signal) + logger.info("====Starting Pipeline====") self._mrc_executor.start() @@ -372,7 +413,10 @@ async def join(self): if self._mrc_executor is None: raise RuntimeError("Pipeline failed pre-flight checks.") - await self._mrc_executor.join_async() + # Make a local reference so the object doesnt go out of scope from a call to stop() + executor = self._mrc_executor + + await executor.join_async() except Exception: logger.exception("Exception occurred in pipeline. Rethrowing") raise @@ -408,7 +452,7 @@ async def _build_and_start(self): logger.exception("Error occurred during Pipeline.build(). Exiting.", exc_info=True) return - self._start() + await self._start() async def _async_start(self, stages: networkx.classes.reportviews.NodeView): # This method is called once for each segment in the pipeline executed on this host @@ -416,8 +460,6 @@ async def _async_start(self, stages: networkx.classes.reportviews.NodeView): if (isinstance(stage, Stage)): await stage.start_async() - self._is_started = True - def visualize(self, filename: str = None, **graph_kwargs): """ Output a pipeline diagram to `filename`. The file format of the diagrame is inferred by the extension of @@ -578,35 +620,6 @@ async def run_async(self): """ This function sets up the current asyncio loop, builds the pipeline, and awaits on it to complete. """ - loop = asyncio.get_running_loop() - - def error_handler(_, context: dict): - - msg = f"Unhandled exception in async loop! Exception: \n{context['message']}" - exception = context.get("exception", Exception()) - - logger.critical(msg, exc_info=exception) - - loop.set_exception_handler(error_handler) - - exit_count = 0 - - # Handles Ctrl+C for graceful shutdown - def term_signal(): - - nonlocal exit_count - exit_count = exit_count + 1 - - if (exit_count == 1): - tqdm.write("Stopping pipeline. Please wait... Press Ctrl+C again to kill.") - self.stop() - else: - tqdm.write("Killing") - sys.exit(1) - - for sig in [signal.SIGINT, signal.SIGTERM]: - loop.add_signal_handler(sig, term_signal) - try: await self._build_and_start() diff --git a/morpheus/stages/postprocess/generate_viz_frames_stage.py b/morpheus/stages/postprocess/generate_viz_frames_stage.py index 9d281e788f..77f40e061d 100644 --- a/morpheus/stages/postprocess/generate_viz_frames_stage.py +++ b/morpheus/stages/postprocess/generate_viz_frames_stage.py @@ -235,15 +235,28 @@ async def run_server(): return await super().start_async() + def stop(self): + """ + Stages can implement this to perform cleanup steps when pipeline is stopped. + """ + + if (self._loop is not None): + asyncio.run_coroutine_threadsafe(self._stop_server(), loop=self._loop) + pass + async def _stop_server(self): + # Only run this once + if (self._buffer_queue.is_closed()): + return + logger.info("Shutting down queue") await self._buffer_queue.close() self._server_close_event.set() - # Wait for it to + # Wait for it to fully shut down await self._server_task def _build_single(self, builder: mrc.Builder, input_node: mrc.SegmentObject) -> mrc.SegmentObject: @@ -274,8 +287,12 @@ def write_batch(x: MultiResponseMessage): out_buf = sink.getvalue() - # Enqueue the buffer and block until that completes - asyncio.run_coroutine_threadsafe(self._buffer_queue.put(out_buf), loop=self._loop).result() + try: + # Enqueue the buffer and block until that completes + asyncio.run_coroutine_threadsafe(self._buffer_queue.put(out_buf), loop=self._loop).result() + except Closed: + # Ignore closed errors. Likely the pipeline is shutting down + pass input_obs.pipe(ops.map(write_batch)).subscribe(output_obs)