Commit 51dfe04: Cleanup Examples and Tutorials (#675)

tim-quix and daniil-quix authored Dec 19, 2024
Co-authored-by: Daniil Gusev <[email protected]>
1 parent d615f82 commit 51dfe04

Showing 36 changed files with 931 additions and 782 deletions.
9 changes: 8 additions & 1 deletion docs/build/README.md

Generate API docs for `quixstreams` module using [Pydoc Markdown](https://niklasrosenstein.github.io/pydoc-markdown/just-generate-me-some-markdown/).


## Generate new API docs

- Go to `docs/build`
- Install requirements via `python -m pip install -r requirements.txt`
- Run `./build.sh`
- Check the generated docs in the `docs/` folder


## Render/View Docs:
- Go to `docs/build`
- `python -m pip install mkdocs mkdocs-material mkdocs-material-extensions`
- `mkdocs serve -f ../../mkdocs.yml`
- Navigate to [localhost:8000](http://localhost:8000) in your browser
2 changes: 1 addition & 1 deletion docs/connectors/sources/custom-sources.md

Quix Streams also provides a set of classes to help users implement custom sources.

## Source

The recommended parent class to create a new source. It handles configuring, starting and stopping the source, as well as implementing a series of helpers.

To get started, implement the [`run`](../../api-reference/sources.md#sourcerun) method and return when `self.running` is `False`.
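
As a minimal sketch (the class name and event payload here are illustrative, not part of the library):

```python
from quixstreams.sources import Source


class MySource(Source):
    def run(self):
        # Keep producing while the Application is running; returning from
        # run() once self.running is False lets the source shut down cleanly.
        while self.running:
            msg = self.serialize(key="sensor-1", value={"reading": 42})
            self.produce(key=msg.key, value=msg.value)
```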

1 change: 1 addition & 0 deletions docs/introduction.md

Check out Quix Streams tutorials for more in-depth examples:
- [Tutorial - Word Count](tutorials/word-count/tutorial.md)
- [Tutorial - Anomaly Detection](tutorials/anomaly-detection/tutorial.md)
- [Tutorial - Purchase Filtering](tutorials/purchase-filtering/tutorial.md)
- [Tutorial - Websocket Source](tutorials/websocket-source/tutorial.md)
2 changes: 1 addition & 1 deletion docs/tutorials/README.md

Besides a Kafka instance to connect to, most of them likely fit the pattern of:

`python ./path/to/producer.py`

`python ./path/to/tutorial_app.py`


## Running Kafka Locally
30 changes: 0 additions & 30 deletions docs/tutorials/anomaly-detection/application.py

This file was deleted.

179 changes: 133 additions & 46 deletions docs/tutorials/anomaly-detection/tutorial.md
# Tutorial: Anomaly Detection

We will build a simple anomaly detection system, a common use case of stateful
streaming applications.


## What You Will Learn

This example will show how to use a Quix Streams `Application` to:

- Ingest a non-Kafka data source
- Use stateful windowed operations
- Do simple event alterations
- Do simple event filtering
- Create a Kafka topic
- Produce results to a Kafka topic



When this occurs, we want to send alerts as soon as possible so appropriate action can be taken.

## Our Example

We will use a [Quix Streams `Source`](../../connectors/sources/README.md) to generate mock temperature events for
3 machines (MACHINE_IDs '0', '1', or '2'); IDs 0 and 1 are functioning normally,
2 is malfunctioning (overheating).

These events will be processed by our new Anomaly Detector `Application`.


## Alerting Approach (Windowing)
This approach is desirable since temperatures fluctuate quickly; it enables more reliable alerting, since it:

- allows more time for the machine to cool back down (as part of normal operation)



## Before Getting Started

1. You will see links scattered throughout this tutorial.
- Tutorial code links are marked **>>> LIKE THIS <<<** .
- ***All other links provided are completely optional***.
- They are great ways to learn more about various concepts if you need it!

2. This tutorial uses a [`Source`](../../connectors/sources/README.md) rather than a Kafka [`Topic`](../../api-reference/topics.md) to ingest data.
- `Source` connectors enable reading data from a non-Kafka origin (typically to get it into Kafka).
- This approach circumvents users having to run a [producer](../../producer.md) alongside the `Application`.
- A `Source` is easily replaced with an actual Kafka topic (just pass a `Topic` instead of a `Source`).



## Generating Temperature Data

Our [**>>> Anomaly Detector Application <<<**](tutorial_app.py) uses a `Source` called
`TemperatureGenerator` to generate temperature events.

It cycles through MACHINE_ID's 0-2 (using the ID as the Kafka key), and produces a
(-1, 0, +1) temperature change for each machine a few times a second, along with the time.

So the incoming messages look like:

```python
# ...
```

Feel free to inspect it further, but it's just to get some data flowing. Our focus is the Anomaly Detector `Application`.
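
For illustration, a concrete event of this shape might be the following ("Timestamp" is referenced later in the windowing section; the temperature field name is an assumption):

```python
# Hypothetical example: the Kafka key is the MACHINE_ID, and the value holds
# the current temperature plus the epoch-ms timestamp used for windowing.
kafka_key = b"0"
kafka_value = {"Temperature_C": 65, "Timestamp": 1734609540000}
```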



## Anomaly Detector Application

Now let's go over the `main()` portion of our
[**>>> Anomaly Detector Application <<<**](tutorial_app.py) in detail!

### Create an Application

Create a [Quix Streams Application](../../configuration.md), which is our constructor for everything!

We provide it our connection settings, consumer group (ideally unique per Application),
and where the consumer group should start from on the (internal) Source topic.

!!! TIP

    Once you are more familiar with Kafka, we recommend
    [learning more about auto_offset_reset](https://www.quix.io/blog/kafka-auto-offset-reset-use-cases-and-pitfalls).

#### Our Application

```python
import os
from quixstreams import Application

app = Application(
    broker_address=os.getenv("BROKER_ADDRESS", "localhost:9092"),
    consumer_group="temperature_alerter",
    auto_offset_reset="earliest",
)
```


### Specify Topics

`Application.topic()` returns [`Topic`](../../api-reference/topics.md) objects which are used by `StreamingDataFrame`.

Create one for each topic used by your `Application`.

!!! NOTE

    Any missing topics will be automatically created for you upon running an `Application`.


#### Our Topics

We have one output topic, named `temperature_alerts`:

```python
alerts_topic = app.topic("temperature_alerts")
```

### The StreamingDataFrame (SDF)

Now for the fun part: building our [StreamingDataFrame](../../processing.md#introduction-to-streamingdataframe), often shorthanded to "SDF".

SDF allows manipulating the message value in a dataframe-like fashion using various operations.

After initializing with either a `Topic` or `Source`, we continue reassigning to the
same `sdf` variable as we add operations.

!!! NOTE

    A few `StreamingDataFrame` operations are
    ["in-place"](../../advanced/dataframe-assignments.md#valid-in-place-operations),
    like `.print()`.

#### Initializing our SDF

```python
sdf = app.dataframe(source=TemperatureGenerator())
```

First, we initialize our SDF with our `TemperatureGenerator` `Source`,
which means we will be consuming data from a non-Kafka origin.

!!! TIP

    You can consume from a Kafka topic instead by passing a `Topic` object
    with `app.dataframe(topic=<Topic>)`.

Let's go over the SDF operations in this example in detail.

### Prep Data for Windowing

So we'll perform a generic SDF transformation using [`SDF.apply(F)`](../../processing.md#streamingdataframeapply);
our `F` is a simple `lambda`, in this case.
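
The code for this step is not shown above, but a minimal sketch could look like this (assuming the reading lives in a "Temperature_C" field):

```python
# Reduce each event to just its temperature reading so the windowing step
# can aggregate a plain number ("Temperature_C" is an assumed field name).
sdf = sdf.apply(lambda event: event["Temperature_C"])
```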



### Windowing

```python
# ...
```

Now we do a (5 second) windowing operation on our temperature value. A few very important notes:

- The event's windowing timestamp comes from the "Timestamp" (case-sensitive!!!) field, which SDF looks for in the message value when first receiving it. If it doesn't exist, the kafka message timestamp is used. [A custom function can also be used](../../windowing.md#extracting-timestamps-from-messages).
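
The windowed aggregation itself is elided above; a sketch of a 5-second windowed mean emitted in "Current" mode might look like this (the tumbling window type is an assumption):

```python
# Average the temperature over 5-second windows; .current() emits an updated
# result for every incoming event rather than waiting for the window to close.
sdf = sdf.tumbling_window(duration_ms=5000).mean().current()
```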



### Using Window Result

```python
# ...
```

Now we get a window result (mean) along with its start/end timestamp:

`>>> {"value": 67.49478585, "start": 1234567890, "end": 1234567895}`

We don't particularly care about the window itself in our case, just the result...
so we extract the "value" with `SDF.apply()` and [`SDF.filter(F)`](../../processing.md#streamingdataframefilter), where
`F` is our "should_alert" function.

!!! INFO "SDF filtering behavior"

    For `SDF.filter(F)`, if the (_**boolean**_-ed) return value of `F` is:

    - `True` -> continue processing this event

    - `False` -> stop ALL further processing of this event (including produces!)

In our case, this example event would then stop since `bool(None)` is `False`.
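
A sketch of how the extraction and filter could be wired together (the `should_alert` body and the `metadata=True` signature are assumptions based on the alert output shown at the end of this tutorial):

```python
def should_alert(window_value, key, timestamp, headers):
    # Alert (and pass the event through) only when the 5s average is too hot;
    # returning None for cooler readings filters the event out.
    if window_value >= 90:
        print(f"Alerting for MID {key}: Average Temperature {window_value}")
        return True


sdf = sdf.apply(lambda result: result["value"])
sdf = sdf.filter(should_alert, metadata=True)
```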

### Producing an Alert

```python
sdf = sdf.to_topic(alerts_topic)
```
However, if the value ended up >= 90... we finally finish by producing our alert to our downstream topic via [`SDF.to_topic(T)`](../../processing.md#writing-data-to-kafka-topics), where `T`
is our previously defined `Topic` (not the topic name!).

!!! NOTE

    Because we use "Current" windowing, we may produce a lot of "duplicate" alerts once
    triggered...you could solve this in numerous ways downstream. What we care about is
    alerting as soon as possible!




### Running the Application

Running a `Source`-based `Application` requires calling `Application.run()` within an
`if __name__ == "__main__"` block.

#### Our Application Run Block

Our entire `Application` (and all its spawned objects) resides within a
`main()` function, executed as required:

```python
if __name__ == "__main__":
    main()
```
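
For context, `main()` is also where `Application.run()` is ultimately called; a minimal sketch of that overall shape (eliding the definitions covered above):

```python
def main():
    # ... Application, Topic, Source, and StreamingDataFrame setup from above ...
    app.run()


if __name__ == "__main__":
    main()
```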




## Try it Yourself!

### 1. Run Kafka
First, have a running Kafka cluster.

To easily run a broker locally with Docker, just [run this simple one-liner](../README.md#running-kafka-locally).

### 2. Download files
- [tutorial_app.py](tutorial_app.py)

### 3. Install Quix Streams
In your desired python environment, execute: `pip install quixstreams`

### 4. Run the application
In your desired python environment, execute: `python tutorial_app.py`.

You'll note that the `Application` does not print any output beyond initialization: it will only print alerts being fired (and thus when it's producing a downstream message).

### 4. Check out the results!
### 5. Check out the results!

Eventually, you should see an alert that looks something like:

`>>> Alerting for MID b'2': Average Temperature 98.57`

NICE!
