diff --git a/pyproject.toml b/pyproject.toml
index d4047f41d..7a2c32e4b 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -123,7 +123,6 @@ ignore_errors = true

 [[tool.mypy.overrides]]
 module = [
-    "quixstreams.core.*",
     "quixstreams.dataframe.*",
     "quixstreams.rowproducer.*"
 ]
diff --git a/quixstreams/core/stream/functions/apply.py b/quixstreams/core/stream/functions/apply.py
index 254caca2b..9609d6cdd 100644
--- a/quixstreams/core/stream/functions/apply.py
+++ b/quixstreams/core/stream/functions/apply.py
@@ -1,7 +1,13 @@
-from typing import Any
+from typing import Any, Literal, Union, overload

 from .base import StreamFunction
-from .types import ApplyCallback, ApplyWithMetadataCallback, VoidExecutor
+from .types import (
+    ApplyCallback,
+    ApplyExpandedCallback,
+    ApplyWithMetadataCallback,
+    ApplyWithMetadataExpandedCallback,
+    VoidExecutor,
+)

 __all__ = ("ApplyFunction", "ApplyWithMetadataFunction")

@@ -14,22 +20,34 @@ class ApplyFunction(StreamFunction):
     and its result will always be passed downstream.
     """

+    @overload
+    def __init__(self, func: ApplyCallback, expand: Literal[False] = False) -> None: ...
+
+    @overload
+    def __init__(self, func: ApplyExpandedCallback, expand: Literal[True]) -> None: ...
+
     def __init__(
         self,
-        func: ApplyCallback,
+        func: Union[ApplyCallback, ApplyExpandedCallback],
         expand: bool = False,
     ):
         super().__init__(func)
+
+        self.func: Union[ApplyCallback, ApplyExpandedCallback]
         self.expand = expand

     def get_executor(self, *child_executors: VoidExecutor) -> VoidExecutor:
         child_executor = self._resolve_branching(*child_executors)
+        func = self.func

         if self.expand:

             def wrapper(
-                value: Any, key: Any, timestamp: int, headers: Any, func=self.func
-            ):
+                value: Any,
+                key: Any,
+                timestamp: int,
+                headers: Any,
+            ) -> None:
                 # Execute a function on a single value and wrap results into a list
                 # to expand them downstream
                 result = func(value)
@@ -39,8 +57,11 @@ def wrapper(
         else:

             def wrapper(
-                value: Any, key: Any, timestamp: int, headers: Any, func=self.func
-            ):
+                value: Any,
+                key: Any,
+                timestamp: int,
+                headers: Any,
+            ) -> None:
                 # Execute a function on a single value and return its result
                 result = func(value)
                 child_executor(result, key, timestamp, headers)
@@ -57,20 +78,37 @@ class ApplyWithMetadataFunction(StreamFunction):
     and its result will always be passed downstream.
     """

+    @overload
+    def __init__(
+        self, func: ApplyWithMetadataCallback, expand: Literal[False]
+    ) -> None: ...
+
+    @overload
+    def __init__(
+        self, func: ApplyWithMetadataExpandedCallback, expand: Literal[True]
+    ) -> None: ...
+
     def __init__(
         self,
         func: ApplyWithMetadataCallback,
         expand: bool = False,
     ):
         super().__init__(func)
+
+        self.func: Union[ApplyWithMetadataCallback, ApplyWithMetadataExpandedCallback]
         self.expand = expand

     def get_executor(self, *child_executors: VoidExecutor) -> VoidExecutor:
         child_executor = self._resolve_branching(*child_executors)
+        func = self.func
+
         if self.expand:

             def wrapper(
-                value: Any, key: Any, timestamp: int, headers: Any, func=self.func
+                value: Any,
+                key: Any,
+                timestamp: int,
+                headers: Any,
             ):
                 # Execute a function on a single value and wrap results into a list
                 # to expand them downstream
@@ -81,7 +119,10 @@ def wrapper(
         else:

             def wrapper(
-                value: Any, key: Any, timestamp: int, headers: Any, func=self.func
+                value: Any,
+                key: Any,
+                timestamp: int,
+                headers: Any,
             ):
                 # Execute a function on a single value and return its result
                 result = func(value, key, timestamp, headers)
diff --git a/quixstreams/core/stream/functions/base.py b/quixstreams/core/stream/functions/base.py
index 032055207..24438ef13 100644
--- a/quixstreams/core/stream/functions/base.py
+++ b/quixstreams/core/stream/functions/base.py
@@ -38,6 +38,9 @@ def _resolve_branching(self, *child_executors: VoidExecutor) -> VoidExecutor:
         If there's only one executor - copying is not necessary, and the executor
         is returned as is.
         """
+        if not child_executors:
+            raise TypeError("At least one executor is required")
+
         if len(child_executors) > 1:

             def wrapper(
diff --git a/quixstreams/core/stream/functions/filter.py b/quixstreams/core/stream/functions/filter.py
index bd7a96a3a..e291880c7 100644
--- a/quixstreams/core/stream/functions/filter.py
+++ b/quixstreams/core/stream/functions/filter.py
@@ -17,11 +17,18 @@ class FilterFunction(StreamFunction):

     def __init__(self, func: FilterCallback):
         super().__init__(func)
+        self.func: FilterCallback

     def get_executor(self, *child_executors: VoidExecutor) -> VoidExecutor:
         child_executor = self._resolve_branching(*child_executors)
-
-        def wrapper(value: Any, key: Any, timestamp: int, headers: Any, func=self.func):
+        func = self.func
+
+        def wrapper(
+            value: Any,
+            key: Any,
+            timestamp: int,
+            headers: Any,
+        ):
             # Filter a single value
             if func(value):
                 child_executor(value, key, timestamp, headers)
@@ -42,11 +49,18 @@ class FilterWithMetadataFunction(StreamFunction):

     def __init__(self, func: FilterWithMetadataCallback):
         super().__init__(func)
+        self.func: FilterWithMetadataCallback

     def get_executor(self, *child_executors: VoidExecutor) -> VoidExecutor:
         child_executor = self._resolve_branching(*child_executors)
-
-        def wrapper(value: Any, key: Any, timestamp: int, headers: Any, func=self.func):
+        func = self.func
+
+        def wrapper(
+            value: Any,
+            key: Any,
+            timestamp: int,
+            headers: Any,
+        ):
             # Filter a single value
             if func(value, key, timestamp, headers):
                 child_executor(value, key, timestamp, headers)
diff --git a/quixstreams/core/stream/functions/transform.py b/quixstreams/core/stream/functions/transform.py
index dae7872dc..d70084faf 100644
--- a/quixstreams/core/stream/functions/transform.py
+++ b/quixstreams/core/stream/functions/transform.py
@@ -1,4 +1,4 @@
-from typing import Any, Union
+from typing import Any, Literal, Union, cast, overload

 from .base import StreamFunction
 from .types import TransformCallback, TransformExpandedCallback, VoidExecutor
@@ -21,41 +21,53 @@ class TransformFunction(StreamFunction):

     The result of the callback will always be passed downstream.
""" + @overload + def __init__( + self, func: TransformCallback, expand: Literal[False] = False + ) -> None: ... + + @overload + def __init__( + self, func: TransformExpandedCallback, expand: Literal[True] + ) -> None: ... + def __init__( self, func: Union[TransformCallback, TransformExpandedCallback], expand: bool = False, ): super().__init__(func) + + self.func: Union[TransformCallback, TransformExpandedCallback] self.expand = expand def get_executor(self, *child_executors: VoidExecutor) -> VoidExecutor: child_executor = self._resolve_branching(*child_executors) if self.expand: + expanded_callback = cast(TransformExpandedCallback, self.func) def wrapper( value: Any, key: Any, timestamp: int, headers: Any, - func: TransformExpandedCallback = self.func, ): - result = func(value, key, timestamp, headers) + result = expanded_callback(value, key, timestamp, headers) for new_value, new_key, new_timestamp, new_headers in result: child_executor(new_value, new_key, new_timestamp, new_headers) else: + callback = cast(TransformCallback, self.func) def wrapper( value: Any, key: Any, timestamp: int, headers: Any, - func: TransformCallback = self.func, ): # Execute a function on a single value and return its result - new_value, new_key, new_timestamp, new_headers = func( + new_value, new_key, new_timestamp, new_headers = callback( value, key, timestamp, headers ) child_executor(new_value, new_key, new_timestamp, new_headers) diff --git a/quixstreams/core/stream/functions/update.py b/quixstreams/core/stream/functions/update.py index 373e634e6..b2d9a19bc 100644 --- a/quixstreams/core/stream/functions/update.py +++ b/quixstreams/core/stream/functions/update.py @@ -20,10 +20,13 @@ class UpdateFunction(StreamFunction): def __init__(self, func: UpdateCallback): super().__init__(func) + self.func: UpdateCallback + def get_executor(self, *child_executors: VoidExecutor) -> VoidExecutor: child_executor = self._resolve_branching(*child_executors) + func = self.func - def wrapper(value: Any, key: Any, timestamp: int, headers: Any, func=self.func): + def wrapper(value: Any, key: Any, timestamp: int, headers: Any): # Update a single value and forward it func(value) child_executor(value, key, timestamp, headers) @@ -45,10 +48,13 @@ class UpdateWithMetadataFunction(StreamFunction): def __init__(self, func: UpdateWithMetadataCallback): super().__init__(func) + self.func: UpdateWithMetadataCallback + def get_executor(self, *child_executors: VoidExecutor) -> VoidExecutor: child_executor = self._resolve_branching(*child_executors) + func = self.func - def wrapper(value: Any, key: Any, timestamp: int, headers: Any, func=self.func): + def wrapper(value: Any, key: Any, timestamp: int, headers: Any): # Update a single value and forward it func(value, key, timestamp, headers) child_executor(value, key, timestamp, headers) diff --git a/quixstreams/core/stream/stream.py b/quixstreams/core/stream/stream.py index e7708fd36..8e43ad1e5 100644 --- a/quixstreams/core/stream/stream.py +++ b/quixstreams/core/stream/stream.py @@ -3,31 +3,20 @@ import functools import itertools from time import monotonic_ns -from typing import Any, Callable, List, Optional, Union +from typing import Any, Callable, Deque, List, Optional, Union from typing_extensions import Self from quixstreams.dataframe.exceptions import InvalidOperation from .functions import ( - ApplyCallback, - ApplyExpandedCallback, ApplyFunction, - ApplyWithMetadataCallback, - ApplyWithMetadataExpandedCallback, - ApplyWithMetadataFunction, - FilterCallback, FilterFunction, - 
FilterWithMetadataCallback, FilterWithMetadataFunction, ReturningExecutor, StreamFunction, - TransformCallback, - TransformExpandedCallback, TransformFunction, - UpdateCallback, UpdateFunction, - UpdateWithMetadataCallback, UpdateWithMetadataFunction, VoidExecutor, ) @@ -84,7 +73,7 @@ def __init__( self.func = func if func is not None else ApplyFunction(lambda value: value) self.parent = parent - self.children = set() + self.children: set[Self] = set() self.generated = monotonic_ns() self.pruned = False @@ -101,114 +90,7 @@ def __repr__(self) -> str: ) return f"<{self.__class__.__name__} [{len(tree_funcs)}]: {funcs_repr}>" - def add_filter( - self, - func: Union[FilterCallback, FilterWithMetadataCallback], - *, - metadata: bool = False, - ) -> Self: - """ - Add a function to filter values from the Stream. - - The return value of the function will be interpreted as `bool`. - If the function returns `False`-like result, the Stream will raise `Filtered` - exception during execution. - - :param func: a function to filter values from the stream - :param metadata: if True, the callback will receive key and timestamp along with - the value. - Default - `False`. - :return: a new `Stream` derived from the current one - """ - if metadata: - filter_func = FilterWithMetadataFunction(func) - else: - filter_func = FilterFunction(func) - return self._add(filter_func) - - def add_apply( - self, - func: Union[ - ApplyCallback, - ApplyExpandedCallback, - ApplyWithMetadataCallback, - ApplyWithMetadataExpandedCallback, - ], - *, - expand: bool = False, - metadata: bool = False, - ) -> Self: - """ - Add an "apply" function to the Stream. - - The function is supposed to return a new value, which will be passed - further during execution. - - :param func: a function to generate a new value - :param expand: if True, expand the returned iterable into individual values - downstream. If returned value is not iterable, `TypeError` will be raised. - Default - `False`. - :param metadata: if True, the callback will receive key and timestamp along with - the value. - Default - `False`. - :return: a new `Stream` derived from the current one - """ - if metadata: - apply_func = ApplyWithMetadataFunction(func, expand=expand) - else: - apply_func = ApplyFunction(func, expand=expand) - return self._add(apply_func) - - def add_update( - self, - func: Union[UpdateCallback, UpdateWithMetadataCallback], - *, - metadata: bool = False, - ) -> Self: - """ - Add an "update" function to the Stream, that will mutate the input value. - - The return of this function will be ignored and its input - will be passed downstream. - - :param func: a function to mutate the value - :param metadata: if True, the callback will receive key and timestamp along with - the value. - Default - `False`. - :return: a new Stream derived from the current one - """ - if metadata: - update_func = UpdateWithMetadataFunction(func) - else: - update_func = UpdateFunction(func) - return self._add(update_func) - - def add_transform( - self, - func: Union[TransformCallback, TransformExpandedCallback], - *, - expand: bool = False, - ) -> Self: - """ - Add a "transform" function to the Stream, that will mutate the input value. - - The callback must accept a value, a key, and a timestamp. - It's expected to return a new value, new key and new timestamp. - - The result of the callback which will be passed downstream - during execution. 
-
-        :param func: a function to mutate the value
-        :param expand: if True, expand the returned iterable into individual items
-            downstream. If returned value is not iterable, `TypeError` will be raised.
-            Default - `False`.
-        :return: a new Stream derived from the current one
-        """
-
-        return self._add(TransformFunction(func, expand=expand))
-
-    def diff(self, other: "Stream") -> Self:
+    def diff(self, other: Self) -> Optional[Self]:
         """
         Takes the difference between Streams `self` and `other` based on their last
         common parent, and returns a new, independent `Stream` that includes only
@@ -369,7 +251,7 @@ def compose_returning(self) -> ReturningExecutor:
         # after executing the Stream.
         # The composed stream must have only the "apply" functions,
         # which always return a single value.
-        buffer = collections.deque(maxlen=1)
+        buffer: Deque[tuple[Any, Any, int, Any]] = collections.deque(maxlen=1)
         composed = self.compose(
             allow_filters=False,
             allow_expands=False,
@@ -394,29 +276,23 @@ def wrapper(value: Any, key: Any, timestamp: int, headers: Any) -> Any:
     def _compose(
         self,
         tree: List[Self],
-        composed: List[Callable[[Any, Any, int, Any], None]],
+        composed: Union[VoidExecutor, List[VoidExecutor]],
         allow_filters: bool,
         allow_updates: bool,
         allow_expands: bool,
         allow_transforms: bool,
-    ) -> VoidExecutor:
+    ) -> Union[VoidExecutor, List[VoidExecutor]]:
         functions = [node.func for node in tree]

         # Iterate over a reversed list of functions
         for func in reversed(functions):
-            # Validate that only allowed functions are passed
-            if not allow_updates and isinstance(
-                func, (UpdateFunction, UpdateWithMetadataFunction)
-            ):
-                raise ValueError("Update functions are not allowed")
-            elif not allow_filters and isinstance(
-                func, (FilterFunction, FilterWithMetadataFunction)
-            ):
-                raise ValueError("Filter functions are not allowed")
-            elif not allow_transforms and isinstance(func, TransformFunction):
-                raise ValueError("Transform functions are not allowed")
-            elif not allow_expands and func.expand:
-                raise ValueError("Expand functions are not allowed")
+            self._validate_func(
+                func,
+                allow_filters=allow_filters,
+                allow_updates=allow_updates,
+                allow_expands=allow_expands,
+                allow_transforms=allow_transforms,
+            )

             composed = func.get_executor(
                 *composed if isinstance(composed, list) else [composed]
@@ -424,6 +300,28 @@ def _compose(

         return composed

+    def _validate_func(
+        self,
+        func,
+        allow_filters: bool,
+        allow_updates: bool,
+        allow_expands: bool,
+        allow_transforms: bool,
+    ) -> None:
+        # Validate that only allowed functions are passed
+        if not allow_updates and isinstance(
+            func, (UpdateFunction, UpdateWithMetadataFunction)
+        ):
+            raise ValueError("Update functions are not allowed")
+        elif not allow_filters and isinstance(
+            func, (FilterFunction, FilterWithMetadataFunction)
+        ):
+            raise ValueError("Filter functions are not allowed")
+        elif not allow_transforms and isinstance(func, TransformFunction):
+            raise ValueError("Transform functions are not allowed")
+        elif not allow_expands and func.expand:
+            raise ValueError("Expand functions are not allowed")
+
     def _diff_from_last_common_parent(self, other: Self) -> List[Self]:
         nodes_self = self.root_path()
         nodes_other = other.root_path()
@@ -442,6 +340,11 @@ def _diff_from_last_common_parent(self, other: Self) -> List[Self]:
             raise ValueError("The diff is empty")
         return diff

+    def add(self, func: StreamFunction) -> Self:
+        new_node = self.__class__(func=func, parent=self)
+        self.children.add(new_node)
+        return new_node
+
     def _add(self, func: StreamFunction) -> Self:
         new_node = self.__class__(func=func, parent=self)
         self.children.add(new_node)
@@ -461,7 +364,8 @@ def _prune(self, other: Self):
         if other.pruned:
             raise ValueError("Node has already been pruned")
         other.pruned = True
-        node = self
+
+        node: Optional[Self] = self
         while node:
             if other in node.children:
                 node.children.remove(other)
diff --git a/quixstreams/dataframe/dataframe.py b/quixstreams/dataframe/dataframe.py
index cf4fad9b3..01978896e 100644
--- a/quixstreams/dataframe/dataframe.py
+++ b/quixstreams/dataframe/dataframe.py
@@ -26,12 +26,19 @@
 )
 from quixstreams.core.stream import (
     ApplyCallback,
+    ApplyFunction,
     ApplyWithMetadataCallback,
+    ApplyWithMetadataFunction,
     FilterCallback,
+    FilterFunction,
     FilterWithMetadataCallback,
+    FilterWithMetadataFunction,
     Stream,
+    TransformFunction,
     UpdateCallback,
+    UpdateFunction,
     UpdateWithMetadataCallback,
+    UpdateWithMetadataFunction,
     VoidExecutor,
 )
 from quixstreams.models import (
@@ -229,13 +236,14 @@ def func(d: dict, state: State):
                 func=with_metadata_func,
                 processing_context=self._processing_context,
             )
-            stream = self.stream.add_apply(stateful_func, expand=expand, metadata=True)
-        else:
-            stream = self.stream.add_apply(
-                cast(Union[ApplyCallback, ApplyWithMetadataCallback], func),
-                expand=expand,
-                metadata=metadata,
+
+            stream = self.stream.add(
+                ApplyWithMetadataFunction(stateful_func, expand=expand)
             )
+        elif metadata:
+            stream = self.stream.add(ApplyWithMetadataFunction(func, expand=expand))
+        else:
+            stream = self.stream.add(ApplyFunction(func, expand=expand))
         return self.__dataframe_clone__(stream=stream)

     @overload
@@ -399,21 +407,18 @@ def func(d: dict, state: State):
         if stateful:
             self._register_store()
             # Force the callback to accept metadata
-            with_metadata_func = (
-                func
-                if metadata
-                else _as_metadata_func(cast(FilterCallbackStateful, func))
-            )
+            with_metadata_func = func if metadata else _as_metadata_func(func)
             stateful_func = _as_stateful(
                 func=cast(FilterWithMetadataCallbackStateful, with_metadata_func),
                 processing_context=self._processing_context,
             )
-            stream = self.stream.add_filter(stateful_func, metadata=True)
+
+            stream = self.stream.add(FilterWithMetadataFunction(stateful_func))
+        elif metadata:
+            stream = self.stream.add(FilterWithMetadataFunction(func))
         else:
-            stream = self.stream.add_filter(
-                cast(Union[FilterCallback, FilterWithMetadataCallback], func),
-                metadata=metadata,
-            )
+            stream = self.stream.add(FilterFunction(func))
+
         return self.__dataframe_clone__(stream=stream)

     @overload
@@ -619,7 +624,7 @@ def _set_timestamp_callback(
             new_timestamp = func(value, key, timestamp, headers)
             return value, key, new_timestamp, headers

-        stream = self.stream.add_transform(func=_set_timestamp_callback)
+        stream = self.stream.add(TransformFunction(_set_timestamp_callback))
         return self.__dataframe_clone__(stream=stream)

     def set_headers(
@@ -670,7 +675,7 @@ def _set_headers_callback(
             new_headers = func(value, key, timestamp, headers)
             return value, key, timestamp, new_headers

-        stream = self.stream.add_transform(func=_set_headers_callback)
+        stream = self.stream.add(TransformFunction(_set_headers_callback))
         return self.__dataframe_clone__(stream=stream)

     def print(self, pretty: bool = True, metadata: bool = False) -> Self:
@@ -1136,7 +1141,10 @@ def _add_update(
         func: Union[UpdateCallback, UpdateWithMetadataCallback],
         metadata: bool = False,
     ):
-        self._stream = self._stream.add_update(func, metadata=metadata)
+        if metadata:
+            self._stream = self._stream.add(UpdateWithMetadataFunction(func))
+        else:
+            self._stream = self._stream.add(UpdateFunction(func))
         return self

     def _register_store(self):
@@ -1272,6 +1280,24 @@ def _drop(value: Dict, columns: List[str], ignore_missing: bool = False):
             raise


+@overload
+def _as_metadata_func(
+    func: ApplyCallbackStateful,
+) -> ApplyWithMetadataCallbackStateful: ...
+
+
+@overload
+def _as_metadata_func(
+    func: FilterCallbackStateful,
+) -> FilterWithMetadataCallbackStateful: ...
+
+
+@overload
+def _as_metadata_func(
+    func: UpdateCallbackStateful,
+) -> UpdateWithMetadataCallbackStateful: ...
+
+
 def _as_metadata_func(
     func: Union[ApplyCallbackStateful, FilterCallbackStateful, UpdateCallbackStateful],
 ) -> Union[
diff --git a/quixstreams/dataframe/series.py b/quixstreams/dataframe/series.py
index d3987610d..a295e2b6b 100644
--- a/quixstreams/dataframe/series.py
+++ b/quixstreams/dataframe/series.py
@@ -183,7 +183,7 @@ def func(value: str, state: State):
         :param func: a callable with one argument and one output
         :return: a new `StreamingSeries` with the new callable added
         """
-        child = self._stream.add_apply(func)
+        child = self._stream.add(ApplyFunction(func))
         return self.__class__(stream=child, sdf_id=self._sdf_id)

     def compose_returning(self) -> ReturningExecutor:
diff --git a/quixstreams/dataframe/windows/time_based.py b/quixstreams/dataframe/windows/time_based.py
index 3bdd2d982..49672cc48 100644
--- a/quixstreams/dataframe/windows/time_based.py
+++ b/quixstreams/dataframe/windows/time_based.py
@@ -12,9 +12,7 @@
 )

 from quixstreams.context import message_context
-from quixstreams.core.stream import (
-    TransformExpandedCallback,
-)
+from quixstreams.core.stream import TransformExpandedCallback, TransformFunction
 from quixstreams.processing import ProcessingContext
 from quixstreams.state import WindowedPartitionTransaction, WindowedState

@@ -213,7 +211,9 @@ def _apply_window(
         # to avoid adding "transform" API to it.
         # Transform callbacks can modify record key and timestamp,
         # and it's prone to misuse.
-        stream = self._dataframe.stream.add_transform(func=windowed_func, expand=True)
+        stream = self._dataframe.stream.add(
+            TransformFunction(windowed_func, expand=True)
+        )

         return self._dataframe.__dataframe_clone__(stream=stream)
diff --git a/tests/test_quixstreams/test_core/test_stream.py b/tests/test_quixstreams/test_core/test_stream.py
index 2d04dd6f9..16d482fe2 100644
--- a/tests/test_quixstreams/test_core/test_stream.py
+++ b/tests/test_quixstreams/test_core/test_stream.py
@@ -5,9 +5,12 @@
 from quixstreams.core.stream import Stream
 from quixstreams.core.stream.functions import (
     ApplyFunction,
+    ApplyWithMetadataFunction,
     FilterFunction,
+    FilterWithMetadataFunction,
     TransformFunction,
     UpdateFunction,
+    UpdateWithMetadataFunction,
 )
 from quixstreams.dataframe.exceptions import InvalidOperation

@@ -16,13 +19,13 @@ class TestStream:
     def test_add_apply(self):
-        stream = Stream().add_apply(lambda v: v + 1)
+        stream = Stream().add(ApplyFunction(lambda v: v + 1))
         sink = Sink()
         stream.compose(sink=sink.append_record)(1, "key", 0, [])
         assert sink[0] == (2, "key", 0, [])

     def test_add_update(self):
-        stream = Stream().add_update(lambda v: v.append(1))
+        stream = Stream().add(UpdateFunction(lambda v: v.append(1)))
         result = Sink()
         stream.compose(sink=result.append_record)([0], "key", 0, [])
         assert result[0] == ([0, 1], "key", 0, [])
@@ -35,7 +38,7 @@ def test_add_update(self):
         ],
     )
     def test_add_filter(self, value, key, timestamp, headers, expected):
-        stream = Stream().add_filter(lambda v: v == 0)
+        stream = Stream().add(FilterFunction(lambda v: v == 0))
         result = Sink()
         stream.compose(sink=result.append_record)(value, key, timestamp, headers)
         assert result == expected
@@ -43,10 +46,10 @@ def test_root_path(self):
         stream = (
             Stream()
-            .add_apply(lambda v: ...)
-            .add_filter(lambda v: ...)
-            .add_update(lambda v: ...)
-            .add_transform(lambda v, k, t, h: ...)
+            .add(ApplyFunction(lambda v: ...))
+            .add(FilterFunction(lambda v: ...))
+            .add(UpdateFunction(lambda v: ...))
+            .add(TransformFunction(lambda v, k, t, h: ...))
         )
         tree = stream.root_path()
         assert len(tree) == 5
@@ -58,11 +61,11 @@ def test_diff_success(self):
         stream = Stream()
-        stream = stream.add_apply(lambda v: v)
+        stream = stream.add(ApplyFunction(lambda v: v))
         stream2 = (
-            stream.add_apply(lambda v: v)
-            .add_update(lambda v: v)
-            .add_filter(lambda v: v)
+            stream.add(ApplyFunction(lambda v: v))
+            .add(UpdateFunction(lambda v: v))
+            .add(FilterFunction(lambda v: v))
         )

         diff = stream.diff(stream2)
@@ -75,23 +78,23 @@ def test_diff_differing_origin_fails(self):
         stream = Stream()
-        stream = stream.add_apply(lambda v: v)
+        stream = stream.add(ApplyFunction(lambda v: v))
         stream2 = (
-            stream.add_apply(lambda v: v)
-            .add_update(lambda v: v)
-            .add_filter(lambda v: v)
+            stream.add(ApplyFunction(lambda v: v))
+            .add(UpdateFunction(lambda v: v))
+            .add(FilterFunction(lambda v: v))
         )
-        stream = stream.add_apply(lambda v: v)
+        stream = stream.add(ApplyFunction(lambda v: v))

         with pytest.raises(InvalidOperation):
             stream.diff(stream2)

     def test_diff_shared_origin_with_additional_split_fails(self):
         stream = Stream()
-        stream = stream.add_apply(lambda v: v)
-        stream2 = stream.add_apply(lambda v: v)
-        stream3 = stream2.add_apply(lambda v: v)  # noqa: F841
-        stream2 = stream2.add_apply(lambda v: v)
+        stream = stream.add(ApplyFunction(lambda v: v))
+        stream2 = stream.add(ApplyFunction(lambda v: v))
+        stream3 = stream2.add(ApplyFunction(lambda v: v))  # noqa: F841
+        stream2 = stream2.add(ApplyFunction(lambda v: v))

         with pytest.raises(InvalidOperation):
             stream.diff(stream2)
@@ -103,7 +106,7 @@ def test_diff_empty_same_stream_fails(self):
     def test_diff_empty_stream_full_child_fails(self):
         stream = Stream()
-        stream2 = stream.add_apply(lambda v: v)
+        stream2 = stream.add(ApplyFunction(lambda v: v))
         with pytest.raises(ValueError, match="The diff is empty"):
             stream2.diff(stream)
@@ -114,31 +117,33 @@ def test_diff_no_common_parent_fails(self):
             stream.diff(stream2)

     def test_compose_allow_filters_false(self):
-        stream = Stream().add_filter(lambda v: v)
+        stream = Stream().add(FilterFunction(lambda v: v))
         with pytest.raises(ValueError, match="Filter functions are not allowed"):
             stream.compose(allow_filters=False)

     def test_compose_allow_updates_false(self):
-        stream = Stream().add_update(lambda v: v)
+        stream = Stream().add(UpdateFunction(lambda v: v))
         with pytest.raises(ValueError, match="Update functions are not allowed"):
             stream.compose(allow_updates=False)

     def test_compose_allow_transforms_false(self):
-        stream = Stream().add_transform(lambda value, key, timestamp, headers: ...)
+        stream = Stream().add(
+            TransformFunction(lambda value, key, timestamp, headers: ...)
+        )
         with pytest.raises(ValueError, match="Transform functions are not allowed"):
             stream.compose(allow_transforms=False)

     def test_repr(self):
         stream = (
             Stream()
-            .add_apply(lambda v: v)
-            .add_update(lambda v: v)
-            .add_filter(lambda v: v)
+            .add(ApplyFunction(lambda v: v))
+            .add(UpdateFunction(lambda v: v))
+            .add(FilterFunction(lambda v: v))
         )
         repr(stream)

     def test_apply_expand(self):
-        stream = Stream().add_apply(lambda v: [v, v], expand=True)
+        stream = Stream().add(ApplyFunction(lambda v: [v, v], expand=True))
         result = Sink()
         value, key, timestamp, headers = 1, "key", 1, []
@@ -149,15 +154,15 @@ def test_apply_expand(self):
         ]

     def test_apply_expand_not_iterable_returned(self):
-        stream = Stream().add_apply(lambda v: 1, expand=True)
+        stream = Stream().add(ApplyFunction(lambda v: 1, expand=True))
         with pytest.raises(TypeError):
             stream.compose()(1, "key", 0, [])

     def test_apply_expand_multiple(self):
         stream = (
             Stream()
-            .add_apply(lambda v: [v + 1, v + 1], expand=True)
-            .add_apply(lambda v: [v, v + 1], expand=True)
+            .add(ApplyFunction(lambda v: [v + 1, v + 1], expand=True))
+            .add(ApplyFunction(lambda v: [v, v + 1], expand=True))
         )
         result = Sink()
         value, key, timestamp, headers = 1, "key", 1, [("key", b"value")]
@@ -172,9 +177,9 @@ def test_apply_expand_filter_all_filtered(self):
         stream = (
             Stream()
-            .add_apply(lambda v: [v, v], expand=True)
-            .add_apply(lambda v: [v, v], expand=True)
-            .add_filter(lambda v: v != 1)
+            .add(ApplyFunction(lambda v: [v, v], expand=True))
+            .add(ApplyFunction(lambda v: [v, v], expand=True))
+            .add(FilterFunction(lambda v: v != 1))
         )
         result = Sink()
         stream.compose(sink=result.append_record)(1, "key", 0, [])
@@ -183,9 +188,9 @@ def test_apply_expand_filter_some_filtered(self):
         stream = (
             Stream()
-            .add_apply(lambda v: [v, v + 1], expand=True)
-            .add_filter(lambda v: v != 1)
-            .add_apply(lambda v: [v, v], expand=True)
+            .add(ApplyFunction(lambda v: [v, v + 1], expand=True))
+            .add(FilterFunction(lambda v: v != 1))
+            .add(ApplyFunction(lambda v: [v, v], expand=True))
         )
         result = Sink()
         key, timestamp, headers = "key", 1, None
@@ -195,8 +200,8 @@ def test_apply_expand_update(self):
         stream = (
             Stream()
-            .add_apply(lambda v: [{"x": v}, {"x": v + 1}], expand=True)
-            .add_update(lambda v: setitem(v, "x", v["x"] + 1))
+            .add(ApplyFunction(lambda v: [{"x": v}, {"x": v + 1}], expand=True))
+            .add(UpdateFunction(lambda v: setitem(v, "x", v["x"] + 1)))
         )
         result = Sink()
         key, timestamp, headers = "key", 123, None
@@ -209,9 +214,9 @@ def test_apply_expand_update_filter(self):
         stream = (
             Stream()
-            .add_apply(lambda v: [{"x": v}, {"x": v + 1}], expand=True)
-            .add_update(lambda v: setitem(v, "x", v["x"] + 1))
-            .add_filter(lambda v: v["x"] != 2)
+            .add(ApplyFunction(lambda v: [{"x": v}, {"x": v + 1}], expand=True))
+            .add(UpdateFunction(lambda v: setitem(v, "x", v["x"] + 1)))
+            .add(FilterFunction(lambda v: v["x"] != 2))
         )
         result = Sink()
         key, timestamp, headers = "key", 123, []
@@ -219,23 +224,26 @@ def test_apply_expand_update_filter(self):
         assert result == [({"x": 3}, key, timestamp, headers)]

     def test_compose_allow_expands_false(self):
-        stream = Stream().add_apply(lambda v: [{"x": v}, {"x": v + 1}], expand=True)
+        stream = Stream().add(
+            ApplyFunction(lambda v: [{"x": v}, {"x": v + 1}], expand=True)
+        )
         with pytest.raises(ValueError, match="Expand functions are not allowed"):
             stream.compose(allow_expands=False)

     def test_add_apply_with_metadata(self):
-        stream = Stream().add_apply(
-            lambda v, key, timestamp, headers: v + 1, metadata=True
+        stream = Stream().add(
+            ApplyWithMetadataFunction(lambda v, key, timestamp, headers: v + 1)
         )
         sink = Sink()
         stream.compose(sink=sink.append_record)(1, "key", 0, None)
         assert sink[0] == (2, "key", 0, None)

     def test_apply_record_with_metadata_expanded(self):
-        stream = Stream().add_apply(
-            lambda value_, key_, timestamp_, headers_: [value_, value_],
-            expand=True,
-            metadata=True,
+        stream = Stream().add(
+            ApplyWithMetadataFunction(
+                lambda value_, key_, timestamp_, headers_: [value_, value_],
+                expand=True,
+            )
         )
         result = Sink()
         value, key, timestamp, headers = 1, "key", 1, []
@@ -247,8 +255,10 @@ def test_add_update_with_metadata(self):
-        stream = Stream().add_update(
-            lambda value, key, timestamp, headers: value.append(1), metadata=True
+        stream = Stream().add(
+            UpdateWithMetadataFunction(
+                lambda value, key, timestamp, headers: value.append(1)
+            )
         )
         result = Sink()
         stream.compose(sink=result.append_record)([0], "key", 0, [])
@@ -262,15 +272,17 @@
         ],
     )
     def test_add_filter_with_metadata(self, value, key, timestamp, headers, expected):
-        stream = Stream().add_filter(
-            lambda value_, key_, timestamp_, headers_: value_ == 0, metadata=True
+        stream = Stream().add(
+            FilterWithMetadataFunction(
+                lambda value_, key_, timestamp_, headers_: value_ == 0
+            )
         )
         result = Sink()
         stream.compose(sink=result.append_record)(value, key, timestamp, headers)
         assert result == expected

     def test_compose_returning(self):
-        stream = Stream().add_apply(lambda v: v + 1)
+        stream = Stream().add(ApplyFunction(lambda v: v + 1))
         assert stream.compose_returning()(1, "key", 0, []) == (2, "key", 0, [])
         assert stream.compose_returning()(2, "key", 0, []) == (3, "key", 0, [])
@@ -285,7 +297,7 @@ def _fail(value):
                 raise ValueError("fail")
             return value + 1

-        stream = Stream().add_apply(_fail)
+        stream = Stream().add(ApplyFunction(_fail))
         assert stream.compose_returning()(2, "key", 0, None) == (3, "key", 0, None)
         with pytest.raises(ValueError):
             assert stream.compose_returning()(1, "key", 0, None) == (3, "key", 0, None)
@@ -294,30 +306,38 @@ def _fail(value):
     @pytest.mark.parametrize(
         "stream, err",
         [
-            (Stream().add_update(lambda v: ...), "Update functions are not allowed"),
             (
-                Stream().add_update(lambda v, k, t, h: ..., metadata=True),
+                Stream().add(UpdateFunction(lambda v: ...)),
                 "Update functions are not allowed",
             ),
-            (Stream().add_filter(lambda v: ...), "Filter functions are not allowed"),
             (
-                Stream().add_filter(lambda v, k, t, h: ..., metadata=True),
+                Stream().add(UpdateWithMetadataFunction(lambda v, k, t, h: ...)),
+                "Update functions are not allowed",
+            ),
+            (
+                Stream().add(FilterFunction(lambda v: ...)),
+                "Filter functions are not allowed",
+            ),
+            (
+                Stream().add(FilterWithMetadataFunction(lambda v, k, t, h: ...)),
                 "Filter functions are not allowed",
             ),
             (
-                Stream().add_apply(lambda v: ..., expand=True),
+                Stream().add(ApplyFunction(lambda v: ..., expand=True)),
                 "Expand functions are not allowed",
            ),
             (
-                Stream().add_apply(lambda v, k, t, h: ..., expand=True, metadata=True),
+                Stream().add(
+                    ApplyWithMetadataFunction(lambda v, k, t, h: ..., expand=True)
+                ),
                 "Expand functions are not allowed",
             ),
             (
-                Stream().add_transform(lambda v, k, t, h: ..., expand=True),
+                Stream().add(TransformFunction(lambda v, k, t, h: ..., expand=True)),
                 "Transform functions are not allowed",
             ),
             (
-                Stream().add_transform(lambda v, k, t, h: ...),
+                Stream().add(TransformFunction(lambda v, k, t, h: ...)),
                 "Transform functions are not allowed",
             ),
         ],
     )
@@ -327,12 +347,14 @@ def test_compose_returning_not_allowed_operations_fails(self, stream, err):
             stream.compose_returning()

     def test_transform_record(self):
-        stream = Stream().add_transform(
-            lambda value, key, timestamp, headers: (
-                value + 1,
-                key + "1",
-                timestamp + 1,
-                [("key", b"value")],
+        stream = Stream().add(
+            TransformFunction(
+                lambda value, key, timestamp, headers: (
+                    value + 1,
+                    key + "1",
+                    timestamp + 1,
+                    [("key", b"value")],
+                )
             )
         )
         result = Sink()
@@ -340,12 +362,14 @@ def test_transform_record(self):
         assert result[0] == (1, "key1", 1, [("key", b"value")])

     def test_transform_record_expanded(self):
-        stream = Stream().add_transform(
-            lambda value, key, timestamp, headers: [
-                (value + 1, key + "1", timestamp + 1, [("key", b"value")]),
-                (value + 2, key + "2", timestamp + 2, [("key", b"value2")]),
-            ],
-            expand=True,
+        stream = Stream().add(
+            TransformFunction(
+                lambda value, key, timestamp, headers: [
+                    (value + 1, key + "1", timestamp + 1, [("key", b"value")]),
+                    (value + 2, key + "2", timestamp + 2, [("key", b"value2")]),
+                ],
+                expand=True,
+            )
         )
         result = Sink()
         stream.compose(sink=result.append_record)(0, "key", 0, [])
@@ -366,10 +390,10 @@ def wrapper(value):
             return wrapper

-        stream = Stream().add_apply(add_n(1))
-        stream.add_apply(add_n(10))
-        stream.add_apply(add_n(20))
-        stream = stream.add_apply(add_n(100))
+        stream = Stream().add(ApplyFunction(add_n(1)))
+        stream.add(ApplyFunction(add_n(10)))
+        stream.add(ApplyFunction(add_n(20)))
+        stream = stream.add(ApplyFunction(add_n(100)))
         sink = Sink()
         extras = ("key", 0, [])
         stream.compose(sink=sink.append_record)(0, *extras)
@@ -390,17 +414,17 @@ def test_branch_with_update_copies_value(self):
         expected = [([1], key, timestamp, headers), ([2], key, timestamp, headers)]

         stream = Stream()
-        stream.add_update(lambda value_: value_.append(1))
-        stream.add_update(lambda value_: value_.append(2))
+        stream.add(UpdateFunction(lambda value_: value_.append(1)))
+        stream.add(UpdateFunction(lambda value_: value_.append(2)))

         sink = Sink()
         stream.compose(sink=sink.append_record)(value, key, timestamp, headers)
         assert sink == expected

     def test_chained_branches(self):
-        stream = Stream().add_apply(lambda v: v + 1)
-        stream.add_apply(lambda v: v + 10).add_apply(lambda v: v + 20)
-        stream = stream.add_apply(lambda v: v + 100)
+        stream = Stream().add(ApplyFunction(lambda v: v + 1))
+        stream.add(ApplyFunction(lambda v: v + 10)).add(ApplyFunction(lambda v: v + 20))
+        stream = stream.add(ApplyFunction(lambda v: v + 100))
         sink = Sink()
         extras = ("key", 0, [])
         stream.compose(sink=sink.append_record)(0, *extras)
@@ -409,11 +433,11 @@ def test_chained_branches(self):
         assert sink == expected

     def test_longer_branches(self):
-        stream = Stream().add_apply(lambda v: v + 1)
-        stream = stream.add_apply(lambda v: v + 2)
-        stream_2 = stream.add_apply(lambda v: v + 10)
-        stream_2.add_apply(lambda v: v + 20)
-        stream = stream.add_apply(lambda v: v + 100)
+        stream = Stream().add(ApplyFunction(lambda v: v + 1))
+        stream = stream.add(ApplyFunction(lambda v: v + 2))
+        stream_2 = stream.add(ApplyFunction(lambda v: v + 10))
+        stream_2.add(ApplyFunction(lambda v: v + 20))
+        stream = stream.add(ApplyFunction(lambda v: v + 100))
         sink = Sink()
         extras = ("key", 0, [])
         stream.compose(sink=sink.append_record)(0, *extras)
@@ -438,18 +462,24 @@ def test_multiple_branches(self):
         :return:
         """
-        stream = Stream().add_apply(lambda v: v + 120).add_apply(lambda v: v // 2)  # 60
-        stream_2 = stream.add_apply(lambda v: v // 3)  # 20
-        stream_3 = stream_2.add_apply(lambda v: v + 10).add_apply(  # noqa: F841
-            lambda v: v + 3
+        stream = (
+            Stream()
+            .add(ApplyFunction(lambda v: v + 120))
+            .add(ApplyFunction(lambda v: v // 2))
+        )  # 60
+        stream_2 = stream.add(ApplyFunction(lambda v: v // 3))  # 20
+        stream_3 = stream_2.add(ApplyFunction(lambda v: v + 10)).add(  # noqa: F841
+            ApplyFunction(lambda v: v + 3)
         )  # 33
-        stream_4 = stream_2.add_apply(lambda v: v + 24)  # 44 # noqa: F841
-        stream_2 = stream_2.add_apply(lambda v: v + 2)  # 22
-        stream = stream.add_apply(lambda v: v + 40)  # 100
-        stream_5 = stream.add_apply(lambda v: v // 2).add_apply(  # noqa: F841
-            lambda v: v + 5
+        stream_4 = stream_2.add(ApplyFunction(lambda v: v + 24))  # 44 # noqa: F841
+        stream_2 = stream_2.add(ApplyFunction(lambda v: v + 2))  # 22
+        stream = stream.add(ApplyFunction(lambda v: v + 40))  # 100
+        stream_5 = stream.add(ApplyFunction(lambda v: v // 2)).add(  # noqa: F841
+            ApplyFunction(lambda v: v + 5)
         )  # 55
-        stream = stream.add_apply(lambda v: v // 100).add_apply(lambda v: v + 10)  # 11
+        stream = stream.add(ApplyFunction(lambda v: v // 100)).add(
+            ApplyFunction(lambda v: v + 10)
+        )  # 11
         sink = Sink()
         extras = ("key", 0, [])
         stream.compose(sink=sink.append_record)(0, *extras)
@@ -464,17 +494,21 @@ def test_multiple_branches(self):
         assert sink == expected

     def test_filter(self):
-        stream = Stream().add_apply(lambda v: v + 10)
-        stream2 = stream.add_apply(lambda v: v + 5).add_filter(lambda v: v < 0)
-        stream2 = stream2.add_apply(lambda v: v + 200)
+        stream = Stream().add(ApplyFunction(lambda v: v + 10))
+        stream2 = stream.add(ApplyFunction(lambda v: v + 5)).add(
+            FilterFunction(lambda v: v < 0)
+        )
+        stream2 = stream2.add(ApplyFunction(lambda v: v + 200))
         stream3 = (  # noqa: F841
-            stream.add_apply(lambda v: v + 7)
-            .add_filter(lambda v: v < 20)
-            .add_apply(lambda v: v + 4)
+            stream.add(ApplyFunction(lambda v: v + 7))
+            .add(FilterFunction(lambda v: v < 20))
+            .add(ApplyFunction(lambda v: v + 4))
+        )
+        stream = stream.add(ApplyFunction(lambda v: v + 30)).add(
+            FilterFunction(lambda v: v < 50)
         )
-        stream = stream.add_apply(lambda v: v + 30).add_filter(lambda v: v < 50)
-        stream4 = stream.add_apply(lambda v: v + 60)  # noqa: F841
-        stream.add_apply(lambda v: v + 800)
+        stream4 = stream.add(ApplyFunction(lambda v: v + 60))  # noqa: F841
+        stream.add(ApplyFunction(lambda v: v + 800))

         sink = Sink()
         extras = ("key", 0, [])
@@ -484,13 +518,15 @@ def test_filter(self):
         assert sink == expected

     def test_update(self):
-        stream = Stream().add_apply(lambda v: v + [10])
-        stream2 = stream.add_update(lambda v: v.append(5))  # noqa: F841
-        stream = stream.add_update(lambda v: v.append(30)).add_apply(lambda v: v + [6])
-        stream3 = stream.add_update(lambda v: v.append(100))  # noqa: F841
-        stream4 = stream.add_update(lambda v: v.append(456))  # noqa: F841
-        stream = stream.add_apply(lambda v: v + [700]).add_update(
-            lambda v: v.append(222)
+        stream = Stream().add(ApplyFunction(lambda v: v + [10]))
+        stream2 = stream.add(UpdateFunction(lambda v: v.append(5)))  # noqa: F841
+        stream = stream.add(UpdateFunction(lambda v: v.append(30))).add(
+            ApplyFunction(lambda v: v + [6])
+        )
+        stream3 = stream.add(UpdateFunction(lambda v: v.append(100)))  # noqa: F841
+        stream4 = stream.add(UpdateFunction(lambda v: v.append(456)))  # noqa: F841
+        stream = stream.add(ApplyFunction(lambda v: v + [700])).add(
+            UpdateFunction(lambda v: v.append(222))
         )

         sink = Sink()
@@ -508,15 +544,15 @@ def test_update(self):
     def test_expand(self):
         stream = Stream()
-        stream_2 = stream.add_apply(lambda v: [i for i in v[0]], expand=True).add_apply(  # noqa: F841
-            lambda v: v + 22
-        )
-        stream_3 = stream.add_apply(lambda v: [i for i in v[1]], expand=True).add_apply(  # noqa: F841
-            lambda v: v + 33
-        )
-        stream = stream.add_apply(lambda v: [i for i in v[2]], expand=True)
-        stream_4 = stream.add_apply(lambda v: v + 44)  # noqa: F841
-        stream = stream.add_apply(lambda v: v + 11)
+        stream_2 = stream.add(  # noqa: F841
+            ApplyFunction(lambda v: [i for i in v[0]], expand=True)
+        ).add(ApplyFunction(lambda v: v + 22))
+        stream_3 = stream.add(  # noqa: F841
+            ApplyFunction(lambda v: [i for i in v[1]], expand=True)
+        ).add(ApplyFunction(lambda v: v + 33))
+        stream = stream.add(ApplyFunction(lambda v: [i for i in v[2]], expand=True))
+        stream_4 = stream.add(ApplyFunction(lambda v: v + 44))  # noqa: F841
+        stream = stream.add(ApplyFunction(lambda v: v + 11))
         sink = Sink()
         extras = ("key", 0, [])
         stream.compose(sink=sink.append_record)([(1, 2), (3, 4), (5, 6)], *extras)
@@ -531,12 +567,18 @@ def wrapper(value, k, t, h):

             return wrapper

-        stream = Stream().add_apply(lambda v: v + 1)
-        stream_2 = stream.add_transform(transform(2))  # noqa: F841
-        stream = stream.add_transform(transform(3))
-        stream_3 = stream.add_apply(lambda v: v + 30).add_transform(transform(4))  # noqa: F841
-        stream_4 = stream.add_apply(lambda v: v + 40).add_transform(transform(5))  # noqa: F841
-        stream = stream.add_apply(lambda v: v + 100).add_transform(transform(6))
+        stream = Stream().add(ApplyFunction(lambda v: v + 1))
+        stream_2 = stream.add(TransformFunction(transform(2)))  # noqa: F841
+        stream = stream.add(TransformFunction(transform(3)))
+        stream_3 = stream.add(ApplyFunction(lambda v: v + 30)).add(  # noqa: F841
+            TransformFunction(transform(4))
+        )
+        stream_4 = stream.add(ApplyFunction(lambda v: v + 40)).add(  # noqa: F841
+            TransformFunction(transform(5))
+        )
+        stream = stream.add(ApplyFunction(lambda v: v + 100)).add(
+            TransformFunction(transform(6))
+        )
         sink = Sink()
         extras = ("key", 0, [])
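
Note (not part of the patch): a minimal usage sketch of the generic Stream.add(StreamFunction) API that this diff introduces in place of the removed add_apply/add_filter/add_update/add_transform helpers. It mirrors the updated tests above; the list-appending sink callback below stands in for the test suite's Sink helper and is assumed here purely for illustration.

    from quixstreams.core.stream import Stream
    from quixstreams.core.stream.functions import ApplyFunction, FilterFunction

    # Build a small pipeline with the new API: pass fully constructed
    # StreamFunction objects instead of bare callables plus metadata=/expand= flags.
    stream = (
        Stream()
        .add(ApplyFunction(lambda v: v + 1))
        .add(FilterFunction(lambda v: v > 1))
    )

    # compose() returns an executor that pushes each surviving record into the sink.
    results = []
    executor = stream.compose(
        sink=lambda value, key, timestamp, headers: results.append(
            (value, key, timestamp, headers)
        )
    )
    executor(1, "key", 0, [])
    assert results == [(2, "key", 0, [])]

Constructing the concrete function objects up front is also what lets the new @overload-annotated constructors (with Literal[False]/Literal[True] for expand) narrow the callback types statically, which is why the "quixstreams.core.*" ignore-errors override could be dropped from pyproject.toml.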