Skip to content

Commit

Permalink
Adding the set_data/get_data properties to MessageMeta
Browse files Browse the repository at this point in the history
  • Loading branch information
mdemoret-nv committed Jun 14, 2024
1 parent 508c107 commit 7b92faf
Show file tree
Hide file tree
Showing 3 changed files with 233 additions and 30 deletions.
181 changes: 181 additions & 0 deletions morpheus/messages/message_meta.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import typing
import warnings

import cupy as cp
import numpy as np
import pandas as pd

import cudf
Expand Down Expand Up @@ -90,6 +92,18 @@ def __init__(self, df: DataFrameType) -> None:
self._mutex = threading.RLock()
self._df = df

def _get_col_indexers(self, df, columns: typing.Union[None, str, typing.List[str]] = None):
    """
    Translate column names into positional indices within `df.columns`.

    Parameters
    ----------
    df : DataFrame
        DataFrame whose `columns` index is searched.
    columns : typing.Union[None, str, typing.List[str]]
        Column names to look up. `None` selects every column of `df`; a single string is treated as a
        one-element list.

    Returns
    -------
    Array of integer positions
        The result of `df.columns.get_indexer_for`. Callers in this class check the result for `-1` to detect
        names that are not present.
    """
    if (isinstance(columns, str)):
        # Wrap a lone name so the lookup always receives a list (table-shaped result, not a series)
        requested = [columns]
    elif (columns is None):
        requested = df.columns.to_list()
    else:
        requested = columns

    return df.columns.get_indexer_for(requested)

@property
def df(self) -> DataFrameType:
msg = ("Warning the df property returns a copy, please use the copy_dataframe method or the mutable_dataframe "
Expand Down Expand Up @@ -201,6 +215,173 @@ def get_meta_range(self,
# If its a str or list, this is the same
return self._df.loc[idx, columns]

# Typing-only overloads of `get_data`: they declare, for static type checkers, which return type goes with
# each call form. The `...` bodies are placeholders.

# No `columns` argument: annotated as returning a DataFrame.
@typing.overload
def get_data(self) -> cudf.DataFrame:
    ...

# A single column name: annotated as returning a Series.
@typing.overload
def get_data(self, columns: str) -> cudf.Series:
    ...

# A list of column names: annotated as returning a DataFrame.
@typing.overload
def get_data(self, columns: typing.List[str]) -> cudf.DataFrame:
    ...

def get_data(self, columns: typing.Union[None, str, typing.List[str]] = None):
    """
    Return column values from the underlying DataFrame.

    Parameters
    ----------
    columns : typing.Union[None, str, typing.List[str]]
        Input column names. Returns all columns if `None` is specified. When a string is passed, a `Series` is
        returned. Otherwise, a `Dataframe` is returned.

    Returns
    -------
    Series or Dataframe
        Column values from the dataframe.

    Raises
    ------
    KeyError
        If any of the requested columns is not present in the dataframe.
    """

    with self.mutable_dataframe() as df:
        column_indexer = self._get_col_indexers(df, columns=columns)

        if (-1 in column_indexer):
            # Work out the missing names by membership rather than by indexer position. Indexing `columns`
            # positionally is wrong when `columns` is a single string (it yields single characters) and when
            # duplicate column names make the indexer longer than the list of requested names.
            requested = [columns] if isinstance(columns, str) else list(columns)
            missing_columns = [name for name in requested if name not in df.columns]
            raise KeyError(f"Requested columns {missing_columns} does not exist in the dataframe")

        if (isinstance(columns, str) and len(column_indexer) == 1):
            # Make sure to return a series for a single column
            column_indexer = column_indexer[0]

        return df.iloc[:, column_indexer]

def set_data(self, columns: typing.Union[None, str, typing.List[str]], value) -> None:
    """
    Set column values to the underlying DataFrame.

    Existing columns are written positionally through `iloc`; if any requested column is missing, the write goes
    through `loc` instead so that the new column(s) are created.

    Parameters
    ----------
    columns : typing.Union[None, str, typing.List[str]]
        Input column names. Sets the value for the corresponding column names. If `None` is specified, all columns
        will be used. If the column does not exist, a new one will be created.
    value : Any
        Value to apply to the specified columns. If a single value is passed, it will be broadcast to all rows. If a
        `Series` or `Dataframe` is passed, rows will be matched by index.
        NOTE(review): when a new column is added to a cudf DataFrame the index is reset before the assignment, so
        index matching may behave differently on that path -- confirm.
    """

    # Get exclusive access to the dataframe
    with self.mutable_dataframe() as df:
        # First try to set the values on just our slice if the columns exist
        column_indexer = self._get_col_indexers(df, columns=columns)

        # Check if the value is a cupy array and we have a pandas dataframe, convert to numpy
        if (isinstance(value, cp.ndarray) and isinstance(df, pd.DataFrame)):
            value = value.get()

        # Check to see if we are adding a column. If so, we need to use df.loc instead of df.iloc
        # (a -1 in the indexer marks a requested column that is not in the dataframe)
        if (-1 not in column_indexer):

            # If we only have one column, convert it to a series (broadcasts work with more types on a series)
            if (len(column_indexer) == 1):
                column_indexer = column_indexer[0]

            try:
                # Now update the slice
                df.iloc[:, column_indexer] = value
            except (ValueError, TypeError):
                # Try this as a fallback. Works better for strings. See issue #286
                # NOTE(review): this is a chained assignment (`df[columns]` then `.iloc[:] = ...`); whether the
                # write reaches `df` depends on whether `df[columns]` is a view or a copy -- confirm.
                df[columns].iloc[:] = value

        else:
            # Columns should never be empty if we get here
            assert columns is not None

            # cudf is really bad at adding new columns
            if (isinstance(df, cudf.DataFrame)):

                # TODO(morpheus#1487): This logic no longer works in CUDF 24.04.
                # We should find a way to re-enable the no-dropped-index path as
                # that should be more performant than dropping the index.
                # # saved_index = None

                # # # Check to see if we can use slices
                # # if (not (df.index.is_unique and
                # #          (df.index.is_monotonic_increasing or df.index.is_monotonic_decreasing))):
                # #     # Save the index and reset
                # #     saved_index = df.index
                # #     df.reset_index(drop=True, inplace=True)

                # # # Perform the update via slices
                # # df.loc[df.index[row_indexer], columns] = value

                # # # Reset the index if we changed it
                # # if (saved_index is not None):
                # #     df.set_index(saved_index, inplace=True)

                # Current approach: always drop the index, assign, then restore the saved index
                saved_index = df.index
                df.reset_index(drop=True, inplace=True)
                df.loc[df.index[:], columns] = value
                df.set_index(saved_index, inplace=True)
            else:
                # Now set the slice
                df.loc[:, columns] = value

def get_slice(self, start, stop):
    """
    Build a `MessageMeta` wrapping the rows selected positionally by `start:stop`.

    Parameters
    ----------
    start : int
        Position of the first row to include.
    stop : int
        Position one past the last row to include.

    Returns
    -------
    `MessageMeta`
        A new `MessageMeta` constructed from `df.iloc[start:stop]`.
    """

    with self.mutable_dataframe() as df:
        # Slice and wrap while the dataframe is still held by the context manager
        row_window = df.iloc[start:stop]
        return MessageMeta(row_window)

def _ranges_to_mask(self, df, ranges):
    """
    Convert a list of `(start, stop)` row ranges into a boolean row mask for `df`.

    Parameters
    ----------
    df : DataFrame
        DataFrame the mask is sized for (`len(df)` entries).
    ranges : list of (start, stop) pairs
        Row ranges to mark; `stop` is excluded, following slice semantics.

    Returns
    -------
    Boolean array
        A cupy array when `df` is a `cudf.DataFrame`, otherwise a numpy array. Entries inside any of the ranges
        are `True`, all others `False`.
    """
    # Pick the array library that matches the dataframe type
    make_zeros = cp.zeros if isinstance(df, cudf.DataFrame) else np.zeros

    row_mask = make_zeros(len(df), dtype=bool)

    for bounds in ranges:
        row_mask[slice(bounds[0], bounds[1])] = True

    return row_mask

def copy_ranges(self, ranges: typing.List[typing.Tuple[int, int]]):
    """
    Create a new message instance containing only the rows covered by `ranges`.

    Parameters
    ----------
    ranges : typing.List[typing.Tuple[int, int]]
        Rows to include, given as `[(start_row, stop_row), ...]`. Each `stop_row` is excluded, so rows 1-2 and
        5-7 are selected with `ranges=[(1, 3), (5, 8)]`.

    Returns
    -------
    `MessageMeta`
        A new `MessageMeta` with only the rows specified by `ranges`.
    """

    with self.mutable_dataframe() as df:
        # Turn the ranges into a boolean row mask, then select the matching rows with every column
        row_mask = self._ranges_to_mask(df, ranges=ranges)
        selected_rows = df.loc[row_mask, :]
        return MessageMeta(selected_rows)


@dataclasses.dataclass(init=False)
class UserMessageMeta(MessageMeta, cpp_class=None):
Expand Down
1 change: 0 additions & 1 deletion tests/messages/test_control_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@

from _utils.dataset_manager import DatasetManager
from morpheus import messages
# pylint: disable=morpheus-incorrect-lib-from-import
from morpheus.messages import TensorMemory

# pylint: disable=unsupported-membership-test
Expand Down
81 changes: 52 additions & 29 deletions tests/messages/test_message_meta.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ def fixture_is_sliceable(index_type: typing.Literal['normal', 'skip', 'dup', 'do

return index_type not in ("dup", "updown")


def test_count(df: DataFrameType):

meta = MessageMeta(df)
Expand Down Expand Up @@ -137,16 +138,27 @@ def test_update_dataframe(df: DataFrameType):

# new struct column
col_new_name = "bestsellers"
col_new_struct = [{"book": "A Tale of Two Cities", "year": 1859},
{"book": "The Lord of the Rings", "year": 1954},
{"book": "The Little Prince", "year": 1943},
{"book": "The Hobbit", "year": 1937},
{"book": "And Then There Were None", "year": 1939},
{"book": "Dream of the Red Chamber", "year": 1791},
{"book": "The Lion, the Witch and the Wardrobe", "year": 1950},
{"book": "She: A History of Adventure", "year": 1887},
{"book": "Le Petit Larousse", "year": 1905},
{"book": "Harry Potter and the Philosopher's Stone", "year": 1997}]
col_new_struct = [{
"book": "A Tale of Two Cities", "year": 1859
}, {
"book": "The Lord of the Rings", "year": 1954
}, {
"book": "The Little Prince", "year": 1943
}, {
"book": "The Hobbit", "year": 1937
}, {
"book": "And Then There Were None", "year": 1939
}, {
"book": "Dream of the Red Chamber", "year": 1791
}, {
"book": "The Lion, the Witch and the Wardrobe", "year": 1950
}, {
"book": "She: A History of Adventure", "year": 1887
}, {
"book": "Le Petit Larousse", "year": 1905
}, {
"book": "Harry Potter and the Philosopher's Stone", "year": 1997
}]

# if row_count is more than 10 just replicate the struct column
if row_count > len(col_new_struct):
Expand All @@ -163,7 +175,7 @@ def test_update_dataframe(df: DataFrameType):
assert cdf[col_new_name].isin(col_new_struct).all()

# new int column in range 1-row_count
col_new_int = list(range(1, row_count+1))
col_new_int = list(range(1, row_count + 1))

# replace the struct column with int column
with meta.mutable_dataframe() as df_:
Expand Down Expand Up @@ -198,7 +210,7 @@ def test_update_dataframe(df: DataFrameType):
cdf = meta.copy_dataframe()
# (fixme) Michael: Why is the following assert failing? we cannot
# append rows to df_ but can append to cdf.
#assert cdf.shape[0] == row_count + 1
assert cdf.shape[0] == row_count + 1

# (fixme) remove the duplicated row if the previous step was successful

Expand All @@ -221,7 +233,7 @@ def test_update_dataframe(df: DataFrameType):
assert cdf.iloc[row_idx, col_idx] == old_value

# (fixme): this entire block doesn't work. Michael, expected behavior?
"""

# replace the contents of the first row with the last row
with meta.mutable_dataframe() as df_:
df_.iloc[0] = last_row
Expand All @@ -233,7 +245,7 @@ def test_update_dataframe(df: DataFrameType):
df_.iloc[0] = first_row
cdf = meta.copy_dataframe()
DatasetManager.assert_df_equal(cdf.iloc[0], first_row, assert_msg="Should be identical")
"""


@pytest.mark.use_cpp
def test_update_dataframe_cpp(df: DataFrameType):
Expand All @@ -250,16 +262,27 @@ def test_update_dataframe_cpp(df: DataFrameType):

# new struct column
col_new_name = "bestsellers"
col_new_struct = [{"book": "A Tale of Two Cities", "year": 1859},
{"book": "The Lord of the Rings", "year": 1954},
{"book": "The Little Prince", "year": 1943},
{"book": "The Hobbit", "year": 1937},
{"book": "And Then There Were None", "year": 1939},
{"book": "Dream of the Red Chamber", "year": 1791},
{"book": "The Lion, the Witch and the Wardrobe", "year": 1950},
{"book": "She: A History of Adventure", "year": 1887},
{"book": "Le Petit Larousse", "year": 1905},
{"book": "Harry Potter and the Philosopher's Stone", "year": 1997}]
col_new_struct = [{
"book": "A Tale of Two Cities", "year": 1859
}, {
"book": "The Lord of the Rings", "year": 1954
}, {
"book": "The Little Prince", "year": 1943
}, {
"book": "The Hobbit", "year": 1937
}, {
"book": "And Then There Were None", "year": 1939
}, {
"book": "Dream of the Red Chamber", "year": 1791
}, {
"book": "The Lion, the Witch and the Wardrobe", "year": 1950
}, {
"book": "She: A History of Adventure", "year": 1887
}, {
"book": "Le Petit Larousse", "year": 1905
}, {
"book": "Harry Potter and the Philosopher's Stone", "year": 1997
}]

# if row_count is more than 10 just replicate the struct column
if row_count > len(col_new_struct):
Expand All @@ -270,7 +293,7 @@ def test_update_dataframe_cpp(df: DataFrameType):
# add a struct column in cpp
meta.set_data(col_new_name, col_new_struct)
assert col_new_name in meta.get_column_names()
assert meta.get_data()[col_new_name].isin(col_new_struct).all()
assert meta.get_data()[col_new_name].isin(col_new_struct).all() # pylint: disable=unsubscriptable-object

# swap the contents of the first and last books
first_book = col_new_struct[0]
Expand All @@ -279,21 +302,21 @@ def test_update_dataframe_cpp(df: DataFrameType):
col_new_struct[-1] = first_book
meta.set_data(col_new_name, col_new_struct)
assert col_new_name in meta.get_column_names()
assert meta.get_data()[col_new_name].isin(col_new_struct).all()
assert meta.get_data()[col_new_name].isin(col_new_struct).all() # pylint: disable=unsubscriptable-object

# new int column in range 1-row_count
col_new_int_name = "col_new_int"
col_new_int = list(range(1, row_count+1))
col_new_int = list(range(1, row_count + 1))
# add new int column in cpp
meta.set_data(col_new_int_name, col_new_int)
assert col_new_name in meta.get_column_names()
assert meta.get_data()[col_new_int_name].isin(col_new_int).all()
assert meta.get_data()[col_new_int_name].isin(col_new_int).all() # pylint: disable=unsubscriptable-object

# multiply values in col_new_int by 2
col_new_int = [x * 2 for x in col_new_int]
# update new int column in cpp
meta.set_data(col_new_int_name, col_new_int)
assert meta.get_data()[col_new_int_name].isin(col_new_int).all()
assert meta.get_data()[col_new_int_name].isin(col_new_int).all() # pylint: disable=unsubscriptable-object

# (fixme) how do you remove columns and update individual cells?

Expand Down

0 comments on commit 7b92faf

Please sign in to comment.