Bug fix: apply time constraints after time offsets
Time constraints were being applied before time offsets in some scenarios. This is incorrect because the offset shifts the time values, so rows filtered out before the offset may still be needed afterward. This change applies the constraints after the offset instead. Customers likely never saw this bug because time constraints are only available to open source users, and the affected code path may never have been released to them.
courtneyholcomb committed Nov 21, 2024
1 parent 0781ab3 commit dfbb7a7
Showing 1 changed file with 28 additions and 30 deletions.
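The ordering issue described in the commit message can be seen in a minimal, self-contained sketch. This is plain Python with invented data and helper names (input_rows, offset_metric, the 28-day window), not MetricFlow's dataflow API: constraining the input to the queried range before applying the offset discards the earlier rows the offset needs, while constraining after the offset does not.

# Minimal sketch (plain Python, not MetricFlow's API; data and names are invented)
# of the bug: filtering input rows to the query's time range *before* applying a
# time offset removes the earlier rows the offset still needs to read.
from datetime import date, timedelta

# Toy daily input: one value per day for Jan 1 - Feb 28, 2024.
input_rows = {date(2024, 1, 1) + timedelta(days=i): float(i) for i in range(59)}

query_start, query_end = date(2024, 2, 1), date(2024, 2, 28)  # the query's time constraint
offset = timedelta(days=28)  # stand-in for a 28-day offset_window


def offset_metric(rows: dict, start: date, end: date) -> dict:
    """For each queried day, read the input value `offset` days earlier."""
    days = (start + timedelta(days=i) for i in range((end - start).days + 1))
    return {day: rows.get(day - offset) for day in days}


# Buggy order: constrain first, then offset -- the January rows are already gone.
constrained_input = {d: v for d, v in input_rows.items() if query_start <= d <= query_end}
buggy = offset_metric(constrained_input, query_start, query_end)

# Fixed order: offset over the full input, then constrain the result.
offset_first = offset_metric(input_rows, query_start, query_end)
fixed = {d: v for d, v in offset_first.items() if query_start <= d <= query_end}

print(sum(v is None for v in buggy.values()))  # 28 -- every queried day lost its value
print(sum(v is None for v in fixed.values()))  # 0

The diff below makes the corresponding change in the dataflow plan: the time_range_constraint is no longer passed to JoinToTimeSpineNode and is instead applied by _build_pre_aggregation_plan after the offset join.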
58 changes: 28 additions & 30 deletions metricflow/dataflow/builder/dataflow_plan_builder.py
@@ -641,36 +641,39 @@ def _build_derived_metric_output_node(
             aggregated_to_elements=set(queried_linkable_specs.as_tuple),
         )
 
-        # For ratio / derived metrics with time offset, apply offset & where constraint after metric computation.
+        # For ratio / derived metrics with time offset, apply offset join here. Constraints will be applied after the offset
+        # to avoid filtering out values that will be changed.
         if metric_spec.has_time_offset:
             queried_agg_time_dimension_specs = queried_linkable_specs.included_agg_time_dimension_specs_for_metric(
                 metric_reference=metric_spec.reference, metric_lookup=self._metric_lookup
             )
             output_node = JoinToTimeSpineNode.create(
                 parent_node=output_node,
                 requested_agg_time_dimension_specs=queried_agg_time_dimension_specs,
-                time_range_constraint=predicate_pushdown_state.time_range_constraint,
                 offset_window=metric_spec.offset_window,
                 offset_to_grain=metric_spec.offset_to_grain,
                 join_type=SqlJoinType.INNER,
             )
 
-            if len(metric_spec.filter_spec_set.all_filter_specs) > 0:
-                output_node = WhereConstraintNode.create(
-                    parent_node=output_node, where_specs=metric_spec.filter_spec_set.all_filter_specs
-                )
+            if len(metric_spec.filter_spec_set.all_filter_specs) > 0 or predicate_pushdown_state.time_range_constraint:
+                # FilterElementsNode will only be needed if there are where filter specs that were selected in the group by.
                 specs_in_filters = set(
                     linkable_spec
                     for filter_spec in metric_spec.filter_spec_set.all_filter_specs
                     for linkable_spec in filter_spec.linkable_specs
                 )
+                filter_to_specs = None
                 if not specs_in_filters.issubset(queried_linkable_specs.as_tuple):
-                    output_node = FilterElementsNode.create(
-                        parent_node=output_node,
-                        include_specs=InstanceSpecSet(metric_specs=(metric_spec,)).merge(
-                            InstanceSpecSet.create_from_specs(queried_linkable_specs.as_tuple)
-                        ),
-                    )
+                    filter_to_specs = InstanceSpecSet(metric_specs=(metric_spec,)).merge(
+                        InstanceSpecSet.create_from_specs(queried_linkable_specs.as_tuple)
+                    )
+                output_node = self._build_pre_aggregation_plan(
+                    source_node=output_node,
+                    where_filter_specs=metric_spec.filter_spec_set.all_filter_specs,
+                    time_range_constraint=predicate_pushdown_state.time_range_constraint,
+                    filter_to_specs=filter_to_specs,
+                )
 
         return output_node
 
     def _get_base_agg_time_dimensions(
@@ -1629,7 +1632,6 @@ def _build_aggregated_measure_from_measure_source_node(
             unaggregated_measure_node = JoinToTimeSpineNode.create(
                 parent_node=unaggregated_measure_node,
                 requested_agg_time_dimension_specs=base_agg_time_dimension_specs,
-                time_range_constraint=predicate_pushdown_state.time_range_constraint,
                 offset_window=before_aggregation_time_spine_join_description.offset_window,
                 offset_to_grain=before_aggregation_time_spine_join_description.offset_to_grain,
                 join_type=before_aggregation_time_spine_join_description.join_type,
@@ -1641,17 +1643,12 @@ def _build_aggregated_measure_from_measure_source_node(
             # If this is the second layer of aggregation for a conversion metric, we have already joined the custom granularity.
             if spec not in measure_recipe.all_linkable_specs_required_for_source_nodes.as_tuple
         ]
-        # If time constraint was previously adjusted for cumulative window or grain, apply original time constraint
-        # here. Can skip if metric is being aggregated over all time.
+        # Apply original time constraint if it wasn't applied to the source node recipe. For cumulative metrics, the constraint
+        # may have been expanded and needs to be narrowed here. For offsets, the constraint was deferred to after the offset.
         # TODO - Pushdown: Encapsulate all of this window sliding bookkeeping in the pushdown params object
-        time_range_constraint_to_apply = (
-            predicate_pushdown_state.time_range_constraint
-            if (
-                cumulative_metric_adjusted_time_constraint is not None
-                and predicate_pushdown_state.time_range_constraint is not None
-            )
-            else None
-        )
+        time_range_constraint_to_apply = None
+        if cumulative_metric_adjusted_time_constraint or before_aggregation_time_spine_join_description:
+            time_range_constraint_to_apply = predicate_pushdown_state.time_range_constraint
         unaggregated_measure_node = self._build_pre_aggregation_plan(
             source_node=unaggregated_measure_node,
             join_targets=measure_recipe.join_targets,
@@ -1737,11 +1734,11 @@ def _build_aggregated_measure_from_measure_source_node(
     def _build_pre_aggregation_plan(
         self,
         source_node: DataflowPlanNode,
-        join_targets: List[JoinDescription],
-        custom_granularity_specs: Sequence[TimeDimensionSpec],
-        where_filter_specs: Sequence[WhereFilterSpec],
-        time_range_constraint: Optional[TimeRangeConstraint],
-        filter_to_specs: InstanceSpecSet,
+        filter_to_specs: Optional[InstanceSpecSet] = None,
+        join_targets: List[JoinDescription] = [],
+        custom_granularity_specs: Sequence[TimeDimensionSpec] = (),
+        where_filter_specs: Sequence[WhereFilterSpec] = (),
+        time_range_constraint: Optional[TimeRangeConstraint] = None,
         measure_properties: Optional[MeasureSpecProperties] = None,
         queried_linkable_specs: Optional[LinkableSpecSet] = None,
         distinct: bool = False,
@@ -1775,9 +1772,10 @@ def _build_pre_aggregation_plan(
                 queried_linkable_specs=queried_linkable_specs,
                 parent_node=output_node,
             )
-        output_node = FilterElementsNode.create(
-            parent_node=output_node, include_specs=filter_to_specs, distinct=distinct
-        )
+        if filter_to_specs:
+            output_node = FilterElementsNode.create(
+                parent_node=output_node, include_specs=filter_to_specs, distinct=distinct
+            )
 
         return output_node
