diff --git a/docs/build/README.md b/docs/build/README.md index 086b36d31..6474ab610 100644 --- a/docs/build/README.md +++ b/docs/build/README.md @@ -3,9 +3,16 @@ Generate API docs for `quixstreams` module using [Pydoc Markdown](https://niklasrosenstein.github.io/pydoc-markdown/just-generate-me-some-markdown/). -To generate new API docs +## Generate new API docs - Go to `docs/build` - Install requirements via `python -m pip install -r requirements.txt` - do `./build.sh` - Check the generated docs in `docs/` folder + + +## Render/View Docs: +- Go to `docs/build` +- `python -m pip install mkdocs mkdocs-material mkdocs-material-extensions` +- `mkdocs serve -f ../../mkdocs.yml` +- [navigate to `localhost:8000` in browser](`http://localhost:8000`) diff --git a/docs/connectors/sources/custom-sources.md b/docs/connectors/sources/custom-sources.md index 329bf3ae3..297362a17 100644 --- a/docs/connectors/sources/custom-sources.md +++ b/docs/connectors/sources/custom-sources.md @@ -8,7 +8,7 @@ Quix Streams also provides a set of classes to help users implement custom sourc ## Source -The recomended parent class to create a new source. It handles configuring, starting and stopping the source, as well as implementing a series of helpers. +The recommended parent class to create a new source. It handles configuring, starting and stopping the source, as well as implementing a series of helpers. To get started, implement the [`run`](../../api-reference/sources.md#sourcerun) method and return when `self.running` is `False`. diff --git a/docs/introduction.md b/docs/introduction.md index 3ee374baa..5e328a9a3 100644 --- a/docs/introduction.md +++ b/docs/introduction.md @@ -27,3 +27,4 @@ Check out Quix Streams tutorials for more in-depth examples: - [Tutorial - Word Count](tutorials/word-count/tutorial.md) - [Tutorial - Anomaly Detection](tutorials/anomaly-detection/tutorial.md) - [Tutorial - Purchase Filtering](tutorials/purchase-filtering/tutorial.md) +- [Tutorial - Websocket Source](tutorials/websocket-source/tutorial.md) diff --git a/docs/tutorials/README.md b/docs/tutorials/README.md index 58dd5347b..7e9b57bde 100644 --- a/docs/tutorials/README.md +++ b/docs/tutorials/README.md @@ -13,7 +13,7 @@ Besides a Kafka instance to connect to, most of them likely fit the pattern of: `python ./path/to/producer.py` -`python ./path/to/application.py` +`python ./path/to/tutorial_app.py` ## Running Kafka Locally diff --git a/docs/tutorials/anomaly-detection/application.py b/docs/tutorials/anomaly-detection/application.py deleted file mode 100644 index e276120bb..000000000 --- a/docs/tutorials/anomaly-detection/application.py +++ /dev/null @@ -1,30 +0,0 @@ -import os - -from quixstreams import Application - -app = Application( - broker_address=os.environ.get("BROKER_ADDRESS", "localhost:9092"), - consumer_group="temperature_alerter", - auto_offset_reset="earliest", -) -temperature_readings_topic = app.topic(name="temperature_readings") -alerts_topic = app.topic(name="alerts") - - -def should_alert(window_value: int, key, timestamp, headers): - if window_value >= 90: - print(f"Alerting for MID {key}: Average Temperature {window_value}") - return True - - -sdf = app.dataframe(topic=temperature_readings_topic) -sdf = sdf.apply(lambda data: data["Temperature_C"]) -sdf = sdf.hopping_window(duration_ms=5000, step_ms=1000).mean().current() -sdf = sdf.apply(lambda result: round(result["value"], 2)).filter( - should_alert, metadata=True -) -sdf = sdf.to_topic(alerts_topic) - - -if __name__ == "__main__": - app.run() diff --git 
a/docs/tutorials/anomaly-detection/tutorial.md b/docs/tutorials/anomaly-detection/tutorial.md index be8ff6af1..78e3ddb81 100644 --- a/docs/tutorials/anomaly-detection/tutorial.md +++ b/docs/tutorials/anomaly-detection/tutorial.md @@ -1,12 +1,20 @@ # Tutorial: Anomaly Detection -You will learn how to build a simple anomaly detection system, a common use case of stateful streaming applications. This will show how to use a Quix Streams application to: +We will build a simple anomaly detection system, a common use case of stateful +streaming applications. -- Create a topic + + +## What You Will Learn + +This example will show how to use a Quix Streams `Application` to: + +- Ingest a non-Kafka data source - Use stateful windowed operations - Do simple event alterations - Do simple event filtering -- Produce the result to a topic +- Create a Kafka topic +- Produce results to a Kafka topic @@ -23,12 +31,11 @@ When this occurs, we want to send alerts as soon as possible so appropriate acti ## Our Example -We will use a producer to generate mock temperature events for 3 machines (MACHINE_IDs '0', '1', or '2'); ID's 0 and 1 are functioning normally, 2 is malfunctioning (overheating). - -These events will be processed by our new Anomaly Detector application. - -NOTE: our example uses JSON formatting for Kafka message values. +We will use a [Quix Streams `Source`](../../connectors/sources/README.md) to generate mock temperature events for +3 machines (MACHINE_IDs '0', '1', or '2'); ID's 0 and 1 are functioning normally, +2 is malfunctioning (overheating). +These events will be processed by our new Anomaly Detector `Application`. ## Alerting Approach (Windowing) @@ -44,21 +51,30 @@ This approach is desirable since temperatures fluctuate quickly; it enables more - allows more time for the machine to cool back down (as part of normal operation) + ## Before Getting Started -- You will see links scattered throughout this tutorial. +1. You will see links scattered throughout this tutorial. - Tutorial code links are marked **>>> LIKE THIS <<<** . - ***All other links provided are completely optional***. - They are great ways to learn more about various concepts if you need it! +2. This tutorial uses a [`Source`](../../connectors/sources/README.md) rather than a Kafka [`Topic`]() to ingest data. + - `Source` connectors enable reading data from a non-Kafka origin (typically to get it into Kafka). + - This approach circumvents users having to run a [producer](../../producer.md) alongside the `Application`. + - A `Source` is easily replaced with an actual Kafka topic (just pass a `Topic` instead of a `Source`). + + ## Generating Temperature Data -Without going into much detail, we have this [**>>> Temperature Readings Producer <<<**](producer.py) to pair up nicely with our Anomaly Detector. +Our [**>>> Anomaly Detector Application <<<**](tutorial_app.py) uses a `Source` called +`TemperatureGenerator` to generate temperature events. -It cycles through MACHINE_ID's 0-2 (using the ID as the Kafka key), and produces a (-1, 0, +1) temperature change for each machine a few times a second, along with the time. +It cycles through MACHINE_ID's 0-2 (using the ID as the Kafka key), and produces a +(-1, 0, +1) temperature change for each machine a few times a second, along with the time. -So the kafka messages look like: +So the incoming messages look like: ```python # ... @@ -77,57 +93,93 @@ Feel free to inspect it further, but it's just to get some data flowing. 
Our foc +## Anomaly Detector Application +Now let's go over the `main()` portion of our +[**>>> Anomaly Detector Application <<<**](tutorial_app.py) in detail! -## Anomaly Detector Application -Now let's go over our [**>>> Anomaly Detector Application <<<**](application.py) line-by-line! -### Create Application +### Create an Application + +Create a [Quix Streams Application](../../configuration.md), which is our constructor for everything! + +We provide it our connection settings, consumer group (ideally unique per Application), +and where the consumer group should start from on the (internal) Source topic. + +!!! TIP + + Once you are more familiar with Kafka, we recommend + [learning more about auto_offset_reset](https://www.quix.io/blog/kafka-auto-offset-reset-use-cases-and-pitfalls). + +#### Our Application ```python import os from quixstreams import Application app = Application( - broker_address=os.environ.get("BROKER_ADDRESS", "localhost:9092"), + broker_address=os.getenv("BROKER_ADDRESS", "localhost:9092"), consumer_group="temperature_alerter", - auto_offset_reset="earliest" + auto_offset_reset="earliest", ) ``` -First, create the [Quix Streams Application](../../configuration.md), which is our constructor for everything! We provide it our connection settings, consumer group (ideally unique per Application), and where the consumer group should start from on our topic. -NOTE: Once you are more familiar with Kafka, we recommend [learning more about auto_offset_reset](https://www.quix.io/blog/kafka-auto-offset-reset-use-cases-and-pitfalls). + +### Specify Topics + +`Application.topic()` returns [`Topic`](../../api-reference/topics.md) objects which are used by `StreamingDataFrame`. + +Create one for each topic used by your `Application`. + +!!! NOTE + + Any missing topics will be automatically created for you upon running an `Application`. -### Define Topics +#### Our Topics +We have one output topic, named `temperature_alerts`: ```python -temperature_readings_topic = app.topic(name="temperature_readings") -alerts_topic = app.topic(name="alerts") +alerts_topic = app.topic("temperature_alerts") ``` -Next we define our input/output topics, named `temperature_readings_topic` and `alerts_topic`, respectively. -They each return [`Topic`](../../api-reference/topics.md) objects, used later on. -NOTE: the topics will automatically be created for you in Kafka when you run the application should they not exist. +### The StreamingDataFrame (SDF) +Now for the fun part: building our [StreamingDataFrame](../../processing.md#introduction-to-streamingdataframe), often shorthanded to "SDF". -### The StreamingDataFrame (SDF) +SDF allows manipulating the message value in a dataframe-like fashion using various operations. + +After initializing with either a `Topic` or `Source`, we continue reassigning to the +same `sdf` variable as we add operations. + +!!! NOTE + + A few `StreamingDataFrame` operations are + ["in-place"](../../advanced/dataframe-assignments.md#valid-in-place-operations), + like `.print()`. + +#### Initializing our SDF ```python -sdf = app.dataframe(topic=temperature_readings_topic) +sdf = app.dataframe(source=TemperatureGenerator()) ``` -Now for the fun part: building our [StreamingDataFrame](../../processing.md#introduction-to-streamingdataframe), often shorthanded to "SDF". +First, we initialize our SDF with our `TemperatureGenerator` `Source`, +which means we will be consuming data from a non-Kafka origin. 
-SDF allows manipulating the message value in a dataframe-like fashion using various operations. -After initializing, we continue re-assigning to the same `sdf` variable as we add operations. +!!! TIP + + You can consume from a Kafka topic instead by passing a `Topic` object + with `app.dataframe(topic=)`. + +Let's go over the SDF operations in this example in detail. + -(Also: notice that we pass our input `Topic` from the previous step to it.) ### Prep Data for Windowing @@ -148,6 +200,7 @@ So we'll perform a generic SDF transformation using [`SDF.apply(F)`](../../proce our `F` is a simple `lambda`, in this case. + ### Windowing ```python @@ -164,6 +217,8 @@ Now we do a (5 second) windowing operation on our temperature value. A few very - The event's windowing timestamp comes from the "Timestamp" (case-sensitive!!!) field, which SDF looks for in the message value when first receiving it. If it doesn't exist, the kafka message timestamp is used. [A custom function can also be used](../../windowing.md#extracting-timestamps-from-messages). + + ### Using Window Result ```python @@ -180,15 +235,21 @@ Now we get a window result (mean) along with its start/end timestamp: `>>> {"value": 67.49478585, "start": 1234567890, "end": 1234567895}` -We don't particularly care about the window itself in our case, just the result...so we extract the "value" with `SDF.apply()` and [`SDF.filter(F)`](../../processing.md#streamingdataframefilter), where `F` is our "should_alert" function. +We don't particularly care about the window itself in our case, just the result... +so we extract the "value" with `SDF.apply()` and [`SDF.filter(F)`](../../processing.md#streamingdataframefilter), where +`F` is our "should_alert" function. + +In our case, this example event would then stop since `bool(None)` is `False`. -For `SDF.filter(F)`, if the (_**boolean**_-ed) return value of `F` is: +!!! INFO "SDF filtering behavior" -- `True` -> continue processing this event + For `SDF.filter(F)`, if the (_**boolean**_-ed) return value of `F` is: + + - `True` -> continue processing this event + + - `False` -> stop ALL further processing of this event (including produces!) -- `False` -> stop ALL further processing of this event (including produces!) -In our case, this example event would then stop since `bool(None)` is `False`. ### Producing an Alert @@ -199,27 +260,53 @@ sdf = sdf.to_topic(alerts_topic) However, if the value ended up >= 90....we finally finish by producing our alert to our downstream topic via [`SDF.to_topic(T)`](../../processing.md#writing-data-to-kafka-topics), where `T` is our previously defined `Topic` (not the topic name!). -NOTE: because we use "Current" windowing, we may produce a lot of "duplicate" alerts once triggered...you could solve this in numerous ways downstream. What we care about is alerting as soon as possible! +!!! NOTE + + Because we use "Current" windowing, we may produce a lot of "duplicate" alerts once + triggered...you could solve this in numerous ways downstream. What we care about is + alerting as soon as possible! + + + + +### Running the Application + +Running a `Source`-based `Application` requires calling `Application.run()` within a +`if __name__ == "__main__"` block. + +#### Our Application Run Block + +Our entire `Application` (and all its spawned objects) resides within a +`main()` function, executed as required: + +```python +if __name__ == "__main__": + main() +``` + + + -## Try it yourself! +## Try it Yourself! ### 1. Run Kafka First, have a running Kafka cluster. 
-To conveniently follow along with this tutorial, just [run this simple one-liner](../README.md#running-kafka-locally). +To easily run a broker locally with Docker, just [run this simple one-liner](../README.md#running-kafka-locally). -### 2. Install Quix Streams -In your python environment, run `pip install quixstreams` +### 2. Download files +- [tutorial_app.py](tutorial_app.py) -### 3. Run the Producer and Application -Just call `python producer.py` and `python application.py` in separate windows. +### 3. Install Quix Streams +In your desired python environment, execute: `pip install quixstreams` -You'll note that the Application does not print any output beyond initialization: it will only print alerts being fired (and thus when it's producing a downstream message). +### 4. Run the application +In your desired python environment, execute: `python tutorial_app.py`. -### 4. Check out the results! +### 5. Check out the results! Eventually, you should see an alert will look something like: `>>> Alerting for MID b'2': Average Temperature 98.57` -NICE! \ No newline at end of file +NICE! diff --git a/docs/tutorials/anomaly-detection/producer.py b/docs/tutorials/anomaly-detection/tutorial_app.py similarity index 53% rename from docs/tutorials/anomaly-detection/producer.py rename to docs/tutorials/anomaly-detection/tutorial_app.py index c82c883f8..d6c6e68fb 100644 --- a/docs/tutorials/anomaly-detection/producer.py +++ b/docs/tutorials/anomaly-detection/tutorial_app.py @@ -3,13 +3,21 @@ import time from quixstreams import Application +from quixstreams.sources import Source -class TemperatureEventGenerator: +class TemperatureGenerator(Source): """ - Generates temperature readings for three different machines. + What a Source is: + A Quix Streams Source enables Applications to read data from something other than a Kafka topic. + Basically, this generates JSON data that the StreamingDataFrame consumes directly. + This simplifies the example by not having to run both a producer and Application. - Machine ID's 0, 1 are functioning normally, 2 is malfunctioning. + What it does: + Generates temperature readings for three different machines. + Machine ID's 0 and 1 are functioning normally; ID 2 is malfunctioning. + It cycles through the ID's, generating random temperature changes of -1, 0, or 1. + These probabilities change depending on current temperature and operation. """ # Probabilities are % chance for [-1, 0, 1] change given a current temp in the @@ -33,7 +41,6 @@ class TemperatureEventGenerator: } def __init__(self): - self.stop = False self.event_count = 0 self.machine_temps = {0: 66, 1: 58, 2: 62} self.machine_types = { @@ -41,6 +48,7 @@ def __init__(self): 1: self.probabilities_normal, 2: self.probabilities_issue, } + super().__init__(name="temperature-event-generator") def update_machine_temp(self, machine_id): """ @@ -57,9 +65,6 @@ def generate_event(self): It tracks/loops through the Machine ID's for equal number of readings. 
""" - if self.stop: - return None - machine_id = self.event_count % 3 self.update_machine_temp(machine_id) event_out = { @@ -71,19 +76,44 @@ def generate_event(self): } self.event_count += 1 if self.machine_temps[machine_id] == 100: - self.stop = True + self.stop() return event_out - -_app = Application(broker_address=os.environ.get("BROKER_ADDRESS", "localhost:9092")) -topic = _app.topic(name="temperature_readings") -event_generator = TemperatureEventGenerator() - -if __name__ == "__main__": - with _app.get_producer() as producer: - while event := event_generator.generate_event(): - event = topic.serialize(**event) + def run(self): + while self.running: + event = self.serialize(**self.generate_event()) print(f"producing event for MID {event.key}, {event.value}") - producer.produce(key=event.key, value=event.value, topic=topic.name) + self.produce(key=event.key, value=event.value) time.sleep(0.2) # just to make things easier to follow along print("An alert should have been generated by now; stopping producing.") + + +# NOTE: A "metadata" function expects these 4 arguments regardless of use. +def should_alert(window_value: int, key, timestamp, headers): + if window_value >= 90: + print(f"Alerting for MID {key}: Average Temperature {window_value}") + return True + + +def main(): + app = Application( + broker_address=os.getenv("BROKER_ADDRESS", "localhost:9092"), + consumer_group="temperature_alerter", + auto_offset_reset="earliest", + ) + alerts_topic = app.topic(name="temperature_alerts") + + # If reading from a Kafka topic, pass topic= instead of a source + sdf = app.dataframe(source=TemperatureGenerator()) + sdf = sdf.apply(lambda data: data["Temperature_C"]) + sdf = sdf.hopping_window(duration_ms=5000, step_ms=1000).mean().current() + sdf = sdf.apply(lambda result: round(result["value"], 2)).filter( + should_alert, metadata=True + ) + sdf.to_topic(alerts_topic) + + app.run() + + +if __name__ == "__main__": + main() diff --git a/docs/tutorials/purchase-filtering/application.py b/docs/tutorials/purchase-filtering/application.py deleted file mode 100644 index 6e62f405c..000000000 --- a/docs/tutorials/purchase-filtering/application.py +++ /dev/null @@ -1,35 +0,0 @@ -import os - -from quixstreams import Application - -app = Application( - broker_address=os.environ.get("BROKER_ADDRESS", "localhost:9092"), - consumer_group="purchase_filtering", - auto_offset_reset="earliest", -) -customer_purchases_topic = app.topic(name="customer_purchases") -customers_qualified_topic = app.topic(name="customers_coupon_qualified") - - -def get_full_name(customer): - return f'{customer["First Name"]} {customer["Last Name"]}' - - -def get_purchase_totals(items): - return sum([i["Price"] * i["Quantity"] for i in items]) - - -SALES_TAX = 1.10 - -sdf = app.dataframe(topic=customer_purchases_topic) -sdf = sdf[ - (sdf["Purchases"].apply(get_purchase_totals) * SALES_TAX >= 100.00) - & (sdf["Membership Type"].isin(["Silver", "Gold"])) -] -sdf["Full Name"] = sdf.apply(get_full_name) -sdf = sdf[["Full Name", "Email"]] -sdf = sdf.to_topic(customers_qualified_topic) - - -if __name__ == "__main__": - app.run() diff --git a/docs/tutorials/purchase-filtering/producer.py b/docs/tutorials/purchase-filtering/producer.py deleted file mode 100644 index 83b9a9d44..000000000 --- a/docs/tutorials/purchase-filtering/producer.py +++ /dev/null @@ -1,58 +0,0 @@ -import os -import time - -from quixstreams import Application - -_app = Application(broker_address=os.environ.get("BROKER_ADDRESS", "localhost:9092")) -topic = 
_app.topic(name="customer_purchases") - -purchases_data = [ - { - "First Name": "Jane", - "Last Name": "Doe", - "Email": "jdoe@mail.com", - "Membership Type": "Gold", - "Purchases": [ - {"Item ID": "abc123", "Price": 13.99, "Quantity": 1}, - {"Item ID": "def456", "Price": 12.59, "Quantity": 2}, - ], - }, - { - "First Name": "Robbie", - "Last Name": "McRobberson", - "Email": "therob@mail.com", - "Membership Type": "Bronze", - "Purchases": [ - {"Item ID": "abc123", "Price": 13.99, "Quantity": 13}, - ], - }, - { - "First Name": "Howbout", - "Last Name": "Dat", - "Email": "cashmeoutside@mail.com", - "Membership Type": "Silver", - "Purchases": [ - {"Item ID": "abc123", "Price": 3.14, "Quantity": 7}, - {"Item ID": "xyz987", "Price": 7.77, "Quantity": 13}, - ], - }, - { - "First Name": "The", - "Last Name": "Reaper", - "Email": "plzdontfearme@mail.com", - "Membership Type": "Gold", - "Purchases": [ - {"Item ID": "xyz987", "Price": 7.77, "Quantity": 99}, - ], - }, -] - - -if __name__ == "__main__": - with _app.get_producer() as producer: - for cid, purchase_info in enumerate(purchases_data): - event = topic.serialize(key=f"CUSTOMER_ID{cid}", value=purchase_info) - print(f"producing review for {event.key}: {event.value}") - producer.produce(key=event.key, value=event.value, topic=topic.name) - time.sleep(1) # just to make things easier to follow along - print("Sent all customer purchases") diff --git a/docs/tutorials/purchase-filtering/tutorial.md b/docs/tutorials/purchase-filtering/tutorial.md index f7e132c64..1047fba93 100644 --- a/docs/tutorials/purchase-filtering/tutorial.md +++ b/docs/tutorials/purchase-filtering/tutorial.md @@ -3,14 +3,20 @@ We will build a simple Purchase Filtering app to showcase some common Quix Streams dataframe-like operations with dictionary/JSON data (a format frequently used). -You'll learn how to: -- Create a topic + +## What You Will Learn + +This example will show how to use a Quix Streams `Application` to: + +- Ingest a non-Kafka data source - Assign a value to a new column - Use `SDF.apply()` with additional operations - Filter with inequalities combined with and/or (`&`, `|`) - Get a subset/selection of columns -- Produce resulting output to a topic +- Create a Kafka topic +- Produce results to a Kafka topic + ## Outline of the Problem @@ -28,9 +34,8 @@ necessary information downstream. ## Our Example -We will use a simple producer to generate some mock purchase data to be processed by our -new Purchase Filtering application. - +We will use a [Quix Streams `Source`](../../connectors/sources/README.md) to generate some mock purchase data to be +processed by our new Purchase Filtering `Application`. ## Important Takeaways @@ -43,24 +48,30 @@ as if it were a dataframe. ## Before Getting Started -- You will see links scattered throughout this tutorial. +1. You will see links scattered throughout this tutorial. - Tutorial code links are marked **>>> LIKE THIS <<<** . - ***All other links provided are completely optional***. - They are great ways to learn more about various concepts if you need it! +2. This tutorial uses a Quix Streams [`Source`](../../connectors/sources/README.md) rather than a Kafka [`Topic`]() to ingest data. + - `Source` connectors enable reading data from a non-Kafka origin (typically to get it into Kafka). + - This approach circumvents users having to run a [producer](../../producer.md) alongside the `Application`. + - A `Source` is easily replaced with an actual Kafka topic (just pass a `Topic` instead of a `Source`). 
-- We use the word "column" for consistency with Pandas terminology. - - You can also think of it as a dictionary key. +3. We use the word "column" for consistency with Pandas terminology. + - You can also think of it as a dictionary key. ## Generating Purchase Data -We have a simple [**>>> Purchases Producer <<<**](producer.py) that generates a small static set of -"purchases", which are simply dictionaries with various info about what was purchased by -a customer during their visit. The data is keyed on customer ID. +Our [**>>> Purchase Filtering Application <<<**](tutorial_app.py) uses a `Source` called +`PurchaseGenerator` that generates a small static set of "purchases", which are simply +dictionaries with various info about what was purchased by a customer during their visit. + +The data is keyed on customer ID. -An outgoing Kafka message looks something like: +The incoming Kafka data looks something like: ```python # ... @@ -86,56 +97,99 @@ kafka_value: { ``` + + ## Purchase Filtering Application +Now let's go over the `main()` portion of +our [**>>> Purchase Filtering Application <<<**](tutorial_app.py) in detail! + + + + +### Create an Application + +Create a [Quix Streams Application](../../configuration.md), which is our constructor for everything! + +We provide it our connection settings, consumer group (ideally unique per Application), +and where the consumer group should start from on the (internal) Source topic. -Now let's go over our [**>>> Purchase Filtering Application <<<**](application.py) line-by-line! +!!! TIP + Once you are more familiar with Kafka, we recommend + [learning more about auto_offset_reset](https://www.quix.io/blog/kafka-auto-offset-reset-use-cases-and-pitfalls). -### Create Application +#### Our Application ```python import os from quixstreams import Application app = Application( - broker_address=os.environ.get("BROKER_ADDRESS", "localhost:9092"), - consumer_group="purchase_summing", - auto_offset_reset="earliest" + broker_address=os.getenv("BROKER_ADDRESS", "localhost:9092"), + consumer_group="purchase_filtering", + auto_offset_reset="earliest", ) ``` -First, create the [Quix Streams Application](../../configuration.md), which is our constructor for everything! We provide it our connection settings, consumer group (ideally unique per Application), and where the consumer group should start from on our topic. -NOTE: Once you are more familiar with Kafka, we recommend [learning more about auto_offset_reset](https://www.quix.io/blog/kafka-auto-offset-reset-use-cases-and-pitfalls). +### Specify Topics -### Define Topics +`Application.topic()` returns [`Topic`](../../api-reference/topics.md) objects which are used by `StreamingDataFrame`. + +Create one for each topic used by your `Application`. + +!!! NOTE + + Any missing topics will be automatically created for you upon running an `Application`. + + +#### Our Topics +We have one output topic, named `customers_coupon_qualified`: ```python -customer_purchases_topic = app.topic(name="customer_purchases") customers_qualified_topic = app.topic(name="customers_coupon_qualified") ``` -Next we define our input/output topics, named `customer_purchases` and `customers_coupon_qualified`, respectively. -They each return [`Topic`](../../api-reference/topics.md) objects, used later on. -NOTE: the topics will automatically be created for you in Kafka when you run the application should they not exist. 
+ ### The StreamingDataFrame (SDF) +Now for the fun part: building our [StreamingDataFrame](../../processing.md#introduction-to-streamingdataframe), often shorthanded to "SDF". + +SDF allows manipulating the message value in a dataframe-like fashion using various operations. + +After initializing with either a `Topic` or `Source`, we continue reassigning to the +same `sdf` variable as we add operations. + +!!! NOTE + + A few `StreamingDataFrame` operations are + ["in-place"](../../advanced/dataframe-assignments.md#valid-in-place-operations), + like `.print()`. + +#### Initializing our SDF + ```python -sdf = app.dataframe(topic=customer_purchases_topic) +sdf = app.dataframe(source=PurchaseGenerator()) ``` -Now for the fun part: building our [StreamingDataFrame](../../processing.md#introduction-to-streamingdataframe), often shorthanded to "SDF". +First, we initialize our SDF with our `PurchaseGenerator` `Source`, +which means we will be consuming data from a non-Kafka origin. + + +!!! TIP + + You can consume from a Kafka topic instead by passing a `Topic` object + with `app.dataframe(topic=)`. + +Let's go over the SDF operations in this example in detail. -SDF allows manipulating the message value in a dataframe-like fashion using various operations. -After initializing, we continue re-assigning to the same `sdf` variable as we add operations. -(Also: notice that we pass our input `Topic` from the previous step to it.) ### Filtering Purchases @@ -179,11 +233,13 @@ sdf["Membership Type"].isin(["Silver", "Gold"]) We additionally showcase one of our built-in column operations `.isin()`, a way for SDF to perform an `if x in y` check (SDF is declaratively defined, invalidating that approach). -**NOTE**: some operations (like `.isin()`) are only available when manipulating a column. +!!! INFO "Column-only manipulations" + + Some operations (like `.isin()`) are only available when manipulating a column. - - if you're unsure what's possible, autocomplete often covers you! + - if you're unsure what's possible, autocomplete often covers you! - - _ADVANCED_: [complete list of column operations](../../api-reference/dataframe.md#streamingseries). + - _ADVANCED_: [complete list of column operations](../../api-reference/dataframe.md#streamingseries).
@@ -221,6 +277,7 @@ As such, SDF filtering interprets the SDF operation `&` _boolean_ result as foll So, any events that don't satisfy these conditions will be filtered as desired! + ### Adding a New Column ```python @@ -239,12 +296,14 @@ This is basically a functional equivalent of adding a key to a dictionary. >>> {"Remove Me": "value", "Email": "cool email"}` ``` -becomes +becomes: ```python >>> {"Remove Me": "value", "Email": "cool email", "Full Name": "cool name"}` ``` + + ### Getting a Column Subset/Selection ```python @@ -253,19 +312,23 @@ sdf = sdf[["Email", "Full Name"]] We only need a couple fields to send downstream, so this is a convenient way to select only a specific list of columns (AKA dictionary keys) from our data. -So +So: ```python >>> {"Remove Me": "value", "Email": "cool email", "Full Name": "cool name", }` ``` -becomes +becomes: ```python >>> {"Email": "cool email", "Full Name": "cool name"}` ``` -NOTE: you cannot reference nested keys in this way. +!!! WARNING + + You cannot reference nested keys in this way. + + ### Producing the Result @@ -276,32 +339,57 @@ sdf = sdf.to_topic(customers_qualified_topic) Finally, we produce our non-filtered results downstream via [`SDF.to_topic(T)`](../../processing.md#writing-data-to-kafka-topics), where `T` is our previously defined `Topic` (not the topic name!). -NOTE: by default, our outgoing Kafka key is persisted from the input message. -[You can alter it](../../processing.md#changing-message-key-before-producing), if needed. +!!! "Message key persistence" + + By default, our outgoing Kafka key is persisted from the input message. + [You can alter it](../../processing.md#changing-message-key-before-producing), if needed. + +### Running the Application +Running a `Source`-based `Application` requires calling `Application.run()` within a +`if __name__ == "__main__"` block. -## Try it yourself! +#### Our Application Run Block + +Our entire `Application` (and all its spawned objects) resides within a +`main()` function, executed as required: + +```python +if __name__ == "__main__": + main() +``` + + + +## Try it Yourself! ### 1. Run Kafka First, have a running Kafka cluster. -To conveniently follow along with this tutorial, just [run this simple one-liner](../README.md#running-kafka-locally). +To easily run a broker locally with Docker, just [run this simple one-liner](../README.md#running-kafka-locally). -### 2. Install Quix Streams -In your python environment, run `pip install quixstreams` +### 2. Download files +- [tutorial_app.py](tutorial_app.py) -### 3. Run the Producer and Application -Just call `python producer.py` and `python application.py` in separate windows. +### 3. Install Quix Streams +In your desired python environment, execute: `pip install quixstreams` -### 4. Check out the results! +### 4. Run the application +In your desired python environment, execute: `python tutorial_app.py`. -...but wait, I don't see any message processing output...Is it working??? +### 5. Check out the results! + +...but wait, I don't see any `Application` processing output...Is it working??? One thing to keep in mind is that the Quix Streams does not log/print any message processing operations by default. 
To get visual outputs around message processing, you can either: -- use [recommended way of printing/logging stuff](../../processing.md#debugging) - + +- use [recommended way of printing/logging with SDF](../../processing.md#debugging) + - use `DEBUG` mode via `Application(loglevel="DEBUG")` - - WARNING: you should NOT run your applications in `DEBUG` mode in production. + + !!! DANGER + + you should NOT run your applications in `DEBUG` mode in production. \ No newline at end of file diff --git a/docs/tutorials/purchase-filtering/tutorial_app.py b/docs/tutorials/purchase-filtering/tutorial_app.py new file mode 100644 index 000000000..302cf0d7f --- /dev/null +++ b/docs/tutorials/purchase-filtering/tutorial_app.py @@ -0,0 +1,111 @@ +import logging +import os +import time + +from quixstreams import Application +from quixstreams.sources import Source + +logger = logging.getLogger(__name__) + + +SALES_TAX = 1.10 + + +class PurchaseGenerator(Source): + """ + What a Source is: + A Quix Streams Source enables Applications to read data from something other than a Kafka topic. + Basically, this generates JSON data that the StreamingDataFrame consumes directly. + This simplifies the example by not having to run both a producer and Application. + + What it does: + Generates "purchase events" based on the below list of `_purchase data`. + """ + + _purchases_data = [ + { + "First Name": "Jane", + "Last Name": "Doe", + "Email": "jdoe@mail.com", + "Membership Type": "Gold", + "Purchases": [ + {"Item ID": "abc123", "Price": 13.99, "Quantity": 1}, + {"Item ID": "def456", "Price": 12.59, "Quantity": 2}, + ], + }, + { + "First Name": "Robbie", + "Last Name": "McRobberson", + "Email": "therob@mail.com", + "Membership Type": "Bronze", + "Purchases": [ + {"Item ID": "abc123", "Price": 13.99, "Quantity": 13}, + ], + }, + { + "First Name": "Howbout", + "Last Name": "Dat", + "Email": "cashmeoutside@mail.com", + "Membership Type": "Silver", + "Purchases": [ + {"Item ID": "abc123", "Price": 3.14, "Quantity": 7}, + {"Item ID": "xyz987", "Price": 7.77, "Quantity": 13}, + ], + }, + { + "First Name": "The", + "Last Name": "Reaper", + "Email": "plzdontfearme@mail.com", + "Membership Type": "Gold", + "Purchases": [ + {"Item ID": "xyz987", "Price": 7.77, "Quantity": 99}, + ], + }, + ] + + def __init__(self): + super().__init__(name="customer-purchases") + + def run(self): + for cid, purchase_info in enumerate(self._purchases_data): + event = self.serialize(key=f"CUSTOMER_ID{cid}", value=purchase_info) + logger.info(f"producing review for {event.key}: {event.value}") + self.produce(key=event.key, value=event.value) + time.sleep(1) # just to make things easier to follow along + logger.info("Sent all customer purchases") + + +def get_full_name(customer): + return f'{customer["First Name"]} {customer["Last Name"]}' + + +def get_purchase_totals(items): + return sum([i["Price"] * i["Quantity"] for i in items]) + + +def main(): + app = Application( + broker_address=os.getenv("BROKER_ADDRESS", "localhost:9092"), + consumer_group="purchase_filtering", + auto_offset_reset="earliest", + ) + customers_qualified_topic = app.topic(name="customers_coupon_qualified") + + # If reading from a Kafka topic, pass topic= instead of a source + sdf = app.dataframe(source=PurchaseGenerator()) + sdf = sdf[ + (sdf["Purchases"].apply(get_purchase_totals) * SALES_TAX >= 100.00) + & (sdf["Membership Type"].isin(["Silver", "Gold"])) + ] + sdf["Full Name"] = sdf.apply(get_full_name) + sdf = sdf[["Full Name", "Email"]] + + sdf.print() + 
sdf.to_topic(customers_qualified_topic) + +    app.run() + + +# This approach is necessary since we are using a Source +if __name__ == "__main__": +    main() diff --git a/examples/custom_websocket_source/requirements.txt b/docs/tutorials/websocket-source/requirements.txt similarity index 52% rename from examples/custom_websocket_source/requirements.txt rename to docs/tutorials/websocket-source/requirements.txt index 0c849e8b1..bb63126c4 100644 --- a/examples/custom_websocket_source/requirements.txt +++ b/docs/tutorials/websocket-source/requirements.txt @@ -1,3 +1,3 @@ python-dateutil websockets -quixstreams>=2.11, <3.0 +quixstreams~=3.4.0 diff --git a/docs/tutorials/websocket-source/tutorial.md b/docs/tutorials/websocket-source/tutorial.md new file mode 100644 index 000000000..2ad0c246f --- /dev/null +++ b/docs/tutorials/websocket-source/tutorial.md @@ -0,0 +1,254 @@ +# Tutorial: Websocket Source (Coinbase API) + +This tutorial builds a custom `Source` connector named `CoinbaseSource` for ingesting +ticker updates from the Coinbase Websocket API and processing them with a `StreamingDataFrame`. + +Specifically, it showcases how to use the Quix Streams connector framework to +create and use a customized `Source` (there are also `Sink` connectors!). + + +## Outline of the Problem + +We want to keep track of various Bitcoin prices for real-time analysis, but we need to get +the data into Kafka first. + + +## Our Example +This example showcases: + +1. Extending the [Quix Streams `Source`](../../connectors/sources/README.md) class to read from the Coinbase Websocket API. +2. Using the new extension (`CoinbaseSource`). + +## Before Getting Started + +1. You will see links scattered throughout this tutorial. + - Tutorial code links are marked **>>> LIKE THIS <<<** . + - ***All other links provided are completely optional***. + - They are great ways to learn more about various concepts if you need it! + +2. This tutorial uses a [`Source`](../../connectors/sources/README.md) rather than a Kafka [`Topic`]() to ingest data. + - `Source` connectors enable reading data from a non-Kafka origin (typically to get it into Kafka). + - This approach circumvents users having to run a [producer](../../producer.md) alongside the `Application`. + - A `Source` is easily replaced with an actual Kafka topic (just pass a `Topic` instead of a `Source`). + + +## Creating `CoinbaseSource` + +First, let's take a detailed look at `CoinbaseSource` in our [**>>> Coinbase Application <<<**](tutorial_app.py) +to understand what modifications to `Source` were necessary. + +> [!NOTE] +> Check out the [custom Source docs](../../connectors/sources/custom-sources.md) +> for additional details around what can be adjusted. + + +### Setting up `Source.run()` + +A `Source` requires defining a `.run()` method, which should perform +a data retrieval and produce loop (using `Source.serialize()` and `Source.produce()` methods) within a +`while Source.running` block. + +Let's take a look at `CoinbaseSource`'s `.run()` in detail. + +#### Setting up the API Connection + +First, we establish the connection. + +```python +ws_conn = connect(self._url) +subscribe_payload = { + "type": "subscribe", + "channels": [ + {"name": "ticker", "product_ids": self._product_ids}, + ], +} +ws_conn.send(json.dumps(subscribe_payload)) +``` + +#### Data retrieval loop + +Now we set up the data retrieval loop contained within a `while self.running` block. 
+ +This is so a shutdown from the `Application` level also gracefully exits this loop; the +`Source` essentially stops if the `Source.run()` method is ever exited. + +!!! NOTE + + Since no other teardown is required for websockets, nothing happens after the + `while self.running` block. + +Inside this block, records are retrieved, serialized (to `JSON`), and produced to an +underlying internal topic as close to its raw form as possible (user-level manipulations +occur at the `Application` level using a `StreamingDataFrame`). + +!!! TIP + + The internal topic can accept other data serializations by overriding + `Source.default_topic()`. + + + + +## Using `CoinbaseSource` + +Now that `CoinbaseSource` exists, we can ingest raw data from Coinbase. + +Of course, each user will have their own desired product ID's and transformations to apply. + +Now let's go over the `main()` portion of +our [**>>> Coinbase Application <<<**](tutorial_app.py) in detail! + +### Define the Source + +First, set up a `CoinbaseSource` with our desired `product_ids`. + +Be sure to provide a unique name since it affects the internal topic name. + +```python +coinbase_source = CoinbaseSource( + name="coinbase-source", + url="wss://ws-feed-public.sandbox.exchange.coinbase.com", + product_ids=["ETH-BTC"], +) +``` + +### Create an Application + +Create a [Quix Streams Application](../../configuration.md), which is our constructor for everything! + +We provide it our connection settings, consumer group (ideally unique per Application), +and where the consumer group should start from on the (internal) Source topic. + +!!! TIP + + Once you are more familiar with Kafka, we recommend + [learning more about auto_offset_reset](https://www.quix.io/blog/kafka-auto-offset-reset-use-cases-and-pitfalls). + +#### Our Application +```python +from quixstreams import Application +app = Application( + broker_address="localhost:9092", # your Kafka broker address here + auto_offset_reset="earliest", +) +``` + +### Specify Topics + +`Application.topic()` returns [`Topic`](../../api-reference/topics.md) objects which are used by `StreamingDataFrame`. + +Create one for each topic used by your `Application`. + +!!! NOTE + + Any missing topics will be automatically created for you upon running an `Application`. + +#### Our Topics +We have one output topic, named `price_updates`: + +```python +price_updates_topic = app.topic(name="price_updates") +``` + + + +### The StreamingDataFrame (SDF) + +Now for the fun part: building our [StreamingDataFrame](../../processing.md#introduction-to-streamingdataframe), often shorthanded to "SDF". + +SDF allows manipulating the message value in a dataframe-like fashion using various operations. + +After initializing with either a `Topic` or `Source`, we continue reassigning to the +same `sdf` variable as we add operations. + +!!! NOTE + + A few `StreamingDataFrame` operations are + ["in-place"](../../advanced/dataframe-assignments.md#valid-in-place-operations), + like `.print()`. + +#### Our SDF operations +First, we initialize our SDF with our `coinbase_source`. + +Then, our SDF prints each record to the console, and then produces only +the price and 24-hour volume to our outgoing topic. 
+ +```python +sdf = app.dataframe(source=coinbase_source) +sdf.print() +sdf = sdf[['price', 'volume_24h']] +sdf.to_topic(price_updates_topic) +``` + +#### Example record +As an example, a record processed by our `StreamingDataframe` would print the following: +```python +{ 'value': { 'type': 'ticker', + 'sequence': 754296790, + 'product_id': 'ETH-BTC', + 'price': '0.00005', + 'open_24h': '0.00008', + 'volume_24h': '322206074.45925051', + 'low_24h': '0.00005', + 'high_24h': '0.00041', + 'volume_30d': '3131937035.46099349', + 'best_bid': '0.00001', + 'best_bid_size': '1000000000.00000000', + 'best_ask': '0.00006', + 'best_ask_size': '166668.66666667', + 'side': 'sell', + 'time': '2024-09-19T10:01:26.411029Z', + 'trade_id': 28157206, + 'last_size': '16666.86666667'}} +``` + +and then produce the following to topic `price_updates`: +```python +{ + 'key': 'ETH-BTC', + 'value': {'price': '0.00005', 'volume_24h': '322206074.45925051'} +} + +``` + + +### Running the Application + +Running a `Source`-based `Application` requires calling `Application.run()` within a +`if __name__ == "__main__"` block. + +#### Our Application Run Block + +Our entire `Application` (and all its spawned objects) resides within a +`main()` function, executed as required: + +```python +if __name__ == "__main__": + main() +``` + +This `main()` setup is a personal choice: the only true +requirement is `app.run()` being called inside a `if __name__ == "__main__"` block. + + + +## Try it Yourself! + +### 1. Run Kafka +First, have a running Kafka cluster. + +To easily run a broker locally with Docker, just [run this simple one-liner](../README.md#running-kafka-locally). + +### 2. Download files +- [requirements.txt](requirements.txt) +- [tutorial_app.py](tutorial_app.py) + +### 3. Install requirements +In your desired python environment, execute: `pip install -r requirements.txt` + +### 4. Run the application +In your desired python environment, execute: `python tutorial_app.py`. + +### 5. Check out the results! + +You should see record printouts [like the example above](#example-record). \ No newline at end of file diff --git a/examples/custom_websocket_source/main.py b/docs/tutorials/websocket-source/tutorial_app.py similarity index 91% rename from examples/custom_websocket_source/main.py rename to docs/tutorials/websocket-source/tutorial_app.py index e37db0e86..d45e534ff 100644 --- a/examples/custom_websocket_source/main.py +++ b/docs/tutorials/websocket-source/tutorial_app.py @@ -1,5 +1,6 @@ import json import logging +import os from typing import List from dateutil.parser import isoparse @@ -84,12 +85,6 @@ def run(self) -> None: def main(): - # Initialize an Application with Kafka configuration - app = Application( - broker_address="localhost:9092", # Specify your Kafka broker address here - auto_offset_reset="earliest", - ) - # Configure the CoinbaseSource instance coinbase_source = CoinbaseSource( # Pick the unique name for the source instance. 
@@ -101,12 +96,25 @@ def main(): ], ) + # Initialize an Application with Kafka configuration + app = Application( + broker_address=os.getenv("BROKER_ADDRESS", "localhost:9092"), + auto_offset_reset="earliest", + ) + + # Define a topic for producing transformed data + price_updates_topic = app.topic(name="price_updates") + # Connect the CoinbaseSource to a StreamingDataFrame sdf = app.dataframe(source=coinbase_source) # Print the incoming messages from the source sdf.print() + # Select specific data columns and produce them to a topic + sdf = sdf[["price", "volume_24h"]] + sdf.to_topic(price_updates_topic) + # Start the application app.run() diff --git a/docs/tutorials/word-count/application.py b/docs/tutorials/word-count/application.py deleted file mode 100644 index 564993047..000000000 --- a/docs/tutorials/word-count/application.py +++ /dev/null @@ -1,31 +0,0 @@ -import os -from collections import Counter - -from quixstreams import Application - -app = Application( - broker_address=os.environ.get("BROKER_ADDRESS", "localhost:9092"), - consumer_group="product_review_word_counter", - auto_offset_reset="earliest", -) -product_reviews_topic = app.topic(name="product_reviews") -word_counts_topic = app.topic(name="product_review_word_counts") - - -def tokenize_and_count(text): - return list(Counter(text.lower().replace(".", " ").split()).items()) - - -def should_skip(word_count_pair): - word, count = word_count_pair - return word not in ["i", "a", "we", "it", "is", "and", "or", "the"] - - -sdf = app.dataframe(topic=product_reviews_topic) -sdf = sdf.apply(tokenize_and_count, expand=True) -sdf = sdf.filter(should_skip) -sdf = sdf.to_topic(word_counts_topic, key=lambda word_count_pair: word_count_pair[0]) - - -if __name__ == "__main__": - app.run() diff --git a/docs/tutorials/word-count/producer.py b/docs/tutorials/word-count/producer.py deleted file mode 100644 index 34e91e80e..000000000 --- a/docs/tutorials/word-count/producer.py +++ /dev/null @@ -1,31 +0,0 @@ -import os -import time -from random import choice - -from quixstreams import Application - -_app = Application(broker_address=os.environ.get("BROKER_ADDRESS", "localhost:9092")) -topic = _app.topic(name="product_reviews") - -review_list = [ - "This is the best thing since sliced bread. The best I say.", - "This is terrible. Could not get it working.", - "I was paid to write this. Seems good.", - "Great product. Would recommend.", - "Not sure who this is for. Seems like it will break after 5 minutes.", - "I bought their competitors product and it is way worse. Use this one instead.", - "I would buy it again. In fact I would buy several of them.", - "Great great GREAT", -] - -product_list = ["product_a", "product_b", "product_c"] - - -if __name__ == "__main__": - with _app.get_producer() as producer: - for review in review_list: - event = topic.serialize(key=choice(product_list), value=review) - print(f"producing review for {event.key}: {event.value}") - producer.produce(key=event.key, value=event.value, topic=topic.name) - time.sleep(0.5) # just to make things easier to follow along - print("Sent all product reviews.") diff --git a/docs/tutorials/word-count/tutorial.md b/docs/tutorials/word-count/tutorial.md index 4873569d5..22774b4ca 100644 --- a/docs/tutorials/word-count/tutorial.md +++ b/docs/tutorials/word-count/tutorial.md @@ -2,13 +2,17 @@ We will build a simple word counter, which is a great introduction to Quix Streams and Kafka! 
-You'll learn how to: -- Create a topic +## What You Will Learn + +This example will show how to use a Quix Streams `Application` to: + +- Ingest a non-Kafka data source - Do simple event alterations - Generate multiple events from a single event - Filter any undesired events -- Produce events, with new Kafka keys, to a topic +- Create a Kafka topic +- Produce results, with new Kafka keys, to a topic @@ -26,17 +30,20 @@ the counts of each individually downstream for further processing. ## Our Example -We will use a simple producer to generate text to be processed by our -new Word Counter application. +We will use a [Quix Streams `Source`](../../connectors/sources/README.md) to generate text to be processed by our +new Word Counter `Application`. + + +!!! NOTE -NOTE: Our example uses JSON formatting for Kafka message values. + Our example uses `JSON` formatting for Kafka message values. ## Event Expansion The most important concept we want to explore here is how you can "expand" a single -event into multiple new ones. +event/message into multiple new ones. More than that: each new event generated via expansion is processed individually through the remainder of your pipeline, allowing you to write ALL your operations @@ -46,20 +53,27 @@ NOTE: Expanding often includes adjusting outgoing Kafka keys as well, so we addi showcase that. + ## Before Getting Started -- You will see links scattered throughout this tutorial. +1. You will see links scattered throughout this tutorial. - Tutorial code links are marked **>>> LIKE THIS <<<** . - ***All other links provided are completely optional***. - They are great ways to learn more about various concepts if you need it! +2. This tutorial uses a [`Source`](../../connectors/sources/README.md) rather than a Kafka [`Topic`]() to ingest data. + - `Source` connectors enable reading data from a non-Kafka origin (typically to get it into Kafka). + - This approach circumvents users having to run a [producer](../../producer.md) alongside the `Application`. + - A `Source` is easily replaced with an actual Kafka topic (just pass a `Topic` instead of a `Source`). + + ## Generating Text Data -We have a [**>>> Review Producer <<<**](producer.py) that generates a static set of "reviews", -which are simply strings, where the key is the product name. +Our [**>>> Word Counter Application <<<**](tutorial_app.py) uses a `Source` called `ReviewGenerator` +that generates a static set of "reviews", which are simply strings, where the key is the product name. -The Kafka message looks like: +The incoming Kafka messages look like: ```python # ... @@ -71,52 +85,90 @@ The Kafka message looks like: ## Word Counter Application -Now let's go over our [**>>> Word Counter Application <<<**](application.py) line-by-line! +Now let's go over the `main()` portion of +our [**>>> Word Counter Application <<<**](tutorial_app.py) in detail! + -### Create Application + +### Create an Application + +Create a [Quix Streams Application](../../configuration.md), which is our constructor for everything! + +We provide it our connection settings, consumer group (ideally unique per Application), +and where the consumer group should start from on the (internal) Source topic. + +!!! TIP + + Once you are more familiar with Kafka, we recommend + [learning more about auto_offset_reset](https://www.quix.io/blog/kafka-auto-offset-reset-use-cases-and-pitfalls). 
+ +#### Our Application ```python import os from quixstreams import Application app = Application( - broker_address=os.environ.get("BROKER_ADDRESS", "localhost:9092"), + broker_address=os.getenv("BROKER_ADDRESS", "localhost:9092"), consumer_group="product_review_word_counter", auto_offset_reset="earliest" ) ``` -First, create the [Quix Streams Application](../../configuration.md), which is our constructor for everything! We provide it our connection settings, consumer group (ideally unique per Application), and where the consumer group should start from on our topic. -NOTE: Once you are more familiar with Kafka, we recommend [learning more about auto_offset_reset](https://www.quix.io/blog/kafka-auto-offset-reset-use-cases-and-pitfalls). +### Specify Topics + +`Application.topic()` returns [`Topic`](../../api-reference/topics.md) objects which are used by `StreamingDataFrame`. + +Create one for each topic used by your `Application`. -### Define Topics +!!! NOTE + + Any missing topics will be automatically created for you upon running an `Application`. + + +#### Our Topics +We have one output topic, named `product_review_word_counts`: ```python -product_reviews_topic = app.topic(name="product_reviews") word_counts_topic = app.topic(name="product_review_word_counts") ``` -Next we define our input/output topics, named `product_reviews` and `product_review_word_counts`, respectively. -They each return [`Topic`](../../api-reference/topics.md) objects, used later on. -NOTE: The topics will automatically be created for you in Kafka when you run the application should they not exist. +### The StreamingDataFrame (SDF) + +Now for the fun part: building our [StreamingDataFrame](../../processing.md#introduction-to-streamingdataframe), often shorthanded to "SDF". + +SDF allows manipulating the message value in a dataframe-like fashion using various operations. +After initializing with either a `Topic` or `Source`, we continue reassigning to the +same `sdf` variable as we add operations. -### The StreamingDataFrame (SDF) +!!! NOTE + + A few `StreamingDataFrame` operations are + ["in-place"](../../advanced/dataframe-assignments.md#valid-in-place-operations), + like `.print()`. + +#### Initializing our SDF ```python -sdf = app.dataframe(topic=product_reviews_topic) +sdf = app.dataframe(source=ReviewGenerator()) ``` -Now for the fun part: building our [StreamingDataFrame](../../processing.md#introduction-to-streamingdataframe), often shorthanded to "SDF". +First, we initialize our SDF with our `ReviewGenerator` `Source`, +which means we will be consuming data from a non-Kafka origin. -SDF allows manipulating the message value in a dataframe-like fashion using various operations. -After initializing, we continue re-assigning to the same `sdf` variable as we add operations. +!!! TIP + + You can consume from a Kafka topic instead by passing a `Topic` object + with `app.dataframe(topic=)`. + +Let's go over the SDF operations in this example in detail. + -(Also, notice that we pass our input `Topic` from the previous step to it.) ### Tokenizing Text @@ -148,11 +200,13 @@ to this: `>>> [('bob', 1), ('likes', 2), ('bananas', 1), ('and', 1), ('frank', 1), ('apples', 1)]` -NOTE: Two VERY important and related points around the `expand=True` argument: +!!! INFO "`expand=True` argument details" + + 1. It tells SDF "hey, this .apply() returns _**multiple independent**_ events!" + + 2. The `F` returns a `list` (or a non-dict iterable of some kind), hence the "expand"! -1. 
It tells SDF "hey, this .apply() returns _**multiple independent**_ events!" -2. Our `F` returns a `list` (or a non-dict iterable of some kind), hence the "expand"! ### Filtering Expanded Results @@ -181,6 +235,8 @@ With this filter applied, our "and" event is removed: `>>> [('bob', 1), ('likes', 2), ('bananas', 1), ('frank', 1), ('apples', 1)]` + + ### Producing Events With New Keys ```python @@ -193,10 +249,16 @@ via [`SDF.to_topic(T)`](../../processing.md#writing-data-to-kafka-topics), where Notice here the optional `key` argument, which allows you to provide a [custom key generator](../../processing.md#changing-message-key-before-producing). While it's fairly common to maintain the input event's key (SDF's default behavior), -there are many reasons why you might adjust it...like here (NOTE: advanced concept below)! +there are also many reasons why you might adjust it, so we showcase an example of that +here! + +!!! QUESTION "Why change the key to the word being counted?" -We are changing the message key to the word; this data structure enables -calculating _total word counts over time_ from this topic (with a new application, of course!). + This key change would enable calculating _total word counts over time_ from this + topic without additional data transformations (a more advanced operation). + + Even though we won't do that in this example, you can imagine doing so in a + downstream `Application`! In the end we would produce 5 messages in total, like so: @@ -207,24 +269,68 @@ In the end we would produce 5 messages in total, like so: # etc... ``` -NOTE: This is how you would see the values in the Kafka topic `product_review_word_counts`. +!!! NOTE + + This is a user-friendly representation of how a message key/value in the Kafka topic + `product_review_word_counts` would appear. + + + + + +### Running the Application + +Running a `Source`-based `Application` requires calling `Application.run()` within a +`if __name__ == "__main__"` block. + +#### Our Application Run Block + +Our entire `Application` (and all its spawned objects) resides within a +`main()` function, executed as required: + +```python +if __name__ == "__main__": + main() +``` + + -## Try it yourself! +## Try it Yourself! ### 1. Run Kafka First, have a running Kafka cluster. -To conveniently follow along with this tutorial, just [run this simple one-liner](../README.md#running-kafka-locally). +To easily run a broker locally with Docker, just [run this simple one-liner](../README.md#running-kafka-locally). + +### 2. Download files +- [tutorial_app.py](tutorial_app.py) + +### 3. Install Quix Streams +In your desired python environment, execute: `pip install quixstreams` + +### 4. Run the application +In your desired python environment, execute: `python tutorial_app.py`. + +### 5. Check out the results! + +...but wait, I don't see any `Application` processing output...Is it working??? + +One thing to keep in mind is that the Quix Streams does not log/print any message processing +operations by default. + +To get visual outputs around message processing, you can either: + +- use [recommended way of printing/logging with SDF](../../processing.md#debugging) + +- use `DEBUG` mode via `Application(loglevel="DEBUG")` + + + !!! DANGER -### 2. Install Quix Streams -In your python environment, run `pip install quixstreams` + you should NOT run your applications in `DEBUG` mode in production. -### 3. Run the Producer and Application -Just call `python producer.py` and `python application.py` in separate windows. -### 4. 
Check out the results! -Look at all those counted words, beautiful! ## Related topics - Data Aggregation diff --git a/docs/tutorials/word-count/tutorial_app.py b/docs/tutorials/word-count/tutorial_app.py new file mode 100644 index 000000000..104607689 --- /dev/null +++ b/docs/tutorials/word-count/tutorial_app.py @@ -0,0 +1,69 @@ +import logging +import os +import time +from collections import Counter +from random import choice + +from quixstreams import Application +from quixstreams.sources import Source + +logger = logging.getLogger(__name__) + + +class ReviewGenerator(Source): + _review_list = [ + "This is the best thing since sliced bread. The best I say.", + "This is terrible. Could not get it working.", + "I was paid to write this. Seems good.", + "Great product. Would recommend.", + "Not sure who this is for. Seems like it will break after 5 minutes.", + "I bought their competitors product and it is way worse. Use this one instead.", + "I would buy it again. In fact I would buy several of them.", + "Great great GREAT", + ] + + _product_list = ["product_a", "product_b", "product_c"] + + def __init__(self): + super().__init__(name="customer-reviews") + + def run(self): + for review in self._review_list: + event = self.serialize(key=choice(self._product_list), value=review) + logger.debug(f"Generating review for {event.key}: {event.value}") + self.produce(key=event.key, value=event.value) + time.sleep(0.5) # just to make things easier to follow along + logger.info("Sent all product reviews.") + + +def tokenize_and_count(text): + return list(Counter(text.lower().replace(".", " ").split()).items()) + + +def should_skip(word_count_pair): + word, count = word_count_pair + return word not in ["i", "a", "we", "it", "is", "and", "or", "the"] + + +def main(): + app = Application( + broker_address=os.getenv("BROKER_ADDRESS", "localhost:9092"), + consumer_group="product_review_word_counter", + auto_offset_reset="earliest", + ) + word_counts_topic = app.topic(name="product_review_word_counts") + + # If reading from a Kafka topic, pass topic= instead of a source + sdf = app.dataframe(source=ReviewGenerator()) + sdf = sdf.apply(tokenize_and_count, expand=True) + sdf = sdf.filter(should_skip) + sdf.print() + sdf.to_topic(word_counts_topic, key=lambda word_count_pair: word_count_pair[0]) + + # Start the application + app.run() + + +# This approach is necessary since we are using a Source +if __name__ == "__main__": + main() diff --git a/examples/.gitignore b/examples/.gitignore deleted file mode 100644 index 9f195d925..000000000 --- a/examples/.gitignore +++ /dev/null @@ -1 +0,0 @@ -state/ \ No newline at end of file diff --git a/examples/README.md b/examples/README.md deleted file mode 100644 index 31cf2e6b5..000000000 --- a/examples/README.md +++ /dev/null @@ -1,20 +0,0 @@ -# Quix Streams Examples - -This folder contains a few boilerplate applications to get you started with -the Quix Streams library. - -## Running an Example - -Simply pick a folder in here, like `bank_example`. Then, a serialization type, like -`json`. Then, run any desired app in that folder via `python examples/path/to/app.py` - -## Requirements - -- Working from the examples directory (`examples/`) - - that includes executing python files from here -- Installing the python requirements file in here - - `pip install -r requirements.txt` -- A Kafka instance to connect to, which has topic creation privileges. 
- - You can use the easy-to-use one included here via `docker-compose up -d` - - Has a UI at `localhost:9021` - - Kill with `docker-compose down` when finished diff --git a/examples/__init__.py b/examples/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/examples/bank_example/README.md b/examples/bank_example/README.md deleted file mode 100644 index 5e5ab284a..000000000 --- a/examples/bank_example/README.md +++ /dev/null @@ -1,27 +0,0 @@ -# Bank Example - -## Summary of Logic - -In this example, imagine we are a very simple bank with various accounts. We also have -"Silver" and "Gold" memberships, where "Gold" members have extra perks as we'll see! - -Our `producer` is mimicking attempted customer "transactions" (aka purchases) -by generating and producing random customer transactions. - -Our `consumer`, downstream of the `producer`, is going to notify -said customer of the transaction attempt, but only if that customer has a "Gold" -account and the purchase attempt was above a certain cost (we don't want to spam them!). - - -## Purpose - -This example showcases: - - How to use multiple Quix kafka applications together - - Producer - - Consumer via `quixstreams.Application` (consumes and produces) - - Basic usage of the `StreamingDataFrame` object - - Using different serializations/data structures - - json - - Quix serializers (more intended for Quix platform use) - - backwards-compatible with previous Python client in `quixstreams= 1000 -sdf = sdf[(sdf["account_class"] == "Gold") & (sdf["transaction_amount"].abs() >= 1000)] - -# Drop all fields except the ones we need -sdf = sdf[["account_id", "transaction_amount", "transaction_source"]] - -# Update the total number of transactions in state and save result to the message -sdf["total_transactions"] = sdf.apply(count_transactions, stateful=True) - -# Transform field "transaction_source" to upper-case using a custom function -sdf["transaction_source"] = sdf["transaction_source"].apply(lambda v: v.upper()) - -# Add a new field with a notification text -sdf["customer_notification"] = "A high cost purchase was attempted" - -# Print the transformed message to the console -sdf = sdf.update(lambda val: print(f"Sending update: {val}")) - -# Send the message to the output topic -sdf = sdf.to_topic(output_topic) - -if __name__ == "__main__": - # Start message processing - app.run() diff --git a/examples/bank_example/json_version/producer.py b/examples/bank_example/json_version/producer.py deleted file mode 100644 index 949aeb904..000000000 --- a/examples/bank_example/json_version/producer.py +++ /dev/null @@ -1,49 +0,0 @@ -import time -import uuid -from os import environ -from random import choice, randint, random - -from dotenv import load_dotenv - -from quixstreams import Application - -load_dotenv("./env_vars.env") - - -app = Application(broker_address=environ["BROKER_ADDRESS"]) -topic = app.topic(name="json__purchase_events", value_serializer="json") - -retailers = [ - "Billy Bob's Shop", - "Tasty Pete's Burgers", - "Mal-Wart", - "Bikey Bikes", - "Board Game Grove", - "Food Emporium", -] - -i = 0 -# app.get_producer() automatically creates any topics made via `app.topic` -with app.get_producer() as producer: - while i < 10000: - account = randint(0, 10) - account_id = f"A{'0'*(10-len(str(account)))}{account}" - value = { - "account_id": account_id, - "account_class": "Gold" if account >= 8 else "Silver", - "transaction_amount": randint(-2500, -1), - "transaction_source": choice(retailers), - } - print(f"Producing value 
{value}") - # with current functionality, we need to manually serialize our data - serialized = topic.serialize( - key=account_id, value=value, headers={"uuid": str(uuid.uuid4())} - ) - producer.produce( - topic=topic.name, - headers=serialized.headers, - key=serialized.key, - value=serialized.value, - ) - i += 1 - time.sleep(random()) diff --git a/examples/bank_example/quix_platform_version/__init__.py b/examples/bank_example/quix_platform_version/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/examples/bank_example/quix_platform_version/consumer.py b/examples/bank_example/quix_platform_version/consumer.py deleted file mode 100644 index 9ba61a4fb..000000000 --- a/examples/bank_example/quix_platform_version/consumer.py +++ /dev/null @@ -1,71 +0,0 @@ -""" -An application to process imaginary purchase transactions in real-time using Kafka -running on Quix platform. - -In this application, we will simulate notifications for "Gold" accounts about -purchase events larger than $1000 -""" - -from dotenv import load_dotenv - -from quixstreams import Application, State - -# Reminder: the platform will have these values available by default so loading the -# environment would be unnecessary there. -load_dotenv("./bank_example/quix_platform_version/quix_vars.env") - - -def count_transactions(value: dict, state: State): - """ - Track the number of transactions using persistent state - - :param value: message value - :param state: instance of State store - """ - total = state.get("total_transactions", 0) - total += 1 - state.set("total_transactions", total) - return total - - -# Define your application and settings -# Quix application is automatically configured to work with Quix platform -app = Application( - consumer_group="qts__purchase_notifier", - auto_offset_reset="earliest", - auto_create_topics=True, -) - -# Define an input topic with Quix deserializer -input_topic = app.topic("qts__purchase_events", value_deserializer="quix") - -# Define an output topic with Quix Timeseries serializer -output_topic = app.topic("qts__user_notifications", value_serializer="quix_timeseries") - -# Create a StreamingDataFrame and start building your processing pipeline -sdf = app.dataframe(input_topic) - -# Filter only messages with "account_class" == "Gold" and "transaction_amount" >= 1000 -sdf = sdf[(sdf["account_class"] == "Gold") & (sdf["transaction_amount"].abs() >= 1000)] - -# Drop all fields except the ones we need -sdf = sdf[["Timestamp", "account_id", "transaction_amount", "transaction_source"]] - -# Update the total number of transactions in state and save result to the message -sdf["total_transactions"] = sdf.apply(count_transactions, stateful=True) - -# Transform field "transaction_source" to upper-case using a custom function -sdf["transaction_source"] = sdf["transaction_source"].apply(lambda v: v.upper()) - -# Add a new field with a notification text -sdf["customer_notification"] = "A high cost purchase was attempted" - -# Print the transformed message to the console -sdf = sdf.update(lambda val: print(f"Sending update: {val}")) - -# Send the message to the output topic -sdf = sdf.to_topic(output_topic) - -if __name__ == "__main__": - # Start message processing - app.run() diff --git a/examples/bank_example/quix_platform_version/producer.py b/examples/bank_example/quix_platform_version/producer.py deleted file mode 100644 index 9cd28b79d..000000000 --- a/examples/bank_example/quix_platform_version/producer.py +++ /dev/null @@ -1,56 +0,0 @@ -import time -import uuid -from 
random import choice, randint, random -from time import sleep - -from dotenv import load_dotenv - -from quixstreams import Application -from quixstreams.models.serializers import QuixTimeseriesSerializer - -load_dotenv("./bank_example/quix_platform_version/quix_vars.env") - - -app = Application() -serializer = QuixTimeseriesSerializer() -topic = app.topic(name="qts__purchase_events", value_serializer=serializer) - - -retailers = [ - "Billy Bob's Shop", - "Tasty Pete's Burgers", - "Mal-Wart", - "Bikey Bikes", - "Board Game Grove", - "Food Emporium", -] - - -i = 0 -# app.get_producer() automatically creates any topics made via `app.topic` -with app.get_producer() as producer: - while i < 10000: - account = randint(0, 10) - account_id = f"A{'0'*(10-len(str(account)))}{account}" - value = { - "account_id": account_id, - "account_class": "Gold" if account >= 8 else "Silver", - "transaction_amount": randint(-2500, -1), - "transaction_source": choice(retailers), - "Timestamp": time.time_ns(), - } - print(f"Producing value {value}") - # with current functionality, we need to manually serialize our data - serialized = topic.serialize( - key=account_id, - value=value, - headers={**serializer.extra_headers, "uuid": str(uuid.uuid4())}, - ) - producer.produce( - topic=topic.name, - headers=serialized.headers, - key=serialized.key, - value=serialized.value, - ) - i += 1 - sleep(random()) diff --git a/examples/bank_example/quix_platform_version/quix_vars.env b/examples/bank_example/quix_platform_version/quix_vars.env deleted file mode 100644 index f37f5276b..000000000 --- a/examples/bank_example/quix_platform_version/quix_vars.env +++ /dev/null @@ -1,7 +0,0 @@ -# Required - will always be defined by default in a Quix platform workspace. -Quix__Sdk__Token= - -# Optional; can usually be found by the library automatically via the quix auth token. -# It will always be defined by default in a Quix platform workspace. -Quix__Workspace_Id= -Quix__Portal__Api= \ No newline at end of file diff --git a/examples/custom_websocket_source/README.md b/examples/custom_websocket_source/README.md deleted file mode 100644 index 55784a55a..000000000 --- a/examples/custom_websocket_source/README.md +++ /dev/null @@ -1,50 +0,0 @@ -# Custom Websocket Source Example - -In this example, we are going to create a custom websocket source to connect to the -Coinbase Websocket API and process ticker updates with a Streaming DataFrame. - -See [main.py](./main.py) file for more details on how the Source is implemented. - -Under the hood, the Application will start the source as a subprocess, and it will produce data to the intermediate Kafka topic for robust processing. - -Streaming DataFrame will connect to this topic and process the data. - -To find more info about Sources, please see [the Sources docs](https://quix.io/docs/quix-streams/connectors/sources/index.html). - -## How to run - -1. Ensure that you have some local Kafka or Redpanda broker to connect to. -If not, you can start it locally using Docker and provided [docker-compose.yml](../docker-compose.yml) file. -2. Open the [main.py](./main.py) file and adjust the Kafka connection settings -in the Application class if necessary. -3. Install the [requirements](./requirements.txt) for the source: `pip install -r requirements.txt` -4. Go to `custom_websocket_source` directory and start the source: `python main.py` -5. Observe the output. 
-You should see something like this in your console: - -```commandline -[2024-09-19 12:08:25,971] [INFO] [quixstreams] : Starting the Application with the config: broker_address="{'bootstrap.servers': 'localhost:9092'}" consumer_group="quixstreams-default" auto_offset_reset="earliest" commit_interval=5.0s commit_every=0 processing_guarantee="at-least-once" -[2024-09-19 12:08:25,971] [INFO] [quixstreams] : Topics required for this application: "coinbase-source" -[2024-09-19 12:08:26,023] [INFO] [quixstreams] : Validating Kafka topics exist and are configured correctly... -[2024-09-19 12:08:26,032] [INFO] [quixstreams] : Kafka topics validation complete -[2024-09-19 12:08:26,032] [INFO] [quixstreams] : Waiting for incoming messages -[2024-09-19 12:08:29,059] [INFO] [quixstreams] : Starting source coinbase-source -[2024-09-19 12:08:29,310] [INFO] [coinbase-source] [8534] : Source started -{ 'value': { 'type': 'ticker', - 'sequence': 754296790, - 'product_id': 'ETH-BTC', - 'price': '0.00005', - 'open_24h': '0.00008', - 'volume_24h': '322206074.45925051', - 'low_24h': '0.00005', - 'high_24h': '0.00041', - 'volume_30d': '3131937035.46099349', - 'best_bid': '0.00001', - 'best_bid_size': '1000000000.00000000', - 'best_ask': '0.00006', - 'best_ask_size': '166668.66666667', - 'side': 'sell', - 'time': '2024-09-19T10:01:26.411029Z', - 'trade_id': 28157206, - 'last_size': '16666.86666667'}} -``` diff --git a/examples/custom_websocket_source/__init__.py b/examples/custom_websocket_source/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/examples/docker-compose.yml b/examples/docker-compose.yml deleted file mode 100644 index 89c941683..000000000 --- a/examples/docker-compose.yml +++ /dev/null @@ -1,77 +0,0 @@ ---- -version: '2' -services: - - broker: - image: confluentinc/cp-kafka:7.5.0 - hostname: broker - container_name: broker - ports: - - "9092:9092" - - "9101:9101" - environment: - KAFKA_NODE_ID: 1 - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT' - KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092' - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 - KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 - KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 - KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 - KAFKA_JMX_PORT: 9101 - KAFKA_JMX_HOSTNAME: localhost - KAFKA_PROCESS_ROLES: 'broker,controller' - KAFKA_CONTROLLER_QUORUM_VOTERS: '1@broker:29093' - KAFKA_LISTENERS: 'PLAINTEXT://broker:29092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:9092' - KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT' - KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER' - KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs' - # Replace CLUSTER_ID with a unique base64 UUID using "bin/kafka-storage.sh random-uuid" - # See https://docs.confluent.io/kafka/operations-tools/kafka-tools.html#kafka-storage-sh - CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk' - - schema-registry: - image: confluentinc/cp-schema-registry:7.5.0 - hostname: schema-registry - container_name: schema-registry - depends_on: - - broker - ports: - - "8081:8081" - environment: - SCHEMA_REGISTRY_HOST_NAME: schema-registry - SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092' - SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081 - - control-center: - image: confluentinc/cp-enterprise-control-center:7.5.0 - hostname: control-center - container_name: control-center - depends_on: - - broker - - schema-registry - ports: - - "9021:9021" - environment: - CONTROL_CENTER_BOOTSTRAP_SERVERS: 
'broker:29092' - CONTROL_CENTER_CONNECT_CONNECT-DEFAULT_CLUSTER: 'connect:8083' - CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081" - CONTROL_CENTER_REPLICATION_FACTOR: 1 - CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1 - CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1 - CONFLUENT_METRICS_TOPIC_REPLICATION: 1 - PORT: 9021 - - rest-proxy: - image: confluentinc/cp-kafka-rest:7.5.0 - depends_on: - - broker - - schema-registry - ports: - - 8082:8082 - hostname: rest-proxy - container_name: rest-proxy - environment: - KAFKA_REST_HOST_NAME: rest-proxy - KAFKA_REST_BOOTSTRAP_SERVERS: 'broker:29092' - KAFKA_REST_LISTENERS: "http://0.0.0.0:8082" - KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081' diff --git a/examples/env_vars.env b/examples/env_vars.env deleted file mode 100644 index ef8902479..000000000 --- a/examples/env_vars.env +++ /dev/null @@ -1 +0,0 @@ -BROKER_ADDRESS=localhost:9092 diff --git a/examples/requirements.txt b/examples/requirements.txt deleted file mode 100644 index 1a5c92744..000000000 --- a/examples/requirements.txt +++ /dev/null @@ -1,2 +0,0 @@ -../. -python-dotenv diff --git a/mkdocs.yml b/mkdocs.yml index 9df6b2671..6d541a96d 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -8,6 +8,8 @@ theme: features: - navigation.path - navigation.indexes + - content.code.copy + - content.code.select markdown_extensions: - attr_list @@ -26,9 +28,10 @@ nav: - Quickstart: 'quickstart.md' - Tutorials: - 'tutorials/README.md' - - Word Count: 'tutorials/word-count/tutorial.md' - Anomaly Detection: 'tutorials/anomaly-detection/tutorial.md' - Purchase Filtering: 'tutorials/purchase-filtering/tutorial.md' + - Word Count: 'tutorials/word-count/tutorial.md' + - Websocket Source: 'tutorials/websocket-source/tutorial.md' - How to: - Produce Data to Kafka: producer.md - Process & Transform Data: processing.md
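

The word-count tutorial above notes (in its TIP) that the `ReviewGenerator` `Source` can be swapped for a real Kafka topic. A minimal sketch of that swap, assuming a `product_reviews` topic like the one the earlier version of the tutorial consumed from (the topic name here is an assumption carried over from the removed text, not part of the new files):

```python
import os

from quixstreams import Application

app = Application(
    broker_address=os.getenv("BROKER_ADDRESS", "localhost:9092"),
    consumer_group="product_review_word_counter",
    auto_offset_reset="earliest",
)

# Read reviews from an existing Kafka topic instead of a Source
product_reviews_topic = app.topic(name="product_reviews")
word_counts_topic = app.topic(name="product_review_word_counts")

# Pass topic= instead of source=; the rest of the pipeline is unchanged
sdf = app.dataframe(topic=product_reviews_topic)
# sdf = sdf.apply(tokenize_and_count, expand=True)
# sdf = sdf.filter(should_skip)
# sdf.to_topic(word_counts_topic, key=lambda word_count_pair: word_count_pair[0])

if __name__ == "__main__":
    app.run()
```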
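The tutorial also points out that keying the output messages by word makes it possible to compute total word counts over time in a downstream `Application`. A rough sketch of that idea, assuming the produced values arrive as `[word, count]` pairs as shown in the tutorial's output example (the `total_word_counter` consumer group and the returned field names are illustrative only):

```python
import os

from quixstreams import Application, State

app = Application(
    broker_address=os.getenv("BROKER_ADDRESS", "localhost:9092"),
    consumer_group="total_word_counter",
    auto_offset_reset="earliest",
)
word_counts_topic = app.topic(name="product_review_word_counts")


def add_to_total(value, state: State):
    # State is scoped to the message key, i.e. one running total per word
    word, count = value  # assumes the [word, count] value layout shown above
    total = state.get("total", 0) + count
    state.set("total", total)
    return {"word": word, "total_count": total}


sdf = app.dataframe(topic=word_counts_topic)
sdf = sdf.apply(add_to_total, stateful=True)
sdf.print()

if __name__ == "__main__":
    app.run()
```

Because state is partitioned by message key, re-keying by word upstream is exactly what lets this totals sketch stay a simple stateful `.apply()` without any extra re-keying or repartitioning step.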