Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix issue where Python sources can deadlock when the subscriber is unsubscribed #495

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
98991f9
Stop a python source once the subscriber is no longer subscribed
dagardner-nv Aug 9, 2024
73181d6
Fix xlass name in docstring
dagardner-nv Aug 12, 2024
9f14bc6
Advance the underlying python iterator in a future, allowing both the…
dagardner-nv Aug 15, 2024
1c23d3a
Test to reproduce morpheus issue #1838
dagardner-nv Aug 15, 2024
ac1c28d
If the GLOG_v environment variable is devined set the log level to debug
dagardner-nv Aug 15, 2024
e865253
Fix calling get on the future
dagardner-nv Aug 16, 2024
6e17ade
Fix CR year
dagardner-nv Aug 16, 2024
f712dc6
WIP - not working
dagardner-nv Aug 16, 2024
49ac47a
WIP - not working
dagardner-nv Aug 16, 2024
48c5f6e
Remove unused json usage
dagardner-nv Aug 16, 2024
81ab32e
Remove temporary test
dagardner-nv Aug 16, 2024
dc14b8f
Update PyIteratorIterator to optionally use the subscriber when prese…
dagardner-nv Aug 16, 2024
c2fca4a
Update CR year
dagardner-nv Aug 16, 2024
fbda78c
Remove unintended changes
dagardner-nv Aug 16, 2024
249dec1
Attempt to terminate the source thread on shutdown - does not work - …
dagardner-nv Aug 28, 2024
19abd52
WIP - untested
dagardner-nv Aug 28, 2024
3d1538a
WIP
dagardner-nv Aug 28, 2024
e774f2d
WIP - TODO: includes temp logging
dagardner-nv Aug 29, 2024
cdea4ad
Stop the source generator thread by sending a Python exception to it …
dagardner-nv Aug 29, 2024
c3068e2
Fix CR year
dagardner-nv Aug 29, 2024
53dbae6
Initialize on first call to get_next_value, this avoids stopping the …
dagardner-nv Aug 29, 2024
cafdfb3
Merge branch 'branch-24.10' of github.com:nv-morpheus/MRC into david-…
dagardner-nv Aug 29, 2024
4dbef0d
Always run each instance with it's own unique copy of the iterator wr…
dagardner-nv Aug 29, 2024
14d19c7
Check for exceptions prior to the liveness of the thread
dagardner-nv Aug 29, 2024
8b4c59c
Move the construction of PyIteratorIterator into the try block as the…
dagardner-nv Aug 29, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion python/mrc/_pymrc/include/pymrc/segment.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* SPDX-FileCopyrightText: Copyright (c) 2021-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-FileCopyrightText: Copyright (c) 2021-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
Expand Down Expand Up @@ -252,5 +252,7 @@ class BuilderProxy
static void init_module(mrc::segment::IBuilder& self, std::shared_ptr<mrc::modules::SegmentModule> module);
};

void iterator_thread(pybind11::iterator itr, pybind11::object queue, pybind11::object exception_queue);

#pragma GCC visibility pop
} // namespace mrc::pymrc
243 changes: 206 additions & 37 deletions python/mrc/_pymrc/src/segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,18 @@
#include <glog/logging.h>
#include <pybind11/cast.h>
#include <pybind11/gil.h>
#include <pybind11/pybind11.h>
#include <pybind11/pytypes.h>
#include <pyerrors.h> // for PyExc_SystemExit
#include <rxcpp/rx.hpp>

#include <chrono>
#include <exception>
#include <fstream>
#include <functional>
#include <iterator>
#include <map>
#include <memory>
#include <stdexcept>
#include <string>
#include <type_traits>
Expand All @@ -52,6 +56,58 @@
// IWYU thinks we need array for py::print
// IWYU pragma: no_include <array>

namespace {
namespace py = pybind11;
using namespace std::chrono_literals;
constexpr std::chrono::seconds ThreadShutdownTimeoutSec = 10s;

/**
 * @brief Best-effort shutdown of a Python `threading.Thread` object.
 *
 * If the thread reports itself as alive, `SystemExit` is raised asynchronously in it via
 * `PyThreadState_SetAsyncExc`, then `is_alive()` is polled for up to `ThreadShutdownTimeoutSec`. Between polls the
 * GIL is released and the current fiber yields so that the target thread gets a chance to run and observe the
 * exception.
 *
 * This function never throws: it is called from a destructor, so every failure (including failures while querying
 * the thread object) is logged instead of propagated.
 *
 * @param thread Python thread object; its `is_alive`, `ident` and `name` attributes are used.
 *
 * @note The body calls into Python and uses `gil_scoped_release`, so the caller is expected to hold the GIL.
 */
void stop_python_thread(py::object& thread)
{
    // Declared outside of the try block so the catch handler can report it; stays 0 if the lookup itself failed
    unsigned long thread_id = 0;

    try
    {
        if (!thread.attr("is_alive")().cast<bool>())
        {
            return;
        }

        // The source generator thread needs to stop prior to the thread object going out of scope
        thread_id = thread.attr("ident").cast<unsigned long>();

        DVLOG(10) << "Attempting to kill python thread " << thread.attr("name").cast<std::string>()
                  << " id: " << thread_id;

        // Using PyExc_SystemExit since Python won't log it as an uncaught exception
        auto num_threads_stopped = PyThreadState_SetAsyncExc(thread_id, PyExc_SystemExit);
        if (num_threads_stopped != 1)
        {
            LOG(WARNING) << "Expected return value of 1 from PyThreadState_SetAsyncExc, received: "
                         << num_threads_stopped;
            return;
        }

        // Wait until the thread has actually stopped. A monotonic clock is used for the deadline so that wall-clock
        // adjustments can neither shorten nor extend the timeout.
        const auto deadline = std::chrono::steady_clock::now() + ThreadShutdownTimeoutSec;
        bool is_alive       = thread.attr("is_alive")().cast<bool>();
        while (is_alive && std::chrono::steady_clock::now() < deadline)
        {
            {
                // Drop the GIL while yielding, otherwise the target thread could never process the exception
                pybind11::gil_scoped_release no_gil;
                boost::this_fiber::yield();
            }

            is_alive = thread.attr("is_alive")().cast<bool>();
        }

        if (is_alive)
        {
            LOG(WARNING) << "Unable to stop thread: " << thread_id << " after "
                         << ThreadShutdownTimeoutSec.count() << " seconds";
        }
    } catch (const std::exception& e)
    {
        LOG(ERROR) << "Encountered error while attempting to stop thread: " << thread_id << ": " << e.what();
    }
}
} // namespace

namespace mrc::pymrc {

namespace py = pybind11;
Expand Down Expand Up @@ -152,6 +208,31 @@
PyHolder m_value{};
};

/**
 * @brief Drains a Python iterator into a Python queue; intended to be the target of a Python thread.
 *
 * Each value produced by `itr` is copied and handed to `queue.put(...)`. If advancing the iterator (or the `put`)
 * raises a Python exception, the exception text is pushed onto `exception_queue` as a `str` and the function returns
 * normally.
 *
 * @param itr Python iterator to consume.
 * @param queue Python object exposing `put`; receives every produced value.
 * @param exception_queue Python object exposing `put`; receives the message of a raised Python exception.
 */
void iterator_thread(py::iterator itr, py::object queue, py::object exception_queue)
{
    // Held for the whole function: the catch handler below also touches Python objects, so the guard must outlive
    // the try block rather than being unwound together with it.
    py::gil_scoped_acquire gil;

    try
    {
        // Constructing the wrapper already advances the underlying iterator, so it has to live inside the try
        PyIteratorIterator wrapped_iter(std::move(itr));
        PyIteratorIterator sentinel;
        while (wrapped_iter != sentinel)
        {
            // Copy the current value into the queue.
            queue.attr("put")((*wrapped_iter).copy_obj());

            ++wrapped_iter;
        }
    } catch (const py::error_already_set& py_except)
    {
        exception_queue.attr("put")(py::str(py_except.what()));
    }
}

/// Signal type thrown when the value queue currently holds no item; carries no state beyond std::exception.
struct EmptyQueue : std::exception
{};

class PyIteratorWrapper
{
public:
Expand All @@ -160,7 +241,7 @@
// NOLINTEND(readability-identifier-naming)

// Create from an iterator
PyIteratorWrapper(py::iterator source_iterator) :
PyIteratorWrapper(py::iterator source_iterator, PyObjectSubscriber* subscriber = nullptr) :
m_iter_factory([iterator = PyObjectHolder(std::move(source_iterator))]() mutable {
// Check if the iterator has been started already
if (!iterator)
Expand Down Expand Up @@ -195,58 +276,138 @@
// Create directly from an iterator factory. The factory is only stored here, it is not invoked.
PyIteratorWrapper(std::function<py::iterator()> iter_factory) :
  m_iter_factory(std::move(iter_factory))
{}

iterator begin()
py::object get_next_value()
{
// Grab the GIL and create the iterator from the factory
using namespace py::literals;
if (!m_initialized)
{
init();
}

AcquireGIL gil;
try
{
auto value = m_queue.attr("get_nowait")();
return value;
} catch (py::error_already_set py_except)
{
if (py_except.matches(m_empty_exception))
{
// Check to see if we got an exception
if (!m_exception_queue.attr("empty")().cast<bool>())
{
auto py_err_str = m_exception_queue.attr("get")("block"_a = true, "timeout"_a = 0.5);
throw std::runtime_error(py_err_str.cast<std::string>());
}

auto iter = m_iter_factory();
if (!m_thread.attr("is_alive")().cast<bool>())
{
throw pybind11::stop_iteration();
}

// if the thread is alive and we don't have an exception, just signal that we don't have any data
throw EmptyQueue();
}

return iterator{std::move(iter)};
throw;

Check warning on line 312 in python/mrc/_pymrc/src/segment.cpp

View check run for this annotation

Codecov / codecov/patch

python/mrc/_pymrc/src/segment.cpp#L312

Added line #L312 was not covered by tests
}
}

iterator end() // NOLINT(readability-convert-member-functions-to-static)
~PyIteratorWrapper()
{
// Do we need the GIL here?
return iterator{};
{
pybind11::gil_scoped_acquire gil;

if (m_initialized)
{
stop_python_thread(m_thread);
}

m_exception_queue = py::object();
m_queue = py::object();
m_empty_exception = py::object();
m_thread = py::object();
}
}

private:
void init()
{
using namespace py::literals;

AcquireGIL gil;
auto iter = m_iter_factory();

// Use Python to create/track the thread to allow the interpreter to shutdown safely
auto functools_mod = py::module_::import("functools");
auto queue_mod = py::module_::import("queue");
auto threading_mod = py::module_::import("threading");

// We want the python bound version of iterator_thread
auto segment_mod = py::module_::import("mrc.core.segment");

m_empty_exception = queue_mod.attr("Empty");
m_queue = queue_mod.attr("Queue")();
m_exception_queue = queue_mod.attr("Queue")();
auto iter_thread_fn = segment_mod.attr("_iterator_thread");
auto bound_iter_thread_fn = functools_mod.attr(
"partial")(iter_thread_fn, "itr"_a = iter, "queue"_a = m_queue, "exception_queue"_a = m_exception_queue);

m_thread = threading_mod.attr(
"Thread")("target"_a = bound_iter_thread_fn, "name"_a = "py_gen_src", "daemon"_a = true);
m_thread.attr("start")();

m_initialized = true;
}

bool m_initialized = false;
py::object m_thread{};
py::object m_exception_queue{}; // replace with exception queue
py::object m_queue{};
py::object m_empty_exception{};
std::function<py::iterator()> m_iter_factory;
};

std::shared_ptr<mrc::segment::ObjectProperties> build_source(mrc::segment::IBuilder& self,
const std::string& name,
PyIteratorWrapper iter_wrapper)
{
auto wrapper = [iter_wrapper = std::move(iter_wrapper)](PyObjectSubscriber& subscriber) mutable {
auto wrapper = [src_iter_wrapper = std::move(iter_wrapper)](PyObjectSubscriber& subscriber) mutable {
auto& ctx = runnable::Context::get_runtime_context();

try
{
DVLOG(10) << ctx.info() << " Starting source";
DVLOG(10) << ctx.info() << " Starting source";

Check warning on line 378 in python/mrc/_pymrc/src/segment.cpp

View check run for this annotation

Codecov / codecov/patch

python/mrc/_pymrc/src/segment.cpp#L378

Added line #L378 was not covered by tests

for (auto next_val : iter_wrapper)
// Taking a copy: if the source has pe_count>1 or engines_per_pe>1, each instance needs its own independent copy
// of the iterator wrapper
PyIteratorWrapper iter_wrapper = src_iter_wrapper;
bool received_stop_iteration = false;
while (!received_stop_iteration && subscriber.is_subscribed())
{
try
{
// Only send if its subscribed. Very important to ensure the object has been moved!
auto next_val = iter_wrapper.get_next_value();
if (subscriber.is_subscribed())
{
subscriber.on_next(std::move(next_val));
}
else

} catch (EmptyQueue)
{
// No value
if (subscriber.is_subscribed())
{
DVLOG(10) << ctx.info() << " Source unsubscribed. Stopping";
break;
boost::this_fiber::yield();
}
} catch (pybind11::stop_iteration)
{
DVLOG(10) << ctx.info() << " Received stop_iteration";
received_stop_iteration = true;
} catch (const std::exception& e)
{
LOG(ERROR) << ctx.info() << " Error occurred in source. Error msg: " << e.what();
subscriber.on_error(std::current_exception());
return;
}

} catch (const std::exception& e)
{
LOG(ERROR) << ctx.info() << "Error occurred in source. Error msg: " << e.what();

// gil.release();
subscriber.on_error(std::current_exception());
return;
}

subscriber.on_completed();
Expand All @@ -266,22 +427,30 @@
// // the GIL now to avoid dropping and reacquiring multiple times
// AcquireGIL gil;

if (!current.is_initialized())
{
// On the first pass, initialize the iterator. This will generate the first value
current = iter_wrapper.begin();
}
else
auto& ctx = runnable::Context::get_runtime_context();

bool received_value = false;
bool received_stop_iteration = false;
while (!received_value && !received_stop_iteration)
{
// Otherwise pre-increment the iterator
++current;
}
try
{
data = iter_wrapper.get_next_value();
received_value = true;

// Copy the current value into data. No need for the GIL here due to PyHolder
data = *current;
} catch (EmptyQueue)
{
// No value
boost::this_fiber::yield();
} catch (pybind11::stop_iteration)
{
DVLOG(10) << ctx.info() << "Received stop_iteration";
received_stop_iteration = true;
}
}

// Return closed if the current iterator is complete
return current == PyIteratorIterator() ? channel::Status::closed : channel::Status::success;
return received_stop_iteration ? channel::Status::closed : channel::Status::success;
};

return self.construct_object<PythonSourceComponent<PyHolder>>(name, std::move(get_next));
Expand Down
5 changes: 1 addition & 4 deletions python/mrc/_pymrc/src/utilities/acquire_gil.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* SPDX-FileCopyrightText: Copyright (c) 2021-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-FileCopyrightText: Copyright (c) 2021-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
Expand All @@ -17,16 +17,13 @@

#include "pymrc/utilities/acquire_gil.hpp"

#include <nlohmann/json_fwd.hpp>
#include <pybind11/gil.h>
#include <pybind11/pybind11.h>

namespace mrc::pymrc {

namespace py = pybind11;

using nlohmann::json;

// Creates a pybind11 `gil_scoped_acquire` guard on the heap and stores it in `m_gil`
AcquireGIL::AcquireGIL() :
  m_gil(std::make_unique<py::gil_scoped_acquire>())
{}

// Defined out-of-line in this translation unit; the compiler-generated destructor body is used as-is
AcquireGIL::~AcquireGIL() = default;
Expand Down
2 changes: 1 addition & 1 deletion python/mrc/_pymrc/tests/test_executor.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* SPDX-FileCopyrightText: Copyright (c) 2021-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-FileCopyrightText: Copyright (c) 2021-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
Expand Down
4 changes: 3 additions & 1 deletion python/mrc/core/segment.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* SPDX-FileCopyrightText: Copyright (c) 2021-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-FileCopyrightText: Copyright (c) 2021-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
Expand Down Expand Up @@ -251,6 +251,8 @@ PYBIND11_MODULE(segment, py_mod)

Builder.def("make_node_full", &BuilderProxy::make_node_full, py::return_value_policy::reference_internal);

py_mod.def("_iterator_thread", &iterator_thread, py::arg("itr"), py::arg("queue"), py::arg("exception_queue"));

py_mod.attr("__version__") = MRC_CONCAT_STR(mrc_VERSION_MAJOR << "." << mrc_VERSION_MINOR << "."
<< mrc_VERSION_PATCH);
}
Expand Down
Loading
Loading