From 15bded0c8adfb8a723d5340ea7ac55ce7c9bfa89 Mon Sep 17 00:00:00 2001 From: Dinis Cruz Date: Tue, 8 Oct 2024 11:16:25 +0100 Subject: [PATCH] another major set of improvements to the Flow flow --- modules/OSBot-Utils | 2 +- .../flows/Flow_Events__To__Prefect_Server.py | 85 +++++++++++++++++++ osbot_prefect/flows/__init__.py | 0 osbot_prefect/server/Prefect__Cloud_API.py | 13 +++ .../test_Flow_Events__To__Prefect_Server.py | 36 ++++++++ .../test__experiment_with_task_flow.py | 28 ++++++ tests/pytest.ini | 8 ++ 7 files changed, 171 insertions(+), 1 deletion(-) create mode 100644 osbot_prefect/flows/Flow_Events__To__Prefect_Server.py create mode 100644 osbot_prefect/flows/__init__.py create mode 100644 tests/integration/flows/test_Flow_Events__To__Prefect_Server.py create mode 100644 tests/integration/test__experiment_with_task_flow.py create mode 100644 tests/pytest.ini diff --git a/modules/OSBot-Utils b/modules/OSBot-Utils index 7109849..afe3b3b 160000 --- a/modules/OSBot-Utils +++ b/modules/OSBot-Utils @@ -1 +1 @@ -Subproject commit 7109849562e4d2448905ca9de106502743df2aea +Subproject commit afe3b3bf2b92ebceeecc6c7b61b9501b19a01771 diff --git a/osbot_prefect/flows/Flow_Events__To__Prefect_Server.py b/osbot_prefect/flows/Flow_Events__To__Prefect_Server.py new file mode 100644 index 0000000..3a060cf --- /dev/null +++ b/osbot_prefect/flows/Flow_Events__To__Prefect_Server.py @@ -0,0 +1,85 @@ +from osbot_utils.helpers.Random_Guid import Random_Guid + +from osbot_utils.helpers.flows.Task import Task + +from osbot_prefect.server.Prefect__States import Prefect__States +from osbot_utils.utils.Dev import pprint + +from osbot_prefect.server.Prefect__Cloud_API import Prefect__Cloud_API +from osbot_utils.helpers.flows.Flow import Flow +from osbot_utils.base_classes.Type_Safe import Type_Safe +from osbot_utils.helpers.flows.Flow__Events import flow_events, Flow__Event_Type, Flow__Event + + +class Flow_Events__To__Prefect_Server(Type_Safe): + prefect_cloud_api : Prefect__Cloud_API + prefect_ids_mapping : dict + + def add_event_listener(self): + flow_events.event_listeners.append(self.event_listener) + + def handle_event(self, event_type: Flow__Event_Type, event_source, event_data): + if event_type == Flow__Event_Type.FLOW_START: self.handle_event__flow_start(flow=event_source) + elif event_type == Flow__Event_Type.FLOW_STOP : self.handle_event__flow_stop (flow=event_source) + elif event_type == Flow__Event_Type.TASK_START: self.handle_event__task_start(task=event_source) + elif event_type == Flow__Event_Type.TASK_STOP : self.handle_event__task_stop (task=event_source) + else: + print() + print(f"Error in handle_event, unknown event_type: {event_type}") + + def handle_event__flow_start(self, flow: Flow): + prefect__flow_id = self.prefect_cloud_api.flow__create({'name': flow.flow_name}).data.id + + prefect__flow_run_definition = dict(flow_id = prefect__flow_id , + name = flow.flow_id , + parameters = dict(answer = 42 , + source = 'handle_event__flow_start' ), + context = dict(context_1 = 42 , + context_2 = 'handle_event__flow_start'), + tags = ['tag_1', 'tag_2' ]) + prefect_flow_run = self.prefect_cloud_api.flow_run__create(prefect__flow_run_definition) + if prefect_flow_run.status != 'ok': + pprint("******* Error in handle_event__flow_start ***** ") # todo: move this to a Flow Events logging system + pprint(prefect_flow_run) + else: + prefect__flow_run_id = prefect_flow_run.data.id + self.prefect_ids_mapping[flow.flow_name] = prefect__flow_id + self.prefect_ids_mapping[flow.flow_id ] = prefect__flow_run_id + self.prefect_cloud_api.flow_run__set_state_type__running(prefect__flow_run_id) + + def handle_event__flow_stop(self, flow: Flow): + prefect__flow_run_id = self.prefect_ids_mapping.get(flow.flow_id) + self.prefect_cloud_api.flow_run__set_state_type__completed(prefect__flow_run_id) + + def handle_event__task_start(self, task: Task): + prefect__flow_run_id = self.prefect_ids_mapping[task.task_flow.flow_id] + prefect__task_run_definition = { 'flow_run_id' : prefect__flow_run_id, + 'dynamic_key' : Random_Guid() , + 'task_key' : Random_Guid() , + 'name' : task.task_id , + 'task_inputs' : {"prop_1": [{"input_type": "parameter" , + "name" : "an-parameter" }, + {"input_type": "constant" , + "type" :"an-type" }]}, + "tags" : ["tag_a", "tag_b"] } + prefect__task_run = self.prefect_cloud_api.task_run__create(prefect__task_run_definition) + if prefect__task_run.status != 'ok': + pprint("******* Error in handle_event__task_start ***** ") # todo: move this to a Flow Events logging system + pprint(prefect__task_run) + else: + prefect__task_run_id = prefect__task_run.data.id + self.prefect_ids_mapping[task.task_id] = prefect__task_run_id + self.prefect_cloud_api.task_run__set_state_type__running(prefect__task_run_id) + + def handle_event__task_stop(self, task): + prefect__task_run_id = self.prefect_ids_mapping.get(task.task_id) + self.prefect_cloud_api.task_run__set_state_type__running__completed(prefect__task_run_id) + + def event_listener(self, flow_event: Flow__Event): + event_type = flow_event.event_type + event_source = flow_event.event_source + event_data = flow_event.event_data + self.handle_event(event_type=event_type, event_source=event_source, event_data=event_data) + + def remove_event_listener(self): + flow_events.event_listeners.remove(self.event_listener) \ No newline at end of file diff --git a/osbot_prefect/flows/__init__.py b/osbot_prefect/flows/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/osbot_prefect/server/Prefect__Cloud_API.py b/osbot_prefect/server/Prefect__Cloud_API.py index 0a8b8b3..f6a5112 100644 --- a/osbot_prefect/server/Prefect__Cloud_API.py +++ b/osbot_prefect/server/Prefect__Cloud_API.py @@ -1,6 +1,7 @@ from datetime import datetime, timezone, timedelta from enum import Enum +from osbot_prefect.server.Prefect__States import Prefect__States from osbot_utils.utils.Dev import pprint from osbot_utils.base_classes.Type_Safe import Type_Safe @@ -50,6 +51,12 @@ def flow_run__set_state(self, flow_run_id, state): def flow_run__set_state_type(self, flow_run_id, state_type): return self.flow_run__set_state(flow_run_id, {'type': state_type}) + def flow_run__set_state_type__running(self, flow_run_id): + return self.flow_run__set_state_type(flow_run_id, Prefect__States.RUNNING) + + def flow_run__set_state_type__completed(self, flow_run_id): + return self.flow_run__set_state_type(flow_run_id, Prefect__States.COMPLETED) + def flow_run__delete(self, flow_run_id): return self.prefect_rest_api.delete(target='flow_runs', target_id=flow_run_id) @@ -91,6 +98,12 @@ def task_run__set_state(self, task_run_id, state): def task_run__set_state_type(self, task_run_id, state_type): return self.task_run__set_state(task_run_id, {'type': state_type}) + def task_run__set_state_type__running(self, task_run_id): + return self.task_run__set_state_type(task_run_id, Prefect__States.RUNNING) + + def task_run__set_state_type__running__completed(self, task_run_id): + return self.task_run__set_state_type(task_run_id, Prefect__States.COMPLETED) + def to_prefect_timestamp(self, date_time): return date_time.isoformat() #return date_time.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z" diff --git a/tests/integration/flows/test_Flow_Events__To__Prefect_Server.py b/tests/integration/flows/test_Flow_Events__To__Prefect_Server.py new file mode 100644 index 0000000..6288d66 --- /dev/null +++ b/tests/integration/flows/test_Flow_Events__To__Prefect_Server.py @@ -0,0 +1,36 @@ +from logging import CRITICAL +from unittest import TestCase + +from osbot_utils.helpers.flows.models.Flow__Config import Flow__Config +from osbot_utils.utils.Env import load_dotenv +from osbot_utils.helpers.flows.decorators.task import task +from osbot_utils.helpers.flows.Flow import Flow +from osbot_utils.helpers.flows.decorators.flow import flow +from osbot_prefect.flows.Flow_Events__To__Prefect_Server import Flow_Events__To__Prefect_Server + + +class test_Flow_Events__To__Prefect_Server(TestCase): + + def setUp(self): + load_dotenv() + self.flows_to_prefect = Flow_Events__To__Prefect_Server() + + def test_event_listener(self): + self.flows_to_prefect.add_event_listener() + test_flow = an_flow_1() + test_flow.execute_flow() + self.flows_to_prefect.remove_event_listener() + + +# example flow +flow_config = Flow__Config(logging_enabled=False) + +@flow(flow_config=flow_config) +def an_flow_1() -> Flow: + print('inside the flow') + an_task_1() + + +@task() +def an_task_1(): + print('inside the task') \ No newline at end of file diff --git a/tests/integration/test__experiment_with_task_flow.py b/tests/integration/test__experiment_with_task_flow.py new file mode 100644 index 0000000..1bcc73f --- /dev/null +++ b/tests/integration/test__experiment_with_task_flow.py @@ -0,0 +1,28 @@ +from unittest import TestCase + +from osbot_utils.helpers.flows.decorators.task import task +from osbot_utils.helpers.flows.models.Flow__Config import Flow__Config + +from osbot_utils.utils.Dev import pprint + +from osbot_utils.helpers.flows.Flow import Flow + +from osbot_utils.helpers.flows.decorators.flow import flow + + +class test__experiment_with_task_flow(TestCase): + + def test_task_and_flow(self): + flow_config = Flow__Config(logging_enabled=False) + + @task() + def a_task(): + print('in a task') + + @flow(flow_config=flow_config) + def an_flow() -> Flow: + print('in a flow') + a_task() + + flow_1 = an_flow() + flow_1.execute_flow() diff --git a/tests/pytest.ini b/tests/pytest.ini new file mode 100644 index 0000000..6b0843d --- /dev/null +++ b/tests/pytest.ini @@ -0,0 +1,8 @@ +[pytest] +log_cli = true +log_format = "%(asctime)s.%(msecs)03d %(levelname)s %(name)s %(message)s" +#log_format = "%(asctime)s.%(msecs)03d %(levelname)s %(message)s" + +filterwarnings = + ignore:'cgi' is deprecated and slated for removal in Python 3.13:DeprecationWarning:htmlmin.* + ignore:ast.Str is deprecated and will be removed in Python 3.14:DeprecationWarning:werkzeug.* \ No newline at end of file