Skip to content

Commit

Permalink
fixed code after refactoring changes to Flow events
Browse files Browse the repository at this point in the history
  • Loading branch information
DinisCruz committed Oct 15, 2024
1 parent c13981d commit 0661a99
Show file tree
Hide file tree
Showing 9 changed files with 115 additions and 103 deletions.
126 changes: 66 additions & 60 deletions osbot_prefect/flows/Flow_Events__To__Prefect_Server.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,13 @@
import logging

from osbot_prefect.server.Prefect__Artifacts import Prefect__Artifacts
from osbot_prefect.utils.for__osbot_aws import in_aws_lambda
from osbot_utils.utils.Env import in_github_action
from osbot_utils.utils.Misc import time_now
from osbot_utils.helpers.Random_Guid import Random_Guid
from osbot_utils.helpers.flows.Task import Task

from osbot_prefect.server.Prefect__States import Prefect__States
from osbot_utils.utils.Dev import pprint
from osbot_prefect.server.Prefect__Cloud_API import Prefect__Cloud_API
from osbot_utils.helpers.flows.Flow import Flow
from osbot_utils.base_classes.Type_Safe import Type_Safe
from osbot_utils.helpers.flows.Flow__Events import flow_events, Flow__Event_Type, Flow__Event
from osbot_prefect.server.Prefect__Artifacts import Prefect__Artifacts
from osbot_prefect.utils.for__osbot_aws import in_aws_lambda
from osbot_utils.helpers.flows.models.Flow_Run__Event_Data import Flow_Run__Event_Data
from osbot_utils.utils.Env import in_github_action
from osbot_utils.utils.Misc import time_now, timestamp_to_datetime
from osbot_utils.helpers.Random_Guid import Random_Guid
from osbot_utils.utils.Dev import pprint
from osbot_prefect.server.Prefect__Cloud_API import Prefect__Cloud_API
from osbot_utils.base_classes.Type_Safe import Type_Safe
from osbot_utils.helpers.flows.Flow__Events import flow_events, Flow_Run__Event_Type, Flow_Run__Event


class Flow_Events__To__Prefect_Server(Type_Safe):
Expand All @@ -29,14 +24,16 @@ def __exit__(self, exc_type, exc_val, exc_tb):
def add_event_listener(self):
flow_events.event_listeners.append(self.event_listener)

def handle_event(self, event_type: Flow__Event_Type, event_source, event_data):
if event_type == Flow__Event_Type.FLOW_MESSAGE: self.handle_event__task_message(event_data = event_data )
elif event_type == Flow__Event_Type.FLOW_START : self.handle_event__flow_start (flow = event_source)
elif event_type == Flow__Event_Type.FLOW_STOP : self.handle_event__flow_stop (flow = event_source)
elif event_type == Flow__Event_Type.NEW_RESULT : self.handle_event__new_result (event_data = event_data )
elif event_type == Flow__Event_Type.NEW_ARTIFACT: self.handle_event__new_artifact(event_data = event_data )
elif event_type == Flow__Event_Type.TASK_START : self.handle_event__task_start (task = event_source)
elif event_type == Flow__Event_Type.TASK_STOP : self.handle_event__task_stop (task = event_source)
def handle_event(self, flow_event: Flow_Run__Event):
event_type = flow_event.event_type
event_data = flow_event.event_data
if event_type == Flow_Run__Event_Type.FLOW_MESSAGE: self.handle_event__task_message(flow_event = flow_event)
elif event_type == Flow_Run__Event_Type.FLOW_START : self.handle_event__flow_start (event_data = event_data)
elif event_type == Flow_Run__Event_Type.FLOW_STOP : self.handle_event__flow_stop (event_data = event_data)
elif event_type == Flow_Run__Event_Type.NEW_RESULT : self.handle_event__new_result (event_data = event_data)
elif event_type == Flow_Run__Event_Type.NEW_ARTIFACT: self.handle_event__new_artifact(event_data = event_data)
elif event_type == Flow_Run__Event_Type.TASK_START : self.handle_event__task_start (event_data = event_data)
elif event_type == Flow_Run__Event_Type.TASK_STOP : self.handle_event__task_stop (event_data = event_data)
else:
print()
print(f"Error in handle_event, unknown event_type: {event_type}")
Expand All @@ -50,11 +47,12 @@ def current_execution_environment(self):
return 'local'

def handle_event__new_artifact(self, event_data):
artifact_key = event_data.get('key' ) # add code to validate this value
artifact_type = event_data.get('type' )
artifact_description = event_data.get('description')
artifact_data = event_data.get('data' )
flow_run_id = event_data.get('flow_run_id')
artifact_data = event_data.data.get('artifact_data')
flow_run_id = event_data.flow_run_id
artifact_key = artifact_data.get('key' ) # add code to validate this value
artifact_type = artifact_data.get('type' )
artifact_description = artifact_data.get('description')
artifact_data = artifact_data.get('data' )
prefect__flow_run_id = self.prefect_ids_mapping.get(flow_run_id)

kwargs = { "key" : artifact_key , # find a better name for this variable than kwargs
Expand All @@ -65,38 +63,45 @@ def handle_event__new_artifact(self, event_data):
self.prefect_cloud_api.artifacts__create(kwargs)

def handle_event__new_result(self, event_data):
result_key = event_data.get('key' ) # add code to validate this value
result_description = event_data.get('description')
flow_run_id = event_data.get('flow_run_id')
data = event_data.data.get('result_data')
flow_run_id = event_data.flow_run_id
result_key = data.get('key' ) # add code to validate this value
result_description = data.get('description')
prefect__flow_run_id = self.prefect_ids_mapping.get(flow_run_id)
result_data = { "key" : result_key ,
"type" : Prefect__Artifacts.RESULT ,
"description": result_description ,
"flow_run_id": prefect__flow_run_id }
self.prefect_cloud_api.artifacts__create(result_data)

def handle_event__task_message(self, event_data):
flow_run_id = event_data.get('flow_run_id')
task_run_id = event_data.get('task_run_id')
def handle_event__task_message(self, flow_event: Flow_Run__Event):
event_data = flow_event.event_data
event_timestamp = flow_event.timestamp
flow_run_id = event_data.flow_run_id
task_run_id = event_data.task_run_id
message_data = event_data.data.get('message_data')
prefect__flow_run_id = self.prefect_ids_mapping.get(flow_run_id)
prefect__task_run_id = self.prefect_ids_mapping.get(task_run_id)

event_datetime = timestamp_to_datetime(event_timestamp)
prefect_timestamp = self.prefect_cloud_api.to_prefect_timestamp(event_datetime)
log_name = 'log-message'
log_data = dict(flow_run_id = prefect__flow_run_id ,
task_run_id = prefect__task_run_id ,
level = event_data.get('log_level' , logging.INFO ),
name = log_name ,
message = event_data.get('message_text' ,'' ),
timestamp = self.prefect_cloud_api.to_prefect_timestamp__now_utc())
log_data = dict(flow_run_id = prefect__flow_run_id ,
task_run_id = prefect__task_run_id ,
level = message_data.get('log_level' ),
name = log_name ,
message = message_data.get('message_text',''),
timestamp = prefect_timestamp )

self.prefect_cloud_api.logs__create([log_data])

def handle_event__flow_start(self, flow: Flow):
prefect__flow_id = self.prefect_cloud_api.flow__create({'name': flow.flow_name}).data.id
def handle_event__flow_start(self, event_data: Flow_Run__Event_Data):
flow_name = event_data.flow_name
flow_id = event_data.flow_id
prefect__flow_id = self.prefect_cloud_api.flow__create({'name': flow_name}).data.id
tag__current_env = self.current_execution_environment()
tag__current_time = time_now()
prefect__flow_run_definition = dict(flow_id = prefect__flow_id ,
name = flow.flow_id ,
name = flow_id ,
parameters = dict(answer = 42 ,
source = 'handle_event__flow_start' ),
context = dict(context_1 = 42 ,
Expand All @@ -107,21 +112,24 @@ def handle_event__flow_start(self, flow: Flow):
pprint("******* Error in handle_event__flow_start ***** ") # todo: move this to a Flow Events logging system
pprint(prefect_flow_run)
else:
prefect__flow_run_id = prefect_flow_run.data.id
self.prefect_ids_mapping[flow.flow_name] = prefect__flow_id
self.prefect_ids_mapping[flow.flow_id ] = prefect__flow_run_id
prefect__flow_run_id = prefect_flow_run.data.id
self.prefect_ids_mapping[flow_name] = prefect__flow_id
self.prefect_ids_mapping[flow_id ] = prefect__flow_run_id
self.prefect_cloud_api.flow_run__set_state_type__running(prefect__flow_run_id)

def handle_event__flow_stop(self, flow: Flow):
prefect__flow_run_id = self.prefect_ids_mapping.get(flow.flow_id)
def handle_event__flow_stop(self, event_data: Flow_Run__Event_Data):
prefect__flow_run_id = self.prefect_ids_mapping.get(event_data.flow_id)
self.prefect_cloud_api.flow_run__set_state_type__completed(prefect__flow_run_id)

def handle_event__task_start(self, task: Task):
prefect__flow_run_id = self.prefect_ids_mapping[task.task_flow.flow_id]
def handle_event__task_start(self, event_data: Flow_Run__Event_Data):
flow_run_id = event_data.flow_run_id
task_name = event_data.task_name
task_run_id = event_data.task_run_id
prefect__flow_run_id = self.prefect_ids_mapping[flow_run_id]
prefect__task_run_definition = { 'flow_run_id' : prefect__flow_run_id,
'dynamic_key' : Random_Guid() ,
'task_key' : Random_Guid() ,
'name' : task.task_name ,
'name' : task_name ,
'task_inputs' : {"prop_1": [{"input_type": "parameter" ,
"name" : "an-parameter" },
{"input_type": "constant" ,
Expand All @@ -133,18 +141,16 @@ def handle_event__task_start(self, task: Task):
pprint(prefect__task_run)
else:
prefect__task_run_id = prefect__task_run.data.id
self.prefect_ids_mapping[task.task_id] = prefect__task_run_id
self.prefect_ids_mapping[task_run_id] = prefect__task_run_id # capture mapping # todo: this will need a persistent layer, when running in pure S3
self.prefect_cloud_api.task_run__set_state_type__running(prefect__task_run_id)

def handle_event__task_stop(self, task):
prefect__task_run_id = self.prefect_ids_mapping.get(task.task_id)
def handle_event__task_stop(self, event_data: Flow_Run__Event_Data):
task_run_id = event_data.task_run_id
prefect__task_run_id = self.prefect_ids_mapping.get(task_run_id)
self.prefect_cloud_api.task_run__set_state_type__running__completed(prefect__task_run_id)

def event_listener(self, flow_event: Flow__Event):
event_type = flow_event.event_type
event_source = flow_event.event_source
event_data = flow_event.event_data
self.handle_event(event_type=event_type, event_source=event_source, event_data=event_data)
def event_listener(self, flow_event: Flow_Run__Event):
self.handle_event(flow_event=flow_event)

def remove_event_listener(self):
flow_events.event_listeners.remove(self.event_listener)
2 changes: 1 addition & 1 deletion osbot_prefect/server/Prefect__Cloud_API.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def flows_ids(self, limit=5): # todo: see
def logs__create(self, log_data):
return self.prefect_rest_api.create(target='logs', data=log_data)

def logs__filter(self, limit=5):
def logs__filter(self, limit=100):
filter_data = {"sort": "TIMESTAMP_DESC",
"limit": limit}
return self.prefect_rest_api.filter(target='logs', filter_data=filter_data)
Expand Down
10 changes: 5 additions & 5 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ homepage = "https://github.com/owasp-sbot/OSBot-Prefect"
repository = "https://github.com/owasp-sbot/OSBot-Prefect"

[tool.poetry.dependencies]
python = "^3.7"
python = "^3.11"
osbot-utils = "*"


Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
from unittest import TestCase

from osbot_utils.utils.Misc import time_now

from osbot_utils.helpers.flows.models.Flow__Config import Flow__Config
from osbot_utils.utils.Misc import time_now
from osbot_utils.helpers.flows.models.Flow_Run__Config import Flow_Run__Config
from osbot_utils.utils.Env import load_dotenv
from osbot_utils.helpers.flows.decorators.task import task
from osbot_utils.helpers.flows.Flow import Flow
Expand All @@ -28,7 +26,7 @@ def test_event_listener(self):


# example flow
flow_config = Flow__Config(logging_enabled=False)
flow_config = Flow_Run__Config(logging_enabled=True)

@flow(flow_config=flow_config)
def an_flow_1() -> Flow:
Expand Down
Loading

0 comments on commit 0661a99

Please sign in to comment.