Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
dagardner-nv committed Aug 13, 2024
1 parent e8d97f2 commit 34b625c
Show file tree
Hide file tree
Showing 11 changed files with 118 additions and 54 deletions.
36 changes: 36 additions & 0 deletions morpheus/io/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,18 @@
# limitations under the License.
"""IO utilities."""

import functools
import logging
import types
import typing

import pandas as pd

if typing.TYPE_CHECKING:
import cudf

from morpheus.config import Config
from morpheus.config import ExecutionMode
from morpheus.utils.type_aliases import DataFrameType
from morpheus.utils.type_aliases import SeriesType

Expand Down Expand Up @@ -133,3 +137,35 @@ def truncate_string_cols_by_bytes(df: DataFrameType,
df[col] = decoded_series

return performed_truncation


def get_df_pkg(config: Config) -> types.ModuleType:
"""
Return the appropriate DataFrame package based on the execution mode.
"""
if config.execution_mode is ExecutionMode.GPU:
import cudf
return cudf
else:
return pd


def get_df_class(config: Config) -> type[DataFrameType]:
"""
Return the appropriate DataFrame class based on the execution mode.
"""
df_pkg = get_df_pkg(config)
return df_pkg.DataFrame


def get_json_reader(config: Config) -> typing.Callable[..., DataFrameType]:
"""
Return the appropriate JSON reader based on the execution mode.
"""
if config.execution_mode is ExecutionMode.GPU:
import cudf
reader = functools.partial(cudf.read_json, engine='cudf')
else:
reader = pd.read_json

return reader
2 changes: 2 additions & 0 deletions morpheus/stages/input/file_source_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,10 @@ def __init__(self,
self._repeat_count = repeat

if c.execution_mode is ExecutionMode.GPU:
print("GPU MODE Using cudf")
self._df_type = "cudf"
else:
print("CPU MODE Using pandas")
self._df_type = "pandas"

@property
Expand Down
13 changes: 1 addition & 12 deletions morpheus/stages/input/http_client_source_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,15 +144,7 @@ def __init__(self,
self._lines = lines
self._requst_kwargs = request_kwargs

if payload_to_df_fn is None:
if config.execution_mode == ExecutionMode.GPU:
import cudf
self._payload_to_df_fn = cudf.read_json
else:
import pandas
self._payload_to_df_fn = pandas.read_json
else:
self._payload_to_df_fn = payload_to_df_fn
self._payload_to_df_fn = payload_to_df_fn or self._get_default_payload_to_df_fn(config)

@property
def name(self) -> str:
Expand All @@ -163,9 +155,6 @@ def supports_cpp_node(self) -> bool:
"""Indicates whether or not this stage supports a C++ implementation"""
return False

def compute_schema(self, schema: StageSchema):
schema.output_schema.set_type(MessageMeta)

def _parse_response(self, response: requests.Response) -> typing.Union[DataFrameType, None]:
"""
Returns a DataFrame parsed from the response payload. If the response payload is empty, then `None` is returned.
Expand Down
33 changes: 14 additions & 19 deletions morpheus/stages/input/http_server_source_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,27 +22,25 @@

import mrc

import cudf

from morpheus.cli.register_stage import register_stage
from morpheus.config import Config
from morpheus.config import ExecutionMode
from morpheus.messages import MessageMeta
from morpheus.pipeline.preallocator_mixin import PreallocatorMixin
from morpheus.pipeline.single_output_source import SingleOutputSource
from morpheus.pipeline.stage_schema import StageSchema
from morpheus.stages.input.http_source_stage_base import HttpSourceStageBase
from morpheus.utils.http_utils import HTTPMethod
from morpheus.utils.http_utils import HttpParseResponse
from morpheus.utils.http_utils import MimeTypes
from morpheus.utils.producer_consumer_queue import Closed
from morpheus.utils.type_aliases import DataFrameType

logger = logging.getLogger(__name__)

SUPPORTED_METHODS = (HTTPMethod.POST, HTTPMethod.PUT)
HEALTH_SUPPORTED_METHODS = (HTTPMethod.GET, HTTPMethod.POST)


@register_stage("from-http")
class HttpServerSourceStage(PreallocatorMixin, SingleOutputSource):
@register_stage("from-http", execution_modes=(ExecutionMode.CPU, ExecutionMode.GPU))
class HttpServerSourceStage(HttpSourceStageBase):
"""
Source stage that starts an HTTP server and listens for incoming requests on a specified endpoint.
Expand Down Expand Up @@ -80,7 +78,7 @@ class HttpServerSourceStage(PreallocatorMixin, SingleOutputSource):
Stops ingesting after emitting `stop_after` records (rows in the dataframe). Useful for testing. Disabled if `0`
payload_to_df_fn : callable, default None
A callable that takes the HTTP payload string as the first argument and the `lines` parameter is passed in as
the second argument and returns a cudf.DataFrame. When supplied, the C++ implementation of this stage is
the second argument and returns a DataFrame. When supplied, the C++ implementation of this stage is
disabled, and the Python impl is used.
"""

Expand All @@ -103,7 +101,7 @@ def __init__(self,
request_timeout_secs: int = 30,
lines: bool = False,
stop_after: int = 0,
payload_to_df_fn: typing.Callable[[str, bool], cudf.DataFrame] = None):
payload_to_df_fn: typing.Callable[[str, bool], DataFrameType] = None):
super().__init__(config)
self._bind_address = bind_address
self._port = port
Expand All @@ -122,9 +120,11 @@ def __init__(self,
self._request_timeout_secs = request_timeout_secs
self._lines = lines
self._stop_after = stop_after
self._payload_to_df_fn = payload_to_df_fn
self._http_server = None

# Leave this as None so we can check if it's set later
self._payload_to_df_fn = payload_to_df_fn

# These are only used when C++ mode is disabled
self._queue = None
self._queue_size = 0
Expand All @@ -146,17 +146,9 @@ def supports_cpp_node(self) -> bool:
"""Indicates whether this stage supports C++ nodes."""
return True

def compute_schema(self, schema: StageSchema):
schema.output_schema.set_type(MessageMeta)

def _parse_payload(self, payload: str) -> HttpParseResponse:
try:
if self._payload_to_df_fn is not None:
df = self._payload_to_df_fn(payload, self._lines)
else:
# engine='cudf' is needed when lines=False to avoid using pandas
df = cudf.read_json(payload, lines=self._lines, engine='cudf')

df = self._payload_to_df_fn(payload, self._lines)
except Exception as e:
err_msg = "Error occurred converting HTTP payload to Dataframe"
logger.error("%s: %s", err_msg, e)
Expand Down Expand Up @@ -277,6 +269,9 @@ def _build_source(self, builder: mrc.Builder) -> mrc.SegmentObject:
lines=self._lines,
stop_after=self._stop_after)
else:
if self._payload_to_df_fn is None:
self._payload_to_df_fn = self._get_default_payload_to_df_fn(self._config)

node = builder.make_source(self.unique_name, self._generate_frames())

return node
Expand Down
35 changes: 35 additions & 0 deletions morpheus/stages/input/http_source_stage_base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Copyright (c) 2023-2024, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Base class for HTTP sources."""

import typing

from morpheus.config import Config
from morpheus.io.utils import get_json_reader
from morpheus.messages import MessageMeta
from morpheus.pipeline.preallocator_mixin import PreallocatorMixin
from morpheus.pipeline.single_output_source import SingleOutputSource
from morpheus.pipeline.stage_schema import StageSchema
from morpheus.utils.type_aliases import DataFrameType


class HttpSourceStageBase(PreallocatorMixin, SingleOutputSource):

def _get_default_payload_to_df_fn(self, config: Config) -> typing.Callable[[str, bool], DataFrameType]:
reader = get_json_reader(config)

return lambda payload, lines: reader(payload, lines=lines)

def compute_schema(self, schema: StageSchema):
schema.output_schema.set_type(MessageMeta)
10 changes: 7 additions & 3 deletions morpheus/stages/input/kafka_source_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,16 @@
import mrc
import pandas as pd

import cudf

from morpheus.cli.register_stage import register_stage
from morpheus.config import Config
from morpheus.config import PipelineModes
from morpheus.config import auto_determine_bootstrap
from morpheus.io.utils import get_json_reader
from morpheus.messages import MessageMeta
from morpheus.pipeline.preallocator_mixin import PreallocatorMixin
from morpheus.pipeline.single_output_source import SingleOutputSource
from morpheus.pipeline.stage_schema import StageSchema
from morpheus.utils.type_aliases import DataFrameType

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -130,6 +130,9 @@ def __init__(self,
self._poll_interval = pd.Timedelta(poll_interval).total_seconds()
self._started = False

# Defined lated if in CPU mode
self._json_reader: typing.Callable[..., DataFrameType] = None

self._records_emitted = 0
self._num_messages = 0

Expand Down Expand Up @@ -167,7 +170,7 @@ def _process_batch(self, consumer, batch):
df = None
try:
buffer.seek(0)
df = cudf.io.read_json(buffer, engine='cudf', lines=True, orient='records')
df = self._json_reader(buffer, lines=True, orient='records')
except Exception as e:
logger.error("Error parsing payload into a dataframe : %s", e)
finally:
Expand Down Expand Up @@ -254,6 +257,7 @@ def _build_source(self, builder: mrc.Builder) -> mrc.SegmentObject:
# multiple threads
source.launch_options.pe_count = self._max_concurrent
else:
self._json_reader = get_json_reader(self._config)
source = builder.make_source(self.unique_name, self._source_generator)

return source
7 changes: 3 additions & 4 deletions morpheus/stages/output/compare_dataframe_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@

import pandas as pd

import cudf

from morpheus.config import Config
from morpheus.io.deserializers import read_file_to_df
from morpheus.stages.output.in_memory_sink_stage import InMemorySinkStage
Expand Down Expand Up @@ -74,15 +72,16 @@ def __init__(self,

if isinstance(compare_df, str):
compare_df = read_file_to_df(compare_df, df_type='pandas')
elif isinstance(compare_df, cudf.DataFrame):
compare_df = compare_df.to_pandas()
elif isinstance(compare_df, list):
tmp_dfs = []
for item in compare_df:
tmp_df = read_file_to_df(item, df_type='pandas')
tmp_dfs.append(tmp_df)
compare_df = pd.concat(tmp_dfs)
compare_df.reset_index(inplace=True, drop=True)
elif not isinstance(compare_df, pd.DataFrame):
# assume it is a cudf DataFrame
compare_df = compare_df.to_pandas()

self._compare_df = compare_df

Expand Down
16 changes: 10 additions & 6 deletions morpheus/stages/output/http_server_sink_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@
import pandas as pd
from mrc.core import operators as ops

import cudf

from morpheus.cli.register_stage import register_stage
from morpheus.config import Config
from morpheus.config import ExecutionMode
from morpheus.io import serializers
from morpheus.io.utils import get_df_pkg
from morpheus.messages import MessageMeta
from morpheus.pipeline.pass_thru_type_mixin import PassThruTypeMixin
from morpheus.pipeline.single_port_stage import SinglePortStage
Expand All @@ -41,7 +41,9 @@
logger = logging.getLogger(__name__)


@register_stage("to-http-server", ignore_args=["df_serializer_fn"])
@register_stage("to-http-server",
execution_modes=(ExecutionMode.CPU, ExecutionMode.GPU),
ignore_args=["df_serializer_fn"])
class HttpServerSinkStage(PassThruTypeMixin, SinglePortStage):
"""
Sink stage that starts an HTTP server and listens for incoming requests on a specified endpoint.
Expand Down Expand Up @@ -116,6 +118,8 @@ def __init__(self,

self._df_serializer_fn = df_serializer_fn or self._default_df_serializer

self._df_pkg = get_df_pkg(config)

# FiberQueue doesn't have a way to check the size, nor does it have a way to check if it's empty without
# attempting to perform a read. We'll keep track of the size ourselves.
self._queue = queue.Queue(maxsize=max_queue_size or config.edge_buffer_size)
Expand Down Expand Up @@ -201,10 +205,10 @@ def _request_handler(self, _: str) -> HttpParseResponse:
body=err_msg)

if (len(data_frames) > 0):
df = data_frames[0]
if len(data_frames) > 1:
cat_fn = pd.concat if isinstance(df, pd.DataFrame) else cudf.concat
df = cat_fn(data_frames)
df = self._df_pkg.concat(data_frames)
else:
df = data_frames[0]

return HttpParseResponse(status_code=HTTPStatus.OK.value,
content_type=self._content_type,
Expand Down
5 changes: 2 additions & 3 deletions morpheus/stages/output/write_to_databricks_deltalake_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
import pandas as pd
from mrc.core import operators as ops

import cudf

from morpheus.cli.register_stage import register_stage
from morpheus.config import Config
from morpheus.messages import MessageMeta
Expand Down Expand Up @@ -97,8 +95,9 @@ def write_to_deltalake(meta: MessageMeta):
convert cudf to spark dataframe
"""
df = meta.copy_dataframe()
if isinstance(df, cudf.DataFrame):
if not isinstance(df, pd.DataFrame):
df = df.to_pandas()

schema = self._extract_schema_from_pandas_dataframe(df)
spark_df = self.spark.createDataFrame(df, schema=schema)
spark_df.write \
Expand Down
5 changes: 2 additions & 3 deletions morpheus/stages/output/write_to_elasticsearch_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,9 @@

import mrc
import mrc.core.operators as ops
import pandas as pd
import yaml

import cudf

from morpheus.cli.register_stage import register_stage
from morpheus.config import Config
from morpheus.controllers.elasticsearch_controller import ElasticsearchController
Expand Down Expand Up @@ -110,7 +109,7 @@ def on_data(meta: MessageMeta) -> MessageMeta:
self._controller.refresh_client()

df = meta.copy_dataframe()
if isinstance(df, cudf.DataFrame):
if not isinstance(df, pd.DataFrame):
df = df.to_pandas()
logger.debug("Converted cudf of size: %s to pandas dataframe.", len(df))

Expand Down
Loading

0 comments on commit 34b625c

Please sign in to comment.