diff --git a/metricflow-semantics/metricflow_semantics/specs/dunder_column_association_resolver.py b/metricflow-semantics/metricflow_semantics/specs/dunder_column_association_resolver.py index 3473618c7..5e6948d76 100644 --- a/metricflow-semantics/metricflow_semantics/specs/dunder_column_association_resolver.py +++ b/metricflow-semantics/metricflow_semantics/specs/dunder_column_association_resolver.py @@ -58,6 +58,11 @@ def visit_time_dimension_spec(self, time_dimension_spec: TimeDimensionSpec) -> C if time_dimension_spec.aggregation_state else "" ) + + ( + f"{DUNDER}{time_dimension_spec.window_function.value.lower()}" + if time_dimension_spec.window_function + else "" + ) ) def visit_entity_spec(self, entity_spec: EntitySpec) -> ColumnAssociation: # noqa: D102 diff --git a/metricflow-semantics/tests_metricflow_semantics/model/semantics/test_metric_lookup.py b/metricflow-semantics/tests_metricflow_semantics/model/semantics/test_metric_lookup.py index d9942eeb4..b69c82d62 100644 --- a/metricflow-semantics/tests_metricflow_semantics/model/semantics/test_metric_lookup.py +++ b/metricflow-semantics/tests_metricflow_semantics/model/semantics/test_metric_lookup.py @@ -27,12 +27,7 @@ def test_min_queryable_time_granularity_for_different_agg_time_grains( # noqa: def test_custom_offset_window_for_metric( simple_semantic_manifest_lookup: SemanticManifestLookup, ) -> None: - """Test offset window with custom grain supplied. - - TODO: As of now, the functionality of an offset window with a custom grain is not supported in MF. - This test is added to show that at least the parsing is successful using a custom grain offset window. - Once support for that is added in MF + relevant tests, this test can be removed. - """ + """Test offset window with custom grain supplied.""" metric = simple_semantic_manifest_lookup.metric_lookup.get_metric(MetricReference("bookings_offset_martian_day")) assert len(metric.input_metrics) == 1 diff --git a/metricflow/dataflow/builder/dataflow_plan_builder.py b/metricflow/dataflow/builder/dataflow_plan_builder.py index f88382fae..c006a0f5a 100644 --- a/metricflow/dataflow/builder/dataflow_plan_builder.py +++ b/metricflow/dataflow/builder/dataflow_plan_builder.py @@ -668,7 +668,13 @@ def _build_derived_metric_output_node( time_spine_node=time_spine_node, requested_agg_time_dimension_specs=queried_agg_time_dimension_specs, join_on_time_dimension_spec=self._sort_by_base_granularity(queried_agg_time_dimension_specs)[0], - offset_window=metric_spec.offset_window, + offset_window=( + metric_spec.offset_window + if metric_spec.offset_window + and metric_spec.offset_window.granularity + not in self._semantic_model_lookup.custom_granularity_names + else None + ), offset_to_grain=metric_spec.offset_to_grain, join_type=SqlJoinType.INNER, ) @@ -1662,7 +1668,13 @@ def _build_aggregated_measure_from_measure_source_node( time_spine_node=time_spine_node, requested_agg_time_dimension_specs=base_queried_agg_time_dimension_specs, join_on_time_dimension_spec=join_on_time_dimension_spec, - offset_window=before_aggregation_time_spine_join_description.offset_window, + offset_window=( + before_aggregation_time_spine_join_description.offset_window + if before_aggregation_time_spine_join_description.offset_window + and before_aggregation_time_spine_join_description.offset_window.granularity + not in self._semantic_model_lookup.custom_granularity_names + else None + ), offset_to_grain=before_aggregation_time_spine_join_description.offset_to_grain, join_type=before_aggregation_time_spine_join_description.join_type, ) @@ -1879,7 +1891,7 @@ def _build_time_spine_node( required_time_spine_specs = required_time_spine_spec_set.time_dimension_specs should_dedupe = False - if offset_window and offset_window in self._semantic_model_lookup.custom_granularity_names: + if offset_window and offset_window.granularity in self._semantic_model_lookup.custom_granularity_names: # Are sets the right choice here? all_queried_grains: Set[ExpandedTimeGranularity] = set() queried_custom_specs: Tuple[TimeDimensionSpec, ...] = () @@ -1899,7 +1911,7 @@ def _build_time_spine_node( # TODO: make sure this is checking the correct granularity type once DSI is updated if {spec.time_granularity for spec in queried_time_spine_specs} == {offset_window.granularity}: # If querying with only the same grain as is used in the offset_window, can use a simpler plan. - # offset_node = OffsetCustomGranularityNode.create( + # offset_node = OffsetCustomGranularityWithMatchingGrainNode.create( # parent_node=time_spine_read_node, offset_window=offset_window # ) # time_spine_node: DataflowPlanNode = JoinToTimeSpineNode.create( @@ -1912,15 +1924,18 @@ def _build_time_spine_node( # ) pass else: - time_spine_node: DataflowPlanNode = CustomGranularityBoundsNode.create( + bounds_node = CustomGranularityBoundsNode.create( parent_node=time_spine_read_node, - offset_window=offset_window, - requested_time_spine_specs=required_time_spine_specs, + custom_granularity_name=custom_granularity_name, ) - # if queried_standard_specs: - # time_spine_node = ApplyStandardGranularityNode.create( - # parent_node=time_spine_node, time_dimension_specs=queried_standard_specs - # ) + # TODO: assert in post-init that this can only take a CustomGranularityBoundsNode + time_spine_node = OffsetCustomGranularityNode.create( + parent_node=bounds_node, required_time_spine_specs=required_time_spine_specs + ) + if queried_standard_specs: + time_spine_node = ApplyStandardGranularityNode.create( + parent_node=time_spine_node, time_dimension_specs=queried_standard_specs + ) # TODO: check if this join is needed for the same grain as is used in offset window. Later for custom_spec in queried_custom_specs: time_spine_node = JoinToCustomGranularityNode.create( @@ -1939,7 +1954,7 @@ def _build_time_spine_node( change_specs=tuple( SpecToAlias( input_spec=time_spine_data_set.instance_from_time_dimension_grain_and_date_part( - time_granularity=required_spec.time_granularity, date_part=required_spec.date_part + time_granularity_name=required_spec.time_granularity.name, date_part=required_spec.date_part ).spec, output_spec=required_spec, ) diff --git a/metricflow/dataflow/nodes/custom_granularity_bounds.py b/metricflow/dataflow/nodes/custom_granularity_bounds.py index 4dd19c71a..21801724a 100644 --- a/metricflow/dataflow/nodes/custom_granularity_bounds.py +++ b/metricflow/dataflow/nodes/custom_granularity_bounds.py @@ -2,12 +2,10 @@ from abc import ABC from dataclasses import dataclass -from typing import Sequence, Tuple +from typing import Sequence -from dbt_semantic_interfaces.protocols.metric import MetricTimeWindow from metricflow_semantics.dag.id_prefix import IdPrefix, StaticIdPrefix from metricflow_semantics.dag.mf_dag import DisplayedProperty -from metricflow_semantics.specs.time_dimension_spec import TimeDimensionSpec from metricflow_semantics.visitor import VisitorOutputT from metricflow.dataflow.dataflow_plan import DataflowPlanNode @@ -19,8 +17,7 @@ class CustomGranularityBoundsNode(DataflowPlanNode, ABC): """Calculate the start and end of a custom granularity period and each row number within that period.""" - offset_window: MetricTimeWindow - requested_time_spine_specs: Tuple[TimeDimensionSpec, ...] + custom_granularity_name: str def __post_init__(self) -> None: # noqa: D105 super().__post_init__() @@ -28,15 +25,9 @@ def __post_init__(self) -> None: # noqa: D105 @staticmethod def create( # noqa: D102 - parent_node: DataflowPlanNode, - offset_window: MetricTimeWindow, - requested_time_spine_specs: Tuple[TimeDimensionSpec, ...], + parent_node: DataflowPlanNode, custom_granularity_name: str ) -> CustomGranularityBoundsNode: - return CustomGranularityBoundsNode( - parent_nodes=(parent_node,), - offset_window=offset_window, - requested_time_spine_specs=requested_time_spine_specs, - ) + return CustomGranularityBoundsNode(parent_nodes=(parent_node,), custom_granularity_name=custom_granularity_name) @classmethod def id_prefix(cls) -> IdPrefix: # noqa: D102 @@ -51,10 +42,8 @@ def description(self) -> str: # noqa: D102 @property def displayed_properties(self) -> Sequence[DisplayedProperty]: # noqa: D102 - return ( - tuple(super().displayed_properties) - + (DisplayedProperty("offset_window", self.offset_window),) - + (DisplayedProperty("requested_time_spine_specs", self.requested_time_spine_specs),) + return tuple(super().displayed_properties) + ( + DisplayedProperty("custom_granularity_name", self.custom_granularity_name), ) @property @@ -64,8 +53,7 @@ def parent_node(self) -> DataflowPlanNode: # noqa: D102 def functionally_identical(self, other_node: DataflowPlanNode) -> bool: # noqa: D102 return ( isinstance(other_node, self.__class__) - and other_node.offset_window == self.offset_window - and self.requested_time_spine_specs == other_node.requested_time_spine_specs + and other_node.custom_granularity_name == self.custom_granularity_name ) def with_new_parents( # noqa: D102 @@ -73,7 +61,5 @@ def with_new_parents( # noqa: D102 ) -> CustomGranularityBoundsNode: assert len(new_parent_nodes) == 1 return CustomGranularityBoundsNode.create( - parent_node=new_parent_nodes[0], - offset_window=self.offset_window, - requested_time_spine_specs=self.requested_time_spine_specs, + parent_node=new_parent_nodes[0], custom_granularity_name=self.custom_granularity_name ) diff --git a/metricflow/dataset/sql_dataset.py b/metricflow/dataset/sql_dataset.py index c5707f012..7e6677edb 100644 --- a/metricflow/dataset/sql_dataset.py +++ b/metricflow/dataset/sql_dataset.py @@ -13,7 +13,6 @@ from metricflow_semantics.specs.entity_spec import EntitySpec from metricflow_semantics.specs.instance_spec import InstanceSpec from metricflow_semantics.specs.time_dimension_spec import TimeDimensionSpec -from metricflow_semantics.time.granularity import ExpandedTimeGranularity from typing_extensions import override from metricflow.dataset.dataset_classes import DataSet @@ -167,18 +166,18 @@ def instance_for_spec(self, spec: InstanceSpec) -> MdoInstance: ) def instance_from_time_dimension_grain_and_date_part( - self, time_granularity: ExpandedTimeGranularity, date_part: Optional[DatePart] + self, time_granularity_name: str, date_part: Optional[DatePart] ) -> TimeDimensionInstance: """Find instance in dataset that matches the given grain and date part.""" for time_dimension_instance in self.instance_set.time_dimension_instances: if ( - time_dimension_instance.spec.time_granularity == time_granularity + time_dimension_instance.spec.time_granularity.name == time_granularity_name and time_dimension_instance.spec.date_part == date_part ): return time_dimension_instance raise RuntimeError( - f"Did not find a time dimension instance with grain {time_granularity} and date part {date_part}\n" + f"Did not find a time dimension instance with grain '{time_granularity_name}' and date part {date_part}\n" f"Instances available: {self.instance_set.time_dimension_instances}" ) diff --git a/metricflow/plan_conversion/dataflow_to_sql.py b/metricflow/plan_conversion/dataflow_to_sql.py index e0931bd07..8821cb6b3 100644 --- a/metricflow/plan_conversion/dataflow_to_sql.py +++ b/metricflow/plan_conversion/dataflow_to_sql.py @@ -6,7 +6,6 @@ from typing import Callable, Dict, FrozenSet, List, Optional, Sequence, Set, Tuple, TypeVar from dbt_semantic_interfaces.enum_extension import assert_values_exhausted -from dbt_semantic_interfaces.naming.keywords import DUNDER from dbt_semantic_interfaces.protocols.metric import MetricInputMeasure, MetricType from dbt_semantic_interfaces.references import MetricModelReference, SemanticModelElementReference from dbt_semantic_interfaces.type_enums.aggregation_type import AggregationType @@ -39,10 +38,8 @@ from metricflow_semantics.specs.spec_set import InstanceSpecSet from metricflow_semantics.specs.where_filter.where_filter_spec import WhereFilterSpec from metricflow_semantics.sql.sql_exprs import ( - SqlAddTimeExpression, SqlAggregateFunctionExpression, SqlBetweenExpression, - SqlCaseExpression, SqlColumnReference, SqlColumnReferenceExpression, SqlComparison, @@ -1963,190 +1960,210 @@ def visit_custom_granularity_bounds_node(self, node: CustomGranularityBoundsNode parent_instance_set = from_data_set.instance_set parent_data_set_alias = self._next_unique_table_alias() - # Get the column names needed to query the custom grain from the time spine where it's defined. - offset_grain_name = node.offset_window.granularity - time_spine = self._get_time_spine_for_custom_granularity(offset_grain_name) - window_column_name = self._get_custom_granularity_column_name(offset_grain_name) - window_column_expr = SqlColumnReferenceExpression.from_table_and_column_names( - table_alias=parent_data_set_alias, column_name=window_column_name + custom_granularity_name = node.custom_granularity_name + time_spine = self._get_time_spine_for_custom_granularity(custom_granularity_name) + custom_grain_instance_from_parent = from_data_set.instance_from_time_dimension_grain_and_date_part( + time_granularity_name=custom_granularity_name, date_part=None + ) + base_grain_instance_from_parent = from_data_set.instance_from_time_dimension_grain_and_date_part( + time_granularity_name=time_spine.base_granularity.value, date_part=None + ) + custom_column_expr = SqlColumnReferenceExpression.from_table_and_column_names( + table_alias=parent_data_set_alias, + column_name=custom_grain_instance_from_parent.associated_column.column_name, ) base_column_expr = SqlColumnReferenceExpression.from_table_and_column_names( - table_alias=parent_data_set_alias, column_name=time_spine.base_column + table_alias=parent_data_set_alias, column_name=base_grain_instance_from_parent.associated_column.column_name ) - # Build subquery to get start and end of custom grain period, as well as row number within the period. - parent_window_instance = from_data_set.instance_from_time_dimension_grain_and_date_part( - time_granularity=ExpandedTimeGranularity( - name=offset_grain_name, base_granularity=time_spine.base_granularity - ), - date_part=None, - ) - window_func_to_args: Dict[SqlWindowFunction, Tuple[SqlExpressionNode, ...]] = { - SqlWindowFunction.FIRST_VALUE: (base_column_expr,), - SqlWindowFunction.LAST_VALUE: (base_column_expr,), - SqlWindowFunction.ROW_NUMBER: (), - } - bounds_columns = tuple( - SqlSelectColumn( + new_instances = tuple() + new_select_columns = tuple() + + # Build columns that get start and end of the custom grain period. + # Ex: "FIRST_VALUE(ds) OVER (PARTITION BY martian_day ORDER BY ds) AS ds__fiscal_quarter__first_value" + for window_func in (SqlWindowFunction.FIRST_VALUE, SqlWindowFunction.LAST_VALUE): + new_instance = custom_grain_instance_from_parent.with_new_spec( + new_spec=custom_grain_instance_from_parent.spec.with_window_function(window_func), + column_association_resolver=self._column_association_resolver, + ) + select_column = SqlSelectColumn( expr=SqlWindowFunctionExpression.create( sql_function=window_func, - sql_function_args=func_args, - partition_by_args=(window_column_expr,), + sql_function_args=(base_column_expr,), + partition_by_args=(custom_column_expr,), order_by_args=(SqlWindowOrderByArgument(base_column_expr),), ), - column_alias=self._column_association_resolver.resolve_spec( - parent_window_instance.spec.with_window_function(window_func) - ).column_name, + column_alias=new_instance.associated_column.column_name, ) - for window_func, func_args in window_func_to_args.items() - ) - bounds_cte_alias = self._next_unique_table_alias() - bounds_cte = SqlCteNode.create( - SqlSelectStatementNode.create( - description=node.description, # TODO - select_columns=from_data_set.checked_sql_select_node.select_columns + bounds_columns, - from_source=from_data_set.checked_sql_select_node, - from_source_alias=parent_data_set_alias, - ), - cte_alias=bounds_cte_alias, - ) - - # Build a subquery to get a unique row for each custom grain along with its start date & end date. - unique_bounds_columns = tuple( - SqlSelectColumn.from_table_and_column_names(table_alias=bounds_cte_alias, column_name=alias) - for alias in [offset_grain_name] + [column.column_alias for column in bounds_columns[:-1]] - ) - unique_bounds_subquery_alias = self._next_unique_table_alias() - unique_bounds_subquery = SqlSelectStatementNode.create( - description=node.description, # TODO - select_columns=unique_bounds_columns, - from_source=bounds_cte, # need? can I make this optional if CTEs are present? - from_source_alias=bounds_cte_alias, - cte_sources=(bounds_cte,), - group_bys=unique_bounds_columns, - ) + new_instances += (new_instance,) + new_select_columns += (select_column,) - # Build a subquery to offset the start and end dates by the requested offset_window. - custom_grain_column = SqlSelectColumn.from_table_and_column_names( - column_name=offset_grain_name, table_alias=unique_bounds_subquery_alias + # Build a column that tracks the row number for the base grain column within the custom grain period. + # This will be offset by 1 to represent the number of base grain periods since the start of the custom grain period. + # Ex: "ROW_NUMBER() OVER (PARTITION BY martian_day ORDER BY ds) AS ds__day__row_number" + new_instance = base_grain_instance_from_parent.with_new_spec( + new_spec=base_grain_instance_from_parent.spec.with_window_function(window_func), + column_association_resolver=self._column_association_resolver, ) - offset_bounds_columns = tuple( - SqlSelectColumn( - expr=SqlWindowFunctionExpression.create( - sql_function=SqlWindowFunction.LAG, - sql_function_args=( - SqlColumnReferenceExpression.from_table_and_column_names( - column_name=column.column_alias, table_alias=unique_bounds_subquery_alias - ), - SqlStringExpression.create(str(node.offset_window.count)), - ), - order_by_args=(SqlWindowOrderByArgument(custom_grain_column.expr),), - ), - column_alias=f"{column.column_alias}{DUNDER}offset", # TODO: finalize this alias - ) - for column in unique_bounds_columns - ) - offset_bounds_subquery_alias = self._next_unique_table_alias() - offset_bounds_subquery = SqlSelectStatementNode.create( - description=node.description, # TODO - select_columns=(custom_grain_column,) + offset_bounds_columns, - from_source=unique_bounds_subquery, - from_source_alias=unique_bounds_subquery_alias, - ) - - # Use the row number calculated above to offset the time spine's base column by the requested window. - # If the offset date is not within the offset custom grain period, default to the last value in that period. - custom_grain_column_2 = SqlSelectColumn.from_table_and_column_names( - column_name=offset_grain_name, table_alias=unique_bounds_subquery_alias - ) # TODO: better variable name - # TODO: Get time spine specs in this node. If any have the base grain, use any of those specs. - # Else, default to metric time as below. - base_grain_spec = DataSet.metric_time_dimension_spec( - ExpandedTimeGranularity.from_time_granularity(time_spine.base_granularity) - ) - base_grain_spec_column_name = self._column_association_resolver.resolve_spec(base_grain_spec).column_name - offset_start, offset_end = [ - SqlColumnReferenceExpression.from_table_and_column_names( - column_name=offset_bound_column.column_alias, table_alias=offset_bounds_subquery_alias - ) - for offset_bound_column in offset_bounds_columns - ] - add_row_number_expr = SqlAddTimeExpression.create( - arg=offset_start, - count_expr=SqlColumnReferenceExpression.from_table_and_column_names( - table_alias=bounds_cte_alias, column_name=bounds_columns[-1].column_alias - ), - granularity=time_spine.base_granularity, + window_func_expr = SqlWindowFunctionExpression.create( + sql_function=SqlWindowFunction.ROW_NUMBER, + partition_by_args=(custom_column_expr,), + order_by_args=(SqlWindowOrderByArgument(base_column_expr),), ) - below_end_date_expr = SqlComparisonExpression.create( - left_expr=add_row_number_expr, comparison=SqlComparison.LESS_THAN_OR_EQUALS, right_expr=add_row_number_expr - ) - offset_base_column = SqlSelectColumn( - expr=SqlCaseExpression.create( - when_to_then_exprs={below_end_date_expr: add_row_number_expr}, - else_expr=offset_end, - ), - column_alias=base_grain_spec_column_name, - ) - join_desc = SqlJoinDescription( - right_source=offset_bounds_subquery, - right_source_alias=offset_bounds_subquery_alias, - join_type=SqlJoinType.INNER, - on_condition=SqlComparisonExpression.create( - left_expr=custom_grain_column_2.expr, - comparison=SqlComparison.EQUALS, - right_expr=SqlColumnReferenceExpression.from_table_and_column_names( - table_alias=bounds_cte_alias, column_name=offset_grain_name - ), - ), - ) - offset_base_column_subquery_alias = self._next_unique_table_alias() - output_select_node = SqlSelectStatementNode.create( - description=node.description, # TODO - select_columns=(custom_grain_column_2, offset_base_column), - from_source=bounds_cte, # need? - from_source_alias=bounds_cte_alias, - join_descs=(join_desc,), - cte_sources=(bounds_cte,), + new_select_column = SqlSelectColumn( + expr=window_func_expr, + column_alias=new_instance.associated_column.column_name, ) + new_instances += (new_instance,) + new_select_columns += (new_select_column,) - # Apply any standard grains that were requested. - # TODO: add a conditional here if there are standard grains requested besides the base grain - base_grain_column = SqlSelectColumn.from_table_and_column_names( - table_alias=offset_base_column_subquery_alias, column_name=base_grain_spec_column_name - ) - standard_grain_columns = tuple( - SqlSelectColumn( - expr=SqlDateTruncExpression.create( - time_granularity=time_spine_spec.time_granularity.base_granularity, arg=base_grain_column.expr - ), - column_alias=self._column_association_resolver.resolve_spec(time_spine_spec).column_name, - ) - for time_spine_spec in node.requested_time_spine_specs - if not time_spine_spec.time_granularity.is_custom_granularity - ) - if standard_grain_columns: - output_select_node = SqlSelectStatementNode.create( - description=node.description, # TODO - select_columns=(base_grain_column,) + standard_grain_columns, - from_source=output_select_node, - from_source_alias=offset_base_column_subquery_alias, - ) - - # Build output instance set. - time_spine_instance_set = InstanceSet( - time_dimension_instances=tuple( - parent_window_instance.with_new_spec( - new_spec=spec, column_association_resolver=self._column_association_resolver - ) - for spec in node.requested_time_spine_specs - ) - ) return SqlDataSet( - instance_set=InstanceSet.merge([time_spine_instance_set, parent_instance_set]), - sql_select_node=output_select_node, + instance_set=InstanceSet.merge([parent_instance_set, InstanceSet(time_dimension_instances=new_instances)]), + sql_select_node=SqlSelectStatementNode.create( + description=node.description, + select_columns=from_data_set.checked_sql_select_node.select_columns + new_select_columns, + from_source=from_data_set.checked_sql_select_node, + from_source_alias=parent_data_set_alias, + ), ) + # def __visit_next_node(): + # # Build a subquery to get a unique row for each custom grain along with its start date & end date. + # unique_bounds_columns = tuple( + # SqlSelectColumn.from_table_and_column_names(table_alias=bounds_cte_alias, column_name=alias) + # for alias in [column.column_alias for column in bounds_columns[:-1]] + # ) + # unique_bounds_subquery_alias = self._next_unique_table_alias() + # unique_bounds_subquery = SqlSelectStatementNode.create( + # description=node.description, # TODO + # select_columns=unique_bounds_columns, + # from_source=bounds_cte, # need? can I make this optional if CTEs are present? + # from_source_alias=bounds_cte_alias, + # # cte_sources=(bounds_cte,), + # group_bys=unique_bounds_columns, + # ) + + # # Build a subquery to offset the start and end dates by the requested offset_window. + # custom_grain_column = SqlSelectColumn.from_table_and_column_names( + # column_name=window_column_alias, table_alias=unique_bounds_subquery_alias + # ) + # offset_bounds_columns = tuple( + # SqlSelectColumn( + # expr=SqlWindowFunctionExpression.create( + # sql_function=SqlWindowFunction.LAG, + # sql_function_args=( + # SqlColumnReferenceExpression.from_table_and_column_names( + # column_name=column.column_alias, table_alias=unique_bounds_subquery_alias + # ), + # SqlStringExpression.create(str(node.offset_window.count)), + # ), + # order_by_args=(SqlWindowOrderByArgument(custom_grain_column.expr),), + # ), + # column_alias=f"{column.column_alias}{DUNDER}offset", + # ) + # for column in unique_bounds_columns + # ) + # offset_bounds_subquery_alias = self._next_unique_table_alias() + # offset_bounds_subquery = SqlSelectStatementNode.create( + # description=node.description, # TODO + # select_columns=(custom_grain_column,) + offset_bounds_columns, + # from_source=unique_bounds_subquery, + # from_source_alias=unique_bounds_subquery_alias, + # ) + + # # Use the row number calculated above to offset the time spine's base column by the requested window. + # # If the offset date is not within the offset custom grain period, default to the last value in that period. + # custom_grain_column_2 = SqlSelectColumn.from_table_and_column_names( + # column_name=window_column_alias, table_alias=unique_bounds_subquery_alias + # ) # TODO: better variable name + # # TODO: Get time spine specs in this node. If any have the base grain, use any of those specs. + # # Else, default to metric time as below. + # base_grain_spec = DataSet.metric_time_dimension_spec( + # ExpandedTimeGranularity.from_time_granularity(time_spine.base_granularity) + # ) + # base_grain_spec_column_name = self._column_association_resolver.resolve_spec(base_grain_spec).column_name + # offset_start, offset_end = [ + # SqlColumnReferenceExpression.from_table_and_column_names( + # column_name=offset_bound_column.column_alias, table_alias=offset_bounds_subquery_alias + # ) + # for offset_bound_column in offset_bounds_columns + # ] + # add_row_number_expr = SqlAddTimeExpression.create( + # arg=offset_start, + # count_expr=SqlColumnReferenceExpression.from_table_and_column_names( + # table_alias=bounds_cte_alias, column_name=bounds_columns[-1].column_alias + # ), + # granularity=time_spine.base_granularity, + # ) + # below_end_date_expr = SqlComparisonExpression.create( + # left_expr=add_row_number_expr, comparison=SqlComparison.LESS_THAN_OR_EQUALS, right_expr=add_row_number_expr + # ) + # offset_base_column = SqlSelectColumn( + # expr=SqlCaseExpression.create( + # when_to_then_exprs={below_end_date_expr: add_row_number_expr}, + # else_expr=offset_end, + # ), + # column_alias=base_grain_spec_column_name, + # ) + # join_desc = SqlJoinDescription( + # right_source=offset_bounds_subquery, + # right_source_alias=offset_bounds_subquery_alias, + # join_type=SqlJoinType.INNER, + # on_condition=SqlComparisonExpression.create( + # left_expr=custom_grain_column_2.expr, + # comparison=SqlComparison.EQUALS, + # right_expr=SqlColumnReferenceExpression.from_table_and_column_names( + # table_alias=bounds_cte_alias, column_name=window_column_alias + # ), + # ), + # ) + # offset_base_column_subquery_alias = self._next_unique_table_alias() + # output_select_node = SqlSelectStatementNode.create( + # description=node.description, # TODO + # select_columns=(custom_grain_column_2, offset_base_column), + # from_source=bounds_cte, # need? + # from_source_alias=bounds_cte_alias, + # join_descs=(join_desc,), + # # cte_sources=(bounds_cte,), + # ) + + # # Apply any standard grains that were requested. + # # TODO: add a conditional here if there are standard grains requested besides the base grain + # base_grain_column = SqlSelectColumn.from_table_and_column_names( + # table_alias=offset_base_column_subquery_alias, column_name=base_grain_spec_column_name + # ) + # standard_grain_columns = tuple( + # SqlSelectColumn( + # expr=SqlDateTruncExpression.create( + # time_granularity=time_spine_spec.time_granularity.base_granularity, arg=base_grain_column.expr + # ), + # column_alias=self._column_association_resolver.resolve_spec(time_spine_spec).column_name, + # ) + # for time_spine_spec in node.requested_time_spine_specs + # if not time_spine_spec.time_granularity.is_custom_granularity + # ) + # if standard_grain_columns: + # output_select_node = SqlSelectStatementNode.create( + # description=node.description, # TODO + # select_columns=(base_grain_column,) + standard_grain_columns, + # from_source=output_select_node, + # from_source_alias=offset_base_column_subquery_alias, + # ) + + # # Build output instance set. + # time_spine_instance_set = InstanceSet( + # time_dimension_instances=tuple( + # parent_window_instance.with_new_spec( + # new_spec=spec, column_association_resolver=self._column_association_resolver + # ) + # for spec in node.requested_time_spine_specs + # ) + # ) + # return SqlDataSet( + # instance_set=InstanceSet.merge([time_spine_instance_set, parent_instance_set]), + # sql_select_node=output_select_node, + # ) + class DataflowNodeToSqlCteVisitor(DataflowNodeToSqlSubqueryVisitor): """Similar to `DataflowNodeToSqlSubqueryVisitor`, except that this converts specific nodes to CTEs. diff --git a/metricflow/sql/optimizer/rewriting_sub_query_reducer.py b/metricflow/sql/optimizer/rewriting_sub_query_reducer.py index 6d54f6ff4..63766ca7a 100644 --- a/metricflow/sql/optimizer/rewriting_sub_query_reducer.py +++ b/metricflow/sql/optimizer/rewriting_sub_query_reducer.py @@ -586,7 +586,7 @@ def _rewrite_node_with_join(self, node: SqlSelectStatementNode) -> SqlSelectStat @override def visit_cte_node(self, node: SqlCteNode) -> SqlQueryPlanNode: - raise NotImplementedError + return node def visit_select_statement_node(self, node: SqlSelectStatementNode) -> SqlQueryPlanNode: # noqa: D102 node_with_reduced_parents = self._reduce_parents(node) diff --git a/metricflow/sql/optimizer/tag_required_column_aliases.py b/metricflow/sql/optimizer/tag_required_column_aliases.py index b75f9d2d1..656fd14ac 100644 --- a/metricflow/sql/optimizer/tag_required_column_aliases.py +++ b/metricflow/sql/optimizer/tag_required_column_aliases.py @@ -131,6 +131,7 @@ def _tag_potential_cte_node(self, table_name: str, column_aliases: Set[str]) -> def visit_select_statement_node(self, node: SqlSelectStatementNode) -> None: """Based on required column aliases for this SELECT, figure out required column aliases in parents.""" initial_required_column_aliases_in_this_node = self._current_required_column_alias_mapping.get_aliases(node) + print(self._current_required_column_alias_mapping._node_to_tagged_aliases[node]) # If this SELECT statement uses DISTINCT, all columns are required as removing them would change the meaning of # the query. @@ -161,10 +162,10 @@ def visit_select_statement_node(self, node: SqlSelectStatementNode) -> None: if select_column.column_alias in updated_required_column_aliases_in_this_node ) - if len(required_select_columns_in_this_node) == 0: - raise RuntimeError( - "No columns are required in this node - this indicates a bug in this visitor or in the inputs." - ) + # if len(required_select_columns_in_this_node) == 0: + # raise RuntimeError( + # "No columns are required in this node - this indicates a bug in this visitor or in the inputs." + # ) # It's possible for `required_select_columns_in_this_node` to be empty because we traverse through the ancestors # of a CTE node whenever a CTE node is updated. See `test_multi_child_pruning`. diff --git a/tests_metricflow/integration/test_cases/itest_granularity.yaml b/tests_metricflow/integration/test_cases/itest_granularity.yaml index b83cbeb03..e32a6cf99 100644 --- a/tests_metricflow/integration/test_cases/itest_granularity.yaml +++ b/tests_metricflow/integration/test_cases/itest_granularity.yaml @@ -961,3 +961,12 @@ integration_test: GROUP BY subq_2.martian_day ) subq_5 ON subq_6.metric_time__martian_day = subq_5.metric_time__martian_day +--- +integration_test: + name: custom_offset_window + description: Test querying a metric with a custom offset window + model: SIMPLE_MODEL + metrics: ["bookings_offset_one_martian_day"] + group_bys: ["metric_time__day"] # TODO: add other standard grains + custom grain + check_query: | + SELECT 1 diff --git a/tests_metricflow/integration/test_configured_cases.py b/tests_metricflow/integration/test_configured_cases.py index 027834d05..791c0b1f9 100644 --- a/tests_metricflow/integration/test_configured_cases.py +++ b/tests_metricflow/integration/test_configured_cases.py @@ -313,6 +313,7 @@ def test_case( ) actual = query_result.result_df + assert 0, query_result.sql expected = sql_client.query( jinja2.Template( diff --git a/tests_metricflow/snapshots/test_custom_granularity.py/SqlQueryPlan/DuckDB/test_custom_offset_window__plan0.sql b/tests_metricflow/snapshots/test_custom_granularity.py/SqlQueryPlan/DuckDB/test_custom_offset_window__plan0.sql new file mode 100644 index 000000000..d5637e260 --- /dev/null +++ b/tests_metricflow/snapshots/test_custom_granularity.py/SqlQueryPlan/DuckDB/test_custom_offset_window__plan0.sql @@ -0,0 +1,469 @@ +test_name: test_custom_offset_window +test_filename: test_custom_granularity.py +sql_engine: DuckDB +--- +-- Compute Metrics via Expressions +SELECT + subq_12.metric_time__day + , bookings AS bookings_offset_one_martian_day +FROM ( + -- Compute Metrics via Expressions + SELECT + subq_11.metric_time__day + , subq_11.bookings + FROM ( + -- Aggregate Measures + SELECT + subq_10.metric_time__day + , SUM(subq_10.bookings) AS bookings + FROM ( + -- Pass Only Elements: ['bookings', 'metric_time__day'] + SELECT + subq_9.metric_time__day + , subq_9.bookings + FROM ( + -- Join to Time Spine Dataset + SELECT + subq_1.ds__day AS ds__day + , subq_1.ds__week AS ds__week + , subq_1.ds__month AS ds__month + , subq_1.ds__quarter AS ds__quarter + , subq_1.ds__year AS ds__year + , subq_1.ds__extract_year AS ds__extract_year + , subq_1.ds__extract_quarter AS ds__extract_quarter + , subq_1.ds__extract_month AS ds__extract_month + , subq_1.ds__extract_day AS ds__extract_day + , subq_1.ds__extract_dow AS ds__extract_dow + , subq_1.ds__extract_doy AS ds__extract_doy + , subq_1.ds_partitioned__day AS ds_partitioned__day + , subq_1.ds_partitioned__week AS ds_partitioned__week + , subq_1.ds_partitioned__month AS ds_partitioned__month + , subq_1.ds_partitioned__quarter AS ds_partitioned__quarter + , subq_1.ds_partitioned__year AS ds_partitioned__year + , subq_1.ds_partitioned__extract_year AS ds_partitioned__extract_year + , subq_1.ds_partitioned__extract_quarter AS ds_partitioned__extract_quarter + , subq_1.ds_partitioned__extract_month AS ds_partitioned__extract_month + , subq_1.ds_partitioned__extract_day AS ds_partitioned__extract_day + , subq_1.ds_partitioned__extract_dow AS ds_partitioned__extract_dow + , subq_1.ds_partitioned__extract_doy AS ds_partitioned__extract_doy + , subq_1.paid_at__day AS paid_at__day + , subq_1.paid_at__week AS paid_at__week + , subq_1.paid_at__month AS paid_at__month + , subq_1.paid_at__quarter AS paid_at__quarter + , subq_1.paid_at__year AS paid_at__year + , subq_1.paid_at__extract_year AS paid_at__extract_year + , subq_1.paid_at__extract_quarter AS paid_at__extract_quarter + , subq_1.paid_at__extract_month AS paid_at__extract_month + , subq_1.paid_at__extract_day AS paid_at__extract_day + , subq_1.paid_at__extract_dow AS paid_at__extract_dow + , subq_1.paid_at__extract_doy AS paid_at__extract_doy + , subq_1.booking__ds__day AS booking__ds__day + , subq_1.booking__ds__week AS booking__ds__week + , subq_1.booking__ds__month AS booking__ds__month + , subq_1.booking__ds__quarter AS booking__ds__quarter + , subq_1.booking__ds__year AS booking__ds__year + , subq_1.booking__ds__extract_year AS booking__ds__extract_year + , subq_1.booking__ds__extract_quarter AS booking__ds__extract_quarter + , subq_1.booking__ds__extract_month AS booking__ds__extract_month + , subq_1.booking__ds__extract_day AS booking__ds__extract_day + , subq_1.booking__ds__extract_dow AS booking__ds__extract_dow + , subq_1.booking__ds__extract_doy AS booking__ds__extract_doy + , subq_1.booking__ds_partitioned__day AS booking__ds_partitioned__day + , subq_1.booking__ds_partitioned__week AS booking__ds_partitioned__week + , subq_1.booking__ds_partitioned__month AS booking__ds_partitioned__month + , subq_1.booking__ds_partitioned__quarter AS booking__ds_partitioned__quarter + , subq_1.booking__ds_partitioned__year AS booking__ds_partitioned__year + , subq_1.booking__ds_partitioned__extract_year AS booking__ds_partitioned__extract_year + , subq_1.booking__ds_partitioned__extract_quarter AS booking__ds_partitioned__extract_quarter + , subq_1.booking__ds_partitioned__extract_month AS booking__ds_partitioned__extract_month + , subq_1.booking__ds_partitioned__extract_day AS booking__ds_partitioned__extract_day + , subq_1.booking__ds_partitioned__extract_dow AS booking__ds_partitioned__extract_dow + , subq_1.booking__ds_partitioned__extract_doy AS booking__ds_partitioned__extract_doy + , subq_1.booking__paid_at__day AS booking__paid_at__day + , subq_1.booking__paid_at__week AS booking__paid_at__week + , subq_1.booking__paid_at__month AS booking__paid_at__month + , subq_1.booking__paid_at__quarter AS booking__paid_at__quarter + , subq_1.booking__paid_at__year AS booking__paid_at__year + , subq_1.booking__paid_at__extract_year AS booking__paid_at__extract_year + , subq_1.booking__paid_at__extract_quarter AS booking__paid_at__extract_quarter + , subq_1.booking__paid_at__extract_month AS booking__paid_at__extract_month + , subq_1.booking__paid_at__extract_day AS booking__paid_at__extract_day + , subq_1.booking__paid_at__extract_dow AS booking__paid_at__extract_dow + , subq_1.booking__paid_at__extract_doy AS booking__paid_at__extract_doy + , subq_1.metric_time__week AS metric_time__week + , subq_1.metric_time__month AS metric_time__month + , subq_1.metric_time__quarter AS metric_time__quarter + , subq_1.metric_time__year AS metric_time__year + , subq_1.metric_time__extract_year AS metric_time__extract_year + , subq_1.metric_time__extract_quarter AS metric_time__extract_quarter + , subq_1.metric_time__extract_month AS metric_time__extract_month + , subq_1.metric_time__extract_day AS metric_time__extract_day + , subq_1.metric_time__extract_dow AS metric_time__extract_dow + , subq_1.metric_time__extract_doy AS metric_time__extract_doy + , subq_8.metric_time__day AS metric_time__day + , subq_1.listing AS listing + , subq_1.guest AS guest + , subq_1.host AS host + , subq_1.booking__listing AS booking__listing + , subq_1.booking__guest AS booking__guest + , subq_1.booking__host AS booking__host + , subq_1.is_instant AS is_instant + , subq_1.booking__is_instant AS booking__is_instant + , subq_1.bookings AS bookings + , subq_1.instant_bookings AS instant_bookings + , subq_1.booking_value AS booking_value + , subq_1.max_booking_value AS max_booking_value + , subq_1.min_booking_value AS min_booking_value + , subq_1.bookers AS bookers + , subq_1.average_booking_value AS average_booking_value + , subq_1.referred_bookings AS referred_bookings + , subq_1.median_booking_value AS median_booking_value + , subq_1.booking_value_p99 AS booking_value_p99 + , subq_1.discrete_booking_value_p99 AS discrete_booking_value_p99 + , subq_1.approximate_continuous_booking_value_p99 AS approximate_continuous_booking_value_p99 + , subq_1.approximate_discrete_booking_value_p99 AS approximate_discrete_booking_value_p99 + FROM ( + -- Pass Only Elements: ['metric_time__day', 'metric_time__day'] + SELECT + subq_7.metric_time__day + FROM ( + -- Calculate Custom Granularity Bounds + SELECT + subq_6.metric_time__day + , DATE_TRUNC('day', subq_6.metric_time__day) AS metric_time__day + FROM ( + -- Calculate Custom Granularity Bounds + SELECT + subq_4.metric_time__martian_day AS metric_time__martian_day + , CASE + WHEN subq_5.ds__martian_day__first_value__offset + INTERVAL subq_3.ds__martian_day__row_number day <= subq_5.ds__martian_day__first_value__offset + INTERVAL subq_3.ds__martian_day__row_number day THEN subq_5.ds__martian_day__first_value__offset + INTERVAL subq_3.ds__martian_day__row_number day + ELSE subq_5.ds__martian_day__last_value__offset + END AS metric_time__day + FROM ( + -- Calculate Custom Granularity Bounds + SELECT + time_spine_src_28006.ds AS ds__day + , DATE_TRUNC('week', time_spine_src_28006.ds) AS ds__week + , DATE_TRUNC('month', time_spine_src_28006.ds) AS ds__month + , DATE_TRUNC('quarter', time_spine_src_28006.ds) AS ds__quarter + , DATE_TRUNC('year', time_spine_src_28006.ds) AS ds__year + , EXTRACT(year FROM time_spine_src_28006.ds) AS ds__extract_year + , EXTRACT(quarter FROM time_spine_src_28006.ds) AS ds__extract_quarter + , EXTRACT(month FROM time_spine_src_28006.ds) AS ds__extract_month + , EXTRACT(day FROM time_spine_src_28006.ds) AS ds__extract_day + , EXTRACT(isodow FROM time_spine_src_28006.ds) AS ds__extract_dow + , EXTRACT(doy FROM time_spine_src_28006.ds) AS ds__extract_doy + , time_spine_src_28006.martian_day AS ds__martian_day + , subq_2.ds__martian_day AS metric_time__martian_day + , FIRST_VALUE(subq_2.ds) OVER ( + PARTITION BY subq_2.ds__martian_day + ORDER BY subq_2.ds + ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING + ) AS ds__martian_day__first_value + , LAST_VALUE(subq_2.ds) OVER ( + PARTITION BY subq_2.ds__martian_day + ORDER BY subq_2.ds + ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING + ) AS ds__martian_day__last_value + , ROW_NUMBER() OVER ( + PARTITION BY subq_2.ds__martian_day + ORDER BY subq_2.ds + ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING + ) AS ds__martian_day__row_number + FROM ( + -- Read From Time Spine 'mf_time_spine' + SELECT + time_spine_src_28006.ds AS ds__day + , DATE_TRUNC('week', time_spine_src_28006.ds) AS ds__week + , DATE_TRUNC('month', time_spine_src_28006.ds) AS ds__month + , DATE_TRUNC('quarter', time_spine_src_28006.ds) AS ds__quarter + , DATE_TRUNC('year', time_spine_src_28006.ds) AS ds__year + , EXTRACT(year FROM time_spine_src_28006.ds) AS ds__extract_year + , EXTRACT(quarter FROM time_spine_src_28006.ds) AS ds__extract_quarter + , EXTRACT(month FROM time_spine_src_28006.ds) AS ds__extract_month + , EXTRACT(day FROM time_spine_src_28006.ds) AS ds__extract_day + , EXTRACT(isodow FROM time_spine_src_28006.ds) AS ds__extract_dow + , EXTRACT(doy FROM time_spine_src_28006.ds) AS ds__extract_doy + , time_spine_src_28006.martian_day AS ds__martian_day + FROM ***************************.mf_time_spine time_spine_src_28006 + ) subq_2 + ) subq_3 + INNER JOIN ( + -- Calculate Custom Granularity Bounds + SELECT + subq_4.metric_time__martian_day + , LAG(subq_4.ds__martian_day__first_value, 1) OVER ( + ORDER BY subq_4.metric_time__martian_day + ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING + ) AS ds__martian_day__first_value__offset + , LAG(subq_4.ds__martian_day__last_value, 1) OVER ( + ORDER BY subq_4.metric_time__martian_day + ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING + ) AS ds__martian_day__last_value__offset + FROM ( + -- Calculate Custom Granularity Bounds + SELECT + subq_3.ds__martian_day__first_value + , subq_3.ds__martian_day__last_value + FROM ( + -- Calculate Custom Granularity Bounds + SELECT + time_spine_src_28006.ds AS ds__day + , DATE_TRUNC('week', time_spine_src_28006.ds) AS ds__week + , DATE_TRUNC('month', time_spine_src_28006.ds) AS ds__month + , DATE_TRUNC('quarter', time_spine_src_28006.ds) AS ds__quarter + , DATE_TRUNC('year', time_spine_src_28006.ds) AS ds__year + , EXTRACT(year FROM time_spine_src_28006.ds) AS ds__extract_year + , EXTRACT(quarter FROM time_spine_src_28006.ds) AS ds__extract_quarter + , EXTRACT(month FROM time_spine_src_28006.ds) AS ds__extract_month + , EXTRACT(day FROM time_spine_src_28006.ds) AS ds__extract_day + , EXTRACT(isodow FROM time_spine_src_28006.ds) AS ds__extract_dow + , EXTRACT(doy FROM time_spine_src_28006.ds) AS ds__extract_doy + , time_spine_src_28006.martian_day AS ds__martian_day + , subq_2.ds__martian_day AS metric_time__martian_day + , FIRST_VALUE(subq_2.ds) OVER ( + PARTITION BY subq_2.ds__martian_day + ORDER BY subq_2.ds + ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING + ) AS ds__martian_day__first_value + , LAST_VALUE(subq_2.ds) OVER ( + PARTITION BY subq_2.ds__martian_day + ORDER BY subq_2.ds + ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING + ) AS ds__martian_day__last_value + , ROW_NUMBER() OVER ( + PARTITION BY subq_2.ds__martian_day + ORDER BY subq_2.ds + ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING + ) AS ds__martian_day__row_number + FROM ( + -- Read From Time Spine 'mf_time_spine' + SELECT + time_spine_src_28006.ds AS ds__day + , DATE_TRUNC('week', time_spine_src_28006.ds) AS ds__week + , DATE_TRUNC('month', time_spine_src_28006.ds) AS ds__month + , DATE_TRUNC('quarter', time_spine_src_28006.ds) AS ds__quarter + , DATE_TRUNC('year', time_spine_src_28006.ds) AS ds__year + , EXTRACT(year FROM time_spine_src_28006.ds) AS ds__extract_year + , EXTRACT(quarter FROM time_spine_src_28006.ds) AS ds__extract_quarter + , EXTRACT(month FROM time_spine_src_28006.ds) AS ds__extract_month + , EXTRACT(day FROM time_spine_src_28006.ds) AS ds__extract_day + , EXTRACT(isodow FROM time_spine_src_28006.ds) AS ds__extract_dow + , EXTRACT(doy FROM time_spine_src_28006.ds) AS ds__extract_doy + , time_spine_src_28006.martian_day AS ds__martian_day + FROM ***************************.mf_time_spine time_spine_src_28006 + ) subq_2 + ) subq_3 + GROUP BY + subq_3.ds__martian_day__first_value + , subq_3.ds__martian_day__last_value + ) subq_4 + ) subq_5 + ON + subq_4.metric_time__martian_day = subq_3.metric_time__martian_day + ) subq_6 + ) subq_7 + ) subq_8 + INNER JOIN ( + -- Metric Time Dimension 'ds' + SELECT + subq_0.ds__day + , subq_0.ds__week + , subq_0.ds__month + , subq_0.ds__quarter + , subq_0.ds__year + , subq_0.ds__extract_year + , subq_0.ds__extract_quarter + , subq_0.ds__extract_month + , subq_0.ds__extract_day + , subq_0.ds__extract_dow + , subq_0.ds__extract_doy + , subq_0.ds_partitioned__day + , subq_0.ds_partitioned__week + , subq_0.ds_partitioned__month + , subq_0.ds_partitioned__quarter + , subq_0.ds_partitioned__year + , subq_0.ds_partitioned__extract_year + , subq_0.ds_partitioned__extract_quarter + , subq_0.ds_partitioned__extract_month + , subq_0.ds_partitioned__extract_day + , subq_0.ds_partitioned__extract_dow + , subq_0.ds_partitioned__extract_doy + , subq_0.paid_at__day + , subq_0.paid_at__week + , subq_0.paid_at__month + , subq_0.paid_at__quarter + , subq_0.paid_at__year + , subq_0.paid_at__extract_year + , subq_0.paid_at__extract_quarter + , subq_0.paid_at__extract_month + , subq_0.paid_at__extract_day + , subq_0.paid_at__extract_dow + , subq_0.paid_at__extract_doy + , subq_0.booking__ds__day + , subq_0.booking__ds__week + , subq_0.booking__ds__month + , subq_0.booking__ds__quarter + , subq_0.booking__ds__year + , subq_0.booking__ds__extract_year + , subq_0.booking__ds__extract_quarter + , subq_0.booking__ds__extract_month + , subq_0.booking__ds__extract_day + , subq_0.booking__ds__extract_dow + , subq_0.booking__ds__extract_doy + , subq_0.booking__ds_partitioned__day + , subq_0.booking__ds_partitioned__week + , subq_0.booking__ds_partitioned__month + , subq_0.booking__ds_partitioned__quarter + , subq_0.booking__ds_partitioned__year + , subq_0.booking__ds_partitioned__extract_year + , subq_0.booking__ds_partitioned__extract_quarter + , subq_0.booking__ds_partitioned__extract_month + , subq_0.booking__ds_partitioned__extract_day + , subq_0.booking__ds_partitioned__extract_dow + , subq_0.booking__ds_partitioned__extract_doy + , subq_0.booking__paid_at__day + , subq_0.booking__paid_at__week + , subq_0.booking__paid_at__month + , subq_0.booking__paid_at__quarter + , subq_0.booking__paid_at__year + , subq_0.booking__paid_at__extract_year + , subq_0.booking__paid_at__extract_quarter + , subq_0.booking__paid_at__extract_month + , subq_0.booking__paid_at__extract_day + , subq_0.booking__paid_at__extract_dow + , subq_0.booking__paid_at__extract_doy + , subq_0.ds__day AS metric_time__day + , subq_0.ds__week AS metric_time__week + , subq_0.ds__month AS metric_time__month + , subq_0.ds__quarter AS metric_time__quarter + , subq_0.ds__year AS metric_time__year + , subq_0.ds__extract_year AS metric_time__extract_year + , subq_0.ds__extract_quarter AS metric_time__extract_quarter + , subq_0.ds__extract_month AS metric_time__extract_month + , subq_0.ds__extract_day AS metric_time__extract_day + , subq_0.ds__extract_dow AS metric_time__extract_dow + , subq_0.ds__extract_doy AS metric_time__extract_doy + , subq_0.listing + , subq_0.guest + , subq_0.host + , subq_0.booking__listing + , subq_0.booking__guest + , subq_0.booking__host + , subq_0.is_instant + , subq_0.booking__is_instant + , subq_0.bookings + , subq_0.instant_bookings + , subq_0.booking_value + , subq_0.max_booking_value + , subq_0.min_booking_value + , subq_0.bookers + , subq_0.average_booking_value + , subq_0.referred_bookings + , subq_0.median_booking_value + , subq_0.booking_value_p99 + , subq_0.discrete_booking_value_p99 + , subq_0.approximate_continuous_booking_value_p99 + , subq_0.approximate_discrete_booking_value_p99 + FROM ( + -- Read Elements From Semantic Model 'bookings_source' + SELECT + 1 AS bookings + , CASE WHEN is_instant THEN 1 ELSE 0 END AS instant_bookings + , bookings_source_src_28000.booking_value + , bookings_source_src_28000.booking_value AS max_booking_value + , bookings_source_src_28000.booking_value AS min_booking_value + , bookings_source_src_28000.guest_id AS bookers + , bookings_source_src_28000.booking_value AS average_booking_value + , bookings_source_src_28000.booking_value AS booking_payments + , CASE WHEN referrer_id IS NOT NULL THEN 1 ELSE 0 END AS referred_bookings + , bookings_source_src_28000.booking_value AS median_booking_value + , bookings_source_src_28000.booking_value AS booking_value_p99 + , bookings_source_src_28000.booking_value AS discrete_booking_value_p99 + , bookings_source_src_28000.booking_value AS approximate_continuous_booking_value_p99 + , bookings_source_src_28000.booking_value AS approximate_discrete_booking_value_p99 + , bookings_source_src_28000.is_instant + , DATE_TRUNC('day', bookings_source_src_28000.ds) AS ds__day + , DATE_TRUNC('week', bookings_source_src_28000.ds) AS ds__week + , DATE_TRUNC('month', bookings_source_src_28000.ds) AS ds__month + , DATE_TRUNC('quarter', bookings_source_src_28000.ds) AS ds__quarter + , DATE_TRUNC('year', bookings_source_src_28000.ds) AS ds__year + , EXTRACT(year FROM bookings_source_src_28000.ds) AS ds__extract_year + , EXTRACT(quarter FROM bookings_source_src_28000.ds) AS ds__extract_quarter + , EXTRACT(month FROM bookings_source_src_28000.ds) AS ds__extract_month + , EXTRACT(day FROM bookings_source_src_28000.ds) AS ds__extract_day + , EXTRACT(isodow FROM bookings_source_src_28000.ds) AS ds__extract_dow + , EXTRACT(doy FROM bookings_source_src_28000.ds) AS ds__extract_doy + , DATE_TRUNC('day', bookings_source_src_28000.ds_partitioned) AS ds_partitioned__day + , DATE_TRUNC('week', bookings_source_src_28000.ds_partitioned) AS ds_partitioned__week + , DATE_TRUNC('month', bookings_source_src_28000.ds_partitioned) AS ds_partitioned__month + , DATE_TRUNC('quarter', bookings_source_src_28000.ds_partitioned) AS ds_partitioned__quarter + , DATE_TRUNC('year', bookings_source_src_28000.ds_partitioned) AS ds_partitioned__year + , EXTRACT(year FROM bookings_source_src_28000.ds_partitioned) AS ds_partitioned__extract_year + , EXTRACT(quarter FROM bookings_source_src_28000.ds_partitioned) AS ds_partitioned__extract_quarter + , EXTRACT(month FROM bookings_source_src_28000.ds_partitioned) AS ds_partitioned__extract_month + , EXTRACT(day FROM bookings_source_src_28000.ds_partitioned) AS ds_partitioned__extract_day + , EXTRACT(isodow FROM bookings_source_src_28000.ds_partitioned) AS ds_partitioned__extract_dow + , EXTRACT(doy FROM bookings_source_src_28000.ds_partitioned) AS ds_partitioned__extract_doy + , DATE_TRUNC('day', bookings_source_src_28000.paid_at) AS paid_at__day + , DATE_TRUNC('week', bookings_source_src_28000.paid_at) AS paid_at__week + , DATE_TRUNC('month', bookings_source_src_28000.paid_at) AS paid_at__month + , DATE_TRUNC('quarter', bookings_source_src_28000.paid_at) AS paid_at__quarter + , DATE_TRUNC('year', bookings_source_src_28000.paid_at) AS paid_at__year + , EXTRACT(year FROM bookings_source_src_28000.paid_at) AS paid_at__extract_year + , EXTRACT(quarter FROM bookings_source_src_28000.paid_at) AS paid_at__extract_quarter + , EXTRACT(month FROM bookings_source_src_28000.paid_at) AS paid_at__extract_month + , EXTRACT(day FROM bookings_source_src_28000.paid_at) AS paid_at__extract_day + , EXTRACT(isodow FROM bookings_source_src_28000.paid_at) AS paid_at__extract_dow + , EXTRACT(doy FROM bookings_source_src_28000.paid_at) AS paid_at__extract_doy + , bookings_source_src_28000.is_instant AS booking__is_instant + , DATE_TRUNC('day', bookings_source_src_28000.ds) AS booking__ds__day + , DATE_TRUNC('week', bookings_source_src_28000.ds) AS booking__ds__week + , DATE_TRUNC('month', bookings_source_src_28000.ds) AS booking__ds__month + , DATE_TRUNC('quarter', bookings_source_src_28000.ds) AS booking__ds__quarter + , DATE_TRUNC('year', bookings_source_src_28000.ds) AS booking__ds__year + , EXTRACT(year FROM bookings_source_src_28000.ds) AS booking__ds__extract_year + , EXTRACT(quarter FROM bookings_source_src_28000.ds) AS booking__ds__extract_quarter + , EXTRACT(month FROM bookings_source_src_28000.ds) AS booking__ds__extract_month + , EXTRACT(day FROM bookings_source_src_28000.ds) AS booking__ds__extract_day + , EXTRACT(isodow FROM bookings_source_src_28000.ds) AS booking__ds__extract_dow + , EXTRACT(doy FROM bookings_source_src_28000.ds) AS booking__ds__extract_doy + , DATE_TRUNC('day', bookings_source_src_28000.ds_partitioned) AS booking__ds_partitioned__day + , DATE_TRUNC('week', bookings_source_src_28000.ds_partitioned) AS booking__ds_partitioned__week + , DATE_TRUNC('month', bookings_source_src_28000.ds_partitioned) AS booking__ds_partitioned__month + , DATE_TRUNC('quarter', bookings_source_src_28000.ds_partitioned) AS booking__ds_partitioned__quarter + , DATE_TRUNC('year', bookings_source_src_28000.ds_partitioned) AS booking__ds_partitioned__year + , EXTRACT(year FROM bookings_source_src_28000.ds_partitioned) AS booking__ds_partitioned__extract_year + , EXTRACT(quarter FROM bookings_source_src_28000.ds_partitioned) AS booking__ds_partitioned__extract_quarter + , EXTRACT(month FROM bookings_source_src_28000.ds_partitioned) AS booking__ds_partitioned__extract_month + , EXTRACT(day FROM bookings_source_src_28000.ds_partitioned) AS booking__ds_partitioned__extract_day + , EXTRACT(isodow FROM bookings_source_src_28000.ds_partitioned) AS booking__ds_partitioned__extract_dow + , EXTRACT(doy FROM bookings_source_src_28000.ds_partitioned) AS booking__ds_partitioned__extract_doy + , DATE_TRUNC('day', bookings_source_src_28000.paid_at) AS booking__paid_at__day + , DATE_TRUNC('week', bookings_source_src_28000.paid_at) AS booking__paid_at__week + , DATE_TRUNC('month', bookings_source_src_28000.paid_at) AS booking__paid_at__month + , DATE_TRUNC('quarter', bookings_source_src_28000.paid_at) AS booking__paid_at__quarter + , DATE_TRUNC('year', bookings_source_src_28000.paid_at) AS booking__paid_at__year + , EXTRACT(year FROM bookings_source_src_28000.paid_at) AS booking__paid_at__extract_year + , EXTRACT(quarter FROM bookings_source_src_28000.paid_at) AS booking__paid_at__extract_quarter + , EXTRACT(month FROM bookings_source_src_28000.paid_at) AS booking__paid_at__extract_month + , EXTRACT(day FROM bookings_source_src_28000.paid_at) AS booking__paid_at__extract_day + , EXTRACT(isodow FROM bookings_source_src_28000.paid_at) AS booking__paid_at__extract_dow + , EXTRACT(doy FROM bookings_source_src_28000.paid_at) AS booking__paid_at__extract_doy + , bookings_source_src_28000.listing_id AS listing + , bookings_source_src_28000.guest_id AS guest + , bookings_source_src_28000.host_id AS host + , bookings_source_src_28000.listing_id AS booking__listing + , bookings_source_src_28000.guest_id AS booking__guest + , bookings_source_src_28000.host_id AS booking__host + FROM ***************************.fct_bookings bookings_source_src_28000 + ) subq_0 + ) subq_1 + ON + subq_8.metric_time__day = subq_1.metric_time__day + ) subq_9 + ) subq_10 + GROUP BY + subq_10.metric_time__day + ) subq_11 +) subq_12 diff --git a/tests_metricflow/snapshots/test_custom_granularity.py/SqlQueryPlan/DuckDB/test_custom_offset_window__plan0_optimized.sql b/tests_metricflow/snapshots/test_custom_granularity.py/SqlQueryPlan/DuckDB/test_custom_offset_window__plan0_optimized.sql new file mode 100644 index 000000000..93b2e2909 --- /dev/null +++ b/tests_metricflow/snapshots/test_custom_granularity.py/SqlQueryPlan/DuckDB/test_custom_offset_window__plan0_optimized.sql @@ -0,0 +1,98 @@ +test_name: test_custom_offset_window +test_filename: test_custom_granularity.py +sql_engine: DuckDB +--- +-- Compute Metrics via Expressions +SELECT + metric_time__day + , bookings AS bookings_offset_one_martian_day +FROM ( + -- Join to Time Spine Dataset + -- Pass Only Elements: ['bookings', 'metric_time__day'] + -- Aggregate Measures + -- Compute Metrics via Expressions + SELECT + subq_21.metric_time__day AS metric_time__day + , SUM(subq_14.bookings) AS bookings + FROM ( + -- Calculate Custom Granularity Bounds + -- Calculate Custom Granularity Bounds + -- Pass Only Elements: ['metric_time__day', 'metric_time__day'] + SELECT + DATE_TRUNC('day', CASE + WHEN subq_18.ds__martian_day__first_value__offset + INTERVAL subq_16.ds__martian_day__row_number day <= subq_18.ds__martian_day__first_value__offset + INTERVAL subq_16.ds__martian_day__row_number day THEN subq_18.ds__martian_day__first_value__offset + INTERVAL subq_16.ds__martian_day__row_number day + ELSE subq_18.ds__martian_day__last_value__offset + END) AS metric_time__day + FROM ***************************.mf_time_spine time_spine_src_28006 + INNER JOIN ( + -- Calculate Custom Granularity Bounds + SELECT + metric_time__martian_day + , LAG(ds__martian_day__first_value, 1) OVER ( + ORDER BY metric_time__martian_day + ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING + ) AS ds__martian_day__first_value__offset + , LAG(ds__martian_day__last_value, 1) OVER ( + ORDER BY metric_time__martian_day + ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING + ) AS ds__martian_day__last_value__offset + FROM ( + -- Calculate Custom Granularity Bounds + SELECT + ds__martian_day__first_value + , ds__martian_day__last_value + FROM ( + -- Read From Time Spine 'mf_time_spine' + -- Calculate Custom Granularity Bounds + SELECT + ds AS ds__day + , DATE_TRUNC('week', ds) AS ds__week + , DATE_TRUNC('month', ds) AS ds__month + , DATE_TRUNC('quarter', ds) AS ds__quarter + , DATE_TRUNC('year', ds) AS ds__year + , EXTRACT(year FROM ds) AS ds__extract_year + , EXTRACT(quarter FROM ds) AS ds__extract_quarter + , EXTRACT(month FROM ds) AS ds__extract_month + , EXTRACT(day FROM ds) AS ds__extract_day + , EXTRACT(isodow FROM ds) AS ds__extract_dow + , EXTRACT(doy FROM ds) AS ds__extract_doy + , martian_day AS ds__martian_day + , martian_day AS metric_time__martian_day + , FIRST_VALUE(ds) OVER ( + PARTITION BY martian_day + ORDER BY ds + ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING + ) AS ds__martian_day__first_value + , LAST_VALUE(ds) OVER ( + PARTITION BY martian_day + ORDER BY ds + ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING + ) AS ds__martian_day__last_value + , ROW_NUMBER() OVER ( + PARTITION BY martian_day + ORDER BY ds + ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING + ) AS ds__martian_day__row_number + FROM ***************************.mf_time_spine time_spine_src_28006 + ) subq_16 + GROUP BY + ds__martian_day__first_value + , ds__martian_day__last_value + ) subq_17 + ) subq_18 + ON + subq_17.metric_time__martian_day = time_spine_src_28006.martian_day + ) subq_21 + INNER JOIN ( + -- Read Elements From Semantic Model 'bookings_source' + -- Metric Time Dimension 'ds' + SELECT + DATE_TRUNC('day', ds) AS metric_time__day + , 1 AS bookings + FROM ***************************.fct_bookings bookings_source_src_28000 + ) subq_14 + ON + subq_21.metric_time__day = subq_14.metric_time__day + GROUP BY + subq_21.metric_time__day +) subq_25 diff --git a/x.sql b/x.sql index 85127566b..6965ba6da 100644 --- a/x.sql +++ b/x.sql @@ -4,14 +4,14 @@ -- This also works for custom grain, since we can just join it to the final subquery like usual. -- Also works if there are multiple grains in the group by +-- TODO: get CTE working somehow. Might need a separate DFP node. with cte as ( --- CustomGranularityBoundsNode SELECT date_day, - fiscal_quarter, - row_number() over (partition by fiscal_quarter order by date_day) - 1 as days_from_start_of_fiscal_quarter - , first_value(date_day) over (partition by fiscal_quarter order by date_day) as fiscal_quarter_start - , last_value(date_day) over (partition by fiscal_quarter order by date_day) as fiscal_quarter_end + fiscal_quarter + , first_value(date_day) over (partition by fiscal_quarter order by date_day) as ds__fiscal_quarter__first_value + , last_value(date_day) over (partition by fiscal_quarter order by date_day) as ds__fiscal_quarter__last_value + , row_number() over (partition by fiscal_quarter order by date_day) as ds__day__row_number FROM ANALYTICS_DEV.DBT_JSTEIN.ALL_DAYS ) @@ -23,7 +23,7 @@ FROM ANALYTICS_DEV.DBT_JSTEIN.STG_SALESFORCE__ORDER_ITEMS INNER JOIN ( -- ApplyStandardGranularityNode SELECT - ts_offset.date_day, + ts_offset.date_day AS metric_time__day, -- This alias is only needed if it was requested in the query DATE_TRUNC(week, ts_offset.date_day) AS metric_time__week, fiscal_year AS metric_time__fiscal_year FROM ( @@ -31,8 +31,8 @@ INNER JOIN ( select fiscal_quarter , case - when dateadd(day, days_from_start_of_fiscal_quarter, fiscal_quarter_start__offset_by_1) <= fiscal_quarter_end__offset_by_1 - then dateadd(day, days_from_start_of_fiscal_quarter, fiscal_quarter_start__offset_by_1) + when dateadd(day, ds__day__row_number - 1, fiscal_quarter_start__offset_by_1) <= fiscal_quarter_end__offset_by_1 + then dateadd(day, ds__day__row_number - 1, fiscal_quarter_start__offset_by_1) else fiscal_quarter_end__offset_by_1 end as date_day from cte -- CustomGranularityBoundsNode @@ -40,14 +40,14 @@ INNER JOIN ( -- OffsetCustomGranularityBoundsNode select fiscal_quarter, - lag(fiscal_quarter_start, 1) over (order by fiscal_quarter) as fiscal_quarter_start__offset_by_1, - lag(fiscal_quarter_end, 1) over (order by fiscal_quarter) as fiscal_quarter_end__offset_by_1 + lag(ds__fiscal_quarter__first_value, 1) over (order by fiscal_quarter) as fiscal_quarter_start__offset_by_1, + lag(ds__fiscal_quarter__last_value, 1) over (order by fiscal_quarter) as fiscal_quarter_end__offset_by_1 from ( -- FilterEelementsNode select fiscal_quarter, - fiscal_quarter_start, - fiscal_quarter_end + ds__fiscal_quarter__first_value, + ds__fiscal_quarter__last_value from cte -- CustomGranularityBoundsNode GROUP BY 1, 2, 3 ) ts_distinct