diff --git a/docs/advanced/topics.md b/docs/advanced/topics.md index 53563095a..8d5806b95 100644 --- a/docs/advanced/topics.md +++ b/docs/advanced/topics.md @@ -43,7 +43,7 @@ sdf = app.dataframe(input_topic).to_topic(output_topic) # Run the Application. # The topics will be validated and created during this function call. -app.run(sdf) +app.run() ``` ## Topic Configuration @@ -83,5 +83,5 @@ sdf = app.dataframe(input_topic).to_topic(output_topic) # Run the Application. # The topics will be validated and created during this function call. # Note: if the topics already exist, the configs will remain intact. -app.run(sdf) +app.run() ``` diff --git a/docs/connectors/sources/README.md b/docs/connectors/sources/README.md index f3a934618..157742aae 100644 --- a/docs/connectors/sources/README.md +++ b/docs/connectors/sources/README.md @@ -15,7 +15,7 @@ def main(): sdf = app.dataframe(source=source) sdf.print(metadata=True) - app.run(sdf) + app.run() if __name__ == "__main__": main() @@ -60,7 +60,7 @@ def main(): sdf = app.dataframe(topic=topic, source=source) sdf.print(metadata=True) - app.run(sdf) + app.run() if __name__ == "__main__": main() diff --git a/docs/connectors/sources/csv-source.md b/docs/connectors/sources/csv-source.md index a6b8f755a..a728b34a0 100644 --- a/docs/connectors/sources/csv-source.md +++ b/docs/connectors/sources/csv-source.md @@ -19,7 +19,7 @@ def main(): sdf = app.dataframe(source=source) sdf.print(metadata=True) - app.run(sdf) + app.run() if __name__ == "__main__": main() diff --git a/docs/connectors/sources/custom-sources.md b/docs/connectors/sources/custom-sources.md index a0d943739..1f32ece6d 100644 --- a/docs/connectors/sources/custom-sources.md +++ b/docs/connectors/sources/custom-sources.md @@ -79,7 +79,7 @@ def main(): sdf = app.dataframe(source=source) sdf.print(metadata=True) - app.run(sdf) + app.run() if __name__ == "__main__": main() @@ -141,7 +141,7 @@ def main(): sdf = app.dataframe(source=source) sdf.print(metadata=True) - app.run(sdf) + app.run() if __name__ == "__main__": main() diff --git a/docs/connectors/sources/kafka-source.md b/docs/connectors/sources/kafka-source.md index 56e0d99ba..50868eac4 100644 --- a/docs/connectors/sources/kafka-source.md +++ b/docs/connectors/sources/kafka-source.md @@ -24,7 +24,7 @@ def main(): sdf = app.dataframe(source=source) sdf.print(metadata=True) - app.run(sdf) + app.run() if __name__ == "__main__": main() diff --git a/docs/connectors/sources/quix-source.md b/docs/connectors/sources/quix-source.md index 3674dbbc9..ae2fc469a 100644 --- a/docs/connectors/sources/quix-source.md +++ b/docs/connectors/sources/quix-source.md @@ -23,7 +23,7 @@ def main(): sdf = app.dataframe(source=source) sdf.print(metadata=True) - app.run(sdf) + app.run() if __name__ == "__main__": main() diff --git a/docs/processing.md b/docs/processing.md index 956510b3a..1a60767a5 100644 --- a/docs/processing.md +++ b/docs/processing.md @@ -95,7 +95,7 @@ sdf = ( sdf = sdf.to_topic(output_topic) # Run the pipeline -app.run(sdf) +app.run() ``` ### Data Types @@ -480,7 +480,7 @@ sdf['average_is_null'] = sdf["average"].isnull() Under the good, when you access a column on `StreamingDataFrame` it generates the new `StreamingSeries` instance that refers to the value of the passed key. These objects are also lazy, and they are evaluated only when the `StreamingDataFrame`is -executed by `app.run(sdf)`. +executed by `app.run()`. When you set them back to the StreamingDataFrame or use them to filter data, it creates a new step in the pipeline to be evaluated later. diff --git a/docs/quickstart.md b/docs/quickstart.md index 888df4fac..d4fe50b81 100644 --- a/docs/quickstart.md +++ b/docs/quickstart.md @@ -102,7 +102,7 @@ sdf = sdf.update(lambda row: print(f"Output: {row}")) # Run the streaming application if __name__ == "__main__": - app.run(sdf) + app.run() ``` ### Step 4. Running the Producer diff --git a/docs/tutorials/anomaly-detection/application.py b/docs/tutorials/anomaly-detection/application.py index 1558a4839..418242505 100644 --- a/docs/tutorials/anomaly-detection/application.py +++ b/docs/tutorials/anomaly-detection/application.py @@ -28,4 +28,4 @@ def should_alert(window_value: int, key, timestamp, headers): if __name__ == "__main__": - app.run(sdf) + app.run() diff --git a/docs/tutorials/purchase-filtering/application.py b/docs/tutorials/purchase-filtering/application.py index c8a621443..6e62f405c 100644 --- a/docs/tutorials/purchase-filtering/application.py +++ b/docs/tutorials/purchase-filtering/application.py @@ -32,4 +32,4 @@ def get_purchase_totals(items): if __name__ == "__main__": - app.run(sdf) + app.run() diff --git a/docs/tutorials/word-count/application.py b/docs/tutorials/word-count/application.py index 88e8c2665..564993047 100644 --- a/docs/tutorials/word-count/application.py +++ b/docs/tutorials/word-count/application.py @@ -28,4 +28,4 @@ def should_skip(word_count_pair): if __name__ == "__main__": - app.run(sdf) + app.run() diff --git a/examples/bank_example/json_version/consumer.py b/examples/bank_example/json_version/consumer.py index d6db929c4..63e06fe27 100644 --- a/examples/bank_example/json_version/consumer.py +++ b/examples/bank_example/json_version/consumer.py @@ -66,4 +66,4 @@ def count_transactions(value: dict, state: State): if __name__ == "__main__": # Start message processing - app.run(sdf) + app.run() diff --git a/examples/bank_example/quix_platform_version/consumer.py b/examples/bank_example/quix_platform_version/consumer.py index 5ee9afb50..166d8068b 100644 --- a/examples/bank_example/quix_platform_version/consumer.py +++ b/examples/bank_example/quix_platform_version/consumer.py @@ -69,4 +69,4 @@ def count_transactions(value: dict, state: State): if __name__ == "__main__": # Start message processing - app.run(sdf) + app.run() diff --git a/examples/custom_websocket_source/main.py b/examples/custom_websocket_source/main.py index b99b76392..e37db0e86 100644 --- a/examples/custom_websocket_source/main.py +++ b/examples/custom_websocket_source/main.py @@ -108,7 +108,7 @@ def main(): sdf.print() # Start the application - app.run(sdf) + app.run() if __name__ == "__main__": diff --git a/quixstreams/app.py b/quixstreams/app.py index 7a4cc3b17..925597554 100644 --- a/quixstreams/app.py +++ b/quixstreams/app.py @@ -94,7 +94,7 @@ class Application: df = app.dataframe(topic) df.apply(lambda value, context: print('New message', value)) - app.run(dataframe=df) + app.run() ``` """ @@ -477,7 +477,7 @@ def dataframe( df = app.dataframe(topic) df.apply(lambda value, context: print('New message', value) - app.run(dataframe=df) + app.run() ``` diff --git a/quixstreams/sources/base.py b/quixstreams/sources/base.py index d6ff7fc2e..fc35ee62e 100644 --- a/quixstreams/sources/base.py +++ b/quixstreams/sources/base.py @@ -56,7 +56,7 @@ def main(): sdf = app.dataframe(source=source) sdf.print(metadata=True) - app.run(sdf) + app.run() if __name__ == "__main__": main() diff --git a/quixstreams/sources/kafka/kafka.py b/quixstreams/sources/kafka/kafka.py index ee621647b..5c0e96c68 100644 --- a/quixstreams/sources/kafka/kafka.py +++ b/quixstreams/sources/kafka/kafka.py @@ -48,7 +48,7 @@ class KafkaReplicatorSource(Source): sdf = app.dataframe(source=source) sdf = sdf.print() - app.run(sdf) + app.run() ``` """ diff --git a/quixstreams/sources/kafka/quix.py b/quixstreams/sources/kafka/quix.py index 1f793933e..50453feb6 100644 --- a/quixstreams/sources/kafka/quix.py +++ b/quixstreams/sources/kafka/quix.py @@ -42,7 +42,7 @@ class QuixEnvironmentSource(KafkaReplicatorSource): sdf = app.dataframe(source=source) sdf = sdf.print() - app.run(sdf) + app.run() ``` """