Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce TableLastModifiedMetadataBatch and implement BaseAdapter.calculate_freshness_from_metadata_batch #127

Merged
merged 29 commits into from
Apr 12, 2024
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
2beba0b
extend RelationConfig and MaterializationConfig
MichelleArk Mar 1, 2024
151b131
Merge branch 'main' into batch-metadata-freshness
MichelleArk Mar 11, 2024
727093c
first pass
MichelleArk Mar 11, 2024
ffaf808
accept information schema in calculate_freshness_from_metadata_batch
MichelleArk Mar 11, 2024
b66644a
implement calculate_freshness_from_metadata in terms of calculate_fre…
MichelleArk Mar 11, 2024
54670e0
add TableLastModifiedMetadataBatch capability
MichelleArk Mar 11, 2024
8046a3e
return batched freshness results keyed by (schema, identifier)
MichelleArk Mar 13, 2024
1dbf3e9
Merge branch 'main' into batch-metadata-freshness
MichelleArk Mar 13, 2024
2c28576
Merge branch 'main' into batch-metadata-freshness
MichelleArk Mar 14, 2024
a9cffcd
Merge branch 'main' into batch-metadata-freshness
MichelleArk Mar 22, 2024
9af8c85
handle queries across information_schema in calculate_freshness_from_…
MichelleArk Mar 25, 2024
82d8217
Merge branch 'main' into batch-metadata-freshness
MichelleArk Mar 25, 2024
ed81529
refactor _create_freshness_response
MichelleArk Mar 25, 2024
c44779a
make schema_identifier_to_source type-safe
MichelleArk Mar 25, 2024
62be8df
update TableLastModifiedMetadataBatch description
MichelleArk Mar 25, 2024
9a6c829
changelog entry
MichelleArk Mar 25, 2024
17701ad
refactor to _get_catalog_relations_by_info_schema, _parse_freshness_row
MichelleArk Mar 28, 2024
511cf14
sanitize raw_relation for freshness batch calculation
MichelleArk Apr 2, 2024
01f27a5
ensure a connection is open if possible prior to executing macro
MichelleArk Apr 4, 2024
05c7149
Merge branch 'main' into batch-metadata-freshness
MichelleArk Apr 4, 2024
154e066
fix agate typing
MichelleArk Apr 4, 2024
8c707cc
lazy load agate_helper
MichelleArk Apr 4, 2024
8a1deac
add needs_conn to BaseAdapter.execute_macro
MichelleArk Apr 4, 2024
f3dcac2
docstring
MichelleArk Apr 4, 2024
ebee880
Merge branch 'main' into batch-metadata-freshness
MichelleArk Apr 5, 2024
02988c8
Merge branch 'main' into batch-metadata-freshness
colin-rogers-dbt Apr 11, 2024
0bbf7ed
cleanup adapter_responses parsing in calculate_freshness_from_metadat…
MichelleArk Apr 11, 2024
a43f04c
Merge branch 'main' into batch-metadata-freshness
MichelleArk Apr 12, 2024
7cf501e
Merge branch 'main' into batch-metadata-freshness
MichelleArk Apr 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20240325-180611.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Introduce TableLastModifiedMetadataBatch and implement BaseAdapter.calculate_freshness_from_metadata_batch
time: 2024-03-25T18:06:11.816163-04:00
custom:
Author: michelleark
Issue: "138"
117 changes: 94 additions & 23 deletions dbt/adapters/base/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -1062,6 +1062,7 @@ def execute_macro(
project: Optional[str] = None,
context_override: Optional[Dict[str, Any]] = None,
kwargs: Optional[Dict[str, Any]] = None,
needs_conn: bool = False,
) -> AttrDict:
"""Look macro_name up in the manifest and execute its results.

Expand All @@ -1074,6 +1075,10 @@ def execute_macro(
execution context.
:param kwargs: An optional dict of keyword args used to pass to the
macro.
: param needs_conn: A boolean that indicates whether the specified macro
requires an open connection to execute. If needs_conn is True, a
connection is expected and opened if necessary. Otherwise (and by default),
no connection is expected prior to executing the macro.
"""

if kwargs is None:
Expand Down Expand Up @@ -1106,6 +1111,10 @@ def execute_macro(

macro_function = CallableMacroGenerator(macro, macro_context)

if needs_conn:
connection = self.connections.get_thread_connection()
self.connections.open(connection)

with self.connections.exception_handler(f"macro {macro_name}"):
result = macro_function(**kwargs)
return result
Expand Down Expand Up @@ -1297,48 +1306,110 @@ def calculate_freshness(
}
return adapter_response, freshness

def calculate_freshness_from_metadata_batch(
self,
sources: List[BaseRelation],
macro_resolver: Optional[MacroResolverProtocol] = None,
) -> Tuple[List[Optional[AdapterResponse]], Dict[BaseRelation, FreshnessResponse]]:
"""
Given a list of sources (BaseRelations), calculate the metadata-based freshness in batch.
This method should _not_ execute a warehouse query per source, but rather batch up
the sources into as few requests as possible to minimize the number of roundtrips required
to compute metadata-based freshness for each input source.

:param sources: The list of sources to calculate metadata-based freshness for
:param macro_resolver: An optional macro_resolver to use for get_relation_last_modified
:return: a tuple where:
* the first element is a list of optional AdapterResponses indicating the response
for each request the method made to compute the freshness for the provided sources.
* the second element is a dictionary mapping an input source BaseRelation to a FreshnessResponse,
if it was possible to calculate a FreshnessResponse for the source.
"""
# Track schema, identifiers of sources for lookup from batch query
schema_identifier_to_source = {
MichelleArk marked this conversation as resolved.
Show resolved Hide resolved
(
source.path.get_lowered_part(ComponentName.Schema),
source.path.get_lowered_part(ComponentName.Identifier),
): source
for source in sources
}

# Group metadata sources by information schema -- one query per information schema will be necessary
MichelleArk marked this conversation as resolved.
Show resolved Hide resolved
sources_by_info_schema: Dict[InformationSchema, List[BaseRelation]] = self._get_catalog_relations_by_info_schema(sources)

freshness_responses: Dict[BaseRelation, FreshnessResponse] = {}
adapter_responses: List[Optional[AdapterResponse]] = []
for (
information_schema,
sources_for_information_schema,
) in sources_by_info_schema.items():
result = self.execute_macro(
GET_RELATION_LAST_MODIFIED_MACRO_NAME,
kwargs={
"information_schema": information_schema,
"relations": sources_for_information_schema,
},
macro_resolver=macro_resolver,
needs_conn=True,
)
adapter_response, table = result.response, result.table # type: ignore[attr-defined]
MichelleArk marked this conversation as resolved.
Show resolved Hide resolved
adapter_responses.append(adapter_response)

for row in table:
MichelleArk marked this conversation as resolved.
Show resolved Hide resolved
raw_relation, freshness_response = self._parse_freshness_row(row, table)
source_relation_for_result = schema_identifier_to_source[raw_relation]
freshness_responses[source_relation_for_result] = freshness_response

return adapter_responses, freshness_responses

def calculate_freshness_from_metadata(
self,
source: BaseRelation,
macro_resolver: Optional[MacroResolverProtocol] = None,
) -> Tuple[Optional[AdapterResponse], FreshnessResponse]:
kwargs: Dict[str, Any] = {
"information_schema": source.information_schema_only(),
"relations": [source],
}
result = self.execute_macro(
GET_RELATION_LAST_MODIFIED_MACRO_NAME,
kwargs=kwargs,
adapter_responses, freshness_responses = self.calculate_freshness_from_metadata_batch(
sources=[source],
macro_resolver=macro_resolver,
)
adapter_response, table = result.response, result.table # type: ignore[attr-defined]

try:
from dbt_common.clients.agate_helper import get_column_value_uncased
adapter_response = adapter_responses[0] if adapter_responses else None
return adapter_response, list(freshness_responses.values())[0]
MichelleArk marked this conversation as resolved.
Show resolved Hide resolved

row = table[0]
last_modified_val = get_column_value_uncased("last_modified", row)
snapshotted_at_val = get_column_value_uncased("snapshotted_at", row)
except Exception:
raise MacroResultError(GET_RELATION_LAST_MODIFIED_MACRO_NAME, table)

if last_modified_val is None:
def _create_freshness_response(
MichelleArk marked this conversation as resolved.
Show resolved Hide resolved
self, last_modified: Optional[datetime], snapshotted_at: Optional[datetime]
) -> FreshnessResponse:
if last_modified is None:
# Interpret missing value as "infinitely long ago"
max_loaded_at = datetime(1, 1, 1, 0, 0, 0, tzinfo=pytz.UTC)
else:
max_loaded_at = _utc(last_modified_val, None, "last_modified")

snapshotted_at = _utc(snapshotted_at_val, None, "snapshotted_at")
max_loaded_at = _utc(last_modified, None, "last_modified")

snapshotted_at = _utc(snapshotted_at, None, "snapshotted_at")
age = (snapshotted_at - max_loaded_at).total_seconds()

freshness: FreshnessResponse = {
"max_loaded_at": max_loaded_at,
"snapshotted_at": snapshotted_at,
"age": age,
}

return adapter_response, freshness
return freshness

def _parse_freshness_row(self, row: "agate.Row", table: "agate.Table") -> Tuple[Any, FreshnessResponse]:
from dbt_common.clients.agate_helper import get_column_value_uncased

try:
last_modified_val = get_column_value_uncased("last_modified", row)
snapshotted_at_val = get_column_value_uncased("snapshotted_at", row)
identifier = get_column_value_uncased("identifier", row)
schema = get_column_value_uncased("schema", row)
except Exception:
raise MacroResultError(GET_RELATION_LAST_MODIFIED_MACRO_NAME, table)

freshness_response = self._create_freshness_response(
last_modified_val,
snapshotted_at_val
)
raw_relation = schema.lower().strip(), identifier.lower().strip()
return raw_relation, freshness_response

def pre_model_hook(self, config: Mapping[str, Any]) -> Any:
"""A hook for running some operation before the model materialization
Expand Down
3 changes: 3 additions & 0 deletions dbt/adapters/capability.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ class Capability(str, Enum):
TableLastModifiedMetadata = "TableLastModifiedMetadata"
"""Indicates support for determining the time of the last table modification by querying database metadata."""

TableLastModifiedMetadataBatch = "TableLastModifiedMetadataBatch"
"""Indicates support for performantly determining the time of the last table modification by querying database metadata in batch."""


class Support(str, Enum):
Unknown = "Unknown"
Expand Down
Loading