Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Custom SQL for get source maxLoadedAt #11163

Merged
merged 3 commits into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20241217-171631.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Calculate source freshness via a SQL query
time: 2024-12-17T17:16:31.841076-08:00
custom:
Author: ChenyuLInx
Issue: "8797"
1 change: 1 addition & 0 deletions core/dbt/artifacts/resources/v1/source_definition.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ class ParsedSourceMandatory(GraphResource, HasRelationMetadata):
class SourceDefinition(ParsedSourceMandatory):
quoting: Quoting = field(default_factory=Quoting)
loaded_at_field: Optional[str] = None
loaded_at_query: Optional[str] = None
freshness: Optional[FreshnessThreshold] = None
external: Optional[ExternalTable] = None
description: str = ""
Expand Down
18 changes: 16 additions & 2 deletions core/dbt/context/providers.py
Original file line number Diff line number Diff line change
Expand Up @@ -880,7 +880,7 @@

# Base context collection, used for parsing configs.
class ProviderContext(ManifestContext):
# subclasses are MacroContext, ModelContext, TestContext
# subclasses are MacroContext, ModelContext, TestContext, SourceContext
def __init__(
self,
model,
Expand All @@ -893,7 +893,7 @@
raise DbtInternalError(f"Invalid provider given to context: {provider}")
# mypy appeasement - we know it'll be a RuntimeConfig
self.config: RuntimeConfig
self.model: Union[Macro, ManifestNode] = model
self.model: Union[Macro, ManifestNode, SourceDefinition] = model
super().__init__(config, manifest, model.package_name)
self.sql_results: Dict[str, Optional[AttrDict]] = {}
self.context_config: Optional[ContextConfig] = context_config
Expand Down Expand Up @@ -1558,6 +1558,20 @@
self._search_package = search_package


class SourceContext(ProviderContext):
# SourceContext is being used to render jinja SQL during execution of
# custom SQL in source freshness. It is not used for parsing.
model: SourceDefinition

@contextproperty()
def this(self) -> Optional[RelationProxy]:
return self.db_wrapper.Relation.create_from(self.config, self.model)

Check warning on line 1568 in core/dbt/context/providers.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/context/providers.py#L1568

Added line #L1568 was not covered by tests

@contextproperty()
def source_node(self) -> SourceDefinition:
return self.model

Check warning on line 1572 in core/dbt/context/providers.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/context/providers.py#L1572

Added line #L1572 was not covered by tests


class ModelContext(ProviderContext):
model: ManifestNode

Expand Down
4 changes: 4 additions & 0 deletions core/dbt/contracts/graph/unparsed.py
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,7 @@ class UnparsedSourceTableDefinition(HasColumnTests, HasColumnAndTestProps):
config: Dict[str, Any] = field(default_factory=dict)
loaded_at_field: Optional[str] = None
loaded_at_field_present: Optional[bool] = None
loaded_at_query: Optional[str] = None
identifier: Optional[str] = None
quoting: Quoting = field(default_factory=Quoting)
freshness: Optional[FreshnessThreshold] = field(default_factory=FreshnessThreshold)
Expand All @@ -345,6 +346,7 @@ class UnparsedSourceDefinition(dbtClassMixin):
freshness: Optional[FreshnessThreshold] = field(default_factory=FreshnessThreshold)
loaded_at_field: Optional[str] = None
loaded_at_field_present: Optional[bool] = None
loaded_at_query: Optional[str] = None
tables: List[UnparsedSourceTableDefinition] = field(default_factory=list)
tags: List[str] = field(default_factory=list)
config: Dict[str, Any] = field(default_factory=dict)
Expand Down Expand Up @@ -382,6 +384,7 @@ class SourceTablePatch(dbtClassMixin):
docs: Optional[Docs] = None
loaded_at_field: Optional[str] = None
loaded_at_field_present: Optional[bool] = None
loaded_at_query: Optional[str] = None
identifier: Optional[str] = None
quoting: Quoting = field(default_factory=Quoting)
freshness: Optional[FreshnessThreshold] = field(default_factory=FreshnessThreshold)
Expand Down Expand Up @@ -425,6 +428,7 @@ class SourcePatch(dbtClassMixin):
freshness: Optional[Optional[FreshnessThreshold]] = field(default_factory=FreshnessThreshold)
loaded_at_field: Optional[str] = None
loaded_at_field_present: Optional[bool] = None
loaded_at_query: Optional[str] = None
tables: Optional[List[SourceTablePatch]] = None
tags: Optional[List[str]] = None

Expand Down
17 changes: 13 additions & 4 deletions core/dbt/parser/schema_renderer.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,21 @@ def _is_norender_key(self, keypath: Keypath) -> bool:
"tests" and "data_tests" are both currently supported but "tests" has been deprecated
"""
# top level descriptions and data_tests
if len(keypath) >= 1 and keypath[0] in ("tests", "data_tests", "description"):
if len(keypath) >= 1 and keypath[0] in (
"tests",
"data_tests",
"description",
"loaded_at_query",
):
return True

# columns descriptions and data_tests
if len(keypath) == 2 and keypath[1] in ("tests", "data_tests", "description"):
if len(keypath) == 2 and keypath[1] in (
"tests",
"data_tests",
"description",
"loaded_at_query",
):
return True

# pre- and post-hooks
Expand Down Expand Up @@ -69,9 +79,8 @@ def _is_norender_key(self, keypath: Keypath) -> bool:
def should_render_keypath(self, keypath: Keypath) -> bool:
if len(keypath) < 1:
return True

if self.key == "sources":
if keypath[0] == "description":
if keypath[0] in ("description", "loaded_at_query"):
return False
if keypath[0] == "tables":
if self._is_norender_key(keypath[2:]):
Expand Down
19 changes: 19 additions & 0 deletions core/dbt/parser/sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
UnparsedSourceTableDefinition,
)
from dbt.events.types import FreshnessConfigProblem, UnusedTables
from dbt.exceptions import ParsingError
from dbt.node_types import NodeType
from dbt.parser.common import ParserRef
from dbt.parser.schema_generic_tests import SchemaGenericTestParser
Expand Down Expand Up @@ -131,11 +132,28 @@ def parse_source(self, target: UnpatchedSourceDefinition) -> SourceDefinition:
# We need to be able to tell the difference between explicitly setting the loaded_at_field to None/null
# and when it's simply not set. This allows a user to override the source level loaded_at_field so that
# specific table can default to metadata-based freshness.
if table.loaded_at_field_present and table.loaded_at_query:
raise ParsingError(
"Cannot specify both loaded_at_field and loaded_at_query at table level."
)
if source.loaded_at_field and source.loaded_at_query:
raise ParsingError(
"Cannot specify both loaded_at_field and loaded_at_query at source level."
)

if table.loaded_at_field_present or table.loaded_at_field is not None:
loaded_at_field = table.loaded_at_field
else:
loaded_at_field = source.loaded_at_field # may be None, that's okay

loaded_at_query: Optional[str]
if table.loaded_at_query is not None:
loaded_at_query = table.loaded_at_query
else:
if table.loaded_at_field_present:
loaded_at_query = None
else:
loaded_at_query = source.loaded_at_query
freshness = merge_freshness(source.freshness, table.freshness)
quoting = source.quoting.merged(table.quoting)
# path = block.path.original_file_path
Expand Down Expand Up @@ -185,6 +203,7 @@ def parse_source(self, target: UnpatchedSourceDefinition) -> SourceDefinition:
meta=meta,
loader=source.loader,
loaded_at_field=loaded_at_field,
loaded_at_query=loaded_at_query,
freshness=freshness,
quoting=quoting,
resource_type=NodeType.Source,
Expand Down
20 changes: 18 additions & 2 deletions core/dbt/task/freshness.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
PartialSourceFreshnessResult,
SourceFreshnessResult,
)
from dbt.clients import jinja
from dbt.context.providers import RuntimeProvider, SourceContext
from dbt.contracts.graph.manifest import Manifest
from dbt.contracts.graph.nodes import HookNode, SourceDefinition
from dbt.contracts.results import RunStatus
Expand Down Expand Up @@ -114,7 +116,22 @@
adapter_response: Optional[AdapterResponse] = None
freshness: Optional[FreshnessResponse] = None

if compiled_node.loaded_at_field is not None:
if compiled_node.loaded_at_query is not None:
# within the context user can have access to `this`, `source_node`(`model` will point to the same thing), etc
compiled_code = jinja.get_rendered(

Check warning on line 121 in core/dbt/task/freshness.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/freshness.py#L121

Added line #L121 was not covered by tests
compiled_node.loaded_at_query,
SourceContext(
compiled_node, self.config, manifest, RuntimeProvider(), None
).to_dict(),
compiled_node,
)
adapter_response, freshness = self.adapter.calculate_freshness_from_custom_sql(

Check warning on line 128 in core/dbt/task/freshness.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/freshness.py#L128

Added line #L128 was not covered by tests
relation,
compiled_code,
macro_resolver=manifest,
)
status = compiled_node.freshness.status(freshness["age"])

Check warning on line 133 in core/dbt/task/freshness.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/freshness.py#L133

Added line #L133 was not covered by tests
elif compiled_node.loaded_at_field is not None:
adapter_response, freshness = self.adapter.calculate_freshness(
relation,
compiled_node.loaded_at_field,
Expand Down Expand Up @@ -146,7 +163,6 @@
raise DbtRuntimeError(
f"Could not compute freshness for source {compiled_node.name}: no 'loaded_at_field' provided and {self.adapter.type()} adapter does not support metadata-based freshness checks."
)

# adapter_response was not returned in previous versions, so this will be None
# we cannot call to_dict() on NoneType
if adapter_response:
Expand Down
2 changes: 1 addition & 1 deletion core/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@
"dbt-semantic-interfaces>=0.8.3,<0.9",
# Minor versions for these are expected to be backwards-compatible
"dbt-common>=1.13.0,<2.0",
"dbt-adapters>=1.10.1,<2.0",
"dbt-adapters>=1.13.0,<2.0",
# ----
# Expect compatibility with all new versions of these packages, so lower bounds only.
"packaging>20.9",
Expand Down
22 changes: 22 additions & 0 deletions schemas/dbt/manifest/v12.json
Original file line number Diff line number Diff line change
Expand Up @@ -7752,6 +7752,17 @@
],
"default": null
},
"loaded_at_query": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null
},
"freshness": {
"anyOf": [
{
Expand Down Expand Up @@ -17540,6 +17551,17 @@
],
"default": null
},
"loaded_at_query": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null
},
"freshness": {
"anyOf": [
{
Expand Down
2 changes: 2 additions & 0 deletions tests/functional/artifacts/expected_manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -793,6 +793,7 @@ def expected_seeded_manifest(project, model_database=None, quote_model=False):
},
"identifier": "seed",
"loaded_at_field": None,
"loaded_at_query": None,
"loader": "a_loader",
"meta": {},
"name": "my_table",
Expand Down Expand Up @@ -1299,6 +1300,7 @@ def expected_references_manifest(project):
},
"identifier": "seed",
"loaded_at_field": None,
"loaded_at_query": None,
"loader": "a_loader",
"meta": {},
"name": "my_table",
Expand Down
23 changes: 23 additions & 0 deletions tests/functional/sources/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -472,3 +472,26 @@
- name: test_table
identifier: source
"""

freshness_via_custom_sql_schema_yml = """version: 2
sources:
- name: test_source
freshness:
warn_after: {count: 10, period: hour}
schema: "{{ var(env_var('DBT_TEST_SCHEMA_NAME_VARIABLE')) }}"
quoting:
identifier: True
tags:
- my_test_source_tag
tables:
- name: source_a
identifier: source
loaded_at_field: "{{ var('test_loaded_at') | as_text }}"
- name: source_b
identifier: source
loaded_at_query: "select max({{ var('test_loaded_at') | as_text }}) from {{this}}"
- name: source_c
identifier: source
loaded_at_query: "select {{current_timestamp()}}"

"""
16 changes: 16 additions & 0 deletions tests/functional/sources/test_source_freshness.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
error_models_model_sql,
error_models_schema_yml,
filtered_models_schema_yml,
freshness_via_custom_sql_schema_yml,
freshness_via_metadata_schema_yml,
override_freshness_models_schema_yml,
)
Expand Down Expand Up @@ -578,3 +579,18 @@ def test_hooks_do_not_run_for_source_freshness(
)
# default behaviour - no hooks run in source freshness
self._assert_project_hooks_not_called(log_output)


class TestSourceFreshnessCustomSQL(SuccessfulSourceFreshnessTest):
@pytest.fixture(scope="class")
def models(self):
return {"schema.yml": freshness_via_custom_sql_schema_yml}

def test_source_freshness_custom_sql(self, project):
result = self.run_dbt_with_vars(project, ["source", "freshness"], expect_pass=True)
# They are the same source but different queries were executed for each
assert {r.node.name: r.status for r in result} == {
"source_a": "warn",
"source_b": "warn",
"source_c": "pass",
}
Loading
Loading