Skip to content

Commit

Permalink
Add stateful funcs to examples (#227)
Browse files Browse the repository at this point in the history
  • Loading branch information
daniil-quix committed Nov 7, 2023
1 parent d7f4d02 commit ca52315
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,47 @@
An application to process imaginary purchase transactions in real-time using Kafka
In this application, we will simulate notifications for "Gold" accounts about
purchase events larger than $1000
purchase events larger than $1000 and count them.
"""


from os import environ

from dotenv import load_dotenv

from quixstreams import Application, MessageContext
from quixstreams.models.serializers import JSONSerializer, JSONDeserializer
from quixstreams import Application, MessageContext, State

load_dotenv("./env_vars.env")


def count_transactions(value: dict, ctx: MessageContext, state: State):
"""
Track the number of transactions using persistent state
:param value: message value
:param ctx: message context with key, timestamp and other Kafka message metadata
:param state: instance of State store
"""
total = state.get("total_transactions", 0)
total += 1
state.set("total_transactions", total)
value["total_transactions"] = total


def uppercase_source(value: dict, ctx: MessageContext):
"""
Upper-case field "transaction_source" for each processed message
:param value: message value, a dictionary with all deserialized message data
:param ctx: message context, it contains message metadata like key, topic, timestamp
etc.
:return: this function must either return None or a new dictionary
"""
print(f'Processing message with key "{ctx.key}"')
value["transaction_source"] = value["transaction_source"].upper()
return value


# Define your application and settings
app = Application(
broker_address=environ["BROKER_ADDRESS"],
Expand All @@ -26,10 +53,10 @@
)

# Define an input topic with JSON deserializer
input_topic = app.topic("json__purchase_events", value_deserializer=JSONDeserializer())
input_topic = app.topic("json__purchase_events", value_deserializer="json")

# Define an output topic with JSON dserializer
output_topic = app.topic("json__user_notifications", value_serializer=JSONSerializer())
# Define an output topic with JSON serializer
output_topic = app.topic("json__user_notifications", value_serializer="json")

# Create a StreamingDataFrame and start building your processing pipeline
sdf = app.dataframe(input_topic)
Expand All @@ -41,23 +68,10 @@
]

# Drop all fields except the ones we need
sdf = sdf[["account_id", "transaction_amount", "transaction_source"]] # column subset


def uppercase_source(value: dict, ctx: MessageContext):
"""
Upper-case field "transaction_source" for each processed message
:param value: message value, a dictionary with all deserialized message data
:param ctx: message context, it contains message metadata like key, topic, timestamp
etc.
:return: this function must either return None or a new dictionary
"""
print(f'Processing message with key "{ctx.key}"')
value["transaction_source"] = value["transaction_source"].upper()
return value
sdf = sdf[["account_id", "transaction_amount", "transaction_source"]]

# Update the total number of transactions in state
sdf = sdf.apply(count_transactions, stateful=True)

# Transform field "transaction_source" to upper-case using a custom function
sdf = sdf.apply(uppercase_source)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,35 +8,12 @@

from dotenv import load_dotenv

from quixstreams import Application, MessageContext
from quixstreams.models.serializers import (
QuixTimeseriesSerializer,
QuixDeserializer,
)
from quixstreams import Application, MessageContext, State

# Reminder: the platform will have these values available by default so loading the
# environment would be unnecessary there.
load_dotenv("./bank_example/quix_platform_version/quix_vars.env")

# Define your application and settings
# Quix application is automatically configured to work with Quix platform
app = Application.Quix(
"qts__purchase_notifier",
auto_offset_reset="earliest",
auto_create_topics=True, # Quix app has an option to auto create topics
)

# Define an input topic with JSON deserializer
input_topic = app.topic("qts__purchase_events", value_deserializer=QuixDeserializer())

# Define an output topic with JSON dserializer
output_topic = app.topic(
"qts__user_notifications", value_serializer=QuixTimeseriesSerializer()
)

# Create a StreamingDataFrame and start building your processing pipeline
sdf = app.dataframe(input_topic)


def uppercase_source(value: dict, ctx: MessageContext):
"""
Expand All @@ -53,6 +30,38 @@ def uppercase_source(value: dict, ctx: MessageContext):
return value


def count_transactions(value: dict, ctx: MessageContext, state: State):
"""
Track the number of transactions using persistent state
:param value: message value
:param ctx: message context with key, timestamp and other Kafka message metadata
:param state: instance of State store
"""
total = state.get("total_transactions", 0)
total += 1
state.set("total_transactions", total)
value["total_transactions"] = total


# Define your application and settings
# Quix application is automatically configured to work with Quix platform
app = Application.Quix(
"qts__purchase_notifier",
auto_offset_reset="earliest",
auto_create_topics=True, # Quix app has an option to auto create topics
)

# Define an input topic with Quix deserializer
input_topic = app.topic("qts__purchase_events", value_deserializer="quix")

# Define an output topic with Quix Timeseries serializer
output_topic = app.topic("qts__user_notifications", value_serializer="quix_timeseries")

# Create a StreamingDataFrame and start building your processing pipeline
sdf = app.dataframe(input_topic)


# Filter only messages with "account_class" == "Gold" and "transaction_amount" >= 1000
sdf = sdf[
(sdf["account_class"] == "Gold")
Expand All @@ -62,11 +71,14 @@ def uppercase_source(value: dict, ctx: MessageContext):
# Drop all fields except the ones we need
sdf = sdf[["account_id", "transaction_amount", "transaction_source"]]

# Update the total number of transactions in state
sdf = sdf.apply(count_transactions, stateful=True)

# Transform field "transaction_source" to upper-case using a custom function
sdf = sdf.apply(uppercase_source)

# Add a new field with a notification text
sdf["customer_notification"] = "A high cost purchase was attempted" # add new column
sdf["customer_notification"] = "A high cost purchase was attempted"

# Print the transformed message to the console
sdf = sdf.apply(lambda val, ctx: print(f"Sending update: {val}"))
Expand Down

0 comments on commit ca52315

Please sign in to comment.