diff --git a/CHANGELOG.md b/CHANGELOG.md index 5a928eabf..0e443e40c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [UNRELEASED] +### Changed + +- Modified the executor core to change the file path for terraform state files to store and read from **.config/executor_plugins** folder +- Clean up any half-done/dirty deployed resources post **deploy up** + ### Operations - Fixed nightly workflow's calling of other workflows. @@ -349,7 +354,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Operations -- Respecting node version as specified in `.nvmrc` file for tests workflow +- Respecting node version as specified in `.nvmrc` file for tests workflow - Bumped versions in pre-commit config - Added prettier for markdown files. - Reduce the number of pinned version numbers in the `setup.py`, `requirements.txt`, and `requirements-client.txt` @@ -5260,4 +5265,4 @@ Installed executor plugins don't have to be referred to by their full module nam - CHANGELOG.md to track changes (this file). - Semantic versioning in VERSION. -- CI pipeline job to enforce versioning. +- CI pipeline job to enforce versioning. \ No newline at end of file diff --git a/covalent/cloud_resource_manager/core.py b/covalent/cloud_resource_manager/core.py index eb63bb604..3d2a2952a 100644 --- a/covalent/cloud_resource_manager/core.py +++ b/covalent/cloud_resource_manager/core.py @@ -27,6 +27,7 @@ from typing import Any, Callable, Dict, List, Optional, Sequence, Union from covalent._shared_files.config import set_config +from covalent._shared_files.defaults import get_default_sdk_config from covalent.executor import _executor_manager logger = logging.getLogger() @@ -34,6 +35,7 @@ handler = logging.StreamHandler(sys.stderr) logger.addHandler(handler) logger.propagate = False +sdk_constants = get_default_sdk_config() def get_executor_module(executor_name: str) -> ModuleType: @@ -140,6 +142,7 @@ def __init__( options: Optional[Dict[str, str]] = None, ): self.executor_name = executor_name + self.executor_tfstate_path = self._get_tf_state_path(sdk_constants["executor_dir"]) self.executor_tf_path = str( Path(executor_module_path).expanduser().resolve() / "assets" / "infra" ) @@ -163,7 +166,7 @@ def __init__( self._terraform_log_env_vars = { "TF_LOG": "ERROR", - "TF_LOG_PATH": os.path.join(self.executor_tf_path, "terraform-error.log"), + "TF_LOG_PATH": os.path.join(self.executor_tfstate_path, "terraform-error.log"), } def _poll_process(self, process: subprocess.Popen, print_callback: Callable) -> int: @@ -190,7 +193,9 @@ def _parse_terraform_error_log(self) -> List[str]: List of lines in the terraform error log. """ - with open(Path(self.executor_tf_path) / "terraform-error.log", "r", encoding="UTF-8") as f: + with open( + Path(self.executor_tfstate_path) / "terraform-error.log", "r", encoding="UTF-8" + ) as f: lines = f.readlines() for _, line in enumerate(lines): error_index = line.strip().find("error:") @@ -199,7 +204,7 @@ def _parse_terraform_error_log(self) -> List[str]: logger.error(error_message) return lines - def _terraform_error_validator(self, tfstate_path: str) -> bool: + def _terraform_error_validator(self, tfstate_path: str) -> str: """ Terraform error validator checks whether any terraform-error.log files existence and validate last line. Args: None @@ -209,13 +214,13 @@ def _terraform_error_validator(self, tfstate_path: str) -> bool: down - if terraform-error.log is empty and tfstate file not exists. *down - if terraform-error.log is not empty and 'On destroy' at last line. """ - tf_error_file = os.path.join(self.executor_tf_path, "terraform-error.log") + tf_error_file = os.path.join(self.executor_tfstate_path, "terraform-error.log") if os.path.exists(tf_error_file) and os.path.getsize(tf_error_file) > 0: with open(tf_error_file, "r", encoding="UTF-8") as error_file: indicator = error_file.readlines()[-1] if indicator == "On deploy": return "*up" - elif indicator == "On destroy": + if indicator == "On destroy": return "*down" return "up" if os.path.exists(tfstate_path) else "down" @@ -253,7 +258,7 @@ def _log_error_msg(self, cmd) -> None: Args: cmd: str - terraform-error.log file path. """ with open( - Path(self.executor_tf_path) / "terraform-error.log", "a", encoding="UTF-8" + Path(self.executor_tfstate_path) / "terraform-error.log", "a", encoding="UTF-8" ) as file: if any(tf_cmd in cmd for tf_cmd in ["init", "plan", "apply"]): file.write("\nOn deploy") @@ -305,6 +310,9 @@ def _run_in_subprocess( if returncode != 0: self._log_error_msg(cmd=cmd) + if "terraform apply" in cmd: + logger.error("Deployment interrupted. Rolling back the deployed resources...") + self.down(print_callback=print_callback) _, stderr = proc.communicate() raise subprocess.CalledProcessError( @@ -389,7 +397,24 @@ def _get_tf_statefile_path(self) -> str: """ # Saving in a directory which doesn't get deleted on purge - return str(Path(self.executor_tf_path) / "terraform.tfstate") + return str(Path(self.executor_tfstate_path) / "terraform.tfstate") + + def _get_tf_state_path(self, parent_path) -> str: + """ + Get the terraform state file path + + Args: + None + + Returns: + Path to terraform state file + + """ + # Saving in a directory which doesn't get deleted on purge + state_path = os.path.join(parent_path, self.executor_name) + if not os.path.exists(state_path): + os.makedirs(state_path) + return state_path def up(self, print_callback: Callable, dry_run: bool = True) -> None: """ @@ -404,12 +429,29 @@ def up(self, print_callback: Callable, dry_run: bool = True) -> None: """ terraform = self._get_tf_path() self._validation_docker() - tfvars_file = Path(self.executor_tf_path) / "terraform.tfvars" + tfvars_file = Path(self.executor_tfstate_path) / "terraform.tfvars" tf_executor_config_file = Path(self.executor_tf_path) / f"{self.executor_name}.conf" tf_init = " ".join([terraform, "init"]) - tf_plan = " ".join([terraform, "plan", "-out", "tf.plan"]) - tf_apply = " ".join([terraform, "apply", "tf.plan"]) + tf_plan = " ".join( + [ + terraform, + "plan", + "--var-file", + f"{tfvars_file}", + "-out", + f"{self.executor_tfstate_path}/tf.plan", + ] + ) + tf_apply = " ".join( + [ + terraform, + "apply", + "-state-out", + self._get_tf_statefile_path(), + f"{self.executor_tfstate_path}/tf.plan", + ] + ) terraform_log_file = self._terraform_log_env_vars["TF_LOG_PATH"] if Path(terraform_log_file).exists(): @@ -431,7 +473,6 @@ def up(self, print_callback: Callable, dry_run: bool = True) -> None: for key, value in infra_settings.items(): if "default" in value: tf_vars_env_dict[f"TF_VAR_{key}"] = value["default"] - if value["default"]: f.write(f'{key}={self._convert_to_tfvar(value["default"])}\n') @@ -480,7 +521,7 @@ def down(self, print_callback: Callable) -> None: terraform = self._get_tf_path() self._validation_docker() tf_state_file = self._get_tf_statefile_path() - tfvars_file = Path(self.executor_tf_path) / "terraform.tfvars" + tfvars_file = Path(self.executor_tfstate_path) / "terraform.tfvars" terraform_log_file = self._terraform_log_env_vars["TF_LOG_PATH"] tf_destroy = " ".join( @@ -490,6 +531,8 @@ def down(self, print_callback: Callable) -> None: f"TF_LOG_PATH={terraform_log_file}", terraform, "destroy", + "--state", + self._get_tf_statefile_path(), "-auto-approve", ] ) @@ -506,9 +549,6 @@ def down(self, print_callback: Callable) -> None: if Path(tfvars_file).exists(): Path(tfvars_file).unlink() - if Path(terraform_log_file).exists() and os.path.getsize(terraform_log_file) == 0: - Path(terraform_log_file).unlink() - if Path(tf_state_file).exists(): Path(tf_state_file).unlink() if Path(f"{tf_state_file}.backup").exists(): @@ -531,9 +571,9 @@ def status(self) -> None: """ terraform = self._get_tf_path() self._validation_docker() - tf_state_file = self._get_tf_statefile_path() - - tf_state = " ".join([terraform, "state", "list", f"-state={tf_state_file}"]) + tf_state = " ".join( + [terraform, "state", "list", f"-state={self._get_tf_statefile_path()}"] + ) # Run `terraform state list` return self._run_in_subprocess(cmd=tf_state, env_vars=self._terraform_log_env_vars)