Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

spike: inferred contracts #8339

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion core/dbt/context/providers.py
Original file line number Diff line number Diff line change
Expand Up @@ -1382,7 +1382,8 @@ def sql(self) -> Optional[str]:
if self.model.language == ModelLanguage.sql: # type: ignore[union-attr]
# If the model is deferred and the adapter doesn't support zero-copy cloning, then select * from the prod
# relation
if getattr(self.model, "defer_relation", None):
# TODO: avoid routing on args.which if possible
if getattr(self.model, "defer_relation", None) and self.config.args.which == "clone":
Comment on lines +1385 to +1386
Copy link
Contributor

@jtcohen6 jtcohen6 Aug 8, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Definitely want to avoid the need for this. I had previously wondered if we'd be able to consolidate the deferral methods used by clone and other commands, by:

  • clobbering unselected resources with the production versions (to support "traditional" deferral)
  • adding defer_relation for selected resources only (to support clone)

(this comment/issue: #7965)

Edit: Michelle already made this change below. Then we can also refactor the clone task to use merge_from_artifact, and delete add_from_artifact.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussing live with @MichelleArk: We should overwrite compiled_code (also aliased as sql) within the clone task. Then we don't need to have this task-aware conditional logic live in the context method.

# TODO https://github.com/dbt-labs/dbt-core/issues/7976
return f"select * from {self.model.defer_relation.relation_name or str(self.defer_relation)}" # type: ignore[union-attr]
elif getattr(self.model, "extra_ctes_injected", None):
Expand Down
5 changes: 5 additions & 0 deletions core/dbt/contracts/graph/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -1253,6 +1253,11 @@ def merge_from_artifact(
):
merged.add(unique_id)
self.nodes[unique_id] = node.replace(deferred=True)
elif current and (node.resource_type in refables and not node.is_ephemeral):
defer_relation = DeferRelation(
node.database, node.schema, node.alias, node.relation_name
)
self.nodes[unique_id] = current.replace(defer_relation=defer_relation)

# Rebuild the flat_graph, which powers the 'graph' context variable,
# now that we've deferred some nodes
Expand Down
1 change: 1 addition & 0 deletions core/dbt/contracts/graph/model_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ def default(cls) -> "OnConfigurationChangeOption":
@dataclass
class ContractConfig(dbtClassMixin, Replaceable):
enforced: bool = False
inferred: bool = False


@dataclass
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,22 @@

{% macro table_columns_and_constraints() %}
{# loop through user_provided_columns to create DDL with data types and constraints #}
{%- if defer_relation and config.get('contract').inferred -%}
{% set prod_columns = get_column_schema_from_query(get_empty_subquery_sql("select * from " ~ defer_relation)) %}
{% set raw_columns = {} %}
{% for column in prod_columns -%}
{# TODO: reconsider this mutating operation, could be done with an additional warehouse call #}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 At minimum, we should probably be deepcopying to avoid mutating the user-provided value stored in the context... right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can either:

  • plumb it through (but this takes some time, and tightly couples the methods)
  • make the warehouse call twice (and consider caching/optimization possibilities in the future)

{%- if column.name not in model['columns'] -%}
{% do model['columns'].update(
{column.name: {'name': column.name, 'data_type': column.data_type, 'description': '', 'constraints': [], 'tags': [], 'meta': {}}}
) %}
{%- endif -%}
{% endfor -%}
{%- endif -%}

{%- set raw_column_constraints = adapter.render_raw_columns_constraints(raw_columns=model['columns']) -%}
{%- set raw_model_constraints = adapter.render_raw_model_constraints(raw_constraints=model['constraints']) -%}

(
{% for c in raw_column_constraints -%}
{{ c }}{{ "," if not loop.last or raw_model_constraints }}
Expand All @@ -36,15 +50,30 @@

{#-- First ensure the user has defined 'columns' in yaml specification --#}
{%- set user_defined_columns = model['columns'] -%}
{%- if not user_defined_columns -%}

{%- if not user_defined_columns and not config.get('contract').inferred -%}
{{ exceptions.raise_contract_error([], []) }}
{%- endif -%}

{#-- Obtain the column schema provided by sql file. #}
{%- set sql_file_provided_columns = get_column_schema_from_query(sql, config.get('sql_header', none)) -%}
{#--Obtain the column schema provided by the schema file by generating an 'empty schema' query from the model's columns. #}
{%- set schema_file_provided_columns = get_column_schema_from_query(get_empty_schema_sql(user_defined_columns)) -%}
{%- if defer_relation and config.get('contract').inferred -%}
{% set prod_columns = get_column_schema_from_query(get_empty_subquery_sql("select * from " ~ defer_relation)) %}

{% set provided_column_dict = {} %}
{%- for provided_column in schema_file_provided_columns-%}
{%- do provided_column_dict.update({provided_column.name: provided_column}) %}
{%- endfor -%}

{#-- Add any inferred columns not provided to full list of provided columns #}
{%- for prod_colum in prod_columns -%}
{%- if prod_colum.name not in provided_column_dict -%}
{% do schema_file_provided_columns.append(prod_colum) -%}
{%- endif -%}
{%- endfor -%}
{%- endif -%}
{#-- create dictionaries with name and formatted data type and strings for exception #}
{%- set sql_columns = format_columns(sql_file_provided_columns) -%}
{%- set yaml_columns = format_columns(schema_file_provided_columns) -%}
Expand Down
2 changes: 1 addition & 1 deletion core/dbt/parser/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -849,7 +849,7 @@ def _validate_constraint_prerequisites(self, model_node: ModelNode):
)

errors = []
if not model_node.columns:
if not model_node.columns and not model_node.config.contract.inferred:
errors.append(
"Constraints must be defined in a `yml` schema configuration file like `schema.yml`."
)
Expand Down
3 changes: 3 additions & 0 deletions plugins/postgres/dbt/include/postgres/macros/adapters.sql
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
{%- endif %} table {{ relation }}
{% set contract_config = config.get('contract') %}
{% if contract_config.enforced %}
{%- if contract_config.inferred and not defer_relation -%}
{{ exceptions.raise_compiler_error("Contract cannot be inferred without a --defer relation")}}
{%- endif -%}
{{ get_assert_columns_equivalent(sql) }}
{{ get_table_columns_and_constraints() }} ;
insert into {{ relation }} (
Expand Down