diff --git a/README.md b/README.md
index f1a2313..047526e 100644
--- a/README.md
+++ b/README.md
@@ -29,11 +29,12 @@
 - [Installation](#installation)
   - [From git repo](#from-git-repo)
   - [From GCS (not available in PyPI yet)](#from-gcs-not-available-in-pypi-yet)
+  - [Add to requirements](#add-to-requirements)
 - [Usage](#usage)
   - [Setup](#setup)
   - [Folder Structure](#folder-structure)
-  - [CLI: Deploying a Pipeline](#cli-deploying-a-pipeline)
-  - [CLI: Checking Pipelines are valid](#cli-checking-pipelines-are-valid)
+  - [CLI: Deploying a Pipeline with `deploy`](#cli-deploying-a-pipeline-with-deploy)
+  - [CLI: Checking Pipelines are valid with `check`](#cli-checking-pipelines-are-valid-with-check)
   - [CLI: Other commands](#cli-other-commands)
     - [`create`](#create)
     - [`list`](#list)
@@ -43,12 +44,13 @@
 
 ## Why this tool?
 
-Two uses cases:
-- quickly iterate over your pipelines by compiling and running them in multiple environments (test, dev, staging, etc) without duplicating code or looking for the right kfp / aiplatform snippet.
-- deploy your pipelines to Vertex Pipelines in a standardized manner in your CD with Cloud Build or GitHub Actions.
-- check pipeline validity in your CI.
+Three use cases:
+1. **CI:** check pipeline validity.
+2. **Dev mode:** quickly iterate over your pipelines by compiling and running them in multiple environments (test, dev, staging, etc) without duplicating code or looking for the right kfp / aiplatform snippet.
+3. **CD:** deploy your pipelines to Vertex Pipelines in a standardized manner in your CD with Cloud Build or GitHub Actions.
 
-Commands:
+
+Four commands:
 - `check`: check your pipelines (imports, compile, check configs validity against pipeline definition).
 - `deploy`: compile, upload to Artifact Registry, run and schedule your pipelines.
 - `create`: create a new pipeline and config files.
@@ -153,7 +155,7 @@ export GCP_REGION=
 export VERTEX_STAGING_BUCKET_NAME=
 gcloud storage buckets create gs://${VERTEX_STAGING_BUCKET_NAME} --location=${GCP_REGION}
 ```
-7. Create a service account for Vertex Pipelines: # TODO: complete iam bindings
+7. Create a service account for Vertex Pipelines:
 ```bash
 export VERTEX_SERVICE_ACCOUNT_NAME=foobar
 export VERTEX_SERVICE_ACCOUNT="${VERTEX_SERVICE_ACCOUNT_NAME}@${PROJECT_ID}.iam.gserviceaccount.com"
@@ -191,7 +193,7 @@ vertex
 ```
 
 > [!NOTE]
-> You must have at lease these files. If you need to share some config elements between pipelines,
+> You must have at least these files. If you need to share some config elements between pipelines,
 > you can have a `shared` folder in `configs` and import them in your pipeline configs.
 
 #### Pipelines
@@ -219,7 +221,7 @@ def pipeline():
 Config file can be either `.py`, `.json` or `.toml` files.
 They must be located in the `config/{pipeline_name}` folder.
 
-**Why two formats?**
+**Why three formats?**
 
 `.py` files are useful to define complex configs (e.g. a list of dicts)
 while `.json` / `.toml` files are useful to define simple configs (e.g. a string).
@@ -256,7 +258,7 @@ VERTEX_SERVICE_ACCOUNT=YOUR_VERTEX_SERVICE_ACCOUNT # Vertex Pipelines Service A
 > An [`example.env`](./example/example.env) file is provided in this repo.
 > This also allows you to work with multiple environments thanks to env files (`test.env`, `dev.env`, `prod.env`, etc)
 
-### CLI: Deploying a Pipeline
+### CLI: Deploying a Pipeline with `deploy`
 
 Let's say you defines a pipeline in `dummy_pipeline.py` and a config file named `config_test.json`. You can deploy your pipeline using the following command:
 ```bash
@@ -267,17 +269,17 @@ vertex-deployer deploy dummy_pipeline \
     --env-file example.env \
     --local-package-path . \
     --tags my-tag \
-    --parameter-values-filepath vertex/configs/dummy_pipeline/config_test.json \
+    --config-filepath vertex/configs/dummy_pipeline/config_test.json \
     --experiment-name my-experiment \
     --enable-caching
 ```
 
-### CLI: Checking Pipelines are valid
+### CLI: Checking Pipelines are valid with `check`
 
 To check that your pipelines are valid, you can use the `check` command. It uses a pydantic model to:
 - check that your pipeline imports and definition are valid
 - check that your pipeline can be compiled
-- generate a pydantic model from the pipeline parameters definition and check that all configs related to the pipeline are valid
+- check that all configs related to the pipeline respect the pipeline definition (using a Pydantic model based on the pipeline signature)
 
 To validate one specific pipeline:
 ```bash
diff --git a/deployer/cli.py b/deployer/cli.py
index 7def058..ba0d504 100644
--- a/deployer/cli.py
+++ b/deployer/cli.py
@@ -283,7 +283,8 @@ def check(
 
     **This command can be used to check pipelines in a Continuous Integration workflow.**
     """
-    from deployer.pipeline_checks import Pipelines
+    if all and pipeline_name is not None:
+        raise typer.BadParameter("Please specify either --all or a pipeline name")
 
     if len(PipelineName.__members__) == 0:
         raise ValueError(
@@ -291,15 +292,14 @@
             f" ('{PIPELINE_ROOT_PATH}')"
         )
+    from deployer.pipeline_checks import Pipelines
+
     if all:
         logger.info("Checking all pipelines")
         pipelines_to_check = PipelineName.__members__.values()
     elif pipeline_name is not None:
         logger.info(f"Checking pipeline {pipeline_name}")
         pipelines_to_check = [pipeline_name]
-    else:
-        raise ValueError("Please specify either --all or a pipeline name")
-
     if config_filepath is None:
         to_check = {
             p.value: list_config_filepaths(CONFIG_ROOT_PATH, p.value) for p in pipelines_to_check
         }
@@ -368,6 +368,13 @@ def create(
     """Create files structure for a new pipeline."""
     logger.info(f"Creating pipeline {pipeline_name}")
 
+    if not Path(PIPELINE_ROOT_PATH).is_dir():
+        raise FileNotFoundError(
+            f"Pipeline root path '{PIPELINE_ROOT_PATH}' does not exist."
+            " Please check that the pipeline root path is correct"
+            f" or create it with `mkdir -p {PIPELINE_ROOT_PATH}`."
+        )
+
     pipeline_filepath = Path(PIPELINE_ROOT_PATH) / f"{pipeline_name}.py"
     pipeline_filepath.touch(exist_ok=False)
     pipeline_filepath.write_text(PIPELINE_MINIMAL_TEMPLATE.format(pipeline_name=pipeline_name))
diff --git a/deployer/pipeline_checks.py b/deployer/pipeline_checks.py
index 9352d21..7d63390 100644
--- a/deployer/pipeline_checks.py
+++ b/deployer/pipeline_checks.py
@@ -18,15 +18,10 @@
 from deployer.utils.exceptions import BadConfigError
 from deployer.utils.logging import disable_logger
 from deployer.utils.models import CustomBaseModel, create_model_from_pipeline
-from deployer.utils.utils import (
-    import_pipeline_from_dir,
-    make_enum_from_python_package_dir,
-)
+from deployer.utils.utils import import_pipeline_from_dir
 
 PipelineConfigT = TypeVar("PipelineConfigT")
 
-PipelineName = make_enum_from_python_package_dir(PIPELINE_ROOT_PATH)
-
 
 class ConfigDynamicModel(CustomBaseModel, Generic[PipelineConfigT]):
     """Model used to generate checks for configs based on pipeline dynamic model"""
@@ -56,7 +51,7 @@ class ConfigsDynamicModel(CustomBaseModel, Generic[PipelineConfigT]):
 class Pipeline(CustomBaseModel):
     """Validation of one pipeline and its configs"""
 
-    pipeline_name: PipelineName
+    pipeline_name: str
     config_paths: Annotated[List[Path], Field(validate_default=True)] = None
 
     @model_validator(mode="before")
@@ -72,29 +67,27 @@ def pipeline(self) -> Any:
         """Import pipeline"""
         if getattr(self, "_pipeline", None) is None:
             with disable_logger("deployer.utils.utils"):
-                self._pipeline = import_pipeline_from_dir(
-                    PIPELINE_ROOT_PATH, self.pipeline_name.value
-                )
+                self._pipeline = import_pipeline_from_dir(PIPELINE_ROOT_PATH, self.pipeline_name)
         return self._pipeline
 
     @model_validator(mode="after")
     def import_pipeline(self):
         """Validate that the pipeline can be imported by calling pipeline computed field"""
-        logger.debug(f"Importing pipeline {self.pipeline_name.value}")
+        logger.debug(f"Importing pipeline {self.pipeline_name}")
         try:
             _ = self.pipeline
-        except Exception as e:
+        except (ImportError, ModuleNotFoundError) as e:
             raise ValueError(f"Pipeline import failed: {e.__repr__()}")  # noqa: B904
         return self
 
     @model_validator(mode="after")
     def compile_pipeline(self):
         """Validate that the pipeline can be compiled"""
-        logger.debug(f"Compiling pipeline {self.pipeline_name.value}")
+        logger.debug(f"Compiling pipeline {self.pipeline_name}")
         try:
             with disable_logger("deployer.pipeline_deployer"):
                 VertexPipelineDeployer(
-                    pipeline_name=self.pipeline_name.value,
+                    pipeline_name=self.pipeline_name,
                     pipeline_func=self.pipeline,
                     local_package_path=TEMP_LOCAL_PACKAGE_PATH,
                 ).compile()
@@ -105,7 +98,7 @@ def compile_pipeline(self):
     @model_validator(mode="after")
     def validate_configs(self):
         """Validate configs against pipeline parameters definition"""
-        logger.debug(f"Validating configs for pipeline {self.pipeline_name.value}")
+        logger.debug(f"Validating configs for pipeline {self.pipeline_name}")
         PipelineDynamicModel = create_model_from_pipeline(self.pipeline)
         ConfigsModel = ConfigsDynamicModel[PipelineDynamicModel]
         ConfigsModel.model_validate(
diff --git a/deployer/utils/utils.py b/deployer/utils/utils.py
index 1879307..1b8acfb 100644
--- a/deployer/utils/utils.py
+++ b/deployer/utils/utils.py
@@ -20,7 +20,7 @@ def make_enum_from_python_package_dir(dir_path: Path, raise_if_not_found: bool =
         raise FileNotFoundError(f"Directory {dir_path_} not found.")
     file_paths = dir_path_.glob("*.py")
     enum_dict = {x.stem: x.stem for x in file_paths if x.stem != "__init__"}
-    FileNamesEnum = Enum("PipelineNames", enum_dict)
+    FileNamesEnum = Enum(dir_path_.stem, enum_dict)
     return FileNamesEnum
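
A minimal usage sketch (not part of the patch) of how `check` behaves after this change, assuming the `dummy_pipeline` example from the README hunks above. The `--all` flag and the new `BadParameter` guard come from the diff itself; the single-pipeline invocation is inferred from the `check` signature rather than quoted from the README:

```bash
# Check every pipeline found under the pipeline root path
vertex-deployer check --all

# Check one pipeline and all of its config files (invocation inferred, not quoted from the README)
vertex-deployer check dummy_pipeline

# Passing both --all and a pipeline name is now rejected up front with a BadParameter error
vertex-deployer check dummy_pipeline --all
```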