Skip to content

Commit

Permalink
Support the filter_null parameter in the C++ impl of the FileSourceSt…
Browse files Browse the repository at this point in the history
…age (#1689)

* Fixes bug where the `filter_null` constructor argument to the `FileSourceStage` was only implemented in the Python impl of the stage.
* Update `filter_null` feature to make the column(s) being filtered upon configurable, previously this was hard-coded to `"data"`
* Add new `get_column_names` helper method to `CuDFTableUtil`

Closes #1678

## By Submitting this PR I confirm:
- I am familiar with the [Contributing Guidelines](https://github.com/nv-morpheus/Morpheus/blob/main/docs/source/developer_guide/contributing.md).
- When the PR is ready for review, new or existing tests cover these changes.
- When the PR is ready for review, the documentation is up to date with these changes.

Authors:
  - David Gardner (https://github.com/dagardner-nv)

Approvers:
  - Michael Demoret (https://github.com/mdemoret-nv)

URL: #1689
  • Loading branch information
dagardner-nv authored May 9, 2024
1 parent 78dab99 commit bf80d93
Show file tree
Hide file tree
Showing 18 changed files with 390 additions and 78 deletions.
2 changes: 1 addition & 1 deletion morpheus/_lib/include/morpheus/io/deserializers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ int get_index_col_count(const cudf::io::table_with_metadata& data_table);
int prepare_df_index(cudf::io::table_with_metadata& data_table);

/**
* @brief Loads a cudf table from either CSV or JSON file returning the DataFrame as a Python object
* @brief Loads a cudf table from either CSV, JSON or Parquet file returning the DataFrame as a Python object
*
* @param filename : Name of the file that should be loaded into a table
* @return pybind11::object
Expand Down
35 changes: 24 additions & 11 deletions morpheus/_lib/include/morpheus/stages/file_source.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include <optional>
#include <string>
#include <thread>
#include <vector>

namespace morpheus {
/****** Component public implementations *******************/
Expand Down Expand Up @@ -61,13 +62,19 @@ class FileSourceStage : public mrc::pymrc::PythonSource<std::shared_ptr<MessageM
* @param repeat : Repeats the input dataset multiple times. Useful to extend small datasets for debugging
* @param json_lines: Whether to force json or jsonlines parsing
*/
FileSourceStage(std::string filename, int repeat = 1, std::optional<bool> json_lines = std::nullopt);
FileSourceStage(std::string filename,
int repeat = 1,
bool filter_null = true,
std::vector<std::string> filter_null_columns = {},
std::optional<bool> json_lines = std::nullopt);

private:
subscriber_fn_t build();

std::string m_filename;
int m_repeat{1};
bool m_filter_null{true};
std::vector<std::string> m_filter_null_columns;
std::optional<bool> m_json_lines;
};

Expand All @@ -87,16 +94,22 @@ struct FileSourceStageInterfaceProxy
* @param parser_kwargs : Optional arguments to pass to the file parser.
* @return std::shared_ptr<mrc::segment::Object<FileSourceStage>>
*/
static std::shared_ptr<mrc::segment::Object<FileSourceStage>> init(mrc::segment::Builder& builder,
const std::string& name,
std::string filename,
int repeat = 1,
pybind11::dict parser_kwargs = pybind11::dict());
static std::shared_ptr<mrc::segment::Object<FileSourceStage>> init(mrc::segment::Builder& builder,
const std::string& name,
std::filesystem::path filename,
int repeat = 1,
pybind11::dict parser_kwargs = pybind11::dict());
static std::shared_ptr<mrc::segment::Object<FileSourceStage>> init(
mrc::segment::Builder& builder,
const std::string& name,
std::string filename,
int repeat = 1,
bool filter_null = true,
std::vector<std::string> filter_null_columns = {},
pybind11::dict parser_kwargs = pybind11::dict());
static std::shared_ptr<mrc::segment::Object<FileSourceStage>> init(
mrc::segment::Builder& builder,
const std::string& name,
std::filesystem::path filename,
int repeat = 1,
bool filter_null = true,
std::vector<std::string> filter_null_columns = {},
pybind11::dict parser_kwargs = pybind11::dict());
};
#pragma GCC visibility pop
/** @} */ // end of group
Expand Down
28 changes: 26 additions & 2 deletions morpheus/_lib/include/morpheus/utilities/table_util.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,13 @@
* limitations under the License.
*/

#include "morpheus/export.h" // for MORPHEUS_EXPORT

#include <cudf/io/types.hpp>
#include <cudf/table/table.hpp> // IWYU pragma: keep

#include <string>
#include <vector>

#pragma once

Expand All @@ -35,12 +38,33 @@ namespace morpheus {
/**
* @brief Structure that encapsulates cuDF table utilities.
*/
struct CuDFTableUtil
struct MORPHEUS_EXPORT CuDFTableUtil
{
/**
* TODO(Documentation)
* @brief Load a table from a file.
*
* @param filename The name of the file to load.
* @return cudf::io::table_with_metadata The table loaded from the file.
*/
static cudf::io::table_with_metadata load_table(const std::string& filename);

/**
* @brief Get the column names from a cudf table_with_metadata.
*
* @param table The table to get the column names from.
* @return std::vector<std::string> The column names.
*/
static std::vector<std::string> get_column_names(const cudf::io::table_with_metadata& table);

/**
* @brief Filters rows from a table that contain null values in a given columns.
* null values in columns other than those specified in `filter_columns` are not considered.
* Any missing columns are ignored.
*
* @param table The table to filter
* @param filter_columns The name of the columns to filter on
*/
static void filter_null_data(cudf::io::table_with_metadata& table, const std::vector<std::string>& filter_columns);
};
/** @} */ // end of group
} // namespace morpheus
16 changes: 7 additions & 9 deletions morpheus/_lib/src/io/deserializers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "morpheus/utilities/cudf_util.hpp" // for CudfHelper
#include "morpheus/utilities/stage_util.hpp"
#include "morpheus/utilities/string_util.hpp"
#include "morpheus/utilities/table_util.hpp" // for get_column_names

#include <cudf/column/column.hpp>
#include <cudf/io/csv.hpp>
Expand All @@ -29,8 +30,6 @@
#include <cudf/types.hpp> // for cudf::type_id
#include <pybind11/pybind11.h> // IWYU pragma: keep

#include <algorithm>
#include <iterator>
#include <memory>
#include <regex>
#include <sstream>
Expand All @@ -50,7 +49,9 @@ namespace morpheus {

std::vector<std::string> get_column_names_from_table(const cudf::io::table_with_metadata& table)
{
return foreach_map(table.metadata.schema_info, [](auto schema) { return schema.name; });
return foreach_map(table.metadata.schema_info, [](auto schema) {
return schema.name;
});
}

cudf::io::table_with_metadata load_table_from_file(const std::string& filename,
Expand All @@ -69,7 +70,7 @@ cudf::io::table_with_metadata load_table_from_file(const std::string& filename,
case FileTypes::JSON: {
auto options =
cudf::io::json_reader_options::builder(cudf::io::source_info{filename}).lines(json_lines.value_or(true));
table = cudf::io::read_json(options.build());
table = cudf::io::read_json(options.build());
break;
}
case FileTypes::CSV: {
Expand Down Expand Up @@ -106,12 +107,9 @@ pybind11::object read_file_to_df(const std::string& filename, FileTypes file_typ

int get_index_col_count(const cudf::io::table_with_metadata& data_table)
{
int index_col_count = 0;
auto const& schema = data_table.metadata.schema_info;
int index_col_count = 0;

std::vector<std::string> names;
names.reserve(schema.size());
std::transform(schema.cbegin(), schema.cend(), std::back_inserter(names), [](auto const& c) { return c.name; });
std::vector<std::string> names = CuDFTableUtil::get_column_names(data_table);

// Check if we have a first column with INT64 data type
if (names.size() >= 1 && data_table.tbl->get_column(0).type().id() == cudf::type_id::INT64)
Expand Down
41 changes: 34 additions & 7 deletions morpheus/_lib/src/stages/file_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "morpheus/objects/file_types.hpp"
#include "morpheus/objects/table_info.hpp"
#include "morpheus/utilities/cudf_util.hpp"
#include "morpheus/utilities/table_util.hpp" // for filter_null_data

#include <cudf/types.hpp>
#include <glog/logging.h>
Expand All @@ -37,24 +38,39 @@
#include <memory>
#include <optional>
#include <sstream>
#include <stdexcept> // for invalid_argument
#include <utility>
// IWYU thinks we need __alloc_traits<>::value_type for vector assignments
// IWYU pragma: no_include <ext/alloc_traits.h>

namespace morpheus {
// Component public implementations
// ************ FileSourceStage ************* //
FileSourceStage::FileSourceStage(std::string filename, int repeat, std::optional<bool> json_lines) :
FileSourceStage::FileSourceStage(std::string filename,
int repeat,
bool filter_null,
std::vector<std::string> filter_null_columns,
std::optional<bool> json_lines) :
PythonSource(build()),
m_filename(std::move(filename)),
m_repeat(repeat),
m_filter_null(filter_null),
m_filter_null_columns(std::move(filter_null_columns)),
m_json_lines(json_lines)
{}
{
if (m_filter_null && m_filter_null_columns.empty())
{
throw std::invalid_argument("Filter null columns must not be empty if filter_null is true");
}
}

FileSourceStage::subscriber_fn_t FileSourceStage::build()
{
return [this](rxcpp::subscriber<source_type_t> output) {
auto data_table = load_table_from_file(m_filename, FileTypes::Auto, m_json_lines);
auto data_table = load_table_from_file(m_filename, FileTypes::Auto, m_json_lines);
if (m_filter_null)
{
CuDFTableUtil::filter_null_data(data_table, m_filter_null_columns);
}

int index_col_count = prepare_df_index(data_table);

// Next, create the message metadata. This gets reused for repeats
Expand Down Expand Up @@ -116,6 +132,8 @@ std::shared_ptr<mrc::segment::Object<FileSourceStage>> FileSourceStageInterfaceP
const std::string& name,
std::string filename,
int repeat,
bool filter_null,
std::vector<std::string> filter_null_columns,
pybind11::dict parser_kwargs)
{
std::optional<bool> json_lines = std::nullopt;
Expand All @@ -125,7 +143,8 @@ std::shared_ptr<mrc::segment::Object<FileSourceStage>> FileSourceStageInterfaceP
json_lines = parser_kwargs["lines"].cast<bool>();
}

auto stage = builder.construct_object<FileSourceStage>(name, filename, repeat, json_lines);
auto stage = builder.construct_object<FileSourceStage>(
name, filename, repeat, filter_null, std::move(filter_null_columns), json_lines);

return stage;
}
Expand All @@ -135,8 +154,16 @@ std::shared_ptr<mrc::segment::Object<FileSourceStage>> FileSourceStageInterfaceP
const std::string& name,
std::filesystem::path filename,
int repeat,
bool filter_null,
std::vector<std::string> filter_null_columns,
pybind11::dict parser_kwargs)
{
return init(builder, name, filename.string(), repeat, std::move(parser_kwargs));
return init(builder,
name,
filename.string(),
repeat,
filter_null,
std::move(filter_null_columns),
std::move(parser_kwargs));
}
} // namespace morpheus
45 changes: 43 additions & 2 deletions morpheus/_lib/src/utilities/table_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,24 @@

#include <cudf/io/csv.hpp>
#include <cudf/io/json.hpp>
#include <cudf/stream_compaction.hpp> // for drop_nulls
#include <cudf/types.hpp> // for size_type
#include <glog/logging.h>
#include <pybind11/pybind11.h>

#include <algorithm> // for find, transform
#include <filesystem>
#include <iterator> // for back_insert_iterator, back_inserter
#include <memory> // for unique_ptr
#include <ostream> // needed for logging
#include <stdexcept> // for runtime_error

namespace {
namespace fs = std::filesystem;
namespace py = pybind11;

cudf::io::table_with_metadata morpheus::CuDFTableUtil::load_table(const std::string& filename)
} // namespace
namespace morpheus {
cudf::io::table_with_metadata CuDFTableUtil::load_table(const std::string& filename)
{
auto file_path = fs::path(filename);

Expand All @@ -52,3 +59,37 @@ cudf::io::table_with_metadata morpheus::CuDFTableUtil::load_table(const std::str
throw std::runtime_error("Unknown extension");
}
}

std::vector<std::string> CuDFTableUtil::get_column_names(const cudf::io::table_with_metadata& table)
{
auto const& schema = table.metadata.schema_info;

std::vector<std::string> names;
names.reserve(schema.size());
std::transform(schema.cbegin(), schema.cend(), std::back_inserter(names), [](auto const& c) {
return c.name;
});

return names;
}

void CuDFTableUtil::filter_null_data(cudf::io::table_with_metadata& table,
const std::vector<std::string>& filter_columns)
{
std::vector<cudf::size_type> filter_keys;
auto column_names = get_column_names(table);
for (const auto& column_name : filter_columns)
{
auto found_col = std::find(column_names.cbegin(), column_names.cend(), column_name);
if (found_col != column_names.cend())
{
filter_keys.push_back((found_col - column_names.cbegin()));
}
}

auto tv = table.tbl->view();
auto filtered_table = cudf::drop_nulls(tv, filter_keys, filter_keys.size());

table.tbl.swap(filtered_table);
}
} // namespace morpheus
4 changes: 2 additions & 2 deletions morpheus/_lib/stages/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@ class DeserializeMultiMessageStage(mrc.core.segment.SegmentObject):
pass
class FileSourceStage(mrc.core.segment.SegmentObject):
@typing.overload
def __init__(self, builder: mrc.core.segment.Builder, name: str, filename: os.PathLike, repeat: int, parser_kwargs: dict) -> None: ...
def __init__(self, builder: mrc.core.segment.Builder, name: str, filename: os.PathLike, repeat: int, filter_null: bool, filter_null_columns: typing.List[str], parser_kwargs: dict) -> None: ...
@typing.overload
def __init__(self, builder: mrc.core.segment.Builder, name: str, filename: str, repeat: int, parser_kwargs: dict) -> None: ...
def __init__(self, builder: mrc.core.segment.Builder, name: str, filename: str, repeat: int, filter_null: bool, filter_null_columns: typing.List[str], parser_kwargs: dict) -> None: ...
pass
class FilterDetectionsStage(mrc.core.segment.SegmentObject):
def __init__(self, builder: mrc.core.segment.Builder, name: str, threshold: float, copy: bool, filter_source: morpheus._lib.common.FilterSource, field_name: str = 'probs') -> None: ...
Expand Down
24 changes: 19 additions & 5 deletions morpheus/_lib/stages/module.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
#include <memory>
#include <sstream>
#include <string>
#include <vector>

namespace morpheus {
namespace py = pybind11;
Expand Down Expand Up @@ -138,20 +139,33 @@ PYBIND11_MODULE(stages, _module)
mrc::segment::ObjectProperties,
std::shared_ptr<mrc::segment::Object<FileSourceStage>>>(
_module, "FileSourceStage", py::multiple_inheritance())
.def(py::init(py::overload_cast<mrc::segment::Builder&, const std::string&, std::string, int, py::dict>(
&FileSourceStageInterfaceProxy::init)),
.def(py::init(py::overload_cast<mrc::segment::Builder&,
const std::string&,
std::string,
int,
bool,
std::vector<std::string>,
py::dict>(&FileSourceStageInterfaceProxy::init)),
py::arg("builder"),
py::arg("name"),
py::arg("filename"),
py::arg("repeat"),
py::arg("filter_null"),
py::arg("filter_null_columns"),
py::arg("parser_kwargs"))
.def(py::init(
py::overload_cast<mrc::segment::Builder&, const std::string&, std::filesystem::path, int, py::dict>(
&FileSourceStageInterfaceProxy::init)),
.def(py::init(py::overload_cast<mrc::segment::Builder&,
const std::string&,
std::filesystem::path,
int,
bool,
std::vector<std::string>,
py::dict>(&FileSourceStageInterfaceProxy::init)),
py::arg("builder"),
py::arg("name"),
py::arg("filename"),
py::arg("repeat"),
py::arg("filter_null"),
py::arg("filter_null_columns"),
py::arg("parser_kwargs"));

py::class_<mrc::segment::Object<FilterDetectionsStage>,
Expand Down
6 changes: 6 additions & 0 deletions morpheus/_lib/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -188,4 +188,10 @@ add_morpheus_test(
test_type_util.cpp
)

add_morpheus_test(
NAME table_util
FILES
utilities/test_table_util.cpp
)

list(POP_BACK CMAKE_MESSAGE_CONTEXT)
Loading

0 comments on commit bf80d93

Please sign in to comment.