Skip to content

Commit

Permalink
wip: move manage logic to separate command
Browse files Browse the repository at this point in the history
  • Loading branch information
Axel Goblet committed Sep 16, 2022
1 parent b765429 commit 3a17595
Show file tree
Hide file tree
Showing 7 changed files with 87 additions and 61 deletions.
5 changes: 0 additions & 5 deletions core/dbt/config/profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ def __init__(
user_config: UserConfig,
threads: int,
credentials: Credentials,
manage_schemas: bool = False,
):
"""Explicitly defining `__init__` to work around bug in Python 3.9.7
https://bugs.python.org/issue45081
Expand All @@ -110,7 +109,6 @@ def __init__(
self.threads = threads
self.credentials = credentials
self.profile_env_vars = {} # never available on init
self.manage_schemas = manage_schemas

def to_profile_info(self, serialize_credentials: bool = False) -> Dict[str, Any]:
"""Unlike to_project_config, this dict is not a mirror of any existing
Expand Down Expand Up @@ -242,7 +240,6 @@ def from_credentials(
profile_name: str,
target_name: str,
user_config: Optional[Dict[str, Any]] = None,
manage_schemas: bool = False,
) -> "Profile":
"""Create a profile from an existing set of Credentials and the
remaining information.
Expand All @@ -267,7 +264,6 @@ def from_credentials(
user_config=user_config_obj,
threads=threads,
credentials=credentials,
manage_schemas=manage_schemas,
)
profile.validate()
return profile
Expand Down Expand Up @@ -359,7 +355,6 @@ def from_raw_profile_info(
target_name=target_name,
threads=threads,
user_config=user_config,
manage_schemas=profile_data.get("manage_schemas", False),
)

@classmethod
Expand Down
4 changes: 2 additions & 2 deletions core/dbt/config/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -356,13 +356,13 @@ def create_project(self, rendered: RenderComponents) -> "Project":
)
test_paths: List[str] = value_or(cfg.test_paths, ["tests"])
analysis_paths: List[str] = value_or(cfg.analysis_paths, ["analyses"])
snapshot_paths: List[SchemaManagementConfiguration] = value_or(cfg.snapshot_paths, ["snapshots"])
snapshot_paths: List[str] = value_or(cfg.snapshot_paths, ["snapshots"])

all_source_paths: List[str] = _all_source_paths(
model_paths, seed_paths, snapshot_paths, analysis_paths, macro_paths
)

managed_schemas: List[str] = value_or(cfg.managed_schemas, [])
managed_schemas: List[SchemaManagementConfiguration] = value_or(cfg.managed_schemas, [])
docs_paths: List[str] = value_or(cfg.docs_paths, all_source_paths)
asset_paths: List[str] = value_or(cfg.asset_paths, [])
target_path: str = flag_or(flags.TARGET_PATH, cfg.target_path, "target")
Expand Down
1 change: 0 additions & 1 deletion core/dbt/config/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,6 @@ def from_parts(
args=args,
cli_vars=cli_vars,
dependencies=dependencies,
manage_schemas=profile.manage_schemas,
)

# Called by 'load_projects' in this class
Expand Down
1 change: 0 additions & 1 deletion core/dbt/contracts/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,6 @@ def resolve(self, connection: Connection) -> Connection:
class Credentials(ExtensibleDbtClassMixin, Replaceable, metaclass=abc.ABCMeta):
database: str
schema: str
manage_schemas: bool
_ALIASES: ClassVar[Dict[str, str]] = field(default={}, init=False)

@abc.abstractproperty
Expand Down
17 changes: 17 additions & 0 deletions core/dbt/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import dbt.task.serve as serve_task
import dbt.task.snapshot as snapshot_task
import dbt.task.test as test_task
import dbt.task.manage as manage_task
from dbt.profiler import profiler
from dbt.adapters.factory import reset_adapters, cleanup_connections

Expand Down Expand Up @@ -448,6 +449,21 @@ def _build_debug_subparser(subparsers, base_subparser):
return sub


def _build_manage_subparser(subparsers, base_subparser):
    """Register the ``manage`` sub-command on *subparsers* and return its parser.

    The new parser inherits the shared options from *base_subparser*, gets the
    version-check flag, and dispatches to ``manage_task.ManageTask``.
    """
    manage_help = """
        Drops relations that are present in the database and absent in the DBT models.
        Not to be confused with the clean command which deletes folders rather than relations.
        """
    manage_parser = subparsers.add_parser(
        "manage",
        parents=[base_subparser],
        help=manage_help,
    )
    _add_version_check(manage_parser)

    # These defaults are what the CLI dispatcher reads to pick the task class.
    manage_parser.set_defaults(
        cls=manage_task.ManageTask,
        which="manage",
        rpc_method="manage",
    )
    return manage_parser


def _build_deps_subparser(subparsers, base_subparser):
sub = subparsers.add_parser(
"deps",
Expand Down Expand Up @@ -1150,6 +1166,7 @@ def parse_args(args, cls=DBTArgumentParser):
_build_debug_subparser(subs, base_subparser)
_build_deps_subparser(subs, base_subparser)
_build_list_subparser(subs, base_subparser)
_build_manage_subparser(subs, base_subparser)

build_sub = _build_build_subparser(subs, base_subparser)
snapshot_sub = _build_snapshot_subparser(subs, base_subparser)
Expand Down
68 changes: 68 additions & 0 deletions core/dbt/task/manage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
# coding=utf-8
from typing import Dict, Set, Tuple, AbstractSet
from .compile import CompileTask
from dbt.exceptions import (
warn_or_error,
)
from dbt.parser.manifest import ManifestLoader
from dbt.adapters.factory import get_adapter
from dbt.contracts.graph.parsed import (
ParsedModelNode,
)


class ManageTask(CompileTask):
    """Task behind ``dbt manage``.

    For every ``(database, schema)`` pair listed in the project's
    ``managed_schemas`` configuration, compare the relations that exist in the
    database with the relations produced by the nodes in the manifest, and
    apply the configured action (``"warn"`` or ``"drop"``) to every relation
    that no node accounts for.
    """

    def before_run(self, adapter, selected_uids: AbstractSet[str]):
        """Populate the adapter cache with the schemas of the selected nodes."""
        required_schemas = self.get_model_schemas(adapter, selected_uids)
        self.populate_adapter_cache(adapter, required_schemas)

    def run(self):
        """Apply the configured action to unaccounted-for relations.

        Emits a warning (or error, via ``warn_or_error``) and returns early
        when no managed schemas are configured. Returns ``None``.
        """
        manifest = ManifestLoader.get_full_manifest(self.config)

        # (database, schema) -> action; a missing action defaults to "warn".
        # NOTE(review): a missing database/schema is mapped to "" here, which
        # will never match a node's real database name - confirm whether the
        # target's default database should be used instead.
        managed_schemas_actions_config: Dict[Tuple[str, str], str] = {
            (ms.database or "", ms.schema or ""): ms.action or "warn"
            for ms in self.config.managed_schemas
        }

        if not managed_schemas_actions_config:
            warn_or_error(
                "Schema management enabled for connection but no schemas configured to manage"
            )
            return

        # TODO consider compilation before management to enforce valid models

        # Every relation the project is responsible for. The key uses the
        # node's relation identifier (its alias), not its name, because that is
        # what ``relation.identifier`` reports for the database object.
        # All relational, non-ephemeral nodes count - not only models -
        # so that seed and snapshot relations are never treated as orphans.
        models_in_results: Set[Tuple[str, str, str]] = {
            (n.database, n.schema, n.identifier)
            for n in manifest.nodes.values()
            if n.is_relational and not n.is_ephemeral_model
        }

        adapter = get_adapter(self.config)
        with adapter.connection_named("master"):
            for (database, schema), target_action in managed_schemas_actions_config.items():
                # (database, schema, identifier) -> relation object
                available_models = {
                    (database, schema, relation.identifier): relation
                    for relation in adapter.list_relations(database, schema)
                }
                if not available_models:
                    warn_or_error(
                        f"No modules in managed schema '{schema}' for database '{database}'"
                    )
                    continue

                should_act_upon = available_models.keys() - models_in_results
                for target_key in should_act_upon:
                    target_database, target_schema, target_identifier = target_key
                    if target_action == "warn":
                        # TODO: route this through dbt's logging instead of stdout.
                        print("WARN ABOUT ", target_database, target_schema, target_identifier)
                    elif target_action == "drop":
                        adapter.drop_relation(available_models[target_key])
                    # Any other action value is ignored, as before.
52 changes: 0 additions & 52 deletions core/dbt/task/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@
from dbt.contracts.graph.parsed import ParsedHookNode
from dbt.contracts.results import NodeStatus, RunResult, RunStatus, RunningStatus
from dbt.exceptions import (
warn_or_error,
warn,
CompilationException,
InternalException,
RuntimeException,
Expand Down Expand Up @@ -449,7 +447,6 @@ def after_run(self, adapter, results):
}
with adapter.connection_named("master"):
self.safe_run_hooks(adapter, RunHookType.End, extras)
self.manage_schema(adapter, results)

def after_hooks(self, adapter, results, elapsed):
self.print_results_line(results, elapsed)
Expand All @@ -470,52 +467,3 @@ def get_runner_type(self, _):
def task_end_messages(self, results):
    """Print the end-of-run messages, but only when there are results to report."""
    if not results:
        return
    print_run_end_messages(results)

def manage_schema(self, adapter, results: List[RunResult]):
    """Warn about or drop relations in managed schemas that this run did not build.

    :param adapter: adapter used to list and drop relations.
    :param results: results of the run; schema management is skipped entirely
        if any of them has status Error, Fail or Skipped.

    Nothing happens unless the profile sets ``manage_schemas`` and the project
    lists at least one managed schema.
    """
    # (database, schema) -> action; a missing action defaults to "warn".
    managed_schemas_actions_config: Dict[Tuple[str, str], str] = {
        (ms.database or "", ms.schema or ""): ms.action or "warn"
        for ms in self.config.managed_schemas
    }

    if not managed_schemas_actions_config:
        # Only complain when the profile asked for management; a project that
        # uses neither setting must not get a warning on every run.
        if self.config.manage_schemas:
            warn_or_error(
                "Schema management enabled for connection but no schema's configured to manage"
            )
        return

    if not self.config.manage_schemas:
        warn("Schema's configured to be managed, but manage_schemas is false in the profile")
        return

    # Never manage schema if we have a failed node
    blocking_statuses = (NodeStatus.Error, NodeStatus.Fail, NodeStatus.Skipped)
    if any(r.status in blocking_statuses for r in results):
        warn("One or more models failed, skipping schema management")
        return

    # Relations this run is responsible for, keyed like the database listing.
    models_in_results: Set[Tuple[str, str, str]] = {
        (r.node.database, r.node.schema, r.node.identifier)
        for r in results
        if r.node.is_relational and not r.node.is_ephemeral_model
    }

    for (database, schema), target_action in managed_schemas_actions_config.items():
        # (database, schema, identifier) -> relation object
        available_models = {
            (database, schema, relation.identifier): relation
            for relation in adapter.list_relations(database, schema)
        }
        if not available_models:
            warn_or_error(f"No modules in managed schema '{schema}' for database '{database}'")
            continue

        should_act_upon = available_models.keys() - models_in_results
        for target_key in should_act_upon:
            target_database, target_schema, target_identifier = target_key
            if target_action == "warn":
                print("WARN ABOUT ", target_database, target_schema, target_identifier)
            elif target_action == "drop":
                adapter.drop_relation(available_models[target_key])

0 comments on commit 3a17595

Please sign in to comment.