Skip to content

Commit

Permalink
add drop feature
Browse files Browse the repository at this point in the history
  • Loading branch information
tim-quix committed Jul 2, 2024
1 parent 835ef27 commit eb5e1ac
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 0 deletions.
39 changes: 39 additions & 0 deletions quixstreams/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -915,6 +915,35 @@ def hopping_window(
name=name,
)

def drop(self, columns: Union[str, List[str]]) -> Self:
"""
Drop column(s) from the message value (value must support `del`, like a dict).
Example Snippet:
```python
# Remove columns "x" and "y" from the value.
# This would transform {"x": 1, "y": 2, "z": 3} to {"z": 3}
sdf = StreamingDataframe()
sdf = sdf.drop(["x", "y"])
```
:param columns: a single column name or a list of names, where names are `str`
:return: a new StreamingDataFrame instance
"""
if isinstance(columns, list):
if not all(isinstance(s, str) for s in columns):
raise TypeError(f"column list must contain strings only")
elif isinstance(columns, str):
columns = [columns]
else:
raise TypeError(
f"Expected a string or a list of strings, not {type(columns)}"
)
stream = self.stream.add_update(lambda value: _drop(value, columns))
return self.__dataframe_clone__(stream)

def _produce(
self,
topic: Topic,
Expand Down Expand Up @@ -1055,6 +1084,16 @@ def __bool__(self):
)


def _drop(value: Dict, columns: List[str]):
"""
remove columns from the value, inplace
:param value: a dict or something that supports `del`
:param columns: a list of column names
"""
for column in columns:
del value[column]


def _as_metadata_func(
func: Union[ApplyCallbackStateful, FilterCallbackStateful, UpdateCallbackStateful]
) -> Union[
Expand Down
23 changes: 23 additions & 0 deletions tests/test_quixstreams/test_dataframe/test_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,29 @@ def test_set_headers(self, original_headers, new_headers, dataframe_factory):
)[0]
assert result == expected

@pytest.mark.parametrize(
    "columns, expected",
    [
        ("col_a", {"col_b": 2, "col_c": 3}),
        (["col_a"], {"col_b": 2, "col_c": 3}),
        (["col_a", "col_b"], {"col_c": 3}),
    ],
)
def test_drop(self, dataframe_factory, columns, expected):
    """
    `drop()` accepts a single name or a list of names and removes exactly
    those keys from the value, leaving key, timestamp and headers unchanged.
    """
    key, timestamp, headers = b"key", 0, []
    sdf = dataframe_factory().drop(columns)
    results = sdf.test(
        value={"col_a": 1, "col_b": 2, "col_c": 3},
        key=key,
        timestamp=timestamp,
        headers=headers,
    )
    assert results[0] == (expected, key, timestamp, headers)

@pytest.mark.parametrize(
    "columns",
    [
        ["col_a", 3],
        b"col_d",
        {"col_a"},
    ],
)
def test_drop_invalid_columns(self, dataframe_factory, columns):
    """
    `drop()` raises `TypeError` for a list with a non-string item, for bytes,
    and for a set.
    """
    dataframe = dataframe_factory()
    # Only the drop() call sits inside the context manager, so a TypeError
    # raised by the factory itself would not make this test pass.
    with pytest.raises(TypeError):
        dataframe.drop(columns)


class TestStreamingDataFrameApplyExpand:
def test_apply_expand(self, dataframe_factory):
Expand Down

0 comments on commit eb5e1ac

Please sign in to comment.