Skip to content

Commit

Permalink
try to fix async calls and executions
Browse files Browse the repository at this point in the history
  • Loading branch information
VladimirFilonov committed Jan 7, 2025
1 parent 29962fc commit b53d22e
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 10 deletions.
2 changes: 1 addition & 1 deletion keep/api/tasks/process_event_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,7 @@ def __handle_formatted_events(
# insert the events to the workflow manager process queue
logger.info("Adding events to the workflow manager queue")
loop = asyncio.get_event_loop()
loop.run(workflow_manager.insert_events(tenant_id, enriched_formatted_events))
asyncio.ensure_future(workflow_manager.insert_events(tenant_id, enriched_formatted_events), loop=loop)
logger.info("Added events to the workflow manager queue")
except Exception:
logger.exception(
Expand Down
17 changes: 10 additions & 7 deletions keep/workflowmanager/workflowscheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ async def _handle_interval_workflows(self):
)
)
self.futures.add(future)
future.add_done_callback(lambda f: self.futures.remove(f))
future.add_done_callback(self._async_task_callback)

async def _run_workflow(
self,
Expand Down Expand Up @@ -609,14 +609,13 @@ async def _handle_event_workflows(self):
event,
))
self.futures.add(future)
future.add_done_callback(lambda f: self.futures.remove(f))
future.add_done_callback(self._async_task_callback)

self.logger.debug(
"Event workflows handled",
extra={"current_number_of_workflows": len(self.futures)},
)


async def _run(self):
self.logger.info("Starting workflows scheduler")
while not self._stop:
Expand Down Expand Up @@ -644,9 +643,7 @@ def stop(self):
# Wait for scheduler to stop first
if self.run_future:
try:
self.run_future.result(
timeout=5
) # Add timeout to prevent hanging
self.run_future.cancel() # Add timeout to prevent hanging
except Exception:
self.logger.exception("Error waiting for scheduler to stop")

Expand All @@ -655,7 +652,7 @@ def stop(self):
try:
self.logger.info("Cancelling future")
future.cancel()
future.result(timeout=1) # Add timeout
future.result() # Add timeout
self.logger.info("Future cancelled")
except Exception:
self.logger.exception("Error cancelling future")
Expand Down Expand Up @@ -727,3 +724,9 @@ async def _finish_workflow_execution(
self.logger.error(
f"Failed to send email to {workflow.created_by} for failed workflow {workflow_id}: {e}"
)

def _async_task_callback(self, future):
if isinstance(future.exception(), Exception):
self.logger.exception(future.exception())
raise future.exception()
self.futures.remove(future)
4 changes: 2 additions & 2 deletions tests/test_workflow_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ async def test_workflow_execution(
)
create_alert("fp1", alert_status, base_time - timedelta(minutes=time_diff))

await asyncio.sleep(1)
# await asyncio.sleep(1)

# Create the current alert
current_alert = AlertDto(
Expand Down Expand Up @@ -275,7 +275,7 @@ async def test_workflow_execution(
"send-slack-message-tier-2" in workflow_execution.results and
workflow_execution.status == "success"):
found = True
await asyncio.sleep(1)
await asyncio.sleep(0.1)
count += 1

await workflow_manager.stop()
Expand Down

0 comments on commit b53d22e

Please sign in to comment.