Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve pipeline stop logic to ensure join is called exactly once for all stages #1479

Merged
merged 27 commits into from
Feb 15, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
2da0f39
update pipeline stop logic
efajardo-nv Jan 25, 2024
3d4cdfe
pr feedback updates
efajardo-nv Feb 2, 2024
f3c72b2
Merge branch 'branch-24.03' of https://github.com/nv-morpheus/Morpheu…
efajardo-nv Feb 2, 2024
6c3d59e
add post_start task
efajardo-nv Feb 2, 2024
b454dcc
fix by adding pipeline.join to run_async
efajardo-nv Feb 3, 2024
ba8f21f
update http server unit test
efajardo-nv Feb 5, 2024
1512d0b
Merge branch 'branch-24.03' of https://github.com/nv-morpheus/Morpheu…
efajardo-nv Feb 8, 2024
83dc809
replace asyncio event with future to propagate exceptions
efajardo-nv Feb 8, 2024
a05552d
update monitor stage test
efajardo-nv Feb 8, 2024
16093ac
pipeline state unit tests
efajardo-nv Feb 9, 2024
c402fe3
remove commented lines
efajardo-nv Feb 9, 2024
a6b719f
remove commented lines
efajardo-nv Feb 9, 2024
fee8124
fix copyright
efajardo-nv Feb 9, 2024
a17f8d5
add remaining pipeline state tests
efajardo-nv Feb 9, 2024
4ae1c34
Merge branch 'branch-24.03' of https://github.com/nv-morpheus/Morpheu…
efajardo-nv Feb 9, 2024
e05da81
Merge branch 'branch-24.03' into pipeline-stop-fix
efajardo-nv Feb 12, 2024
fb3727c
add pipeline build tests
efajardo-nv Feb 12, 2024
fd49cd1
Merge branch 'branch-24.03' of https://github.com/nv-morpheus/Morpheu…
efajardo-nv Feb 12, 2024
4b9ffc6
Merge branch 'pipeline-stop-fix' of https://github.com/efajardo-nv/Mo…
efajardo-nv Feb 12, 2024
64bb13f
test pipeline build test names
efajardo-nv Feb 12, 2024
8acc4d0
pipeline.join error handling
efajardo-nv Feb 12, 2024
88ba3bd
pipeline join tests
efajardo-nv Feb 12, 2024
33883b8
Merge branch 'branch-24.03' of https://github.com/nv-morpheus/Morpheu…
efajardo-nv Feb 12, 2024
c50ea80
add stage methods called tests for pipeline joins
efajardo-nv Feb 12, 2024
aad652d
Merge branch 'branch-24.03' of https://github.com/nv-morpheus/Morpheu…
efajardo-nv Feb 13, 2024
41df4c5
remove error for join after join
efajardo-nv Feb 14, 2024
cfb46a7
Merge branch 'branch-24.03' of https://github.com/nv-morpheus/Morpheu…
efajardo-nv Feb 15, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 9 additions & 7 deletions morpheus/pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,10 @@ def __init__(self, config: Config):
self._num_threads = config.num_threads

# Complete set of nodes across segments in this pipeline
self._stages: typing.Set[Stage] = set()
self._stages: typing.List[Stage] = []

# Complete set of sources across segments in this pipeline
self._sources: typing.Set[SourceStage] = set()
self._sources: typing.List[SourceStage] = []

# Dictionary containing segment information for this pipeline
self._segments: typing.Dict = defaultdict(lambda: {"nodes": set(), "ingress_ports": [], "egress_ports": []})
Expand All @@ -94,6 +94,7 @@ def __init__(self, config: Config):

self._loop: asyncio.AbstractEventLoop = None

# Future that allows post_start to propagate exceptions back to pipeline
self._post_start_future: asyncio.Future = None
efajardo-nv marked this conversation as resolved.
Show resolved Hide resolved

@property
Expand Down Expand Up @@ -124,10 +125,10 @@ def add_stage(self, stage: StageT, segment_id: str = "main") -> StageT:
# Add to list of stages if it's a stage, not a source
if (isinstance(stage, Stage)):
segment_nodes.add(stage)
self._stages.add(stage)
self._stages.append(stage)
elif (isinstance(stage, SourceStage)):
segment_nodes.add(stage)
self._sources.add(stage)
self._sources.append(stage)
else:
raise NotImplementedError(f"add_stage() failed. Unknown node type: {type(stage)}")

Expand Down Expand Up @@ -425,7 +426,6 @@ async def post_start(executor):
with self._mutex:
self._state = PipelineState.COMPLETED

# asyncio.create_task(post_start(self._mrc_executor))
self._post_start_future = asyncio.create_task(post_start(self._mrc_executor))

def stop(self):
Expand All @@ -448,9 +448,11 @@ def stop(self):

async def join(self):
"""
Suspend execution all currently running stages and the MRC pipeline.
Typically called after `stop`.
Wait until pipeline completes upon which join methods of sources and stages will be called.
"""
assert self._post_start_future is not None, "Pipeline must be started before joining"
assert self._state != PipelineState.COMPLETED, "Cannot join a pipeline that has already completed"
efajardo-nv marked this conversation as resolved.
Show resolved Hide resolved

await self._post_start_future
efajardo-nv marked this conversation as resolved.
Show resolved Hide resolved

def _on_stop(self):
Expand Down
70 changes: 70 additions & 0 deletions tests/pipeline/test_pipeline_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,29 @@ async def test_stop_after_stop(config: Config):
await pipeline.join()


async def test_join_without_start(config: Config):

pipeline = LinearPipeline(config)
assert pipeline.state == PipelineState.INITIALIZED
pipeline.set_source(source_test_stage(config))
with pytest.raises(Exception) as excinfo:
await pipeline.join()
assert "must be started" in str(excinfo.value)


async def test_join_after_join(config: Config):
efajardo-nv marked this conversation as resolved.
Show resolved Hide resolved
pipeline = LinearPipeline(config)
assert pipeline.state == PipelineState.INITIALIZED
pipeline.set_source(source_test_stage(config))
await pipeline.build_and_start()
assert pipeline.state == PipelineState.STARTED
await pipeline.join()
assert pipeline.state == PipelineState.COMPLETED
with pytest.raises(Exception) as excinfo:
await pipeline.join()
assert "has already completed" in str(excinfo.value)


@mock.patch('morpheus.stages.preprocess.deserialize_stage.DeserializeStage.join')
@mock.patch('morpheus.stages.preprocess.deserialize_stage.DeserializeStage.stop')
@mock.patch('morpheus.stages.input.in_memory_source_stage.InMemorySourceStage.join')
Expand Down Expand Up @@ -242,3 +265,50 @@ async def test_stage_methods_called_stop_after_stop(mock_source_stage_stop,
mock_source_stage_join.assert_called_once()
mock_deserialize_stage_stop.assert_called_once()
mock_deserialize_stage_join.assert_called_once()


@mock.patch('morpheus.stages.preprocess.deserialize_stage.DeserializeStage.join')
@mock.patch('morpheus.stages.preprocess.deserialize_stage.DeserializeStage.stop')
@mock.patch('morpheus.stages.input.in_memory_source_stage.InMemorySourceStage.join')
@mock.patch('morpheus.stages.input.in_memory_source_stage.InMemorySourceStage.stop')
async def test_stage_methods_called_join_without_start(mock_source_stage_stop,
mock_source_stage_join,
mock_deserialize_stage_stop,
mock_deserialize_stage_join,
config: Config,
filter_probs_df: DataFrameType):
pipeline = LinearPipeline(config)
pipeline.set_source(InMemorySourceStage(config, [filter_probs_df]))
pipeline.add_stage(DeserializeStage(config))
with pytest.raises(Exception) as excinfo:
await pipeline.join()
assert "must be started" in str(excinfo.value)
mock_source_stage_stop.assert_not_called()
mock_source_stage_join.assert_not_called()
mock_deserialize_stage_stop.assert_not_called()
mock_deserialize_stage_join.assert_not_called()


@mock.patch('morpheus.stages.preprocess.deserialize_stage.DeserializeStage.join')
@mock.patch('morpheus.stages.preprocess.deserialize_stage.DeserializeStage.stop')
@mock.patch('morpheus.stages.input.in_memory_source_stage.InMemorySourceStage.join')
@mock.patch('morpheus.stages.input.in_memory_source_stage.InMemorySourceStage.stop')
async def test_stage_methods_called_join_after_join(mock_source_stage_stop,
efajardo-nv marked this conversation as resolved.
Show resolved Hide resolved
mock_source_stage_join,
mock_deserialize_stage_stop,
mock_deserialize_stage_join,
config: Config,
filter_probs_df: DataFrameType):
pipeline = LinearPipeline(config)
pipeline.set_source(InMemorySourceStage(config, [filter_probs_df]))
pipeline.add_stage(DeserializeStage(config))
await pipeline.build_and_start()
await pipeline.join()
assert pipeline.state == PipelineState.COMPLETED
with pytest.raises(Exception) as excinfo:
await pipeline.join()
assert "has already completed" in str(excinfo.value)
mock_source_stage_stop.assert_not_called()
mock_source_stage_join.assert_called_once()
mock_deserialize_stage_stop.assert_not_called()
mock_deserialize_stage_join.assert_called_once()
2 changes: 1 addition & 1 deletion tests/test_monitor_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ async def test_join(mock_morph_tqdm: mock.MagicMock, config: Config):
await stage.join()
mock_morph_tqdm.assert_not_called()

stage.on_start()
await stage.start_async()
await stage.join()
mock_morph_tqdm.close.assert_called_once()

Expand Down
Loading