Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CT-2546] Create classes for dbt clone to run from the cli #7552

Closed
Tracked by #7301
stu-k opened this issue May 8, 2023 · 0 comments · Fixed by #7881
Closed
Tracked by #7301

[CT-2546] Create classes for dbt clone to run from the cli #7552

stu-k opened this issue May 8, 2023 · 0 comments · Fixed by #7881
Assignees

Comments

@stu-k
Copy link
Contributor

stu-k commented May 8, 2023

Description

As part of the dbt clone work, we need to create the click command, the task class, and the task runner that are required to actually run the clone command.

Jeremy has a draft PR up that accomplishes this, but we are not beholden to that exact implementation.

Acceptance criteria

  • the dbt clone command is runnable from the command line
  • dbt clone is tested

Dependencies

Links from the draft PR

Creating the click command

# dbt clone
@cli.command("clone")
@click.pass_context
@p.exclude
@p.full_refresh
@p.profile
@p.profiles_dir
@p.project_dir
@p.resource_type
@p.select
@p.selector
@p.state  # required
@p.target
@p.target_path
@p.threads
@p.vars
@p.version_check
@requires.preflight
@requires.profile
@requires.project
@requires.runtime_config
@requires.manifest
@requires.postflight
def clone(ctx, **kwargs):
    """Create clones of selected nodes based on their location in the manifest provided to --state."""
    # The task is built from three entries of ctx.obj; presumably these are
    # populated by the @requires.* decorators above -- TODO confirm.
    task = CloneTask(
        ctx.obj["flags"],
        ctx.obj["runtime_config"],
        ctx.obj["manifest"],
    )
    results = task.run()
    success = task.interpret_results(results)
    # Hand back both the raw results and the interpreted success value.
    return results, success

Creating the clone task class

class CloneTask(GraphRunnableTask):
    """Task behind ``dbt clone``: clones selected nodes using the --state manifest."""

    def raise_on_first_error(self):
        """Return False: this task does not ask to stop at the first error."""
        return False

    def get_model_schemas(self, adapter, selected_uids: Iterable[str]) -> Set[BaseRelation]:
        """Collect the identifier-less relations for the selected nodes.

        For every selected node that is relational and not ephemeral, the
        node's own relation is added; when the node also carries a
        ``state_relation``, that relation is added as well.

        Raises:
            DbtInternalError: if ``self.manifest`` is None.
        """
        if self.manifest is None:
            raise DbtInternalError("manifest was None in get_model_schemas")

        result: Set[BaseRelation] = set()
        for node in self.manifest.nodes.values():
            if node.unique_id not in selected_uids:
                continue
            if node.is_relational and not node.is_ephemeral:
                relation = adapter.Relation.create_from(self.config, node)
                result.add(relation.without_identifier())

                # cache the 'other' schemas too!
                if node.state_relation:  # type: ignore
                    other_relation = adapter.Relation.create_from_node(
                        self.config, node.state_relation  # type: ignore
                    )
                    result.add(other_relation.without_identifier())
        return result

    def before_run(self, adapter, selected_uids: AbstractSet[str]):
        """Defer to the --state manifest, create schemas, then populate the cache."""
        with adapter.connection_named("master"):
            # unlike in other tasks, we want to add information from the --state manifest *before* caching!
            self.defer_to_manifest(adapter, selected_uids)
            # only create *our* schemas, but cache *other* schemas in addition
            schemas_to_create = super().get_model_schemas(adapter, selected_uids)
            self.create_schemas(adapter, schemas_to_create)
            schemas_to_cache = self.get_model_schemas(adapter, selected_uids)
            self.populate_adapter_cache(adapter, schemas_to_cache)

    @property
    def resource_types(self):
        """Resource types to clone, limited to ``NodeType.refable()``.

        With no ``resource_types`` argument, ``NodeType.refable()`` is returned
        as-is. Otherwise ``"all"`` is expanded to every refable type and any
        value that is not refable is dropped.
        """
        if not self.args.resource_types:
            return NodeType.refable()

        values = set(self.args.resource_types)
        if "all" in values:
            values.remove("all")
            values.update(NodeType.refable())
        # The comprehension already builds a list; no extra list() copy needed.
        return [NodeType(val) for val in values if val in NodeType.refable()]

    def get_node_selector(self) -> ResourceTypeSelector:
        """Build the selector restricted to this task's resource types.

        Raises:
            DbtInternalError: if the manifest or the graph is not set.
        """
        resource_types = self.resource_types
        if self.manifest is None or self.graph is None:
            raise DbtInternalError("manifest and graph must be set to get perform node selection")
        return ResourceTypeSelector(
            graph=self.graph,
            manifest=self.manifest,
            previous_state=self.previous_state,
            resource_types=resource_types,
        )

    def get_runner_type(self, _):
        """Return the runner class for every node: ``CloneRunner``."""
        return CloneRunner

    def _get_deferred_manifest(self) -> Optional[WritableManifest]:
        """Return the manifest loaded from --state.

        Raises:
            DbtRuntimeError: if there is no previous state, or the previous
                state has no manifest.
        """
        state = self.previous_state
        if state is None:
            raise DbtRuntimeError(
                "--state is required for cloning relations from another environment"
            )
        if state.manifest is None:
            raise DbtRuntimeError(f'Could not find manifest in --state path: "{self.args.state}"')
        return state.manifest

    # Note that this is different behavior from --defer with other commands, which *merge*
    # selected nodes from this manifest + unselected nodes from the other manifest
    def defer_to_manifest(self, adapter, selected_uids: AbstractSet[str]):
        """Add information from the --state manifest to the runtime manifest and write it.

        ``adapter`` and ``selected_uids`` are accepted but not used in this body.

        Raises:
            DbtInternalError: if there is no runtime manifest to add to.
        """
        deferred_manifest = self._get_deferred_manifest()
        # Guard kept because the helper is annotated Optional, although the
        # implementation in this class raises rather than returning None.
        if deferred_manifest is None:
            return
        if self.manifest is None:
            raise DbtInternalError(
                "Expected to defer to manifest, but there is no runtime manifest to defer from!"
            )
        self.manifest.add_from_artifact(other=deferred_manifest)
        # TODO: is it wrong to write the manifest here? I think it's right...
        write_manifest(self.manifest, self.config.target_path)

Creating the clone runner class

class CloneRunner(BaseRunner):
    """Runner that executes the "clone" materialization macro for one node."""

    def before_execute(self):
        """No-op hook."""
        pass

    def after_execute(self, result):
        """No-op hook."""
        pass

    def _build_run_model_result(self, model, context):
        """Build the ``RunResult`` for ``model`` from the context's "main" result.

        The status is always ``RunStatus.Success``. The message is the string
        form of the result's response, or "No-op" when no result was loaded.
        """
        result = context["load_result"]("main")
        # Both outcomes count as success; only the message differs.
        status = RunStatus.Success
        message = str(result.response) if result else "No-op"

        adapter_response = {}
        if result and isinstance(result.response, dbtClassMixin):
            adapter_response = result.response.to_dict(omit_none=True)

        return RunResult(
            node=model,
            status=status,
            timing=[],
            thread_id=threading.current_thread().name,
            execution_time=0,
            message=message,
            adapter_response=adapter_response,
            failures=None,
        )

    def compile(self, manifest):
        """Return the node unchanged."""
        # no-op
        return self.node

    def _materialization_relations(self, result: Any, model) -> List[BaseRelation]:
        """Validate the materialization's return value and return its relations.

        Raises:
            CompilationError: if ``result`` is a string, or is not a dict.
        """
        if isinstance(result, str):
            msg = (
                'The materialization ("{}") did not explicitly return a '
                "list of relations to add to the cache.".format(str(model.get_materialization()))
            )
            raise CompilationError(msg, node=model)

        if isinstance(result, dict):
            return _validate_materialization_relations_dict(result, model)

        msg = (
            "Invalid return value from materialization, expected a dict "
            'with key "relations", got: {}'.format(str(result))
        )
        raise CompilationError(msg, node=model)

    def execute(self, model, manifest):
        """Run the "clone" materialization macro for ``model``.

        The adapter's pre/post model hooks wrap the macro call, and every
        relation the macro returns is added to the adapter cache.

        Raises:
            DbtInternalError: if the "clone" materialization macro cannot be
                found, or the generated context has no "config" entry.
        """
        context = generate_runtime_model_context(model, self.config, manifest)
        materialization_macro = manifest.find_materialization_macro_by_name(
            self.config.project_name, "clone", self.adapter.type()
        )
        # Fail with a clear error before any hook runs, instead of passing
        # None on to MacroGenerator.
        if materialization_macro is None:
            raise DbtInternalError(
                'No "clone" materialization macro found for adapter type "{}"'.format(
                    self.adapter.type()
                )
            )

        if "config" not in context:
            raise DbtInternalError(
                "Invalid materialization context generated, missing config: {}".format(context)
            )
        context_config = context["config"]

        hook_ctx = self.adapter.pre_model_hook(context_config)
        try:
            result = MacroGenerator(
                materialization_macro, context, stack=context["context_macro_stack"]
            )()
        finally:
            # The post hook runs even when the macro raises.
            self.adapter.post_model_hook(context_config, hook_ctx)

        for relation in self._materialization_relations(result, model):
            self.adapter.cache_added(relation.incorporate(dbt_created=True))

        return self._build_run_model_result(model, context)

Tests
https://github.com/dbt-labs/dbt-core/pull/7258/files#diff-822c7d2541e891d6705e38440243e4ec59c625461b0d225070a67cde07521fc4

@github-actions github-actions bot changed the title Create classes for dbt clone to run from the cli [CT-2546] Create classes for dbt clone to run from the cli May 8, 2023
@jtcohen6 jtcohen6 linked a pull request Jun 21, 2023 that will close this issue
6 tasks
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants