Skip to content

Commit

Permalink
add drop feature (#393)
Browse files Browse the repository at this point in the history
  • Loading branch information
tim-quix authored Jul 17, 2024
1 parent 000d795 commit 21415a0
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 0 deletions.
44 changes: 44 additions & 0 deletions quixstreams/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -966,6 +966,40 @@ def hopping_window(
name=name,
)

def drop(self, columns: Union[str, List[str]]) -> Self:
    """
    Drop column(s) from the message value (value must support `del`, like a dict).

    This operation occurs in-place, meaning reassignment is entirely OPTIONAL: the
    original `StreamingDataFrame` is returned for chaining (`sdf.drop().print()`).

    Duplicate names in `columns` are collapsed, so each column is deleted at most
    once per message. The names are copied when `drop()` is called; mutating the
    passed list afterwards has no effect on what gets dropped.

    A column that is missing from a value is not handled here: the error raised
    by `del value[column]` (`KeyError` for a dict) surfaces when the message is
    processed.

    Example Snippet:

    ```python
    # Remove columns "x" and "y" from the value.
    # This would transform {"x": 1, "y": 2, "z": 3} to {"z": 3}

    sdf = StreamingDataFrame()
    sdf.drop(["x", "y"])
    ```

    :param columns: a single column name or a list of names, where names are `str`
    :return: the same StreamingDataFrame instance, updated in place
    :raises TypeError: if `columns` is neither a `str` nor a `list`, or if the
        list contains a non-`str` item
    """
    if isinstance(columns, str):
        columns = [columns]
    elif isinstance(columns, list):
        if not columns:
            # Nothing to drop: leave the stream untouched.
            return self
        if not all(isinstance(s, str) for s in columns):
            raise TypeError("column list must contain strings only")
    else:
        raise TypeError(
            f"Expected a string or a list of strings, not {type(columns)}"
        )

    # Snapshot the validated names (order-preserving, de-duplicated) so that
    # later mutation of the caller's list cannot change or un-validate the
    # update, and a repeated name cannot trigger a second `del` of the same key.
    to_drop = list(dict.fromkeys(columns))
    return self._add_update(lambda value: _drop(value, to_drop), metadata=False)

def _produce(
self,
topic: Topic,
Expand Down Expand Up @@ -1111,6 +1145,16 @@ def __bool__(self):
)


def _drop(value: Dict, columns: List[str]):
"""
remove columns from the value, inplace
:param value: a dict or something that supports `del`
:param columns: a list of column names
"""
for column in columns:
del value[column]


def _as_metadata_func(
func: Union[ApplyCallbackStateful, FilterCallbackStateful, UpdateCallbackStateful]
) -> Union[
Expand Down
33 changes: 33 additions & 0 deletions tests/test_quixstreams/test_dataframe/test_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,39 @@ def test_print(self, dataframe_factory, metadata, expected, capsys):

assert expected in capsys.readouterr().out

@pytest.mark.parametrize(
    "columns, expected",
    [
        ("col_a", {"col_b": 2, "col_c": 3}),
        (["col_a"], {"col_b": 2, "col_c": 3}),
        (["col_a", "col_b"], {"col_c": 3}),
    ],
)
def test_drop(self, dataframe_factory, columns, expected):
    """
    Dropping a single name (as a str or a one-item list) or several names
    removes exactly those keys from the value, while key, timestamp and
    headers come back unchanged.

    The return value of `drop()` is deliberately not reassigned, so this also
    checks that the dataframe is updated in place.
    """
    message = {
        "value": {"col_a": 1, "col_b": 2, "col_c": 3},
        "key": b"key",
        "timestamp": 0,
        "headers": [],
    }
    sdf = dataframe_factory()
    sdf.drop(columns)

    first_result = sdf.test(**message)[0]
    assert first_result == (
        expected,
        message["key"],
        message["timestamp"],
        message["headers"],
    )

@pytest.mark.parametrize("columns", [["col_a", 3], b"col_d", {"col_a"}])
def test_drop_invalid_columns(self, dataframe_factory, columns):
    """
    `drop()` rejects, with a TypeError, a list holding a non-str item as well
    as arguments that are neither a str nor a list (bytes, set).
    """
    drop = dataframe_factory().drop
    with pytest.raises(TypeError):
        drop(columns)

def test_drop_empty_list(self, dataframe_factory):
    """
    An empty column list is a no-op: the stream tree of the dataframe returned
    by `drop([])` equals the tree captured before the call.
    """
    original = dataframe_factory()
    tree_before = original.stream.tree()

    returned = original.drop([])

    assert tree_before == returned.stream.tree()


class TestStreamingDataFrameApplyExpand:
def test_apply_expand(self, dataframe_factory):
Expand Down

0 comments on commit 21415a0

Please sign in to comment.