Skip to content

Commit

Permalink
Add set_headers API (#387)
Browse files Browse the repository at this point in the history
  • Loading branch information
daniil-quix authored Jun 14, 2024
1 parent 0cb9393 commit e8747c0
Show file tree
Hide file tree
Showing 13 changed files with 684 additions and 466 deletions.
20 changes: 10 additions & 10 deletions docs/api-reference/application.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
class Application()
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/1c8ae6251382eb13c734c048ab13f1ccc4c7352b/quixstreams/app.py#L53)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/0cb9393ae30a0cf9d4eec5bdb102d4081a963b9c/quixstreams/app.py#L53)

The main Application class.

Expand Down Expand Up @@ -82,7 +82,7 @@ def __init__(broker_address: Optional[Union[str, ConnectionConfig]] = None,
topic_create_timeout: float = 60)
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/1c8ae6251382eb13c734c048ab13f1ccc4c7352b/quixstreams/app.py#L91)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/0cb9393ae30a0cf9d4eec5bdb102d4081a963b9c/quixstreams/app.py#L91)


<br>
Expand Down Expand Up @@ -179,7 +179,7 @@ def Quix(cls,
topic_create_timeout: float = 60) -> Self
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/1c8ae6251382eb13c734c048ab13f1ccc4c7352b/quixstreams/app.py#L304)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/0cb9393ae30a0cf9d4eec5bdb102d4081a963b9c/quixstreams/app.py#L304)

>***NOTE:*** DEPRECATED: use Application with `quix_sdk_token` argument instead.

Expand Down Expand Up @@ -289,7 +289,7 @@ def topic(name: str,
timestamp_extractor: Optional[TimestampExtractor] = None) -> Topic
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/1c8ae6251382eb13c734c048ab13f1ccc4c7352b/quixstreams/app.py#L442)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/0cb9393ae30a0cf9d4eec5bdb102d4081a963b9c/quixstreams/app.py#L442)

Create a topic definition.

Expand Down Expand Up @@ -370,7 +370,7 @@ topic = app.topic("input-topic", timestamp_extractor=custom_ts_extractor)
def dataframe(topic: Topic) -> StreamingDataFrame
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/1c8ae6251382eb13c734c048ab13f1ccc4c7352b/quixstreams/app.py#L522)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/0cb9393ae30a0cf9d4eec5bdb102d4081a963b9c/quixstreams/app.py#L522)

A simple helper method that generates a `StreamingDataFrame`, which is used

Expand Down Expand Up @@ -420,7 +420,7 @@ to be used as an input topic.
def stop(fail: bool = False)
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/1c8ae6251382eb13c734c048ab13f1ccc4c7352b/quixstreams/app.py#L561)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/0cb9393ae30a0cf9d4eec5bdb102d4081a963b9c/quixstreams/app.py#L561)

Stop the internal poll loop and the message processing.

Expand All @@ -447,7 +447,7 @@ to unhandled exception, and it shouldn't commit the current checkpoint.
def get_producer() -> Producer
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/1c8ae6251382eb13c734c048ab13f1ccc4c7352b/quixstreams/app.py#L584)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/0cb9393ae30a0cf9d4eec5bdb102d4081a963b9c/quixstreams/app.py#L584)

Create and return a pre-configured Producer instance.
The Producer is initialized with params passed to Application.
Expand Down Expand Up @@ -482,7 +482,7 @@ with app.get_producer() as producer:
def get_consumer(auto_commit_enable: bool = True) -> Consumer
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/1c8ae6251382eb13c734c048ab13f1ccc4c7352b/quixstreams/app.py#L614)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/0cb9393ae30a0cf9d4eec5bdb102d4081a963b9c/quixstreams/app.py#L614)

Create and return a pre-configured Consumer instance.
The Consumer is initialized with params passed to Application.
Expand Down Expand Up @@ -527,7 +527,7 @@ with app.get_consumer() as consumer:
def clear_state()
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/1c8ae6251382eb13c734c048ab13f1ccc4c7352b/quixstreams/app.py#L657)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/0cb9393ae30a0cf9d4eec5bdb102d4081a963b9c/quixstreams/app.py#L657)

Clear the state of the application.

Expand All @@ -541,7 +541,7 @@ Clear the state of the application.
def run(dataframe: StreamingDataFrame)
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/1c8ae6251382eb13c734c048ab13f1ccc4c7352b/quixstreams/app.py#L663)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/0cb9393ae30a0cf9d4eec5bdb102d4081a963b9c/quixstreams/app.py#L663)

Start processing data from Kafka using provided `StreamingDataFrame`

Expand Down
4 changes: 2 additions & 2 deletions docs/api-reference/context.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
def set_message_context(context: Optional[MessageContext])
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/1c8ae6251382eb13c734c048ab13f1ccc4c7352b/quixstreams/context.py#L20)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/0cb9393ae30a0cf9d4eec5bdb102d4081a963b9c/quixstreams/context.py#L20)

Set a MessageContext for the current message in the given `contextvars.Context`

Expand Down Expand Up @@ -55,7 +55,7 @@ sdf = sdf.update(lambda value: alter_context(value))
def message_context() -> MessageContext
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/1c8ae6251382eb13c734c048ab13f1ccc4c7352b/quixstreams/context.py#L51)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/0cb9393ae30a0cf9d4eec5bdb102d4081a963b9c/quixstreams/context.py#L51)

Get a MessageContext for the current message, which houses most of the message

Expand Down
106 changes: 80 additions & 26 deletions docs/api-reference/dataframe.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
class StreamingDataFrame(BaseStreaming)
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/1c8ae6251382eb13c734c048ab13f1ccc4c7352b/quixstreams/dataframe/dataframe.py#L60)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/0cb9393ae30a0cf9d4eec5bdb102d4081a963b9c/quixstreams/dataframe/dataframe.py#L62)

`StreamingDataFrame` is the main object you will use for ETL work.

Expand Down Expand Up @@ -81,7 +81,7 @@ def apply(func: Union[
metadata: bool = False) -> Self
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/1c8ae6251382eb13c734c048ab13f1ccc4c7352b/quixstreams/dataframe/dataframe.py#L175)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/0cb9393ae30a0cf9d4eec5bdb102d4081a963b9c/quixstreams/dataframe/dataframe.py#L177)

Apply a function to transform the value and return a new value.

Expand Down Expand Up @@ -139,7 +139,7 @@ def update(func: Union[
metadata: bool = False) -> Self
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/1c8ae6251382eb13c734c048ab13f1ccc4c7352b/quixstreams/dataframe/dataframe.py#L264)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/0cb9393ae30a0cf9d4eec5bdb102d4081a963b9c/quixstreams/dataframe/dataframe.py#L266)

Apply a function to mutate value in-place or to perform a side effect

Expand Down Expand Up @@ -197,7 +197,7 @@ def filter(func: Union[
metadata: bool = False) -> Self
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/1c8ae6251382eb13c734c048ab13f1ccc4c7352b/quixstreams/dataframe/dataframe.py#L352)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/0cb9393ae30a0cf9d4eec5bdb102d4081a963b9c/quixstreams/dataframe/dataframe.py#L354)

Filter value using provided function.

Expand Down Expand Up @@ -249,7 +249,7 @@ def group_by(key: Union[str, Callable[[Any], Any]],
key_serializer: Optional[SerializerType] = "json") -> Self
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/1c8ae6251382eb13c734c048ab13f1ccc4c7352b/quixstreams/dataframe/dataframe.py#L438)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/0cb9393ae30a0cf9d4eec5bdb102d4081a963b9c/quixstreams/dataframe/dataframe.py#L440)

"Groups" messages by re-keying them via the provided group_by operation

Expand Down Expand Up @@ -314,7 +314,7 @@ a clone with this operation added (assign to keep its effect).
def contains(key: str) -> StreamingSeries
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/1c8ae6251382eb13c734c048ab13f1ccc4c7352b/quixstreams/dataframe/dataframe.py#L516)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/0cb9393ae30a0cf9d4eec5bdb102d4081a963b9c/quixstreams/dataframe/dataframe.py#L518)

Check if the key is present in the Row value.

Expand Down Expand Up @@ -353,7 +353,7 @@ or False otherwise.
def to_topic(topic: Topic, key: Optional[Callable[[Any], Any]] = None) -> Self
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/1c8ae6251382eb13c734c048ab13f1ccc4c7352b/quixstreams/dataframe/dataframe.py#L541)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/0cb9393ae30a0cf9d4eec5bdb102d4081a963b9c/quixstreams/dataframe/dataframe.py#L543)

Produce current value to a topic. You can optionally specify a new key.

Expand Down Expand Up @@ -396,7 +396,7 @@ By default, the current message key will be used.
def set_timestamp(func: Callable[[Any, Any, int, Any], int]) -> Self
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/1c8ae6251382eb13c734c048ab13f1ccc4c7352b/quixstreams/dataframe/dataframe.py#L582)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/0cb9393ae30a0cf9d4eec5bdb102d4081a963b9c/quixstreams/dataframe/dataframe.py#L584)

Set a new timestamp based on the current message value and its metadata.

Expand Down Expand Up @@ -425,10 +425,64 @@ sdf = sdf.set_timestamp(lambda value, key, timestamp, headers: value['new_timest
<br>
***Arguments:***

- `func`: callable accepting the current value and the current timestamp.
- `func`: callable accepting the current value, key, timestamp, and headers.
It's expected to return a new timestamp as integer in milliseconds.


<br>
***Returns:***

a new StreamingDataFrame instance

<a id="quixstreams.dataframe.dataframe.StreamingDataFrame.set_headers"></a>

<br><br>

#### StreamingDataFrame.set\_headers

```python
def set_headers(
func: Callable[
[Any, Any, int, List[Tuple[str, HeaderValue]]],
Collection[Tuple[str, HeaderValue]],
]
) -> Self
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/0cb9393ae30a0cf9d4eec5bdb102d4081a963b9c/quixstreams/dataframe/dataframe.py#L625)

Set new message headers based on the current message value and metadata.

The new headers will be used when producing messages to the output topics.

The provided callback must accept value, key, timestamp, and headers,
and return a new collection of (header, value) tuples.


<br>
***Example Snippet:***

```python
from quixstreams import Application


app = Application()
input_topic = app.topic("data")

sdf = app.dataframe(input_topic)
# Updating the record's headers based on the value and metadata
sdf = sdf.set_headers(lambda value, key, timestamp, headers: [('id', value['id'])])
```


<br>
***Arguments:***

- `func`: callable accepting the current value, key, timestamp, and headers.
It's expected to return a new set of headers
as a collection of (header, value) tuples.


<br>
***Returns:***

Expand All @@ -446,7 +500,7 @@ def compose(
) -> Dict[str, VoidExecutor]
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/1c8ae6251382eb13c734c048ab13f1ccc4c7352b/quixstreams/dataframe/dataframe.py#L623)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/0cb9393ae30a0cf9d4eec5bdb102d4081a963b9c/quixstreams/dataframe/dataframe.py#L676)

Compose all functions of this StreamingDataFrame into one big closure.

Expand Down Expand Up @@ -500,7 +554,7 @@ def test(value: Any,
topic: Optional[Topic] = None) -> List[Any]
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/1c8ae6251382eb13c734c048ab13f1ccc4c7352b/quixstreams/dataframe/dataframe.py#L660)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/0cb9393ae30a0cf9d4eec5bdb102d4081a963b9c/quixstreams/dataframe/dataframe.py#L713)

A shorthand to test `StreamingDataFrame` with provided value

Expand Down Expand Up @@ -537,7 +591,7 @@ def tumbling_window(duration_ms: Union[int, timedelta],
name: Optional[str] = None) -> TumblingWindowDefinition
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/1c8ae6251382eb13c734c048ab13f1ccc4c7352b/quixstreams/dataframe/dataframe.py#L697)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/0cb9393ae30a0cf9d4eec5bdb102d4081a963b9c/quixstreams/dataframe/dataframe.py#L750)

Create a tumbling window transformation on this StreamingDataFrame.

Expand Down Expand Up @@ -623,7 +677,7 @@ def hopping_window(duration_ms: Union[int, timedelta],
name: Optional[str] = None) -> HoppingWindowDefinition
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/1c8ae6251382eb13c734c048ab13f1ccc4c7352b/quixstreams/dataframe/dataframe.py#L773)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/0cb9393ae30a0cf9d4eec5bdb102d4081a963b9c/quixstreams/dataframe/dataframe.py#L826)

Create a hopping window transformation on this StreamingDataFrame.

Expand Down Expand Up @@ -717,7 +771,7 @@ sdf = (
class StreamingSeries(BaseStreaming)
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/1c8ae6251382eb13c734c048ab13f1ccc4c7352b/quixstreams/dataframe/series.py#L47)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/0cb9393ae30a0cf9d4eec5bdb102d4081a963b9c/quixstreams/dataframe/series.py#L47)

`StreamingSeries` are typically generated by `StreamingDataframes` when getting
elements from, or performing certain operations on, a `StreamingDataframe`,
Expand Down Expand Up @@ -783,7 +837,7 @@ sdf = sdf[["column_a"] & (sdf["new_sum_field"] >= 10)]
def from_apply_callback(cls, func: ApplyWithMetadataCallback) -> Self
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/1c8ae6251382eb13c734c048ab13f1ccc4c7352b/quixstreams/dataframe/series.py#L107)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/0cb9393ae30a0cf9d4eec5bdb102d4081a963b9c/quixstreams/dataframe/series.py#L107)

Create a StreamingSeries from a function.

Expand Down Expand Up @@ -811,7 +865,7 @@ instance of `StreamingSeries`
def apply(func: ApplyCallback) -> Self
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/1c8ae6251382eb13c734c048ab13f1ccc4c7352b/quixstreams/dataframe/series.py#L121)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/0cb9393ae30a0cf9d4eec5bdb102d4081a963b9c/quixstreams/dataframe/series.py#L121)

Add a callable to the execution list for this series.

Expand Down Expand Up @@ -863,7 +917,7 @@ a new `StreamingSeries` with the new callable added
def compose_returning() -> ReturningExecutor
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/1c8ae6251382eb13c734c048ab13f1ccc4c7352b/quixstreams/dataframe/series.py#L155)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/0cb9393ae30a0cf9d4eec5bdb102d4081a963b9c/quixstreams/dataframe/series.py#L155)

Compose a list of functions from this StreamingSeries and its parents into one

Expand Down Expand Up @@ -894,7 +948,7 @@ def compose(
None]] = None) -> VoidExecutor
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/1c8ae6251382eb13c734c048ab13f1ccc4c7352b/quixstreams/dataframe/series.py#L170)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/0cb9393ae30a0cf9d4eec5bdb102d4081a963b9c/quixstreams/dataframe/series.py#L170)

Compose all functions of this StreamingSeries into one big closure.

Expand Down Expand Up @@ -952,7 +1006,7 @@ def test(value: Any,
ctx: Optional[MessageContext] = None) -> Any
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/1c8ae6251382eb13c734c048ab13f1ccc4c7352b/quixstreams/dataframe/series.py#L214)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/0cb9393ae30a0cf9d4eec5bdb102d4081a963b9c/quixstreams/dataframe/series.py#L214)

A shorthand to test `StreamingSeries` with provided value

Expand Down Expand Up @@ -984,7 +1038,7 @@ result of `StreamingSeries`
def isin(other: Container) -> Self
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/1c8ae6251382eb13c734c048ab13f1ccc4c7352b/quixstreams/dataframe/series.py#L269)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/0cb9393ae30a0cf9d4eec5bdb102d4081a963b9c/quixstreams/dataframe/series.py#L269)

Check if series value is in "other".

Expand Down Expand Up @@ -1029,7 +1083,7 @@ new StreamingSeries
def contains(other: Union[Self, object]) -> Self
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/1c8ae6251382eb13c734c048ab13f1ccc4c7352b/quixstreams/dataframe/series.py#L296)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/0cb9393ae30a0cf9d4eec5bdb102d4081a963b9c/quixstreams/dataframe/series.py#L296)

Check if series value contains "other"

Expand Down Expand Up @@ -1074,7 +1128,7 @@ new StreamingSeries
def is_(other: Union[Self, object]) -> Self
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/1c8ae6251382eb13c734c048ab13f1ccc4c7352b/quixstreams/dataframe/series.py#L321)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/0cb9393ae30a0cf9d4eec5bdb102d4081a963b9c/quixstreams/dataframe/series.py#L321)

Check if series value refers to the same object as `other`

Expand Down Expand Up @@ -1116,7 +1170,7 @@ new StreamingSeries
def isnot(other: Union[Self, object]) -> Self
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/1c8ae6251382eb13c734c048ab13f1ccc4c7352b/quixstreams/dataframe/series.py#L344)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/0cb9393ae30a0cf9d4eec5bdb102d4081a963b9c/quixstreams/dataframe/series.py#L344)

Check if series value does not refer to the same object as `other`

Expand Down Expand Up @@ -1159,7 +1213,7 @@ new StreamingSeries
def isnull() -> Self
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/1c8ae6251382eb13c734c048ab13f1ccc4c7352b/quixstreams/dataframe/series.py#L368)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/0cb9393ae30a0cf9d4eec5bdb102d4081a963b9c/quixstreams/dataframe/series.py#L368)

Check if series value is None.

Expand Down Expand Up @@ -1196,7 +1250,7 @@ new StreamingSeries
def notnull() -> Self
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/1c8ae6251382eb13c734c048ab13f1ccc4c7352b/quixstreams/dataframe/series.py#L391)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/0cb9393ae30a0cf9d4eec5bdb102d4081a963b9c/quixstreams/dataframe/series.py#L391)

Check if series value is not None.

Expand Down Expand Up @@ -1233,7 +1287,7 @@ new StreamingSeries
def abs() -> Self
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/1c8ae6251382eb13c734c048ab13f1ccc4c7352b/quixstreams/dataframe/series.py#L414)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/0cb9393ae30a0cf9d4eec5bdb102d4081a963b9c/quixstreams/dataframe/series.py#L414)

Get absolute value of the series value.

Expand Down
Loading

0 comments on commit e8747c0

Please sign in to comment.