diff --git a/docs/streamingdataframe.md b/docs/streamingdataframe.md index 817a670d9..54d4373dc 100644 --- a/docs/streamingdataframe.md +++ b/docs/streamingdataframe.md @@ -17,8 +17,8 @@ app = Application( ) # Define the input and output topics. By default, the "json" serialization will be used -input_topic = app.topic("my_input_topic") -output_topic = app.topic("my_output_topic") +input_topic = app.topic("input") +output_topic = app.topic("output") def add_one(data: dict): @@ -53,8 +53,8 @@ sdf = sdf[(sdf["field_0"] > 10) & (sdf["field_2"] != "test")] # You may also use a custom function to filter data sdf = sdf.filter(lambda v: v["field_0"] > 10 and v["field_2"] != "test") -# Apply custom function to transform the message -sdf = sdf.apply(add_one) +# Apply custom function to update values in place +sdf = sdf.update(add_one) # Use a stateful function in persist data into the state store # and update the message value @@ -372,6 +372,8 @@ input_topic = app.topic("input", key_deserializer='str') # Outgoing key will be serialized as a string too output_topic = app.topic("my_output_topic", key_serializer='str') +sdf = app.dataframe(input_topic) + # Producing a new message to a topic with the same key sdf = sdf.to_topic(output_topic) diff --git a/src/StreamingDataFrames/docs/stateful-processing.md b/src/StreamingDataFrames/docs/stateful-processing.md deleted file mode 100644 index e69de29bb..000000000