diff --git a/quixstreams/dataframe/dataframe.py b/quixstreams/dataframe/dataframe.py index fcd11de84..532b15681 100644 --- a/quixstreams/dataframe/dataframe.py +++ b/quixstreams/dataframe/dataframe.py @@ -970,6 +970,10 @@ def drop(self, columns: Union[str, List[str]]) -> Self: """ Drop column(s) from the message value (value must support `del`, like a dict). + This operation occurs in-place, meaning reassignment is entirely OPTIONAL: the + original `StreamingDataFrame` is returned for chaining (`sdf.drop("x").print()`). + + Example Snippet: ```python @@ -977,7 +981,7 @@ def drop(self, columns: Union[str, List[str]]) -> Self: # This would transform {"x": 1, "y": 2, "z": 3} to {"z": 3} sdf = StreamingDataframe() - sdf = sdf.drop(["x", "y"]) + sdf.drop(["x", "y"]) ``` :param columns: a single column name or a list of names, where names are `str` @@ -994,8 +998,7 @@ def drop(self, columns: Union[str, List[str]]) -> Self: raise TypeError( f"Expected a string or a list of strings, not {type(columns)}" ) - stream = self.stream.add_update(lambda value: _drop(value, columns)) - return self.__dataframe_clone__(stream) + return self._add_update(lambda value: _drop(value, columns), metadata=False) def _produce( self, diff --git a/tests/test_quixstreams/test_dataframe/test_dataframe.py b/tests/test_quixstreams/test_dataframe/test_dataframe.py index 04cf9078a..282778ff2 100644 --- a/tests/test_quixstreams/test_dataframe/test_dataframe.py +++ b/tests/test_quixstreams/test_dataframe/test_dataframe.py @@ -402,7 +402,7 @@ def test_drop(self, dataframe_factory, columns, expected): value = {"col_a": 1, "col_b": 2, "col_c": 3} key, timestamp, headers = b"key", 0, [] sdf = dataframe_factory() - sdf = sdf.drop(columns) + sdf.drop(columns) assert sdf.test(value=value, key=key, timestamp=timestamp, headers=headers)[ 0 ] == (expected, key, timestamp, headers)