From b53d22e47e9a723c46ad8acef7169be2459a4e54 Mon Sep 17 00:00:00 2001 From: Vladimir Filonov Date: Tue, 7 Jan 2025 16:48:40 +0400 Subject: [PATCH] try to fix async calls and executions --- keep/api/tasks/process_event_task.py | 2 +- keep/workflowmanager/workflowscheduler.py | 17 ++++++++++------- tests/test_workflow_execution.py | 4 ++-- 3 files changed, 13 insertions(+), 10 deletions(-) diff --git a/keep/api/tasks/process_event_task.py b/keep/api/tasks/process_event_task.py index 4a73f77b3..88895573c 100644 --- a/keep/api/tasks/process_event_task.py +++ b/keep/api/tasks/process_event_task.py @@ -427,7 +427,7 @@ def __handle_formatted_events( # insert the events to the workflow manager process queue logger.info("Adding events to the workflow manager queue") loop = asyncio.get_event_loop() - loop.run(workflow_manager.insert_events(tenant_id, enriched_formatted_events)) + asyncio.ensure_future(workflow_manager.insert_events(tenant_id, enriched_formatted_events), loop=loop) logger.info("Added events to the workflow manager queue") except Exception: logger.exception( diff --git a/keep/workflowmanager/workflowscheduler.py b/keep/workflowmanager/workflowscheduler.py index caf705898..135003893 100644 --- a/keep/workflowmanager/workflowscheduler.py +++ b/keep/workflowmanager/workflowscheduler.py @@ -167,7 +167,7 @@ async def _handle_interval_workflows(self): ) ) self.futures.add(future) - future.add_done_callback(lambda f: self.futures.remove(f)) + future.add_done_callback(self._async_task_callback) async def _run_workflow( self, @@ -609,14 +609,13 @@ async def _handle_event_workflows(self): event, )) self.futures.add(future) - future.add_done_callback(lambda f: self.futures.remove(f)) + future.add_done_callback(self._async_task_callback) self.logger.debug( "Event workflows handled", extra={"current_number_of_workflows": len(self.futures)}, ) - async def _run(self): self.logger.info("Starting workflows scheduler") while not self._stop: @@ -644,9 +643,7 @@ def stop(self): # Wait for scheduler to stop first if self.run_future: try: - self.run_future.result( - timeout=5 - ) # Add timeout to prevent hanging + self.run_future.cancel() # Add timeout to prevent hanging except Exception: self.logger.exception("Error waiting for scheduler to stop") @@ -655,7 +652,7 @@ def stop(self): try: self.logger.info("Cancelling future") future.cancel() - future.result(timeout=1) # Add timeout + future.result() # Add timeout self.logger.info("Future cancelled") except Exception: self.logger.exception("Error cancelling future") @@ -727,3 +724,9 @@ async def _finish_workflow_execution( self.logger.error( f"Failed to send email to {workflow.created_by} for failed workflow {workflow_id}: {e}" ) + + def _async_task_callback(self, future): + if isinstance(future.exception(), Exception): + self.logger.exception(future.exception()) + raise future.exception() + self.futures.remove(future) diff --git a/tests/test_workflow_execution.py b/tests/test_workflow_execution.py index a83686b7b..1c958fafc 100644 --- a/tests/test_workflow_execution.py +++ b/tests/test_workflow_execution.py @@ -244,7 +244,7 @@ async def test_workflow_execution( ) create_alert("fp1", alert_status, base_time - timedelta(minutes=time_diff)) - await asyncio.sleep(1) + # await asyncio.sleep(1) # Create the current alert current_alert = AlertDto( @@ -275,7 +275,7 @@ async def test_workflow_execution( "send-slack-message-tier-2" in workflow_execution.results and workflow_execution.status == "success"): found = True - await asyncio.sleep(1) + await asyncio.sleep(0.1) count += 1 await workflow_manager.stop()