From be3c597d2c77ec9e1c8ce930b0cd5051a68f5b73 Mon Sep 17 00:00:00 2001
From: Gunjan Chhablani
Date: Fri, 26 Apr 2024 01:52:40 -0400
Subject: [PATCH] [Monitoring] Update auto scaling scripts to use metrics by PK API (#4355)

---
 scripts/monitoring/auto_scale_ec2_workers.py | 15 ++++----
 scripts/monitoring/auto_scale_eks_nodes.py   | 18 +++++-----
 scripts/monitoring/auto_scale_workers.py     | 37 ++++++++++++--------
 scripts/monitoring/evalai_interface.py       |  8 +++++
 4 files changed, 47 insertions(+), 31 deletions(-)

diff --git a/scripts/monitoring/auto_scale_ec2_workers.py b/scripts/monitoring/auto_scale_ec2_workers.py
index 1742dace81..76689fca29 100644
--- a/scripts/monitoring/auto_scale_ec2_workers.py
+++ b/scripts/monitoring/auto_scale_ec2_workers.py
@@ -72,15 +72,17 @@ def start_instance(challenge, evalai_interface):
         )
 
 
-def start_or_stop_workers(challenge, challenge_metrics, evalai_interface):
+def start_or_stop_workers(challenge, evalai_interface):
     try:
+        challenge_metrics = evalai_interface.get_challenge_submission_metrics_by_pk(challenge["id"])
         pending_submissions = get_pending_submission_count(challenge_metrics)
-    except Exception:  # noqa: F841
+    except Exception as e:
         print(
             "Unable to get the pending submissions for challenge ID: {}, Title: {}. Skipping.".format(
                 challenge["id"], challenge["title"]
             )
         )
+        print(e)
         return
 
     print("Pending Submissions: {}, Challenge PK: {}, Title: {}".format(pending_submissions, challenge["id"], challenge["title"]))
@@ -94,11 +96,11 @@ def start_or_stop_workers(challenge, challenge_metrics, evalai_interface):
 
 
 # TODO: Factor in limits for the APIs
-def start_or_stop_workers_for_challenges(response, metrics, evalai_interface):
+def start_or_stop_workers_for_challenges(response, evalai_interface):
     for challenge in response["results"]:
         if challenge["uses_ec2_worker"]:
             try:
-                start_or_stop_workers(challenge, metrics[str(challenge["id"])], evalai_interface)
+                start_or_stop_workers(challenge, evalai_interface)
             except Exception as e:
                 print(e)
 
@@ -112,12 +114,11 @@ def create_evalai_interface(auth_token, evalai_endpoint):
 def start_job():
     evalai_interface = create_evalai_interface(auth_token, evalai_endpoint)
     response = evalai_interface.get_challenges()
-    metrics = evalai_interface.get_challenges_submission_metrics()
-    start_or_stop_workers_for_challenges(response, metrics, evalai_interface)
+    start_or_stop_workers_for_challenges(response, evalai_interface)
    next_page = response["next"]
     while next_page is not None:
         response = evalai_interface.make_request(next_page, "GET")
-        start_or_stop_workers_for_challenges(response, metrics, evalai_interface)
+        start_or_stop_workers_for_challenges(response, evalai_interface)
         next_page = response["next"]
 
 
diff --git a/scripts/monitoring/auto_scale_eks_nodes.py b/scripts/monitoring/auto_scale_eks_nodes.py
index 5a74faf245..0cd995e049 100644
--- a/scripts/monitoring/auto_scale_eks_nodes.py
+++ b/scripts/monitoring/auto_scale_eks_nodes.py
@@ -25,7 +25,7 @@
 
 # Env Variables
 ENV = os.environ.get("ENV", "production")
-AUTH_TOKEN = os.environ.get("AUTH_TOKEN")
+STAFF_AUTH_TOKEN = os.environ.get("AUTH_TOKEN")
 EVALAI_ENDPOINT = os.environ.get("API_HOST_URL", "https://eval.ai")
 
 json_path = os.environ.get("JSON_PATH", "~/prod_eks_auth_tokens.json")
@@ -107,8 +107,7 @@ def stop_eks_worker(challenge, evalai_interface, aws_keys):
     return response
 
 
-def get_pending_submission_count_by_pk(metrics, challenge_pk):
-    challenge_metrics = metrics[str(challenge_pk)]
+def get_pending_submission_count(challenge_metrics):
     pending_submissions = 0
     for status in ["running", "submitted", "queued", "resuming"]:
         pending_submissions += challenge_metrics.get(status, 0)
@@ -151,15 +150,17 @@ def scale_up_workers(challenge, original_desired_size, pending_submissions, evalai_interface, aws_keys):
         )
 
 
-def scale_up_or_down_workers(challenge, metrics, evalai_interface, aws_keys, scale_up_desired_size):
+def scale_up_or_down_workers(challenge, evalai_interface, staff_evalai_interface, aws_keys, scale_up_desired_size):
     try:
-        pending_submissions = get_pending_submission_count_by_pk(metrics, challenge["id"])
-    except Exception:  # noqa: F841
+        challenge_metrics = staff_evalai_interface.get_challenge_submission_metrics_by_pk(challenge["id"])
+        pending_submissions = get_pending_submission_count(challenge_metrics)
+    except Exception as e:
         print(
             "Unable to get the pending submissions for challenge ID: {}, Title: {}. Skipping.".format(
                 challenge["id"], challenge["title"]
             )
         )
+        print(e)
         return
 
     eks_client, cluster_name, nodegroup_name = get_eks_meta(
@@ -209,8 +210,7 @@ def scale_up_or_down_workers(challenge, metrics, evalai_interface, aws_keys, scale_up_desired_size):
 
 def start_job():
     # Get metrics
-    evalai_interface = create_evalai_interface(AUTH_TOKEN)
-    metrics = evalai_interface.get_challenges_submission_metrics()
+    staff_evalai_interface = create_evalai_interface(STAFF_AUTH_TOKEN)
 
     for challenge_id, details in INCLUDED_CHALLENGE_PKS.items():
         # Auth Token
@@ -237,7 +237,7 @@ def start_job():
             ), "Challenge ID: {}, Title: {} is either not docker-based or remote-evaluation. Skipping.".format(
                 challenge["id"], challenge["title"]
             )
-            scale_up_or_down_workers(challenge, metrics, evalai_interface, aws_keys, scale_up_desired_size)
+            scale_up_or_down_workers(challenge, evalai_interface, staff_evalai_interface, aws_keys, scale_up_desired_size)
             time.sleep(1)
         except Exception as e:
             print(e)
diff --git a/scripts/monitoring/auto_scale_workers.py b/scripts/monitoring/auto_scale_workers.py
index 0ed732a8bc..48e83d6a41 100644
--- a/scripts/monitoring/auto_scale_workers.py
+++ b/scripts/monitoring/auto_scale_workers.py
@@ -88,19 +88,27 @@ def scale_up_or_down_workers(challenge, challenge_metrics):
 
 
 # TODO: Factor in limits for the APIs
-def scale_up_or_down_workers_for_challenges(response, metrics):
+def scale_up_or_down_workers_for_challenge(challenge, challenge_metrics):
+    if ENV == "prod":
+        try:
+            if challenge["remote_evaluation"] is False:
+                scale_up_or_down_workers(challenge, challenge_metrics)
+        except Exception as e:
+            print(e)
+    else:
+        try:
+            scale_up_or_down_workers(challenge, challenge_metrics)
+        except Exception as e:
+            print(e)
+
+
+def scale_up_or_down_workers_for_challenges(response, evalai_interface):
     for challenge in response["results"]:
-        if ENV == "prod":
-            try:
-                if challenge["remote_evaluation"] is False:
-                    scale_up_or_down_workers(challenge, metrics[str(challenge["id"])])
-            except Exception as e:
-                print(e)
-        else:
-            try:
-                scale_up_or_down_workers(challenge, metrics[str(challenge["id"])])
-            except Exception as e:
-                print(e)
+        try:
+            challenge_metrics = evalai_interface.get_challenge_submission_metrics_by_pk(challenge["id"])
+            scale_up_or_down_workers_for_challenge(challenge, challenge_metrics)
+        except Exception as e:
+            print(e)
 
 
 def create_evalai_interface(auth_token, evalai_endpoint):
@@ -112,12 +120,11 @@ def create_evalai_interface(auth_token, evalai_endpoint):
 def start_job():
     evalai_interface = create_evalai_interface(auth_token, evalai_endpoint)
     response = evalai_interface.get_challenges()
-    metrics = evalai_interface.get_challenges_submission_metrics()
-    scale_up_or_down_workers_for_challenges(response, metrics)
+    scale_up_or_down_workers_for_challenges(response, evalai_interface)
     next_page = response["next"]
     while next_page is not None:
         response = evalai_interface.make_request(next_page, "GET")
-        scale_up_or_down_workers_for_challenges(response, metrics)
+        scale_up_or_down_workers_for_challenges(response, evalai_interface)
         next_page = response["next"]
 
 
diff --git a/scripts/monitoring/evalai_interface.py b/scripts/monitoring/evalai_interface.py
index 9959050ce0..5c5c14b4d9 100644
--- a/scripts/monitoring/evalai_interface.py
+++ b/scripts/monitoring/evalai_interface.py
@@ -19,6 +19,7 @@
     "get_challenges": "/api/challenges/challenge/all/all/all",
     "get_submissions_for_challenge": "/api/jobs/challenge/{}/submission/",
     "get_challenges_submission_metrics": "/api/challenges/challenge/get_submission_metrics",
+    "get_challenge_submission_metrics_by_pk": "/api/challenges/challenge/get_submission_metrics_by_pk/{}/",
     "manage_ec2_instance": "/api/challenges/{}/manage_ec2_instance/{}",
     "get_ec2_instance_details": "/api/challenges/{}/get_ec2_instance_details/",
 }
@@ -144,6 +145,13 @@ def get_challenges_submission_metrics(self):
         response = self.make_request(url, "GET")
         return response
 
+    def get_challenge_submission_metrics_by_pk(self, challenge_pk):
+        url_template = URLS.get("get_challenge_submission_metrics_by_pk")
+        url = url_template.format(challenge_pk)
+        url = self.return_url_per_environment(url)
+        response = self.make_request(url, "GET")
+        return response
+
    def get_ec2_instance_details(self, challenge_pk):
         url_template = URLS.get("get_ec2_instance_details")
         url = url_template.format(challenge_pk)