From 7f3a9eb4c88a8ff4eec9ac7e28c1842b29437a36 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Tue, 29 Oct 2024 15:06:39 -0700 Subject: [PATCH] Create copies rather than slices, remove unused make_output_message method --- .../morpheus/_lib/src/stages/deserialize.cpp | 26 +++++-------------- 1 file changed, 7 insertions(+), 19 deletions(-) diff --git a/python/morpheus/morpheus/_lib/src/stages/deserialize.cpp b/python/morpheus/morpheus/_lib/src/stages/deserialize.cpp index a4d7462830..22f1cb7b30 100644 --- a/python/morpheus/morpheus/_lib/src/stages/deserialize.cpp +++ b/python/morpheus/morpheus/_lib/src/stages/deserialize.cpp @@ -18,7 +18,9 @@ #include "morpheus/stages/deserialize.hpp" #include "morpheus/messages/control.hpp" // for ControlMessage +#include "morpheus/messages/meta.hpp" // for MessageMeta, SlicedMessageMeta #include "morpheus/types.hpp" // for TensorIndex +#include "morpheus/utilities/cudf_util.hpp" // for CudfHelper #include "morpheus/utilities/json_types.hpp" // for PythonByteContainer #include "morpheus/utilities/python_util.hpp" // for show_warning_message #include "morpheus/utilities/string_util.hpp" // for MORPHEUS_CONCAT_STR @@ -36,23 +38,6 @@ namespace morpheus { -void make_output_message(std::shared_ptr& incoming_message, - TensorIndex start, - TensorIndex stop, - control_message_task_t* task, - std::shared_ptr& windowed_message) -{ - auto sliced_meta = std::make_shared(incoming_message, start, stop); - auto message = std::make_shared(); - message->payload(sliced_meta); - if (task) - { - message->add_task(task->first, task->second); - } - - windowed_message.swap(message); -} - DeserializeStage::subscribe_fn_t DeserializeStage::build_operator() { return [this](rxcpp::observable input, rxcpp::subscriber output) { @@ -89,9 +74,12 @@ DeserializeStage::subscribe_fn_t DeserializeStage::build_operator() { std::shared_ptr windowed_message = std::make_shared(); - auto sliced_meta = std::make_shared( + auto sliced_meta = SlicedMessageMeta( incoming_message, i, std::min(i + this->m_batch_size, incoming_message->count())); - windowed_message->payload(sliced_meta); + auto sliced_info = sliced_meta.get_info(); + + auto new_meta = MessageMeta::create_from_python(CudfHelper::table_from_table_info(sliced_info)); + windowed_message->payload(new_meta); auto task = m_task.get(); if (task)