Skip to content

Commit

Permalink
Pass MeasureSpecProperties in DataflowPlanBuilder.
Browse files Browse the repository at this point in the history
  • Loading branch information
plypaul committed Oct 30, 2024
1 parent bdbd23b commit 247b290
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 17 deletions.
37 changes: 20 additions & 17 deletions metricflow/dataflow/builder/dataflow_plan_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -986,9 +986,11 @@ def _build_measure_spec_properties(self, measure_specs: Sequence[MeasureSpec]) -
)
semantic_model_name = semantic_model_names.pop()

agg_time_dimension = self._semantic_model_lookup.measure_lookup.get_properties(
measure_specs[0].reference
).agg_time_dimension_reference
measure_reference = measure_specs[0].reference
measure_properties = self._semantic_model_lookup.measure_lookup.get_properties(measure_reference)
agg_time_dimension = measure_properties.agg_time_dimension_reference
agg_time_dimension_grain = measure_properties.agg_time_granularity

non_additive_dimension_spec = measure_specs[0].non_additive_dimension_spec
for measure_spec in measure_specs:
if non_additive_dimension_spec != measure_spec.non_additive_dimension_spec:
Expand All @@ -1002,6 +1004,7 @@ def _build_measure_spec_properties(self, measure_specs: Sequence[MeasureSpec]) -
measure_specs=tuple(measure_specs),
semantic_model_name=semantic_model_name,
agg_time_dimension=agg_time_dimension,
agg_time_dimension_grain=agg_time_dimension_grain,
non_additive_dimension_spec=non_additive_dimension_spec,
)

Expand Down Expand Up @@ -1470,7 +1473,7 @@ def __get_required_and_extraneous_linkable_specs(
self,
queried_linkable_specs: LinkableSpecSet,
filter_specs: Sequence[WhereFilterSpec],
non_additive_dimension_spec: Optional[NonAdditiveDimensionSpec] = None,
measure_spec_properties: Optional[MeasureSpecProperties] = None,
) -> Tuple[LinkableSpecSet, LinkableSpecSet]:
"""Get the required and extraneous linkable specs for this query.
Expand All @@ -1480,15 +1483,18 @@ def __get_required_and_extraneous_linkable_specs(
linkable_spec_sets_to_merge: List[LinkableSpecSet] = []
for filter_spec in filter_specs:
linkable_spec_sets_to_merge.append(LinkableSpecSet.create_from_specs(filter_spec.linkable_specs))
if non_additive_dimension_spec:
non_additive_dimension_grain = self._semantic_model_lookup.get_defined_time_granularity(
TimeDimensionReference(non_additive_dimension_spec.name)

if measure_spec_properties is not None:
non_additive_dimension_spec = (
measure_spec_properties.non_additive_dimension_spec if measure_spec_properties else None
)
linkable_spec_sets_to_merge.append(
LinkableSpecSet.create_from_specs(
non_additive_dimension_spec.linkable_specs(non_additive_dimension_grain)
if non_additive_dimension_spec is not None:
agg_time_dimension_grain = measure_spec_properties.agg_time_dimension_grain
linkable_spec_sets_to_merge.append(
LinkableSpecSet.create_from_specs(
non_additive_dimension_spec.linkable_specs(agg_time_dimension_grain)
)
)
)

extraneous_linkable_specs = LinkableSpecSet.merge_iterable(linkable_spec_sets_to_merge).dedupe()
required_linkable_specs = queried_linkable_specs.merge(extraneous_linkable_specs).dedupe()
Expand Down Expand Up @@ -1521,8 +1527,6 @@ def _build_aggregated_measure_from_measure_source_node(
else None
)
measure_properties = self._build_measure_spec_properties([measure_spec])
non_additive_dimension_spec = measure_properties.non_additive_dimension_spec

cumulative_metric_adjusted_time_constraint: Optional[TimeRangeConstraint] = None
if cumulative and predicate_pushdown_state.time_range_constraint is not None:
logger.debug(
Expand Down Expand Up @@ -1551,7 +1555,7 @@ def _build_aggregated_measure_from_measure_source_node(
required_linkable_specs, extraneous_linkable_specs = self.__get_required_and_extraneous_linkable_specs(
queried_linkable_specs=queried_linkable_specs,
filter_specs=metric_input_measure_spec.filter_spec_set.all_filter_specs,
non_additive_dimension_spec=non_additive_dimension_spec,
measure_spec_properties=measure_properties,
)

before_aggregation_time_spine_join_description = (
Expand Down Expand Up @@ -1706,12 +1710,11 @@ def _build_aggregated_measure_from_measure_source_node(
where_specs=metric_input_measure_spec.filter_spec_set.all_filter_specs,
)

non_additive_dimension_spec = measure_properties.non_additive_dimension_spec
if non_additive_dimension_spec is not None:
# Apply semi additive join on the node
agg_time_dimension = measure_properties.agg_time_dimension
non_additive_dimension_grain = self._semantic_model_lookup.get_defined_time_granularity(
TimeDimensionReference(non_additive_dimension_spec.name)
)
non_additive_dimension_grain = measure_properties.agg_time_dimension_grain
queried_time_dimension_spec: Optional[
TimeDimensionSpec
] = self._find_non_additive_dimension_in_linkable_specs(
Expand Down
2 changes: 2 additions & 0 deletions metricflow/dataflow/builder/measure_spec_properties.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from typing import Optional, Sequence

from dbt_semantic_interfaces.references import TimeDimensionReference
from dbt_semantic_interfaces.type_enums import TimeGranularity
from metricflow_semantics.specs.measure_spec import MeasureSpec
from metricflow_semantics.specs.non_additive_dimension_spec import NonAdditiveDimensionSpec

Expand All @@ -15,4 +16,5 @@ class MeasureSpecProperties:
measure_specs: Sequence[MeasureSpec]
semantic_model_name: str
agg_time_dimension: TimeDimensionReference
agg_time_dimension_grain: TimeGranularity
non_additive_dimension_spec: Optional[NonAdditiveDimensionSpec] = None

0 comments on commit 247b290

Please sign in to comment.