diff --git a/aana/cli.py b/aana/cli.py index 706ce5e3..2cd439d8 100644 --- a/aana/cli.py +++ b/aana/cli.py @@ -71,6 +71,11 @@ def load_app(app_path: str): is_flag=True, help="Skip migrations before deploying", ) +@click.option( + "--sequential-deploy", + is_flag=True, + help="Deploy the application's deployments sequentially", +) def deploy( app_path: str, host: str, @@ -80,6 +85,7 @@ def deploy( ray_dashboard_host: str, ray_dashboard_port: int, skip_migrations: bool, + sequential_deploy: bool, ): """Deploy the application. @@ -100,7 +106,7 @@ def deploy( with contextlib.suppress( DeploymentException ): # we have a nice error message for this - aana_app.deploy(blocking=True) + aana_app.deploy(blocking=True, sequential=sequential_deploy) @cli.command() diff --git a/aana/deployments/task_queue_deployment.py b/aana/deployments/task_queue_deployment.py index 1d09ad42..5b90bc99 100644 --- a/aana/deployments/task_queue_deployment.py +++ b/aana/deployments/task_queue_deployment.py @@ -110,6 +110,7 @@ async def loop(self): # noqa: C901 configuration_attempts = 0 full_queue_attempts = 0 no_tasks_attempts = 0 + no_app_attempts = 0 while True: # Check the health of the app @@ -156,22 +157,14 @@ async def loop(self): # noqa: C901 full_queue_attempts = 0 if not handle: - # Sometimes the app isn't available immediately after the deployment is created - # so we need to wait for it to become available - for _ in range(10): - try: - handle = serve.get_app_handle(self.app_name) - break - except ray.serve.exceptions.RayServeException as e: - print( - f"App {self.app_name} not available yet: {e}, retrying..." - ) - await asyncio.sleep(1) - else: - # If the app is not available after all retries, try again - # but without catching the exception - # (if it fails, the deployment will be unhealthy, and restart will be attempted) + # Try to get the app handle, if it fails, wait and retry later + try: handle = serve.get_app_handle(self.app_name) + except ray.serve.exceptions.RayServeException as e: + print(f"App {self.app_name} not available yet: {e}, retrying...") + await sleep_exponential_backoff(1.0, 5.0, no_app_attempts) + no_app_attempts += 1 + continue # Get new tasks from the database with get_session() as session: diff --git a/aana/sdk.py b/aana/sdk.py index fe717f55..47005780 100644 --- a/aana/sdk.py +++ b/aana/sdk.py @@ -355,11 +355,12 @@ def wait_for_deployment(self): # noqa: C901 time.sleep(1) # Wait for 1 second before checking again - def deploy(self, blocking: bool = False): + def deploy(self, blocking: bool = False, sequential: bool = False): """Deploy the application with the registered endpoints and deployments. Args: blocking (bool, optional): If True, the function will block until interrupted. Defaults to False. + sequential (bool, optional): If True, the deployments will be deployed sequentially. Defaults to False. """ try: for deployment_name in self.deployments: @@ -369,6 +370,8 @@ def deploy(self, blocking: bool = False): route_prefix=f"/{deployment_name}", _blocking=False, ) + if sequential: + self.wait_for_deployment() serve.api._run( self.get_main_app(),