Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support ControlMessage for Preprocess and PostProcess stages #1623

Merged
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
ba0d717
partially supporting CM for PreprocessAEStage
yczhang-nv Apr 10, 2024
01fb2ab
partially supporting CM for GenerateVizFramesStage
yczhang-nv Apr 11, 2024
3debd23
partially supporting CM for GenerateVizFramesStage
yczhang-nv Apr 11, 2024
bb321e8
rollback
yczhang-nv Apr 11, 2024
8ebac82
add stubs and TODOs to filter_detections_stage
yczhang-nv Apr 11, 2024
6170ea3
rename and add partial implementation of filter_detections_stage
yczhang-nv Apr 11, 2024
edf7918
Support CM for ml_flow_drift_stage
yczhang-nv Apr 11, 2024
6394a5d
Partially support (still TODOs) CM for timeseries_stage
yczhang-nv Apr 11, 2024
ebdca56
Support CM for validation_stage
yczhang-nv Apr 11, 2024
c7994ed
Merge remote-tracking branch 'upstream/branch-24.06' into pre-post-pr…
yczhang-nv Apr 12, 2024
ecf76ba
Merge remote-tracking branch 'upstream/branch-24.06' into pre-post-pr…
yczhang-nv May 10, 2024
b6cf005
rollback conflict
yczhang-nv May 10, 2024
08bed32
finish some todos and compiled
yczhang-nv May 10, 2024
6d85be1
finish filter detections ut
yczhang-nv May 14, 2024
b168a3d
ut for ml_flow_drift_stage
yczhang-nv May 14, 2024
5320330
partially unit test for test_timeseries_stage
yczhang-nv May 14, 2024
6847bf4
add ut for generate_viz_frames_stage
yczhang-nv May 14, 2024
6e563b2
add test to validation stage
yczhang-nv May 15, 2024
040657f
add ut for preprocess_ae_stage
yczhang-nv May 15, 2024
d667017
fix ci
yczhang-nv May 15, 2024
93ae47f
on_data
yczhang-nv May 15, 2024
87ea514
bug fixed
yczhang-nv May 15, 2024
994dfeb
fix CI
yczhang-nv May 15, 2024
1313ff6
Merge branch 'branch-24.06' into pre-post-process-control-message
yczhang-nv May 15, 2024
a1590eb
remove prints
yczhang-nv May 15, 2024
a8af1df
fix filter_detections_stage bug
yczhang-nv May 16, 2024
ea73520
fix timeseries_stage test
yczhang-nv May 16, 2024
e3da6fa
fix template
yczhang-nv May 16, 2024
53c625d
fix ci
yczhang-nv May 16, 2024
6f5f173
Merge branch 'branch-24.06' into pre-post-process-control-message
yczhang-nv May 17, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion morpheus/_lib/cmake/libmorpheus.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ add_library(morpheus
src/stages/add_scores.cpp
src/stages/deserialize.cpp
src/stages/file_source.cpp
src/stages/filter_detection.cpp
src/stages/filter_detections.cpp
src/stages/http_server_source_stage.cpp
src/stages/inference_client_stage.cpp
src/stages/kafka_source.cpp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,22 @@

#pragma once

#include "morpheus/export.h"
#include "morpheus/messages/multi.hpp"
#include "morpheus/objects/dev_mem_info.hpp" // for DevMemInfo
#include "morpheus/objects/filter_source.hpp"
#include "morpheus/export.h" // for MORPHEUS_EXPORT
#include "morpheus/messages/control.hpp" // for ControlMessage
#include "morpheus/messages/multi.hpp" // for MultiMessage
#include "morpheus/objects/dev_mem_info.hpp" // for DevMemInfo
#include "morpheus/objects/filter_source.hpp" // for FilterSource

#include <boost/fiber/context.hpp>
#include <mrc/segment/builder.hpp>
#include <mrc/segment/object.hpp>
#include <pymrc/node.hpp>
#include <rxcpp/rx.hpp>
#include <cuda_runtime.h> // for cudaMemcpy
#include <mrc/segment/builder.hpp> // for Builder
#include <mrc/segment/object.hpp> // for Object
#include <pymrc/node.hpp> // for PythonNode
#include <rxcpp/rx.hpp> // for observable_member, trace_activity, map, decay_t, from

#include <cstddef> // for size_t
#include <map>
#include <memory>
#include <string>
#include <thread>
#include <map> // for map
#include <memory> // for allocator, shared_ptr
#include <string> // for string

namespace morpheus {
/****** Component public implementations *******************/
Expand Down Expand Up @@ -68,11 +68,12 @@ namespace morpheus {
* Depending on the downstream stages, this can cause performance issues, especially if those stages need to acquire
* the Python GIL.
*/
template <typename InputT, typename OutputT>
yczhang-nv marked this conversation as resolved.
Show resolved Hide resolved
class MORPHEUS_EXPORT FilterDetectionsStage
: public mrc::pymrc::PythonNode<std::shared_ptr<MultiMessage>, std::shared_ptr<MultiMessage>>
: public mrc::pymrc::PythonNode<std::shared_ptr<InputT>, std::shared_ptr<OutputT>>
{
public:
using base_t = mrc::pymrc::PythonNode<std::shared_ptr<MultiMessage>, std::shared_ptr<MultiMessage>>;
using base_t = mrc::pymrc::PythonNode<std::shared_ptr<InputT>, std::shared_ptr<OutputT>>;
using typename base_t::sink_type_t;
using typename base_t::source_type_t;
using typename base_t::subscribe_fn_t;
Expand All @@ -88,10 +89,15 @@ class MORPHEUS_EXPORT FilterDetectionsStage
*/
FilterDetectionsStage(float threshold, bool copy, FilterSource filter_source, std::string field_name = "probs");

// /**
// * Called every time a message is passed to this stage
// */
// source_type_t on_data(sink_type_t x);

yczhang-nv marked this conversation as resolved.
Show resolved Hide resolved
private:
subscribe_fn_t build_operator();
DevMemInfo get_tensor_filter_source(const std::shared_ptr<morpheus::MultiMessage>& x);
DevMemInfo get_column_filter_source(const std::shared_ptr<morpheus::MultiMessage>& x);
DevMemInfo get_tensor_filter_source(const sink_type_t& x);
DevMemInfo get_column_filter_source(const sink_type_t& x);

float m_threshold;
bool m_copy;
Expand All @@ -101,14 +107,20 @@ class MORPHEUS_EXPORT FilterDetectionsStage
std::map<std::size_t, std::string> m_idx2label;
};

using FilterDetectionsStageMM = // NOLINT(readability-identifier-naming)
FilterDetectionsStage<MultiMessage, MultiMessage>;
using FilterDetectionsStageCM = // NOLINT(readability-identifier-naming)
FilterDetectionsStage<ControlMessage, ControlMessage>;

/****** FilterDetectionStageInterfaceProxy******************/
/**
* @brief Interface proxy, used to insulate python bindings.
*/
struct MORPHEUS_EXPORT FilterDetectionStageInterfaceProxy
{
/**
* @brief Create and initialize a FilterDetectionStage, and return the result
* @brief Create and initialize a FilterDetectionStage that receives MultiMessage and emits MultiMessage, and return
* the result
*
* @param builder : Pipeline context object reference
* @param name : Name of a stage reference
Expand All @@ -117,14 +129,34 @@ struct MORPHEUS_EXPORT FilterDetectionStageInterfaceProxy
* @param filter_source : Indicate if the values used for filtering exist in either an output tensor
* (`FilterSource::TENSOR`) or a column in a Dataframe (`FilterSource::DATAFRAME`).
* @param field_name : Name of the tensor or Dataframe column to filter on default="probs"
* @return std::shared_ptr<mrc::segment::Object<FilterDetectionsStage>>
* @return std::shared_ptr<mrc::segment::Object<FilterDetectionsStage<MultiMessage, MultiMessage>>>
*/
static std::shared_ptr<mrc::segment::Object<FilterDetectionsStage>> init(mrc::segment::Builder& builder,
const std::string& name,
float threshold,
bool copy,
FilterSource filter_source,
std::string field_name);
static std::shared_ptr<mrc::segment::Object<FilterDetectionsStageMM>> init_mm(mrc::segment::Builder& builder,
const std::string& name,
float threshold,
bool copy,
FilterSource filter_source,
std::string field_name);
/**
* @brief Create and initialize a FilterDetectionStage that receives ControlMessage and emits ControlMessage, and
* return the result
*
* @param builder : Pipeline context object reference
* @param name : Name of a stage reference
* @param threshold : Threshold to classify
* @param copy : Whether or not to perform a copy default=true
* @param filter_source : Indicate if the values used for filtering exist in either an output tensor
* (`FilterSource::TENSOR`) or a column in a Dataframe (`FilterSource::DATAFRAME`).
* @param field_name : Name of the tensor or Dataframe column to filter on default="probs"
* @return std::shared_ptr<mrc::segment::Object<FilterDetectionsStage<ControlMessage, ControlMessage>>>
*/
static std::shared_ptr<mrc::segment::Object<FilterDetectionsStageCM>> init_cm(mrc::segment::Builder& builder,
const std::string& name,
float threshold,
bool copy,
FilterSource filter_source,
std::string field_name);
};

/** @} */ // end of group
} // namespace morpheus
217 changes: 0 additions & 217 deletions morpheus/_lib/src/stages/filter_detection.cpp

This file was deleted.

Loading
Loading