diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index df6d50b..4cd4315 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -53,5 +53,14 @@ jobs: dbt-cloud job run | tee run.json echo ::set-output name=run_id::$(cat run.json | jq .data.id -r) - - name: Test 'dbt cloud run get' - run: dbt-cloud run get --run-id ${{ steps.job_run.outputs.run_id }} \ No newline at end of file + - name: Test 'dbt-cloud run get' + run: dbt-cloud run get --run-id ${{ steps.job_run.outputs.run_id }} + + - name: Test 'dbt-cloud job export' + run: dbt-cloud job export | tee job.json + + - name: Test 'dbt-cloud job import' + run: cat job.json | dbt-cloud job import | tee job_imported.json + + - name: Test 'dbt-cloud job delete' + run: dbt-cloud job delete --job-id $(cat job_imported.json | jq .data.id) \ No newline at end of file diff --git a/.gitignore b/.gitignore index f90fe83..3d22fad 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,12 @@ +# VSCode +.vscode/ + +# Python **/*.egg-info/ venv/ -*.pyc \ No newline at end of file +*.pyc + + +# pytest-cov +.coverage +cov_html/ \ No newline at end of file diff --git a/README.md b/README.md index cb535d3..247b6fe 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # dbt-cloud-cli -`dbt-cloud-cli` is a command line interface for [dbt Cloud API v2.0](https://docs.getdbt.com/dbt-cloud/api-v2). It abstracts the REST API calls in an easy-to-use interface that can be incorporated into automated and manual (ad-hoc) workloads. Here are some example use cases for `dbt-cloud-cli`: +`dbt-cloud-cli` is a command line interface for [dbt Cloud API](https://docs.getdbt.com/dbt-cloud/api-v2). It abstracts the REST API calls in an easy-to-use interface that can be incorporated into automated and manual (ad-hoc) workloads. Here are some example use cases for `dbt-cloud-cli`: 1. Triggering dbt Cloud jobs in CI/CD: You can use [dbt-cloud job run](#dbt-cloud-job-run) in a CI/CD workflow (e.g., Github Actions) to trigger a dbt Cloud job that runs and tests the changes in a commit branch 2. Setting up dbt Cloud jobs: You can use [dbt-cloud job create](#dbt-cloud-job-create) to create standardized jobs between dbt Cloud projects. @@ -30,6 +30,9 @@ The following environment variables are used as argument defaults: * [dbt-cloud job run](#dbt-cloud-job-run) * [dbt-cloud job get](#dbt-cloud-job-get) * [dbt-cloud job create](#dbt-cloud-job-create) +* [dbt-cloud job delete](#dbt-cloud-job-delete) +* [dbt-cloud job export](#dbt-cloud-job-export) +* [dbt-cloud job import](#dbt-cloud-job-import) * [dbt-cloud run get](#dbt-cloud-run-get) ## dbt-cloud job run @@ -235,6 +238,192 @@ dbt-cloud job create --project-id REFACTED --environment-id 49819 --name "Create } ``` +## dbt-cloud job delete + +This command deletes a job in a dbt Cloud project. Note that this command uses an undocumented v3 API endpoint. + +### Usage + +```bash +>> dbt-cloud job delete --job-id 48474 +{ + "status": { + "code": 200, + "is_success": true, + "user_message": "Success!", + "developer_message": "" + }, + "data": { + "execution": { + "timeout_seconds": 0 + }, + "generate_docs": false, + "run_generate_sources": false, + "id": 48474, + "account_id": REDACTED, + "project_id": REDACTED, + "environment_id": 49819, + "name": "Do nothing!", + "dbt_version": null, + "created_at": "2021-12-25T10:12:29.114456+00:00", + "updated_at": "2021-12-25T10:12:29.814383+00:00", + "execute_steps": [ + "dbt run -s not_a_model" + ], + "state": 2, + "deferring_job_definition_id": null, + "lifecycle_webhooks": false, + "lifecycle_webhooks_url": null, + "triggers": { + "github_webhook": false, + "git_provider_webhook": null, + "custom_branch_only": true, + "schedule": false + }, + "settings": { + "threads": 4, + "target_name": "default" + }, + "schedule": { + "cron": "0 * * * *", + "date": { + "type": "every_day" + }, + "time": { + "type": "every_hour", + "interval": 1 + } + }, + "is_deferrable": false, + "generate_sources": false, + "cron_humanized": "Every hour", + "next_run": null, + "next_run_humanized": null + } +} +``` + +## dbt-cloud job export + +This command exports a dbt Cloud job as JSON to a file and can be used in conjunction with [dbt-cloud job import](#dbt-cloud-job-import) to copy jobs between dbt Cloud projects. + +### Usage + +```bash +>> dbt-cloud job export | tee job.json +{ + "execution": { + "timeout_seconds": 0 + }, + "generate_docs": false, + "run_generate_sources": false, + "account_id": REDACTED, + "project_id": REDACTED, + "environment_id": 49819, + "name": "Do nothing!", + "dbt_version": null, + "created_at": "2021-11-18T15:19:03.185668+00:00", + "updated_at": "2021-12-25T09:17:12.788186+00:00", + "execute_steps": [ + "dbt run -s not_a_model" + ], + "state": 1, + "deferring_job_definition_id": null, + "lifecycle_webhooks": false, + "lifecycle_webhooks_url": null, + "triggers": { + "github_webhook": false, + "git_provider_webhook": null, + "custom_branch_only": true, + "schedule": false + }, + "settings": { + "threads": 4, + "target_name": "default" + }, + "schedule": { + "cron": "0 * * * *", + "date": { + "type": "every_day" + }, + "time": { + "type": "every_hour", + "interval": 1 + } + }, + "is_deferrable": false, + "generate_sources": false, + "cron_humanized": "Every hour", + "next_run": null, + "next_run_humanized": null +} +``` + +## dbt-cloud job import + +This command imports a dbt Cloud job from exported JSON. You can use JSON manipulation tools (e.g., [jq](https://stedolan.github.io/jq/)) to modify the job definition before importing it. + +### Usage + +```bash +>> cat job.json | jq '.environment_id = 49819 | .name = "Imported job"' | dbt-cloud job import +{ + "status": { + "code": 201, + "is_success": true, + "user_message": "Success!", + "developer_message": "" + }, + "data": { + "execution": { + "timeout_seconds": 0 + }, + "generate_docs": false, + "run_generate_sources": false, + "id": 48475, + "account_id": REDACTED, + "project_id": REDACTED, + "environment_id": 49819, + "name": "Imported job", + "dbt_version": null, + "created_at": "2021-12-25T10:40:13.193129+00:00", + "updated_at": "2021-12-25T10:40:13.193149+00:00", + "execute_steps": [ + "dbt run -s not_a_model" + ], + "state": 1, + "deferring_job_definition_id": null, + "lifecycle_webhooks": false, + "lifecycle_webhooks_url": null, + "triggers": { + "github_webhook": false, + "git_provider_webhook": null, + "custom_branch_only": true, + "schedule": false + }, + "settings": { + "threads": 4, + "target_name": "default" + }, + "schedule": { + "cron": "0 * * * *", + "date": { + "type": "every_day" + }, + "time": { + "type": "every_hour", + "interval": 1 + } + }, + "is_deferrable": false, + "generate_sources": false, + "cron_humanized": "Every hour", + "next_run": null, + "next_run_humanized": null + } +} +``` + ## dbt-cloud run get This command prints a dbt Cloud run status JSON response. For more information on the API endpoint arguments and response, run `dbt-cloud run get --help` and check out the [dbt Cloud API docs](https://docs.getdbt.com/dbt-cloud/api-v2#operation/getRunById). diff --git a/dbt_cloud/args.py b/dbt_cloud/args.py index 95ac950..e573cb4 100644 --- a/dbt_cloud/args.py +++ b/dbt_cloud/args.py @@ -1,6 +1,7 @@ import click import os from pydantic import BaseModel, validator, Field +from dbt_cloud.serde import json_to_dict class ArgsBaseModel(BaseModel): @@ -42,12 +43,9 @@ def field_not_none(cls, value, field): else: return value - def get_payload(self, exclude_keys=["api_token", "account_id", "job_id"]) -> dict: - payload = self.dict() - payload = { - key: value for key, value in payload.items() if key not in exclude_keys - } - return payload + def get_payload(self, exclude=["api_token", "account_id", "job_id"]) -> dict: + payload = self.json(exclude=set(exclude)) + return json_to_dict(payload) class DbtCloudArgsBaseModel(ArgsBaseModel): diff --git a/dbt_cloud/cli.py b/dbt_cloud/cli.py index 029db15..0f9436a 100644 --- a/dbt_cloud/cli.py +++ b/dbt_cloud/cli.py @@ -1,13 +1,17 @@ import json import time import click +from pathlib import Path +from dbt_cloud.args import DbtCloudArgsBaseModel from dbt_cloud.job import ( DbtCloudJob, + DbtCloudJobArgs, DbtCloudJobRunArgs, DbtCloudJobGetArgs, DbtCloudJobCreateArgs, ) from dbt_cloud.run import DbtCloudRunStatus, DbtCloudRunGetArgs +from dbt_cloud.serde import json_to_dict, dict_to_json from dbt_cloud.exc import DbtCloudException @@ -26,7 +30,7 @@ def job_run(): pass -@job.command() +@job.command(help="Triggers a dbt Cloud job run and returns a status JSON response.") @DbtCloudJobRunArgs.click_options @click.option( f"--wait/--no-wait", @@ -35,7 +39,7 @@ def job_run(): ) def run(wait, **kwargs): args = DbtCloudJobRunArgs(**kwargs) - job = DbtCloudJob(**args.dict()) + job = args.get_job() response, run = job.run(args=args) if wait: while True: @@ -49,27 +53,72 @@ def run(wait, **kwargs): f"Job run failed with {status.name} status. For more information, see {href}." ) time.sleep(5) - click.echo(json.dumps(response.json(), indent=2)) + click.echo(dict_to_json(response.json())) response.raise_for_status() -@job.command() +@job.command(help="Returns the details of a dbt Cloud job.") @DbtCloudJobGetArgs.click_options def get(**kwargs): args = DbtCloudJobGetArgs(**kwargs) job = DbtCloudJob(**args.dict()) response = job.get(order_by=args.order_by) - click.echo(json.dumps(response.json(), indent=2)) + click.echo(dict_to_json(response.json())) response.raise_for_status() -@job.command() +@job.command(help="Creates a job in a dbt Cloud project.") @DbtCloudJobCreateArgs.click_options def create(**kwargs): args = DbtCloudJobCreateArgs(**kwargs) job = DbtCloudJob(job_id=None, **args.dict()) response = job.create(args) - click.echo(json.dumps(response.json(), indent=2)) + click.echo(dict_to_json(response.json())) + response.raise_for_status() + + +@job.command(help="Deletes a job from a dbt Cloud project.") +@DbtCloudJobArgs.click_options +def delete(**kwargs): + args = DbtCloudJobArgs(**kwargs) + job = args.get_job() + response = job.delete() + click.echo(dict_to_json(response.json())) + response.raise_for_status() + + +@job.command(help="Exports a dbt Cloud job as JSON to a file.") +@DbtCloudJobArgs.click_options +@click.option( + "-f", + "--file", + default="-", + type=click.File("w"), + help="Export file path.", +) +def export(file, **kwargs): + args = DbtCloudJobArgs(**kwargs) + job = args.get_job() + exclude = ["id"] + file.write(job.to_json(exclude=exclude)) + + +@job.command(help="Imports a dbt Cloud job from exported JSON.", name="import") +@DbtCloudArgsBaseModel.click_options +@click.option( + "-f", + "--file", + default="-", + type=click.File("r"), + help="Import file path.", +) +def import_job(file, **kwargs): + args = DbtCloudArgsBaseModel(**kwargs) + job_create_kwargs = json_to_dict(file.read()) + job_create_args = DbtCloudJobCreateArgs(**job_create_kwargs) + job = DbtCloudJob(job_id=None, **args.dict()) + response = job.create(job_create_args) + click.echo(dict_to_json(response.json())) response.raise_for_status() @@ -79,5 +128,5 @@ def get(**kwargs): args = DbtCloudRunGetArgs(**kwargs) run = args.get_run() response, _ = run.get_status() - click.echo(json.dumps(response.json(), indent=2)) + click.echo(dict_to_json(response.json())) response.raise_for_status() diff --git a/dbt_cloud/job.py b/dbt_cloud/job.py index 0907b1e..5b87e7a 100644 --- a/dbt_cloud/job.py +++ b/dbt_cloud/job.py @@ -2,10 +2,12 @@ import os from enum import Enum from typing import Optional, List, Tuple +from pathlib import Path from pydantic import Field from dbt_cloud.account import DbtCloudAccount from dbt_cloud.args import ArgsBaseModel, DbtCloudArgsBaseModel from dbt_cloud.run import DbtCloudRun +from dbt_cloud.serde import dict_to_json, json_to_dict class DateTypeEnum(Enum): @@ -19,11 +21,17 @@ class TimeTypeEnum(Enum): AT_EXACT_HOURS = "at_exact_hours" -class DbtCloudJobRunArgs(DbtCloudArgsBaseModel): +class DbtCloudJobArgs(DbtCloudArgsBaseModel): job_id: int = Field( default_factory=lambda: os.environ["DBT_CLOUD_JOB_ID"], description="Numeric ID of the job to run (default: 'DBT_CLOUD_JOB_ID' environment variable)", ) + + def get_job(self) -> "DbtCloudJob": + return DbtCloudJob(**self.dict()) + + +class DbtCloudJobRunArgs(DbtCloudJobArgs): cause: str = Field( default="Triggered via API", description="A text description of the reason for running this job", @@ -57,11 +65,7 @@ class DbtCloudJobRunArgs(DbtCloudArgsBaseModel): ) -class DbtCloudJobGetArgs(DbtCloudArgsBaseModel): - job_id: int = Field( - default_factory=lambda: os.environ["DBT_CLOUD_JOB_ID"], - description="Numeric ID of the job to run (default: 'DBT_CLOUD_JOB_ID' environment variable)", - ) +class DbtCloudJobGetArgs(DbtCloudJobArgs): order_by: Optional[str] = Field( description="Field to order the result by. Use '-' to indicate reverse order." ) @@ -123,16 +127,16 @@ class DbtCloudJobCreateArgs(DbtCloudArgsBaseModel): schedule: Optional[DbtCloudJobSchedule] = Field(default_factory=DbtCloudJobSchedule) def get_payload(self): - return super().get_payload(exclude_keys=["api_token"]) + return super().get_payload(exclude=["api_token"]) class DbtCloudJob(DbtCloudAccount): job_id: Optional[int] - def get_api_url(self) -> str: + def get_api_url(self, api_version: str = "v2") -> str: if self.job_id is not None: - return f"{super().get_api_url()}/jobs/{self.job_id}" - return f"{super().get_api_url()}/jobs" + return f"{super().get_api_url(api_version)}/jobs/{self.job_id}" + return f"{super().get_api_url(api_version)}/jobs" def get(self, order_by: str = None) -> requests.Response: response = requests.get( @@ -150,6 +154,14 @@ def create(self, args: DbtCloudJobCreateArgs) -> requests.Response: ) return response + def delete(self): + response = requests.delete( + url=f"{self.get_api_url()}/", + headers={"Authorization": f"Token {self.api_token}"}, + json={}, + ) + return response + def run(self, args: DbtCloudJobRunArgs) -> Tuple[requests.Response, DbtCloudRun]: assert str(args.job_id) == str(self.job_id), f"{args.job_id} != {self.job_id}" response = requests.post( @@ -163,3 +175,26 @@ def run(self, args: DbtCloudJobRunArgs) -> Tuple[requests.Response, DbtCloudRun] account_id=self.account_id, api_token=self.api_token, ) + + def to_json(self, exclude=[]): + response = self.get() + job_dict = { + key: value + for key, value in response.json()["data"].items() + if key not in exclude + } + return dict_to_json(job_dict) + + def to_file(self, file_path: Path, exclude=[]): + response_json = self.to_json(exclude=exclude) + file_path.write_text(response_json) + + @classmethod + def from_json(cls, value: str, api_token: str): + job_dict = json_to_dict(value) + return cls(api_token=api_token, **job_dict) + + @classmethod + def from_file(cls, file_path: Path, api_token: str): + job_json = file_path.read_text() + return cls.from_json(job_json, api_token) diff --git a/dbt_cloud/serde.py b/dbt_cloud/serde.py new file mode 100644 index 0000000..7366ca3 --- /dev/null +++ b/dbt_cloud/serde.py @@ -0,0 +1,9 @@ +import json + + +def dict_to_json(value: dict) -> str: + return json.dumps(value, indent=2) + + +def json_to_dict(value: str) -> dict: + return json.loads(value) diff --git a/tests/conftest.py b/tests/conftest.py index 5b266e9..ede9b4f 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -47,3 +47,21 @@ def job(): @pytest.fixture def run(job): return DbtCloudRun(run_id=36053848, **job.dict()) + + +@pytest.fixture +def mock_job_api( + requests_mock, job, job_get_response, job_create_response, job_run_response +): + requests_mock.get(job.get_api_url() + "/", json=job_get_response, status_code=200) + requests_mock.post( + job.get_api_url() + "/", json=job_create_response, status_code=201 + ) + job_template = job.copy() + job_template.job_id = None + requests_mock.post( + job_template.get_api_url() + "/", json=job_create_response, status_code=201 + ) + requests_mock.post( + job.get_api_url() + "/run/", json=job_run_response, status_code=200 + ) diff --git a/tests/test_job.py b/tests/test_job.py index 91a8317..f4bc42c 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -1,18 +1,14 @@ from dbt_cloud.job import DbtCloudJob, DbtCloudJobCreateArgs, DbtCloudJobRunArgs -def test_mock_job_get(requests_mock, job, job_get_response): - url = job.get_api_url() + "/" - requests_mock.get(url, json=job_get_response, status_code=200) +def test_mock_job_get(mock_job_api, job, job_get_response): response = job.get() assert response.json() == job_get_response def test_mock_job_create( - requests_mock, job, job_create_response, project_id, environment_id + mock_job_api, job, job_create_response, project_id, environment_id ): - url = job.get_api_url() + "/" - requests_mock.post(url, json=job_create_response, status_code=201) args = DbtCloudJobCreateArgs( project_id=project_id, environment_id=environment_id, @@ -23,9 +19,7 @@ def test_mock_job_create( assert response.json() == job_create_response -def test_mock_job_run(requests_mock, job, job_run_response): - url = job.get_api_url() + "/run/" - requests_mock.post(url, json=job_run_response, status_code=200) +def test_mock_job_run(mock_job_api, job, job_run_response): args = DbtCloudJobRunArgs() response, job_run = job.run(args) assert response.json() == job_run_response diff --git a/tests/test_job_import.py b/tests/test_job_import.py deleted file mode 100644 index 2ed4ed8..0000000 --- a/tests/test_job_import.py +++ /dev/null @@ -1,22 +0,0 @@ -import json -from dbt_cloud.job import DbtCloudJobCreateArgs - - -def test_job_args_import_from_json(job_get_response): - job_dict = job_get_response["data"] - args = DbtCloudJobCreateArgs(**job_dict) - assert args.environment_id == 49819 - assert args.account_id == 123456 - assert args.project_id == 123457 - assert args.name == "Do nothing!" - assert args.execute_steps == ["dbt run -s not_a_model"] - assert not args.generate_docs - assert args.dbt_version is None - assert not args.triggers.github_webhook - assert args.triggers.custom_branch_only - assert not args.triggers.schedule - assert args.settings.threads == 4 - assert args.settings.target_name == "default" - assert args.schedule.cron == "0 * * * *" - assert args.schedule.date.type.value == "every_day" - assert args.schedule.time.type.value == "every_hour" diff --git a/tests/test_job_import_export.py b/tests/test_job_import_export.py new file mode 100644 index 0000000..3608ccc --- /dev/null +++ b/tests/test_job_import_export.py @@ -0,0 +1,43 @@ +import json +from dbt_cloud.serde import json_to_dict +from dbt_cloud.job import DbtCloudJob, DbtCloudJobCreateArgs + + +def test_job_args_import_from_json(job_get_response): + job_dict = job_get_response["data"] + args = DbtCloudJobCreateArgs(**job_dict) + assert args.environment_id == 49819 + assert args.account_id == 123456 + assert args.project_id == 123457 + assert args.name == "Do nothing!" + assert args.execute_steps == ["dbt run -s not_a_model"] + assert not args.generate_docs + assert args.dbt_version is None + assert not args.triggers.github_webhook + assert args.triggers.custom_branch_only + assert not args.triggers.schedule + assert args.settings.threads == 4 + assert args.settings.target_name == "default" + assert args.schedule.cron == "0 * * * *" + assert args.schedule.date.type.value == "every_day" + assert args.schedule.time.type.value == "every_hour" + + +def test_job_to_file_and_from_file(shared_datadir, mock_job_api, job): + path = shared_datadir / "job.json" + job.to_file(path) + job_imported = DbtCloudJob.from_file(file_path=path, api_token=job.api_token) + assert job_imported.job_id is None + job_imported.job_id = job.job_id + assert job.dict() == job_imported.dict() + + +def test_job_export_import(shared_datadir, mock_job_api, job): + path = shared_datadir / "job.json" + job.to_file(path, exclude=["id"]) + + job_create_kwargs = json_to_dict(path.read_text()) + job_create_args = DbtCloudJobCreateArgs(**job_create_kwargs) + job_kwargs = {**job.dict(), "job_id": None} + job_new = DbtCloudJob(**job_kwargs) + job_new.create(job_create_args)