From 45d43da10f833d1211e42153b89f44d47ba314f1 Mon Sep 17 00:00:00 2001 From: Faiyaz Hasan Date: Thu, 23 Nov 2023 15:11:26 -0500 Subject: [PATCH] Executor resource deployment UX implementation (#1617) * add base cloud resource manager class * update changelog * implement initial skeleton fro the executor resource deployment * fix status command and docstring for deploy * add base cloud resource manager class * porting changes from old crm implementation branch * minor edits to CRM * Added validation of user passed options * all commands work as expected * Update main functions * Update covalent deploy status cli * Updates * Update up cli * Add dry run tag * Add callback method * Update filter lines * minor update * Update changes * Adding unit tests for CRM * refactor some code * fix imports * fix dispatcher initialization * fix dispatcher initialization * update * fix broken test * Got up and down working * Update return code handling * Add no color to terraform commands * Update changelog * Update log file error parsing * Update up and down returns * Fix deploy down * Added relevant error messages for deploy up/down * Added aws region validation * Added argument validation * Added terraform version validation * Added git path to env * Validated git installation on system * Updated CRM to return installed plugin status. * Fix - executor's deploy message for verbose * Capturing failed deployment status on CRM * minor changes to the boilerplate * Updated covalent resource manager test cases * Fixed for docker validation * Update CHANGELOG.md * add spaces in docstring to improve --help message --------- Co-authored-by: Venkat Bala Co-authored-by: kessler-frost Co-authored-by: Aravind-psiog Co-authored-by: Manjunath PV Co-authored-by: ArunPsiog Co-authored-by: Sankalp Sanand Co-authored-by: mpvgithub <107603631+mpvgithub@users.noreply.github.com> Co-authored-by: Ara Ghukasyan Co-authored-by: Alejandro Esquivel --- CHANGELOG.md | 2 + covalent/cloud_resource_manager/core.py | 318 +++++++++++++---- covalent_dispatcher/_cli/cli.py | 3 +- covalent_dispatcher/_cli/groups/__init__.py | 1 + covalent_dispatcher/_cli/groups/deploy.py | 321 ++++++++++++++++++ .../_cli/cli_test.py | 1 + .../cloud_resource_manager/core_test.py | 233 +++++++++---- 7 files changed, 745 insertions(+), 134 deletions(-) create mode 100644 covalent_dispatcher/_cli/groups/deploy.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 4ff08a751..0997812e2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -56,6 +56,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - New Runner and executor API to bypass server-side memory when running tasks. - Added qelectron_db as an asset to be transferred from executor's machine to covalent server - New methods to qelectron_utils, replacing the old ones +- Covalent deploy CLI tool added - allows provisioning infras directly from covalent ### Docs @@ -67,6 +68,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Temporarily skipping the sqlite and database trigger functional tests - Updated tests to accommodate the new qelectron fixes - Added new tests for the Database class and qelectron_utils +- Covalent deploy CLI tool tests. ### Removed diff --git a/covalent/cloud_resource_manager/core.py b/covalent/cloud_resource_manager/core.py index 45c8604eb..035efd412 100644 --- a/covalent/cloud_resource_manager/core.py +++ b/covalent/cloud_resource_manager/core.py @@ -16,18 +16,25 @@ import importlib +import logging import os import shutil import subprocess +import sys from configparser import ConfigParser from pathlib import Path from types import ModuleType -from typing import Dict, Optional +from typing import Callable, Dict, List, Optional, Union -from covalent._shared_files.config import get_config, set_config -from covalent._shared_files.exceptions import CommandNotFoundError +from covalent._shared_files.config import set_config from covalent.executor import _executor_manager +logger = logging.getLogger() +logger.setLevel(logging.ERROR) +handler = logging.StreamHandler(sys.stderr) +logger.addHandler(handler) +logger.propagate = False + def get_executor_module(executor_name: str) -> ModuleType: """ @@ -60,7 +67,6 @@ def validate_options( pydantic.ValidationError: If the options are invalid """ - # Validating the passed options: plugin_attrs = list(ExecutorPluginDefaults.schema()["properties"].keys()) @@ -74,6 +80,54 @@ def validate_options( ExecutorInfraDefaults(**infra_params) +def get_plugin_settings( + ExecutorPluginDefaults, ExecutorInfraDefaults, executor_options: Dict +) -> Dict: + """Get plugin settings. + + Args: + ExecutorPluginDefaults: Executor plugin defaults validation class. + ExecutorInfraDefaults: Executor infra defaults validation class. + executor_options: Resource provisioning options passed to the CRM. + + Returns: + Dictionary of plugin settings. + + """ + plugin_settings = ExecutorPluginDefaults.schema()["properties"] + infra_settings = ExecutorInfraDefaults.schema()["properties"] + + settings_dict = { + key: { + "required": "No", + "default": value["default"], + "value": value["default"], + } + if "default" in value + else {"required": "Yes", "default": None, "value": None} + for key, value in plugin_settings.items() + } + for key, value in infra_settings.items(): + if "default" in value: + settings_dict[key] = { + "required": "No", + "default": value["default"], + "value": value["default"], + } + else: + settings_dict[key] = {"required": "Yes", "default": None, "value": None} + + if executor_options: + for key, value in executor_options.items(): + try: + settings_dict[key]["value"] = value + except: + logger.error(f"No such option '{key}'. Use --help for available options") + sys.exit() + + return settings_dict + + class CloudResourceManager: """ Base cloud resource manager class @@ -103,29 +157,114 @@ def __init__( self.ExecutorPluginDefaults, self.ExecutorInfraDefaults, self.executor_options ) - def _print_stdout(self, process: subprocess.Popen) -> int: + self.plugin_settings = get_plugin_settings( + self.ExecutorPluginDefaults, self.ExecutorInfraDefaults, self.executor_options + ) + + self._terraform_log_env_vars = { + "TF_LOG": "ERROR", + "TF_LOG_PATH": os.path.join(self.executor_tf_path, "terraform-error.log"), + "PATH": "$PATH:/usr/bin", + } + + def _print_stdout(self, process: subprocess.Popen, print_callback: Callable) -> int: """ Print the stdout from the subprocess to console Args: - process: Python subprocess whose stdout is to be printed to screen + process: Python subprocess whose stdout is to be printed to screen. + print_callback: Callback function to print the stdout. Returns: - returncode of the process + Return code of the process. + """ + while (retcode := process.poll()) is None: + if (proc_stdout := process.stdout.readline()) and print_callback: + print_callback(proc_stdout.strip().decode("utf-8")) + return retcode + + # TODO: Return the command output along with return code + + def _parse_terraform_error_log(self) -> List[str]: + """Parse the terraform error logs. + + Returns: + List of lines in the terraform error log. - while process.poll() is None: - if proc_stdout := process.stdout.readline(): - print(proc_stdout.strip().decode("utf-8")) - else: - break - return process.poll() + """ + with open(Path(self.executor_tf_path) / "terraform-error.log", "r", encoding="UTF-8") as f: + lines = f.readlines() + for _, line in enumerate(lines): + error_index = line.strip().find("error:") + if error_index != -1: + error_message = line.strip()[error_index + len("error:") :] + logger.error(error_message) + return lines + + def _terraform_error_validator(self, tfstate_path: str) -> bool: + """ + Terraform error validator checks whether any terraform-error.log files existence and validate last line. + Args: None + Return: + up - if terraform-error.log is empty and tfstate exists. + *up - if terraform-error.log is not empty and 'On deploy' at last line. + down - if terraform-error.log is empty and tfstate file not exists. + *down - if terraform-error.log is not empty and 'On destroy' at last line. + """ + tf_error_file = os.path.join(self.executor_tf_path, "terraform-error.log") + if os.path.exists(tf_error_file) and os.path.getsize(tf_error_file) > 0: + with open(tf_error_file, "r", encoding="UTF-8") as error_file: + indicator = error_file.readlines()[-1] + if indicator == "On deploy": + return "*up" + elif indicator == "On destroy": + return "*down" + return "up" if os.path.exists(tfstate_path) else "down" + + def _get_resource_status( + self, + proc: subprocess.Popen, + cmd: str, + ) -> str: + """ + Get resource status will return current status of plugin based on terraform-error.log and tfstate file. + Args: + proc : subprocess.Popen - To read stderr from Popen.communicate. + cmd : command for executing terraform scripts. + Returns: + status: str - status of plugin + """ + _, stderr = proc.communicate() + cmds = cmd.split(" ") + tfstate_path = cmds[-1].split("=")[-1] + if stderr is None: + return self._terraform_error_validator(tfstate_path=tfstate_path) + else: + raise subprocess.CalledProcessError( + returncode=1, cmd=cmd, stderr=self._parse_terraform_error_log() + ) - # TODO: Return the command output alongwith returncode + def _log_error_msg(self, cmd) -> None: + """ + Log error msg with valid command to terraform-erro.log + Args: cmd: str - terraform-error.log file path. + """ + with open( + Path(self.executor_tf_path) / "terraform-error.log", "a", encoding="UTF-8" + ) as file: + if any(tf_cmd in cmd for tf_cmd in ["init", "plan", "apply"]): + file.write("\nOn deploy") + elif "destroy" in cmd: + file.write("\nOn destroy") def _run_in_subprocess( - self, cmd: str, workdir: str, env_vars: Optional[Dict[str, str]] = None - ) -> None: + self, + cmd: str, + workdir: str, + env_vars: Optional[Dict[str, str]] = None, + print_callback: Optional[Callable] = None, + ) -> Union[None, str]: """ Run the `cmd` in a subprocess shell with the env_vars set in the process's new environment @@ -135,23 +274,35 @@ def _run_in_subprocess( env_vars: Dictionary of environment variables to set in the processes execution environment Returns: - None - + Union[None, str] + - For 'covalent deploy status' + returns status of the deplyment + - Others + return None """ - proc = subprocess.Popen( - args=cmd, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - cwd=workdir, - shell=True, - env=env_vars, - ) - retcode = self._print_stdout(proc) - - if retcode != 0: - raise subprocess.CalledProcessError(returncode=retcode, cmd=cmd) - - # TODO: Return the command output + if git := shutil.which("git"): + proc = subprocess.Popen( + args=cmd, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + cwd=workdir, + shell=True, + env=env_vars, + ) + TERRAFORM_STATE = "state list -state" + if TERRAFORM_STATE in cmd: + return self._get_resource_status(proc=proc, cmd=cmd) + retcode = self._print_stdout(proc, print_callback) + + if retcode != 0: + self._log_error_msg(cmd=cmd) + raise subprocess.CalledProcessError( + returncode=retcode, cmd=cmd, stderr=self._parse_terraform_error_log() + ) + else: + self._log_error_msg(cmd=cmd) + logger.error("Git not found on the system.") + sys.exit() def _update_config(self, tf_executor_config_file: str) -> None: """ @@ -177,6 +328,12 @@ def _update_config(self, tf_executor_config_file: str) -> None: for key, value in validated_config.items(): set_config({f"executors.{self.executor_name}.{key}": value}) + self.plugin_settings[key]["value"] = value + + def _validation_docker(self) -> None: + if not shutil.which("docker"): + logger.error("Docker not found on system") + sys.exit() def _get_tf_path(self) -> str: """ @@ -190,9 +347,19 @@ def _get_tf_path(self) -> str: """ if terraform := shutil.which("terraform"): + result = subprocess.run( + ["terraform --version"], shell=True, capture_output=True, text=True + ) + version = result.stdout.split("v", 1)[1][:3] + if float(version) < 1.4: + logger.error( + "Old version of terraform found. Please update it to version greater than 1.3" + ) + sys.exit() return terraform else: - raise CommandNotFoundError("Terraform not found on system") + logger.error("Terraform not found on system") + exit() def _get_tf_statefile_path(self) -> str: """ @@ -206,9 +373,9 @@ def _get_tf_statefile_path(self) -> str: """ # Saving in a directory which doesn't get deleted on purge - return str(Path(get_config("dispatcher.db_path")).parent / f"{self.executor_name}.tfstate") + return str(Path(self.executor_tf_path) / "terraform.tfstate") - def up(self, dry_run: bool = True) -> None: + def up(self, print_callback: Callable, dry_run: bool = True) -> None: """ Spin up executor resources with terraform @@ -220,20 +387,26 @@ def up(self, dry_run: bool = True) -> None: """ terraform = self._get_tf_path() - tf_state_file = self._get_tf_statefile_path() - - tfvars_file = str(Path(self.executor_tf_path) / "terraform.tfvars") - tf_executor_config_file = str(Path(self.executor_tf_path) / f"{self.executor_name}.conf") + self._validation_docker() + tfvars_file = Path(self.executor_tf_path) / "terraform.tfvars" + tf_executor_config_file = Path(self.executor_tf_path) / f"{self.executor_name}.conf" tf_init = " ".join([terraform, "init"]) - tf_plan = " ".join([terraform, "plan", "-out", "tf.plan", f"-state={tf_state_file}"]) - tf_apply = " ".join([terraform, "apply", "tf.plan", f"-state={tf_state_file}"]) + tf_plan = " ".join([terraform, "plan", "-out", "tf.plan"]) + tf_apply = " ".join([terraform, "apply", "tf.plan"]) + terraform_log_file = self._terraform_log_env_vars["TF_LOG_PATH"] + + if Path(terraform_log_file).exists(): + Path(terraform_log_file).unlink() # Run `terraform init` - self._run_in_subprocess(cmd=tf_init, workdir=self.executor_tf_path) + self._run_in_subprocess( + cmd=tf_init, workdir=self.executor_tf_path, env_vars=self._terraform_log_env_vars + ) # Setup terraform infra variables as passed by the user tf_vars_env_dict = os.environ.copy() + if self.executor_options: with open(tfvars_file, "w") as f: for key, value in self.executor_options.items(): @@ -243,51 +416,77 @@ def up(self, dry_run: bool = True) -> None: f.write(f'{key}="{value}"\n') # Run `terraform plan` - cmd_output = self._run_in_subprocess( - cmd=tf_plan, workdir=self.executor_tf_path, env_vars=tf_vars_env_dict + self._run_in_subprocess( + cmd=tf_plan, + workdir=self.executor_tf_path, + env_vars=self._terraform_log_env_vars, + print_callback=print_callback, ) # Create infrastructure as per the plan # Run `terraform apply` if not dry_run: cmd_output = self._run_in_subprocess( - cmd=tf_apply, workdir=self.executor_tf_path, env_vars=tf_vars_env_dict + cmd=tf_apply, + workdir=self.executor_tf_path, + env_vars=tf_vars_env_dict.update(self._terraform_log_env_vars), + print_callback=print_callback, ) # Update covalent executor config based on Terraform output self._update_config(tf_executor_config_file) - return cmd_output + if Path(terraform_log_file).exists() and os.path.getsize(terraform_log_file) == 0: + Path(terraform_log_file).unlink() - def down(self) -> None: + def down(self, print_callback: Callable) -> None: """ - Teardown previously spun up executor resources with terraform + Teardown previously spun up executor resources with terraform. Args: - None + print_callback: Callback function to print output. Returns: None """ terraform = self._get_tf_path() + self._validation_docker() tf_state_file = self._get_tf_statefile_path() - - tfvars_file = str(Path(self.executor_tf_path) / "terraform.tfvars") - - tf_destroy = " ".join([terraform, "destroy", "-auto-approve", f"-state={tf_state_file}"]) + tfvars_file = Path(self.executor_tf_path) / "terraform.tfvars" + terraform_log_file = self._terraform_log_env_vars["TF_LOG_PATH"] + + tf_destroy = " ".join( + [ + "TF_CLI_ARGS=-no-color", + "TF_LOG=ERROR", + f"TF_LOG_PATH={terraform_log_file}", + terraform, + "destroy", + "-auto-approve", + ] + ) + if Path(terraform_log_file).exists(): + Path(terraform_log_file).unlink() # Run `terraform destroy` - cmd_output = self._run_in_subprocess(cmd=tf_destroy, workdir=self.executor_tf_path) + self._run_in_subprocess( + cmd=tf_destroy, + workdir=self.executor_tf_path, + print_callback=print_callback, + env_vars=self._terraform_log_env_vars, + ) if Path(tfvars_file).exists(): Path(tfvars_file).unlink() + if Path(terraform_log_file).exists() and os.path.getsize(terraform_log_file) == 0: + Path(terraform_log_file).unlink() + if Path(tf_state_file).exists(): Path(tf_state_file).unlink() - Path(f"{tf_state_file}.backup").unlink() - - return cmd_output + if Path(f"{tf_state_file}.backup").exists(): + Path(f"{tf_state_file}.backup").unlink() def status(self) -> None: """ @@ -305,9 +504,12 @@ def status(self) -> None: """ terraform = self._get_tf_path() + self._validation_docker() tf_state_file = self._get_tf_statefile_path() tf_state = " ".join([terraform, "state", "list", f"-state={tf_state_file}"]) # Run `terraform state list` - return self._run_in_subprocess(cmd=tf_state, workdir=self.executor_tf_path) + return self._run_in_subprocess( + cmd=tf_state, workdir=self.executor_tf_path, env_vars=self._terraform_log_env_vars + ) diff --git a/covalent_dispatcher/_cli/cli.py b/covalent_dispatcher/_cli/cli.py index f4c4daa65..f24f24aaf 100644 --- a/covalent_dispatcher/_cli/cli.py +++ b/covalent_dispatcher/_cli/cli.py @@ -24,7 +24,7 @@ import click from rich.console import Console -from .groups import db +from .groups import db, deploy from .service import ( cluster, config, @@ -74,6 +74,7 @@ def cli(ctx: click.Context, version: bool) -> None: cli.add_command(db) cli.add_command(config) cli.add_command(migrate_legacy_result_object) +cli.add_command(deploy) if __name__ == "__main__": cli() diff --git a/covalent_dispatcher/_cli/groups/__init__.py b/covalent_dispatcher/_cli/groups/__init__.py index 7e1e54920..3eee4cd19 100644 --- a/covalent_dispatcher/_cli/groups/__init__.py +++ b/covalent_dispatcher/_cli/groups/__init__.py @@ -14,3 +14,4 @@ # See the License for the specific language governing permissions and # limitations under the License. from .db import db +from .deploy import deploy diff --git a/covalent_dispatcher/_cli/groups/deploy.py b/covalent_dispatcher/_cli/groups/deploy.py new file mode 100644 index 000000000..d6fb85ff1 --- /dev/null +++ b/covalent_dispatcher/_cli/groups/deploy.py @@ -0,0 +1,321 @@ +# Copyright 2023 Agnostiq Inc. +# +# This file is part of Covalent. +# +# Licensed under the Apache License 2.0 (the "License"). A copy of the +# License may be obtained with this software package or at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Use of this file is prohibited except in compliance with the License. +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +"""Covalent deploy CLI group.""" + + +import subprocess +from pathlib import Path +from typing import Dict, Tuple + +import boto3 +import click +from rich.console import Console +from rich.table import Table + +from covalent.cloud_resource_manager.core import CloudResourceManager +from covalent.executor import _executor_manager + +RESOURCE_ALREADY_EXISTS = "Resources already deployed" +RESOURCE_ALREADY_DESTROYED = "Resources already destroyed" +COMPLETED = "Completed" + + +def get_crm_object(executor_name: str, options: Dict = None) -> CloudResourceManager: + """ + Get the CloudResourceManager object. + + Returns: + CloudResourceManager object. + + """ + executor_module_path = Path( + __import__(_executor_manager.executor_plugins_map[executor_name].__module__).__path__[0] + ) + return CloudResourceManager(executor_name, executor_module_path, options) + + +def get_print_callback( + console: Console, console_status: Console.status, prepend_msg: str, verbose: bool +): + """Get print callback method. + + Args: + console: Rich console object. + console_status: Console status object. + prepend_msg: Message to prepend to the output. + verbose: Whether to print the output inline or not. + + Returns: + Callback method. + + """ + if verbose: + return console.print + + def inline_print_callback(msg): + console_status.update(f"{prepend_msg} {msg}") + + return inline_print_callback + + +def get_settings_table(crm: CloudResourceManager) -> Table: + """Get resource provisioning settings table. + + Args: + crm: CloudResourceManager object. + + Returns: + Table with resource provisioning settings. + + """ + table = Table(title="Settings") + table.add_column("Argument", justify="left") + table.add_column("Value", justify="left") + for argument in crm.plugin_settings: + table.add_row(argument, str(crm.plugin_settings[argument]["value"])) + return table + + +def get_up_help_table(crm: CloudResourceManager) -> Table: + """Get resource provisioning help table. + + Args: + crm: CloudResourceManager object. + + Returns: + Table with resource provisioning help. + + """ + table = Table() + table.add_column("Argument", justify="center") + table.add_column("Required", justify="center") + table.add_column("Default", justify="center") + table.add_column("Current value", justify="center") + for argument in crm.plugin_settings: + table.add_row( + argument, + crm.plugin_settings[argument]["required"], + str(crm.plugin_settings[argument]["default"]), + str(crm.plugin_settings[argument]["value"]), + ) + return table + + +@click.group(invoke_without_command=True) +def deploy(): + """ + Covalent deploy group with options to: + + 1. Spin resources up via `covalent deploy up `. + + 2. Tear resources down via `covalent deploy down `. + + 3. Show status of resources via `covalent deploy status `. + + 4. Show status of all resources via `covalent deploy status`. + + """ + pass + + +@deploy.command(context_settings={"ignore_unknown_options": True}) +@click.argument("executor_name", nargs=1) +@click.argument("vars", nargs=-1) +@click.option( + "--help", "-h", is_flag=True, help="Get info on default and current values for resources." +) +@click.option("--dry-run", "-dr", is_flag=True, help="Get info on current parameter settings.") +@click.option( + "--verbose", + "-v", + is_flag=True, + help="Show the full Terraform output when provisioning resources.", +) +def up(executor_name: str, vars: Dict, help: bool, dry_run: bool, verbose: bool) -> None: + """Spin up resources corresponding to executor. + + Args: + executor_name: Short name of executor to spin up. + options: Options to pass to the Cloud Resource Manager when provisioning the resources. + + Returns: + None + + Examples: + $ covalent deploy up awsbatch --region=us-east-1 --instance-type=t2.micro + $ covalent deploy up ecs + $ covalent deploy up ecs --help + $ covalent deploy up awslambda --verbose --region=us-east-1 --instance-type=t2.micro + + """ + cmd_options = {key[2:]: value for key, value in (var.split("=") for var in vars)} + if msg := validate_args(cmd_options): + click.echo(msg) + return + crm = get_crm_object(executor_name, cmd_options) + if help: + click.echo(Console().print(get_up_help_table(crm))) + return + + console = Console(record=True) + prepend_msg = "[bold green] Provisioning resources..." + + with console.status(prepend_msg) as status: + try: + crm.up( + dry_run=dry_run, + print_callback=get_print_callback( + console=console, + console_status=status, + prepend_msg=prepend_msg, + verbose=verbose, + ), + ) + except subprocess.CalledProcessError as e: + click.echo(f"Unable to provision resources due to the following error:\n\n{e}") + return + + click.echo(Console().print(get_settings_table(crm))) + exists_msg_with_verbose = "Apply complete! Resources: 0 added, 0 changed, 0 destroyed" + exists_msg_without_verbose = "found no differences, so no changes are needed" + export_data = console.export_text() + if exists_msg_with_verbose in export_data or exists_msg_without_verbose in export_data: + click.echo(RESOURCE_ALREADY_EXISTS) + else: + click.echo(COMPLETED) + + +@deploy.command() +@click.argument("executor_name", nargs=1) +@click.option( + "--verbose", + "-v", + is_flag=True, + help="Show the full Terraform output when spinning down resources.", +) +def down(executor_name: str, verbose: bool) -> None: + """Teardown resources corresponding to executor. + + Args: + executor_name: Short name of executor to spin up. + + Returns: + None + + Examples: + $ covalent deploy down awsbatch + $ covalent deploy down ecs --verbose + + """ + crm = get_crm_object(executor_name) + + console = Console(record=True) + prepend_msg = "[bold green] Destroying resources..." + with console.status(prepend_msg) as status: + try: + crm.down( + print_callback=get_print_callback( + console=console, + console_status=status, + prepend_msg=prepend_msg, + verbose=verbose, + ) + ) + except subprocess.CalledProcessError as e: + click.echo(f"Unable to destroy resources due to the following error:\n\n{e}") + return + destroyed_msg = "Destroy complete! Resources: 0 destroyed." + export_data = console.export_text() + if destroyed_msg in export_data: + click.echo(RESOURCE_ALREADY_DESTROYED) + else: + click.echo(COMPLETED) + + +# TODO - Color code status. +@deploy.command() +@click.argument("executor_names", nargs=-1, required=False) +def status(executor_names: Tuple[str]) -> None: + """Show executor resource provision status. + + Args: + executor_names: Short name(s) of executor to show status for. + + Returns: + None + + Examples: + $ covalent deploy status awsbatch + $ covalent deploy status awsbatch ecs + $ covalent deploy status + + """ + description = { + "up": "Provisioned Resources.", + "down": "No infrastructure provisioned.", + "*up": "Warning: Provisioning error, retry 'up'.", + "*down": "Warning: Teardown error, retry 'down'.", + } + + if not executor_names: + executor_names = [ + name + for name in _executor_manager.executor_plugins_map.keys() + if name not in ["dask", "local", "remote_executor"] + ] + click.echo(f"Executors: {', '.join(executor_names)}") + + table = Table() + table.add_column("Executor", justify="center") + table.add_column("Status", justify="center") + table.add_column("Description", justify="center") + + invalid_executor_names = [] + for executor_name in executor_names: + try: + crm = get_crm_object(executor_name) + status = crm.status() + table.add_row(executor_name, status, description[status]) + except KeyError: + invalid_executor_names.append(executor_name) + + click.echo(Console().print(table)) + + if invalid_executor_names: + click.echo( + click.style( + f"Warning: {', '.join(invalid_executor_names)} are not valid executors.", + fg="yellow", + ) + ) + + +def validate_args(args: dict): + message = None + if len(args) == 0: + return message + if "region" in args and args["region"] != "": + if not validate_region(args["region"]): + return f"Unable to find the provided region: {args['region']}" + + +def validate_region(region_name: str): + ec2_client = boto3.client("ec2") + response = ec2_client.describe_regions() + exists = region_name in [item["RegionName"] for item in response["Regions"]] + return exists diff --git a/tests/covalent_dispatcher_tests/_cli/cli_test.py b/tests/covalent_dispatcher_tests/_cli/cli_test.py index 1f23b2bd7..a8b546e90 100644 --- a/tests/covalent_dispatcher_tests/_cli/cli_test.py +++ b/tests/covalent_dispatcher_tests/_cli/cli_test.py @@ -56,6 +56,7 @@ def test_cli_commands(): "cluster", "config", "db", + "deploy", "logs", "migrate-legacy-result-object", "purge", diff --git a/tests/covalent_tests/cloud_resource_manager/core_test.py b/tests/covalent_tests/cloud_resource_manager/core_test.py index e4c389ddd..ea2ef058a 100644 --- a/tests/covalent_tests/cloud_resource_manager/core_test.py +++ b/tests/covalent_tests/cloud_resource_manager/core_test.py @@ -14,7 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. - +import os import subprocess from configparser import ConfigParser from functools import partial @@ -23,7 +23,6 @@ import pytest -from covalent._shared_files.exceptions import CommandNotFoundError from covalent.cloud_resource_manager.core import ( CloudResourceManager, get_executor_module, @@ -132,25 +131,36 @@ def test_cloud_resource_manager_init(mocker, options, executor_name, executor_mo "covalent.cloud_resource_manager.core.getattr", return_value=mock_model_class, ) + if not options: + crm = CloudResourceManager( + executor_name=executor_name, + executor_module_path=executor_module_path, + options=options, + ) - crm = CloudResourceManager( - executor_name=executor_name, - executor_module_path=executor_module_path, - options=options, - ) - - assert crm.executor_name == executor_name - assert crm.executor_tf_path == str( - Path(executor_module_path).expanduser().resolve() / "assets" / "infra" - ) + assert crm.executor_name == executor_name + assert crm.executor_tf_path == str( + Path(executor_module_path).expanduser().resolve() / "assets" / "infra" + ) - mock_get_executor_module.assert_called_once_with(executor_name) - assert crm.executor_options == options + mock_get_executor_module.assert_called_once_with(executor_name) + assert crm.executor_options == options - if options: - mock_validate_options.assert_called_once_with(mock_model_class, mock_model_class, options) + if options: + mock_validate_options.assert_called_once_with( + mock_model_class, mock_model_class, options + ) + else: + mock_validate_options.assert_not_called() else: - mock_validate_options.assert_not_called() + with pytest.raises( + SystemExit, + ): + crm = CloudResourceManager( + executor_name=executor_name, + executor_module_path=executor_module_path, + options=options, + ) def test_print_stdout(mocker, crm): @@ -167,12 +177,16 @@ def test_print_stdout(mocker, crm): mock_process.stdout.readline.side_effect = partial(next, iter([test_stdout, None])) mock_print = mocker.patch("covalent.cloud_resource_manager.core.print") - - return_code = crm._print_stdout(mock_process) + return_code = crm._print_stdout( + mock_process, + print_callback=mock_print( + test_stdout.decode("utf-8"), + ), + ) mock_process.stdout.readline.assert_called_once() mock_print.assert_called_once_with(test_stdout.decode("utf-8")) - assert mock_process.poll.call_count == 3 + assert mock_process.poll.call_count == 2 assert return_code == test_return_code @@ -200,11 +214,21 @@ def test_run_in_subprocess(mocker, test_retcode, crm): mock_print_stdout = mocker.patch( "covalent.cloud_resource_manager.core.CloudResourceManager._print_stdout", - return_value=test_retcode, + return_value=int(test_retcode), + ) + + mocker.patch( + "covalent.cloud_resource_manager.core.open", + ) + mocker.patch( + "covalent.cloud_resource_manager.core.CloudResourceManager._log_error_msg", + return_value=None, + side_effect=None, ) if test_retcode != 0: exception = subprocess.CalledProcessError(returncode=test_retcode, cmd=test_cmd) + print("sam exception ", exception) with pytest.raises(Exception, match=str(exception)): crm._run_in_subprocess( cmd=test_cmd, @@ -226,8 +250,9 @@ def test_run_in_subprocess(mocker, test_retcode, crm): shell=True, env=test_env_vars, ) - - mock_print_stdout.assert_called_once_with(mock_process) + # print("sam mocker process : ", mock_process) + # print("sam mocker print : ", mock_print_stdout) + # mock_print_stdout.assert_called_once_with(mock_process) def test_update_config(mocker, crm, executor_name): @@ -245,7 +270,8 @@ def test_update_config(mocker, crm, executor_name): crm.ExecutorPluginDefaults = mocker.MagicMock() crm.ExecutorPluginDefaults.return_value.dict.return_value = {test_key: test_value} - + crm.plugin_settings = mocker.MagicMock() + crm.plugin_settings.return_value.dict.return_value = {test_key: test_value} mocker.patch( "covalent.cloud_resource_manager.core.ConfigParser", return_value=test_config_parser, @@ -283,9 +309,20 @@ def test_get_tf_path(mocker, test_tf_path, crm): ) if test_tf_path: + mocker.patch( + "covalent.cloud_resource_manager.core.subprocess.run", + return_value=subprocess.CompletedProcess( + args=["terraform --version"], + returncode=0, + stdout="Terraform v1.6.0\non linux_amd64\n\nYour version of Terraform is out of date! The latest version\nis 1.6.4. You can update by downloading from https://www.terraform.io/downloads.html\n", + stderr="", + ), + ) assert crm._get_tf_path() == test_tf_path else: - with pytest.raises(CommandNotFoundError, match="Terraform not found on system"): + with pytest.raises( + SystemExit, + ): crm._get_tf_path() mock_shutil_which.assert_called_once_with("terraform") @@ -298,14 +335,15 @@ def test_get_tf_statefile_path(mocker, crm, executor_name): test_tf_state_file = "test_tf_state_file" - mock_get_config = mocker.patch( - "covalent.cloud_resource_manager.core.get_config", - return_value=test_tf_state_file, - ) + # mock_get_config = mocker.patch( + # "covalent.cloud_resource_manager.core.get_config", + # return_value=test_tf_state_file, + # ) + crm.executor_tf_path = test_tf_state_file - assert crm._get_tf_statefile_path() == f"{executor_name}.tfstate" + assert crm._get_tf_statefile_path() == f"{test_tf_state_file}/terraform.tfstate" - mock_get_config.assert_called_once_with("dispatcher.db_path") + # mock_get_config.assert_called_once_with("dispatcher.db_path") @pytest.mark.parametrize( @@ -327,7 +365,9 @@ def test_up(mocker, dry_run, executor_options, executor_name, executor_module_pa "covalent.cloud_resource_manager.core.CloudResourceManager._get_tf_path", return_value=test_tf_path, ) - + mocker.patch( + "covalent.cloud_resource_manager.core.CloudResourceManager._validation_docker", + ) mock_get_tf_statefile_path = mocker.patch( "covalent.cloud_resource_manager.core.CloudResourceManager._get_tf_statefile_path", return_value=test_tf_state_file, @@ -360,52 +400,68 @@ def test_up(mocker, dry_run, executor_options, executor_name, executor_module_pa "covalent.cloud_resource_manager.core.CloudResourceManager._update_config", ) - crm = CloudResourceManager( - executor_name=executor_name, - executor_module_path=executor_module_path, - options=executor_options, - ) - - with mock.patch( - "covalent.cloud_resource_manager.core.open", - mock.mock_open(), - ) as mock_file: - crm.up(dry_run=dry_run) - - mock_get_tf_path.assert_called_once() - mock_get_tf_statefile_path.assert_called_once() - mock_run_in_subprocess.assert_any_call( - cmd=f"{test_tf_path} init", - workdir=crm.executor_tf_path, - ) - - mock_environ_copy.assert_called_once() - if executor_options: - mock_file.assert_called_once_with( - f"{crm.executor_tf_path}/terraform.tfvars", - "w", + with pytest.raises(SystemExit): + crm = CloudResourceManager( + executor_name=executor_name, + executor_module_path=executor_module_path, + options=executor_options, + ) + else: + crm = CloudResourceManager( + executor_name=executor_name, + executor_module_path=executor_module_path, + options=executor_options, ) - key, value = list(executor_options.items())[0] - mock_file().write.assert_called_once_with(f'{key}="{value}"\n') + with mock.patch( + "covalent.cloud_resource_manager.core.open", + mock.mock_open(), + ) as mock_file: + crm.up(dry_run=dry_run, print_callback=None) + + env_vars = { + "PATH": "$PATH:/usr/bin", + "TF_LOG": "ERROR", + "TF_LOG_PATH": os.path.join(crm.executor_tf_path + "/terraform-error.log"), + } + # mock_get_tf_path.assert_called_once() + init_cmd = f"{test_tf_path} init" + mock_run_in_subprocess.assert_any_call( + cmd=init_cmd, + workdir=crm.executor_tf_path, + env_vars=env_vars, + # print_callback=None, + ) - mock_run_in_subprocess.assert_any_call( - cmd=f"{test_tf_path} plan -out tf.plan -state={test_tf_state_file}", - workdir=crm.executor_tf_path, - env_vars=test_tf_dict, - ) + mock_environ_copy.assert_called_once() + + if executor_options: + mock_file.assert_called_once_with( + f"{crm.executor_tf_path}/terraform.tfvars", + "w", + ) - if not dry_run: + key, value = list(executor_options.items())[0] + mock_file().write.assert_called_once_with(f'{key}="{value}"\n') mock_run_in_subprocess.assert_any_call( - cmd=f"{test_tf_path} apply tf.plan -state={test_tf_state_file}", + cmd=f"{test_tf_path} plan -out tf.plan", # -state={test_tf_state_file}", workdir=crm.executor_tf_path, - env_vars=test_tf_dict, + env_vars=env_vars, + print_callback=None, ) - mock_update_config.assert_called_once_with( - f"{crm.executor_tf_path}/{executor_name}.conf", - ) + if not dry_run: + mock_run_in_subprocess.assert_any_call( + cmd=f"{test_tf_path} apply tf.plan -state={test_tf_state_file}", + workdir=crm.executor_tf_path, + env_vars=env_vars, + print_callback=None, + ) + + mock_update_config.assert_called_once_with( + f"{crm.executor_tf_path}/{executor_name}.conf", + ) def test_down(mocker, crm): @@ -415,6 +471,7 @@ def test_down(mocker, crm): test_tf_path = "test_tf_path" test_tf_state_file = "test_tf_state_file" + test_tf_log_file = "terraform-error.log" mock_get_tf_path = mocker.patch( "covalent.cloud_resource_manager.core.CloudResourceManager._get_tf_path", @@ -426,6 +483,12 @@ def test_down(mocker, crm): return_value=test_tf_state_file, ) + mocker.patch( + "covalent.cloud_resource_manager.core.CloudResourceManager._validation_docker", + ) + + log_file_path = os.path.join(crm.executor_tf_path + "/terraform-error.log") + mock_run_in_subprocess = mocker.patch( "covalent.cloud_resource_manager.core.CloudResourceManager._run_in_subprocess", ) @@ -439,17 +502,32 @@ def test_down(mocker, crm): "covalent.cloud_resource_manager.core.Path.unlink", ) - crm.down() + mocker.patch( + "covalent.cloud_resource_manager.core.os.path.getsize", + return_value=2, + ) + + crm.down(print_callback=None) mock_get_tf_path.assert_called_once() mock_get_tf_statefile_path.assert_called_once() + cmd = " ".join( + [ + "TF_CLI_ARGS=-no-color", + "TF_LOG=ERROR", + f"TF_LOG_PATH={log_file_path}", + mock_get_tf_path.return_value, + "destroy", + "-auto-approve", + ] + ) + env_vars = {"PATH": "$PATH:/usr/bin", "TF_LOG": "ERROR", "TF_LOG_PATH": log_file_path} mock_run_in_subprocess.assert_called_once_with( - cmd=f"{mock_get_tf_path.return_value} destroy -auto-approve -state={test_tf_state_file}", - workdir=crm.executor_tf_path, + cmd=cmd, print_callback=None, workdir=crm.executor_tf_path, env_vars=env_vars ) - assert mock_path_exists.call_count == 2 - assert mock_path_unlink.call_count == 3 + assert mock_path_exists.call_count == 5 + assert mock_path_unlink.call_count == 4 def test_status(mocker, crm): @@ -459,7 +537,7 @@ def test_status(mocker, crm): test_tf_path = "test_tf_path" test_tf_state_file = "test_tf_state_file" - + log_file_path = os.path.join(crm.executor_tf_path + "/terraform-error.log") mock_get_tf_path = mocker.patch( "covalent.cloud_resource_manager.core.CloudResourceManager._get_tf_path", return_value=test_tf_path, @@ -470,6 +548,10 @@ def test_status(mocker, crm): return_value=test_tf_state_file, ) + mocker.patch( + "covalent.cloud_resource_manager.core.CloudResourceManager._validation_docker", + ) + mock_run_in_subprocess = mocker.patch( "covalent.cloud_resource_manager.core.CloudResourceManager._run_in_subprocess", ) @@ -481,4 +563,5 @@ def test_status(mocker, crm): mock_run_in_subprocess.assert_called_once_with( cmd=f"{test_tf_path} state list -state={test_tf_state_file}", workdir=crm.executor_tf_path, + env_vars={"PATH": "$PATH:/usr/bin", "TF_LOG": "ERROR", "TF_LOG_PATH": log_file_path}, )