-
Notifications
You must be signed in to change notification settings - Fork 96
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Updates to reflect a single input measure per metric #843
Merged
Merged
Changes from 1 commit
Commits
Show all changes
3 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,10 +1,9 @@ | ||
from __future__ import annotations | ||
|
||
import collections | ||
import logging | ||
import time | ||
from dataclasses import dataclass | ||
from typing import DefaultDict, Dict, List, Optional, Sequence, Set, Tuple, Union | ||
from typing import Dict, List, Optional, Sequence, Set, Tuple, Union | ||
|
||
from dbt_semantic_interfaces.enum_extension import assert_values_exhausted | ||
from dbt_semantic_interfaces.pretty_print import pformat_big_objects | ||
|
@@ -15,7 +14,6 @@ | |
from dbt_semantic_interfaces.type_enums.time_granularity import TimeGranularity | ||
|
||
from metricflow.dag.id_generation import DATAFLOW_PLAN_PREFIX, IdGeneratorRegistry | ||
from metricflow.dataflow.builder.measure_additiveness import group_measure_specs_by_additiveness | ||
from metricflow.dataflow.builder.node_data_set import DataflowPlanNodeOutputDataSetResolver | ||
from metricflow.dataflow.builder.node_evaluator import ( | ||
JoinLinkableInstancesRecipe, | ||
|
@@ -30,7 +28,6 @@ | |
ConstrainTimeRangeNode, | ||
DataflowPlan, | ||
FilterElementsNode, | ||
JoinAggregatedMeasuresByGroupByColumnsNode, | ||
JoinDescription, | ||
JoinOverTimeRangeNode, | ||
JoinToBaseOutputNode, | ||
|
@@ -253,22 +250,23 @@ def _build_metrics_output_node( | |
metric_specs=[metric_spec], | ||
) | ||
elif metric.type is MetricType.SIMPLE or MetricType.CUMULATIVE: | ||
metric_input_measure_specs = self._metric_lookup.measures_for_metric( | ||
metric_input_measure_spec = self._metric_lookup.measure_for_metric( | ||
metric_reference=metric_reference, | ||
column_association_resolver=self._column_association_resolver, | ||
) | ||
assert metric_input_measure_spec, "Simple and cumulative metrics must have one input measure." | ||
|
||
logger.info( | ||
f"For {metric_spec}, needed measures are:\n" | ||
f"{pformat_big_objects(metric_input_measure_specs=metric_input_measure_specs)}" | ||
f"For {metric_spec}, needed measure is:\n" | ||
f"{pformat_big_objects(metric_input_measure_spec=metric_input_measure_spec)}" | ||
) | ||
combined_where = where_constraint | ||
if metric_spec.constraint: | ||
combined_where = ( | ||
combined_where.combine(metric_spec.constraint) if combined_where else metric_spec.constraint | ||
) | ||
aggregated_measures_node = self.build_aggregated_measures( | ||
metric_input_measure_specs=metric_input_measure_specs, | ||
metric_input_measure_spec=metric_input_measure_spec, | ||
metric_spec=metric_spec, | ||
queried_linkable_specs=queried_linkable_specs, | ||
where_constraint=combined_where, | ||
|
@@ -641,7 +639,7 @@ def build_computed_metrics_node( | |
|
||
def build_aggregated_measures( | ||
self, | ||
metric_input_measure_specs: Sequence[MetricInputMeasureSpec], | ||
metric_input_measure_spec: MetricInputMeasureSpec, | ||
metric_spec: MetricSpec, | ||
queried_linkable_specs: LinkableSpecSet, | ||
where_constraint: Optional[WhereFilterSpec] = None, | ||
|
@@ -656,81 +654,29 @@ def build_aggregated_measures( | |
a composite set of aggregations originating from multiple semantic models, and joined into a single | ||
aggregated set of measures. | ||
""" | ||
output_nodes: List[BaseOutput] = [] | ||
semantic_models_and_constraints_to_measures: DefaultDict[ | ||
tuple[str, Optional[WhereFilterSpec]], List[MetricInputMeasureSpec] | ||
] = collections.defaultdict(list) | ||
for input_spec in metric_input_measure_specs: | ||
semantic_model_names = [ | ||
dsource.name | ||
for dsource in self._semantic_model_lookup.get_semantic_models_for_measure( | ||
measure_reference=input_spec.measure_spec.as_reference | ||
) | ||
] | ||
assert ( | ||
len(semantic_model_names) == 1 | ||
), f"Validation should enforce one semantic model per measure, but found {semantic_model_names} for {input_spec}!" | ||
semantic_models_and_constraints_to_measures[(semantic_model_names[0], input_spec.constraint)].append( | ||
input_spec | ||
) | ||
|
||
for (semantic_model, measure_constraint), measures in semantic_models_and_constraints_to_measures.items(): | ||
logger.info( | ||
f"Building aggregated measures for {semantic_model}. " | ||
f" Input measures: {measures} with constraints: {measure_constraint}" | ||
) | ||
if measure_constraint is None: | ||
node_where_constraint = where_constraint | ||
elif where_constraint is None: | ||
node_where_constraint = measure_constraint | ||
else: | ||
node_where_constraint = where_constraint.combine(measure_constraint) | ||
|
||
input_specs_by_measure_spec = {spec.measure_spec: spec for spec in measures} | ||
grouped_measures_by_additiveness = group_measure_specs_by_additiveness( | ||
tuple(input_specs_by_measure_spec.keys()) | ||
) | ||
measures_by_additiveness = grouped_measures_by_additiveness.measures_by_additiveness | ||
|
||
# Build output nodes for each distinct non-additive dimension spec, including the None case | ||
for non_additive_spec, measure_specs in measures_by_additiveness.items(): | ||
non_additive_message = "" | ||
if non_additive_spec is not None: | ||
non_additive_message = f" with non-additive dimension spec: {non_additive_spec}" | ||
|
||
logger.info(f"Building aggregated measures for {semantic_model}{non_additive_message}") | ||
input_specs = tuple(input_specs_by_measure_spec[measure_spec] for measure_spec in measure_specs) | ||
output_nodes.append( | ||
self._build_aggregated_measures_from_measure_source_node( | ||
metric_input_measure_specs=input_specs, | ||
metric_spec=metric_spec, | ||
queried_linkable_specs=queried_linkable_specs, | ||
where_constraint=node_where_constraint, | ||
time_range_constraint=time_range_constraint, | ||
cumulative=cumulative, | ||
cumulative_window=cumulative_window, | ||
cumulative_grain_to_date=cumulative_grain_to_date, | ||
) | ||
) | ||
|
||
if len(output_nodes) == 1: | ||
return output_nodes[0] | ||
measure_constraint = metric_input_measure_spec.constraint | ||
logger.info(f"Building aggregated measure: {metric_input_measure_spec} with constraint: {measure_constraint}") | ||
if measure_constraint is None: | ||
node_where_constraint = where_constraint | ||
elif where_constraint is None: | ||
node_where_constraint = measure_constraint | ||
else: | ||
return FilterElementsNode( | ||
parent_node=JoinAggregatedMeasuresByGroupByColumnsNode(parent_nodes=output_nodes), | ||
include_specs=InstanceSpecSet.merge( | ||
( | ||
queried_linkable_specs.as_spec_set, | ||
InstanceSpecSet( | ||
measure_specs=tuple(x.post_aggregation_spec for x in metric_input_measure_specs) | ||
), | ||
) | ||
), | ||
) | ||
node_where_constraint = where_constraint.combine(measure_constraint) | ||
|
||
return self._build_aggregated_measures_from_measure_source_node( | ||
metric_input_measure_spec=metric_input_measure_spec, | ||
metric_spec=metric_spec, | ||
queried_linkable_specs=queried_linkable_specs, | ||
where_constraint=node_where_constraint, | ||
time_range_constraint=time_range_constraint, | ||
cumulative=cumulative, | ||
cumulative_window=cumulative_window, | ||
cumulative_grain_to_date=cumulative_grain_to_date, | ||
) | ||
|
||
def _build_aggregated_measures_from_measure_source_node( | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. And this? |
||
self, | ||
metric_input_measure_specs: Sequence[MetricInputMeasureSpec], | ||
metric_input_measure_spec: MetricInputMeasureSpec, | ||
metric_spec: MetricSpec, | ||
queried_linkable_specs: LinkableSpecSet, | ||
where_constraint: Optional[WhereFilterSpec] = None, | ||
|
@@ -745,8 +691,8 @@ def _build_aggregated_measures_from_measure_source_node( | |
if time_dimension_spec.element_name == self._metric_time_dimension_reference.element_name | ||
] | ||
metric_time_dimension_requested = len(metric_time_dimension_specs) > 0 | ||
measure_specs = tuple(x.measure_spec for x in metric_input_measure_specs) | ||
measure_properties = self._build_measure_spec_properties(measure_specs) | ||
measure_spec = metric_input_measure_spec.measure_spec | ||
measure_properties = self._build_measure_spec_properties([measure_spec]) | ||
non_additive_dimension_spec = measure_properties.non_additive_dimension_spec | ||
|
||
cumulative_metric_adjusted_time_constraint: Optional[TimeRangeConstraint] = None | ||
|
@@ -781,7 +727,7 @@ def _build_aggregated_measures_from_measure_source_node( | |
required_linkable_specs = LinkableSpecSet.merge((queried_linkable_specs, extraneous_linkable_specs)) | ||
logger.info( | ||
f"Looking for a recipe to get:\n" | ||
f"{pformat_big_objects(measure_specs=measure_specs, required_linkable_set=required_linkable_specs)}" | ||
f"{pformat_big_objects(measure_specs=[measure_spec], required_linkable_set=required_linkable_specs)}" | ||
) | ||
|
||
find_recipe_start_time = time.time() | ||
|
@@ -800,7 +746,7 @@ def _build_aggregated_measures_from_measure_source_node( | |
if not measure_recipe: | ||
# TODO: Improve for better user understandability. | ||
raise UnableToSatisfyQueryError( | ||
f"Recipe not found for measure specs: {measure_specs} and linkable specs: {required_linkable_specs}" | ||
f"Recipe not found for measure spec: {measure_spec} and linkable specs: {required_linkable_specs}" | ||
) | ||
|
||
# If a cumulative metric is queried with metric_time, join over time range. | ||
|
@@ -832,7 +778,7 @@ def _build_aggregated_measures_from_measure_source_node( | |
parent_node=join_to_time_spine_node or time_range_node or measure_recipe.source_node, | ||
include_specs=InstanceSpecSet.merge( | ||
( | ||
InstanceSpecSet(measure_specs=measure_specs), | ||
InstanceSpecSet(measure_specs=(measure_spec,)), | ||
InstanceSpecSet.create_from_linkable_specs(measure_recipe.required_local_linkable_specs), | ||
) | ||
), | ||
|
@@ -848,7 +794,7 @@ def _build_aggregated_measures_from_measure_source_node( | |
|
||
specs_to_keep_after_join = InstanceSpecSet.merge( | ||
( | ||
InstanceSpecSet(measure_specs=measure_specs), | ||
InstanceSpecSet(measure_specs=(measure_spec,)), | ||
required_linkable_specs.as_spec_set, | ||
) | ||
) | ||
|
@@ -909,22 +855,16 @@ def _build_aggregated_measures_from_measure_source_node( | |
pre_aggregate_node = FilterElementsNode( | ||
parent_node=pre_aggregate_node, | ||
include_specs=InstanceSpecSet.merge( | ||
(InstanceSpecSet(measure_specs=measure_specs), queried_linkable_specs.as_spec_set) | ||
(InstanceSpecSet(measure_specs=(measure_spec,)), queried_linkable_specs.as_spec_set) | ||
), | ||
) | ||
aggregate_measures_node = AggregateMeasuresNode( | ||
parent_node=pre_aggregate_node, | ||
metric_input_measure_specs=tuple(metric_input_measure_specs), | ||
metric_input_measure_specs=(metric_input_measure_spec,), | ||
) | ||
|
||
join_aggregated_measure_to_time_spine = False | ||
for metric_input_measure in metric_input_measure_specs: | ||
if metric_input_measure.join_to_timespine: | ||
join_aggregated_measure_to_time_spine = True | ||
break | ||
|
||
# Only join to time spine if metric time was requested in the query. | ||
if join_aggregated_measure_to_time_spine and metric_time_dimension_requested: | ||
if metric_input_measure_spec.join_to_timespine and metric_time_dimension_requested: | ||
return JoinToTimeSpineNode( | ||
parent_node=aggregate_measures_node, | ||
requested_metric_time_dimension_specs=metric_time_dimension_specs, | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shall we rename this, too?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good call!