Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch to FULL OUTER JOIN for derived & ratio metrics #842

Merged
merged 8 commits into from
Nov 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/unreleased/Breaking Changes-20231102-182815.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Breaking Changes
body: Use FULL OUTER JOIN to combine input metrics for derived metrics. This is a change from using INNER JOIN and may result in changes in output.
time: 2023-11-02T18:28:15.181064-07:00
custom:
Author: courtneyholcomb
Issue: "842"
9 changes: 1 addition & 8 deletions metricflow/dataflow/builder/dataflow_plan_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,6 @@ def _build_metrics_output_node(
queried_linkable_specs: LinkableSpecSet,
where_constraint: Optional[WhereFilterSpec] = None,
time_range_constraint: Optional[TimeRangeConstraint] = None,
combine_metrics_join_type: SqlJoinType = SqlJoinType.FULL_OUTER,
) -> BaseOutput:
"""Builds a computed metrics output node.

Expand All @@ -222,7 +221,6 @@ def _build_metrics_output_node(
queried_linkable_specs: Dimensions/entities that were queried for.
where_constraint: Where constraint used to compute the metric.
time_range_constraint: Time range constraint used to compute the metric.
combine_metrics_join_type: The join used when combining the computed metrics.
"""
output_nodes: List[BaseOutput] = []
compute_metrics_node: Optional[ComputeMetricsNode] = None
Expand All @@ -241,14 +239,12 @@ def _build_metrics_output_node(
f"For {metric.type} metric: {metric_spec}, needed metrics are:\n"
f"{pformat_big_objects(metric_input_specs=metric_input_specs)}"
)

compute_metrics_node = ComputeMetricsNode(
parent_node=self._build_metrics_output_node(
metric_specs=metric_input_specs,
queried_linkable_specs=queried_linkable_specs,
where_constraint=where_constraint,
time_range_constraint=time_range_constraint,
combine_metrics_join_type=SqlJoinType.INNER,
),
metric_specs=[metric_spec],
)
Expand Down Expand Up @@ -295,10 +291,7 @@ def _build_metrics_output_node(
if len(output_nodes) == 1:
return output_nodes[0]

return CombineMetricsNode(
parent_nodes=output_nodes,
join_type=combine_metrics_join_type,
)
return CombineMetricsNode(parent_nodes=output_nodes)

def build_plan_for_distinct_values(self, query_spec: MetricFlowQuerySpec) -> DataflowPlan:
"""Generate a plan that would get the distinct values of a linkable instance.
Expand Down
25 changes: 2 additions & 23 deletions metricflow/dataflow/dataflow_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -1225,9 +1225,7 @@ class CombineMetricsNode(ComputedMetricsOutput):
def __init__( # noqa: D
self,
parent_nodes: Sequence[Union[BaseOutput, ComputedMetricsOutput]],
join_type: SqlJoinType = SqlJoinType.FULL_OUTER,
) -> None:
self._join_type = join_type
super().__init__(node_id=self.create_unique_id(), parent_nodes=list(parent_nodes))

@classmethod
Expand All @@ -1241,31 +1239,12 @@ def accept(self, visitor: DataflowPlanNodeVisitor[VisitorOutputT]) -> VisitorOut
def description(self) -> str: # noqa: D
return "Combine Metrics"

@property
def displayed_properties(self) -> List[DisplayedProperty]:
"""Prints details about the join types and how the node will behave."""
custom_properties = [DisplayedProperty("join type", self.join_type)]
if self.join_type is SqlJoinType.FULL_OUTER:
custom_properties.append(
DisplayedProperty("de-duplication method", "post-join aggregation across all dimensions")
)

return super().displayed_properties + custom_properties

@property
def join_type(self) -> SqlJoinType:
"""The type of join used for combining metrics."""
return self._join_type

def functionally_identical(self, other_node: DataflowPlanNode) -> bool: # noqa: D
return isinstance(other_node, self.__class__) and other_node.join_type == self.join_type
return isinstance(other_node, self.__class__)

def with_new_parents(self, new_parent_nodes: Sequence[BaseOutput]) -> CombineMetricsNode: # noqa: D
assert len(new_parent_nodes) == 1
return CombineMetricsNode(
parent_nodes=new_parent_nodes,
join_type=self.join_type,
)
return CombineMetricsNode(parent_nodes=new_parent_nodes)


class ConstrainTimeRangeNode(AggregatedMeasuresOutput, BaseOutput):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,9 +287,7 @@ def visit_combine_metrics_node(self, node: CombineMetricsNode) -> OptimizeBranch
if len(combined_parent_branches) == 1:
return OptimizeBranchResult(base_output_node=combined_parent_branches[0])

return OptimizeBranchResult(
base_output_node=CombineMetricsNode(parent_nodes=combined_parent_branches, join_type=node.join_type)
)
return OptimizeBranchResult(base_output_node=CombineMetricsNode(parent_nodes=combined_parent_branches))

def visit_constrain_time_range_node(self, node: ConstrainTimeRangeNode) -> OptimizeBranchResult: # noqa: D
self._log_visit_node_type(node)
Expand Down
14 changes: 6 additions & 8 deletions metricflow/plan_conversion/dataflow_to_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -906,12 +906,10 @@ def visit_combine_metrics_node(self, node: CombineMetricsNode) -> SqlDataSet:
"""Join computed metric datasets together to return a single dataset containing all metrics.

This node may exist in one of two situations: when metrics need to be combined in order to produce a single
dataset with all required inputs for a derived metric (in which case the join type is INNER), or when
metrics need to be combined in order to produce a single dataset of output for downstream consumption by
the end user, in which case we will use FULL OUTER JOIN.
dataset with all required inputs for a derived metric, or when metrics need to be combined in order to produce
a single dataset of output for downstream consumption by the end user.

In the case of a multi-data-source FULL OUTER JOIN the join key will be a coalesced set of all previously
seen dimension values. For example:
The join key will be a coalesced set of all previously seen dimension values. For example:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need to do this here, but if there's logic in this function for rendering the INNER JOIN case maybe we can remove the join type parameter AND that logic.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm fairly certain the INNER JOIN case is not used at all! I can remove the join_type param before merging!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

...I take it back, it's more complicated than I thought 😅 so will deal with that in a follow up!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I was just on the wrong branch. Easy to add here after all! Removed the param and all it changes is the displayed properties in the dataflow plans 👍

FROM (
...
) subq_9
Expand Down Expand Up @@ -961,7 +959,7 @@ def visit_combine_metrics_node(self, node: CombineMetricsNode) -> SqlDataSet:
), "All parent nodes should have the same set of linkable instances since all values are coalesced."

linkable_spec_set = from_data_set.data_set.instance_set.spec_set.transform(SelectOnlyLinkableSpecs())
join_type = SqlJoinType.CROSS_JOIN if len(linkable_spec_set.all_specs) == 0 else node.join_type
join_type = SqlJoinType.CROSS_JOIN if len(linkable_spec_set.all_specs) == 0 else SqlJoinType.FULL_OUTER

joins_descriptions: List[SqlJoinDescription] = []
# TODO: refactor this loop into SqlQueryPlanJoinBuilder
Expand All @@ -986,7 +984,7 @@ def visit_combine_metrics_node(self, node: CombineMetricsNode) -> SqlDataSet:
output_instance_set = InstanceSet.merge([x.data_set.instance_set for x in parent_data_sets])
output_instance_set = output_instance_set.transform(ChangeAssociatedColumns(self._column_association_resolver))

metric_aggregation_type = AggregationType.MAX if node.join_type is SqlJoinType.FULL_OUTER else None
metric_aggregation_type = AggregationType.MAX
metric_select_column_set = SelectColumnSet(
metric_columns=self._make_select_columns_for_metrics(
table_alias_to_metric_specs, aggregation_type=metric_aggregation_type
Expand All @@ -1008,7 +1006,7 @@ def visit_combine_metrics_node(self, node: CombineMetricsNode) -> SqlDataSet:
from_source=from_data_set.data_set.sql_select_node,
from_source_alias=from_data_set.alias,
joins_descs=tuple(joins_descriptions),
group_bys=linkable_select_column_set.as_tuple() if node.join_type is SqlJoinType.FULL_OUTER else (),
group_bys=linkable_select_column_set.as_tuple(),
where=None,
order_bys=(),
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ integration_test:
WHERE is_instant
GROUP BY ds
) a
JOIN (
FULL OUTER JOIN (
SELECT
CAST(NULLIF(MAX(booking_value), 0) AS {{ double_data_type_name }} ) AS max_booking_value
, ds
Expand Down Expand Up @@ -48,7 +48,7 @@ integration_test:
WHERE listings_latest.is_lux
GROUP BY fct_bookings.ds
) a
JOIN (
FULL OUTER JOIN (
SELECT
CAST(NULLIF(MAX(booking_value), 0) AS {{ double_data_type_name }} ) AS max_booking_value
, ds
Expand Down Expand Up @@ -79,7 +79,7 @@ integration_test:
WHERE listings_latest.is_lux
GROUP BY fct_bookings.ds
) a
JOIN (
FULL OUTER JOIN (
SELECT
CAST(NULLIF(SUM(booking_value), 0) AS {{ double_data_type_name }} ) AS booking_value
, ds
Expand Down Expand Up @@ -107,7 +107,7 @@ integration_test:
WHERE is_instant
GROUP BY ds
) a
JOIN (
FULL OUTER JOIN (
SELECT
CAST(NULLIF(SUM(booking_value), 0) AS {{ double_data_type_name }} ) AS booking_value
, ds
Expand Down Expand Up @@ -153,7 +153,7 @@ integration_test:
WHERE dul_west.home_state_latest IN ('CA', 'HI', 'WA')
GROUP BY fa_west_filtered.ds
) a
JOIN (
FULL OUTER JOIN (
SELECT
CAST(SUM(account_balance) AS {{ double_data_type_name }}) AS total_account_balance_first_day
, fa_east_filtered.ds
Expand Down
75 changes: 41 additions & 34 deletions metricflow/test/integration/test_cases/itest_metrics.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ integration_test:
FROM {{source_schema}}.fct_bookings
GROUP BY ds
) b
JOIN (
FULL OUTER JOIN (
SELECT
SUM(1) AS views
, ds
Expand Down Expand Up @@ -284,7 +284,7 @@ integration_test:
GROUP BY
ds
) groupby_8cbdaa28
JOIN (
FULL OUTER JOIN (
SELECT
SUM(1) AS views
, ds
Expand Down Expand Up @@ -350,7 +350,7 @@ integration_test:
GROUP BY
ds
) groupby_8cbdaa28
JOIN (
FULL OUTER JOIN (
SELECT
SUM(1) AS listings
, created_at AS ds
Expand Down Expand Up @@ -519,7 +519,7 @@ integration_test:
GROUP BY
ds
) a
INNER JOIN (
FULL OUTER JOIN (
SELECT
SUM(1) AS lux_listings
, created_at AS metric_time__day
Expand Down Expand Up @@ -643,27 +643,34 @@ integration_test:
group_bys: [listing__is_lux_latest]
check_query: |
SELECT
bk.booking_value / NULLIF(vw.views, 0) AS booking_value_per_view
, bk.is_lux AS listing__is_lux_latest
booking_value / NULLIF(views, 0) AS booking_value_per_view
, listing__is_lux_latest
FROM (
SELECT
SUM(a.booking_value) AS booking_value
,b.is_lux
FROM {{ source_schema }}.fct_bookings a
LEFT OUTER JOIN {{ source_schema }}.dim_listings_latest b
ON a.listing_id = b.listing_id
GROUP BY 2
) bk
INNER JOIN (
SELECT
SUM(1) AS views
,d.is_lux
FROM {{ source_schema }}.fct_views c
LEFT OUTER JOIN {{ source_schema }}.dim_listings_latest d
ON c.listing_id = d.listing_id
GROUP BY 2
) vw
ON bk.is_lux = vw.is_lux OR (bk.is_lux IS NULL AND vw.is_lux IS NULL)
MAX(bk.booking_value) AS booking_value
, MAX(vw.views) AS views
, COALESCE(bk.is_lux, vw.is_lux) AS listing__is_lux_latest
FROM (
SELECT
SUM(a.booking_value) AS booking_value
,b.is_lux
FROM {{ source_schema }}.fct_bookings a
LEFT OUTER JOIN {{ source_schema }}.dim_listings_latest b
ON a.listing_id = b.listing_id
GROUP BY 2
) bk
FULL OUTER JOIN (
SELECT
SUM(1) AS views
,d.is_lux
FROM {{ source_schema }}.fct_views c
LEFT OUTER JOIN {{ source_schema }}.dim_listings_latest d
ON c.listing_id = d.listing_id
GROUP BY 2
) vw
ON bk.is_lux = vw.is_lux
GROUP BY 3
) x
---
integration_test:
name: derived_metric_with_offset_window
Expand All @@ -683,7 +690,7 @@ integration_test:
GROUP BY
metric_time__day
) a
INNER JOIN (
FULL OUTER JOIN (
SELECT
c.ds AS metric_time__day
, d.bookings_2_weeks_ago AS bookings_2_weeks_ago
Expand Down Expand Up @@ -718,7 +725,7 @@ integration_test:
GROUP BY
metric_time__day
) a
INNER JOIN (
FULL OUTER JOIN (
SELECT
c.ds AS metric_time__day
, d.bookings_at_start_of_month AS bookings_at_start_of_month
Expand Down Expand Up @@ -760,7 +767,7 @@ integration_test:
) f
ON {{ render_date_sub("g", "ds", 1, TimeGranularity.MONTH) }} = f.metric_time__day
) a
INNER JOIN (
FULL OUTER JOIN (
SELECT
c.ds AS metric_time__day
, d.bookings AS month_start_bookings
Expand Down Expand Up @@ -808,7 +815,7 @@ integration_test:
check_query: |
SELECT
booking_value - instant_booking_value AS booking_value_sub_instant
, a.metric_time__day
, COALESCE(a.metric_time__day, b.metric_time__day) AS metric_time__day
FROM (
SELECT
SUM(booking_value) AS instant_booking_value
Expand All @@ -818,7 +825,7 @@ integration_test:
GROUP BY
ds
) a
INNER JOIN (
FULL OUTER JOIN (
SELECT
SUM(booking_value) AS booking_value
, ds AS metric_time__day
Expand All @@ -843,7 +850,7 @@ integration_test:
FROM (
SELECT
booking_value - instant_booking_value AS booking_value_sub_instant
, a.metric_time__day AS metric_time__day
, COALESCE(a.metric_time__day, b.metric_time__day) AS metric_time__day
FROM (
SELECT
SUM(booking_value) AS instant_booking_value
Expand All @@ -853,7 +860,7 @@ integration_test:
GROUP BY
ds
) a
INNER JOIN (
FULL OUTER JOIN (
SELECT
SUM(booking_value) AS booking_value
, ds AS metric_time__day
Expand Down Expand Up @@ -901,7 +908,7 @@ integration_test:
FROM {{ source_schema }}.fct_bookings
GROUP BY metric_time__week
) a
INNER JOIN (
FULL OUTER JOIN (
SELECT
{{ render_date_trunc("c.ds", TimeGranularity.WEEK) }} AS metric_time__week
, SUM(d.bookings_at_start_of_month) AS bookings_at_start_of_month
Expand Down Expand Up @@ -944,7 +951,7 @@ integration_test:
ON {{ render_date_sub("g", "ds", 1, TimeGranularity.MONTH) }} = f.metric_time__day
GROUP BY metric_time__year
) a
INNER JOIN (
FULL OUTER JOIN (
SELECT
{{ render_date_trunc("c.ds", TimeGranularity.YEAR) }} AS metric_time__year
, SUM(d.bookings) AS month_start_bookings
Expand Down Expand Up @@ -1024,7 +1031,7 @@ integration_test:
FROM {{ source_schema }}.fct_bookings
GROUP BY metric_time__week, metric_time__month
) a
INNER JOIN (
FULL OUTER JOIN (
SELECT
{{ render_date_trunc("c.ds", TimeGranularity.WEEK) }} AS metric_time__week
, {{ render_date_trunc("c.ds", TimeGranularity.MONTH) }} AS metric_time__month
Expand Down Expand Up @@ -1290,7 +1297,7 @@ integration_test:
) subq_3
ON subq_5.ds = subq_3.metric_time__day
) subq_7
INNER JOIN (
FULL OUTER JOIN (
SELECT
subq_11.ds AS metric_time__day
, SUM(subq_9.bookings) AS bookings_2_weeks_ago
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
<CombineMetricsNode>
<!-- description = Combine Metrics -->
<!-- node_id = cbm_0 -->
<!-- join type = SqlJoinType.FULL_OUTER -->
<!-- de-duplication method = post-join aggregation across all dimensions -->
<ComputeMetricsNode>
<!-- description = Compute Metrics via Expressions -->
<!-- node_id = cm_0 -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
<CombineMetricsNode>
<!-- description = Combine Metrics -->
<!-- node_id = cbm_0 -->
<!-- join type = SqlJoinType.INNER -->
<ComputeMetricsNode>
<!-- description = Compute Metrics via Expressions -->
<!-- node_id = cm_0 -->
Expand Down
Loading
Loading