Skip to content

Commit

Permalink
Stop a python source once the subscriber is no longer subscribed (#493)
Browse files Browse the repository at this point in the history
* When a Python generator source yields a value, and the subscriber is no longer subscribed, stop the source.
* Fix out of date docstring comment.

This is a partial fix for nv-morpheus/Morpheus#1838

Authors:
  - David Gardner (https://github.com/dagardner-nv)

Approvers:
  - Anuradha Karuppiah (https://github.com/AnuradhaKaruppiah)

URL: #493
  • Loading branch information
dagardner-nv authored Aug 29, 2024
1 parent bceb7ef commit ca8a73f
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 3 deletions.
4 changes: 2 additions & 2 deletions cpp/mrc/src/internal/service.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* SPDX-FileCopyrightText: Copyright (c) 2021-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-FileCopyrightText: Copyright (c) 2021-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
Expand Down Expand Up @@ -36,7 +36,7 @@ enum class ServiceState
};

/**
* @brief Converts a `AsyncServiceState` enum to a string
* @brief Converts a `ServiceState` enum to a string
*
* @param f
* @return std::string
Expand Down
7 changes: 6 additions & 1 deletion python/mrc/_pymrc/src/segment.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* SPDX-FileCopyrightText: Copyright (c) 2021-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-FileCopyrightText: Copyright (c) 2021-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
Expand Down Expand Up @@ -233,6 +233,11 @@ std::shared_ptr<mrc::segment::ObjectProperties> build_source(mrc::segment::IBuil
{
subscriber.on_next(std::move(next_val));
}
else
{
DVLOG(10) << ctx.info() << " Source unsubscribed. Stopping";
break;
}
}

} catch (const std::exception& e)
Expand Down

0 comments on commit ca8a73f

Please sign in to comment.