diff --git a/deployer/cli.py b/deployer/cli.py index 6453fe3..ba0d504 100644 --- a/deployer/cli.py +++ b/deployer/cli.py @@ -283,7 +283,8 @@ def check( **This command can be used to check pipelines in a Continuous Integration workflow.** """ - from deployer.pipeline_checks import Pipelines + if all and pipeline_name is not None: + raise typer.BadParameter("Please specify either --all or a pipeline name") if len(PipelineName.__members__) == 0: raise ValueError( @@ -291,15 +292,14 @@ def check( f" ('{PIPELINE_ROOT_PATH}')" ) + from deployer.pipeline_checks import Pipelines + if all: logger.info("Checking all pipelines") pipelines_to_check = PipelineName.__members__.values() elif pipeline_name is not None: logger.info(f"Checking pipeline {pipeline_name}") pipelines_to_check = [pipeline_name] - else: - raise ValueError("Please specify either --all or a pipeline name") - if config_filepath is None: to_check = { p.value: list_config_filepaths(CONFIG_ROOT_PATH, p.value) for p in pipelines_to_check diff --git a/deployer/pipeline_checks.py b/deployer/pipeline_checks.py index 9352d21..7d63390 100644 --- a/deployer/pipeline_checks.py +++ b/deployer/pipeline_checks.py @@ -18,15 +18,10 @@ from deployer.utils.exceptions import BadConfigError from deployer.utils.logging import disable_logger from deployer.utils.models import CustomBaseModel, create_model_from_pipeline -from deployer.utils.utils import ( - import_pipeline_from_dir, - make_enum_from_python_package_dir, -) +from deployer.utils.utils import import_pipeline_from_dir PipelineConfigT = TypeVar("PipelineConfigT") -PipelineName = make_enum_from_python_package_dir(PIPELINE_ROOT_PATH) - class ConfigDynamicModel(CustomBaseModel, Generic[PipelineConfigT]): """Model used to generate checks for configs based on pipeline dynamic model""" @@ -56,7 +51,7 @@ class ConfigsDynamicModel(CustomBaseModel, Generic[PipelineConfigT]): class Pipeline(CustomBaseModel): """Validation of one pipeline and its configs""" - pipeline_name: PipelineName + pipeline_name: str config_paths: Annotated[List[Path], Field(validate_default=True)] = None @model_validator(mode="before") @@ -72,29 +67,27 @@ def pipeline(self) -> Any: """Import pipeline""" if getattr(self, "_pipeline", None) is None: with disable_logger("deployer.utils.utils"): - self._pipeline = import_pipeline_from_dir( - PIPELINE_ROOT_PATH, self.pipeline_name.value - ) + self._pipeline = import_pipeline_from_dir(PIPELINE_ROOT_PATH, self.pipeline_name) return self._pipeline @model_validator(mode="after") def import_pipeline(self): """Validate that the pipeline can be imported by calling pipeline computed field""" - logger.debug(f"Importing pipeline {self.pipeline_name.value}") + logger.debug(f"Importing pipeline {self.pipeline_name}") try: _ = self.pipeline - except Exception as e: + except (ImportError, ModuleNotFoundError) as e: raise ValueError(f"Pipeline import failed: {e.__repr__()}") # noqa: B904 return self @model_validator(mode="after") def compile_pipeline(self): """Validate that the pipeline can be compiled""" - logger.debug(f"Compiling pipeline {self.pipeline_name.value}") + logger.debug(f"Compiling pipeline {self.pipeline_name}") try: with disable_logger("deployer.pipeline_deployer"): VertexPipelineDeployer( - pipeline_name=self.pipeline_name.value, + pipeline_name=self.pipeline_name, pipeline_func=self.pipeline, local_package_path=TEMP_LOCAL_PACKAGE_PATH, ).compile() @@ -105,7 +98,7 @@ def compile_pipeline(self): @model_validator(mode="after") def validate_configs(self): """Validate configs against pipeline parameters definition""" - logger.debug(f"Validating configs for pipeline {self.pipeline_name.value}") + logger.debug(f"Validating configs for pipeline {self.pipeline_name}") PipelineDynamicModel = create_model_from_pipeline(self.pipeline) ConfigsModel = ConfigsDynamicModel[PipelineDynamicModel] ConfigsModel.model_validate( diff --git a/deployer/utils/utils.py b/deployer/utils/utils.py index 1879307..1b8acfb 100644 --- a/deployer/utils/utils.py +++ b/deployer/utils/utils.py @@ -20,7 +20,7 @@ def make_enum_from_python_package_dir(dir_path: Path, raise_if_not_found: bool = raise FileNotFoundError(f"Directory {dir_path_} not found.") file_paths = dir_path_.glob("*.py") enum_dict = {x.stem: x.stem for x in file_paths if x.stem != "__init__"} - FileNamesEnum = Enum("PipelineNames", enum_dict) + FileNamesEnum = Enum(dir_path_.stem, enum_dict) return FileNamesEnum