
Allow StreamingDataFrame.apply() to be assigned to keys and used as a filter (#238)
daniil-quix authored Nov 15, 2023
1 parent 6017011 commit 266776d
Showing 37 changed files with 2,210 additions and 1,357 deletions.
37 changes: 19 additions & 18 deletions README.md
@@ -63,26 +63,20 @@ See [requirements.txt](./src/StreamingDataFrames/requirements.txt) for the full
Here's an example of how to <b>process</b> data from a Kafka Topic with Quix Streams:

```python
-from quixstreams import Application, MessageContext, State
+from quixstreams import Application, State

# Define an application
app = Application(
    broker_address="localhost:9092",  # Kafka broker address
    consumer_group="consumer-group-name",  # Kafka consumer group
)

# Define the input and output topics. By default, "json" serialization will be used
input_topic = app.topic("my_input_topic")
output_topic = app.topic("my_output_topic")


-def add_one(data: dict, ctx: MessageContext):
-    for field, value in data.items():
-        if isinstance(value, int):
-            data[field] += 1
-
-
-def count(data: dict, ctx: MessageContext, state: State):
+def count(data: dict, state: State):
    # Get a value from state for the current Kafka message key
    total = state.get('total', default=0)
    total += 1
@@ -91,27 +85,34 @@ def count(data: dict, ctx: MessageContext, state: State):
    # Update your message data with a value from the state
    data['total'] = total


# Create a StreamingDataFrame instance
# StreamingDataFrame is a primary interface to define the message processing pipeline
sdf = app.dataframe(topic=input_topic)

# Print the incoming messages
-sdf = sdf.apply(lambda value, ctx: print('Received a message:', value))
+sdf = sdf.update(lambda value: print('Received a message:', value))

# Select fields from incoming messages
-sdf = sdf[["field_0", "field_2", "field_8"]]
+sdf = sdf[["field_1", "field_2", "field_3"]]

# Filter only messages with "field_1" > 10 and "field_2" != "test"
-sdf = sdf[(sdf["field_0"] > 10) & (sdf["field_2"] != "test")]
+sdf = sdf[(sdf["field_1"] > 10) & (sdf["field_2"] != "test")]

# Filter messages using custom functions
sdf = sdf[sdf.apply(lambda value: 0 < (value['field_1'] + value['field_3']) < 1000)]

# Generate a new value based on the current one
sdf = sdf.apply(lambda value: {**value, 'new_field': 'new_value'})

-# Apply custom function to transform the message
-sdf = sdf.apply(add_one)
+# Update a value based on the entire message content
+sdf['field_4'] = sdf.apply(lambda value: value['field_1'] + value['field_3'])

-# Apply a stateful function to persist data to the state store
-sdf = sdf.apply(count, stateful=True)
+# Use a stateful function to persist data to the state store and update the value in place
+sdf = sdf.update(count, stateful=True)

# Print the result before producing it
-sdf = sdf.apply(lambda value, ctx: print('Producing a message:', value))
+sdf = sdf.update(lambda value: print('Producing a message:', value))

# Produce the result to the output topic
sdf = sdf.to_topic(output_topic)
```
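Since this commit changes the callback semantics (an `apply()` result can now be assigned to a key or used as a filter, while `update()` mutates in place), a plain-Python sketch of the three styles may help. The helper names here are illustrative stand-ins, not Quix Streams internals:

```python
# Rough sketch of the three StreamingDataFrame callback styles shown above.
# These helpers are illustrative stand-ins, not the library's internals.

def apply_fn(value: dict) -> dict:
    # apply(): returns a new value that replaces the old one
    return {**value, 'new_field': 'new_value'}

def update_fn(value: dict) -> None:
    # update(): mutates the value in place; the return value is ignored
    value['field_4'] = value['field_1'] + value['field_3']

def filter_fn(value: dict) -> bool:
    # apply() used as a filter: a truthy result keeps the message
    return 0 < (value['field_1'] + value['field_3']) < 1000

message = {'field_1': 20, 'field_2': 'data', 'field_3': 30}
if filter_fn(message):        # 20 + 30 = 50 is within (0, 1000), so it passes
    update_fn(message)        # adds field_4 = 50 in place
    message = apply_fn(message)  # replaces the value with an extended copy
```

This mirrors the pipeline order in the README example: filter first, then in-place update, then value replacement.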
28 changes: 15 additions & 13 deletions src/StreamingDataFrames/docs/serialization.md
@@ -21,30 +21,31 @@ By default, message values are serialized with `JSON`, message keys are seriali
- `quix_events` & `quix_timeseries` - for serializers only.

## Using SerDes
-To set a serializer, you may either pass a string shorthand for it, or an instance of `streamingdataframes.models.serializers.Serializer` and `streamingdataframes.models.serializers.Deserializer` directly
+To set a serializer, you may either pass a string shorthand for it, or an instance of `quixstreams.models.serializers.Serializer` and `quixstreams.models.serializers.Deserializer` directly
to the `Application.topic()`.

Example with format shorthands:
```python
-from streamingdataframes.models.serializers import JSONDeserializer
-app = Application(...)
+from quixstreams import Application
+app = Application(broker_address='localhost:9092', consumer_group='consumer')
# Deserializing message values from JSON to objects and message keys as strings
-input_topic = app.topic(value_deserializer='json', key_deserializer='string')
+input_topic = app.topic('input', value_deserializer='json', key_deserializer='string')

# Serializing message values to JSON and message keys to bytes
-output_topic = app.topic(value_serializer='json', key_deserializer='bytes')
+output_topic = app.topic('output', value_serializer='json', key_serializer='bytes')
```
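The `'json'` shorthand boils down to a plain JSON round trip between Python objects and the bytes Kafka carries. A minimal conceptual sketch (these functions are illustrative, not the library's actual serializer classes):

```python
import json

# Minimal sketch of what a 'json' value serde does conceptually:
# objects are encoded to UTF-8 JSON bytes on produce and decoded on consume.
def serialize_value(value: dict) -> bytes:
    return json.dumps(value).encode("utf-8")

def deserialize_value(payload: bytes) -> dict:
    return json.loads(payload.decode("utf-8"))

payload = serialize_value({"field_1": 42})
assert deserialize_value(payload) == {"field_1": 42}
```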

Passing `Serializer` and `Deserializer` instances directly:

```python
-from streamingdataframes.models.serializers import JSONDeserializer, JSONSerializer
-app = Application(...)
-input_topic = app.topic(value_deserializer=JSONDeserializer())
-output_topic = app.topic(value_deserializer=JSONSerializer())
+from quixstreams import Application
+from quixstreams.models.serializers import JSONDeserializer, JSONSerializer
+app = Application(broker_address='localhost:9092', consumer_group='consumer')
+input_topic = app.topic('input', value_deserializer=JSONDeserializer())
+output_topic = app.topic('output', value_serializer=JSONSerializer())
```

-You can find all available serializers in the `streamingdataframes.models.serializers` module.
+You can find all available serializers in the `quixstreams.models.serializers` module.

We also plan on including other popular ones like Avro and Protobuf in the near future.

@@ -56,8 +57,9 @@ The Deserializer object will wrap the received value to the dictionary with `col
Example:

```python
-from streamingdataframes.models.serializers import IntegerDeserializer
-app = Application(...)
-input_topic = app.topic(value_deserializer=IntegerDeserializer(column_name='number'))
+from quixstreams import Application
+from quixstreams.models.serializers import IntegerDeserializer
+app = Application(broker_address='localhost:9092', consumer_group='consumer')
+input_topic = app.topic('input', value_deserializer=IntegerDeserializer(column_name='number'))
# Will deserialize message with value "123" to "{'number': 123}" ...
```
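The wrapping behavior described above — a primitive payload becoming a one-field dictionary under the configured column name — can be sketched in plain Python. `make_primitive_deserializer` is a hypothetical helper for illustration, not part of the library:

```python
# Sketch of how a primitive deserializer wraps a raw value into a dict
# under the configured column name (hypothetical helper, not library code).
def make_primitive_deserializer(column_name: str, cast):
    def deserialize(payload: bytes) -> dict:
        # int() accepts ASCII bytes directly, e.g. int(b"123") == 123
        return {column_name: cast(payload)}
    return deserialize

deserialize = make_primitive_deserializer('number', int)
# b"123" becomes {'number': 123}, matching the comment in the example above
assert deserialize(b"123") == {'number': 123}
```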
35 changes: 23 additions & 12 deletions src/StreamingDataFrames/docs/stateful-processing.md
@@ -45,28 +45,31 @@ When another consumer reads the message with `KEY_B`, it will not be able to rea

## Using State

-The state is available in functions passed to `StreamingDataFrame.apply()` with parameter `stateful=True`:
+The state is available in functions passed to `StreamingDataFrame.apply()`, `StreamingDataFrame.update()`, and `StreamingDataFrame.filter()` with the parameter `stateful=True`:

```python
-from quixstreams import Application, MessageContext, State
-app = Application()
+from quixstreams import Application, State
+app = Application(
+    broker_address='localhost:9092',
+    consumer_group='consumer',
+)
topic = app.topic('topic')

sdf = app.dataframe(topic)

-def count_messages(value: dict, ctx: MessageContext, state: State):
+def count_messages(value: dict, state: State):
     total = state.get('total', default=0)
     total += 1
     state.set('total', total)
-    value['total'] = total
+    return {**value, 'total': total}

-# Apply a custom function and inform StreamingDataFrame to provide a State instance to it
-# by passing "stateful=True"
-sdf.apply(count_messages, stateful=True)
+# Apply a custom function and inform StreamingDataFrame to provide a State
+# instance to it by passing "stateful=True"
+sdf = sdf.apply(count_messages, stateful=True)

```

-Currently, only functions passed to `StreamingDataFrame.apply()` may use State.
+Currently, only functions passed to `StreamingDataFrame.apply()`, `StreamingDataFrame.update()`, and `StreamingDataFrame.filter()` may use State.
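The per-key behavior described earlier — state written while processing `KEY_A` is invisible when processing `KEY_B` — can be modeled with a dict-backed store. This is a conceptual sketch, not the library's RocksDB-backed implementation:

```python
# Conceptual model of a per-key state store: each message key gets its own
# namespace, so totals for one key never leak into another.
class InMemoryStateStore:
    def __init__(self):
        self._data: dict = {}

    def for_key(self, key: bytes) -> dict:
        # Return (and lazily create) the state namespace for this key
        return self._data.setdefault(key, {})

store = InMemoryStateStore()

def count_messages(value: dict, state: dict) -> dict:
    total = state.get('total', 0) + 1
    state['total'] = total
    return {**value, 'total': total}

# Messages with different keys accumulate independent totals
out_a = count_messages({'v': 1}, store.for_key(b'KEY_A'))
out_a = count_messages({'v': 2}, store.for_key(b'KEY_A'))
out_b = count_messages({'v': 3}, store.for_key(b'KEY_B'))
```

In the real library the store is additionally scoped per topic partition, which is what allows consumers to rebalance partitions and still find the state for the keys they take over.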

<br>

@@ -75,11 +78,19 @@ Currently, only functions passed to `StreamingDataFrame.apply()` may use State.
By default, an `Application` keeps the state in `state` directory relative to the current working directory.
To change it, pass `state_dir="your-path"` to `Application` or `Application.Quix` calls:
```python
-Application(state_dir="folder/path/here")
+from quixstreams import Application
+app = Application(
+    broker_address='localhost:9092',
+    consumer_group='consumer',
+    state_dir="folder/path/here",
+)

# or

-Application.Quix(state_dir="folder/path/here")
+app = Application.Quix(
+    consumer_group='consumer',
+    state_dir="folder/path/here",
+)
```

## State Guarantees
@@ -105,4 +116,4 @@ We plan to add a proper recovery process in the future.

#### Shared state directory
In the current version, it's assumed that the state directory is shared between consumers (e.g. using a Kubernetes PVC).
-If consumers live on different nodes and don't have access to the same state directory, they will not be able to pickup state on rebalancing.
+If consumers live on different nodes and don't have access to the same state directory, they will not be able to pick up state on rebalancing.
