diff --git a/morpheus/io/utils.py b/morpheus/io/utils.py index 6aeabab3e8..ba6abcbfc5 100644 --- a/morpheus/io/utils.py +++ b/morpheus/io/utils.py @@ -14,7 +14,9 @@ # limitations under the License. """IO utilities.""" +import functools import logging +import types import typing import pandas as pd @@ -22,6 +24,8 @@ if typing.TYPE_CHECKING: import cudf +from morpheus.config import Config +from morpheus.config import ExecutionMode from morpheus.utils.type_aliases import DataFrameType from morpheus.utils.type_aliases import SeriesType @@ -133,3 +137,35 @@ def truncate_string_cols_by_bytes(df: DataFrameType, df[col] = decoded_series return performed_truncation + + +def get_df_pkg(config: Config) -> types.ModuleType: + """ + Return the appropriate DataFrame package based on the execution mode. + """ + if config.execution_mode is ExecutionMode.GPU: + import cudf + return cudf + else: + return pd + + +def get_df_class(config: Config) -> type[DataFrameType]: + """ + Return the appropriate DataFrame class based on the execution mode. + """ + df_pkg = get_df_pkg(config) + return df_pkg.DataFrame + + +def get_json_reader(config: Config) -> typing.Callable[..., DataFrameType]: + """ + Return the appropriate JSON reader based on the execution mode. + """ + if config.execution_mode is ExecutionMode.GPU: + import cudf + reader = functools.partial(cudf.read_json, engine='cudf') + else: + reader = pd.read_json + + return reader diff --git a/morpheus/stages/input/file_source_stage.py b/morpheus/stages/input/file_source_stage.py index 548eceb385..260b3c530b 100644 --- a/morpheus/stages/input/file_source_stage.py +++ b/morpheus/stages/input/file_source_stage.py @@ -101,8 +101,10 @@ def __init__(self, self._repeat_count = repeat if c.execution_mode is ExecutionMode.GPU: + print("GPU MODE Using cudf") self._df_type = "cudf" else: + print("CPU MODE Using pandas") self._df_type = "pandas" @property diff --git a/morpheus/stages/input/http_client_source_stage.py b/morpheus/stages/input/http_client_source_stage.py index cc7191ba43..bb6595120b 100644 --- a/morpheus/stages/input/http_client_source_stage.py +++ b/morpheus/stages/input/http_client_source_stage.py @@ -144,15 +144,7 @@ def __init__(self, self._lines = lines self._requst_kwargs = request_kwargs - if payload_to_df_fn is None: - if config.execution_mode == ExecutionMode.GPU: - import cudf - self._payload_to_df_fn = cudf.read_json - else: - import pandas - self._payload_to_df_fn = pandas.read_json - else: - self._payload_to_df_fn = payload_to_df_fn + self._payload_to_df_fn = payload_to_df_fn or self._get_default_payload_to_df_fn(config) @property def name(self) -> str: @@ -163,9 +155,6 @@ def supports_cpp_node(self) -> bool: """Indicates whether or not this stage supports a C++ implementation""" return False - def compute_schema(self, schema: StageSchema): - schema.output_schema.set_type(MessageMeta) - def _parse_response(self, response: requests.Response) -> typing.Union[DataFrameType, None]: """ Returns a DataFrame parsed from the response payload. If the response payload is empty, then `None` is returned. diff --git a/morpheus/stages/input/http_server_source_stage.py b/morpheus/stages/input/http_server_source_stage.py index ed8d99612f..b0dcc3b620 100644 --- a/morpheus/stages/input/http_server_source_stage.py +++ b/morpheus/stages/input/http_server_source_stage.py @@ -22,18 +22,16 @@ import mrc -import cudf - from morpheus.cli.register_stage import register_stage from morpheus.config import Config +from morpheus.config import ExecutionMode from morpheus.messages import MessageMeta -from morpheus.pipeline.preallocator_mixin import PreallocatorMixin -from morpheus.pipeline.single_output_source import SingleOutputSource -from morpheus.pipeline.stage_schema import StageSchema +from morpheus.stages.input.http_source_stage_base import HttpSourceStageBase from morpheus.utils.http_utils import HTTPMethod from morpheus.utils.http_utils import HttpParseResponse from morpheus.utils.http_utils import MimeTypes from morpheus.utils.producer_consumer_queue import Closed +from morpheus.utils.type_aliases import DataFrameType logger = logging.getLogger(__name__) @@ -41,8 +39,8 @@ HEALTH_SUPPORTED_METHODS = (HTTPMethod.GET, HTTPMethod.POST) -@register_stage("from-http") -class HttpServerSourceStage(PreallocatorMixin, SingleOutputSource): +@register_stage("from-http", execution_modes=(ExecutionMode.CPU, ExecutionMode.GPU)) +class HttpServerSourceStage(HttpSourceStageBase): """ Source stage that starts an HTTP server and listens for incoming requests on a specified endpoint. @@ -80,7 +78,7 @@ class HttpServerSourceStage(PreallocatorMixin, SingleOutputSource): Stops ingesting after emitting `stop_after` records (rows in the dataframe). Useful for testing. Disabled if `0` payload_to_df_fn : callable, default None A callable that takes the HTTP payload string as the first argument and the `lines` parameter is passed in as - the second argument and returns a cudf.DataFrame. When supplied, the C++ implementation of this stage is + the second argument and returns a DataFrame. When supplied, the C++ implementation of this stage is disabled, and the Python impl is used. """ @@ -103,7 +101,7 @@ def __init__(self, request_timeout_secs: int = 30, lines: bool = False, stop_after: int = 0, - payload_to_df_fn: typing.Callable[[str, bool], cudf.DataFrame] = None): + payload_to_df_fn: typing.Callable[[str, bool], DataFrameType] = None): super().__init__(config) self._bind_address = bind_address self._port = port @@ -122,9 +120,11 @@ def __init__(self, self._request_timeout_secs = request_timeout_secs self._lines = lines self._stop_after = stop_after - self._payload_to_df_fn = payload_to_df_fn self._http_server = None + # Leave this as None so we can check if it's set later + self._payload_to_df_fn = payload_to_df_fn + # These are only used when C++ mode is disabled self._queue = None self._queue_size = 0 @@ -146,17 +146,9 @@ def supports_cpp_node(self) -> bool: """Indicates whether this stage supports C++ nodes.""" return True - def compute_schema(self, schema: StageSchema): - schema.output_schema.set_type(MessageMeta) - def _parse_payload(self, payload: str) -> HttpParseResponse: try: - if self._payload_to_df_fn is not None: - df = self._payload_to_df_fn(payload, self._lines) - else: - # engine='cudf' is needed when lines=False to avoid using pandas - df = cudf.read_json(payload, lines=self._lines, engine='cudf') - + df = self._payload_to_df_fn(payload, self._lines) except Exception as e: err_msg = "Error occurred converting HTTP payload to Dataframe" logger.error("%s: %s", err_msg, e) @@ -277,6 +269,9 @@ def _build_source(self, builder: mrc.Builder) -> mrc.SegmentObject: lines=self._lines, stop_after=self._stop_after) else: + if self._payload_to_df_fn is None: + self._payload_to_df_fn = self._get_default_payload_to_df_fn(self._config) + node = builder.make_source(self.unique_name, self._generate_frames()) return node diff --git a/morpheus/stages/input/http_source_stage_base.py b/morpheus/stages/input/http_source_stage_base.py new file mode 100644 index 0000000000..e83a53f353 --- /dev/null +++ b/morpheus/stages/input/http_source_stage_base.py @@ -0,0 +1,35 @@ +# Copyright (c) 2023-2024, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Base class for HTTP sources.""" + +import typing + +from morpheus.config import Config +from morpheus.io.utils import get_json_reader +from morpheus.messages import MessageMeta +from morpheus.pipeline.preallocator_mixin import PreallocatorMixin +from morpheus.pipeline.single_output_source import SingleOutputSource +from morpheus.pipeline.stage_schema import StageSchema +from morpheus.utils.type_aliases import DataFrameType + + +class HttpSourceStageBase(PreallocatorMixin, SingleOutputSource): + + def _get_default_payload_to_df_fn(self, config: Config) -> typing.Callable[[str, bool], DataFrameType]: + reader = get_json_reader(config) + + return lambda payload, lines: reader(payload, lines=lines) + + def compute_schema(self, schema: StageSchema): + schema.output_schema.set_type(MessageMeta) diff --git a/morpheus/stages/input/kafka_source_stage.py b/morpheus/stages/input/kafka_source_stage.py index 0c12c6db49..c98170fa30 100644 --- a/morpheus/stages/input/kafka_source_stage.py +++ b/morpheus/stages/input/kafka_source_stage.py @@ -22,16 +22,16 @@ import mrc import pandas as pd -import cudf - from morpheus.cli.register_stage import register_stage from morpheus.config import Config from morpheus.config import PipelineModes from morpheus.config import auto_determine_bootstrap +from morpheus.io.utils import get_json_reader from morpheus.messages import MessageMeta from morpheus.pipeline.preallocator_mixin import PreallocatorMixin from morpheus.pipeline.single_output_source import SingleOutputSource from morpheus.pipeline.stage_schema import StageSchema +from morpheus.utils.type_aliases import DataFrameType logger = logging.getLogger(__name__) @@ -130,6 +130,9 @@ def __init__(self, self._poll_interval = pd.Timedelta(poll_interval).total_seconds() self._started = False + # Defined lated if in CPU mode + self._json_reader: typing.Callable[..., DataFrameType] = None + self._records_emitted = 0 self._num_messages = 0 @@ -167,7 +170,7 @@ def _process_batch(self, consumer, batch): df = None try: buffer.seek(0) - df = cudf.io.read_json(buffer, engine='cudf', lines=True, orient='records') + df = self._json_reader(buffer, lines=True, orient='records') except Exception as e: logger.error("Error parsing payload into a dataframe : %s", e) finally: @@ -254,6 +257,7 @@ def _build_source(self, builder: mrc.Builder) -> mrc.SegmentObject: # multiple threads source.launch_options.pe_count = self._max_concurrent else: + self._json_reader = get_json_reader(self._config) source = builder.make_source(self.unique_name, self._source_generator) return source diff --git a/morpheus/stages/output/compare_dataframe_stage.py b/morpheus/stages/output/compare_dataframe_stage.py index 86ae3dc6ce..ecd94563c5 100644 --- a/morpheus/stages/output/compare_dataframe_stage.py +++ b/morpheus/stages/output/compare_dataframe_stage.py @@ -21,8 +21,6 @@ import pandas as pd -import cudf - from morpheus.config import Config from morpheus.io.deserializers import read_file_to_df from morpheus.stages.output.in_memory_sink_stage import InMemorySinkStage @@ -74,8 +72,6 @@ def __init__(self, if isinstance(compare_df, str): compare_df = read_file_to_df(compare_df, df_type='pandas') - elif isinstance(compare_df, cudf.DataFrame): - compare_df = compare_df.to_pandas() elif isinstance(compare_df, list): tmp_dfs = [] for item in compare_df: @@ -83,6 +79,9 @@ def __init__(self, tmp_dfs.append(tmp_df) compare_df = pd.concat(tmp_dfs) compare_df.reset_index(inplace=True, drop=True) + elif not isinstance(compare_df, pd.DataFrame): + # assume it is a cudf DataFrame + compare_df = compare_df.to_pandas() self._compare_df = compare_df diff --git a/morpheus/stages/output/http_server_sink_stage.py b/morpheus/stages/output/http_server_sink_stage.py index aac5a93b99..9f8462263b 100644 --- a/morpheus/stages/output/http_server_sink_stage.py +++ b/morpheus/stages/output/http_server_sink_stage.py @@ -25,11 +25,11 @@ import pandas as pd from mrc.core import operators as ops -import cudf - from morpheus.cli.register_stage import register_stage from morpheus.config import Config +from morpheus.config import ExecutionMode from morpheus.io import serializers +from morpheus.io.utils import get_df_pkg from morpheus.messages import MessageMeta from morpheus.pipeline.pass_thru_type_mixin import PassThruTypeMixin from morpheus.pipeline.single_port_stage import SinglePortStage @@ -41,7 +41,9 @@ logger = logging.getLogger(__name__) -@register_stage("to-http-server", ignore_args=["df_serializer_fn"]) +@register_stage("to-http-server", + execution_modes=(ExecutionMode.CPU, ExecutionMode.GPU), + ignore_args=["df_serializer_fn"]) class HttpServerSinkStage(PassThruTypeMixin, SinglePortStage): """ Sink stage that starts an HTTP server and listens for incoming requests on a specified endpoint. @@ -116,6 +118,8 @@ def __init__(self, self._df_serializer_fn = df_serializer_fn or self._default_df_serializer + self._df_pkg = get_df_pkg(config) + # FiberQueue doesn't have a way to check the size, nor does it have a way to check if it's empty without # attempting to perform a read. We'll keep track of the size ourselves. self._queue = queue.Queue(maxsize=max_queue_size or config.edge_buffer_size) @@ -201,10 +205,10 @@ def _request_handler(self, _: str) -> HttpParseResponse: body=err_msg) if (len(data_frames) > 0): - df = data_frames[0] if len(data_frames) > 1: - cat_fn = pd.concat if isinstance(df, pd.DataFrame) else cudf.concat - df = cat_fn(data_frames) + df = self._df_pkg.concat(data_frames) + else: + df = data_frames[0] return HttpParseResponse(status_code=HTTPStatus.OK.value, content_type=self._content_type, diff --git a/morpheus/stages/output/write_to_databricks_deltalake_stage.py b/morpheus/stages/output/write_to_databricks_deltalake_stage.py index 6b98ffeb92..53d028d987 100644 --- a/morpheus/stages/output/write_to_databricks_deltalake_stage.py +++ b/morpheus/stages/output/write_to_databricks_deltalake_stage.py @@ -19,8 +19,6 @@ import pandas as pd from mrc.core import operators as ops -import cudf - from morpheus.cli.register_stage import register_stage from morpheus.config import Config from morpheus.messages import MessageMeta @@ -97,8 +95,9 @@ def write_to_deltalake(meta: MessageMeta): convert cudf to spark dataframe """ df = meta.copy_dataframe() - if isinstance(df, cudf.DataFrame): + if not isinstance(df, pd.DataFrame): df = df.to_pandas() + schema = self._extract_schema_from_pandas_dataframe(df) spark_df = self.spark.createDataFrame(df, schema=schema) spark_df.write \ diff --git a/morpheus/stages/output/write_to_elasticsearch_stage.py b/morpheus/stages/output/write_to_elasticsearch_stage.py index eede6926e8..f26948cf6a 100644 --- a/morpheus/stages/output/write_to_elasticsearch_stage.py +++ b/morpheus/stages/output/write_to_elasticsearch_stage.py @@ -18,10 +18,9 @@ import mrc import mrc.core.operators as ops +import pandas as pd import yaml -import cudf - from morpheus.cli.register_stage import register_stage from morpheus.config import Config from morpheus.controllers.elasticsearch_controller import ElasticsearchController @@ -110,7 +109,7 @@ def on_data(meta: MessageMeta) -> MessageMeta: self._controller.refresh_client() df = meta.copy_dataframe() - if isinstance(df, cudf.DataFrame): + if not isinstance(df, pd.DataFrame): df = df.to_pandas() logger.debug("Converted cudf of size: %s to pandas dataframe.", len(df)) diff --git a/morpheus/stages/postprocess/generate_viz_frames_stage.py b/morpheus/stages/postprocess/generate_viz_frames_stage.py index b2d059666c..d50279e2e6 100644 --- a/morpheus/stages/postprocess/generate_viz_frames_stage.py +++ b/morpheus/stages/postprocess/generate_viz_frames_stage.py @@ -27,17 +27,17 @@ import websockets.legacy.server from websockets.server import serve -import cudf - from morpheus.cli.register_stage import register_stage from morpheus.config import Config from morpheus.config import PipelineModes +from morpheus.io.utils import get_df_class from morpheus.messages import ControlMessage from morpheus.messages import MultiResponseMessage from morpheus.pipeline.pass_thru_type_mixin import PassThruTypeMixin from morpheus.pipeline.single_port_stage import SinglePortStage from morpheus.utils.producer_consumer_queue import AsyncIOProducerConsumerQueue from morpheus.utils.producer_consumer_queue import Closed +from morpheus.utils.type_aliases import DataFrameType logger = logging.getLogger(__name__) @@ -82,6 +82,8 @@ def __init__(self, self._server_task: asyncio.Task = None self._server_close_event: asyncio.Event = None + self._df_class: type[DataFrameType] = get_df_class(c) + @property def name(self) -> str: return "gen_viz" @@ -146,7 +148,7 @@ def indent_data(y: str): except Exception: return y - if isinstance(df, cudf.DataFrame): + if not isinstance(df, pd.DataFrame): df = df.to_pandas() df["data"] = df["data"].apply(indent_data) @@ -291,7 +293,7 @@ def write_batch(x: MultiResponseMessage | ControlMessage): elif isinstance(x, ControlMessage): df = x.payload().get_data(columns) - out_df = cudf.DataFrame() + out_df = self._df_class() out_df["dt"] = (df["timestamp"] - time0).astype(np.int32) out_df["src"] = df["src_ip"].str.ip_to_int().astype(np.int32)