Skip to content

Commit

Permalink
add print, allow inplace for update and to_topic
Browse files Browse the repository at this point in the history
  • Loading branch information
tim-quix committed Jul 16, 2024
1 parent fea6dd3 commit 2efa326
Show file tree
Hide file tree
Showing 2 changed files with 122 additions and 11 deletions.
66 changes: 56 additions & 10 deletions quixstreams/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import contextvars
import functools
import operator
import pprint
from copy import deepcopy
from datetime import timedelta
from typing import (
Expand Down Expand Up @@ -319,15 +320,14 @@ def func(values: list, state: State):
func=cast(UpdateWithMetadataCallbackStateful, with_metadata_func),
processing_context=self._processing_context,
)
stream = self.stream.add_update(
return self._add_update(
cast(UpdateWithMetadataCallback, stateful_func), metadata=True
)
else:
stream = self.stream.add_update(
return self._add_update(
cast(Union[UpdateCallback, UpdateWithMetadataCallback], func),
metadata=metadata,
)
return self.__dataframe_clone__(stream=stream)

@overload
def filter(self, func: FilterCallback) -> Self: ...
Expand Down Expand Up @@ -570,7 +570,7 @@ def to_topic(
By default, the current message key will be used.
"""
return self.update(
return self._add_update(
lambda value, orig_key, timestamp, headers: self._produce(
topic=topic,
value=value,
Expand Down Expand Up @@ -673,6 +673,47 @@ def _set_headers_callback(
stream = self.stream.add_transform(func=_set_headers_callback)
return self.__dataframe_clone__(stream=stream)

def print(self, pretty: bool = False, metadata: bool = False) -> Self:
"""
Print out the current message value (and optionally, the message metadata) to
stdout (console) (like the built-in `print` function).
Can also output a more dict-friendly format with `pretty=True`.
Reassignment is OPTIONAL for this function (is applied regardless).
> NOTE: prints the current (edited) values, not the original values.
Example Snippet:
```python
from quixstreams import Application
app = Application()
input_topic = app.topic("data")
sdf = app.dataframe(input_topic)
sdf["edited_col"] = sdf["orig_col"] + "edited"
# print the updated message value with the newly added column
sdf.print()
```
:param pretty: Whether to use "pprint" formatting, which uses new-lines and
indents for easier console reading (but might be worse for log parsing).
:param metadata: Whether to additionally print the key, timestamp, and headers
:return: the updated StreamingDataFrame instance (reassignment NOT required).
"""
_PRINT_ARGS = ["value", "key", "timestamp", "headers"]
if pretty:
printer = functools.partial(pprint.pprint, indent=2)
else:
printer = print
return self._add_update(
lambda *args: printer({_PRINT_ARGS[i]: args[i] for i in range(len(args))}),
metadata=metadata,
)

def compose(
self,
sink: Optional[Callable[[Any, Any, int, Any], None]] = None,
Expand Down Expand Up @@ -929,6 +970,14 @@ def _produce(
)
self._producer.produce_row(row=row, topic=topic, key=key, timestamp=timestamp)

def _add_update(
    self,
    func: Union[UpdateCallback, UpdateWithMetadataCallback],
    metadata: bool = False,
):
    """
    Attach an in-place "update" step to the underlying stream.

    Unlike operations that clone the dataframe (see `__dataframe_clone__`
    usage elsewhere), this mutates the current instance's stream and returns
    the same instance, so reassignment by the caller is optional.

    :param func: the update callback to append to the stream
    :param metadata: whether the callback also receives key/timestamp/headers
    :return: this same StreamingDataFrame instance
    """
    updated_stream = self._stream.add_update(func, metadata=metadata)
    self._stream = updated_stream
    return self

def _register_store(self):
"""
Register the default store for input topic in StateStoreManager
Expand Down Expand Up @@ -986,7 +1035,7 @@ def __setitem__(self, item_key: Any, item: Union[Self, object]):
# Update an item key with a result of another sdf.apply()
diff = self.stream.diff(item.stream)
other_sdf_composed = diff.compose_returning()
stream = self.stream.add_update(
self._add_update(
lambda value, key, timestamp, headers: operator.setitem(
value,
item_key,
Expand All @@ -997,18 +1046,15 @@ def __setitem__(self, item_key: Any, item: Union[Self, object]):
elif isinstance(item, StreamingSeries):
# Update an item key with a result of another series
series_composed = item.compose_returning()
stream = self.stream.add_update(
self._add_update(
lambda value, key, timestamp, headers: operator.setitem(
value, item_key, series_composed(value, key, timestamp, headers)[0]
),
metadata=True,
)
else:
# Update an item key with a constant
stream = self.stream.add_update(
lambda value: operator.setitem(value, item_key, item)
)
self._stream = stream
self._add_update(lambda value: operator.setitem(value, item_key, item))

@overload
def __getitem__(self, item: str) -> StreamingSeries: ...
Expand Down
67 changes: 66 additions & 1 deletion tests/test_quixstreams/test_dataframe/test_dataframe.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import json
import operator
import uuid
from collections import namedtuple
Expand Down Expand Up @@ -369,6 +370,26 @@ def test_set_headers(self, original_headers, new_headers, dataframe_factory):
)[0]
assert result == expected

@pytest.mark.parametrize(
    "metadata,expected",
    [
        (False, str({"value": {"x": 1}})),
        (
            True,
            str({"value": {"x": 1}, "key": b"key", "timestamp": 0, "headers": []}),
        ),
    ],
)
def test_print(self, dataframe_factory, metadata, expected, capsys):
    """`print` should emit the record (plus metadata, when asked) to stdout."""
    sdf = dataframe_factory()
    sdf.print(metadata=metadata)

    # Push a single record through the pipeline; printing is a side effect.
    sdf.test(value={"x": 1}, key=b"key", timestamp=0, headers=[])

    # The dict-style repr of the record must appear in captured stdout.
    captured = capsys.readouterr()
    assert expected in captured.out


class TestStreamingDataFrameApplyExpand:
def test_apply_expand(self, dataframe_factory):
Expand Down Expand Up @@ -418,14 +439,55 @@ def test_setitem_expand_not_allowed(self, dataframe_factory):
_ = sdf[sdf.apply(lambda v: [v, v], expand=True)]


class TestStreamingDataFrameUpdate:
    def test_update_no_reassign(self, dataframe_factory):
        """
        "Update" operations should be applied regardless of a reassignment,
        and anything else requires assignment.
        """
        sdf = dataframe_factory()

        def snapshot():
            # Capture the current sdf object identity and its stream topology.
            return id(sdf), sdf.stream.tree()

        ident_1, tree_1 = snapshot()

        # non-update non-reassignment (no change!)
        sdf.apply(lambda v: v)
        ident_2, tree_2 = snapshot()
        assert ident_1 == ident_2
        assert tree_1 == tree_2

        # non-update reassignment: a new object with a new topology
        sdf = sdf.apply(lambda v: v)
        ident_3, tree_3 = snapshot()
        assert ident_2 != ident_3
        assert tree_2 != tree_3

        # update non-reassignment: same object, topology changed in place
        sdf.update(lambda v: v)
        ident_4, tree_4 = snapshot()
        assert ident_3 == ident_4
        assert tree_3 != tree_4

        # update reassignment: update returns self, so identity is preserved
        sdf = sdf.update(lambda v: v)
        ident_5, tree_5 = snapshot()
        assert ident_4 == ident_5
        assert tree_4 != tree_5


class TestStreamingDataFrameToTopic:
@pytest.mark.parametrize("reassign", [True, False])
def test_to_topic(
self,
dataframe_factory,
row_consumer_factory,
row_producer_factory,
topic_manager_topic_factory,
message_context_factory,
reassign,
):
topic = topic_manager_topic_factory(
key_serializer="str",
Expand All @@ -436,7 +498,10 @@ def test_to_topic(
producer = row_producer_factory()

sdf = dataframe_factory(producer=producer)
sdf = sdf.to_topic(topic)
if reassign:
sdf = sdf.to_topic(topic)
else:
sdf.to_topic(topic)

value = {"x": 1, "y": 2}
key, timestamp = "key", 10
Expand Down

0 comments on commit 2efa326

Please sign in to comment.