Skip to content

Commit

Permalink
another major set of improvements to the Flow flow
Browse files Browse the repository at this point in the history
  • Loading branch information
DinisCruz committed Oct 8, 2024
1 parent e1abb34 commit 15bded0
Show file tree
Hide file tree
Showing 7 changed files with 171 additions and 1 deletion.
85 changes: 85 additions & 0 deletions osbot_prefect/flows/Flow_Events__To__Prefect_Server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
from osbot_utils.helpers.Random_Guid import Random_Guid

from osbot_utils.helpers.flows.Task import Task

from osbot_prefect.server.Prefect__States import Prefect__States
from osbot_utils.utils.Dev import pprint

from osbot_prefect.server.Prefect__Cloud_API import Prefect__Cloud_API
from osbot_utils.helpers.flows.Flow import Flow
from osbot_utils.base_classes.Type_Safe import Type_Safe
from osbot_utils.helpers.flows.Flow__Events import flow_events, Flow__Event_Type, Flow__Event


class Flow_Events__To__Prefect_Server(Type_Safe):
prefect_cloud_api : Prefect__Cloud_API
prefect_ids_mapping : dict

def add_event_listener(self):
flow_events.event_listeners.append(self.event_listener)

def handle_event(self, event_type: Flow__Event_Type, event_source, event_data):
if event_type == Flow__Event_Type.FLOW_START: self.handle_event__flow_start(flow=event_source)
elif event_type == Flow__Event_Type.FLOW_STOP : self.handle_event__flow_stop (flow=event_source)
elif event_type == Flow__Event_Type.TASK_START: self.handle_event__task_start(task=event_source)
elif event_type == Flow__Event_Type.TASK_STOP : self.handle_event__task_stop (task=event_source)
else:
print()
print(f"Error in handle_event, unknown event_type: {event_type}")

def handle_event__flow_start(self, flow: Flow):
prefect__flow_id = self.prefect_cloud_api.flow__create({'name': flow.flow_name}).data.id

prefect__flow_run_definition = dict(flow_id = prefect__flow_id ,
name = flow.flow_id ,
parameters = dict(answer = 42 ,
source = 'handle_event__flow_start' ),
context = dict(context_1 = 42 ,
context_2 = 'handle_event__flow_start'),
tags = ['tag_1', 'tag_2' ])
prefect_flow_run = self.prefect_cloud_api.flow_run__create(prefect__flow_run_definition)
if prefect_flow_run.status != 'ok':
pprint("******* Error in handle_event__flow_start ***** ") # todo: move this to a Flow Events logging system
pprint(prefect_flow_run)
else:
prefect__flow_run_id = prefect_flow_run.data.id
self.prefect_ids_mapping[flow.flow_name] = prefect__flow_id
self.prefect_ids_mapping[flow.flow_id ] = prefect__flow_run_id
self.prefect_cloud_api.flow_run__set_state_type__running(prefect__flow_run_id)

def handle_event__flow_stop(self, flow: Flow):
prefect__flow_run_id = self.prefect_ids_mapping.get(flow.flow_id)
self.prefect_cloud_api.flow_run__set_state_type__completed(prefect__flow_run_id)

def handle_event__task_start(self, task: Task):
prefect__flow_run_id = self.prefect_ids_mapping[task.task_flow.flow_id]
prefect__task_run_definition = { 'flow_run_id' : prefect__flow_run_id,
'dynamic_key' : Random_Guid() ,
'task_key' : Random_Guid() ,
'name' : task.task_id ,
'task_inputs' : {"prop_1": [{"input_type": "parameter" ,
"name" : "an-parameter" },
{"input_type": "constant" ,
"type" :"an-type" }]},
"tags" : ["tag_a", "tag_b"] }
prefect__task_run = self.prefect_cloud_api.task_run__create(prefect__task_run_definition)
if prefect__task_run.status != 'ok':
pprint("******* Error in handle_event__task_start ***** ") # todo: move this to a Flow Events logging system
pprint(prefect__task_run)
else:
prefect__task_run_id = prefect__task_run.data.id
self.prefect_ids_mapping[task.task_id] = prefect__task_run_id
self.prefect_cloud_api.task_run__set_state_type__running(prefect__task_run_id)

def handle_event__task_stop(self, task):
prefect__task_run_id = self.prefect_ids_mapping.get(task.task_id)
self.prefect_cloud_api.task_run__set_state_type__running__completed(prefect__task_run_id)

def event_listener(self, flow_event: Flow__Event):
event_type = flow_event.event_type
event_source = flow_event.event_source
event_data = flow_event.event_data
self.handle_event(event_type=event_type, event_source=event_source, event_data=event_data)

def remove_event_listener(self):
flow_events.event_listeners.remove(self.event_listener)
Empty file added osbot_prefect/flows/__init__.py
Empty file.
13 changes: 13 additions & 0 deletions osbot_prefect/server/Prefect__Cloud_API.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from datetime import datetime, timezone, timedelta
from enum import Enum

from osbot_prefect.server.Prefect__States import Prefect__States
from osbot_utils.utils.Dev import pprint

from osbot_utils.base_classes.Type_Safe import Type_Safe
Expand Down Expand Up @@ -50,6 +51,12 @@ def flow_run__set_state(self, flow_run_id, state):
def flow_run__set_state_type(self, flow_run_id, state_type):
return self.flow_run__set_state(flow_run_id, {'type': state_type})

def flow_run__set_state_type__running(self, flow_run_id):
return self.flow_run__set_state_type(flow_run_id, Prefect__States.RUNNING)

def flow_run__set_state_type__completed(self, flow_run_id):
return self.flow_run__set_state_type(flow_run_id, Prefect__States.COMPLETED)

def flow_run__delete(self, flow_run_id):
return self.prefect_rest_api.delete(target='flow_runs', target_id=flow_run_id)

Expand Down Expand Up @@ -91,6 +98,12 @@ def task_run__set_state(self, task_run_id, state):
def task_run__set_state_type(self, task_run_id, state_type):
return self.task_run__set_state(task_run_id, {'type': state_type})

def task_run__set_state_type__running(self, task_run_id):
return self.task_run__set_state_type(task_run_id, Prefect__States.RUNNING)

def task_run__set_state_type__running__completed(self, task_run_id):
return self.task_run__set_state_type(task_run_id, Prefect__States.COMPLETED)

def to_prefect_timestamp(self, date_time):
return date_time.isoformat()
#return date_time.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z"
Expand Down
36 changes: 36 additions & 0 deletions tests/integration/flows/test_Flow_Events__To__Prefect_Server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from logging import CRITICAL
from unittest import TestCase

from osbot_utils.helpers.flows.models.Flow__Config import Flow__Config
from osbot_utils.utils.Env import load_dotenv
from osbot_utils.helpers.flows.decorators.task import task
from osbot_utils.helpers.flows.Flow import Flow
from osbot_utils.helpers.flows.decorators.flow import flow
from osbot_prefect.flows.Flow_Events__To__Prefect_Server import Flow_Events__To__Prefect_Server


class test_Flow_Events__To__Prefect_Server(TestCase):

def setUp(self):
load_dotenv()
self.flows_to_prefect = Flow_Events__To__Prefect_Server()

def test_event_listener(self):
self.flows_to_prefect.add_event_listener()
test_flow = an_flow_1()
test_flow.execute_flow()
self.flows_to_prefect.remove_event_listener()


# example flow
flow_config = Flow__Config(logging_enabled=False)

@flow(flow_config=flow_config)
def an_flow_1() -> Flow:
print('inside the flow')
an_task_1()


@task()
def an_task_1():
print('inside the task')
28 changes: 28 additions & 0 deletions tests/integration/test__experiment_with_task_flow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from unittest import TestCase

from osbot_utils.helpers.flows.decorators.task import task
from osbot_utils.helpers.flows.models.Flow__Config import Flow__Config

from osbot_utils.utils.Dev import pprint

from osbot_utils.helpers.flows.Flow import Flow

from osbot_utils.helpers.flows.decorators.flow import flow


class test__experiment_with_task_flow(TestCase):

def test_task_and_flow(self):
flow_config = Flow__Config(logging_enabled=False)

@task()
def a_task():
print('in a task')

@flow(flow_config=flow_config)
def an_flow() -> Flow:
print('in a flow')
a_task()

flow_1 = an_flow()
flow_1.execute_flow()
8 changes: 8 additions & 0 deletions tests/pytest.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
[pytest]
log_cli = true
log_format = "%(asctime)s.%(msecs)03d %(levelname)s %(name)s %(message)s"
#log_format = "%(asctime)s.%(msecs)03d %(levelname)s %(message)s"

filterwarnings =
ignore:'cgi' is deprecated and slated for removal in Python 3.13:DeprecationWarning:htmlmin.*
ignore:ast.Str is deprecated and will be removed in Python 3.14:DeprecationWarning:werkzeug.*

0 comments on commit 15bded0

Please sign in to comment.