diff --git a/.changes/unreleased/Features-20220920-122529.yaml b/.changes/unreleased/Features-20220920-122529.yaml new file mode 100644 index 00000000000..09af06903a6 --- /dev/null +++ b/.changes/unreleased/Features-20220920-122529.yaml @@ -0,0 +1,7 @@ +kind: Features +body: Added a `manage` CLI command that allows users to drop unused database relations +time: 2022-09-20T12:25:29.226182+02:00 +custom: + Author: agoblet bneijt + Issue: "4957" + PR: "5392" diff --git a/core/dbt/config/project.py b/core/dbt/config/project.py index f22436c68a3..5e6d070ed19 100644 --- a/core/dbt/config/project.py +++ b/core/dbt/config/project.py @@ -37,6 +37,7 @@ from dbt.contracts.project import ( Project as ProjectContract, SemverString, + SchemaManagementConfiguration, ) from dbt.contracts.project import PackageConfig, ProjectPackageMetadata from dbt.contracts.publication import ProjectDependencies @@ -429,6 +430,7 @@ def create_project(self, rendered: RenderComponents) -> "Project": model_paths, seed_paths, snapshot_paths, analysis_paths, macro_paths ) + managed_schemas: List[SchemaManagementConfiguration] = value_or(cfg.managed_schemas, []) docs_paths: List[str] = value_or(cfg.docs_paths, all_source_paths) asset_paths: List[str] = value_or(cfg.asset_paths, []) flags = get_flags() @@ -503,6 +505,7 @@ def create_project(self, rendered: RenderComponents) -> "Project": asset_paths=asset_paths, target_path=target_path, snapshot_paths=snapshot_paths, + managed_schemas=managed_schemas, clean_targets=clean_targets, log_path=log_path, packages_install_path=packages_install_path, @@ -618,6 +621,7 @@ class Project: asset_paths: List[str] target_path: str snapshot_paths: List[str] + managed_schemas: List[SchemaManagementConfiguration] clean_targets: List[str] log_path: str packages_install_path: str @@ -695,6 +699,7 @@ def to_project_config(self, with_packages=False): "asset-paths": self.asset_paths, "target-path": self.target_path, "snapshot-paths": self.snapshot_paths, + "managed-schemas": 
# Action the `manage` task applies to a relation that exists in a managed
# schema but has no matching model in the project:
#   SKIP - leave the relation untouched (also used when no action is set)
#   DROP - drop the relation through the adapter
#   WARN - report the relation via warn_or_error
class PruneModelsAction(StrEnum):
    SKIP = "skip"
    DROP = "drop"
    WARN = "warn"


# One entry of the project-level `managed-schemas` list.
# database / schema: location to manage; the `manage` task falls back to the
#   active target's credentials when either is left unset.
# prune_models: what to do with unused relations (written `prune-models` in
#   the project file); None is treated as PruneModelsAction.SKIP by the task.
@dataclass
class SchemaManagementConfiguration(HyphenatedDbtClassMixin, Replaceable):
    database: Optional[str] = None
    schema: Optional[str] = None
    prune_models: Optional[PruneModelsAction] = None
class ManageTask(CompileTask):
    """Task behind the ``manage`` command: prune relations no model produces.

    For every entry in the project's ``managed-schemas`` configuration the task
    lists the relations present in that schema and, depending on the entry's
    ``prune-models`` action, warns about or drops every relation that does not
    correspond to a model in the current manifest.
    """

    def run(self):
        """Load the manifest, warm the adapter cache and prune managed schemas."""
        ManifestTask._runtime_initialize(self)
        node_ids = self.manifest.nodes.keys()
        adapter = get_adapter(self.config)

        # NOTE(review): the original indentation was not recoverable; pruning is
        # kept inside the connection block because listing and dropping
        # relations both go through the adapter's connection - confirm.
        with adapter.connection_named("master"):
            required_schemas = self.get_model_schemas(adapter, node_ids)
            self.populate_adapter_cache(adapter, required_schemas)

            adapter.clear_transaction()
            self._prune_models(adapter)

    def _prune_models(self, adapter):
        """Warn about or drop relations in managed schemas that no model owns.

        Args:
            adapter: the active adapter; used to read the target's default
                database/schema, list relations and drop them.

        Raises:
            ValidationException: if two managed-schema entries resolve to the
                same database/schema pair.
        """
        # Defaults of the active target, used for entries that omit a value.
        creds = adapter.connections.profile.credentials
        default_database, default_schema = creds.database, creds.schema

        self._assert_schema_uniqueness(default_database, default_schema)

        if not self.config.managed_schemas:
            warn_or_error("No schemas configured to manage")
            return

        # Use the node-level (resolved) location rather than `n.config.*`:
        # the config values are the raw user settings (None unless explicitly
        # configured) and therefore never equal the names found in the
        # warehouse, which would flag every relation as unused.
        models_in_codebase: Set[Tuple[str, str, str]] = {
            (n.database, n.schema, n.alias)
            for n in self.manifest.nodes.values()
            if isinstance(n, ParsedModelNode)
        }

        for config in self.config.managed_schemas:
            database = config.database or default_database
            schema = config.schema or default_schema
            # The action is fixed per managed schema; resolve it once.
            action = config.prune_models or PruneModelsAction.SKIP

            # Maps (database, schema, identifier) -> relation object.
            relations_in_database = {
                (database, schema, relation.identifier): relation
                for relation in adapter.list_relations(database, schema)
            }
            if not relations_in_database:
                warn_or_error(f"No objects in managed schema '{database}.{schema}'")
                continue

            if action == PruneModelsAction.SKIP:
                continue

            unused = relations_in_database.keys() - models_in_codebase
            # Sorted so warnings and drops happen in a deterministic order.
            for key in sorted(unused):
                target_database, target_schema, target_identifier = key
                if action == PruneModelsAction.WARN:
                    warn_or_error(
                        f"Found unused model {target_database}.{target_schema}.{target_identifier}"
                    )
                elif action == PruneModelsAction.DROP:
                    adapter.drop_relation(relations_in_database[key])

    def _assert_schema_uniqueness(self, default_database=None, default_schema=None):
        """Raise if two managed-schema entries point at the same schema.

        Entries are compared after substituting the given defaults for unset
        values, so an entry that omits ``database``/``schema`` is recognised as
        a duplicate of one that spells the default out explicitly. Called
        without arguments, entries are compared exactly as configured.

        Raises:
            ValidationException: on the first duplicate encountered.
        """
        seen = set()

        for config in self.config.managed_schemas:
            schema = (
                config.database or default_database,
                config.schema or default_schema,
            )
            if schema in seen:
                raise ValidationException(f"Duplicate schema found: {schema}")
            seen.add(schema)

    def interpret_results(self, results):
        """Report success unconditionally; failures surface as exceptions."""
        return True
import os

import pytest

from dbt.exceptions import CompilationException, ValidationException
from dbt.tests.util import run_dbt, check_table_does_exist, check_table_does_not_exist


def model(materialized, unique_schema=None):
    """Return the SQL of a three-row model with the given config.

    Args:
        materialized: value for the ``materialized`` config.
        unique_schema: custom schema to configure; ``None`` renders a literal
            ``None`` so the model lands in the target's default schema.
    """
    schema_literal = "None" if unique_schema is None else f'"{unique_schema}"'
    return f"""
    {{{{
        config(
            materialized = "{materialized}",
            schema = {schema_literal}
        )
    }}}}
    SELECT * FROM (
        VALUES (1, 'one'),
            (2, 'two'),
            (3, 'three')
    ) AS t (num,letter)
    """


def managed_schema(prune_models=None, schema=None):
    """Build one ``managed-schemas`` entry for the test database.

    Keys are only emitted for arguments that are given, so omitting
    ``prune_models`` exercises the default action.
    """
    entry = {"database": os.getenv("POSTGRES_TEST_DATABASE", "dbt")}
    if schema is not None:
        entry["schema"] = schema
    if prune_models is not None:
        entry["prune-models"] = prune_models
    return entry


class Base:
    """Shared fixtures and steps; not collected by pytest itself."""

    materialized = "table"

    @pytest.fixture(scope="class")
    def models(self):
        return {
            "model_a.sql": model(self.materialized),
            "model_b.sql": model(self.materialized),
        }

    @pytest.fixture(scope="class")
    def dbt_profile_target(self):
        return {
            "type": "postgres",
            "threads": 4,
            "host": "localhost",
            "port": int(os.getenv("POSTGRES_TEST_PORT", 5432)),
            "user": os.getenv("POSTGRES_TEST_USER", "root"),
            "pass": os.getenv("POSTGRES_TEST_PASS", "password"),
            "dbname": os.getenv("POSTGRES_TEST_DATABASE", "dbt"),
        }

    def assert_both_models_exist(self, project):
        check_table_does_exist(project.adapter, "model_a")
        check_table_does_exist(project.adapter, "model_b")

    def build_then_remove_model_a(self, project):
        """Materialize the project, then delete model_a from the codebase only."""
        run_dbt(["run"])
        self.assert_both_models_exist(project)
        project.update_models({"model_b.sql": model(self.materialized)})


class TestMissingConfiguration(Base):
    def test_should_raise_exception(self, project):
        self.build_then_remove_model_a(project)

        # With --warn-error the task's warning is promoted to an exception.
        with pytest.raises(CompilationException):
            run_dbt(["--warn-error", "manage"])

        self.assert_both_models_exist(project)

    def test_should_not_delete_anything(self, project):
        self.build_then_remove_model_a(project)

        run_dbt(["manage"])

        self.assert_both_models_exist(project)


class TestUnmanagedSchema(TestMissingConfiguration):
    @pytest.fixture(scope="class")
    def project_config_update(self, unique_schema):
        return {"managed-schemas": [managed_schema("drop", schema="some_other_schema")]}


class TestEmptyConfiguration(TestMissingConfiguration):
    @pytest.fixture(scope="class")
    def project_config_update(self, unique_schema):
        return {"managed-schemas": []}


class TestWarn(TestMissingConfiguration):
    @pytest.fixture(scope="class")
    def project_config_update(self, unique_schema):
        return {"managed-schemas": [managed_schema("warn")]}


class TestDrop(Base):
    @pytest.fixture(scope="class")
    def project_config_update(self, unique_schema):
        return {"managed-schemas": [managed_schema("drop")]}

    def test(self, project):
        self.build_then_remove_model_a(project)

        run_dbt(["manage"])

        check_table_does_not_exist(project.adapter, "model_a")
        check_table_does_exist(project.adapter, "model_b")


class TestDropView(TestDrop):
    materialized = "view"


class TestSkip(Base):
    @pytest.fixture(scope="class")
    def project_config_update(self, unique_schema):
        return {"managed-schemas": [managed_schema("skip")]}

    def test_should_not_raise_exception(self, project):
        self.build_then_remove_model_a(project)

        run_dbt(["--warn-error", "manage"])

        self.assert_both_models_exist(project)

    def test_should_not_delete_anything(self, project):
        self.build_then_remove_model_a(project)

        run_dbt(["manage"])

        self.assert_both_models_exist(project)


class TestDefaultAction(TestSkip):
    @pytest.fixture(scope="class")
    def project_config_update(self, unique_schema):
        return {"managed-schemas": [managed_schema()]}


class TestCustomSchema(Base):
    custom_schema = "custom"

    @pytest.fixture(scope="class")
    def models(self):
        return {
            "model_a.sql": model(self.materialized, self.custom_schema),
            "model_b.sql": model(self.materialized, self.custom_schema),
        }

    @pytest.fixture(scope="class")
    def project_config_update(self, unique_schema):
        # Manage the schema the models are actually built in: the assertions
        # below look in "<test schema>_custom", not in a schema named "custom".
        return {
            "managed-schemas": [
                managed_schema("drop", schema=f"{unique_schema}_{self.custom_schema}")
            ]
        }

    def test(self, project):
        custom = self._generate_schema_name(project)

        run_dbt(["run"])
        check_table_does_exist(project.adapter, f"{custom}.model_a")
        check_table_does_exist(project.adapter, f"{custom}.model_b")

        # Move model_a to the default schema in the codebase; `manage` does not
        # build it there, it only removes the stale copy in the custom schema.
        project.update_models(
            {
                "model_a.sql": model(self.materialized),
                "model_b.sql": model(self.materialized, self.custom_schema),
            }
        )
        run_dbt(["manage"])

        check_table_does_not_exist(project.adapter, f"{custom}.model_a")
        check_table_does_not_exist(project.adapter, "model_a")
        check_table_does_exist(project.adapter, f"{custom}.model_b")

    def _generate_schema_name(self, project):
        return f"{project.test_schema}_{self.custom_schema}"


class TestDuplicateConfiguration(Base):
    @pytest.fixture(scope="class")
    def project_config_update(self, unique_schema):
        return {"managed-schemas": [managed_schema("drop"), managed_schema("warn")]}

    def test(self, project):
        self.build_then_remove_model_a(project)

        with pytest.raises(ValidationException):
            run_dbt(["manage"])

        self.assert_both_models_exist(project)