Skip to content

Commit

Permalink
add monitor controller
Browse files Browse the repository at this point in the history
  • Loading branch information
yczhang-nv committed Sep 24, 2024
1 parent a43c063 commit 710b68a
Show file tree
Hide file tree
Showing 5 changed files with 201 additions and 46 deletions.
1 change: 1 addition & 0 deletions python/morpheus/morpheus/_lib/cmake/libmorpheus.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ add_library(morpheus
src/stages/inference_client_stage.cpp
src/stages/kafka_source.cpp
src/stages/monitor.cpp
src/stages/monitor_controller.cpp
src/stages/preprocess_fil.cpp
src/stages/preprocess_nlp.cpp
src/stages/serialize.cpp
Expand Down
31 changes: 14 additions & 17 deletions python/morpheus/morpheus/_lib/include/morpheus/stages/monitor.hpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#pragma once

#include "morpheus/export.h" // for MORPHEUS_EXPORT

#include "morpheus/stages/monitor_controller.hpp" // for MonitorController
#include <indicators/progress_bar.hpp>
#include <mrc/segment/builder.hpp> // for Builder
#include <pymrc/node.hpp> // for PythonNode
Expand All @@ -11,7 +11,7 @@

namespace morpheus {
/****** Component public implementations *******************/
/****** FilterDetectionStage********************************/
/****** MonitorStage********************************/

/**
* @addtogroup stages
Expand All @@ -22,29 +22,26 @@ namespace morpheus {
/**
* @brief
*/
template <typename InputT, typename OutputT>
class MORPHEUS_EXPORT MonitorStage : public mrc::pymrc::PythonNode<std::shared_ptr<InputT>, std::shared_ptr<OutputT>>
template <typename MessageT>
class MORPHEUS_EXPORT MonitorStage : public mrc::pymrc::PythonNode<std::shared_ptr<MessageT>, std::shared_ptr<MessageT>>
{
public:
using base_t = mrc::pymrc::PythonNode<std::shared_ptr<InputT>, std::shared_ptr<OutputT>>;
using base_t = mrc::pymrc::PythonNode<std::shared_ptr<MessageT>, std::shared_ptr<MessageT>>;
using typename base_t::sink_type_t;
using typename base_t::source_type_t;
using typename base_t::subscribe_fn_t;

/**
* @brief
*/
MonitorStage(const std::string& description, float smoothing, const std::string& unit, bool delayed_start);
MonitorStage(const std::string& description,
float smoothing,
const std::string& unit,
bool delayed_start,
std::optional<std::function<int(MessageT)>> determine_count_fn = std::nullopt);

private:
subscribe_fn_t build_operator();
const std::string& m_description;
float m_smoothing;
const std::string& m_unit;
bool m_delayed_start;
long long m_count;

indicators::ProgressBar m_progress_bar;
MonitorController<MessageT> m_monitor_controller;

};

/****** MonitorStageInterfaceProxy******************/
Expand All @@ -53,8 +50,8 @@ class MORPHEUS_EXPORT MonitorStage : public mrc::pymrc::PythonNode<std::shared_p
*/
struct MORPHEUS_EXPORT MonitorStageInterfaceProxy
{
template <typename InputT, typename OutputT>
static std::shared_ptr<mrc::segment::Object<MonitorStage<InputT, OutputT>>> init(mrc::segment::Builder& builder);
template <typename MessageT>
static std::shared_ptr<mrc::segment::Object<MonitorStage<MessageT>>> init(mrc::segment::Builder& builder);
};

/** @} */ // end of group
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
#pragma once

#include <indicators/dynamic_progress.hpp>
#include <indicators/progress_bar.hpp>
#include <mrc/segment/builder.hpp> // for Builder
#include <pymrc/node.hpp> // for PythonNode
#include <rxcpp/rx.hpp> // for trace_activity, decay_t, from

#include <string>

namespace morpheus {
/****** Component public implementations *******************/
/******************* MonitorController**********************/

/**
* @addtogroup stages
* @{
* @file
*/

/**
* @brief
*/
template <typename MessageT>
class MonitorController
{
public:
MonitorController(const std::string& description,
float smoothing,
const std::string& unit,
bool delayed_start,
std::optional<std::function<int(MessageT)>> determin_count_fn = std::nullopt);

private:
MessageT progress_sink(MessageT msg);
auto auto_count_fn(MessageT msg) -> std::optional<std::function<int(MessageT)>>;
void sink_on_completed();

const std::string& m_description;
float m_smoothing;
const std::string& m_unit;
bool m_delayed_start;
unsigned long m_count;

indicators::ProgressBar m_progress_bar;

std::optional<std::function<int(MessageT)>> m_determine_count_fn;

static indicators::DynamicProgress<indicators::ProgressBar> m_progress_bars;
};

template <typename InputT>
indicators::DynamicProgress<indicators::ProgressBar> MonitorController<InputT>::m_progress_bars;

/** @} */ // end of group
} // namespace morpheus
44 changes: 15 additions & 29 deletions python/morpheus/morpheus/_lib/src/stages/monitor.cpp
Original file line number Diff line number Diff line change
@@ -1,58 +1,44 @@
#include "morpheus/stages/monitor.hpp"
#include "indicators/setting.hpp"

namespace morpheus {

// Component public implementations
// ****************** MonitorStage ************************ //
template <typename InputT, typename OutputT>
MonitorStage<InputT, OutputT>::MonitorStage(const std::string& description,
float smoothing,
const std::string& unit,
bool delayed_start) :
base_t(base_t::op_factory_from_sub_fn(build_operator())),
m_description(description),
m_smoothing(smoothing),
m_unit(unit),
m_delayed_start(delayed_start),
m_count(0)
template <typename MessageT>
MonitorStage<MessageT>::MonitorStage(const std::string& description,
float smoothing,
const std::string& unit,
bool delayed_start,
std::optional<std::function<int(MessageT)>> determine_count_fn) :
base_t(base_t::op_factory_from_sub_fn(build_operator()))
{
m_progress_bar.set_option(indicators::option::BarWidth{50});
m_progress_bar.set_option(indicators::option::Start{"["});
m_progress_bar.set_option(indicators::option::Fill(""));
m_progress_bar.set_option(indicators::option::Lead(">"));
m_progress_bar.set_option(indicators::option::Remainder(" "));
m_progress_bar.set_option(indicators::option::End("]"));
m_progress_bar.set_option(indicators::option::PostfixText{m_description});
m_progress_bar.set_option(indicators::option::ForegroundColor{indicators::Color::yellow});
m_progress_bar.set_option(indicators::option::ShowElapsedTime{true});
// m_progress_bar.set_option(indicators::option::ShowRemainingTime{true});
m_monitor_controller = MonitorController<MessageT>(description, smoothing, unit, delayed_start, determine_count_fn);
}

template <typename InputT, typename OutputT>
MonitorStage<InputT, OutputT>::subscribe_fn_t MonitorStage<InputT, OutputT>::build_operator()
template <typename MessageT>
MonitorStage<MessageT>::subscribe_fn_t MonitorStage<MessageT>::build_operator()
{
return [this](rxcpp::observable<sink_type_t> input, rxcpp::subscriber<source_type_t> output) {
return input.subscribe(rxcpp::make_observer<sink_type_t>(
[this, &output](sink_type_t msg) {
m_count++;
m_progress_bar.set_progress(m_count);
m_monitor_controller.progress_sink(msg);
},
[&](std::exception_ptr error_ptr) {
output.on_error(error_ptr);
},
[&]() {
m_monitor_controller.sink_on_completed();
output.on_completed();
}));
};
}

// ************ MonitorStageInterfaceProxy ************* //
template <typename InputT, typename OutputT>
std::shared_ptr<mrc::segment::Object<MonitorStage<InputT, OutputT>>> MonitorStageInterfaceProxy::init(
template <typename MessageT>
std::shared_ptr<mrc::segment::Object<MonitorStage<MessageT>>> MonitorStageInterfaceProxy::init(
mrc::segment::Builder& builder)
{
auto stage = builder.construct_object<MonitorStage<InputT, OutputT>>();
auto stage = builder.construct_object<MonitorStage<MessageT>>();

return stage;
}
Expand Down
115 changes: 115 additions & 0 deletions python/morpheus/morpheus/_lib/src/stages/monitor_controller.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
#include "morpheus/stages/monitor_controller.hpp"

#include "indicators/setting.hpp"

#include "morpheus/messages/control.hpp"
#include "morpheus/messages/meta.hpp"

#include <cudf/column/column.hpp>
#include <cudf/column/column_factories.hpp>
#include <cudf/table/table.hpp>

#include <optional>
#include <type_traits>
#include <vector>

namespace morpheus {

// Component public implementations
// ****************** MonitorController ************************ //

template <typename MessageT>
MonitorController<MessageT>::MonitorController(const std::string& description,
float smoothing,
const std::string& unit,
bool delayed_start,
std::optional<std::function<int(MessageT)>> determin_count_fn) :
m_description(description),
m_smoothing(smoothing),
m_unit(unit),
m_delayed_start(delayed_start),
m_count(0)
{
m_progress_bar.set_option(indicators::option::BarWidth{50});
m_progress_bar.set_option(indicators::option::Start{"["});
m_progress_bar.set_option(indicators::option::Fill(""));
m_progress_bar.set_option(indicators::option::Lead(">"));
m_progress_bar.set_option(indicators::option::Remainder(" "));
m_progress_bar.set_option(indicators::option::End("]"));
m_progress_bar.set_option(indicators::option::PostfixText{m_description});
m_progress_bar.set_option(indicators::option::ForegroundColor{indicators::Color::yellow});
m_progress_bar.set_option(indicators::option::ShowElapsedTime{true});

MonitorController::m_progress_bars.push_back(m_progress_bar);
}

template <typename MessageT>
MessageT MonitorController<MessageT>::progress_sink(MessageT msg)
{
if (m_determine_count_fn == std::nullopt)
{
m_determine_count_fn = auto_count_fn(msg);
}

m_count += (*m_determine_count_fn)(msg);
m_progress_bar.set_progress(m_count);

return msg;
}

template <typename T>
struct is_vector : std::false_type
{};

template <typename T, typename U>
struct is_vector<std::vector<T, U>> : std::true_type
{};

template <typename MessageT>
auto MonitorController<MessageT>::auto_count_fn(MessageT msg) -> std::optional<std::function<int(MessageT)>>
{
if constexpr (std::is_same_v<MessageT, cudf::table>)
{
return [](MessageT msg) {
return msg.num_rows();
};
}

if constexpr (std::is_same_v<MessageT, MessageMeta>)
{
return [](MessageT msg) {
return msg.count();
};
}

if constexpr (std::is_same_v<MessageT, ControlMessage>)
{
return [](MessageT msg) {
if (!msg.payload())
{
return 0;
}
return msg.payload()->count();
};
}

if constexpr (is_vector<MessageT>::value)
{
return [](MessageT msg) {
return msg.size();
};
}

// Otherwise just count the number of received messages
return [](MessageT msg) {
return 1;
};
}

template <typename MessageT>
void MonitorController<MessageT>::sink_on_completed()
{
m_progress_bar.mark_as_completed();
}

} // namespace morpheus

0 comments on commit 710b68a

Please sign in to comment.