Skip to content

Commit

Permalink
Merge branch 'main' into unit_testing_feature_branch
Browse files Browse the repository at this point in the history
  • Loading branch information
gshank committed Sep 11, 2023
2 parents 120b36e + b39eeb3 commit 2b376d9
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 52 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20230906-120212.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Fixes
body: Update metric helper functions to work with new semantic layer metrics
time: 2023-09-06T12:02:12.156534-07:00
custom:
Author: QMalcolm
Issue: "8134"
2 changes: 1 addition & 1 deletion core/dbt/context/providers.py
Original file line number Diff line number Diff line change
Expand Up @@ -631,7 +631,7 @@ def resolve(self, target_name: str, target_package: Optional[str] = None) -> Met
target_package=target_package,
)

return ResolvedMetricReference(target_metric, self.manifest, self.Relation)
return ResolvedMetricReference(target_metric, self.manifest)


# `var` implementations.
Expand Down
87 changes: 46 additions & 41 deletions core/dbt/contracts/graph/metrics.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
from dbt.node_types import NodeType
from dbt.contracts.graph.manifest import Manifest, Metric
from dbt_semantic_interfaces.type_enums import MetricType

from typing import Any, Dict, Iterator, List


DERIVED_METRICS = [MetricType.DERIVED, MetricType.RATIO]
BASE_METRICS = [MetricType.SIMPLE, MetricType.CUMULATIVE]


class MetricReference(object):
Expand All @@ -17,76 +24,74 @@ class ResolvedMetricReference(MetricReference):
for working with metrics (ie. __str__ and templating functions)
"""

def __init__(self, node, manifest, Relation):
def __init__(self, node: Metric, manifest: Manifest) -> None:
super().__init__(node.name, node.package_name)
self.node = node
self.manifest = manifest
self.Relation = Relation

def __getattr__(self, key):
def __getattr__(self, key) -> Any:
return getattr(self.node, key)

def __str__(self):
def __str__(self) -> str:
return f"{self.node.name}"

@classmethod
def parent_metrics(cls, metric_node, manifest):
def parent_metrics(cls, metric_node: Metric, manifest: Manifest) -> Iterator[Metric]:
"""For a given metric, yeilds all upstream metrics."""
yield metric_node

for parent_unique_id in metric_node.depends_on.nodes:
node = manifest.metrics.get(parent_unique_id)
if node and node.resource_type == NodeType.Metric:
node = manifest.expect(parent_unique_id)
if isinstance(node, Metric):
yield from cls.parent_metrics(node, manifest)

@classmethod
def parent_metrics_names(cls, metric_node, manifest):
yield metric_node.name

for parent_unique_id in metric_node.depends_on.nodes:
node = manifest.metrics.get(parent_unique_id)
if node and node.resource_type == NodeType.Metric:
yield from cls.parent_metrics_names(node, manifest)
def parent_metrics_names(cls, metric_node: Metric, manifest: Manifest) -> Iterator[str]:
"""For a given metric, yeilds all upstream metric names"""
for metric in cls.parent_metrics(metric_node, manifest):
yield metric.name

@classmethod
def reverse_dag_parsing(cls, metric_node, manifest, metric_depth_count):
if metric_node.calculation_method == "derived":
def reverse_dag_parsing(
cls, metric_node: Metric, manifest: Manifest, metric_depth_count: int
) -> Iterator[Dict[str, int]]:
"""For the given metric, yeilds dictionaries having {<metric_name>: <depth_from_initial_metric} of upstream derived metrics.
This function is intended as a helper function for other metric helper functions.
"""
if metric_node.type in DERIVED_METRICS:
yield {metric_node.name: metric_depth_count}
metric_depth_count = metric_depth_count + 1

for parent_unique_id in metric_node.depends_on.nodes:
node = manifest.metrics.get(parent_unique_id)
if (
node
and node.resource_type == NodeType.Metric
and node.calculation_method == "derived"
):
yield from cls.reverse_dag_parsing(node, manifest, metric_depth_count)
for parent_unique_id in metric_node.depends_on.nodes:
node = manifest.expect(parent_unique_id)
if isinstance(node, Metric):
yield from cls.reverse_dag_parsing(node, manifest, metric_depth_count + 1)

def full_metric_dependency(self):
"""Returns a unique list of all upstream metric names."""
to_return = list(set(self.parent_metrics_names(self.node, self.manifest)))
return to_return

def base_metric_dependency(self):
def base_metric_dependency(self) -> List[str]:
"""Returns a unique list of names for all upstream non-derived metrics."""
in_scope_metrics = list(self.parent_metrics(self.node, self.manifest))
base_metrics = {
metric.name for metric in in_scope_metrics if metric.type not in DERIVED_METRICS
}

to_return = []
for metric in in_scope_metrics:
if metric.calculation_method != "derived" and metric.name not in to_return:
to_return.append(metric.name)

return to_return
return list(base_metrics)

def derived_metric_dependency(self):
def derived_metric_dependency(self) -> List[str]:
"""Returns a unique list of names for all upstream derived metrics."""
in_scope_metrics = list(self.parent_metrics(self.node, self.manifest))
derived_metrics = {
metric.name for metric in in_scope_metrics if metric.type in DERIVED_METRICS
}

to_return = []
for metric in in_scope_metrics:
if metric.calculation_method == "derived" and metric.name not in to_return:
to_return.append(metric.name)

return to_return
return list(derived_metrics)

def derived_metric_dependency_depth(self):
def derived_metric_dependency_depth(self) -> List[Dict[str, int]]:
"""Returns a list of {<metric_name>: <depth_from_initial_metric>} for all upstream metrics."""
metric_depth_count = 1
to_return = list(self.reverse_dag_parsing(self.node, self.manifest, metric_depth_count))

Expand Down
24 changes: 14 additions & 10 deletions tests/functional/metrics/test_metric_helper_functions.py
Original file line number Diff line number Diff line change
@@ -1,34 +1,38 @@
import pytest

from dbt.tests.util import run_dbt, get_manifest
from dbt.tests.util import run_dbt
from dbt.contracts.graph.manifest import Manifest
from dbt.contracts.graph.metrics import ResolvedMetricReference

from tests.functional.metrics.fixtures import models_people_sql, basic_metrics_yml
from tests.functional.metrics.fixtures import (
models_people_sql,
basic_metrics_yml,
semantic_model_people_yml,
metricflow_time_spine_sql,
)


class TestMetricHelperFunctions:
@pytest.fixture(scope="class")
def models(self):
return {
"metrics.yml": basic_metrics_yml,
"semantic_people.yml": semantic_model_people_yml,
"metricflow_time_spine.sql": metricflow_time_spine_sql,
"people.sql": models_people_sql,
}

@pytest.mark.skip(
"TODO reactivate after we begin property hydrating metric `depends_on` and `refs`"
)
def test_expression_metric(
def test_derived_metric(
self,
project,
):

# initial parse
run_dbt(["compile"])
manifest = run_dbt(["parse"])
assert isinstance(manifest, Manifest)

# make sure all the metrics are in the manifest
manifest = get_manifest(project.project_root)
parsed_metric = manifest.metrics["metric.test.average_tenure_plus_one"]
testing_metric = ResolvedMetricReference(parsed_metric, manifest, None)
testing_metric = ResolvedMetricReference(parsed_metric, manifest)

full_metric_dependency = set(testing_metric.full_metric_dependency())
expected_full_metric_dependency = set(
Expand Down

0 comments on commit 2b376d9

Please sign in to comment.