Skip to content

Commit

Permalink
Introduce TableLastModifiedMetadataBatch and implement BaseAdapter.ca…
Browse files Browse the repository at this point in the history
…lculate_freshness_from_metadata_batch (#127)
  • Loading branch information
MichelleArk authored Apr 12, 2024
1 parent 67315c7 commit b65b761
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 23 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20240325-180611.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Introduce TableLastModifiedMetadataBatch and implement BaseAdapter.calculate_freshness_from_metadata_batch
time: 2024-03-25T18:06:11.816163-04:00
custom:
Author: michelleark
Issue: "138"
117 changes: 94 additions & 23 deletions dbt/adapters/base/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -1062,6 +1062,7 @@ def execute_macro(
project: Optional[str] = None,
context_override: Optional[Dict[str, Any]] = None,
kwargs: Optional[Dict[str, Any]] = None,
needs_conn: bool = False,
) -> AttrDict:
"""Look macro_name up in the manifest and execute its results.
Expand All @@ -1074,6 +1075,10 @@ def execute_macro(
execution context.
:param kwargs: An optional dict of keyword args used to pass to the
macro.
:param needs_conn: A boolean that indicates whether the specified macro
requires an open connection to execute. If needs_conn is True, a
connection is expected and opened if necessary. Otherwise (and by default),
no connection is expected prior to executing the macro.
"""

if kwargs is None:
Expand Down Expand Up @@ -1106,6 +1111,10 @@ def execute_macro(

macro_function = CallableMacroGenerator(macro, macro_context)

if needs_conn:
connection = self.connections.get_thread_connection()
self.connections.open(connection)

with self.connections.exception_handler(f"macro {macro_name}"):
result = macro_function(**kwargs)
return result
Expand Down Expand Up @@ -1297,48 +1306,110 @@ def calculate_freshness(
}
return adapter_response, freshness

def calculate_freshness_from_metadata_batch(
    self,
    sources: List[BaseRelation],
    macro_resolver: Optional[MacroResolverProtocol] = None,
) -> Tuple[List[Optional[AdapterResponse]], Dict[BaseRelation, FreshnessResponse]]:
    """
    Compute metadata-based freshness for many sources with as few requests as possible.

    The sources are grouped by information schema and the
    get_relation_last_modified macro is executed once per group (with an open
    connection), rather than once per source.

    :param sources: The list of sources (BaseRelations) to calculate
        metadata-based freshness for
    :param macro_resolver: An optional macro_resolver to use for get_relation_last_modified
    :return: a tuple where:
        * the first element is a list with one (optional) AdapterResponse per
          macro execution, in the order the executions were made.
        * the second element is a dictionary mapping an input source BaseRelation
          to the FreshnessResponse parsed from its result row. Sources for which
          no row came back are absent from the dictionary.
    :raises KeyError: if a result row names a (schema, identifier) pair that does
        not correspond to any of the input sources.
    """
    # Result rows are matched back to their source by lowered (schema, identifier).
    # NOTE(review): the database is not part of the key, so two sources that
    # differ only by database share a key and the later one wins -- confirm this
    # is acceptable for callers.
    source_lookup = {}
    for candidate in sources:
        lookup_key = (
            candidate.path.get_lowered_part(ComponentName.Schema),
            candidate.path.get_lowered_part(ComponentName.Identifier),
        )
        source_lookup[lookup_key] = candidate

    # One macro execution is needed per information schema.
    grouped_sources: Dict[
        InformationSchema, List[BaseRelation]
    ] = self._get_catalog_relations_by_info_schema(sources)

    collected_responses: List[Optional[AdapterResponse]] = []
    freshness_by_source: Dict[BaseRelation, FreshnessResponse] = {}

    for info_schema, relations_in_schema in grouped_sources.items():
        macro_kwargs = {
            "information_schema": info_schema,
            "relations": relations_in_schema,
        }
        macro_result = self.execute_macro(
            GET_RELATION_LAST_MODIFIED_MACRO_NAME,
            kwargs=macro_kwargs,
            macro_resolver=macro_resolver,
            needs_conn=True,
        )
        response = macro_result.response  # type: ignore[attr-defined]
        result_table = macro_result.table  # type: ignore[attr-defined]
        collected_responses.append(response)

        for record in result_table:
            relation_key, parsed_freshness = self._parse_freshness_row(record, result_table)
            matched_source = source_lookup[relation_key]
            freshness_by_source[matched_source] = parsed_freshness

    return collected_responses, freshness_by_source

def calculate_freshness_from_metadata(
self,
source: BaseRelation,
macro_resolver: Optional[MacroResolverProtocol] = None,
) -> Tuple[Optional[AdapterResponse], FreshnessResponse]:
kwargs: Dict[str, Any] = {
"information_schema": source.information_schema_only(),
"relations": [source],
}
result = self.execute_macro(
GET_RELATION_LAST_MODIFIED_MACRO_NAME,
kwargs=kwargs,
adapter_responses, freshness_responses = self.calculate_freshness_from_metadata_batch(
sources=[source],
macro_resolver=macro_resolver,
)
adapter_response, table = result.response, result.table # type: ignore[attr-defined]

try:
from dbt_common.clients.agate_helper import get_column_value_uncased
adapter_response = adapter_responses[0] if adapter_responses else None
return adapter_response, freshness_responses[source]

row = table[0]
last_modified_val = get_column_value_uncased("last_modified", row)
snapshotted_at_val = get_column_value_uncased("snapshotted_at", row)
except Exception:
raise MacroResultError(GET_RELATION_LAST_MODIFIED_MACRO_NAME, table)

if last_modified_val is None:
def _create_freshness_response(
self, last_modified: Optional[datetime], snapshotted_at: Optional[datetime]
) -> FreshnessResponse:
if last_modified is None:
# Interpret missing value as "infinitely long ago"
max_loaded_at = datetime(1, 1, 1, 0, 0, 0, tzinfo=pytz.UTC)
else:
max_loaded_at = _utc(last_modified_val, None, "last_modified")

snapshotted_at = _utc(snapshotted_at_val, None, "snapshotted_at")
max_loaded_at = _utc(last_modified, None, "last_modified")

snapshotted_at = _utc(snapshotted_at, None, "snapshotted_at")
age = (snapshotted_at - max_loaded_at).total_seconds()

freshness: FreshnessResponse = {
"max_loaded_at": max_loaded_at,
"snapshotted_at": snapshotted_at,
"age": age,
}

return adapter_response, freshness
return freshness

def _parse_freshness_row(
    self, row: "agate.Row", table: "agate.Table"
) -> Tuple[Any, FreshnessResponse]:
    """
    Turn one row of the get_relation_last_modified result into a lookup key and
    a FreshnessResponse.

    :param row: A single row of ``table``; it must expose the columns
        ``last_modified``, ``snapshotted_at``, ``identifier`` and ``schema``
        (looked up case-insensitively).
    :param table: The full result table, used only for error reporting.
    :return: a tuple where:
        * the first element is the ``(schema, identifier)`` pair, lowercased and
          stripped of surrounding whitespace.
        * the second element is the FreshnessResponse built from the row's
          ``last_modified`` and ``snapshotted_at`` values.
    :raises MacroResultError: if a required column cannot be read, or if the
        schema/identifier values cannot be normalized (e.g. they are NULL).
    """
    from dbt_common.clients.agate_helper import get_column_value_uncased

    try:
        last_modified_val = get_column_value_uncased("last_modified", row)
        snapshotted_at_val = get_column_value_uncased("snapshotted_at", row)
        identifier = get_column_value_uncased("identifier", row)
        schema = get_column_value_uncased("schema", row)
        # Normalize inside the guarded section: a NULL or non-string schema or
        # identifier is a malformed macro result and must surface as
        # MacroResultError, not as a bare AttributeError.
        raw_relation = schema.lower().strip(), identifier.lower().strip()
    except Exception as exc:
        # Chain the original error so the root cause stays visible in tracebacks.
        raise MacroResultError(GET_RELATION_LAST_MODIFIED_MACRO_NAME, table) from exc

    freshness_response = self._create_freshness_response(
        last_modified_val,
        snapshotted_at_val,
    )
    return raw_relation, freshness_response

def pre_model_hook(self, config: Mapping[str, Any]) -> Any:
"""A hook for running some operation before the model materialization
Expand Down
3 changes: 3 additions & 0 deletions dbt/adapters/capability.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ class Capability(str, Enum):
TableLastModifiedMetadata = "TableLastModifiedMetadata"
"""Indicates support for determining the time of the last table modification by querying database metadata."""

TableLastModifiedMetadataBatch = "TableLastModifiedMetadataBatch"
"""Indicates support for performantly determining the time of the last table modification by querying database metadata in batch."""


class Support(str, Enum):
Unknown = "Unknown"
Expand Down

0 comments on commit b65b761

Please sign in to comment.