diff --git a/metricflow/dataflow/builder/dataflow_plan_builder.py b/metricflow/dataflow/builder/dataflow_plan_builder.py
index 7cfb9a239..55bd1f34a 100644
--- a/metricflow/dataflow/builder/dataflow_plan_builder.py
+++ b/metricflow/dataflow/builder/dataflow_plan_builder.py
@@ -641,7 +641,8 @@ def _build_derived_metric_output_node(
             aggregated_to_elements=set(queried_linkable_specs.as_tuple),
         )
 
-        # For ratio / derived metrics with time offset, apply offset & where constraint after metric computation.
+        # For ratio / derived metrics with time offset, apply offset join here. Constraints will be applied after the offset
+        # to avoid filtering out values that will be changed.
         if metric_spec.has_time_offset:
             queried_agg_time_dimension_specs = queried_linkable_specs.included_agg_time_dimension_specs_for_metric(
                 metric_reference=metric_spec.reference, metric_lookup=self._metric_lookup
@@ -649,28 +650,30 @@ def _build_derived_metric_output_node(
             output_node = JoinToTimeSpineNode.create(
                 parent_node=output_node,
                 requested_agg_time_dimension_specs=queried_agg_time_dimension_specs,
-                time_range_constraint=predicate_pushdown_state.time_range_constraint,
                 offset_window=metric_spec.offset_window,
                 offset_to_grain=metric_spec.offset_to_grain,
                 join_type=SqlJoinType.INNER,
             )
 
-            if len(metric_spec.filter_spec_set.all_filter_specs) > 0:
-                output_node = WhereConstraintNode.create(
-                    parent_node=output_node, where_specs=metric_spec.filter_spec_set.all_filter_specs
-                )
+            if len(metric_spec.filter_spec_set.all_filter_specs) > 0 or predicate_pushdown_state.time_range_constraint:
+                # FilterElementsNode will only be needed if there are where filter specs that were selected in the group by.
                 specs_in_filters = set(
                     linkable_spec
                     for filter_spec in metric_spec.filter_spec_set.all_filter_specs
                     for linkable_spec in filter_spec.linkable_specs
                 )
+                filter_to_specs = None
                 if not specs_in_filters.issubset(queried_linkable_specs.as_tuple):
-                    output_node = FilterElementsNode.create(
-                        parent_node=output_node,
-                        include_specs=InstanceSpecSet(metric_specs=(metric_spec,)).merge(
-                            InstanceSpecSet.create_from_specs(queried_linkable_specs.as_tuple)
-                        ),
+                    filter_to_specs = InstanceSpecSet(metric_specs=(metric_spec,)).merge(
+                        InstanceSpecSet.create_from_specs(queried_linkable_specs.as_tuple)
                     )
+                output_node = self._build_pre_aggregation_plan(
+                    source_node=output_node,
+                    where_filter_specs=metric_spec.filter_spec_set.all_filter_specs,
+                    time_range_constraint=predicate_pushdown_state.time_range_constraint,
+                    filter_to_specs=filter_to_specs,
+                )
+
         return output_node
 
     def _get_base_agg_time_dimensions(
@@ -1629,7 +1632,6 @@ def _build_aggregated_measure_from_measure_source_node(
             unaggregated_measure_node = JoinToTimeSpineNode.create(
                 parent_node=unaggregated_measure_node,
                 requested_agg_time_dimension_specs=base_agg_time_dimension_specs,
-                time_range_constraint=predicate_pushdown_state.time_range_constraint,
                 offset_window=before_aggregation_time_spine_join_description.offset_window,
                 offset_to_grain=before_aggregation_time_spine_join_description.offset_to_grain,
                 join_type=before_aggregation_time_spine_join_description.join_type,
@@ -1641,17 +1643,12 @@ def _build_aggregated_measure_from_measure_source_node(
             # If this is the second layer of aggregation for a conversion metric, we have already joined the custom granularity.
             if spec not in measure_recipe.all_linkable_specs_required_for_source_nodes.as_tuple
         ]
-        # If time constraint was previously adjusted for cumulative window or grain, apply original time constraint
-        # here. Can skip if metric is being aggregated over all time.
+        # Apply original time constraint if it wasn't applied to the source node recipe. For cumulative metrics, the constraint
+        # may have been expanded and needs to be narrowed here. For offsets, the constraint was deferred to after the offset.
         # TODO - Pushdown: Encapsulate all of this window sliding bookkeeping in the pushdown params object
-        time_range_constraint_to_apply = (
-            predicate_pushdown_state.time_range_constraint
-            if (
-                cumulative_metric_adjusted_time_constraint is not None
-                and predicate_pushdown_state.time_range_constraint is not None
-            )
-            else None
-        )
+        time_range_constraint_to_apply = None
+        if cumulative_metric_adjusted_time_constraint or before_aggregation_time_spine_join_description:
+            time_range_constraint_to_apply = predicate_pushdown_state.time_range_constraint
         unaggregated_measure_node = self._build_pre_aggregation_plan(
             source_node=unaggregated_measure_node,
             join_targets=measure_recipe.join_targets,
@@ -1737,11 +1734,11 @@ def _build_aggregated_measure_from_measure_source_node(
     def _build_pre_aggregation_plan(
         self,
         source_node: DataflowPlanNode,
-        join_targets: List[JoinDescription],
-        custom_granularity_specs: Sequence[TimeDimensionSpec],
-        where_filter_specs: Sequence[WhereFilterSpec],
-        time_range_constraint: Optional[TimeRangeConstraint],
-        filter_to_specs: InstanceSpecSet,
+        filter_to_specs: Optional[InstanceSpecSet] = None,
+        join_targets: List[JoinDescription] = [],
+        custom_granularity_specs: Sequence[TimeDimensionSpec] = (),
+        where_filter_specs: Sequence[WhereFilterSpec] = (),
+        time_range_constraint: Optional[TimeRangeConstraint] = None,
         measure_properties: Optional[MeasureSpecProperties] = None,
         queried_linkable_specs: Optional[LinkableSpecSet] = None,
         distinct: bool = False,
@@ -1775,9 +1772,10 @@ def _build_pre_aggregation_plan(
                 queried_linkable_specs=queried_linkable_specs,
                 parent_node=output_node,
             )
-        output_node = FilterElementsNode.create(
-            parent_node=output_node, include_specs=filter_to_specs, distinct=distinct
-        )
+        if filter_to_specs:
+            output_node = FilterElementsNode.create(
+                parent_node=output_node, include_specs=filter_to_specs, distinct=distinct
+            )
         return output_node
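
The ordering change in `_build_derived_metric_output_node` matters because an offset join reads rows that the where filters and time range constraint would otherwise discard. Below is a minimal, self-contained sketch of that interaction; it is not MetricFlow code, and the dates, values, one-period offset, and March-only constraint are invented for illustration.

```python
from datetime import date

rows = [
    (date(2024, 2, 1), 10),  # February
    (date(2024, 3, 1), 20),  # March
]

def offset_one_period(data: list[tuple[date, int]]) -> list[tuple[date, int]]:
    """Pair each date with the prior row's value, mimicking an offset_window join."""
    return [(data[i][0], data[i - 1][1]) for i in range(1, len(data))]

def constrain_to_march(data: list[tuple[date, int]]) -> list[tuple[date, int]]:
    """Mimic the query's time range constraint (March only)."""
    return [(d, v) for d, v in data if d >= date(2024, 3, 1)]

# Constraint before the offset: February is dropped, so March has no prior value left to read.
print(offset_one_period(constrain_to_march(rows)))  # -> []

# Constraint after the offset (the ordering the diff moves to): March keeps February's value.
print(constrain_to_march(offset_one_period(rows)))  # -> [(datetime.date(2024, 3, 1), 10)]
```

Constraining first leaves March with no prior-period row to offset from, which is why the where filters and `predicate_pushdown_state.time_range_constraint` are now deferred until after `JoinToTimeSpineNode`.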
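
The `_build_pre_aggregation_plan` signature change follows the same idea: every pre-aggregation step is optional, and the final `FilterElementsNode` is only appended when `filter_to_specs` is provided, so call sites like the offset path above can pass only the arguments they need. Here is a toy sketch of that pattern; `ToyNode`, its fields, and the example arguments are invented stand-ins, not the real dataflow plan nodes.

```python
from dataclasses import dataclass
from typing import Optional, Sequence


@dataclass
class ToyNode:
    description: str
    parent: Optional["ToyNode"] = None


def build_pre_aggregation_plan(
    source_node: ToyNode,
    filter_to_specs: Optional[Sequence[str]] = None,
    where_filters: Sequence[str] = (),
    time_range_constraint: Optional[str] = None,
) -> ToyNode:
    output_node = source_node
    if where_filters:
        output_node = ToyNode(f"WHERE {' AND '.join(where_filters)}", output_node)
    if time_range_constraint:
        output_node = ToyNode(f"CONSTRAIN TIME TO {time_range_constraint}", output_node)
    if filter_to_specs:  # mirrors the new guard around FilterElementsNode.create
        output_node = ToyNode(f"SELECT ONLY {', '.join(filter_to_specs)}", output_node)
    return output_node


# A call site only passes the pieces it needs; omitted steps add no nodes to the plan.
plan = build_pre_aggregation_plan(
    source_node=ToyNode("offset metric output"),
    where_filters=("booking__is_instant",),
    time_range_constraint="2024-03-01..2024-03-31",
)
while plan:  # walk the chain from the outermost node back to the source
    print(plan.description)
    plan = plan.parent
```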