From 247b2906bc25558b26c50057fbf87fe00e9110ca Mon Sep 17 00:00:00 2001 From: Paul Yang Date: Tue, 29 Oct 2024 15:48:48 -0700 Subject: [PATCH] Pass `MeasureSpecProperties` in `DataflowPlanBuilder`. --- .../dataflow/builder/dataflow_plan_builder.py | 37 ++++++++++--------- .../builder/measure_spec_properties.py | 2 + 2 files changed, 22 insertions(+), 17 deletions(-) diff --git a/metricflow/dataflow/builder/dataflow_plan_builder.py b/metricflow/dataflow/builder/dataflow_plan_builder.py index 78e74ad8bb..502a66829a 100644 --- a/metricflow/dataflow/builder/dataflow_plan_builder.py +++ b/metricflow/dataflow/builder/dataflow_plan_builder.py @@ -986,9 +986,11 @@ def _build_measure_spec_properties(self, measure_specs: Sequence[MeasureSpec]) - ) semantic_model_name = semantic_model_names.pop() - agg_time_dimension = self._semantic_model_lookup.measure_lookup.get_properties( - measure_specs[0].reference - ).agg_time_dimension_reference + measure_reference = measure_specs[0].reference + measure_properties = self._semantic_model_lookup.measure_lookup.get_properties(measure_reference) + agg_time_dimension = measure_properties.agg_time_dimension_reference + agg_time_dimension_grain = measure_properties.agg_time_granularity + non_additive_dimension_spec = measure_specs[0].non_additive_dimension_spec for measure_spec in measure_specs: if non_additive_dimension_spec != measure_spec.non_additive_dimension_spec: @@ -1002,6 +1004,7 @@ def _build_measure_spec_properties(self, measure_specs: Sequence[MeasureSpec]) - measure_specs=tuple(measure_specs), semantic_model_name=semantic_model_name, agg_time_dimension=agg_time_dimension, + agg_time_dimension_grain=agg_time_dimension_grain, non_additive_dimension_spec=non_additive_dimension_spec, ) @@ -1470,7 +1473,7 @@ def __get_required_and_extraneous_linkable_specs( self, queried_linkable_specs: LinkableSpecSet, filter_specs: Sequence[WhereFilterSpec], - non_additive_dimension_spec: Optional[NonAdditiveDimensionSpec] = None, + measure_spec_properties: Optional[MeasureSpecProperties] = None, ) -> Tuple[LinkableSpecSet, LinkableSpecSet]: """Get the required and extraneous linkable specs for this query. @@ -1480,15 +1483,18 @@ def __get_required_and_extraneous_linkable_specs( linkable_spec_sets_to_merge: List[LinkableSpecSet] = [] for filter_spec in filter_specs: linkable_spec_sets_to_merge.append(LinkableSpecSet.create_from_specs(filter_spec.linkable_specs)) - if non_additive_dimension_spec: - non_additive_dimension_grain = self._semantic_model_lookup.get_defined_time_granularity( - TimeDimensionReference(non_additive_dimension_spec.name) + + if measure_spec_properties is not None: + non_additive_dimension_spec = ( + measure_spec_properties.non_additive_dimension_spec if measure_spec_properties else None ) - linkable_spec_sets_to_merge.append( - LinkableSpecSet.create_from_specs( - non_additive_dimension_spec.linkable_specs(non_additive_dimension_grain) + if non_additive_dimension_spec is not None: + agg_time_dimension_grain = measure_spec_properties.agg_time_dimension_grain + linkable_spec_sets_to_merge.append( + LinkableSpecSet.create_from_specs( + non_additive_dimension_spec.linkable_specs(agg_time_dimension_grain) + ) ) - ) extraneous_linkable_specs = LinkableSpecSet.merge_iterable(linkable_spec_sets_to_merge).dedupe() required_linkable_specs = queried_linkable_specs.merge(extraneous_linkable_specs).dedupe() @@ -1521,8 +1527,6 @@ def _build_aggregated_measure_from_measure_source_node( else None ) measure_properties = self._build_measure_spec_properties([measure_spec]) - non_additive_dimension_spec = measure_properties.non_additive_dimension_spec - cumulative_metric_adjusted_time_constraint: Optional[TimeRangeConstraint] = None if cumulative and predicate_pushdown_state.time_range_constraint is not None: logger.debug( @@ -1551,7 +1555,7 @@ def _build_aggregated_measure_from_measure_source_node( required_linkable_specs, extraneous_linkable_specs = self.__get_required_and_extraneous_linkable_specs( queried_linkable_specs=queried_linkable_specs, filter_specs=metric_input_measure_spec.filter_spec_set.all_filter_specs, - non_additive_dimension_spec=non_additive_dimension_spec, + measure_spec_properties=measure_properties, ) before_aggregation_time_spine_join_description = ( @@ -1706,12 +1710,11 @@ def _build_aggregated_measure_from_measure_source_node( where_specs=metric_input_measure_spec.filter_spec_set.all_filter_specs, ) + non_additive_dimension_spec = measure_properties.non_additive_dimension_spec if non_additive_dimension_spec is not None: # Apply semi additive join on the node agg_time_dimension = measure_properties.agg_time_dimension - non_additive_dimension_grain = self._semantic_model_lookup.get_defined_time_granularity( - TimeDimensionReference(non_additive_dimension_spec.name) - ) + non_additive_dimension_grain = measure_properties.agg_time_dimension_grain queried_time_dimension_spec: Optional[ TimeDimensionSpec ] = self._find_non_additive_dimension_in_linkable_specs( diff --git a/metricflow/dataflow/builder/measure_spec_properties.py b/metricflow/dataflow/builder/measure_spec_properties.py index 48cf127bae..ec1c472eac 100644 --- a/metricflow/dataflow/builder/measure_spec_properties.py +++ b/metricflow/dataflow/builder/measure_spec_properties.py @@ -4,6 +4,7 @@ from typing import Optional, Sequence from dbt_semantic_interfaces.references import TimeDimensionReference +from dbt_semantic_interfaces.type_enums import TimeGranularity from metricflow_semantics.specs.measure_spec import MeasureSpec from metricflow_semantics.specs.non_additive_dimension_spec import NonAdditiveDimensionSpec @@ -15,4 +16,5 @@ class MeasureSpecProperties: measure_specs: Sequence[MeasureSpec] semantic_model_name: str agg_time_dimension: TimeDimensionReference + agg_time_dimension_grain: TimeGranularity non_additive_dimension_spec: Optional[NonAdditiveDimensionSpec] = None