Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve the HTTP server shutdown logic to respond correctly to Ctrl+C and stop commands #1517

Open
wants to merge 5 commits into
base: branch-24.06
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions morpheus/_lib/common/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ class HttpServer():
def __exit__(self, arg0: object, arg1: object, arg2: object) -> None: ...
def __init__(self, parse_fn: function, bind_address: str = '127.0.0.1', port: int = 8080, endpoint: str = '/message', method: str = 'POST', num_threads: int = 1, max_payload_size: int = 10485760, request_timeout: int = 30) -> None: ...
def is_running(self) -> bool: ...
def run_one(self) -> int: ...
def start(self) -> None: ...
def stop(self) -> None: ...
pass
Expand Down
29 changes: 24 additions & 5 deletions morpheus/_lib/common/module.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,29 @@ PYBIND11_MODULE(common, _module)
CudfHelper::load();

LoaderRegistry::register_factory_fn(
"file", [](nlohmann::json config) { return std::make_unique<FileDataLoader>(config); }, false);
"file",
[](nlohmann::json config) {
return std::make_unique<FileDataLoader>(config);
},
false);
LoaderRegistry::register_factory_fn(
"grpc", [](nlohmann::json config) { return std::make_unique<GRPCDataLoader>(config); }, false);
"grpc",
[](nlohmann::json config) {
return std::make_unique<GRPCDataLoader>(config);
},
false);
LoaderRegistry::register_factory_fn(
"payload", [](nlohmann::json config) { return std::make_unique<PayloadDataLoader>(config); }, false);
"payload",
[](nlohmann::json config) {
return std::make_unique<PayloadDataLoader>(config);
},
false);
LoaderRegistry::register_factory_fn(
"rest", [](nlohmann::json config) { return std::make_unique<RESTDataLoader>(config); }, false);
"rest",
[](nlohmann::json config) {
return std::make_unique<RESTDataLoader>(config);
},
false);

py::class_<TensorObject>(_module, "Tensor")
.def_property_readonly("__cuda_array_interface__", &TensorObjectInterfaceProxy::cuda_array_interface)
Expand Down Expand Up @@ -106,7 +122,9 @@ PYBIND11_MODULE(common, _module)
.value("CSV", FileTypes::CSV)
.value("PARQUET", FileTypes::PARQUET);

_module.def("typeid_to_numpy_str", [](TypeId tid) { return DType(tid).type_str(); });
_module.def("typeid_to_numpy_str", [](TypeId tid) {
return DType(tid).type_str();
});

_module.def("determine_file_type", &determine_file_type, py::arg("filename"));
_module.def("read_file_to_df", &read_file_to_df, py::arg("filename"), py::arg("file_type") = FileTypes::Auto);
Expand Down Expand Up @@ -135,6 +153,7 @@ PYBIND11_MODULE(common, _module)
.def("start", &HttpServerInterfaceProxy::start)
.def("stop", &HttpServerInterfaceProxy::stop)
.def("is_running", &HttpServerInterfaceProxy::is_running)
.def("run_one", &HttpServerInterfaceProxy::run_one)
.def("__enter__", &HttpServerInterfaceProxy::enter, py::return_value_policy::reference)
.def("__exit__", &HttpServerInterfaceProxy::exit);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,25 @@

#pragma once

#include "morpheus/messages/control.hpp"
#include "morpheus/messages/meta.hpp" // for MessageMeta
#include "morpheus/utilities/http_server.hpp" // for HttpServer

#include <boost/fiber/buffered_channel.hpp> // for buffered_channel
#include <boost/fiber/context.hpp> // for context
#include <boost/fiber/future/future.hpp>
#include <boost/fiber/mutex.hpp>
#include <cudf/io/types.hpp> // for table_with_metadata
#include <mrc/node/rx_sink_base.hpp> // for RxSinkBase
#include <mrc/node/rx_source_base.hpp> // for RxSourceBase
#include <mrc/node/source_properties.hpp> // for channel::Status, SourceProperties<>::source_type_t
#include <mrc/segment/builder.hpp> // for segment::Builder
#include <mrc/segment/object.hpp> // for segment::Object
#include <mrc/types.hpp> // for SegmentAddress
#include <pymrc/node.hpp> // for PythonSource
#include <rxcpp/rx.hpp> // for subscriber
#include <mrc/runnable/context.hpp>
#include <mrc/runnable/runnable.hpp>
#include <mrc/segment/builder.hpp> // for segment::Builder
#include <mrc/segment/object.hpp> // for segment::Object
#include <mrc/types.hpp> // for SegmentAddress
#include <pymrc/node.hpp> // for PythonSource
#include <rxcpp/rx.hpp> // for subscriber

#include <chrono> // for duration
#include <cstddef> // for size_t
Expand All @@ -45,8 +49,6 @@
// IWYU pragma: no_include <thread>

namespace morpheus {
using table_t = std::unique_ptr<cudf::io::table_with_metadata>;
using request_queue_t = boost::fibers::buffered_channel<table_t>;

/****** Component public implementations *******************/
/****** HttpServerSourceStage *************************************/
Expand All @@ -59,14 +61,27 @@ using request_queue_t = boost::fibers::buffered_channel<table_t>;

#pragma GCC visibility push(default)

template <typename T>
class PythonRunnableSource : public mrc::node::ReadableProvider<T>,
public mrc::node::WritableAcceptor<T>,
public mrc::node::SourceChannelOwner<T>,
private mrc::Watchable,
public mrc::runnable::RunnableWithContext<>,
public mrc::pymrc::AutoRegSourceAdapter<T>,
public mrc::pymrc::AutoRegIngressPort<T>
{
public:
};

// TODO(dagardner): optionally add headers to the dataframe

class HttpServerSourceStage : public mrc::pymrc::PythonSource<std::shared_ptr<MessageMeta>>
class HttpServerSourceStage : public PythonRunnableSource<std::shared_ptr<ControlMessage>>
Comment on lines -64 to +78
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we aren't completely on ControlMessage yet, can we make the output message a template choice?

{
public:
using base_t = mrc::pymrc::PythonSource<std::shared_ptr<MessageMeta>>;
using typename base_t::source_type_t;
using typename base_t::subscriber_fn_t;
using base_t = PythonRunnableSource<std::shared_ptr<ControlMessage>>;
using base_t::source_type_t;
using table_t = std::unique_ptr<cudf::io::table_with_metadata>;
using request_queue_t = boost::fibers::buffered_channel<source_type_t>;

HttpServerSourceStage(std::string bind_address = "127.0.0.1",
unsigned short port = 8080,
Expand All @@ -86,9 +101,14 @@ class HttpServerSourceStage : public mrc::pymrc::PythonSource<std::shared_ptr<Me
void close();

private:
subscriber_fn_t build();
void run(ContextType& context) override;

void on_state_update(const State& new_state) override;

void source_generator(rxcpp::subscriber<source_type_t> subscriber);

boost::fibers::mutex m_mutex;

std::chrono::duration<float, std::milli> m_sleep_time;
std::chrono::duration<long> m_queue_timeout;
std::unique_ptr<HttpServer> m_server;
Expand Down
25 changes: 17 additions & 8 deletions morpheus/_lib/include/morpheus/utilities/http_server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@

#pragma once

#include <boost/asio/io_context.hpp> // for io_context
#include <boost/asio/ip/tcp.hpp> // for tcp, tcp::acceptor, tcp::endpoint, tcp::socket
#include <boost/beast/core/error.hpp> // for error_code
#include <boost/asio/io_context.hpp> // for io_context
#include <boost/asio/ip/tcp.hpp> // for tcp, tcp::acceptor, tcp::endpoint, tcp::socket
#include <boost/beast/core/error.hpp> // for error_code
#include <boost/beast/http/message.hpp>
#include <boost/beast/http/string_body.hpp>
#include <boost/beast/http/verb.hpp> // for verb
#include <boost/system/error_code.hpp> // for error_code
#include <pybind11/pytypes.h> // for pybind11::function
Expand Down Expand Up @@ -68,7 +70,9 @@ using parse_status_t = std::tuple<unsigned /*http status code*/,
* Refer to https://www.boost.org/doc/libs/1_74_0/libs/system/doc/html/system.html#ref_class_error_code for more
* information regarding `boost::system::error_code`.
*/
using payload_parse_fn_t = std::function<parse_status_t(const std::string& /* post body */)>;
using payload_parse_fn_t =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update the docstring to reflect the change here

std::function<parse_status_t(const boost::asio::ip::tcp::endpoint& endpoint,
const boost::beast::http::request<boost::beast::http::string_body> request)>;

constexpr std::size_t DefaultMaxPayloadSize{1024 * 1024 * 10}; // 10MB

Expand Down Expand Up @@ -104,9 +108,13 @@ class HttpServer
void stop();
bool is_running() const;

size_t run_one();

private:
void start_listener(std::binary_semaphore& listener_semaphore, std::binary_semaphore& started_semaphore);

mutable std::mutex m_mutex;

std::string m_bind_address;
unsigned short m_port;
std::string m_endpoint;
Expand All @@ -117,7 +125,7 @@ class HttpServer
std::vector<std::thread> m_listener_threads;
boost::asio::io_context m_io_context;
std::shared_ptr<Listener> m_listener;
std::shared_ptr<payload_parse_fn_t> m_payload_parse_fn;
payload_parse_fn_t m_payload_parse_fn;
std::atomic<bool> m_is_running;
};

Expand All @@ -130,7 +138,7 @@ class Listener : public std::enable_shared_from_this<Listener>
{
public:
Listener(boost::asio::io_context& io_context,
std::shared_ptr<morpheus::payload_parse_fn_t> payload_parse_fn,
morpheus::payload_parse_fn_t payload_parse_fn,
const std::string& bind_address,
unsigned short port,
const std::string& endpoint,
Expand All @@ -140,7 +148,7 @@ class Listener : public std::enable_shared_from_this<Listener>

~Listener() = default;

void run();
void start();
void stop();
bool is_running() const;

Expand All @@ -152,7 +160,7 @@ class Listener : public std::enable_shared_from_this<Listener>
boost::asio::ip::tcp::endpoint m_tcp_endpoint;
std::unique_ptr<boost::asio::ip::tcp::acceptor> m_acceptor;

std::shared_ptr<morpheus::payload_parse_fn_t> m_payload_parse_fn;
morpheus::payload_parse_fn_t m_payload_parse_fn;
const std::string& m_url_endpoint;
boost::beast::http::verb m_method;
std::size_t m_max_payload_size;
Expand All @@ -177,6 +185,7 @@ struct HttpServerInterfaceProxy
static void start(HttpServer& self);
static void stop(HttpServer& self);
static bool is_running(const HttpServer& self);
static size_t run_one(HttpServer& self);

// Context manager methods
static HttpServer& enter(HttpServer& self);
Expand Down
Loading
Loading