Skip to content

Commit

Permalink
minor fix
Browse files Browse the repository at this point in the history
  • Loading branch information
DinisCruz committed Oct 15, 2024
1 parent 4f5422b commit 98e288e
Showing 1 changed file with 10 additions and 8 deletions.
18 changes: 10 additions & 8 deletions osbot_prefect/flows/Flow_Events__To__Prefect_Server.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,13 @@ def handle_event__task_message(self, flow_event: Flow_Run__Event):
self.prefect_cloud_api.logs__create([log_data])

def handle_event__flow_start(self, event_data: Flow_Run__Event_Data):
flow_name = event_data.flow_name
flow_id = event_data.flow_id
flow_name = event_data.flow_name
flow_run_id = event_data.flow_run_id
prefect__flow_id = self.prefect_cloud_api.flow__create({'name': flow_name}).data.id
tag__current_env = self.current_execution_environment()
tag__current_time = time_now()
prefect__flow_run_definition = dict(flow_id = prefect__flow_id ,
name = flow_id ,
name = flow_run_id ,
parameters = dict(answer = 42 ,
source = 'handle_event__flow_start' ),
context = dict(context_1 = 42 ,
Expand All @@ -112,20 +112,22 @@ def handle_event__flow_start(self, event_data: Flow_Run__Event_Data):
pprint("******* Error in handle_event__flow_start ***** ") # todo: move this to a Flow Events logging system
pprint(prefect_flow_run)
else:
prefect__flow_run_id = prefect_flow_run.data.id
self.prefect_ids_mapping[flow_name] = prefect__flow_id
self.prefect_ids_mapping[flow_id ] = prefect__flow_run_id
prefect__flow_run_id = prefect_flow_run.data.id
self.prefect_ids_mapping[flow_name ] = prefect__flow_id
self.prefect_ids_mapping[flow_run_id] = prefect__flow_run_id
self.prefect_cloud_api.flow_run__set_state_type__running(prefect__flow_run_id)

def handle_event__flow_stop(self, event_data: Flow_Run__Event_Data):
prefect__flow_run_id = self.prefect_ids_mapping.get(event_data.flow_id)
prefect__flow_run_id = self.prefect_ids_mapping.get(event_data.flow_run_id)
self.prefect_cloud_api.flow_run__set_state_type__completed(prefect__flow_run_id)

def handle_event__task_start(self, event_data: Flow_Run__Event_Data):
flow_run_id = event_data.flow_run_id
task_name = event_data.task_name
task_run_id = event_data.task_run_id
prefect__flow_run_id = self.prefect_ids_mapping[flow_run_id]
prefect__flow_run_id = self.prefect_ids_mapping.get(flow_run_id)
if prefect__flow_run_id is None:
raise ValueError(f"in handle_event__task_start, could not find prefect__flow_run_id from flow_run_id: {flow_run_id}")
prefect__task_run_definition = { 'flow_run_id' : prefect__flow_run_id,
'dynamic_key' : Random_Guid() ,
'task_key' : Random_Guid() ,
Expand Down

0 comments on commit 98e288e

Please sign in to comment.