Skip to content

Commit

Permalink
Create copies rather than slices, remove unused make_output_message m…
Browse files Browse the repository at this point in the history
…ethod
  • Loading branch information
dagardner-nv committed Oct 29, 2024
1 parent b352f0a commit 7f3a9eb
Showing 1 changed file with 7 additions and 19 deletions.
26 changes: 7 additions & 19 deletions python/morpheus/morpheus/_lib/src/stages/deserialize.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
#include "morpheus/stages/deserialize.hpp"

#include "morpheus/messages/control.hpp" // for ControlMessage
#include "morpheus/messages/meta.hpp" // for MessageMeta, SlicedMessageMeta
#include "morpheus/types.hpp" // for TensorIndex
#include "morpheus/utilities/cudf_util.hpp" // for CudfHelper
#include "morpheus/utilities/json_types.hpp" // for PythonByteContainer
#include "morpheus/utilities/python_util.hpp" // for show_warning_message
#include "morpheus/utilities/string_util.hpp" // for MORPHEUS_CONCAT_STR
Expand All @@ -36,23 +38,6 @@

namespace morpheus {

void make_output_message(std::shared_ptr<MessageMeta>& incoming_message,
TensorIndex start,
TensorIndex stop,
control_message_task_t* task,
std::shared_ptr<ControlMessage>& windowed_message)
{
auto sliced_meta = std::make_shared<SlicedMessageMeta>(incoming_message, start, stop);
auto message = std::make_shared<ControlMessage>();
message->payload(sliced_meta);
if (task)
{
message->add_task(task->first, task->second);
}

windowed_message.swap(message);
}

DeserializeStage::subscribe_fn_t DeserializeStage::build_operator()
{
return [this](rxcpp::observable<sink_type_t> input, rxcpp::subscriber<source_type_t> output) {
Expand Down Expand Up @@ -89,9 +74,12 @@ DeserializeStage::subscribe_fn_t DeserializeStage::build_operator()
{
std::shared_ptr<ControlMessage> windowed_message = std::make_shared<ControlMessage>();

auto sliced_meta = std::make_shared<SlicedMessageMeta>(
auto sliced_meta = SlicedMessageMeta(
incoming_message, i, std::min(i + this->m_batch_size, incoming_message->count()));
windowed_message->payload(sliced_meta);
auto sliced_info = sliced_meta.get_info();

auto new_meta = MessageMeta::create_from_python(CudfHelper::table_from_table_info(sliced_info));
windowed_message->payload(new_meta);

auto task = m_task.get();
if (task)
Expand Down

0 comments on commit 7f3a9eb

Please sign in to comment.