From 05fcaa2465d36903e1e364fd9d6e88884c6c6512 Mon Sep 17 00:00:00 2001 From: Bram Neijt Date: Tue, 12 Apr 2022 17:05:14 +0200 Subject: [PATCH] Add management schema feature add noop and warn tests improve tests rename tests add view dropping test add unmanaged schema test make tests more dry Delete tmp.csv Manage schemas is optional Add --target-path as a CLI option. (#5402) Include py.typed in MANIFEST.in (#5703) This enables packages that install dbt-core from pypi to use mypy. wip: move manage logic to separate command Add manage command --- .../unreleased/Features-20220920-122529.yaml | 7 + core/dbt/config/project.py | 5 + core/dbt/config/runtime.py | 1 + core/dbt/contracts/project.py | 15 + core/dbt/task/manage.py | 80 +++++ core/dbt/task/run.py | 10 +- core/dbt/tests/fixtures/project.py | 5 + tests/functional/schema_management/README.md | 1 + .../test_schema_management.py | 296 ++++++++++++++++++ 9 files changed, 415 insertions(+), 5 deletions(-) create mode 100644 .changes/unreleased/Features-20220920-122529.yaml create mode 100644 core/dbt/task/manage.py create mode 100644 tests/functional/schema_management/README.md create mode 100644 tests/functional/schema_management/test_schema_management.py diff --git a/.changes/unreleased/Features-20220920-122529.yaml b/.changes/unreleased/Features-20220920-122529.yaml new file mode 100644 index 00000000000..09af06903a6 --- /dev/null +++ b/.changes/unreleased/Features-20220920-122529.yaml @@ -0,0 +1,7 @@ +kind: Features +body: Added a `manage` CLI command that allows users to drop unused database relations +time: 2022-09-20T12:25:29.226182+02:00 +custom: + Author: agoblet bneijt + Issue: "4957" + PR: "5392" diff --git a/core/dbt/config/project.py b/core/dbt/config/project.py index f22436c68a3..5e6d070ed19 100644 --- a/core/dbt/config/project.py +++ b/core/dbt/config/project.py @@ -37,6 +37,7 @@ from dbt.contracts.project import ( Project as ProjectContract, SemverString, + SchemaManagementConfiguration, ) from dbt.contracts.project import PackageConfig, ProjectPackageMetadata from dbt.contracts.publication import ProjectDependencies @@ -429,6 +430,7 @@ def create_project(self, rendered: RenderComponents) -> "Project": model_paths, seed_paths, snapshot_paths, analysis_paths, macro_paths ) + managed_schemas: List[SchemaManagementConfiguration] = value_or(cfg.managed_schemas, []) docs_paths: List[str] = value_or(cfg.docs_paths, all_source_paths) asset_paths: List[str] = value_or(cfg.asset_paths, []) flags = get_flags() @@ -503,6 +505,7 @@ def create_project(self, rendered: RenderComponents) -> "Project": asset_paths=asset_paths, target_path=target_path, snapshot_paths=snapshot_paths, + managed_schemas=managed_schemas, clean_targets=clean_targets, log_path=log_path, packages_install_path=packages_install_path, @@ -618,6 +621,7 @@ class Project: asset_paths: List[str] target_path: str snapshot_paths: List[str] + managed_schemas: List[SchemaManagementConfiguration] clean_targets: List[str] log_path: str packages_install_path: str @@ -695,6 +699,7 @@ def to_project_config(self, with_packages=False): "asset-paths": self.asset_paths, "target-path": self.target_path, "snapshot-paths": self.snapshot_paths, + "managed-schemas": [schema.to_dict() for schema in self.managed_schemas], "clean-targets": self.clean_targets, "log-path": self.log_path, "quoting": self.quoting, diff --git a/core/dbt/config/runtime.py b/core/dbt/config/runtime.py index d6119318e3c..681a08248d5 100644 --- a/core/dbt/config/runtime.py +++ b/core/dbt/config/runtime.py @@ -148,6 +148,7 @@ def from_parts( asset_paths=project.asset_paths, target_path=project.target_path, snapshot_paths=project.snapshot_paths, + managed_schemas=project.managed_schemas, clean_targets=project.clean_targets, log_path=project.log_path, packages_install_path=project.packages_install_path, diff --git a/core/dbt/contracts/project.py b/core/dbt/contracts/project.py index 581932e5888..1bfbe22f6a1 100644 --- a/core/dbt/contracts/project.py +++ b/core/dbt/contracts/project.py @@ -7,6 +7,7 @@ HyphenatedDbtClassMixin, ExtensibleDbtClassMixin, register_pattern, + StrEnum, ) from dataclasses import dataclass, field from typing import Optional, List, Dict, Union, Any @@ -181,6 +182,19 @@ class RegistryPackageMetadata( } +class PruneModelsAction(StrEnum): + SKIP = "skip" + DROP = "drop" + WARN = "warn" + + +@dataclass +class SchemaManagementConfiguration(HyphenatedDbtClassMixin, Replaceable): + database: Optional[str] = None + schema: Optional[str] = None + prune_models: Optional[PruneModelsAction] = None + + @dataclass class Project(HyphenatedDbtClassMixin, Replaceable): name: Identifier @@ -198,6 +212,7 @@ class Project(HyphenatedDbtClassMixin, Replaceable): asset_paths: Optional[List[str]] = None target_path: Optional[str] = None snapshot_paths: Optional[List[str]] = None + managed_schemas: Optional[List[SchemaManagementConfiguration]] = None clean_targets: Optional[List[str]] = None profile: Optional[str] = None log_path: Optional[str] = None diff --git a/core/dbt/task/manage.py b/core/dbt/task/manage.py new file mode 100644 index 00000000000..f755784c5d6 --- /dev/null +++ b/core/dbt/task/manage.py @@ -0,0 +1,80 @@ +# coding=utf-8 +from typing import Dict, Set, Tuple + +from .compile import CompileTask +from .runnable import ManifestTask +from dbt.exceptions import warn_or_error, ValidationException +from dbt.adapters.factory import get_adapter +from dbt.contracts.graph.parsed import ( + ParsedModelNode, +) +from dbt.contracts.project import PruneModelsAction + + +class ManageTask(CompileTask): + def run(self): + ManifestTask._runtime_initialize(self) + models_in_codebase = self.manifest.nodes.keys() + adapter = get_adapter(self.config) + + with adapter.connection_named("master"): + required_schemas = self.get_model_schemas(adapter, models_in_codebase) + self.populate_adapter_cache(adapter, required_schemas) + + adapter.clear_transaction() + self._prune_models(adapter) + + def _prune_models(self, adapter): + self._assert_schema_uniqueness() + + if len(self.config.managed_schemas) == 0: + warn_or_error("No schema's configured to manage") + return + + models_in_codebase: Set[Tuple[str, str, str]] = set( + (n.config.database, n.config.schema, n.config.alias) + for n in self.manifest.nodes.values() + if isinstance(n, ParsedModelNode) + ) + + # get default 'database' + 'schema' for active target + creds = adapter.connections.profile.credentials + default_database, default_schema = creds.database, creds.schema + + for config in self.config.managed_schemas: + database = config.database or default_database + schema = config.schema or default_schema + + models_in_database: Dict[Tuple[str, str, str], str] = { + (database, schema, relation.identifier): relation + for relation in adapter.list_relations(database, schema) + } + if len(models_in_database) == 0: + warn_or_error( + f"No objects in managed schema '{database}.{schema}'" + ) + + should_act_upon = models_in_database.keys() - models_in_codebase + + for (target_database, target_schema, target_identifier) in sorted(should_act_upon): + target_action = config.prune_models or PruneModelsAction.SKIP + if target_action == PruneModelsAction.WARN: + warn_or_error( + f"Found unused model {target_database}.{target_schema}.{target_identifier}" + ) + elif target_action == PruneModelsAction.DROP: + adapter.drop_relation( + models_in_database[(target_database, target_schema, target_identifier)] + ) + + def _assert_schema_uniqueness(self): + schemas = set() + + for config in self.config.managed_schemas: + schema = (config.database, config.schema) + if schema in schemas: + raise ValidationException(f"Duplicate schema found: {schema}") + schemas.add(schema) + + def interpret_results(self, results): + return True diff --git a/core/dbt/task/run.py b/core/dbt/task/run.py index 4b1cea04727..eb7942a8cc8 100644 --- a/core/dbt/task/run.py +++ b/core/dbt/task/run.py @@ -21,11 +21,11 @@ from dbt.contracts.graph.nodes import HookNode, ResultNode from dbt.contracts.results import NodeStatus, RunResult, RunStatus, RunningStatus, BaseResult from dbt.exceptions import ( - CompilationError, - DbtInternalError, - MissingMaterializationError, - DbtRuntimeError, - DbtValidationError, + CompilationException, + InternalException, + RuntimeException, + ValidationException, + missing_materialization, ) from dbt.events.functions import fire_event, get_invocation_id from dbt.events.types import ( diff --git a/core/dbt/tests/fixtures/project.py b/core/dbt/tests/fixtures/project.py index 19e418003fe..9da3242f1a6 100644 --- a/core/dbt/tests/fixtures/project.py +++ b/core/dbt/tests/fixtures/project.py @@ -464,6 +464,11 @@ def get_tables_in_schema(self): result = self.run_sql(sql, fetch="all") return {model_name: materialization for (model_name, materialization) in result} + def update_models(self, models: dict): + """Update the modules in the test project""" + self.project_root.join("models").remove() + write_project_files(self.project_root, "models", models) + # This is the main fixture that is used in all functional tests. It pulls in the other # fixtures that are necessary to set up a dbt project, and saves some of the information diff --git a/tests/functional/schema_management/README.md b/tests/functional/schema_management/README.md new file mode 100644 index 00000000000..5a380f6ce7a --- /dev/null +++ b/tests/functional/schema_management/README.md @@ -0,0 +1 @@ +Test schema management as introduced by https://github.com/dbt-labs/dbt-core/issues/4957 diff --git a/tests/functional/schema_management/test_schema_management.py b/tests/functional/schema_management/test_schema_management.py new file mode 100644 index 00000000000..5459d2bca2d --- /dev/null +++ b/tests/functional/schema_management/test_schema_management.py @@ -0,0 +1,296 @@ +import pytest +import os +from dbt.exceptions import CompilationException, ValidationException +from dbt.tests.util import run_dbt, check_table_does_exist, check_table_does_not_exist + + +def model(materialized, unique_schema=None): + return f""" + {{{{ + config( + materialized = "{materialized}", + schema = {f'"{unique_schema}"' if unique_schema is not None else "None"} + ) + }}}} + SELECT * FROM ( + VALUES (1, 'one'), + (2, 'two'), + (3, 'three') + ) AS t (num,letter) + """ + + +class Base: + materialized = "table" + + @pytest.fixture(scope="class") + def models(self): + return { + "model_a.sql": model(self.materialized), + "model_b.sql": model(self.materialized), + } + + @pytest.fixture(scope="class") + def dbt_profile_target(self): + return { + "type": "postgres", + "threads": 4, + "host": "localhost", + "port": int(os.getenv("POSTGRES_TEST_PORT", 5432)), + "user": os.getenv("POSTGRES_TEST_USER", "root"), + "pass": os.getenv("POSTGRES_TEST_PASS", "password"), + "dbname": os.getenv("POSTGRES_TEST_DATABASE", "dbt"), + } + + +class TestMissingConfiguration(Base): + def test_should_raise_exception( + self, + project, + ): + run_dbt(["run"]) + check_table_does_exist(project.adapter, "model_a") + check_table_does_exist(project.adapter, "model_b") + + project.update_models( + { + "model_b.sql": model(self.materialized), + } + ) + with pytest.raises(CompilationException): + run_dbt(["--warn-error", "manage"]) + + check_table_does_exist(project.adapter, "model_a") + check_table_does_exist(project.adapter, "model_b") + + def test_should_not_delete_anything( + self, + project, + ): + run_dbt(["run"]) + check_table_does_exist(project.adapter, "model_a") + check_table_does_exist(project.adapter, "model_b") + + project.update_models( + { + "model_b.sql": model(self.materialized), + } + ) + run_dbt(["manage"]) + + check_table_does_exist(project.adapter, "model_a") + check_table_does_exist(project.adapter, "model_b") + + +class TestUnmanagedSchema(TestMissingConfiguration): + @pytest.fixture(scope="class") + def project_config_update(self, unique_schema): + return { + "managed-schemas": [ + { + "database": os.getenv("POSTGRES_TEST_DATABASE", "dbt"), + "schema": "some_other_schema", + "prune-models": "drop", + } + ] + } + + +class TestEmptyConfiguration(TestMissingConfiguration): + @pytest.fixture(scope="class") + def project_config_update(self, unique_schema): + return {"managed-schemas": []} + + +class TestWarn(TestMissingConfiguration): + @pytest.fixture(scope="class") + def project_config_update(self, unique_schema): + return { + "managed-schemas": [ + { + "database": os.getenv("POSTGRES_TEST_DATABASE", "dbt"), + "prune-models": "warn", + } + ] + } + + +class TestDrop(Base): + @pytest.fixture(scope="class") + def project_config_update(self, unique_schema): + return { + "managed-schemas": [ + { + "database": os.getenv("POSTGRES_TEST_DATABASE", "dbt"), + "prune-models": "drop", + } + ] + } + + def test( + self, + project, + ): + run_dbt(["run"]) + check_table_does_exist(project.adapter, "model_a") + check_table_does_exist(project.adapter, "model_b") + + project.update_models( + { + "model_b.sql": model(self.materialized), + } + ) + run_dbt(["manage"]) + + check_table_does_not_exist(project.adapter, "model_a") + check_table_does_exist(project.adapter, "model_b") + + +class TestDropView(TestDrop): + materialized = "view" + + +class TestSkip(Base): + @pytest.fixture(scope="class") + def project_config_update(self, unique_schema): + return { + "managed-schemas": [ + { + "database": os.getenv("POSTGRES_TEST_DATABASE", "dbt"), + "prune-models": "skip", + } + ] + } + + def test_should_not_raise_exception( + self, + project, + ): + run_dbt(["run"]) + check_table_does_exist(project.adapter, "model_a") + check_table_does_exist(project.adapter, "model_b") + + project.update_models( + { + "model_b.sql": model(self.materialized), + } + ) + + run_dbt(["--warn-error", "manage"]) + + check_table_does_exist(project.adapter, "model_a") + check_table_does_exist(project.adapter, "model_b") + + def test_should_not_delete_anything( + self, + project, + ): + run_dbt(["run"]) + check_table_does_exist(project.adapter, "model_a") + check_table_does_exist(project.adapter, "model_b") + + project.update_models( + { + "model_b.sql": model(self.materialized), + } + ) + run_dbt(["manage"]) + + check_table_does_exist(project.adapter, "model_a") + check_table_does_exist(project.adapter, "model_b") + + +class TestDefaultAction(TestSkip): + @pytest.fixture(scope="class") + def project_config_update(self, unique_schema): + return { + "managed-schemas": [ + { + "database": os.getenv("POSTGRES_TEST_DATABASE", "dbt"), + } + ] + } + + +class TestCustomSchema(Base): + custom_schema = "custom" + + @pytest.fixture(scope="class") + def models(self): + return { + "model_a.sql": model(self.materialized, self.custom_schema), + "model_b.sql": model(self.materialized, self.custom_schema), + } + + @pytest.fixture(scope="class") + def project_config_update(self, unique_schema): + return { + "managed-schemas": [ + { + "database": os.getenv("POSTGRES_TEST_DATABASE", "dbt"), + "prune-models": "drop", + "schema": self.custom_schema, + } + ] + } + + def test( + self, + project, + ): + run_dbt(["run"]) + check_table_does_exist(project.adapter, f"{self._generate_schema_name(project)}.model_a") + check_table_does_exist(project.adapter, f"{self._generate_schema_name(project)}.model_b") + + project.update_models( + { + "model_a.sql": model(self.materialized), + "model_b.sql": model(self.materialized, self.custom_schema), + } + ) + run_dbt(["manage"]) + + check_table_does_not_exist( + project.adapter, f"{self._generate_schema_name(project)}.model_a" + ) + check_table_does_not_exist(project.adapter, "model_a") + check_table_does_exist(project.adapter, f"{self._generate_schema_name(project)}.model_b") + + def _generate_schema_name(self, project): + return f"{project.test_schema}_{self.custom_schema}" + + +class TestDuplicateConfiguration(Base): + @pytest.fixture(scope="class") + def project_config_update(self, unique_schema): + return { + "managed-schemas": [ + { + "database": os.getenv("POSTGRES_TEST_DATABASE", "dbt"), + "prune-models": "drop", + }, + { + "database": os.getenv("POSTGRES_TEST_DATABASE", "dbt"), + "prune-models": "warn", + }, + ] + } + + def test( + self, + project, + ): + run_dbt(["run"]) + check_table_does_exist(project.adapter, "model_a") + check_table_does_exist(project.adapter, "model_b") + + project.update_models( + { + "model_b.sql": model(self.materialized), + } + ) + with pytest.raises(ValidationException): + run_dbt(["manage"]) + + check_table_does_exist(project.adapter, "model_a") + check_table_does_exist(project.adapter, "model_b")