diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 6e89f7d..85cfe85 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -106,7 +106,7 @@ jobs: # Cannot check more than compile action here, need GCP environment for upload, run, schedule run: | cd example - poetry run -C .. vertex-deployer -log DEBUG deploy dummy_pipeline --compile --env-file example.env --skip-validation + poetry run -C .. vertex-deployer -log DEBUG deploy dummy_pipeline broken_pipeline --compile --env-file example.env --skip-validation - name: Test create command run: | diff --git a/.github/workflows/deploy_docs.yaml b/.github/workflows/deploy_docs.yaml index e73739b..0f703e2 100644 --- a/.github/workflows/deploy_docs.yaml +++ b/.github/workflows/deploy_docs.yaml @@ -1,9 +1,9 @@ name: Deploy doc to GitHub Pages on: - workflow_run: - workflows: ["CI and Release on main"] - types: [completed] + push: + branches: + - 'main' env: python-version: "3.10" diff --git a/README.md b/README.md index 83369c3..a06b066 100644 --- a/README.md +++ b/README.md @@ -351,9 +351,9 @@ To check that your pipelines are valid, you can use the `check` command. 
It uses - check that your pipeline can be compiled - check that all configs related to the pipeline are respecting the pipeline definition (using a Pydantic model based on pipeline signature) -To validate one specific pipeline: +To validate one or multiple pipeline(s): ```bash -vertex-deployer check dummy_pipeline +vertex-deployer check dummy_pipeline ``` To validate all pipelines in the `vertex/pipelines` folder: @@ -491,6 +491,7 @@ create │ ├─ settings.py │ └─ utils │ ├─ config.py +│ ├─ console.py │ ├─ exceptions.py │ ├─ logging.py │ ├─ models.py diff --git a/deployer/cli.py b/deployer/cli.py index 178a504..f68a704 100644 --- a/deployer/cli.py +++ b/deployer/cli.py @@ -8,7 +8,7 @@ import typer from loguru import logger from pydantic import ValidationError -from rich.prompt import Prompt +from rich.prompt import Confirm, Prompt from typing_extensions import Annotated from deployer import constants @@ -81,7 +81,7 @@ def main( def pipeline_name_callback(ctx: typer.Context, value: Union[str, bool]) -> Union[str, bool]: - """Callback to check that the pipeline name is valid. Also used for 'all' option.""" + """Callback to check that the pipeline name is valid.""" if value is None: # None is allowed for optional arguments return value @@ -93,22 +93,33 @@ def pipeline_name_callback(ctx: typer.Context, value: Union[str, bool]) -> Union f"'{ctx.obj['settings'].pipelines_root_path}'" ) - if isinstance(value, str): - if value not in pipeline_names.__members__: - raise typer.BadParameter( - f"Pipeline '{value}' not found at '{ctx.obj['settings'].pipelines_root_path}'." 
- f"\nAvailable pipelines: {list(pipeline_names.__members__)}" - ) + if ctx.params.get("all", False): + to_check = [x.value for x in pipeline_names.__members__.values()] + elif isinstance(value, str): + to_check = [value] + elif isinstance(value, list): + if len(value) == 0: + raise typer.BadParameter("No pipeline names specified.") + to_check = value + else: + raise typer.BadParameter(f"Invalid value for pipeline_names: {value}") + + to_raise = [v for v in to_check if v not in pipeline_names.__members__] + if len(to_raise) > 0: + raise typer.BadParameter( + f"Pipelines {to_raise} not found at '{ctx.obj['settings'].pipelines_root_path}'." + f"\nAvailable pipelines: {list(pipeline_names.__members__)}" + ) return value @app.command(no_args_is_help=True) def deploy( # noqa: C901 ctx: typer.Context, - pipeline_name: Annotated[ - str, + pipeline_names: Annotated[ + List[str], typer.Argument( - ..., help="The name of the pipeline to run.", callback=pipeline_name_callback + ..., help="The names of the pipeline to run.", callback=pipeline_name_callback ), ], env_file: Annotated[ @@ -167,7 +178,7 @@ def deploy( # noqa: C901 ), ] = constants.DEFAULT_SCHEDULER_TIMEZONE, tags: Annotated[ - List[str], typer.Option(help="The tags to use when uploading the pipeline.") + Optional[List[str]], typer.Option(help="The tags to use when uploading the pipeline.") ] = constants.DEFAULT_TAGS, config_filepath: Annotated[ Optional[Path], @@ -244,77 +255,84 @@ def deploy( # noqa: C901 "Both --config-filepath and --config-name are provided." " Please specify only one to run or schedule a pipeline." ) + if config_filepath is not None and len(pipeline_names) > 1: + raise typer.BadParameter( + "Multiple pipelines specified with --config-filepath." + " Please specify a --config-name that will be used for each pipeline." + " Or specify a single pipeline to use the --config-filepath." 
+ ) deployer_settings: DeployerSettings = ctx.obj["settings"] - pipeline_func = import_pipeline_from_dir(deployer_settings.pipelines_root_path, pipeline_name) - from deployer.pipeline_deployer import VertexPipelineDeployer - deployer = VertexPipelineDeployer( - project_id=vertex_settings.PROJECT_ID, - region=vertex_settings.GCP_REGION, - staging_bucket_name=vertex_settings.VERTEX_STAGING_BUCKET_NAME, - service_account=vertex_settings.VERTEX_SERVICE_ACCOUNT, - pipeline_name=pipeline_name, - pipeline_func=pipeline_func, - gar_location=vertex_settings.GAR_LOCATION, - gar_repo_id=vertex_settings.GAR_PIPELINES_REPO_ID, - local_package_path=local_package_path, - ) + for pipeline_name in pipeline_names: + pipeline_func = import_pipeline_from_dir( + deployer_settings.pipelines_root_path, pipeline_name + ) - if run or schedule: - if config_name is not None: - config_filepath = ( - Path(deployer_settings.config_root_path) / pipeline_name / config_name - ) - parameter_values, input_artifacts = load_config(config_filepath) - - if compile: - with console.status("Compiling pipeline..."): - deployer.compile() - - if upload: - with console.status("Uploading pipeline..."): - deployer.upload_to_registry(tags=tags) - - if run: - with console.status("Running pipeline..."): - deployer.run( - enable_caching=enable_caching, - parameter_values=parameter_values, - experiment_name=experiment_name, - input_artifacts=input_artifacts, - tag=tags[0] if tags else None, - ) + deployer = VertexPipelineDeployer( + project_id=vertex_settings.PROJECT_ID, + region=vertex_settings.GCP_REGION, + staging_bucket_name=vertex_settings.VERTEX_STAGING_BUCKET_NAME, + service_account=vertex_settings.VERTEX_SERVICE_ACCOUNT, + pipeline_name=pipeline_name, + pipeline_func=pipeline_func, + gar_location=vertex_settings.GAR_LOCATION, + gar_repo_id=vertex_settings.GAR_PIPELINES_REPO_ID, + local_package_path=local_package_path, + ) - if schedule: - with console.status("Scheduling pipeline..."): - cron = 
cron.replace("-", " ") # ugly fix to allow cron expression as env variable - deployer.schedule( - cron=cron, - enable_caching=enable_caching, - parameter_values=parameter_values, - tag=tags[0] if tags else None, - delete_last_schedule=delete_last_schedule, - scheduler_timezone=scheduler_timezone, - ) + if run or schedule: + if config_name is not None: + config_filepath = ( + Path(deployer_settings.config_root_path) / pipeline_name / config_name + ) + parameter_values, input_artifacts = load_config(config_filepath) + + if compile: + with console.status("Compiling pipeline..."): + deployer.compile() + + if upload: + with console.status("Uploading pipeline..."): + deployer.upload_to_registry(tags=tags) + + if run: + with console.status("Running pipeline..."): + deployer.run( + enable_caching=enable_caching, + parameter_values=parameter_values, + experiment_name=experiment_name, + input_artifacts=input_artifacts, + tag=tags[0] if tags else None, + ) + + if schedule: + with console.status("Scheduling pipeline..."): + cron = cron.replace("-", " ") # ugly fix to allow cron expression as env variable + deployer.schedule( + cron=cron, + enable_caching=enable_caching, + parameter_values=parameter_values, + tag=tags[0] if tags else None, + delete_last_schedule=delete_last_schedule, + scheduler_timezone=scheduler_timezone, + ) @app.command() def check( ctx: typer.Context, - pipeline_name: Annotated[ - Optional[str], + pipeline_names: Annotated[ + Optional[List[str]], typer.Argument( - ..., help="The name of the pipeline to run.", callback=pipeline_name_callback + ..., help="The names of the pipeline to run.", callback=pipeline_name_callback ), ] = None, all: Annotated[ bool, - typer.Option( - "--all", "-a", help="Whether to check all pipelines.", callback=pipeline_name_callback - ), + typer.Option("--all", "-a", help="Whether to check all pipelines."), ] = False, config_filepath: Annotated[ Optional[Path], @@ -336,6 +354,24 @@ def check( help="Whether to raise an error if the 
pipeline is not valid.", ), ] = False, + warn_defaults: Annotated[ + bool, + typer.Option( + "--warn-defaults / --no-warn-defaults", + "-wd / -nwd", + help="Whether to warn when a default value is used" + " and not overwritten in config file.", + ), + ] = True, + raise_for_defaults: Annotated[ + bool, + typer.Option( + "--raise-for-defaults / --no-raise-for-defaults", + "-rfd / -nrfd", + help="Whether to raise a validation error when a default value is used" + " and not overwritten in config file.", + ), + ] = False, ): """Check that pipelines are valid. @@ -353,7 +389,7 @@ def check( **This command can be used to check pipelines in a Continuous Integration workflow.** """ - if all and pipeline_name is not None: + if all and pipeline_names: raise typer.BadParameter("Please specify either --all or a pipeline name") from deployer.pipeline_checks import Pipelines @@ -361,23 +397,20 @@ def check( deployer_settings: DeployerSettings = ctx.obj["settings"] if all: - logger.info("Checking all pipelines") # unpack enum to get list of pipeline names - pipelines_to_check = [x.value for x in ctx.obj["pipeline_names"]] - elif pipeline_name is not None: - logger.info(f"Checking pipeline {pipeline_name}") - pipelines_to_check = [pipeline_name] + pipeline_names = [x.value for x in ctx.obj["pipeline_names"]] + logger.info(f"Checking pipelines {pipeline_names}") + if config_filepath is None: to_check = { - p: list_config_filepaths(deployer_settings.config_root_path, p) - for p in pipelines_to_check + p: list_config_filepaths(deployer_settings.config_root_path, p) for p in pipeline_names } else: - to_check = {p: [config_filepath] for p in pipelines_to_check} + to_check = {p: [config_filepath] for p in pipeline_names} try: with console.status("Checking pipelines..."): - Pipelines.model_validate( + pipelines_model = Pipelines.model_validate( { "pipelines": { p: { @@ -388,15 +421,18 @@ def check( } for p, config_filepaths in to_check.items() } - } + }, + context={"raise_for_defaults": 
raise_for_defaults}, ) except ValidationError as e: if raise_error: raise e - print_check_results_table(to_check, e) + print_check_results_table(to_check, validation_error=e) sys.exit(1) else: - print_check_results_table(to_check) + print_check_results_table( + to_check, pipelines_model=pipelines_model, warn_defaults=warn_defaults + ) @app.command(name="list") @@ -424,9 +460,9 @@ def list_pipelines( @app.command(name="create") def create_pipeline( ctx: typer.Context, - pipeline_name: Annotated[ - str, - typer.Argument(..., help="The name of the pipeline to create."), + pipeline_names: Annotated[ + List[str], + typer.Argument(..., help="The names of the pipeline to create."), ], config_type: Annotated[ ConfigType, @@ -434,14 +470,13 @@ def create_pipeline( ] = ConfigType.json, ): """Create files structure for a new pipeline.""" - if not re.match(r"^[a-zA-Z0-9_]+$", pipeline_name): + invalid_pipelines = [p for p in pipeline_names if not re.match(r"^[a-zA-Z0-9_]+$", p)] + if invalid_pipelines: raise typer.BadParameter( - f"Invalid Pipeline name: '{pipeline_name}'\n" + f"Invalid Pipeline name(s): '{invalid_pipelines}'\n" "Pipeline name must only contain alphanumeric characters and underscores" ) - logger.info(f"Creating pipeline {pipeline_name}") - deployer_settings: DeployerSettings = ctx.obj["settings"] for path in [deployer_settings.pipelines_root_path, deployer_settings.config_root_path]: @@ -452,25 +487,40 @@ def create_pipeline( f" or create it with 'mkdir -p {path}'." 
) - pipeline_filepath = Path(deployer_settings.pipelines_root_path) / f"{pipeline_name}.py" - pipeline_filepath.touch(exist_ok=False) - pipeline_filepath.write_text( - constants.PIPELINE_MINIMAL_TEMPLATE.format(pipeline_name=pipeline_name) + existing_pipelines = [ + p for p in pipeline_names if (deployer_settings.pipelines_root_path / f"{p}.py").exists() + ] + if existing_pipelines: + raise typer.BadParameter(f"Pipelines {existing_pipelines} already exist.") + + console.print( + f"Creating pipeline {pipeline_names} with config type: [bold]{config_type}[/bold]" ) - try: - config_dirpath = Path(deployer_settings.config_root_path) / pipeline_name - config_dirpath.mkdir(exist_ok=True) - for config_name in ["test", "dev", "prod"]: - config_filepath = config_dirpath / f"{config_name}.{config_type}" - config_filepath.touch(exist_ok=False) - if config_type == ConfigType.py: - config_filepath.write_text(constants.PYTHON_CONFIG_TEMPLATE) - except Exception as e: - pipeline_filepath.unlink() - raise e + for pipeline_name in pipeline_names: + pipeline_filepath = deployer_settings.pipelines_root_path / f"{pipeline_name}.py" + pipeline_filepath.touch(exist_ok=False) + pipeline_filepath.write_text( + constants.PIPELINE_MINIMAL_TEMPLATE.format(pipeline_name=pipeline_name) + ) - logger.success(f"Pipeline {pipeline_name} created with configs in {config_dirpath}") + try: + config_dirpath = Path(deployer_settings.config_root_path) / pipeline_name + config_dirpath.mkdir(exist_ok=True) + for config_name in ["test", "dev", "prod"]: + config_filepath = config_dirpath / f"{config_name}.{config_type}" + config_filepath.touch(exist_ok=False) + if config_type == ConfigType.py: + config_filepath.write_text(constants.PYTHON_CONFIG_TEMPLATE) + except Exception as e: + pipeline_filepath.unlink() + raise e + + console.print( + f"Pipeline '{pipeline_name}' created at '{pipeline_filepath}'" + f" with config files: {[str(p) for p in config_dirpath.glob('*')]}. 
:sparkles:", + style="blue", + ) @app.command(name="init") @@ -480,7 +530,7 @@ def init_deployer(ctx: typer.Context): # noqa: C901 console.print("Welcome to Vertex Deployer!", style="blue") console.print("This command will help you getting fired up.", style="blue") - if Prompt.ask("Do you want to configure the deployer?", choices=["y", "n"]) == "y": + if Confirm.ask("Do you want to configure the deployer?"): pyproject_toml_filepath = find_pyproject_toml(Path.cwd().resolve()) if pyproject_toml_filepath is None: @@ -498,7 +548,7 @@ def init_deployer(ctx: typer.Context): # noqa: C901 update_pyproject_toml(pyproject_toml_filepath, new_deployer_settings) console.print("Configuration saved in pyproject.toml :sparkles:", style="blue") - if Prompt.ask("Do you want to build default folder structure", choices=["y", "n"]) == "y": + if Confirm.ask("Do you want to build default folder structure"): def create_file_or_dir(path: Path, text: str = ""): """Create a file (if text is provided) or a directory at path. Warn if path exists.""" @@ -519,14 +569,13 @@ def create_file_or_dir(path: Path, text: str = ""): Path("./.env"), "=\n".join(VertexPipelinesSettings.model_json_schema()["required"]) ) - if Prompt.ask("Do you want to create a pipeline?", choices=["y", "n"]) == "y": + if Confirm.ask("Do you want to create a pipeline?"): wrong_name = True while wrong_name: pipeline_name = Prompt.ask("What is the name of the pipeline?") - pipeline_path = Path(deployer_settings.pipelines_root_path) / f"{pipeline_name}.py" try: - create_pipeline(pipeline_name=pipeline_name) + create_pipeline(ctx, pipeline_names=[pipeline_name]) except typer.BadParameter as e: console.print(e, style="red") except FileExistsError: @@ -536,10 +585,6 @@ def create_file_or_dir(path: Path, text: str = ""): ) else: wrong_name = False - console.print( - f"Pipeline '{pipeline_name}' created at '{pipeline_path}'. 
:sparkles:", - style="blue", - ) console.print("All done :sparkles:", style="blue") diff --git a/deployer/constants.py b/deployer/constants.py index 3369d5f..51f0c51 100644 --- a/deployer/constants.py +++ b/deployer/constants.py @@ -26,3 +26,13 @@ def {pipeline_name}(): input_artifacts = {} """ + +PIPELINE_CHECKS_TABLE_COLUMNS = [ + "Status", + "Pipeline", + "Pipeline Error Message", + "Config File", + "Attribute", + "Config Error Type", + "Config Error Message", +] diff --git a/deployer/pipeline_checks.py b/deployer/pipeline_checks.py index d971903..31a9405 100644 --- a/deployer/pipeline_checks.py +++ b/deployer/pipeline_checks.py @@ -1,12 +1,13 @@ import shutil from pathlib import Path -from typing import Any, Dict, Generic, List, TypeVar +from typing import Any, Dict, Generic, List, Optional, TypeVar import kfp.dsl from loguru import logger from pydantic import Field, ValidationError, computed_field, model_validator from pydantic.functional_validators import ModelWrapValidatorHandler from pydantic_core import PydanticCustomError +from pydantic_core.core_schema import ValidationInfo from typing_extensions import Annotated, _AnnotatedAlias try: @@ -57,6 +58,7 @@ class Pipeline(CustomBaseModel): config_paths: Annotated[List[Path], Field(validate_default=True)] = None pipelines_root_path: Path config_root_path: Path + configs: Optional[Dict[str, ConfigDynamicModel]] = None # Optional because populated after @model_validator(mode="before") @classmethod @@ -69,6 +71,7 @@ def populate_config_names(cls, data: Any) -> Any: return data @computed_field + @property def pipeline(self) -> graph_component.GraphComponent: """Import pipeline""" if getattr(self, "_pipeline", None) is None: @@ -104,14 +107,16 @@ def compile_pipeline(self): return self @model_validator(mode="after") - def validate_configs(self): + def validate_configs(self, info: ValidationInfo): """Validate configs against pipeline parameters definition""" logger.debug(f"Validating configs for pipeline 
{self.pipeline_name}") pipelines_dynamic_model = create_model_from_func( - self.pipeline.pipeline_func, type_converter=_convert_artifact_type_to_str + self.pipeline.pipeline_func, + type_converter=_convert_artifact_type_to_str, + exclude_defaults=(info.context or {}).get("raise_for_defaults", False), ) config_model = ConfigsDynamicModel[pipelines_dynamic_model] - config_model.model_validate( + self.configs = config_model.model_validate( {"configs": {x.name: {"config_path": x} for x in self.config_paths}} ) return self diff --git a/deployer/settings.py b/deployer/settings.py index 86afe94..6cc3bd4 100644 --- a/deployer/settings.py +++ b/deployer/settings.py @@ -25,7 +25,7 @@ class _DeployerDeploySettings(CustomBaseModel): cron: Optional[str] = None delete_last_schedule: bool = False scheduler_timezone: str = constants.DEFAULT_SCHEDULER_TIMEZONE - tags: List[str] = constants.DEFAULT_TAGS + tags: Optional[List[str]] = constants.DEFAULT_TAGS config_filepath: Optional[Path] = None config_name: Optional[str] = None enable_caching: bool = False @@ -40,6 +40,8 @@ class _DeployerCheckSettings(CustomBaseModel): all: bool = False config_filepath: Optional[Path] = None raise_error: bool = False + warn_defaults: bool = True + raise_for_defaults: bool = False class _DeployerListSettings(CustomBaseModel): diff --git a/deployer/utils/config.py b/deployer/utils/config.py index 0ebb4ef..cba1495 100644 --- a/deployer/utils/config.py +++ b/deployer/utils/config.py @@ -32,7 +32,6 @@ def load_vertex_settings(env_file: Optional[Path] = None) -> VertexPipelinesSett """Load the settings from the environment.""" try: settings = VertexPipelinesSettings(_env_file=env_file, _env_file_encoding="utf-8") - print(settings) except ValidationError as e: msg = "Validation failed for VertexPipelinesSettings. 
" if env_file is not None: diff --git a/deployer/utils/console.py b/deployer/utils/console.py index 029bd97..eeda8c9 100644 --- a/deployer/utils/console.py +++ b/deployer/utils/console.py @@ -4,7 +4,7 @@ from pydantic import BaseModel from rich.console import Console -from rich.prompt import Prompt +from rich.prompt import Confirm, Prompt console = Console() @@ -21,10 +21,8 @@ def ask_user_for_model_fields(model: Type[BaseModel]) -> dict: set_fields = {} for field_name, field_info in model.model_fields.items(): if isclass(field_info.annotation) and issubclass(field_info.annotation, BaseModel): - answer = Prompt.ask( - f"Do you want to configure command {field_name}?", choices=["y", "n"], default="n" - ) - if answer == "y": + answer = Confirm.ask(f"Do you want to configure command {field_name}?", default=False) + if answer: set_fields[field_name] = ask_user_for_model_fields(field_info.annotation) else: @@ -36,13 +34,9 @@ def ask_user_for_model_fields(model: Type[BaseModel]) -> dict: choices = list(annotation.__members__) if isclass(annotation) and annotation == bool: - choices = ["y", "n"] - default = "y" if field_info.default else "n" - - answer = Prompt.ask(field_name, default=default, choices=choices) - - if isclass(annotation) and annotation == bool: - answer = answer == "y" + answer = Confirm.ask(field_name, default=default) + else: + answer = Prompt.ask(field_name, default=default, choices=choices) if answer != field_info.default: set_fields[field_name] = answer diff --git a/deployer/utils/models.py b/deployer/utils/models.py index 92fa7c7..13f22f0 100644 --- a/deployer/utils/models.py +++ b/deployer/utils/models.py @@ -10,7 +10,10 @@ class CustomBaseModel(BaseModel): # FIXME: arbitrary_types_allowed is a workaround to allow to pass # Vertex Pipelines Artifacts as parameters to a pipeline. 
model_config = ConfigDict( - extra="forbid", arbitrary_types_allowed=True, protected_namespaces=() + extra="forbid", + arbitrary_types_allowed=True, + protected_namespaces=(), + validate_default=True, ) @@ -30,6 +33,7 @@ def create_model_from_func( func: Callable, model_name: Optional[str] = None, type_converter: Optional[TypeConverterType] = None, + exclude_defaults: bool = False, ) -> CustomBaseModel: """Create a Pydantic model from pipeline parameters.""" if model_name is None: @@ -39,14 +43,14 @@ def create_model_from_func( type_converter = _dummy_type_converter func_signature = signature(func) - func_typing = { - p.name: type_converter(p.annotation) for p in func_signature.parameters.values() - } func_model = create_model( __model_name=model_name, __base__=CustomBaseModel, - **{name: (annotation, ...) for name, annotation in func_typing.items()}, + **{ + p.name: (type_converter(p.annotation), ... if exclude_defaults else p.default) + for p in func_signature.parameters.values() + }, ) return func_model @@ -57,8 +61,8 @@ class ChecksTableRow(CustomBaseModel): status: Literal["✅", "⚠️", "❌"] pipeline: str - pipeline_error_message: str = None + pipeline_error_message: Optional[str] = None config_file: str - attribute: str = None - config_error_type: str = None - config_error_message: str = None + attribute: Optional[str] = None + config_error_type: Optional[str] = None + config_error_message: Optional[str] = None diff --git a/deployer/utils/utils.py b/deployer/utils/utils.py index 1567345..1bd6cc7 100644 --- a/deployer/utils/utils.py +++ b/deployer/utils/utils.py @@ -1,15 +1,17 @@ import importlib import traceback import warnings +from collections import defaultdict from enum import Enum from pathlib import Path from types import TracebackType from typing import Any, Callable, Dict, List, Mapping, Optional, Protocol, Union from loguru import logger -from pydantic import ValidationError +from pydantic import BaseModel, ValidationError from rich.table import Table 
+from deployer.constants import PIPELINE_CHECKS_TABLE_COLUMNS from deployer.utils.console import console from deployer.utils.models import ChecksTableRow @@ -153,7 +155,7 @@ def dict_to_repr( dict_repr.append(" " * indent * depth + f"{k}") dict_repr.extend(dict_to_repr(v, v_ref, depth=depth + 1, indent=indent)) else: - if subdict.get(k): + if subdict.get(k) is not None: v_str = " " * indent * depth + f"[cyan]* {k}={v}[/cyan]" else: v_str = " " * indent * depth + f"[white]{k}={v}[/white]" @@ -191,34 +193,45 @@ def print_pipelines_list(pipelines_dict: Dict[str, list], with_configs: bool = F console.print(table) -def print_check_results_table( # noqa: C901 - to_check: Dict[str, list], validation_error: Optional[ValidationError] = None +def print_check_results_table( + to_check: Dict[str, list], + pipelines_model: Optional[Any] = None, + validation_error: Optional[ValidationError] = None, + warn_defaults: bool = True, ) -> None: - """This function prints a table of check results to the console. + """Print a table of check results to the console. Args: - to_check (dict[str, list]): A dictionary containing the pipelines to check + to_check (Dict[str, list]): A dictionary containing the pipelines to check as keys and the config filepaths as values. - validation_error (ValidationError): The validation error if any occurred during the check. + pipelines_model (Optional[Any], optional): A Pipelines model used to retrieve default + field and warn user when default value was used. Defaults to None. + validation_error (Optional[ValidationError], optional): The validation error if any + occurred during the check. Defaults to None. + warn_defaults (bool, optional): whether to warn when default values are used and not + overwritten in config file. Defaults to True. 
""" - val_error_dict = validation_error.errors() if validation_error else {} - parsed_val_error_dict = { - p: [v for v in val_error_dict if v["loc"][1] == p] for p in to_check.keys() - } + parsed_val_error_dict = _parse_validation_errors(validation_error) + parsed_warnings_dict = _get_warnings_for_default_value(pipelines_model) table = Table(show_header=True, header_style="bold", show_lines=True) - - table.add_column("Status", justify="center") - table.add_column("Pipeline") - table.add_column("Pipeline Error Message") - table.add_column("Config File") - table.add_column("Attribute") - table.add_column("Config Error Type") - table.add_column("Config Error Message") + for column in PIPELINE_CHECKS_TABLE_COLUMNS: + table.add_column(column) + table.columns[0].justify = "center" for pipeline_name, config_filepaths in to_check.items(): - errors = parsed_val_error_dict[pipeline_name] - if len(errors) == 0: + p_errors = parsed_val_error_dict.get(pipeline_name, {}) + p_warnings = parsed_warnings_dict.get(pipeline_name, {}) + + if len(p_errors) == 0 and len(p_warnings) == 0: + if len(config_filepaths) == 0: + row = ChecksTableRow( + status="⚠️", + pipeline=pipeline_name, + config_file="No config files found", + ) + table.add_row(*row.model_dump().values(), style="bold yellow") + for config_filepath in config_filepaths: row = ChecksTableRow( status="✅", @@ -226,50 +239,137 @@ def print_check_results_table( # noqa: C901 config_file=config_filepath.name, ) table.add_row(*row.model_dump().values(), style="green") - if len(config_filepaths) == 0: - row = ChecksTableRow( - status="⚠️", - pipeline=pipeline_name, - config_file="No configs found", - ) - table.add_row(*row.model_dump().values(), style="bold yellow") - elif len(errors) == 1 and len(errors[0]["loc"]) == 2: + elif len(p_errors) == 1 and p_errors.keys() == {"pipeline level error"}: + error = p_errors["pipeline level error"] row = ChecksTableRow( status="❌", pipeline=pipeline_name, - 
pipeline_error_message=errors[0]["msg"], + pipeline_error_message=error[0]["msg"], config_file="Could not check config files due to pipeline error.", ) table.add_row(*row.model_dump().values(), style="red") else: for config_filepath in config_filepaths: - error_rows = [] - for error in errors: - if error["loc"][3] == config_filepath.name: - error_row = {"type": error["type"], "msg": error["msg"]} - if len(error["loc"]) > 4: - error_row["attribute"] = error["loc"][5] - error_rows.append(error_row) - if error_rows: - row = ChecksTableRow( - status="❌", - pipeline=pipeline_name, - config_file=config_filepath.name, - config_error_type="\n".join([er["type"] for er in error_rows]), - attribute="\n".join([er.get("attribute", "") for er in error_rows]), - config_error_message="\n".join([er["msg"] for er in error_rows]), + config_error_type_l = [] + attribute_l = [] + config_error_message_l = [] + status, style = "✅", "green" + + warnings_ = p_warnings.get(config_filepath.name) + if warn_defaults and warnings_ is not None: + status, style = "⚠️", "yellow" + # yellow styling in error string if there are errors + warnings + config_error_type_l.extend( + [f"[yellow]{wr['type']}[/yellow]" for wr in warnings_] + ) + attribute_l.extend( + [f"[yellow]{wr.get('field', '')}[/yellow]" for wr in warnings_] ) - table.add_row(*row.model_dump().values(), style="red") - else: - row = ChecksTableRow( - status="✅", - pipeline=pipeline_name, - config_file=config_filepath.name, + config_error_message_l.extend( + [f"[yellow]{wr['msg']}[/yellow]" for wr in warnings_] ) - table.add_row(*row.model_dump().values(), style="green") + + errors_ = p_errors.get(config_filepath.name) + if errors_ is not None: + status, style = "❌", "red" + config_error_type_l.extend([f"{er['type']}" for er in errors_]) + attribute_l.extend([f"{er.get('field', '')}" for er in errors_]) + config_error_message_l.extend([f"{er['msg']}" for er in errors_]) + + row = ChecksTableRow( + status=status, + pipeline=pipeline_name, 
+ config_file=config_filepath.name, + config_error_type="\n".join(config_error_type_l), + attribute="\n".join(attribute_l), + config_error_message="\n".join(config_error_message_l), + ) + table.add_row(*row.model_dump().values(), style=style) table.columns = [c for c in table.columns if "".join(c._cells) != ""] console.print(table) + + +def _parse_validation_errors( + validation_error: Optional[ValidationError], +) -> Dict[str, Dict[str, List[Dict[str, str]]]]: + """Group a ValidationError's errors by pipeline name, then by config file name. + + Args: + validation_error (Optional[ValidationError]): error raised while validating pipelines. + + Returns: + Dict[str, Dict[str, List[Dict[str, str]]]]: errors per pipeline, then per config file. + """ + val_errors: List[Dict] = validation_error.errors() if validation_error else [] + + parsed_errors = defaultdict(dict) + for error in val_errors: + pipeline_name = error["loc"][1] + if len(error["loc"]) == 2: + parsed_errors[pipeline_name]["pipeline level error"] = [ + {"type": error["type"], "msg": error["msg"]} + ] + else: + config_name = error["loc"][3] + error_row = {"type": error["type"], "msg": error["msg"]} + parsed_errors[pipeline_name].setdefault(config_name, []) + if len(error["loc"]) > 4: + error_row["field"] = error["loc"][5] + parsed_errors[pipeline_name][config_name].append(error_row) + + return parsed_errors + + +def _get_warnings_for_default_value( + pipelines_model: Optional[Any], +) -> Dict[str, Dict[str, List[Dict[str, str]]]]: + """List config fields relying on pipeline defaults, per pipeline and config file. + + Args: + pipelines_model (Optional[Any]): validated Pipelines model, or None. + + Returns: + Dict[str, Dict[str, List[Dict[str, str]]]]: default-value warnings per pipeline and config. + """ + if pipelines_model is None: + return {} + + parsed_warnings = defaultdict(dict) + + for pipeline_name, pipeline_configs in pipelines_model.pipelines.items(): + configs_models = pipeline_configs.configs.configs + for cfp in pipeline_configs.config_paths: + unset_fields_with_default = _get_unset_default_fields(configs_models[cfp.name].config) + if unset_fields_with_default: + parsed_warnings[pipeline_name][cfp.name] = [ + { + "type": "default_value", + "field": 
f["field"], + "msg": f"Using default value from pipeline definition: {f['default']}", + } + for f in unset_fields_with_default + ] + return parsed_warnings + + +def _get_unset_default_fields(model: BaseModel) -> List[Dict]: + """Return the fields of *model* that were not set in the config and use a default. + + Args: + model (BaseModel): validated config model, or None. + + Returns: + List[Dict]: one {"field", "default"} entry per field falling back to its default. + """ + if model is None: + return [] + unset_fields = set(model.model_dump()) - set(model.model_dump(exclude_unset=True)) + unset_fields_with_default = [] + for field in unset_fields: + if getattr(model, field) is not None: + unset_fields_with_default.append({"field": field, "default": getattr(model, field)}) + return unset_fields_with_default diff --git a/docs/CLI_REFERENCE.md b/docs/CLI_REFERENCE.md index 8c41c18..ceb88ec 100644 --- a/docs/CLI_REFERENCE.md +++ b/docs/CLI_REFERENCE.md @@ -44,14 +44,18 @@ pipeline parameters definition, using Pydantic. **Usage**: ```console -$ vertex-deployer check [OPTIONS] +$ vertex-deployer check [OPTIONS] PIPELINE_NAMES... ``` -**Options**: +**Arguments**: -* `--pipeline-name []` +* `PIPELINE_NAMES...`: The names of the pipeline(s) to check. [optional] + +**Options**: * `--all, -a / --no-all`: Whether to check all pipelines. [default: no-all] * `--config-filepath, -cfp PATH`: Path to the json/py file with parameter values and input artifacts to check. If not specified, all config files in the pipeline dir will be checked. +* `--warn-defaults, -wd / --no-warn-defaults, -nwd`: Whether to warn when a default value is used and not overwritten in config file. [default: warn-defaults] +* `--raise-for-defaults, -rfd / --no-raise-for-defaults, -nrfd`: Whether to raise a validation error when a default value is used and not overwritten in config file. [default: no-raise-for-defaults] * `--raise-error, -re / --no-raise-error, -nre`: Whether to raise an error if the pipeline is not valid. [default: no-raise-error] * `--help`: Show this message and exit. @@ -77,12 +81,12 @@ Create files structure for a new pipeline. 
**Usage**: ```console -$ vertex-deployer create [OPTIONS] PIPELINE_NAME +$ vertex-deployer create [OPTIONS] PIPELINE_NAMES... ``` **Arguments**: -* `PIPELINE_NAME`: The name of the pipeline to create. [required] +* `PIPELINE_NAMES...`: The names of the pipeline to create. [required] **Options**: @@ -101,7 +105,7 @@ $ vertex-deployer deploy [OPTIONS] PIPELINE_NAME **Arguments**: -* `PIPELINE_NAME`: The name of the pipeline to deploy. [required] +* `PIPELINE_NAMES...`: The names of the pipeline to deploy. [required] **Options**: @@ -112,6 +116,7 @@ $ vertex-deployer deploy [OPTIONS] PIPELINE_NAME * `--schedule, -s / --no-schedule, -ns`: Whether to create a schedule for the pipeline. [default: no-schedule] * `--cron TEXT`: Cron expression for scheduling the pipeline. To pass it to the CLI, use hyphens e.g. `0-10-*-*-*`. * `--delete-last-schedule, -dls / --no-delete-last-schedule`: Whether to delete the previous schedule before creating a new one. [default: no-delete-last-schedule] +* `--scheduler-timezone TEXT`: IANA Timezone for the scheduler. [default: Europe/Paris] * `--tags TEXT`: The tags to use when uploading the pipeline. [default: latest] * `--config-filepath, -cfp PATH`: Path to the json/py file with parameter values and input artifacts to use when running the pipeline. * `--config-name, -cn TEXT`: Name of the json/py file with parameter values and input artifacts to use when running the pipeline. It must be in the pipeline config dir. e.g. `config_dev.json` for `./vertex/configs/{pipeline-name}/config_dev.json`. diff --git a/docs/configuration.md b/docs/configuration.md index 5e4839e..ca4b20f 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -12,12 +12,12 @@ You can override default options for specific CLI commands in the pyproject.toml You can also override global deployer options such as logging level, or pipelines / config root path to better fit your repo structure. 
```toml title="pyproject.toml" -[tool.vertex-deployer] +[tool.vertex_deployer] log-level = "INFO" pipelines-root-path = "./vertex/pipelines" config-root-path = "./configs" -[tool.vertex-deployer.deploy] +[tool.vertex_deployer.deploy] enable-cache = true env-file = "example.env" compile = true diff --git a/pyproject.toml b/pyproject.toml index 1b52874..09027b0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -99,6 +99,23 @@ tag_format = "{version}" [tool.semantic_release.changelog] exclude_commit_patterns = ['''^chore\(release\).*'''] +[tool.semantic_release.commit_parser_options] +allowed_tags = [ + "build", + "chore", + "ci", + "docs", + "enh", + "feat", + "fix", + "perf", + "style", + "refactor", + "test", +] +minor_tags = ["feat"] +patch_tags = ["build", "enh", "fix", "perf"] + [tool.vertex_deployer] log_level = "DEBUG" diff --git a/templates/CHANGELOG.md.j2 b/templates/CHANGELOG.md.j2 index 04906cb..b66b395 100644 --- a/templates/CHANGELOG.md.j2 +++ b/templates/CHANGELOG.md.j2 @@ -2,9 +2,11 @@ {% if context.history.unreleased | length > 0 %} {# UNRELEASED #} ## Unreleased -{% for type_, commits in context.history.unreleased | dictsort %} +{% for type_, commits in context.history.unreleased | dictsort %}{% if type_ == "enh" %} +### {{ "Enhancements" | capitalize }} +{% else %} ### {{ type_ | capitalize }} -{% for commit in commits %}{% if type_ != "unknown" %} +{% endif %}{% for commit in commits %}{% if type_ != "unknown" %} * {{ commit.message.rstrip() }} ([`{{ commit.short_hash }}`]({{ commit.hexsha | commit_hash_url }})) {% else %} * {{ commit.message.rstrip() }} ([`{{ commit.short_hash }}`]({{ commit.hexsha | commit_hash_url }})) @@ -12,9 +14,11 @@ {% for version, release in context.history.released.items() %} {# RELEASED #} ## {{ version.as_tag() }} ({{ release.tagged_date.strftime("%Y-%m-%d") }}) -{% for type_, commits in release["elements"] | dictsort %} +{% for type_, commits in release["elements"] | dictsort %}{% if type_ == "enh" %} +### {{ 
"Enhancements" | capitalize }} +{% else %} ### {{ type_ | capitalize }} -{% for commit in commits %}{% if type_ != "unknown" %} +{% endif %}{% for commit in commits %}{% if type_ != "unknown" %} * {{ commit.message.rstrip() }} ([`{{ commit.short_hash }}`]({{ commit.hexsha | commit_hash_url }})) {% else %} * {{ commit.message.rstrip() }} ([`{{ commit.short_hash }}`]({{ commit.hexsha | commit_hash_url }})) diff --git a/tests/unit_tests/test_console.py b/tests/unit_tests/test_console.py index 06f20c5..7ab4333 100644 --- a/tests/unit_tests/test_console.py +++ b/tests/unit_tests/test_console.py @@ -8,36 +8,38 @@ class TestAskUserForModelFields: def test_ask_user_for_input_for_each_field(self): - # Arrange + # Given class TestModel(BaseModel): field1: str field2: int field3: bool = False - # Act + # When with patch("rich.prompt.Prompt.ask") as mock_prompt: - mock_prompt.side_effect = ["value1", 2, "y"] - result = ask_user_for_model_fields(TestModel) + with patch("rich.prompt.Confirm.ask") as mock_confirm: + mock_confirm.side_effect = [True] + mock_prompt.side_effect = ["value1", 2] + result = ask_user_for_model_fields(TestModel) - # Assert + # Then assert result == {"field1": "value1", "field2": 2, "field3": True} def test_ask_user_with_boolean_fields(self): - # Arrange + # Given class TestModel(BaseModel): field1: bool field2: bool = True - # Act - with patch("rich.prompt.Prompt.ask") as mock_prompt: - mock_prompt.side_effect = ["y", "n"] + # When + with patch("rich.prompt.Confirm.ask") as mock_confirm: + mock_confirm.side_effect = [True, False] result = ask_user_for_model_fields(TestModel) - # Assert + # Then assert result == {"field1": True, "field2": False} def test_ask_user_with_enum_fields(self): - # Arrange + # Given class TestEnum(Enum): OPTION1 = "Option 1" OPTION2 = "Option 2" @@ -46,16 +48,16 @@ class TestEnum(Enum): class TestModel(BaseModel): field1: TestEnum - # Act + # When with patch("rich.prompt.Prompt.ask") as mock_prompt: mock_prompt.side_effect = 
["Option 2"] result = ask_user_for_model_fields(TestModel) - # Assert + # Then assert result == {"field1": "Option 2"} def test_ask_user_with_nested_models(self): - # Arrange + # Given class NestedModel(BaseModel): nested_field1: str nested_field2: int @@ -63,47 +65,49 @@ class NestedModel(BaseModel): class TestModel(BaseModel): field1: NestedModel - # Act + # When with patch("rich.prompt.Prompt.ask") as mock_prompt: - mock_prompt.side_effect = ["y", "value1", 2] - result = ask_user_for_model_fields(TestModel) + with patch("rich.prompt.Confirm.ask") as mock_confirm: + mock_confirm.side_effect = [True] + mock_prompt.side_effect = ["value1", 2] + result = ask_user_for_model_fields(TestModel) - # Assert + # Then assert result == {"field1": {"nested_field1": "value1", "nested_field2": 2}} def test_ask_user_with_no_fields(self): - # Arrange + # Given class TestModel(BaseModel): pass - # Act + # When result = ask_user_for_model_fields(TestModel) - # Assert + # Then assert result == {} def test_ask_user_with_no_default_value_and_no_valid_choices(self): - # Arrange + # Given class TestModel(BaseModel): field1: str - # Act + # When with patch("rich.prompt.Prompt.ask") as mock_prompt: mock_prompt.side_effect = ["value1"] result = ask_user_for_model_fields(TestModel) - # Assert + # Then assert result == {"field1": "value1"} def test_ask_user_with_default_value_and_no_valid_choices(self): - # Arrange + # Given class TestModel(BaseModel): field1: str = "default" - # Act + # When with patch("rich.prompt.Prompt.ask") as mock_prompt: mock_prompt.side_effect = ["value1"] result = ask_user_for_model_fields(TestModel) - # Assert + # Then assert result == {"field1": "value1"}