Skip to content

Commit

Permalink
Add sdf.contains() method to check if column exists in a message (#235)
Browse files Browse the repository at this point in the history
Add sdf.contains("key") method that can be passed to SDF as a filter.
  • Loading branch information
harisbotic authored Nov 13, 2023
1 parent 9ca19b0 commit d6e0cf0
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 0 deletions.
16 changes: 16 additions & 0 deletions src/StreamingDataFrames/quixstreams/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,22 @@ def producer(self) -> RowProducerProto:
def producer(self, producer: RowProducerProto):
self._real_producer = producer

@staticmethod
def contains(key: str) -> Column:
"""
Check if the key is present in the Row value.
:param key: a column name to check.
:returns: a Column object that evaluates to True if the key is present or False otherwise.
Example:
>>> df = StreamingDataframe()
>>> sdf['has_column'] = sdf.contains('column_x')
# This would add a new column 'has_column' which contains boolean values
# indicating the presence of 'column_x' in each row.
"""
return Column(_eval_func=lambda row: key in row.keys())

def __setitem__(self, key: str, value: Any):
self._apply(lambda row: setitem(key, value, row))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,35 @@ def test_compound_inequality_filter_is_filtered(
row = row_factory({"x": 1, "y": 2})
assert dataframe.process(row) is None

def test_contains_on_existing_column(self, dataframe_factory, row_factory):
dataframe = dataframe_factory()
dataframe["has_column"] = dataframe.contains("x")
row = row_factory({"x": 1})
assert (
dataframe.process(row).value
== row_factory({"x": 1, "has_column": True}).value
)

def test_contains_on_missing_column(self, dataframe_factory, row_factory):
dataframe = dataframe_factory()
dataframe["has_column"] = dataframe.contains("wrong_column")
row = row_factory({"x": 1})
assert (
dataframe.process(row).value
== row_factory({"x": 1, "has_column": False}).value
)

def test_contains_as_filter(self, dataframe_factory, row_factory):
dataframe = dataframe_factory()
dataframe = dataframe[dataframe.contains("x")]

valid_row = row_factory({"x": 1, "y": 2})
valid_result = dataframe.process(valid_row)
assert valid_result is not None and valid_result.value == valid_row.value

invalid_row = row_factory({"y": 2})
assert dataframe.process(invalid_row) is None


class TestDataframeKafka:
def test_to_topic(
Expand Down

0 comments on commit d6e0cf0

Please sign in to comment.