From bf80d93958e78c7e0312cd75ad71d1b75d0befce Mon Sep 17 00:00:00 2001 From: David Gardner <96306125+dagardner-nv@users.noreply.github.com> Date: Wed, 8 May 2024 23:12:48 -0700 Subject: [PATCH] Support the filter_null parameter in the C++ impl of the FileSourceStage (#1689) * Fixes bug where the `filter_null` constructor argument to the `FileSourceStage` was only implemented in the Python impl of the stage. * Update `filter_null` feature to make the column(s) being filtered upon configurable, previously this was hard-coded to `"data"` * Add new `get_column_names` helper method to `CuDFTableUtil` Closes #1678 ## By Submitting this PR I confirm: - I am familiar with the [Contributing Guidelines](https://github.com/nv-morpheus/Morpheus/blob/main/docs/source/developer_guide/contributing.md). - When the PR is ready for review, new or existing tests cover these changes. - When the PR is ready for review, the documentation is up to date with these changes. Authors: - David Gardner (https://github.com/dagardner-nv) Approvers: - Michael Demoret (https://github.com/mdemoret-nv) URL: https://github.com/nv-morpheus/Morpheus/pull/1689 --- .../include/morpheus/io/deserializers.hpp | 2 +- .../include/morpheus/stages/file_source.hpp | 35 +++++--- .../include/morpheus/utilities/table_util.hpp | 28 ++++++- morpheus/_lib/src/io/deserializers.cpp | 16 ++-- morpheus/_lib/src/stages/file_source.cpp | 41 +++++++-- morpheus/_lib/src/utilities/table_util.cpp | 45 +++++++++- morpheus/_lib/stages/__init__.pyi | 4 +- morpheus/_lib/stages/module.cpp | 24 ++++-- morpheus/_lib/tests/CMakeLists.txt | 6 ++ .../_lib/tests/utilities/test_table_util.cpp | 79 +++++++++++++++++ morpheus/io/deserializers.py | 84 +++++++++++-------- morpheus/io/utils.py | 6 +- morpheus/stages/input/file_source_stage.py | 17 +++- tests/stages/test_file_source_stage_pipe.py | 69 +++++++++++++++ tests/tests_data/file_with_nans.csv | 3 + tests/tests_data/file_with_nans.jsonlines | 3 + tests/tests_data/file_with_nulls.csv | 3 
+ tests/tests_data/file_with_nulls.jsonlines | 3 + 18 files changed, 390 insertions(+), 78 deletions(-) create mode 100644 morpheus/_lib/tests/utilities/test_table_util.cpp create mode 100755 tests/stages/test_file_source_stage_pipe.py create mode 100644 tests/tests_data/file_with_nans.csv create mode 100644 tests/tests_data/file_with_nans.jsonlines create mode 100644 tests/tests_data/file_with_nulls.csv create mode 100644 tests/tests_data/file_with_nulls.jsonlines diff --git a/morpheus/_lib/include/morpheus/io/deserializers.hpp b/morpheus/_lib/include/morpheus/io/deserializers.hpp index d98cad6e9a..569d503eb9 100644 --- a/morpheus/_lib/include/morpheus/io/deserializers.hpp +++ b/morpheus/_lib/include/morpheus/io/deserializers.hpp @@ -71,7 +71,7 @@ int get_index_col_count(const cudf::io::table_with_metadata& data_table); int prepare_df_index(cudf::io::table_with_metadata& data_table); /** - * @brief Loads a cudf table from either CSV or JSON file returning the DataFrame as a Python object + * @brief Loads a cudf table from either CSV, JSON or Parquet file returning the DataFrame as a Python object * * @param filename : Name of the file that should be loaded into a table * @return pybind11::object diff --git a/morpheus/_lib/include/morpheus/stages/file_source.hpp b/morpheus/_lib/include/morpheus/stages/file_source.hpp index 6ed1ea4852..95ec2ebd64 100644 --- a/morpheus/_lib/include/morpheus/stages/file_source.hpp +++ b/morpheus/_lib/include/morpheus/stages/file_source.hpp @@ -31,6 +31,7 @@ #include #include #include +#include namespace morpheus { /****** Component public implementations *******************/ @@ -61,13 +62,19 @@ class FileSourceStage : public mrc::pymrc::PythonSource json_lines = std::nullopt); + FileSourceStage(std::string filename, + int repeat = 1, + bool filter_null = true, + std::vector filter_null_columns = {}, + std::optional json_lines = std::nullopt); private: subscriber_fn_t build(); std::string m_filename; int m_repeat{1}; + bool 
m_filter_null{true}; + std::vector m_filter_null_columns; std::optional m_json_lines; }; @@ -87,16 +94,22 @@ struct FileSourceStageInterfaceProxy * @param parser_kwargs : Optional arguments to pass to the file parser. * @return std::shared_ptr> */ - static std::shared_ptr> init(mrc::segment::Builder& builder, - const std::string& name, - std::string filename, - int repeat = 1, - pybind11::dict parser_kwargs = pybind11::dict()); - static std::shared_ptr> init(mrc::segment::Builder& builder, - const std::string& name, - std::filesystem::path filename, - int repeat = 1, - pybind11::dict parser_kwargs = pybind11::dict()); + static std::shared_ptr> init( + mrc::segment::Builder& builder, + const std::string& name, + std::string filename, + int repeat = 1, + bool filter_null = true, + std::vector filter_null_columns = {}, + pybind11::dict parser_kwargs = pybind11::dict()); + static std::shared_ptr> init( + mrc::segment::Builder& builder, + const std::string& name, + std::filesystem::path filename, + int repeat = 1, + bool filter_null = true, + std::vector filter_null_columns = {}, + pybind11::dict parser_kwargs = pybind11::dict()); }; #pragma GCC visibility pop /** @} */ // end of group diff --git a/morpheus/_lib/include/morpheus/utilities/table_util.hpp b/morpheus/_lib/include/morpheus/utilities/table_util.hpp index b8797901ea..9cef0ee87b 100644 --- a/morpheus/_lib/include/morpheus/utilities/table_util.hpp +++ b/morpheus/_lib/include/morpheus/utilities/table_util.hpp @@ -15,10 +15,13 @@ * limitations under the License. */ +#include "morpheus/export.h" // for MORPHEUS_EXPORT + #include #include // IWYU pragma: keep #include +#include #pragma once @@ -35,12 +38,33 @@ namespace morpheus { /** * @brief Structure that encapsulates cuDF table utilities. */ -struct CuDFTableUtil +struct MORPHEUS_EXPORT CuDFTableUtil { /** - * TODO(Documentation) + * @brief Load a table from a file. + * + * @param filename The name of the file to load. 
+ * @return cudf::io::table_with_metadata The table loaded from the file. */ static cudf::io::table_with_metadata load_table(const std::string& filename); + + /** + * @brief Get the column names from a cudf table_with_metadata. + * + * @param table The table to get the column names from. + * @return std::vector The column names. + */ + static std::vector get_column_names(const cudf::io::table_with_metadata& table); + + /** + * @brief Filters rows from a table that contain null values in the given columns. + * Null values in columns other than those specified in `filter_columns` are not considered. + * Any missing columns are ignored. + * + * @param table The table to filter + * @param filter_columns The names of the columns to filter on + */ + static void filter_null_data(cudf::io::table_with_metadata& table, const std::vector& filter_columns); }; /** @} */ // end of group } // namespace morpheus diff --git a/morpheus/_lib/src/io/deserializers.cpp b/morpheus/_lib/src/io/deserializers.cpp index 4704b1ba3d..032cffd57b 100644 --- a/morpheus/_lib/src/io/deserializers.cpp +++ b/morpheus/_lib/src/io/deserializers.cpp @@ -20,6 +20,7 @@ #include "morpheus/utilities/cudf_util.hpp" // for CudfHelper #include "morpheus/utilities/stage_util.hpp" #include "morpheus/utilities/string_util.hpp" +#include "morpheus/utilities/table_util.hpp" // for get_column_names #include #include @@ -29,8 +30,6 @@ #include // for cudf::type_id #include // IWYU pragma: keep -#include -#include #include #include #include @@ -50,7 +49,9 @@ namespace morpheus { std::vector get_column_names_from_table(const cudf::io::table_with_metadata& table) { - return foreach_map(table.metadata.schema_info, [](auto schema) { return schema.name; }); + return foreach_map(table.metadata.schema_info, [](auto schema) { + return schema.name; + }); } cudf::io::table_with_metadata load_table_from_file(const std::string& filename, @@ -69,7 +70,7 @@ cudf::io::table_with_metadata load_table_from_file(const std::string& 
filename, case FileTypes::JSON: { auto options = cudf::io::json_reader_options::builder(cudf::io::source_info{filename}).lines(json_lines.value_or(true)); - table = cudf::io::read_json(options.build()); + table = cudf::io::read_json(options.build()); break; } case FileTypes::CSV: { @@ -106,12 +107,9 @@ pybind11::object read_file_to_df(const std::string& filename, FileTypes file_typ int get_index_col_count(const cudf::io::table_with_metadata& data_table) { - int index_col_count = 0; - auto const& schema = data_table.metadata.schema_info; + int index_col_count = 0; - std::vector names; - names.reserve(schema.size()); - std::transform(schema.cbegin(), schema.cend(), std::back_inserter(names), [](auto const& c) { return c.name; }); + std::vector names = CuDFTableUtil::get_column_names(data_table); // Check if we have a first column with INT64 data type if (names.size() >= 1 && data_table.tbl->get_column(0).type().id() == cudf::type_id::INT64) diff --git a/morpheus/_lib/src/stages/file_source.cpp b/morpheus/_lib/src/stages/file_source.cpp index 84a59f5f12..c3dce33693 100644 --- a/morpheus/_lib/src/stages/file_source.cpp +++ b/morpheus/_lib/src/stages/file_source.cpp @@ -24,6 +24,7 @@ #include "morpheus/objects/file_types.hpp" #include "morpheus/objects/table_info.hpp" #include "morpheus/utilities/cudf_util.hpp" +#include "morpheus/utilities/table_util.hpp" // for filter_null_data #include #include @@ -37,24 +38,39 @@ #include #include #include +#include // for invalid_argument #include -// IWYU thinks we need __alloc_traits<>::value_type for vector assignments -// IWYU pragma: no_include namespace morpheus { // Component public implementations // ************ FileSourceStage ************* // -FileSourceStage::FileSourceStage(std::string filename, int repeat, std::optional json_lines) : +FileSourceStage::FileSourceStage(std::string filename, + int repeat, + bool filter_null, + std::vector filter_null_columns, + std::optional json_lines) : PythonSource(build()), 
m_filename(std::move(filename)), m_repeat(repeat), + m_filter_null(filter_null), + m_filter_null_columns(std::move(filter_null_columns)), m_json_lines(json_lines) -{} +{ + if (m_filter_null && m_filter_null_columns.empty()) + { + throw std::invalid_argument("Filter null columns must not be empty if filter_null is true"); + } +} FileSourceStage::subscriber_fn_t FileSourceStage::build() { return [this](rxcpp::subscriber output) { - auto data_table = load_table_from_file(m_filename, FileTypes::Auto, m_json_lines); + auto data_table = load_table_from_file(m_filename, FileTypes::Auto, m_json_lines); + if (m_filter_null) + { + CuDFTableUtil::filter_null_data(data_table, m_filter_null_columns); + } + int index_col_count = prepare_df_index(data_table); // Next, create the message metadata. This gets reused for repeats @@ -116,6 +132,8 @@ std::shared_ptr> FileSourceStageInterfaceP const std::string& name, std::string filename, int repeat, + bool filter_null, + std::vector filter_null_columns, pybind11::dict parser_kwargs) { std::optional json_lines = std::nullopt; @@ -125,7 +143,8 @@ std::shared_ptr> FileSourceStageInterfaceP json_lines = parser_kwargs["lines"].cast(); } - auto stage = builder.construct_object(name, filename, repeat, json_lines); + auto stage = builder.construct_object( + name, filename, repeat, filter_null, std::move(filter_null_columns), json_lines); return stage; } @@ -135,8 +154,16 @@ std::shared_ptr> FileSourceStageInterfaceP const std::string& name, std::filesystem::path filename, int repeat, + bool filter_null, + std::vector filter_null_columns, pybind11::dict parser_kwargs) { - return init(builder, name, filename.string(), repeat, std::move(parser_kwargs)); + return init(builder, + name, + filename.string(), + repeat, + filter_null, + std::move(filter_null_columns), + std::move(parser_kwargs)); } } // namespace morpheus diff --git a/morpheus/_lib/src/utilities/table_util.cpp b/morpheus/_lib/src/utilities/table_util.cpp index 1c93493d92..d6aa159b6d 
100644 --- a/morpheus/_lib/src/utilities/table_util.cpp +++ b/morpheus/_lib/src/utilities/table_util.cpp @@ -19,17 +19,24 @@ #include #include +#include // for drop_nulls +#include // for size_type #include #include +#include // for find, transform #include +#include // for back_insert_iterator, back_inserter +#include // for unique_ptr #include // needed for logging #include // for runtime_error +namespace { namespace fs = std::filesystem; namespace py = pybind11; - -cudf::io::table_with_metadata morpheus::CuDFTableUtil::load_table(const std::string& filename) +} // namespace +namespace morpheus { +cudf::io::table_with_metadata CuDFTableUtil::load_table(const std::string& filename) { auto file_path = fs::path(filename); @@ -52,3 +59,37 @@ cudf::io::table_with_metadata morpheus::CuDFTableUtil::load_table(const std::str throw std::runtime_error("Unknown extension"); } } + +std::vector CuDFTableUtil::get_column_names(const cudf::io::table_with_metadata& table) +{ + auto const& schema = table.metadata.schema_info; + + std::vector names; + names.reserve(schema.size()); + std::transform(schema.cbegin(), schema.cend(), std::back_inserter(names), [](auto const& c) { + return c.name; + }); + + return names; +} + +void CuDFTableUtil::filter_null_data(cudf::io::table_with_metadata& table, + const std::vector& filter_columns) +{ + std::vector filter_keys; + auto column_names = get_column_names(table); + for (const auto& column_name : filter_columns) + { + auto found_col = std::find(column_names.cbegin(), column_names.cend(), column_name); + if (found_col != column_names.cend()) + { + filter_keys.push_back((found_col - column_names.cbegin())); + } + } + + auto tv = table.tbl->view(); + auto filtered_table = cudf::drop_nulls(tv, filter_keys, filter_keys.size()); + + table.tbl.swap(filtered_table); +} +} // namespace morpheus diff --git a/morpheus/_lib/stages/__init__.pyi b/morpheus/_lib/stages/__init__.pyi index 78a0ff8091..bfd66dcb64 100644 --- 
a/morpheus/_lib/stages/__init__.pyi +++ b/morpheus/_lib/stages/__init__.pyi @@ -60,9 +60,9 @@ class DeserializeMultiMessageStage(mrc.core.segment.SegmentObject): pass class FileSourceStage(mrc.core.segment.SegmentObject): @typing.overload - def __init__(self, builder: mrc.core.segment.Builder, name: str, filename: os.PathLike, repeat: int, parser_kwargs: dict) -> None: ... + def __init__(self, builder: mrc.core.segment.Builder, name: str, filename: os.PathLike, repeat: int, filter_null: bool, filter_null_columns: typing.List[str], parser_kwargs: dict) -> None: ... @typing.overload - def __init__(self, builder: mrc.core.segment.Builder, name: str, filename: str, repeat: int, parser_kwargs: dict) -> None: ... + def __init__(self, builder: mrc.core.segment.Builder, name: str, filename: str, repeat: int, filter_null: bool, filter_null_columns: typing.List[str], parser_kwargs: dict) -> None: ... pass class FilterDetectionsStage(mrc.core.segment.SegmentObject): def __init__(self, builder: mrc.core.segment.Builder, name: str, threshold: float, copy: bool, filter_source: morpheus._lib.common.FilterSource, field_name: str = 'probs') -> None: ... 
diff --git a/morpheus/_lib/stages/module.cpp b/morpheus/_lib/stages/module.cpp index 1cf57663ac..32c3c5e030 100644 --- a/morpheus/_lib/stages/module.cpp +++ b/morpheus/_lib/stages/module.cpp @@ -52,6 +52,7 @@ #include #include #include +#include namespace morpheus { namespace py = pybind11; @@ -138,20 +139,33 @@ PYBIND11_MODULE(stages, _module) mrc::segment::ObjectProperties, std::shared_ptr>>( _module, "FileSourceStage", py::multiple_inheritance()) - .def(py::init(py::overload_cast( - &FileSourceStageInterfaceProxy::init)), + .def(py::init(py::overload_cast, + py::dict>(&FileSourceStageInterfaceProxy::init)), py::arg("builder"), py::arg("name"), py::arg("filename"), py::arg("repeat"), + py::arg("filter_null"), + py::arg("filter_null_columns"), py::arg("parser_kwargs")) - .def(py::init( - py::overload_cast( - &FileSourceStageInterfaceProxy::init)), + .def(py::init(py::overload_cast, + py::dict>(&FileSourceStageInterfaceProxy::init)), py::arg("builder"), py::arg("name"), py::arg("filename"), py::arg("repeat"), + py::arg("filter_null"), + py::arg("filter_null_columns"), py::arg("parser_kwargs")); py::class_, diff --git a/morpheus/_lib/tests/CMakeLists.txt b/morpheus/_lib/tests/CMakeLists.txt index a17a297aca..e42e7717e8 100644 --- a/morpheus/_lib/tests/CMakeLists.txt +++ b/morpheus/_lib/tests/CMakeLists.txt @@ -188,4 +188,10 @@ add_morpheus_test( test_type_util.cpp ) +add_morpheus_test( + NAME table_util + FILES + utilities/test_table_util.cpp +) + list(POP_BACK CMAKE_MESSAGE_CONTEXT) diff --git a/morpheus/_lib/tests/utilities/test_table_util.cpp b/morpheus/_lib/tests/utilities/test_table_util.cpp new file mode 100644 index 0000000000..021b8a8322 --- /dev/null +++ b/morpheus/_lib/tests/utilities/test_table_util.cpp @@ -0,0 +1,79 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
+ * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "../test_utils/common.hpp" // IWYU pragma: associated + +#include "morpheus/io/deserializers.hpp" +#include "morpheus/utilities/table_util.hpp" // for filter_null_data + +#include // for table_with_metadata +#include // for table +#include + +#include +#include +#include // for pair +#include +// IWYU pragma: no_include + +using namespace morpheus; + +TEST_CLASS(TableUtil); + +TEST_F(TestTableUtil, GetColumnNames) +{ + auto morpheus_root = test::get_morpheus_root(); + auto input_files = {morpheus_root / "tests/tests_data/file_with_nulls.csv", + morpheus_root / "tests/tests_data/file_with_nulls.jsonlines"}; + + for (const auto& input_file : input_files) + { + auto table_w_meta = load_table_from_file(input_file); + auto column_names = CuDFTableUtil::get_column_names(table_w_meta); + + EXPECT_EQ(column_names.size(), 2); + EXPECT_EQ(column_names[0], "data"); + EXPECT_EQ(column_names[1], "other"); + } +} + +TEST_F(TestTableUtil, FilterNullData) +{ + auto morpheus_root = test::get_morpheus_root(); + auto input_files = {morpheus_root / "tests/tests_data/file_with_nans.csv", + morpheus_root / "tests/tests_data/file_with_nans.jsonlines", + morpheus_root / "tests/tests_data/file_with_nulls.csv", + morpheus_root / "tests/tests_data/file_with_nulls.jsonlines"}; + std::vector, std::size_t>> expected_row_counts{ + {{"data"}, 8}, {{"data"}, 8}, {{"other"}, 7}, 
{{"other"}, 7}, {{"data", "other"}, 5}}; + + for (const auto& input_file : input_files) + { + for (const auto& [filter_columns, expected_row_count] : expected_row_counts) + { + auto table_w_meta = load_table_from_file(input_file); + + EXPECT_EQ(table_w_meta.tbl->num_columns(), 2); + EXPECT_EQ(table_w_meta.tbl->num_rows(), 10); + + CuDFTableUtil::filter_null_data(table_w_meta, filter_columns); + + EXPECT_EQ(table_w_meta.tbl->num_columns(), 2); + EXPECT_EQ(table_w_meta.tbl->num_rows(), expected_row_count); + } + } +} diff --git a/morpheus/io/deserializers.py b/morpheus/io/deserializers.py index 293bc2a303..31499b4359 100644 --- a/morpheus/io/deserializers.py +++ b/morpheus/io/deserializers.py @@ -29,40 +29,11 @@ from morpheus.utils.type_aliases import DataFrameType -def read_file_to_df(file_name: typing.Union[str, io.IOBase], - file_type: FileTypes = FileTypes.Auto, - parser_kwargs: dict = None, - filter_nulls: bool = True, - df_type: typing.Literal["cudf", "pandas"] = "pandas") -> DataFrameType: - """ - Reads a file into a dataframe and performs any of the necessary cleanup. - - Parameters - ---------- - file_name : str - File to read. - file_type : `morpheus.common.FileTypes` - Type of file. Leave as Auto to determine from the extension. - parser_kwargs : dict, optional - Any argument to pass onto the parse, by default {}. Ignored when C++ execution is enabled and `df_type="cudf"` - filter_nulls : bool, optional - Whether to filter null rows after loading, by default True. - df_type : typing.Literal[, optional - What type of parser to use. Options are 'cudf' and 'pandas', by default "pandas". - - Returns - ------- - DataFrameType - A parsed DataFrame. 
- """ - - # The C++ reader only supports cudf dataframes - if (CppConfig.get_should_use_cpp() and df_type == "cudf"): - df = read_file_to_df_cpp(file_name, file_type) - if (filter_nulls): - df = filter_null_data(df) - return df - +def _read_file_to_df_py(*, + file_name: typing.Union[str, io.IOBase], + file_type: FileTypes, + parser_kwargs: dict, + df_type: typing.Literal["cudf", "pandas"]) -> DataFrameType: if (parser_kwargs is None): parser_kwargs = {} @@ -111,7 +82,50 @@ def read_file_to_df(file_name: typing.Union[str, io.IOBase], assert df is not None + return df + + +def read_file_to_df(file_name: typing.Union[str, io.IOBase], + file_type: FileTypes = FileTypes.Auto, + parser_kwargs: dict = None, + filter_nulls: bool = True, + filter_null_columns: list[str] | str = 'data', + df_type: typing.Literal["cudf", "pandas"] = "pandas") -> DataFrameType: + """ + Reads a file into a dataframe and performs any of the necessary cleanup. + + Parameters + ---------- + file_name : str + File to read. + file_type : `morpheus.common.FileTypes` + Type of file. Leave as Auto to determine from the extension. + parser_kwargs : dict, optional + Any argument to pass onto the parse, by default {}. Ignored when C++ execution is enabled and `df_type="cudf"` + filter_nulls : bool, optional + Whether to filter null rows after loading, by default True. + filter_null_columns : list[str]|str, default = 'data' + Column or columns to filter null values from. Ignored when `filter_null` is False. + df_type : typing.Literal[, optional + What type of parser to use. Options are 'cudf' and 'pandas', by default "pandas". + + Returns + ------- + DataFrameType + A parsed DataFrame. 
+ """ + + # The C++ reader only supports cudf dataframes + if (CppConfig.get_should_use_cpp() and df_type == "cudf"): + df = read_file_to_df_cpp(file_name, file_type) + else: + df = _read_file_to_df_py(file_name=file_name, file_type=file_type, parser_kwargs=parser_kwargs, df_type=df_type) + if (filter_nulls): - df = filter_null_data(df) + if isinstance(filter_null_columns, str): + filter_null_columns = [filter_null_columns] + + for col in filter_null_columns: + df = filter_null_data(df, column_name=col) return df diff --git a/morpheus/io/utils.py b/morpheus/io/utils.py index d8b286a8e8..9a20afb4d5 100644 --- a/morpheus/io/utils.py +++ b/morpheus/io/utils.py @@ -26,7 +26,7 @@ logger = logging.getLogger(__name__) -def filter_null_data(x: DataFrameType): +def filter_null_data(x: DataFrameType, column_name: str = "data") -> DataFrameType: """ Filters out null row in a dataframe's 'data' column if it exists. @@ -34,12 +34,14 @@ def filter_null_data(x: DataFrameType): ---------- x : DataFrameType The dataframe to fix. + column_name : str, default 'data' + The column name to filter on. """ if ("data" not in x): return x - return x[~x['data'].isna()] + return x[~x[column_name].isna()] def cudf_string_cols_exceed_max_bytes(df: cudf.DataFrame, column_max_bytes: dict[str, int]) -> bool: diff --git a/morpheus/stages/input/file_source_stage.py b/morpheus/stages/input/file_source_stage.py index eb4630fb3e..9b3551dce6 100644 --- a/morpheus/stages/input/file_source_stage.py +++ b/morpheus/stages/input/file_source_stage.py @@ -57,8 +57,11 @@ class FileSourceStage(PreallocatorMixin, SingleOutputSource): repeat : int, default = 1, min = 1 Repeats the input dataset multiple times. Useful to extend small datasets for debugging. filter_null : bool, default = True - Whether to filter rows with null 'data' column. Null values in the 'data' column can cause issues down - the line with processing. Setting this to True is recommended. 
+ Whether to filter rows with null `filter_null_columns` columns. Null values in source data can cause issues + down the line with processing. Setting this to True is recommended. + filter_null_columns : list[str], default = None + Column or columns to filter null values from. Ignored when `filter_null` is False. If None, and `filter_null` + is `True`, this will default to `["data"]` parser_kwargs : dict, default = {} Extra options to pass to the file parser. """ @@ -70,6 +73,7 @@ def __init__(self, file_type: FileTypes = FileTypes.Auto, repeat: int = 1, filter_null: bool = True, + filter_null_columns: list[str] = None, parser_kwargs: dict = None): super().__init__(c) @@ -79,6 +83,12 @@ def __init__(self, self._filename = filename self._file_type = file_type self._filter_null = filter_null + + if filter_null_columns is None or len(filter_null_columns) == 0: + filter_null_columns = ["data"] + + self._filter_null_columns = filter_null_columns + self._parser_kwargs = parser_kwargs or {} self._input_count = None @@ -114,6 +124,8 @@ def _build_source(self, builder: mrc.Builder) -> mrc.SegmentObject: self.unique_name, self._filename, self._repeat_count, + self._filter_null, + self._filter_null_columns, self._parser_kwargs) else: node = builder.make_source(self.unique_name, self._generate_frames()) @@ -126,6 +138,7 @@ def _generate_frames(self) -> typing.Iterable[MessageMeta]: self._filename, self._file_type, filter_nulls=self._filter_null, + filter_null_columns=self._filter_null_columns, parser_kwargs=self._parser_kwargs, df_type="cudf", ) diff --git a/tests/stages/test_file_source_stage_pipe.py b/tests/stages/test_file_source_stage_pipe.py new file mode 100755 index 0000000000..59f9c76d63 --- /dev/null +++ b/tests/stages/test_file_source_stage_pipe.py @@ -0,0 +1,69 @@ +#!/usr/bin/env python +# SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
+# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import pathlib + +import pandas as pd +import pytest + +from _utils import TEST_DIRS +from _utils import assert_results +from morpheus.common import FileTypes +from morpheus.common import determine_file_type +from morpheus.config import Config +from morpheus.io.deserializers import read_file_to_df +from morpheus.pipeline import LinearPipeline +from morpheus.stages.input.file_source_stage import FileSourceStage +from morpheus.stages.output.compare_dataframe_stage import CompareDataFrameStage + + +@pytest.mark.slow +@pytest.mark.parametrize("input_file", + [ + os.path.join(TEST_DIRS.tests_data_dir, "filter_probs.csv"), + os.path.join(TEST_DIRS.tests_data_dir, "filter_probs.parquet"), + os.path.join(TEST_DIRS.tests_data_dir, 'examples/abp_pcap_detection/abp_pcap.jsonlines') + ], + ids=["csv", "parquet", "jsonlines"]) +@pytest.mark.parametrize("filter_null", [False, True], ids=["no_filter", "filter_null"]) +@pytest.mark.parametrize("use_pathlib", [False, True], ids=["no_pathlib", "pathlib"]) +@pytest.mark.parametrize("repeat", [1, 2, 5], ids=["repeat1", "repeat2", "repeat5"]) +def test_file_source_stage_pipe(config: Config, input_file: str, filter_null: bool, use_pathlib: bool, repeat: int): + parser_kwargs = {} + if determine_file_type(input_file) == FileTypes.JSON: + # kwarg specific to pandas.read_json + parser_kwargs['convert_dates'] = False + + expected_df = 
read_file_to_df(file_name=input_file, + filter_nulls=filter_null, + df_type="pandas", + parser_kwargs=parser_kwargs) + expected_df = pd.concat([expected_df for _ in range(repeat)]) + + expected_df.reset_index(inplace=True) + expected_df.drop('index', axis=1, inplace=True) + + if use_pathlib: + input_file = pathlib.Path(input_file) + + pipe = LinearPipeline(config) + pipe.set_source(FileSourceStage(config, filename=input_file, repeat=repeat, filter_null=filter_null)) + comp_stage = pipe.add_stage( + CompareDataFrameStage(config, compare_df=expected_df, exclude=["index"], reset_index=True)) + pipe.run() + + assert_results(comp_stage.get_results()) diff --git a/tests/tests_data/file_with_nans.csv b/tests/tests_data/file_with_nans.csv new file mode 100644 index 0000000000..ff3a8643fa --- /dev/null +++ b/tests/tests_data/file_with_nans.csv @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:4a173a9d2027a90c7df128dac1f9126160107954fc286a13d04bd94824d668b8 +size 76 diff --git a/tests/tests_data/file_with_nans.jsonlines b/tests/tests_data/file_with_nans.jsonlines new file mode 100644 index 0000000000..7a9190ce40 --- /dev/null +++ b/tests/tests_data/file_with_nans.jsonlines @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:559f654cd30742b2fe49ec6f163118b660b61f2c6ebe5acb13bdfeb907fe9865 +size 255 diff --git a/tests/tests_data/file_with_nulls.csv b/tests/tests_data/file_with_nulls.csv new file mode 100644 index 0000000000..d2416abb19 --- /dev/null +++ b/tests/tests_data/file_with_nulls.csv @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:65dbc84b9c7ebe0132fbcab419fe681a1628cb7b1c08f09ca62c2b46fbd56c59 +size 46 diff --git a/tests/tests_data/file_with_nulls.jsonlines b/tests/tests_data/file_with_nulls.jsonlines new file mode 100644 index 0000000000..af82d24f9f --- /dev/null +++ b/tests/tests_data/file_with_nulls.jsonlines @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid 
sha256:a1dfca1a616e66ebdcdb87d4adb9b15af594ca6fded67b4d9af8181b061e559f +size 255