Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Court/coalesce nulls for derived metrics #845

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/unreleased/Breaking Changes-20231102-182815.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Breaking Changes
body: Use FULL OUTER JOIN to combine input metrics for derived metrics. This is a change from using INNER JOIN and may result in changes in output.
time: 2023-11-02T18:28:15.181064-07:00
custom:
Author: courtneyholcomb
Issue: "842"
3 changes: 1 addition & 2 deletions metricflow/dataflow/builder/dataflow_plan_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,14 +241,13 @@ def _build_metrics_output_node(
f"For {metric.type} metric: {metric_spec}, needed metrics are:\n"
f"{pformat_big_objects(metric_input_specs=metric_input_specs)}"
)

compute_metrics_node = ComputeMetricsNode(
parent_node=self._build_metrics_output_node(
metric_specs=metric_input_specs,
queried_linkable_specs=queried_linkable_specs,
where_constraint=where_constraint,
time_range_constraint=time_range_constraint,
combine_metrics_join_type=SqlJoinType.INNER,
combine_metrics_join_type=SqlJoinType.FULL_OUTER,
),
metric_specs=[metric_spec],
)
Expand Down
25 changes: 20 additions & 5 deletions metricflow/plan_conversion/dataflow_to_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -894,6 +894,23 @@ def _make_select_columns_for_metrics(
else:
select_expression = column_reference_expression

input_measures = self._metric_lookup.measures_for_metric(
# Using alias instead of element name in reference, where did we lose the name?
metric_reference=metric_spec.as_reference,
column_association_resolver=self._column_association_resolver,
)
if input_measures:
# TODO: update type after other PR merges
input_measure = input_measures[0]
if input_measure.fill_nulls_with is not None:
select_expression = SqlAggregateFunctionExpression(
sql_function=SqlFunction.COALESCE,
sql_function_args=[
select_expression,
SqlStringExpression(str(input_measure.fill_nulls_with)),
],
)

select_columns.append(
SqlSelectColumn(
expr=select_expression,
Expand All @@ -906,12 +923,10 @@ def visit_combine_metrics_node(self, node: CombineMetricsNode) -> SqlDataSet:
"""Join computed metric datasets together to return a single dataset containing all metrics.

This node may exist in one of two situations: when metrics need to be combined in order to produce a single
dataset with all required inputs for a derived metric (in which case the join type is INNER), or when
metrics need to be combined in order to produce a single dataset of output for downstream consumption by
the end user, in which case we will use FULL OUTER JOIN.
dataset with all required inputs for a derived metric, or when metrics need to be combined in order to produce
a single dataset of output for downstream consumption by the end user.

In the case of a multi-data-source FULL OUTER JOIN the join key will be a coalesced set of all previously
seen dimension values. For example:
The join key will be a coalesced set of all previously seen dimension values. For example:
FROM (
...
) subq_9
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ integration_test:
WHERE is_instant
GROUP BY ds
) a
JOIN (
FULL OUTER JOIN (
SELECT
CAST(NULLIF(MAX(booking_value), 0) AS {{ double_data_type_name }} ) AS max_booking_value
, ds
Expand Down Expand Up @@ -48,7 +48,7 @@ integration_test:
WHERE listings_latest.is_lux
GROUP BY fct_bookings.ds
) a
JOIN (
FULL OUTER JOIN (
SELECT
CAST(NULLIF(MAX(booking_value), 0) AS {{ double_data_type_name }} ) AS max_booking_value
, ds
Expand Down Expand Up @@ -79,7 +79,7 @@ integration_test:
WHERE listings_latest.is_lux
GROUP BY fct_bookings.ds
) a
JOIN (
FULL OUTER JOIN (
SELECT
CAST(NULLIF(SUM(booking_value), 0) AS {{ double_data_type_name }} ) AS booking_value
, ds
Expand Down Expand Up @@ -107,7 +107,7 @@ integration_test:
WHERE is_instant
GROUP BY ds
) a
JOIN (
FULL OUTER JOIN (
SELECT
CAST(NULLIF(SUM(booking_value), 0) AS {{ double_data_type_name }} ) AS booking_value
, ds
Expand Down Expand Up @@ -153,7 +153,7 @@ integration_test:
WHERE dul_west.home_state_latest IN ('CA', 'HI', 'WA')
GROUP BY fa_west_filtered.ds
) a
JOIN (
FULL OUTER JOIN (
SELECT
CAST(SUM(account_balance) AS {{ double_data_type_name }}) AS total_account_balance_first_day
, fa_east_filtered.ds
Expand Down
75 changes: 41 additions & 34 deletions metricflow/test/integration/test_cases/itest_metrics.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ integration_test:
FROM {{source_schema}}.fct_bookings
GROUP BY ds
) b
JOIN (
FULL OUTER JOIN (
SELECT
SUM(1) AS views
, ds
Expand Down Expand Up @@ -284,7 +284,7 @@ integration_test:
GROUP BY
ds
) groupby_8cbdaa28
JOIN (
FULL OUTER JOIN (
SELECT
SUM(1) AS views
, ds
Expand Down Expand Up @@ -350,7 +350,7 @@ integration_test:
GROUP BY
ds
) groupby_8cbdaa28
JOIN (
FULL OUTER JOIN (
SELECT
SUM(1) AS listings
, created_at AS ds
Expand Down Expand Up @@ -519,7 +519,7 @@ integration_test:
GROUP BY
ds
) a
INNER JOIN (
FULL OUTER JOIN (
SELECT
SUM(1) AS lux_listings
, created_at AS metric_time__day
Expand Down Expand Up @@ -643,27 +643,34 @@ integration_test:
group_bys: [listing__is_lux_latest]
check_query: |
SELECT
bk.booking_value / NULLIF(vw.views, 0) AS booking_value_per_view
, bk.is_lux AS listing__is_lux_latest
booking_value / NULLIF(views, 0) AS booking_value_per_view
, listing__is_lux_latest
FROM (
SELECT
SUM(a.booking_value) AS booking_value
,b.is_lux
FROM {{ source_schema }}.fct_bookings a
LEFT OUTER JOIN {{ source_schema }}.dim_listings_latest b
ON a.listing_id = b.listing_id
GROUP BY 2
) bk
INNER JOIN (
SELECT
SUM(1) AS views
,d.is_lux
FROM {{ source_schema }}.fct_views c
LEFT OUTER JOIN {{ source_schema }}.dim_listings_latest d
ON c.listing_id = d.listing_id
GROUP BY 2
) vw
ON bk.is_lux = vw.is_lux OR (bk.is_lux IS NULL AND vw.is_lux IS NULL)
MAX(bk.booking_value) AS booking_value
, MAX(vw.views) AS views
, COALESCE(bk.is_lux, vw.is_lux) AS listing__is_lux_latest
FROM (
SELECT
SUM(a.booking_value) AS booking_value
,b.is_lux
FROM {{ source_schema }}.fct_bookings a
LEFT OUTER JOIN {{ source_schema }}.dim_listings_latest b
ON a.listing_id = b.listing_id
GROUP BY 2
) bk
FULL OUTER JOIN (
SELECT
SUM(1) AS views
,d.is_lux
FROM {{ source_schema }}.fct_views c
LEFT OUTER JOIN {{ source_schema }}.dim_listings_latest d
ON c.listing_id = d.listing_id
GROUP BY 2
) vw
ON bk.is_lux = vw.is_lux
GROUP BY 3
) x
---
integration_test:
name: derived_metric_with_offset_window
Expand All @@ -683,7 +690,7 @@ integration_test:
GROUP BY
metric_time__day
) a
INNER JOIN (
FULL OUTER JOIN (
SELECT
c.ds AS metric_time__day
, d.bookings_2_weeks_ago AS bookings_2_weeks_ago
Expand Down Expand Up @@ -718,7 +725,7 @@ integration_test:
GROUP BY
metric_time__day
) a
INNER JOIN (
FULL OUTER JOIN (
SELECT
c.ds AS metric_time__day
, d.bookings_at_start_of_month AS bookings_at_start_of_month
Expand Down Expand Up @@ -760,7 +767,7 @@ integration_test:
) f
ON {{ render_date_sub("g", "ds", 1, TimeGranularity.MONTH) }} = f.metric_time__day
) a
INNER JOIN (
FULL OUTER JOIN (
SELECT
c.ds AS metric_time__day
, d.bookings AS month_start_bookings
Expand Down Expand Up @@ -808,7 +815,7 @@ integration_test:
check_query: |
SELECT
booking_value - instant_booking_value AS booking_value_sub_instant
, a.metric_time__day
, COALESCE(a.metric_time__day, b.metric_time__day) AS metric_time__day
FROM (
SELECT
SUM(booking_value) AS instant_booking_value
Expand All @@ -818,7 +825,7 @@ integration_test:
GROUP BY
ds
) a
INNER JOIN (
FULL OUTER JOIN (
SELECT
SUM(booking_value) AS booking_value
, ds AS metric_time__day
Expand All @@ -843,7 +850,7 @@ integration_test:
FROM (
SELECT
booking_value - instant_booking_value AS booking_value_sub_instant
, a.metric_time__day AS metric_time__day
, COALESCE(a.metric_time__day, b.metric_time__day) AS metric_time__day
FROM (
SELECT
SUM(booking_value) AS instant_booking_value
Expand All @@ -853,7 +860,7 @@ integration_test:
GROUP BY
ds
) a
INNER JOIN (
FULL OUTER JOIN (
SELECT
SUM(booking_value) AS booking_value
, ds AS metric_time__day
Expand Down Expand Up @@ -901,7 +908,7 @@ integration_test:
FROM {{ source_schema }}.fct_bookings
GROUP BY metric_time__week
) a
INNER JOIN (
FULL OUTER JOIN (
SELECT
{{ render_date_trunc("c.ds", TimeGranularity.WEEK) }} AS metric_time__week
, SUM(d.bookings_at_start_of_month) AS bookings_at_start_of_month
Expand Down Expand Up @@ -944,7 +951,7 @@ integration_test:
ON {{ render_date_sub("g", "ds", 1, TimeGranularity.MONTH) }} = f.metric_time__day
GROUP BY metric_time__year
) a
INNER JOIN (
FULL OUTER JOIN (
SELECT
{{ render_date_trunc("c.ds", TimeGranularity.YEAR) }} AS metric_time__year
, SUM(d.bookings) AS month_start_bookings
Expand Down Expand Up @@ -1024,7 +1031,7 @@ integration_test:
FROM {{ source_schema }}.fct_bookings
GROUP BY metric_time__week, metric_time__month
) a
INNER JOIN (
FULL OUTER JOIN (
SELECT
{{ render_date_trunc("c.ds", TimeGranularity.WEEK) }} AS metric_time__week
, {{ render_date_trunc("c.ds", TimeGranularity.MONTH) }} AS metric_time__month
Expand Down Expand Up @@ -1290,7 +1297,7 @@ integration_test:
) subq_3
ON subq_5.ds = subq_3.metric_time__day
) subq_7
INNER JOIN (
FULL OUTER JOIN (
SELECT
subq_11.ds AS metric_time__day
, SUM(subq_9.bookings) AS bookings_2_weeks_ago
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
<CombineMetricsNode>
<!-- description = Combine Metrics -->
<!-- node_id = cbm_0 -->
<!-- join type = SqlJoinType.INNER -->
<!-- join type = SqlJoinType.FULL_OUTER -->
<!-- de-duplication method = post-join aggregation across all dimensions -->
<ComputeMetricsNode>
<!-- description = Compute Metrics via Expressions -->
<!-- node_id = cm_0 -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
<CombineMetricsNode>
<!-- description = Combine Metrics -->
<!-- node_id = cbm_0 -->
<!-- join type = SqlJoinType.INNER -->
<!-- join type = SqlJoinType.FULL_OUTER -->
<!-- de-duplication method = post-join aggregation across all dimensions -->
<ComputeMetricsNode>
<!-- description = Compute Metrics via Expressions -->
<!-- node_id = cm_0 -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
<CombineMetricsNode>
<!-- description = Combine Metrics -->
<!-- node_id = cbm_0 -->
<!-- join type = SqlJoinType.INNER -->
<!-- join type = SqlJoinType.FULL_OUTER -->
<!-- de-duplication method = post-join aggregation across all dimensions -->
<ComputeMetricsNode>
<!-- description = Compute Metrics via Expressions -->
<!-- node_id = cm_0 -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
<CombineMetricsNode>
<!-- description = Combine Metrics -->
<!-- node_id = cbm_0 -->
<!-- join type = SqlJoinType.INNER -->
<!-- join type = SqlJoinType.FULL_OUTER -->
<!-- de-duplication method = post-join aggregation across all dimensions -->
<ComputeMetricsNode>
<!-- description = Compute Metrics via Expressions -->
<!-- node_id = cm_0 -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
<CombineMetricsNode>
<!-- description = Combine Metrics -->
<!-- node_id = cbm_0 -->
<!-- join type = SqlJoinType.INNER -->
<!-- join type = SqlJoinType.FULL_OUTER -->
<!-- de-duplication method = post-join aggregation across all dimensions -->
<ComputeMetricsNode>
<!-- description = Compute Metrics via Expressions -->
<!-- node_id = cm_0 -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
<CombineMetricsNode>
<!-- description = Combine Metrics -->
<!-- node_id = cbm_0 -->
<!-- join type = SqlJoinType.INNER -->
<!-- join type = SqlJoinType.FULL_OUTER -->
<!-- de-duplication method = post-join aggregation across all dimensions -->
<ComputeMetricsNode>
<!-- description = Compute Metrics via Expressions -->
<!-- node_id = cm_0 -->
Expand Down
Loading
Loading