Skip to content

Commit

Permalink
Merge pull request #210 from mobiusml/sequential_deploy_option
Browse files Browse the repository at this point in the history
Add Sequential Deployment Option
  • Loading branch information
movchan74 authored Nov 27, 2024
2 parents 138f9e3 + 9b2ea58 commit 10af9cc
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 17 deletions.
8 changes: 7 additions & 1 deletion aana/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ def load_app(app_path: str):
is_flag=True,
help="Skip migrations before deploying",
)
@click.option(
"--sequential-deploy",
is_flag=True,
help="Deploy the application's deployments sequentially",
)
def deploy(
app_path: str,
host: str,
Expand All @@ -80,6 +85,7 @@ def deploy(
ray_dashboard_host: str,
ray_dashboard_port: int,
skip_migrations: bool,
sequential_deploy: bool,
):
"""Deploy the application.
Expand All @@ -100,7 +106,7 @@ def deploy(
with contextlib.suppress(
DeploymentException
): # we have a nice error message for this
aana_app.deploy(blocking=True)
aana_app.deploy(blocking=True, sequential=sequential_deploy)


@cli.command()
Expand Down
23 changes: 8 additions & 15 deletions aana/deployments/task_queue_deployment.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ async def loop(self): # noqa: C901
configuration_attempts = 0
full_queue_attempts = 0
no_tasks_attempts = 0
no_app_attempts = 0

while True:
# Check the health of the app
Expand Down Expand Up @@ -156,22 +157,14 @@ async def loop(self): # noqa: C901
full_queue_attempts = 0

if not handle:
# Sometimes the app isn't available immediately after the deployment is created
# so we need to wait for it to become available
for _ in range(10):
try:
handle = serve.get_app_handle(self.app_name)
break
except ray.serve.exceptions.RayServeException as e:
print(
f"App {self.app_name} not available yet: {e}, retrying..."
)
await asyncio.sleep(1)
else:
# If the app is not available after all retries, try again
# but without catching the exception
# (if it fails, the deployment will be unhealthy, and restart will be attempted)
# Try to get the app handle, if it fails, wait and retry later
try:
handle = serve.get_app_handle(self.app_name)
except ray.serve.exceptions.RayServeException as e:
print(f"App {self.app_name} not available yet: {e}, retrying...")
await sleep_exponential_backoff(1.0, 5.0, no_app_attempts)
no_app_attempts += 1
continue

# Get new tasks from the database
with get_session() as session:
Expand Down
5 changes: 4 additions & 1 deletion aana/sdk.py
Original file line number Diff line number Diff line change
Expand Up @@ -355,11 +355,12 @@ def wait_for_deployment(self): # noqa: C901

time.sleep(1) # Wait for 1 second before checking again

def deploy(self, blocking: bool = False):
def deploy(self, blocking: bool = False, sequential: bool = False):
"""Deploy the application with the registered endpoints and deployments.
Args:
blocking (bool, optional): If True, the function will block until interrupted. Defaults to False.
sequential (bool, optional): If True, the deployments will be deployed sequentially. Defaults to False.
"""
try:
for deployment_name in self.deployments:
Expand All @@ -369,6 +370,8 @@ def deploy(self, blocking: bool = False):
route_prefix=f"/{deployment_name}",
_blocking=False,
)
if sequential:
self.wait_for_deployment()

serve.api._run(
self.get_main_app(),
Expand Down

0 comments on commit 10af9cc

Please sign in to comment.