From ca8a73feb25f64546b92f18154d90129d84bc1c6 Mon Sep 17 00:00:00 2001 From: David Gardner <96306125+dagardner-nv@users.noreply.github.com> Date: Thu, 29 Aug 2024 12:42:25 -0700 Subject: [PATCH] Stop a python source once the subscriber is no longer subscribed (#493) * When a Python generator source yields a value, and the subscriber is no longer subscribed, stop the source. * Fix out of date docstring comment. This is a partial fix for nv-morpheus/Morpheus#1838 Authors: - David Gardner (https://github.com/dagardner-nv) Approvers: - Anuradha Karuppiah (https://github.com/AnuradhaKaruppiah) URL: https://github.com/nv-morpheus/MRC/pull/493 --- cpp/mrc/src/internal/service.hpp | 4 ++-- python/mrc/_pymrc/src/segment.cpp | 7 ++++++- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/cpp/mrc/src/internal/service.hpp b/cpp/mrc/src/internal/service.hpp index d24e059c5..47d5b7fab 100644 --- a/cpp/mrc/src/internal/service.hpp +++ b/cpp/mrc/src/internal/service.hpp @@ -1,5 +1,5 @@ /* - * SPDX-FileCopyrightText: Copyright (c) 2021-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-FileCopyrightText: Copyright (c) 2021-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. * SPDX-License-Identifier: Apache-2.0 * * Licensed under the Apache License, Version 2.0 (the "License"); @@ -36,7 +36,7 @@ enum class ServiceState }; /** - * @brief Converts a `AsyncServiceState` enum to a string + * @brief Converts a `ServiceState` enum to a string * * @param f * @return std::string diff --git a/python/mrc/_pymrc/src/segment.cpp b/python/mrc/_pymrc/src/segment.cpp index f5b931cf0..8a6f6fb33 100644 --- a/python/mrc/_pymrc/src/segment.cpp +++ b/python/mrc/_pymrc/src/segment.cpp @@ -1,5 +1,5 @@ /* - * SPDX-FileCopyrightText: Copyright (c) 2021-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-FileCopyrightText: Copyright (c) 2021-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. * SPDX-License-Identifier: Apache-2.0 * * Licensed under the Apache License, Version 2.0 (the "License"); @@ -233,6 +233,11 @@ std::shared_ptr build_source(mrc::segment::IBuil { subscriber.on_next(std::move(next_val)); } + else + { + DVLOG(10) << ctx.info() << " Source unsubscribed. Stopping"; + break; + } } } catch (const std::exception& e)