From 2b23ff0740aaf6b6cf09e219a68d4f6c4f2ddc11 Mon Sep 17 00:00:00 2001 From: abuabraham-ttd Date: Tue, 10 Dec 2024 13:14:11 -0800 Subject: [PATCH] Move validations around --- scripts/aws/ec2.py | 15 +++---- scripts/confidential_compute.py | 78 ++++++++++++++++++--------------- 2 files changed, 48 insertions(+), 45 deletions(-) diff --git a/scripts/aws/ec2.py b/scripts/aws/ec2.py index 1ecb457cc..4c32e296d 100644 --- a/scripts/aws/ec2.py +++ b/scripts/aws/ec2.py @@ -46,11 +46,7 @@ def __get_current_region(self) -> str: except requests.RequestException as e: raise RuntimeError(f"Failed to fetch region: {e}") - def __validate_configs(self, secret): - required_keys = ["api_token", "environment", "core_base_url", "optout_base_url"] - missing_keys = [key for key in required_keys if key not in secret] - if missing_keys: - raise ConfidentialComputeMissingConfigError(missing_keys) + def __validate_ec2_specific_config(self, secret): if "enclave_memory_mb" in secret or "enclave_cpu_count" in secret: max_capacity = self.__get_max_capacity() for key in ["enclave_memory_mb", "enclave_cpu_count"]: @@ -63,12 +59,12 @@ def _get_secret(self, secret_identifier: str) -> ConfidentialComputeConfig: try: client = boto3.client("secretsmanager", region_name=region) except Exception as e: - raise RuntimeError("Please specify AWS secrets as env values, or use IAM instance profile for your instance") + raise RuntimeError("Please use IAM instance profile for your instance that has permission to access Secret Manager") try: secret = json.loads(client.get_secret_value(SecretId=secret_identifier)["SecretString"]) - self.__validate_configs(secret) + self.__validate_ec2_specific_config(secret) return self.__add_defaults(secret) - except ClientError as e: + except ClientError as _: raise SecretNotFoundException(f"{secret_identifier} in {region}") @staticmethod 
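Review bot: In `_get_secret` (hunk `@@ -63,12 +59,12 @@`), `except ClientError as _` still maps every Secrets Manager failure to `SecretNotFoundException`, so an access-denied or decryption error on the instance profile is reported as a missing secret and the real cause is discarded. Branch on the error code and keep the chain: `except ClientError as e:`, then `if e.response["Error"]["Code"] != "ResourceNotFoundException": raise`, then `raise SecretNotFoundException(f"{secret_identifier} in {region}") from e`. The `except Exception as e` around `boto3.client(...)` drops `e` in the same way; adding `from e` to that `RuntimeError` would preserve it. The signature of `_get_secret` is visible only in the hunk header, and the body of `__validate_ec2_specific_config` in the preceding hunk continues outside it.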
@@ -144,7 +140,7 @@ def _setup_auxiliaries(self) -> None: def _validate_auxiliaries(self) -> None: """Validates auxiliary services.""" - self.validate_operator_key() + self.validate_configuration() proxy = "socks5://127.0.0.1:3306" config_url = "http://127.0.0.1:27015/getConfig" try: @@ -163,7 +159,6 @@ def run_compute(self) -> None: """Main execution flow for confidential compute.""" self._setup_auxiliaries() self._validate_auxiliaries() - self.validate_connectivity() command = [ "nitro-cli", "run-enclave", "--eif-path", "/opt/uid2operator/uid2operator.eif", diff --git a/scripts/confidential_compute.py b/scripts/confidential_compute.py index 186781eab..faf170b9c 100644 --- a/scripts/confidential_compute.py +++ b/scripts/confidential_compute.py @@ -20,7 +20,25 @@ class ConfidentialCompute(ABC): def __init__(self): self.configs: ConfidentialComputeConfig = {} - def validate_environment(self): + def validate_configuration(self): + """ Validates the paramters specified through configs/secret manager .""" + + def validate_operator_key(): + """ Validates the operator key format and its environment alignment.""" + operator_key = self.configs.get("api_token") + if not operator_key: + raise ValueError("API token is missing from the configuration.") + pattern = r"^(UID2|EUID)-.\-(I|P)-\d+-\*$" + if re.match(pattern, operator_key): + env = self.configs.get("environment", "").lower() + debug_mode = self.configs.get("debug_mode", False) + expected_env = "I" if debug_mode or env == "integ" else "P" + if operator_key.split("-")[2] != expected_env: + raise ValueError( + f"Operator key does not match the expected environment ({expected_env})." + ) + return True + def validate_url(url_key, environment): """URL should include environment except in prod""" if environment != "prod" and environment not in self.configs[url_key]: 
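Review bot: The nested `validate_operator_key` in the `scripts/confidential_compute.py` hunk fails open: when `re.match(pattern, operator_key)` does not match, control falls through to `return True`, so a token in any other shape skips the environment check entirely. The pattern also requires a literal trailing `-*` (`\d+-\*$`), which looks like a masked placeholder rather than a real key; if that is the case the I/P comparison never runs for real tokens, so the intended format should be confirmed. Once the pattern is settled, reject non-matching input explicitly, e.g. `if not re.match(pattern, operator_key): raise ValueError("API token format is invalid.")`. The docstring of `validate_configuration` has a typo (`paramters`), and the method's body continues past this hunk.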
@@ -33,6 +51,27 @@ def validate_url(url_key, environment): f"{url_key} is invalid. Ensure {self.configs[url_key]} follows HTTPS, and doesn't have any path specified." ) + def validate_connectivity(self) -> None: + """ Validates that the core and opt-out URLs are accessible.""" + try: + core_url = self.configs["core_base_url"] + optout_url = self.configs["optout_base_url"] + core_ip = self.__resolve_hostname(core_url) + requests.get(core_url, timeout=5) + optout_ip = self.__resolve_hostname(optout_url) + requests.get(optout_url, timeout=5) + except (requests.ConnectionError, requests.Timeout) as e: + raise Exception( + f"Failed to reach required URLs. Consider enabling {core_ip}, {optout_ip} in the egress firewall." + ) + except Exception as e: + raise Exception("Failed to reach the URLs.") from e + + required_keys = ["api_token", "environment", "core_base_url", "optout_base_url"] + missing_keys = [key for key in required_keys if key not in self.configs] + if missing_keys: + raise ConfidentialComputeMissingConfigError(missing_keys) + environment = self.configs["environment"] if self.configs.get("debug_mode") and environment == "prod": 
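Review bot: The nested `validate_connectivity(self)` is declared with a `self` parameter but is invoked as `validate_connectivity()` in the `@@ -40,41 +79,10 @@` hunk, so the call raises `TypeError` before any request is made; drop the parameter and rely on the enclosing `self`: `def validate_connectivity() -> None:`. Inside it, `core_ip` and `optout_ip` are assigned within the `try` body, so a `ConnectionError` from the first `requests.get` reaches the f-string with `optout_ip` unbound and the firewall hint is replaced by an `UnboundLocalError`; initialise them before the `try` with `core_ip = optout_ip = None`. That `raise Exception(...)` also omits `from e`, unlike the generic handler right below it. The enclosing `validate_configuration` starts before this hunk and continues after it.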
@@ -40,41 +79,10 @@ def validate_url(url_key, environment): validate_url("core_base_url", environment) validate_url("optout_base_url", environment) - - - def validate_operator_key(self): - """ Validates the operator key format and its environment alignment.""" - operator_key = self.configs.get("api_token") - if not operator_key: - raise ValueError("API token is missing from the configuration.") - pattern = r"^(UID2|EUID)-.\-(I|P)-\d+-\*$" - if re.match(pattern, operator_key): - env = self.configs.get("environment", "").lower() - debug_mode = self.configs.get("debug_mode", False) - expected_env = "I" if debug_mode or env == "integ" else "P" - if operator_key.split("-")[2] != expected_env: - raise ValueError( - f"Operator key does not match the expected environment ({expected_env})." - ) - return True - - def validate_connectivity(self) -> None: - """ Validates that the core and opt-out URLs are accessible.""" - try: - core_url = self.configs["core_base_url"] - optout_url = self.configs["optout_base_url"] - core_ip = self.__resolve_hostname(core_url) - requests.get(core_url, timeout=5) - optout_ip = self.__resolve_hostname(optout_url) - requests.get(optout_url, timeout=5) - except (requests.ConnectionError, requests.Timeout) as e: - raise Exception( - f"Failed to reach required URLs. Consider enabling {core_ip}, {optout_ip} in the egress firewall." - ) - except Exception as e: - raise Exception("Failed to reach the URLs.") from e + validate_operator_key() + validate_connectivity() - + @abstractmethod def _get_secret(self, secret_identifier: str) -> ConfidentialComputeConfig: """
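Review bot: `validate_environment`, `validate_operator_key` and `validate_connectivity` stop being methods of `ConfidentialCompute` here, but the patch only updates the callers in `scripts/aws/ec2.py`. Any other subclass or script that still calls `self.validate_environment()`, `self.validate_operator_key()` or `self.validate_connectivity()` would fail with `AttributeError` at startup; none are visible in this diff, so this needs a repository-wide check before merging. Folding the outbound `requests.get` probes into `validate_configuration` also means the key and URL checks can no longer be run without network access; keeping connectivity as a separate public step would preserve that and keep failures easier to attribute.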