-
Notifications
You must be signed in to change notification settings - Fork 97
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Executor improvements Part 01 #1924
base: develop
Are you sure you want to change the base?
Changes from all commits
3542d5d
74bb2de
31e5d34
ad230e2
a5ad8a1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,13 +27,15 @@ | |
from typing import Any, Callable, Dict, List, Optional, Sequence, Union | ||
|
||
from covalent._shared_files.config import set_config | ||
from covalent._shared_files.defaults import get_default_sdk_config | ||
from covalent.executor import _executor_manager | ||
|
||
logger = logging.getLogger() | ||
logger.setLevel(logging.ERROR) | ||
handler = logging.StreamHandler(sys.stderr) | ||
logger.addHandler(handler) | ||
logger.propagate = False | ||
sdk_constants = get_default_sdk_config() | ||
|
||
|
||
def get_executor_module(executor_name: str) -> ModuleType: | ||
|
@@ -140,6 +142,7 @@ def __init__( | |
options: Optional[Dict[str, str]] = None, | ||
): | ||
self.executor_name = executor_name | ||
self.executor_tfstate_path = self._get_tf_state_path(sdk_constants["executor_dir"]) | ||
self.executor_tf_path = str( | ||
Path(executor_module_path).expanduser().resolve() / "assets" / "infra" | ||
) | ||
|
@@ -163,7 +166,7 @@ def __init__( | |
|
||
self._terraform_log_env_vars = { | ||
"TF_LOG": "ERROR", | ||
"TF_LOG_PATH": os.path.join(self.executor_tf_path, "terraform-error.log"), | ||
"TF_LOG_PATH": os.path.join(self.executor_tfstate_path, "terraform-error.log"), | ||
} | ||
|
||
def _poll_process(self, process: subprocess.Popen, print_callback: Callable) -> int: | ||
|
@@ -190,7 +193,9 @@ def _parse_terraform_error_log(self) -> List[str]: | |
List of lines in the terraform error log. | ||
|
||
""" | ||
with open(Path(self.executor_tf_path) / "terraform-error.log", "r", encoding="UTF-8") as f: | ||
with open( | ||
Path(self.executor_tfstate_path) / "terraform-error.log", "r", encoding="UTF-8" | ||
) as f: | ||
lines = f.readlines() | ||
for _, line in enumerate(lines): | ||
error_index = line.strip().find("error:") | ||
|
@@ -199,7 +204,7 @@ def _parse_terraform_error_log(self) -> List[str]: | |
logger.error(error_message) | ||
return lines | ||
|
||
def _terraform_error_validator(self, tfstate_path: str) -> bool: | ||
def _terraform_error_validator(self, tfstate_path: str) -> str: | ||
""" | ||
Terraform error validator checks whether any terraform-error.log files existence and validate last line. | ||
Args: None | ||
|
@@ -209,13 +214,13 @@ def _terraform_error_validator(self, tfstate_path: str) -> bool: | |
down - if terraform-error.log is empty and tfstate file not exists. | ||
*down - if terraform-error.log is not empty and 'On destroy' at last line. | ||
""" | ||
tf_error_file = os.path.join(self.executor_tf_path, "terraform-error.log") | ||
tf_error_file = os.path.join(self.executor_tfstate_path, "terraform-error.log") | ||
if os.path.exists(tf_error_file) and os.path.getsize(tf_error_file) > 0: | ||
with open(tf_error_file, "r", encoding="UTF-8") as error_file: | ||
indicator = error_file.readlines()[-1] | ||
if indicator == "On deploy": | ||
return "*up" | ||
elif indicator == "On destroy": | ||
if indicator == "On destroy": | ||
return "*down" | ||
return "up" if os.path.exists(tfstate_path) else "down" | ||
|
||
|
@@ -253,7 +258,7 @@ def _log_error_msg(self, cmd) -> None: | |
Args: cmd: str - terraform-error.log file path. | ||
""" | ||
with open( | ||
Path(self.executor_tf_path) / "terraform-error.log", "a", encoding="UTF-8" | ||
Path(self.executor_tfstate_path) / "terraform-error.log", "a", encoding="UTF-8" | ||
) as file: | ||
if any(tf_cmd in cmd for tf_cmd in ["init", "plan", "apply"]): | ||
file.write("\nOn deploy") | ||
|
@@ -305,6 +310,9 @@ def _run_in_subprocess( | |
|
||
if returncode != 0: | ||
self._log_error_msg(cmd=cmd) | ||
if "terraform apply" in cmd: | ||
logger.error("Deployment interrupted. Rolling back the deployed resources...") | ||
self.down(print_callback=print_callback) | ||
|
||
_, stderr = proc.communicate() | ||
raise subprocess.CalledProcessError( | ||
|
@@ -389,7 +397,24 @@ def _get_tf_statefile_path(self) -> str: | |
|
||
""" | ||
# Saving in a directory which doesn't get deleted on purge | ||
return str(Path(self.executor_tf_path) / "terraform.tfstate") | ||
return str(Path(self.executor_tfstate_path) / "terraform.tfstate") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
|
||
def _get_tf_state_path(self, parent_path) -> str: | ||
""" | ||
Get the terraform state file path | ||
|
||
Args: | ||
None | ||
|
||
Returns: | ||
Path to terraform state file | ||
|
||
""" | ||
# Saving in a directory which doesn't get deleted on purge | ||
state_path = os.path.join(parent_path, self.executor_name) | ||
if not os.path.exists(state_path): | ||
os.makedirs(state_path) | ||
Comment on lines
+413
to
+416
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same comment as above. One minor thing is that if possible let's try to use |
||
return state_path | ||
|
||
def up(self, print_callback: Callable, dry_run: bool = True) -> None: | ||
""" | ||
|
@@ -404,12 +429,29 @@ def up(self, print_callback: Callable, dry_run: bool = True) -> None: | |
""" | ||
terraform = self._get_tf_path() | ||
self._validation_docker() | ||
tfvars_file = Path(self.executor_tf_path) / "terraform.tfvars" | ||
tfvars_file = Path(self.executor_tfstate_path) / "terraform.tfvars" | ||
tf_executor_config_file = Path(self.executor_tf_path) / f"{self.executor_name}.conf" | ||
|
||
tf_init = " ".join([terraform, "init"]) | ||
tf_plan = " ".join([terraform, "plan", "-out", "tf.plan"]) | ||
tf_apply = " ".join([terraform, "apply", "tf.plan"]) | ||
tf_plan = " ".join( | ||
[ | ||
terraform, | ||
"plan", | ||
"--var-file", | ||
f"{tfvars_file}", | ||
"-out", | ||
f"{self.executor_tfstate_path}/tf.plan", | ||
] | ||
) | ||
tf_apply = " ".join( | ||
[ | ||
terraform, | ||
"apply", | ||
"-state-out", | ||
self._get_tf_statefile_path(), | ||
f"{self.executor_tfstate_path}/tf.plan", | ||
] | ||
) | ||
terraform_log_file = self._terraform_log_env_vars["TF_LOG_PATH"] | ||
|
||
if Path(terraform_log_file).exists(): | ||
|
@@ -431,7 +473,6 @@ def up(self, print_callback: Callable, dry_run: bool = True) -> None: | |
for key, value in infra_settings.items(): | ||
if "default" in value: | ||
tf_vars_env_dict[f"TF_VAR_{key}"] = value["default"] | ||
|
||
if value["default"]: | ||
f.write(f'{key}={self._convert_to_tfvar(value["default"])}\n') | ||
|
||
|
@@ -480,7 +521,7 @@ def down(self, print_callback: Callable) -> None: | |
terraform = self._get_tf_path() | ||
self._validation_docker() | ||
tf_state_file = self._get_tf_statefile_path() | ||
tfvars_file = Path(self.executor_tf_path) / "terraform.tfvars" | ||
tfvars_file = Path(self.executor_tfstate_path) / "terraform.tfvars" | ||
terraform_log_file = self._terraform_log_env_vars["TF_LOG_PATH"] | ||
|
||
tf_destroy = " ".join( | ||
|
@@ -490,6 +531,8 @@ def down(self, print_callback: Callable) -> None: | |
f"TF_LOG_PATH={terraform_log_file}", | ||
terraform, | ||
"destroy", | ||
"--state", | ||
self._get_tf_statefile_path(), | ||
"-auto-approve", | ||
] | ||
) | ||
|
@@ -506,9 +549,6 @@ def down(self, print_callback: Callable) -> None: | |
if Path(tfvars_file).exists(): | ||
Path(tfvars_file).unlink() | ||
|
||
if Path(terraform_log_file).exists() and os.path.getsize(terraform_log_file) == 0: | ||
Path(terraform_log_file).unlink() | ||
|
||
if Path(tf_state_file).exists(): | ||
Path(tf_state_file).unlink() | ||
if Path(f"{tf_state_file}.backup").exists(): | ||
|
@@ -531,9 +571,9 @@ def status(self) -> None: | |
""" | ||
terraform = self._get_tf_path() | ||
self._validation_docker() | ||
tf_state_file = self._get_tf_statefile_path() | ||
|
||
tf_state = " ".join([terraform, "state", "list", f"-state={tf_state_file}"]) | ||
tf_state = " ".join( | ||
[terraform, "state", "list", f"-state={self._get_tf_statefile_path()}"] | ||
) | ||
|
||
# Run `terraform state list` | ||
return self._run_in_subprocess(cmd=tf_state, env_vars=self._terraform_log_env_vars) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should use
get_config
instead of directly importing the defaults since they might've been changed by the user in the config file directly. Usingget_config
with specifically what is needed (instead of the entire sdk config) will take that into consideration as well.