Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Join to Time Spine & Fill Nulls #830

Closed
wants to merge 12 commits into from
Prev Previous commit
Next Next commit
Dataflow plan to join aggregated measures to time spine
  • Loading branch information
courtneyholcomb committed Oct 31, 2023
commit 799534a2cc4def93109240263cbc937972002089
25 changes: 23 additions & 2 deletions metricflow/dataflow/builder/dataflow_plan_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -745,6 +745,7 @@ def _build_aggregated_measures_from_measure_source_node(
if time_dimension_spec.element_name == self._metric_time_dimension_reference.element_name
]
metric_time_dimension_requested = len(metric_time_dimension_specs) > 0
time_dimension_requested = len(queried_linkable_specs.time_dimension_specs) > 0
measure_specs = tuple(x.measure_spec for x in metric_input_measure_specs)
measure_properties = self._build_measure_spec_properties(measure_specs)
non_additive_dimension_spec = measure_properties.non_additive_dimension_spec
Expand Down Expand Up @@ -817,13 +818,15 @@ def _build_aggregated_measures_from_measure_source_node(
# If querying an offset metric, join to time spine.
join_to_time_spine_node: Optional[JoinToTimeSpineNode] = None
if metric_spec.offset_window or metric_spec.offset_to_grain:
# TODO: update to accept any time dimensions
assert metric_time_dimension_specs, "Joining to time spine requires querying with metric time."
join_to_time_spine_node = JoinToTimeSpineNode(
parent_node=time_range_node or measure_recipe.source_node,
metric_time_dimension_specs=metric_time_dimension_specs,
time_dimension_specs=metric_time_dimension_specs,
time_range_constraint=time_range_constraint,
offset_window=metric_spec.offset_window,
offset_to_grain=metric_spec.offset_to_grain,
join_type=SqlJoinType.INNER,
)

# Only get the required measure and the local linkable instances so that aggregations work correctly.
Expand Down Expand Up @@ -911,7 +914,25 @@ def _build_aggregated_measures_from_measure_source_node(
(InstanceSpecSet(measure_specs=measure_specs), queried_linkable_specs.as_spec_set)
),
)
return AggregateMeasuresNode(
aggregate_measures_node = AggregateMeasuresNode(
parent_node=pre_aggregate_node,
metric_input_measure_specs=tuple(metric_input_measure_specs),
)

join_aggregated_measure_to_time_spine = False
for metric_input_measure in metric_input_measure_specs:
if metric_input_measure.join_to_timespine:
join_aggregated_measure_to_time_spine = True
break

# Only join to time spine if a time dimension was requested in the query.
# TODO: if multiple measures and only some join to time spine, should we aggregate separately?
if join_aggregated_measure_to_time_spine and time_dimension_requested:
return JoinToTimeSpineNode(
parent_node=aggregate_measures_node,
time_dimension_specs=list(queried_linkable_specs.time_dimension_specs),
time_range_constraint=time_range_constraint,
join_type=SqlJoinType.LEFT_OUTER,
)
else:
return aggregate_measures_node
25 changes: 18 additions & 7 deletions metricflow/dataflow/dataflow_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -700,7 +700,8 @@ class JoinToTimeSpineNode(BaseOutput, ABC):
def __init__(
self,
parent_node: BaseOutput,
metric_time_dimension_specs: List[TimeDimensionSpec],
time_dimension_specs: List[TimeDimensionSpec],
join_type: SqlJoinType,
time_range_constraint: Optional[TimeRangeConstraint] = None,
offset_window: Optional[MetricTimeWindow] = None,
offset_to_grain: Optional[TimeGranularity] = None,
Expand All @@ -709,7 +710,7 @@ def __init__(

Args:
parent_node: Node that returns desired dataset to join to time spine.
metric_time_dimension_specs: Metric time dimensions requested in query. Used to determine granularities.
time_dimension_specs: Time dimensions requested in query. Used to determine granularities.
time_range_constraint: Time range to constrain the time spine to.
offset_window: Time window to offset the parent dataset by when joining to time spine.
offset_to_grain: Granularity period to offset the parent dataset to when joining to time spine.
Expand All @@ -720,10 +721,11 @@ def __init__(
offset_window and offset_to_grain
), "Can't set both offset_window and offset_to_grain when joining to time spine. Choose one or the other."
self._parent_node = parent_node
self._metric_time_dimension_specs = metric_time_dimension_specs
self._time_dimension_specs = time_dimension_specs
self._offset_window = offset_window
self._offset_to_grain = offset_to_grain
self._time_range_constraint = time_range_constraint
self._join_type = join_type

super().__init__(node_id=self.create_unique_id(), parent_nodes=[self._parent_node])

Expand All @@ -732,9 +734,9 @@ def id_prefix(cls) -> str: # noqa: D
return DATAFLOW_NODE_JOIN_TO_TIME_SPINE_ID_PREFIX

@property
def metric_time_dimension_specs(self) -> List[TimeDimensionSpec]: # noqa: D
def time_dimension_specs(self) -> List[TimeDimensionSpec]: # noqa: D
"""Time dimension specs to use when creating time spine table."""
return self._metric_time_dimension_specs
return self._time_dimension_specs

@property
def time_range_constraint(self) -> Optional[TimeRangeConstraint]: # noqa: D
Expand All @@ -751,6 +753,11 @@ def offset_to_grain(self) -> Optional[TimeGranularity]: # noqa: D
"""Time range constraint to apply when querying time spine table."""
return self._offset_to_grain

@property
def join_type(self) -> SqlJoinType: # noqa: D
"""Join type to use when joining to time spine."""
return self._join_type

def accept(self, visitor: DataflowPlanNodeVisitor[VisitorOutputT]) -> VisitorOutputT: # noqa: D
return visitor.visit_join_to_time_spine_node(self)

Expand All @@ -761,9 +768,11 @@ def description(self) -> str: # noqa: D
@property
def displayed_properties(self) -> List[DisplayedProperty]: # noqa: D
return super().displayed_properties + [
DisplayedProperty("time_dimension_specs", self._time_dimension_specs),
DisplayedProperty("time_range_constraint", self._time_range_constraint),
DisplayedProperty("offset_window", self._offset_window),
DisplayedProperty("offset_to_grain", self._offset_to_grain),
DisplayedProperty("join_type", self._join_type),
]

@property
Expand All @@ -776,17 +785,19 @@ def functionally_identical(self, other_node: DataflowPlanNode) -> bool: # noqa:
and other_node.time_range_constraint == self.time_range_constraint
and other_node.offset_window == self.offset_window
and other_node.offset_to_grain == self.offset_to_grain
and other_node.metric_time_dimension_specs == self.metric_time_dimension_specs
and other_node.time_dimension_specs == self.time_dimension_specs
and other_node.join_type == self.join_type
)

def with_new_parents(self, new_parent_nodes: Sequence[BaseOutput]) -> JoinToTimeSpineNode: # noqa: D
assert len(new_parent_nodes) == 1
return JoinToTimeSpineNode(
parent_node=new_parent_nodes[0],
metric_time_dimension_specs=self.metric_time_dimension_specs,
time_dimension_specs=self.time_dimension_specs,
time_range_constraint=self.time_range_constraint,
offset_window=self.offset_window,
offset_to_grain=self.offset_to_grain,
join_type=self.join_type,
)


Expand Down
3 changes: 2 additions & 1 deletion metricflow/plan_conversion/sql_join_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,7 @@ def make_join_to_time_spine_join_description(
metric_time_dimension_column_name: str,
parent_sql_select_node: SqlSelectStatementNode,
parent_alias: str,
join_type: SqlJoinType,
) -> SqlJoinDescription:
"""Build join expression used to join a metric to a time spine dataset."""
left_expr: SqlExpressionNode = SqlColumnReferenceExpression(
Expand All @@ -497,5 +498,5 @@ def make_join_to_time_spine_join_description(
col_ref=SqlColumnReference(table_alias=parent_alias, column_name=metric_time_dimension_column_name)
),
),
join_type=SqlJoinType.INNER,
join_type=join_type,
)
104 changes: 104 additions & 0 deletions metricflow/test/dataflow/builder/test_dataflow_plan_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
MetricFlowQuerySpec,
MetricSpec,
OrderBySpec,
TimeDimensionSpec,
)
from metricflow.specs.where_filter_transform import WhereSpecFactory
from metricflow.test.dataflow_plan_to_svg import display_graph_if_requested
Expand Down Expand Up @@ -868,3 +869,106 @@ def test_derived_offset_cumulative_metric( # noqa: D
mf_test_session_state=mf_test_session_state,
dag_graph=dataflow_plan,
)


def test_join_to_time_spine_with_metric_time( # noqa: D
request: FixtureRequest,
mf_test_session_state: MetricFlowTestSessionState,
dataflow_plan_builder: DataflowPlanBuilder,
) -> None:
dataflow_plan = dataflow_plan_builder.build_plan(
MetricFlowQuerySpec(
metric_specs=(MetricSpec(element_name="bookings_fill_0"),),
time_dimension_specs=(DataSet.metric_time_dimension_spec(TimeGranularity.DAY),),
)
)

assert_plan_snapshot_text_equal(
request=request,
mf_test_session_state=mf_test_session_state,
plan=dataflow_plan,
plan_snapshot_text=dataflow_plan_as_text(dataflow_plan),
)

display_graph_if_requested(
request=request,
mf_test_session_state=mf_test_session_state,
dag_graph=dataflow_plan,
)


def test_join_to_time_spine_derived_metric( # noqa: D
request: FixtureRequest,
mf_test_session_state: MetricFlowTestSessionState,
dataflow_plan_builder: DataflowPlanBuilder,
) -> None:
dataflow_plan = dataflow_plan_builder.build_plan(
MetricFlowQuerySpec(
metric_specs=(MetricSpec(element_name="bookings_growth_2_weeks_fill_0"),),
time_dimension_specs=(DataSet.metric_time_dimension_spec(TimeGranularity.DAY),),
)
)

assert_plan_snapshot_text_equal(
request=request,
mf_test_session_state=mf_test_session_state,
plan=dataflow_plan,
plan_snapshot_text=dataflow_plan_as_text(dataflow_plan),
)

display_graph_if_requested(
request=request,
mf_test_session_state=mf_test_session_state,
dag_graph=dataflow_plan,
)


def test_join_to_time_spine_with_non_metric_time( # noqa: D
request: FixtureRequest,
mf_test_session_state: MetricFlowTestSessionState,
dataflow_plan_builder: DataflowPlanBuilder,
) -> None:
dataflow_plan = dataflow_plan_builder.build_plan(
MetricFlowQuerySpec(
metric_specs=(MetricSpec(element_name="bookings_fill_0"),),
time_dimension_specs=(
TimeDimensionSpec(element_name="paid_at", entity_links=(EntityReference("booking"),)),
),
)
)

assert_plan_snapshot_text_equal(
request=request,
mf_test_session_state=mf_test_session_state,
plan=dataflow_plan,
plan_snapshot_text=dataflow_plan_as_text(dataflow_plan),
)

display_graph_if_requested(
request=request,
mf_test_session_state=mf_test_session_state,
dag_graph=dataflow_plan,
)


def test_dont_join_to_time_spine_if_no_time_dimension_requested( # noqa: D
request: FixtureRequest,
mf_test_session_state: MetricFlowTestSessionState,
dataflow_plan_builder: DataflowPlanBuilder,
) -> None:
dataflow_plan = dataflow_plan_builder.build_plan(
MetricFlowQuerySpec(metric_specs=(MetricSpec(element_name="bookings_fill_0"),))
)

assert_plan_snapshot_text_equal(
request=request,
mf_test_session_state=mf_test_session_state,
plan=dataflow_plan,
plan_snapshot_text=dataflow_plan_as_text(dataflow_plan),
)

display_graph_if_requested(
request=request,
mf_test_session_state=mf_test_session_state,
dag_graph=dataflow_plan,
)
14 changes: 11 additions & 3 deletions metricflow/test/plan_conversion/test_dataflow_to_sql_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
)
from metricflow.specs.where_filter_transform import WhereSpecFactory
from metricflow.sql.optimizer.optimization_levels import SqlQueryOptimizationLevel
from metricflow.sql.sql_plan import SqlJoinType
from metricflow.test.dataflow_plan_to_svg import display_graph_if_requested
from metricflow.test.fixtures.model_fixtures import ConsistentIdObjectRepository
from metricflow.test.fixtures.setup_fixtures import MetricFlowTestSessionState
Expand Down Expand Up @@ -559,10 +560,11 @@ def test_join_to_time_spine_node_without_offset( # noqa: D
compute_metrics_node = ComputeMetricsNode(parent_node=aggregated_measures_node, metric_specs=[metric_spec])
join_to_time_spine_node = JoinToTimeSpineNode(
parent_node=compute_metrics_node,
metric_time_dimension_specs=[MTD_SPEC_DAY],
time_dimension_specs=[MTD_SPEC_DAY],
time_range_constraint=TimeRangeConstraint(
start_time=as_datetime("2020-01-01"), end_time=as_datetime("2021-01-01")
),
join_type=SqlJoinType.INNER,
)
sink_node = WriteToResultDataframeNode(join_to_time_spine_node)
dataflow_plan = DataflowPlan("plan0", sink_output_nodes=[sink_node])
Expand Down Expand Up @@ -622,11 +624,12 @@ def test_join_to_time_spine_node_with_offset_window( # noqa: D
compute_metrics_node = ComputeMetricsNode(parent_node=aggregated_measures_node, metric_specs=[metric_spec])
join_to_time_spine_node = JoinToTimeSpineNode(
parent_node=compute_metrics_node,
metric_time_dimension_specs=[MTD_SPEC_DAY],
time_dimension_specs=[MTD_SPEC_DAY],
time_range_constraint=TimeRangeConstraint(
start_time=as_datetime("2020-01-01"), end_time=as_datetime("2021-01-01")
),
offset_window=PydanticMetricTimeWindow(count=10, granularity=TimeGranularity.DAY),
join_type=SqlJoinType.INNER,
)

sink_node = WriteToResultDataframeNode(join_to_time_spine_node)
Expand Down Expand Up @@ -687,12 +690,13 @@ def test_join_to_time_spine_node_with_offset_to_grain(
compute_metrics_node = ComputeMetricsNode(parent_node=aggregated_measures_node, metric_specs=[metric_spec])
join_to_time_spine_node = JoinToTimeSpineNode(
parent_node=compute_metrics_node,
metric_time_dimension_specs=[MTD_SPEC_DAY],
time_dimension_specs=[MTD_SPEC_DAY],
time_range_constraint=TimeRangeConstraint(
start_time=as_datetime("2020-01-01"), end_time=as_datetime("2021-01-01")
),
offset_window=None,
offset_to_grain=TimeGranularity.MONTH,
join_type=SqlJoinType.INNER,
)

sink_node = WriteToResultDataframeNode(join_to_time_spine_node)
Expand Down Expand Up @@ -1994,3 +1998,7 @@ def test_offset_window_with_date_part( # noqa: D
sql_client=sql_client,
node=dataflow_plan.sink_output_nodes[0].parent_node,
)


def test_stuff() -> None: # noqa: D
assert 0, "write some tests here!!!"
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
<DataflowPlan>
<WriteToResultDataframeNode>
<!-- description = Write to Dataframe -->
<!-- node_id = wrd_0 -->
<ComputeMetricsNode>
<!-- description = Compute Metrics via Expressions -->
<!-- node_id = cm_0 -->
<!-- metric_spec = -->
<!-- {'class': 'MetricSpec', -->
<!-- 'element_name': 'bookings_fill_0', -->
<!-- 'constraint': None, -->
<!-- 'alias': None, -->
<!-- 'offset_window': None, -->
<!-- 'offset_to_grain': None} -->
<AggregateMeasuresNode>
<!-- description = Aggregate Measures -->
<!-- node_id = am_0 -->
<FilterElementsNode>
<!-- description = -->
<!-- Pass Only Elements: -->
<!-- ['bookings'] -->
<!-- node_id = pfe_0 -->
<!-- include_spec = -->
<!-- {'class': 'MeasureSpec', -->
<!-- 'element_name': 'bookings', -->
<!-- 'non_additive_dimension_spec': None} -->
<!-- distinct = False -->
<MetricTimeDimensionTransformNode>
<!-- description = Metric Time Dimension 'ds' -->
<!-- node_id = sma_10001 -->
<!-- aggregation_time_dimension = ds -->
<ReadSqlSourceNode>
<!-- description = -->
<!-- Read From SemanticModelDataSet(SemanticModelReference(semantic_model_name='bookings_source')) -->
<!-- node_id = rss_10011 -->
<!-- data_set = -->
<!-- SemanticModelDataSet(SemanticModelReference(semantic_model_name='bookings_source')) -->
</ReadSqlSourceNode>
</MetricTimeDimensionTransformNode>
</FilterElementsNode>
</AggregateMeasuresNode>
</ComputeMetricsNode>
</WriteToResultDataframeNode>
</DataflowPlan>
Loading