Skip to content

Commit

Permalink
fix comments
Browse files Browse the repository at this point in the history
  • Loading branch information
yczhang-nv committed Sep 11, 2024
1 parent f647a7d commit 03e5605
Show file tree
Hide file tree
Showing 13 changed files with 129 additions and 2,238 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,3 @@ class DFPMessageMeta(MessageMeta, cpp_class=None):
def __init__(self, df: pd.DataFrame, user_id: str) -> None:
super().__init__(df)
self.user_id = user_id

def get_df(self):
return self.df

def set_df(self, df):
self._df = df
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ def get_user_cache(user_id: str):
def try_build_window(message: MessageMeta, user_id: str) -> typing.Union[MessageMeta, None]:
with get_user_cache(user_id) as user_cache:

# incoming_df = message.get_df()
with message.mutable_dataframe() as dfm:
incoming_df = dfm.to_pandas()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,6 @@ def _process_events(self, message: ControlMessage):
with message.payload().mutable_dataframe() as df:
df['event_time'] = datetime.now().strftime('%Y-%m-%dT%H:%M:%SZ')

# cudf does not support replacing values with different types
# df.replace(np.nan, 'NaN', regex=False, inplace=True)

def on_data(self, message: ControlMessage):
"""Process a message."""
if (not message or message.payload().count == 0):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def process_features(self, message: ControlMessage):
start_time = time.time()

# Process the columns
df_processed = process_dataframe(message.payload().df, self._input_schema)
df_processed = process_dataframe(message.payload().get_data(), self._input_schema)

# Apply the new dataframe, only the rows in the offset
with message.payload().mutable_dataframe() as df:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ def _build_window(self, message: DFPMessageMeta) -> ControlMessage:

with self._get_user_cache(user_id) as user_cache:

incoming_df = message.get_df()
incoming_df = message.get_data()
# existing_df = user_cache.df

if (not user_cache.append_dataframe(incoming_df=incoming_df)):
Expand Down

Large diffs are not rendered by default.

Loading

0 comments on commit 03e5605

Please sign in to comment.