diff --git a/aana/deployments/task_queue_deployment.py b/aana/deployments/task_queue_deployment.py index eef68cdd..392d889a 100644 --- a/aana/deployments/task_queue_deployment.py +++ b/aana/deployments/task_queue_deployment.py @@ -66,6 +66,29 @@ def __del__(self): progress=0, ) + async def app_health_check(self) -> bool: + """Check the health of the app. + + The app is considered healthy if for every deployment, at least 50% of the replicas are running. + The reason for this is that even if some replicas are not running, the app can still can process requests. + And in the cluster setup, it is possible that some replicas on other nodes are not running or just starting up + and it is not a reason to consider the app unhealthy. + + Returns: + bool: True if the app is healthy, False otherwise + """ + serve_status = serve.status() + for app in serve_status.applications.values(): + for deployment in app.deployments.values(): + num_replicas = sum(deployment.replica_states.values()) + if num_replicas > 0: + health_ratio = ( + deployment.replica_states.get("RUNNING", 0) / num_replicas + ) + if health_ratio < 0.5: + return False + return True + async def apply_config(self, config: dict[str, Any]): """Apply the configuration. @@ -83,11 +106,22 @@ async def loop(self): # noqa: C901 The loop will check the queue and assign tasks to workers. """ handle = None + app_health_check_attempts = 0 configuration_attempts = 0 full_queue_attempts = 0 no_tasks_attempts = 0 while True: + # Check the health of the app + app_health = await self.app_health_check() + if not app_health: + # If the app is not healthy, wait and retry + await sleep_exponential_backoff(1.0, 5.0, app_health_check_attempts) + app_health_check_attempts += 1 + continue + else: + app_health_check_attempts = 0 + if not self._configured: # Wait for the deployment to be configured. await sleep_exponential_backoff(1.0, 5.0, configuration_attempts)