Skip to content

Commit

Permalink
Initial commit for monitor stage
Browse files Browse the repository at this point in the history
  • Loading branch information
yczhang-nv committed Sep 24, 2024
1 parent 201aa0b commit a43c063
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 1 deletion.
2 changes: 2 additions & 0 deletions python/morpheus/morpheus/_lib/cmake/libmorpheus.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ add_library(morpheus
src/stages/http_server_source_stage.cpp
src/stages/inference_client_stage.cpp
src/stages/kafka_source.cpp
src/stages/monitor.cpp
src/stages/preprocess_fil.cpp
src/stages/preprocess_nlp.cpp
src/stages/serialize.cpp
Expand All @@ -79,6 +80,7 @@ add_library(${PROJECT_NAME}::morpheus ALIAS morpheus)

target_link_libraries(morpheus
PRIVATE
indicators::indicators
matx::matx
$<$<CONFIG:Debug>:ZLIB::ZLIB>
PUBLIC
Expand Down
61 changes: 61 additions & 0 deletions python/morpheus/morpheus/_lib/include/morpheus/stages/monitor.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
#pragma once

#include "morpheus/export.h" // for MORPHEUS_EXPORT

#include <indicators/progress_bar.hpp>
#include <mrc/segment/builder.hpp> // for Builder
#include <pymrc/node.hpp> // for PythonNode
#include <rxcpp/rx.hpp> // for trace_activity, decay_t, from

#include <string>

namespace morpheus {
/****** Component public implementations *******************/
/****** FilterDetectionStage********************************/

/**
* @addtogroup stages
* @{
* @file
*/

/**
* @brief
*/
template <typename InputT, typename OutputT>
class MORPHEUS_EXPORT MonitorStage : public mrc::pymrc::PythonNode<std::shared_ptr<InputT>, std::shared_ptr<OutputT>>
{
public:
using base_t = mrc::pymrc::PythonNode<std::shared_ptr<InputT>, std::shared_ptr<OutputT>>;
using typename base_t::sink_type_t;
using typename base_t::source_type_t;
using typename base_t::subscribe_fn_t;

/**
* @brief
*/
MonitorStage(const std::string& description, float smoothing, const std::string& unit, bool delayed_start);

private:
subscribe_fn_t build_operator();
const std::string& m_description;
float m_smoothing;
const std::string& m_unit;
bool m_delayed_start;
long long m_count;

indicators::ProgressBar m_progress_bar;
};

/****** MonitorStageInterfaceProxy******************/
/**
* @brief Interface proxy, used to insulate python bindings.
*/
struct MORPHEUS_EXPORT MonitorStageInterfaceProxy
{
template <typename InputT, typename OutputT>
static std::shared_ptr<mrc::segment::Object<MonitorStage<InputT, OutputT>>> init(mrc::segment::Builder& builder);
};

/** @} */ // end of group
} // namespace morpheus
59 changes: 59 additions & 0 deletions python/morpheus/morpheus/_lib/src/stages/monitor.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
#include "morpheus/stages/monitor.hpp"
#include "indicators/setting.hpp"

namespace morpheus {

// Component public implementations
// ****************** MonitorStage ************************ //
template <typename InputT, typename OutputT>
MonitorStage<InputT, OutputT>::MonitorStage(const std::string& description,
float smoothing,
const std::string& unit,
bool delayed_start) :
base_t(base_t::op_factory_from_sub_fn(build_operator())),
m_description(description),
m_smoothing(smoothing),
m_unit(unit),
m_delayed_start(delayed_start),
m_count(0)
{
m_progress_bar.set_option(indicators::option::BarWidth{50});
m_progress_bar.set_option(indicators::option::Start{"["});
m_progress_bar.set_option(indicators::option::Fill(""));
m_progress_bar.set_option(indicators::option::Lead(">"));
m_progress_bar.set_option(indicators::option::Remainder(" "));
m_progress_bar.set_option(indicators::option::End("]"));
m_progress_bar.set_option(indicators::option::PostfixText{m_description});
m_progress_bar.set_option(indicators::option::ForegroundColor{indicators::Color::yellow});
m_progress_bar.set_option(indicators::option::ShowElapsedTime{true});
// m_progress_bar.set_option(indicators::option::ShowRemainingTime{true});
}

template <typename InputT, typename OutputT>
MonitorStage<InputT, OutputT>::subscribe_fn_t MonitorStage<InputT, OutputT>::build_operator()
{
return [this](rxcpp::observable<sink_type_t> input, rxcpp::subscriber<source_type_t> output) {
return input.subscribe(rxcpp::make_observer<sink_type_t>(
[this, &output](sink_type_t msg) {
m_count++;
m_progress_bar.set_progress(m_count);
},
[&](std::exception_ptr error_ptr) {
output.on_error(error_ptr);
},
[&]() {
output.on_completed();
}));
};
}

// ************ MonitorStageInterfaceProxy ************* //
template <typename InputT, typename OutputT>
std::shared_ptr<mrc::segment::Object<MonitorStage<InputT, OutputT>>> MonitorStageInterfaceProxy::init(
mrc::segment::Builder& builder)
{
auto stage = builder.construct_object<MonitorStage<InputT, OutputT>>();

return stage;
}
} // namespace morpheus

0 comments on commit a43c063

Please sign in to comment.