Skip to content

Commit

Permalink
Add metric alias step in dataflow plan builder
Browse files Browse the repository at this point in the history
This commit makes the `DataflowPlanBuilder` add an extra
`AliasSpecsNode` at the end of the plan in case any of the input metric
specs has an alias.
  • Loading branch information
serramatutu committed Dec 13, 2024
1 parent 2b819ee commit cb8a4cc
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 2 deletions.
18 changes: 16 additions & 2 deletions metricflow/dataflow/builder/dataflow_plan_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,6 @@ def _build_query_output_node(
len(metric_spec.filter_spec_set.all_filter_specs) > 0
or metric_spec.offset_to_grain is not None
or metric_spec.offset_window is not None
or metric_spec.alias is not None
):
raise ValueError(
f"The metric specs in the query spec should not contain any metric modifiers. Got: {metric_spec}"
Expand Down Expand Up @@ -213,6 +212,7 @@ def _build_plan(

sink_node = DataflowPlanBuilder.build_sink_node(
parent_node=metrics_output_node,
metric_specs=query_spec.metric_specs,
order_by_specs=query_spec.order_by_specs,
output_sql_table=output_sql_table,
limit=query_spec.limit,
Expand Down Expand Up @@ -866,7 +866,10 @@ def _build_plan_for_distinct_values(
output_node = MinMaxNode.create(parent_node=output_node)

sink_node = self.build_sink_node(
parent_node=output_node, order_by_specs=query_spec.order_by_specs, limit=query_spec.limit
parent_node=output_node,
metric_specs=query_spec.metric_specs,
order_by_specs=query_spec.order_by_specs,
limit=query_spec.limit,
)

plan = DataflowPlan(sink_nodes=[sink_node])
Expand All @@ -875,6 +878,7 @@ def _build_plan_for_distinct_values(
@staticmethod
def build_sink_node(
parent_node: DataflowPlanNode,
metric_specs: Sequence[MetricSpec],
order_by_specs: Sequence[OrderBySpec],
output_sql_table: Optional[SqlTable] = None,
limit: Optional[int] = None,
Expand All @@ -893,6 +897,16 @@ def build_sink_node(
parent_node=pre_result_node or parent_node, include_specs=output_selection_specs
)

alias_specs = tuple(
SpecToAlias(MetricSpec(metric.element_name), MetricSpec(metric.alias))
for metric in metric_specs
if metric.alias is not None
)
if len(alias_specs) > 0:
pre_result_node = AliasSpecsNode.create(
parent_node=pre_result_node or parent_node, change_specs=alias_specs
)

write_result_node: DataflowPlanNode
if not output_sql_table:
write_result_node = WriteToResultDataTableNode.create(parent_node=pre_result_node or parent_node)
Expand Down
32 changes: 32 additions & 0 deletions tests_metricflow/dataflow/builder/test_dataflow_plan_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -1403,3 +1403,35 @@ def test_derived_cumulative_metric_with_non_default_grain(
mf_test_configuration=mf_test_configuration,
dag_graph=dataflow_plan,
)


def test_metric_with_alias_plan(
request: FixtureRequest,
mf_test_configuration: MetricFlowTestConfiguration,
dataflow_plan_builder: DataflowPlanBuilder,
) -> None:
"""Tests a simple plan getting a metric and a local dimension."""
dataflow_plan = dataflow_plan_builder.build_plan(
MetricFlowQuerySpec(
metric_specs=(MetricSpec(element_name="bookings", alias="bookings_alias"),),
dimension_specs=(
DimensionSpec(
element_name="is_instant",
entity_links=(EntityReference("booking"),),
),
),
)
)

assert_plan_snapshot_text_equal(
request=request,
mf_test_configuration=mf_test_configuration,
plan=dataflow_plan,
plan_snapshot_text=dataflow_plan.structure_text(),
)

display_graph_if_requested(
request=request,
mf_test_configuration=mf_test_configuration,
dag_graph=dataflow_plan,
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
test_name: test_metric_with_alias_plan
test_filename: test_dataflow_plan_builder.py
docstring:
Tests a simple plan getting a metric and a local dimension.
---
<DataflowPlan>
<WriteToResultDataTableNode>
<!-- description = 'Write to DataTable' -->
<!-- node_id = NodeId(id_str='wrd_0') -->
<AliasSpecsNode>
<!-- description = 'Change Column Aliases' -->
<!-- node_id = NodeId(id_str='as_0') -->
<!-- change_specs = -->
<!-- ( -->
<!-- SpecToAlias( -->
<!-- input_spec=MetricSpec(element_name='bookings', filter_spec_set=WhereFilterSpecSet()), -->
<!-- output_spec=MetricSpec(element_name='bookings_alias', filter_spec_set=WhereFilterSpecSet()), -->
<!-- ), -->
<!-- ) -->
<ComputeMetricsNode>
<!-- description = 'Compute Metrics via Expressions' -->
<!-- node_id = NodeId(id_str='cm_0') -->
<!-- metric_spec = MetricSpec(element_name='bookings', filter_spec_set=WhereFilterSpecSet()) -->
<AggregateMeasuresNode>
<!-- description = 'Aggregate Measures' -->
<!-- node_id = NodeId(id_str='am_0') -->
<FilterElementsNode>
<!-- description = "Pass Only Elements: ['bookings', 'booking__is_instant']" -->
<!-- node_id = NodeId(id_str='pfe_0') -->
<!-- include_spec = MeasureSpec(element_name='bookings') -->
<!-- include_spec = -->
<!-- DimensionSpec( -->
<!-- element_name='is_instant', -->
<!-- entity_links=(EntityReference(element_name='booking'),), -->
<!-- ) -->
<!-- distinct = False -->
<MetricTimeDimensionTransformNode>
<!-- description = "Metric Time Dimension 'ds'" -->
<!-- node_id = NodeId(id_str='sma_28009') -->
<!-- aggregation_time_dimension = 'ds' -->
<ReadSqlSourceNode>
<!-- description = "Read From SemanticModelDataSet('bookings_source')" -->
<!-- node_id = NodeId(id_str='rss_28020') -->
<!-- data_set = SemanticModelDataSet('bookings_source') -->
</ReadSqlSourceNode>
</MetricTimeDimensionTransformNode>
</FilterElementsNode>
</AggregateMeasuresNode>
</ComputeMetricsNode>
</AliasSpecsNode>
</WriteToResultDataTableNode>
</DataflowPlan>

0 comments on commit cb8a4cc

Please sign in to comment.