From 710b68a03ce35a937c5c8c82d7e91f70a757a679 Mon Sep 17 00:00:00 2001 From: Yuchen Zhang <134643420+yczhang-nv@users.noreply.github.com> Date: Tue, 24 Sep 2024 16:48:45 -0700 Subject: [PATCH] add monitor controller --- .../morpheus/_lib/cmake/libmorpheus.cmake | 1 + .../_lib/include/morpheus/stages/monitor.hpp | 31 +++-- .../morpheus/stages/monitor_controller.hpp | 56 +++++++++ .../morpheus/_lib/src/stages/monitor.cpp | 44 +++---- .../_lib/src/stages/monitor_controller.cpp | 115 ++++++++++++++++++ 5 files changed, 201 insertions(+), 46 deletions(-) create mode 100644 python/morpheus/morpheus/_lib/include/morpheus/stages/monitor_controller.hpp create mode 100644 python/morpheus/morpheus/_lib/src/stages/monitor_controller.cpp diff --git a/python/morpheus/morpheus/_lib/cmake/libmorpheus.cmake b/python/morpheus/morpheus/_lib/cmake/libmorpheus.cmake index bcbea43d71..caf92ce65c 100644 --- a/python/morpheus/morpheus/_lib/cmake/libmorpheus.cmake +++ b/python/morpheus/morpheus/_lib/cmake/libmorpheus.cmake @@ -60,6 +60,7 @@ add_library(morpheus src/stages/inference_client_stage.cpp src/stages/kafka_source.cpp src/stages/monitor.cpp + src/stages/monitor_controller.cpp src/stages/preprocess_fil.cpp src/stages/preprocess_nlp.cpp src/stages/serialize.cpp diff --git a/python/morpheus/morpheus/_lib/include/morpheus/stages/monitor.hpp b/python/morpheus/morpheus/_lib/include/morpheus/stages/monitor.hpp index b20fe43c27..bd1655e2c4 100644 --- a/python/morpheus/morpheus/_lib/include/morpheus/stages/monitor.hpp +++ b/python/morpheus/morpheus/_lib/include/morpheus/stages/monitor.hpp @@ -1,7 +1,7 @@ #pragma once #include "morpheus/export.h" // for MORPHEUS_EXPORT - +#include "morpheus/stages/monitor_controller.hpp" // for MonitorController #include #include // for Builder #include // for PythonNode @@ -11,7 +11,7 @@ namespace morpheus { /****** Component public implementations *******************/ -/****** FilterDetectionStage********************************/ +/****** MonitorStage********************************/ /** * @addtogroup stages @@ -22,29 +22,26 @@ namespace morpheus { /** * @brief */ -template -class MORPHEUS_EXPORT MonitorStage : public mrc::pymrc::PythonNode, std::shared_ptr> +template +class MORPHEUS_EXPORT MonitorStage : public mrc::pymrc::PythonNode, std::shared_ptr> { public: - using base_t = mrc::pymrc::PythonNode, std::shared_ptr>; + using base_t = mrc::pymrc::PythonNode, std::shared_ptr>; using typename base_t::sink_type_t; using typename base_t::source_type_t; using typename base_t::subscribe_fn_t; - /** - * @brief - */ - MonitorStage(const std::string& description, float smoothing, const std::string& unit, bool delayed_start); + MonitorStage(const std::string& description, + float smoothing, + const std::string& unit, + bool delayed_start, + std::optional> determine_count_fn = std::nullopt); private: subscribe_fn_t build_operator(); - const std::string& m_description; - float m_smoothing; - const std::string& m_unit; - bool m_delayed_start; - long long m_count; - indicators::ProgressBar m_progress_bar; + MonitorController m_monitor_controller; + }; /****** MonitorStageInterfaceProxy******************/ @@ -53,8 +50,8 @@ class MORPHEUS_EXPORT MonitorStage : public mrc::pymrc::PythonNode - static std::shared_ptr>> init(mrc::segment::Builder& builder); + template + static std::shared_ptr>> init(mrc::segment::Builder& builder); }; /** @} */ // end of group diff --git a/python/morpheus/morpheus/_lib/include/morpheus/stages/monitor_controller.hpp b/python/morpheus/morpheus/_lib/include/morpheus/stages/monitor_controller.hpp new file mode 100644 index 0000000000..3bbeaca498 --- /dev/null +++ b/python/morpheus/morpheus/_lib/include/morpheus/stages/monitor_controller.hpp @@ -0,0 +1,56 @@ +#pragma once + +#include +#include +#include // for Builder +#include // for PythonNode +#include // for trace_activity, decay_t, from + +#include + +namespace morpheus { +/****** Component public implementations *******************/ +/******************* MonitorController**********************/ + +/** + * @addtogroup stages + * @{ + * @file + */ + +/** + * @brief + */ +template +class MonitorController +{ + public: + MonitorController(const std::string& description, + float smoothing, + const std::string& unit, + bool delayed_start, + std::optional> determin_count_fn = std::nullopt); + + private: + MessageT progress_sink(MessageT msg); + auto auto_count_fn(MessageT msg) -> std::optional>; + void sink_on_completed(); + + const std::string& m_description; + float m_smoothing; + const std::string& m_unit; + bool m_delayed_start; + unsigned long m_count; + + indicators::ProgressBar m_progress_bar; + + std::optional> m_determine_count_fn; + + static indicators::DynamicProgress m_progress_bars; +}; + +template +indicators::DynamicProgress MonitorController::m_progress_bars; + +/** @} */ // end of group +} // namespace morpheus diff --git a/python/morpheus/morpheus/_lib/src/stages/monitor.cpp b/python/morpheus/morpheus/_lib/src/stages/monitor.cpp index 8e9cf5330f..a8fc84a286 100644 --- a/python/morpheus/morpheus/_lib/src/stages/monitor.cpp +++ b/python/morpheus/morpheus/_lib/src/stages/monitor.cpp @@ -1,58 +1,44 @@ #include "morpheus/stages/monitor.hpp" -#include "indicators/setting.hpp" namespace morpheus { // Component public implementations // ****************** MonitorStage ************************ // -template -MonitorStage::MonitorStage(const std::string& description, - float smoothing, - const std::string& unit, - bool delayed_start) : - base_t(base_t::op_factory_from_sub_fn(build_operator())), - m_description(description), - m_smoothing(smoothing), - m_unit(unit), - m_delayed_start(delayed_start), - m_count(0) +template +MonitorStage::MonitorStage(const std::string& description, + float smoothing, + const std::string& unit, + bool delayed_start, + std::optional> determine_count_fn) : + base_t(base_t::op_factory_from_sub_fn(build_operator())) { - m_progress_bar.set_option(indicators::option::BarWidth{50}); - m_progress_bar.set_option(indicators::option::Start{"["}); - m_progress_bar.set_option(indicators::option::Fill("■")); - m_progress_bar.set_option(indicators::option::Lead(">")); - m_progress_bar.set_option(indicators::option::Remainder(" ")); - m_progress_bar.set_option(indicators::option::End("]")); - m_progress_bar.set_option(indicators::option::PostfixText{m_description}); - m_progress_bar.set_option(indicators::option::ForegroundColor{indicators::Color::yellow}); - m_progress_bar.set_option(indicators::option::ShowElapsedTime{true}); - // m_progress_bar.set_option(indicators::option::ShowRemainingTime{true}); + m_monitor_controller = MonitorController(description, smoothing, unit, delayed_start, determine_count_fn); } -template -MonitorStage::subscribe_fn_t MonitorStage::build_operator() +template +MonitorStage::subscribe_fn_t MonitorStage::build_operator() { return [this](rxcpp::observable input, rxcpp::subscriber output) { return input.subscribe(rxcpp::make_observer( [this, &output](sink_type_t msg) { - m_count++; - m_progress_bar.set_progress(m_count); + m_monitor_controller.progress_sink(msg); }, [&](std::exception_ptr error_ptr) { output.on_error(error_ptr); }, [&]() { + m_monitor_controller.sink_on_completed(); output.on_completed(); })); }; } // ************ MonitorStageInterfaceProxy ************* // -template -std::shared_ptr>> MonitorStageInterfaceProxy::init( +template +std::shared_ptr>> MonitorStageInterfaceProxy::init( mrc::segment::Builder& builder) { - auto stage = builder.construct_object>(); + auto stage = builder.construct_object>(); return stage; } diff --git a/python/morpheus/morpheus/_lib/src/stages/monitor_controller.cpp b/python/morpheus/morpheus/_lib/src/stages/monitor_controller.cpp new file mode 100644 index 0000000000..5f8d0a1808 --- /dev/null +++ b/python/morpheus/morpheus/_lib/src/stages/monitor_controller.cpp @@ -0,0 +1,115 @@ +#include "morpheus/stages/monitor_controller.hpp" + +#include "indicators/setting.hpp" + +#include "morpheus/messages/control.hpp" +#include "morpheus/messages/meta.hpp" + +#include +#include +#include + +#include +#include +#include + +namespace morpheus { + +// Component public implementations +// ****************** MonitorController ************************ // + +template +MonitorController::MonitorController(const std::string& description, + float smoothing, + const std::string& unit, + bool delayed_start, + std::optional> determin_count_fn) : + m_description(description), + m_smoothing(smoothing), + m_unit(unit), + m_delayed_start(delayed_start), + m_count(0) +{ + m_progress_bar.set_option(indicators::option::BarWidth{50}); + m_progress_bar.set_option(indicators::option::Start{"["}); + m_progress_bar.set_option(indicators::option::Fill("■")); + m_progress_bar.set_option(indicators::option::Lead(">")); + m_progress_bar.set_option(indicators::option::Remainder(" ")); + m_progress_bar.set_option(indicators::option::End("]")); + m_progress_bar.set_option(indicators::option::PostfixText{m_description}); + m_progress_bar.set_option(indicators::option::ForegroundColor{indicators::Color::yellow}); + m_progress_bar.set_option(indicators::option::ShowElapsedTime{true}); + + MonitorController::m_progress_bars.push_back(m_progress_bar); +} + +template +MessageT MonitorController::progress_sink(MessageT msg) +{ + if (m_determine_count_fn == std::nullopt) + { + m_determine_count_fn = auto_count_fn(msg); + } + + m_count += (*m_determine_count_fn)(msg); + m_progress_bar.set_progress(m_count); + + return msg; +} + +template +struct is_vector : std::false_type +{}; + +template +struct is_vector> : std::true_type +{}; + +template +auto MonitorController::auto_count_fn(MessageT msg) -> std::optional> +{ + if constexpr (std::is_same_v) + { + return [](MessageT msg) { + return msg.num_rows(); + }; + } + + if constexpr (std::is_same_v) + { + return [](MessageT msg) { + return msg.count(); + }; + } + + if constexpr (std::is_same_v) + { + return [](MessageT msg) { + if (!msg.payload()) + { + return 0; + } + return msg.payload()->count(); + }; + } + + if constexpr (is_vector::value) + { + return [](MessageT msg) { + return msg.size(); + }; + } + + // Otherwise just count the number of received messages + return [](MessageT msg) { + return 1; + }; +} + +template +void MonitorController::sink_on_completed() +{ + m_progress_bar.mark_as_completed(); +} + +} // namespace morpheus