Fix the SID Viz workflow shutdown process with the new pipeline shutdown process (#1392)

Due to the changes in #1233, the pipeline shutdown process has been slightly altered. This updates the viz demo to shut down correctly without error. Also adds support for exiting on Ctrl+C.

Fixes #1391

Authors:
  - Michael Demoret (https://github.com/mdemoret-nv)

Approvers:
  - Eli Fajardo (https://github.com/efajardo-nv)
  - David Gardner (https://github.com/dagardner-nv)

URL: #1392
mdemoret-nv authored Nov 30, 2023
1 parent 3a02383 commit da3814a
Showing 2 changed files with 68 additions and 38 deletions.
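Before walking through the diff, here is a minimal, self-contained sketch of the pattern the commit adopts in pipeline.py: save the running event loop once in _start(), then let non-asyncio threads (such as the segment builder) schedule coroutines onto that loop with asyncio.run_coroutine_threadsafe(...).result() instead of calling asyncio.run() and creating a second, competing loop. Everything below (stage_start_async, build_segment) is a hypothetical stand-in for illustration, not a Morpheus API.

import asyncio
import threading

async def stage_start_async(name: str) -> str:
    # Hypothetical stand-in for a stage's async start hook.
    await asyncio.sleep(0.1)
    return f"{name} started"

def build_segment(loop: asyncio.AbstractEventLoop) -> None:
    # Runs on a non-asyncio builder thread. Schedule the coroutine on the
    # already-running loop and block until it completes, rather than calling
    # asyncio.run() and spinning up a second event loop.
    future = asyncio.run_coroutine_threadsafe(stage_start_async("viz_stage"), loop)
    print(future.result(timeout=5))

async def main() -> None:
    loop = asyncio.get_running_loop()  # saved once, like self._loop in _start()
    builder = threading.Thread(target=build_segment, args=(loop,))
    builder.start()
    # Join on a worker thread so the event loop stays free to run the coroutine.
    await asyncio.to_thread(builder.join)

if __name__ == "__main__":
    asyncio.run(main())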
83 changes: 48 additions & 35 deletions morpheus/pipeline/pipeline.py
@@ -79,6 +79,8 @@ def __init__(self, config: Config):

self._mrc_executor: mrc.Executor = None

self._loop: asyncio.AbstractEventLoop = None

@property
def is_built(self) -> bool:
return self._is_built
@@ -317,7 +319,8 @@ def inner_build(builder: mrc.Builder, segment_id: str):
for port in typing.cast(StageBase, stage).input_ports:
port.link_node(builder=builder)

asyncio.run(self._async_start(segment_graph.nodes()))
# Call the start method for the stages in this segment. Must run on the loop and wait for the result
asyncio.run_coroutine_threadsafe(self._async_start(segment_graph.nodes()), self._loop).result()

logger.info("====Building Segment Complete!====")

@@ -339,9 +342,47 @@ def inner_build(builder: mrc.Builder, segment_id: str):

logger.info("====Registering Pipeline Complete!====")

def _start(self):
async def _start(self):
assert self._is_built, "Pipeline must be built before starting"

# Only execute this once
if (self._is_started):
return

# Stop from running this twice
self._is_started = True

# Save off the current loop so we can use it in async_start
self._loop = asyncio.get_running_loop()

# Setup error handling and cancellation of the pipeline
def error_handler(_, context: dict):

msg = f"Unhandled exception in async loop! Exception: \n{context['message']}"
exception = context.get("exception", Exception())

logger.critical(msg, exc_info=exception)

self._loop.set_exception_handler(error_handler)

exit_count = 0

# Handles Ctrl+C for graceful shutdown
def term_signal():

nonlocal exit_count
exit_count = exit_count + 1

if (exit_count == 1):
tqdm.write("Stopping pipeline. Please wait... Press Ctrl+C again to kill.")
self.stop()
else:
tqdm.write("Killing")
sys.exit(1)

for sig in [signal.SIGINT, signal.SIGTERM]:
self._loop.add_signal_handler(sig, term_signal)

logger.info("====Starting Pipeline====")

self._mrc_executor.start()
@@ -372,7 +413,10 @@ async def join(self):
if self._mrc_executor is None:
raise RuntimeError("Pipeline failed pre-flight checks.")

await self._mrc_executor.join_async()
# Make a local reference so the object doesn't go out of scope from a call to stop()
executor = self._mrc_executor

await executor.join_async()
except Exception:
logger.exception("Exception occurred in pipeline. Rethrowing")
raise
@@ -408,16 +452,14 @@ async def _build_and_start(self):
logger.exception("Error occurred during Pipeline.build(). Exiting.", exc_info=True)
return

self._start()
await self._start()

async def _async_start(self, stages: networkx.classes.reportviews.NodeView):
# This method is called once for each segment in the pipeline executed on this host
for stage in stages:
if (isinstance(stage, Stage)):
await stage.start_async()

self._is_started = True

def visualize(self, filename: str = None, **graph_kwargs):
"""
Output a pipeline diagram to `filename`. The file format of the diagram is inferred by the extension of
@@ -578,35 +620,6 @@ async def run_async(self):
"""
This function sets up the current asyncio loop, builds the pipeline, and awaits on it to complete.
"""
loop = asyncio.get_running_loop()

def error_handler(_, context: dict):

msg = f"Unhandled exception in async loop! Exception: \n{context['message']}"
exception = context.get("exception", Exception())

logger.critical(msg, exc_info=exception)

loop.set_exception_handler(error_handler)

exit_count = 0

# Handles Ctrl+C for graceful shutdown
def term_signal():

nonlocal exit_count
exit_count = exit_count + 1

if (exit_count == 1):
tqdm.write("Stopping pipeline. Please wait... Press Ctrl+C again to kill.")
self.stop()
else:
tqdm.write("Killing")
sys.exit(1)

for sig in [signal.SIGINT, signal.SIGTERM]:
loop.add_signal_handler(sig, term_signal)

try:
await self._build_and_start()

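The hunks above also move the Ctrl+C handling out of run_async() and into _start(). As a standalone illustration, the sketch below reproduces the same double-press pattern: the first Ctrl+C requests a graceful stop, the second one kills the process. Here run_pipeline is a placeholder, pipeline_task.cancel() stands in for self.stop(), and loop.add_signal_handler is only available on Unix-like platforms.

import asyncio
import signal
import sys

async def run_pipeline() -> None:
    # Placeholder for the pipeline's join(); sleeps until cancelled.
    await asyncio.sleep(3600)

async def main() -> None:
    loop = asyncio.get_running_loop()
    pipeline_task = asyncio.ensure_future(run_pipeline())
    exit_count = 0

    def term_signal() -> None:
        # First Ctrl+C requests a graceful stop; the second one force-exits.
        nonlocal exit_count
        exit_count += 1
        if exit_count == 1:
            print("Stopping pipeline. Please wait... Press Ctrl+C again to kill.")
            pipeline_task.cancel()  # stand-in for self.stop()
        else:
            print("Killing")
            sys.exit(1)

    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, term_signal)

    try:
        await pipeline_task
    except asyncio.CancelledError:
        pass

if __name__ == "__main__":
    asyncio.run(main())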
23 changes: 20 additions & 3 deletions morpheus/stages/postprocess/generate_viz_frames_stage.py
@@ -235,15 +235,28 @@ async def run_server():

return await super().start_async()

def stop(self):
"""
Stages can implement this to perform cleanup steps when the pipeline is stopped.
"""

if (self._loop is not None):
asyncio.run_coroutine_threadsafe(self._stop_server(), loop=self._loop)
pass

async def _stop_server(self):

# Only run this once
if (self._buffer_queue.is_closed()):
return

logger.info("Shutting down queue")

await self._buffer_queue.close()

self._server_close_event.set()

# Wait for it to
# Wait for it to fully shut down
await self._server_task

def _build_single(self, builder: mrc.Builder, input_node: mrc.SegmentObject) -> mrc.SegmentObject:
@@ -274,8 +287,12 @@ def write_batch(x: MultiResponseMessage):

out_buf = sink.getvalue()

# Enqueue the buffer and block until that completes
asyncio.run_coroutine_threadsafe(self._buffer_queue.put(out_buf), loop=self._loop).result()
try:
# Enqueue the buffer and block until that completes
asyncio.run_coroutine_threadsafe(self._buffer_queue.put(out_buf), loop=self._loop).result()
except Closed:
# Ignore closed errors. Likely the pipeline is shutting down
pass

input_obs.pipe(ops.map(write_batch)).subscribe(output_obs)

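The viz stage change above teaches write_batch() to tolerate a buffer queue that has already been closed during shutdown. The sketch below reproduces that producer pattern with a hypothetical ClosableQueue and Closed exception (the real stage presumably uses Morpheus's own queue type): a worker thread schedules queue.put() onto the stage's loop and silently ignores Closed once shutdown has begun.

import asyncio
import threading

class Closed(Exception):
    # Hypothetical stand-in for the "queue closed" error raised during shutdown.
    pass

class ClosableQueue:
    # Minimal sketch of an asyncio queue that can be closed while producers run.
    def __init__(self) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()
        self._closed = False

    def is_closed(self) -> bool:
        return self._closed

    async def put(self, item) -> None:
        if self._closed:
            raise Closed("queue is closed")
        await self._queue.put(item)

    async def close(self) -> None:
        self._closed = True

def write_batch(buffer: bytes, queue: ClosableQueue, loop: asyncio.AbstractEventLoop) -> None:
    # Called from a worker (non-asyncio) thread, like the stage's write_batch().
    try:
        # Enqueue the buffer and block until the put completes on the loop.
        asyncio.run_coroutine_threadsafe(queue.put(buffer), loop=loop).result()
    except Closed:
        # Ignore closed errors; the pipeline is likely shutting down.
        pass

async def main() -> None:
    loop = asyncio.get_running_loop()
    queue = ClosableQueue()

    # A write succeeds while the queue is open.
    worker = threading.Thread(target=write_batch, args=(b"frame-bytes", queue, loop))
    worker.start()
    await asyncio.to_thread(worker.join)

    # After close(), further writes raise Closed inside write_batch and are ignored.
    await queue.close()
    late = threading.Thread(target=write_batch, args=(b"late-frame", queue, loop))
    late.start()
    await asyncio.to_thread(late.join)

if __name__ == "__main__":
    asyncio.run(main())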
