From dfbb7a73f495ea4e0b3b97bc8279d48bfe32e99b Mon Sep 17 00:00:00 2001 From: Courtney Holcomb Date: Thu, 21 Nov 2024 14:04:14 -0800 Subject: [PATCH] Bug fix: apply time constraints after time offsets Time constraints were being applied before time offsets in some scenarios. This is incorrect because the time values change when the offset is applied, so rows that should appear in the result can be filtered out prematurely. This change defers the time constraint until after the offset join. Customers likely never saw this bug because time constraints are only available to open source users, and this may never have been released to them. --- .../dataflow/builder/dataflow_plan_builder.py | 58 +++++++++---------- 1 file changed, 28 insertions(+), 30 deletions(-) diff --git a/metricflow/dataflow/builder/dataflow_plan_builder.py b/metricflow/dataflow/builder/dataflow_plan_builder.py index 7cfb9a239..55bd1f34a 100644 --- a/metricflow/dataflow/builder/dataflow_plan_builder.py +++ b/metricflow/dataflow/builder/dataflow_plan_builder.py @@ -641,7 +641,8 @@ def _build_derived_metric_output_node( aggregated_to_elements=set(queried_linkable_specs.as_tuple), ) - # For ratio / derived metrics with time offset, apply offset & where constraint after metric computation. + # For ratio / derived metrics with time offset, apply offset join here. Constraints will be applied after the offset + # to avoid filtering out values that will be changed. 
if metric_spec.has_time_offset: queried_agg_time_dimension_specs = queried_linkable_specs.included_agg_time_dimension_specs_for_metric( metric_reference=metric_spec.reference, metric_lookup=self._metric_lookup @@ -649,28 +650,30 @@ def _build_derived_metric_output_node( output_node = JoinToTimeSpineNode.create( parent_node=output_node, requested_agg_time_dimension_specs=queried_agg_time_dimension_specs, - time_range_constraint=predicate_pushdown_state.time_range_constraint, offset_window=metric_spec.offset_window, offset_to_grain=metric_spec.offset_to_grain, join_type=SqlJoinType.INNER, ) - if len(metric_spec.filter_spec_set.all_filter_specs) > 0: - output_node = WhereConstraintNode.create( - parent_node=output_node, where_specs=metric_spec.filter_spec_set.all_filter_specs - ) + if len(metric_spec.filter_spec_set.all_filter_specs) > 0 or predicate_pushdown_state.time_range_constraint: + # FilterElementsNode will only be needed if there are where filter specs that were selected in the group by. 
specs_in_filters = set( linkable_spec for filter_spec in metric_spec.filter_spec_set.all_filter_specs for linkable_spec in filter_spec.linkable_specs ) + filter_to_specs = None if not specs_in_filters.issubset(queried_linkable_specs.as_tuple): - output_node = FilterElementsNode.create( - parent_node=output_node, - include_specs=InstanceSpecSet(metric_specs=(metric_spec,)).merge( - InstanceSpecSet.create_from_specs(queried_linkable_specs.as_tuple) - ), + filter_to_specs = InstanceSpecSet(metric_specs=(metric_spec,)).merge( + InstanceSpecSet.create_from_specs(queried_linkable_specs.as_tuple) ) + output_node = self._build_pre_aggregation_plan( + source_node=output_node, + where_filter_specs=metric_spec.filter_spec_set.all_filter_specs, + time_range_constraint=predicate_pushdown_state.time_range_constraint, + filter_to_specs=filter_to_specs, + ) + return output_node def _get_base_agg_time_dimensions( @@ -1629,7 +1632,6 @@ def _build_aggregated_measure_from_measure_source_node( unaggregated_measure_node = JoinToTimeSpineNode.create( parent_node=unaggregated_measure_node, requested_agg_time_dimension_specs=base_agg_time_dimension_specs, - time_range_constraint=predicate_pushdown_state.time_range_constraint, offset_window=before_aggregation_time_spine_join_description.offset_window, offset_to_grain=before_aggregation_time_spine_join_description.offset_to_grain, join_type=before_aggregation_time_spine_join_description.join_type, @@ -1641,17 +1643,12 @@ def _build_aggregated_measure_from_measure_source_node( # If this is the second layer of aggregation for a conversion metric, we have already joined the custom granularity. if spec not in measure_recipe.all_linkable_specs_required_for_source_nodes.as_tuple ] - # If time constraint was previously adjusted for cumulative window or grain, apply original time constraint - # here. Can skip if metric is being aggregated over all time. + # Apply original time constraint if it wasn't applied to the source node recipe. 
For cumulative metrics, the constraint + # may have been expanded and needs to be narrowed here. For offsets, the constraint was deferred to after the offset. # TODO - Pushdown: Encapsulate all of this window sliding bookkeeping in the pushdown params object - time_range_constraint_to_apply = ( - predicate_pushdown_state.time_range_constraint - if ( - cumulative_metric_adjusted_time_constraint is not None - and predicate_pushdown_state.time_range_constraint is not None - ) - else None - ) + time_range_constraint_to_apply = None + if cumulative_metric_adjusted_time_constraint or before_aggregation_time_spine_join_description: + time_range_constraint_to_apply = predicate_pushdown_state.time_range_constraint unaggregated_measure_node = self._build_pre_aggregation_plan( source_node=unaggregated_measure_node, join_targets=measure_recipe.join_targets, @@ -1737,11 +1734,11 @@ def _build_aggregated_measure_from_measure_source_node( def _build_pre_aggregation_plan( self, source_node: DataflowPlanNode, - join_targets: List[JoinDescription], - custom_granularity_specs: Sequence[TimeDimensionSpec], - where_filter_specs: Sequence[WhereFilterSpec], - time_range_constraint: Optional[TimeRangeConstraint], - filter_to_specs: InstanceSpecSet, + filter_to_specs: Optional[InstanceSpecSet] = None, + join_targets: List[JoinDescription] = [], + custom_granularity_specs: Sequence[TimeDimensionSpec] = (), + where_filter_specs: Sequence[WhereFilterSpec] = (), + time_range_constraint: Optional[TimeRangeConstraint] = None, measure_properties: Optional[MeasureSpecProperties] = None, queried_linkable_specs: Optional[LinkableSpecSet] = None, distinct: bool = False, @@ -1775,9 +1772,10 @@ def _build_pre_aggregation_plan( queried_linkable_specs=queried_linkable_specs, parent_node=output_node, ) - output_node = FilterElementsNode.create( - parent_node=output_node, include_specs=filter_to_specs, distinct=distinct - ) + if filter_to_specs: + output_node = FilterElementsNode.create( + 
parent_node=output_node, include_specs=filter_to_specs, distinct=distinct + ) return output_node