diff --git a/metricflow-semantics/metricflow_semantics/dag/id_prefix.py b/metricflow-semantics/metricflow_semantics/dag/id_prefix.py index 8c2a6d1b4..73fcba6df 100644 --- a/metricflow-semantics/metricflow_semantics/dag/id_prefix.py +++ b/metricflow-semantics/metricflow_semantics/dag/id_prefix.py @@ -56,6 +56,8 @@ class StaticIdPrefix(IdPrefix, Enum, metaclass=EnumMetaClassHelper): DATAFLOW_NODE_JOIN_CONVERSION_EVENTS_PREFIX = "jce" DATAFLOW_NODE_WINDOW_REAGGREGATION_ID_PREFIX = "wr" DATAFLOW_NODE_ALIAS_SPECS_ID_PREFIX = "as" + DATAFLOW_NODE_CUSTOM_GRANULARITY_BOUNDS_ID_PREFIX = "cgb" + DATAFLOW_NODE_OFFSET_BY_CUSTOMG_GRANULARITY_ID_PREFIX = "obcg" SQL_EXPR_COLUMN_REFERENCE_ID_PREFIX = "cr" SQL_EXPR_COMPARISON_ID_PREFIX = "cmp" @@ -75,6 +77,7 @@ class StaticIdPrefix(IdPrefix, Enum, metaclass=EnumMetaClassHelper): SQL_EXPR_BETWEEN_PREFIX = "betw" SQL_EXPR_WINDOW_FUNCTION_ID_PREFIX = "wfnc" SQL_EXPR_GENERATE_UUID_PREFIX = "uuid" + SQL_EXPR_CASE_PREFIX = "case" SQL_PLAN_SELECT_STATEMENT_ID_PREFIX = "ss" SQL_PLAN_TABLE_FROM_CLAUSE_ID_PREFIX = "tfc" diff --git a/metricflow-semantics/metricflow_semantics/instances.py b/metricflow-semantics/metricflow_semantics/instances.py index 6cd85fcbd..45d9560e5 100644 --- a/metricflow-semantics/metricflow_semantics/instances.py +++ b/metricflow-semantics/metricflow_semantics/instances.py @@ -164,11 +164,7 @@ def with_entity_prefix( ) -> TimeDimensionInstance: """Returns a new instance with the entity prefix added to the entity links.""" transformed_spec = self.spec.with_entity_prefix(entity_prefix) - return TimeDimensionInstance( - associated_columns=(column_association_resolver.resolve_spec(transformed_spec),), - defined_from=self.defined_from, - spec=transformed_spec, - ) + return self.with_new_spec(transformed_spec, column_association_resolver) def with_new_defined_from(self, defined_from: Sequence[SemanticModelElementReference]) -> TimeDimensionInstance: """Returns a new instance with the defined_from field replaced.""" diff --git a/metricflow-semantics/metricflow_semantics/specs/dunder_column_association_resolver.py b/metricflow-semantics/metricflow_semantics/specs/dunder_column_association_resolver.py index 3473618c7..5e6948d76 100644 --- a/metricflow-semantics/metricflow_semantics/specs/dunder_column_association_resolver.py +++ b/metricflow-semantics/metricflow_semantics/specs/dunder_column_association_resolver.py @@ -58,6 +58,11 @@ def visit_time_dimension_spec(self, time_dimension_spec: TimeDimensionSpec) -> C if time_dimension_spec.aggregation_state else "" ) + + ( + f"{DUNDER}{time_dimension_spec.window_function.value.lower()}" + if time_dimension_spec.window_function + else "" + ) ) def visit_entity_spec(self, entity_spec: EntitySpec) -> ColumnAssociation: # noqa: D102 diff --git a/metricflow-semantics/metricflow_semantics/specs/time_dimension_spec.py b/metricflow-semantics/metricflow_semantics/specs/time_dimension_spec.py index fd47c80a6..dec834adc 100644 --- a/metricflow-semantics/metricflow_semantics/specs/time_dimension_spec.py +++ b/metricflow-semantics/metricflow_semantics/specs/time_dimension_spec.py @@ -15,6 +15,7 @@ from metricflow_semantics.naming.linkable_spec_name import StructuredLinkableSpecName from metricflow_semantics.specs.dimension_spec import DimensionSpec from metricflow_semantics.specs.instance_spec import InstanceSpecVisitor +from metricflow_semantics.sql.sql_exprs import SqlWindowFunction from metricflow_semantics.time.granularity import ExpandedTimeGranularity from metricflow_semantics.visitor import VisitorOutputT @@ -91,6 +92,8 @@ class TimeDimensionSpec(DimensionSpec): # noqa: D101 # Used for semi-additive joins. Some more thought is needed, but this may be useful in InstanceSpec. aggregation_state: Optional[AggregationState] = None + window_function: Optional[SqlWindowFunction] = None + @property def without_first_entity_link(self) -> TimeDimensionSpec: # noqa: D102 assert len(self.entity_links) > 0, f"Spec does not have any entity links: {self}" @@ -99,6 +102,8 @@ def without_first_entity_link(self) -> TimeDimensionSpec: # noqa: D102 entity_links=self.entity_links[1:], time_granularity=self.time_granularity, date_part=self.date_part, + aggregation_state=self.aggregation_state, + window_function=self.window_function, ) @property @@ -108,6 +113,8 @@ def without_entity_links(self) -> TimeDimensionSpec: # noqa: D102 time_granularity=self.time_granularity, date_part=self.date_part, entity_links=(), + aggregation_state=self.aggregation_state, + window_function=self.window_function, ) @property @@ -153,6 +160,7 @@ def with_grain(self, time_granularity: ExpandedTimeGranularity) -> TimeDimension time_granularity=time_granularity, date_part=self.date_part, aggregation_state=self.aggregation_state, + window_function=self.window_function, ) def with_base_grain(self) -> TimeDimensionSpec: # noqa: D102 @@ -162,6 +170,7 @@ def with_base_grain(self) -> TimeDimensionSpec: # noqa: D102 time_granularity=ExpandedTimeGranularity.from_time_granularity(self.time_granularity.base_granularity), date_part=self.date_part, aggregation_state=self.aggregation_state, + window_function=self.window_function, ) def with_grain_and_date_part( # noqa: D102 @@ -173,6 +182,7 @@ def with_grain_and_date_part( # noqa: D102 time_granularity=time_granularity, date_part=date_part, aggregation_state=self.aggregation_state, + window_function=self.window_function, ) def with_aggregation_state(self, aggregation_state: AggregationState) -> TimeDimensionSpec: # noqa: D102 @@ -182,6 +192,17 @@ def with_aggregation_state(self, aggregation_state: AggregationState) -> TimeDim time_granularity=self.time_granularity, date_part=self.date_part, aggregation_state=aggregation_state, + window_function=self.window_function, + ) + + def with_window_function(self, window_function: SqlWindowFunction) -> TimeDimensionSpec: # noqa: D102 + return TimeDimensionSpec( + element_name=self.element_name, + entity_links=self.entity_links, + time_granularity=self.time_granularity, + date_part=self.date_part, + aggregation_state=self.aggregation_state, + window_function=window_function, ) def comparison_key(self, exclude_fields: Sequence[TimeDimensionSpecField] = ()) -> TimeDimensionSpecComparisonKey: @@ -243,6 +264,7 @@ def with_entity_prefix(self, entity_prefix: EntityReference) -> TimeDimensionSpe time_granularity=self.time_granularity, date_part=self.date_part, aggregation_state=self.aggregation_state, + window_function=self.window_function, ) @staticmethod diff --git a/metricflow-semantics/metricflow_semantics/sql/sql_exprs.py b/metricflow-semantics/metricflow_semantics/sql/sql_exprs.py index 15b7268c5..391892d84 100644 --- a/metricflow-semantics/metricflow_semantics/sql/sql_exprs.py +++ b/metricflow-semantics/metricflow_semantics/sql/sql_exprs.py @@ -15,11 +15,12 @@ from dbt_semantic_interfaces.type_enums.date_part import DatePart from dbt_semantic_interfaces.type_enums.period_agg import PeriodAggregation from dbt_semantic_interfaces.type_enums.time_granularity import TimeGranularity +from typing_extensions import override + from metricflow_semantics.dag.id_prefix import IdPrefix, StaticIdPrefix from metricflow_semantics.dag.mf_dag import DagNode, DisplayedProperty from metricflow_semantics.sql.sql_bind_parameters import SqlBindParameterSet from metricflow_semantics.visitor import Visitable, VisitorOutputT -from typing_extensions import override @dataclass(frozen=True, eq=False) @@ -235,6 +236,10 @@ def visit_window_function_expr(self, node: SqlWindowFunctionExpression) -> Visit def visit_generate_uuid_expr(self, node: SqlGenerateUuidExpression) -> VisitorOutputT: # noqa: D102 pass + @abstractmethod + def visit_case_expr(self, node: SqlCaseExpression) -> VisitorOutputT: # noqa: D102 + pass + @dataclass(frozen=True, eq=False) class SqlStringExpression(SqlExpressionNode): @@ -948,11 +953,18 @@ class SqlWindowFunction(Enum): FIRST_VALUE = "FIRST_VALUE" LAST_VALUE = "LAST_VALUE" AVERAGE = "AVG" + ROW_NUMBER = "ROW_NUMBER" + LAG = "LAG" @property def requires_ordering(self) -> bool: """Asserts whether or not ordering the window function will have an impact on the resulting value.""" - if self is SqlWindowFunction.FIRST_VALUE or self is SqlWindowFunction.LAST_VALUE: + if ( + self is SqlWindowFunction.FIRST_VALUE + or self is SqlWindowFunction.LAST_VALUE + or self is SqlWindowFunction.ROW_NUMBER + or self is SqlWindowFunction.LAG + ): return True elif self is SqlWindowFunction.AVERAGE: return False @@ -1715,3 +1727,67 @@ def lineage(self) -> SqlExpressionTreeLineage: # noqa: D102 def matches(self, other: SqlExpressionNode) -> bool: # noqa: D102 return False + + +@dataclass(frozen=True, eq=False) +class SqlCaseExpression(SqlExpressionNode): + """Renders a CASE WHEN expression.""" + + when_to_then_exprs: Dict[SqlExpressionNode, SqlExpressionNode] + else_expr: Optional[SqlExpressionNode] + + @staticmethod + def create( # noqa: D102 + when_to_then_exprs: Dict[SqlExpressionNode, SqlExpressionNode], else_expr: Optional[SqlExpressionNode] = None + ) -> SqlCaseExpression: + parent_nodes: Tuple[SqlExpressionNode, ...] = () + for when, then in when_to_then_exprs.items(): + parent_nodes += (when,) + parent_nodes += (then,) + + if else_expr: + parent_nodes += (else_expr,) + + return SqlCaseExpression(parent_nodes=parent_nodes, when_to_then_exprs=when_to_then_exprs, else_expr=else_expr) + + @classmethod + def id_prefix(cls) -> IdPrefix: # noqa: D102 + return StaticIdPrefix.SQL_EXPR_CASE_PREFIX + + def accept(self, visitor: SqlExpressionNodeVisitor[VisitorOutputT]) -> VisitorOutputT: # noqa: D102 + return visitor.visit_case_expr(self) + + @property + def description(self) -> str: # noqa: D102 + return "Case expression" + + @property + def displayed_properties(self) -> Sequence[DisplayedProperty]: # noqa: D102 + return super().displayed_properties + + @property + def requires_parenthesis(self) -> bool: # noqa: D102 + return False + + @property + def bind_parameter_set(self) -> SqlBindParameterSet: # noqa: D102 + return SqlBindParameterSet() + + def __repr__(self) -> str: # noqa: D105 + return f"{self.__class__.__name__}(node_id={self.node_id})" + + def rewrite( # noqa: D102 + self, + column_replacements: Optional[SqlColumnReplacements] = None, + should_render_table_alias: Optional[bool] = None, + ) -> SqlExpressionNode: + return self + + @property + def lineage(self) -> SqlExpressionTreeLineage: # noqa: D102 + return SqlExpressionTreeLineage(other_exprs=(self,)) + + def matches(self, other: SqlExpressionNode) -> bool: # noqa: D102 + if not isinstance(other, SqlCaseExpression): + return False + return self.when_to_then_exprs == other.when_to_then_exprs and self.else_expr == other.else_expr diff --git a/metricflow-semantics/metricflow_semantics/test_helpers/semantic_manifest_yamls/simple_manifest/metrics.yaml b/metricflow-semantics/metricflow_semantics/test_helpers/semantic_manifest_yamls/simple_manifest/metrics.yaml index 7e34bebef..4cdad77e5 100644 --- a/metricflow-semantics/metricflow_semantics/test_helpers/semantic_manifest_yamls/simple_manifest/metrics.yaml +++ b/metricflow-semantics/metricflow_semantics/test_helpers/semantic_manifest_yamls/simple_manifest/metrics.yaml @@ -860,3 +860,24 @@ metric: - name: instant_bookings alias: shared_alias --- +metric: + name: bookings_offset_one_martian_day + description: bookings offset by one martian_day + type: derived + type_params: + expr: bookings + metrics: + - name: bookings + offset_window: 1 martian_day +--- +metric: + name: bookings_martian_day_over_martian_day + description: bookings growth martian day over martian day + type: derived + type_params: + expr: bookings - bookings_offset / NULLIF(bookings_offset, 0) + metrics: + - name: bookings + offset_window: 1 martian_day + alias: bookings_offset + - name: bookings diff --git a/metricflow-semantics/tests_metricflow_semantics/collection_helpers/test_pretty_print.py b/metricflow-semantics/tests_metricflow_semantics/collection_helpers/test_pretty_print.py index c09422caa..86a4c446c 100644 --- a/metricflow-semantics/tests_metricflow_semantics/collection_helpers/test_pretty_print.py +++ b/metricflow-semantics/tests_metricflow_semantics/collection_helpers/test_pretty_print.py @@ -47,6 +47,7 @@ def test_classes() -> None: # noqa: D103 time_granularity=ExpandedTimeGranularity(name='day', base_granularity=DAY), date_part=None, aggregation_state=None, + window_function=None, ) """ ).rstrip() diff --git a/metricflow-semantics/tests_metricflow_semantics/model/semantics/test_metric_lookup.py b/metricflow-semantics/tests_metricflow_semantics/model/semantics/test_metric_lookup.py index d9942eeb4..b69c82d62 100644 --- a/metricflow-semantics/tests_metricflow_semantics/model/semantics/test_metric_lookup.py +++ b/metricflow-semantics/tests_metricflow_semantics/model/semantics/test_metric_lookup.py @@ -27,12 +27,7 @@ def test_min_queryable_time_granularity_for_different_agg_time_grains( # noqa: def test_custom_offset_window_for_metric( simple_semantic_manifest_lookup: SemanticManifestLookup, ) -> None: - """Test offset window with custom grain supplied. - - TODO: As of now, the functionality of an offset window with a custom grain is not supported in MF. - This test is added to show that at least the parsing is successful using a custom grain offset window. - Once support for that is added in MF + relevant tests, this test can be removed. - """ + """Test offset window with custom grain supplied.""" metric = simple_semantic_manifest_lookup.metric_lookup.get_metric(MetricReference("bookings_offset_martian_day")) assert len(metric.input_metrics) == 1 diff --git a/metricflow/dataflow/builder/dataflow_plan_builder.py b/metricflow/dataflow/builder/dataflow_plan_builder.py index 348ba5b4e..783123d4f 100644 --- a/metricflow/dataflow/builder/dataflow_plan_builder.py +++ b/metricflow/dataflow/builder/dataflow_plan_builder.py @@ -84,6 +84,7 @@ from metricflow.dataflow.nodes.combine_aggregated_outputs import CombineAggregatedOutputsNode from metricflow.dataflow.nodes.compute_metrics import ComputeMetricsNode from metricflow.dataflow.nodes.constrain_time import ConstrainTimeRangeNode +from metricflow.dataflow.nodes.custom_granularity_bounds import CustomGranularityBoundsNode from metricflow.dataflow.nodes.filter_elements import FilterElementsNode from metricflow.dataflow.nodes.join_conversion_events import JoinConversionEventsNode from metricflow.dataflow.nodes.join_over_time import JoinOverTimeRangeNode @@ -92,6 +93,7 @@ from metricflow.dataflow.nodes.join_to_time_spine import JoinToTimeSpineNode from metricflow.dataflow.nodes.metric_time_transform import MetricTimeDimensionTransformNode from metricflow.dataflow.nodes.min_max import MinMaxNode +from metricflow.dataflow.nodes.offset_by_custom_granularity import OffsetByCustomGranularityNode from metricflow.dataflow.nodes.order_by_limit import OrderByLimitNode from metricflow.dataflow.nodes.read_sql_source import ReadSqlSourceNode from metricflow.dataflow.nodes.semi_additive_join import SemiAdditiveJoinNode @@ -658,13 +660,22 @@ def _build_derived_metric_output_node( ) if metric_spec.has_time_offset and queried_agg_time_dimension_specs: # TODO: move this to a helper method - time_spine_node = self._build_time_spine_node(queried_agg_time_dimension_specs) + time_spine_node = self._build_time_spine_node( + queried_time_spine_specs=queried_agg_time_dimension_specs, + offset_window=metric_spec.offset_window, + ) output_node = JoinToTimeSpineNode.create( parent_node=output_node, time_spine_node=time_spine_node, requested_agg_time_dimension_specs=queried_agg_time_dimension_specs, join_on_time_dimension_spec=self._sort_by_base_granularity(queried_agg_time_dimension_specs)[0], - offset_window=metric_spec.offset_window, + offset_window=( + metric_spec.offset_window + if metric_spec.offset_window + and metric_spec.offset_window.granularity + not in self._semantic_model_lookup.custom_granularity_names + else None + ), offset_to_grain=metric_spec.offset_to_grain, join_type=SqlJoinType.INNER, ) @@ -1649,13 +1660,22 @@ def _build_aggregated_measure_from_measure_source_node( measure_properties=measure_properties, required_time_spine_specs=base_queried_agg_time_dimension_specs ) required_time_spine_specs = (join_on_time_dimension_spec,) + base_queried_agg_time_dimension_specs - time_spine_node = self._build_time_spine_node(required_time_spine_specs) + time_spine_node = self._build_time_spine_node( + queried_time_spine_specs=required_time_spine_specs, + offset_window=before_aggregation_time_spine_join_description.offset_window, + ) unaggregated_measure_node = JoinToTimeSpineNode.create( parent_node=unaggregated_measure_node, time_spine_node=time_spine_node, requested_agg_time_dimension_specs=base_queried_agg_time_dimension_specs, join_on_time_dimension_spec=join_on_time_dimension_spec, - offset_window=before_aggregation_time_spine_join_description.offset_window, + offset_window=( + before_aggregation_time_spine_join_description.offset_window + if before_aggregation_time_spine_join_description.offset_window + and before_aggregation_time_spine_join_description.offset_window.granularity + not in self._semantic_model_lookup.custom_granularity_names + else None + ), offset_to_grain=before_aggregation_time_spine_join_description.offset_to_grain, join_type=before_aggregation_time_spine_join_description.join_type, ) @@ -1862,6 +1882,7 @@ def _build_time_spine_node( queried_time_spine_specs: Sequence[TimeDimensionSpec], where_filter_specs: Sequence[WhereFilterSpec] = (), time_range_constraint: Optional[TimeRangeConstraint] = None, + offset_window: Optional[MetricTimeWindow] = None, ) -> DataflowPlanNode: """Return the time spine node needed to satisfy the specs.""" required_time_spine_spec_set = self.__get_required_linkable_specs( @@ -1870,28 +1891,81 @@ def _build_time_spine_node( ) required_time_spine_specs = required_time_spine_spec_set.time_dimension_specs - # TODO: support multiple time spines here. Build node on the one with the smallest base grain. - # Then, pass custom_granularity_specs into _build_pre_aggregation_plan if they aren't satisfied by smallest time spine. - time_spine_source = self._choose_time_spine_source(required_time_spine_specs) - read_node = self._choose_time_spine_read_node(time_spine_source) - time_spine_data_set = self._node_data_set_resolver.get_output_data_set(read_node) - - # Change the column aliases to match the specs that were requested in the query. - time_spine_node = AliasSpecsNode.create( - parent_node=read_node, - change_specs=tuple( - SpecToAlias( - input_spec=time_spine_data_set.instance_from_time_dimension_grain_and_date_part(required_spec).spec, - output_spec=required_spec, + should_dedupe = False + if offset_window and offset_window.granularity in self._semantic_model_lookup._custom_granularities: + # Are sets the right choice here? + all_queried_grains: Set[ExpandedTimeGranularity] = set() + queried_custom_specs: Tuple[TimeDimensionSpec, ...] = () + queried_standard_specs: Tuple[TimeDimensionSpec, ...] = () + for spec in queried_time_spine_specs: + all_queried_grains.add(spec.time_granularity) + if spec.time_granularity.is_custom_granularity: + queried_custom_specs += (spec,) + else: + queried_standard_specs += (spec,) + + custom_grain = self._semantic_model_lookup._custom_granularities[offset_window.granularity] + time_spine_source = self._choose_time_spine_source((DataSet.metric_time_dimension_spec(custom_grain),)) + time_spine_read_node = self._choose_time_spine_read_node(time_spine_source) + # TODO: make sure this is checking the correct granularity type once DSI is updated + if {spec.time_granularity for spec in queried_time_spine_specs} == {custom_grain}: + # If querying with only the same grain as is used in the offset_window, can use a simpler plan. + # offset_node = OffsetCustomGranularityNode.create( + # parent_node=time_spine_read_node, offset_window=offset_window + # ) + # time_spine_node: DataflowPlanNode = JoinToTimeSpineNode.create( + # parent_node=offset_node, + # # TODO: need to make sure we apply both agg time and metric time + # requested_agg_time_dimension_specs=queried_time_spine_specs, + # time_spine_node=time_spine_read_node, + # join_type=SqlJoinType.INNER, + # join_on_time_dimension_spec=custom_grain_metric_time_spec, + # ) + pass + else: + bounds_node = CustomGranularityBoundsNode.create( + parent_node=time_spine_read_node, + custom_granularity_name=custom_grain.name, ) - for required_spec in required_time_spine_specs - ), - ) + time_spine_node: DataflowPlanNode = OffsetByCustomGranularityNode.create( + parent_node=bounds_node, offset_window=offset_window + ) + # if queried_standard_specs: + # # TODO: This is also when we can change the alias names to match the requested specs + # time_spine_node = ApplyStandardGranularityNode.create( + # parent_node=time_spine_node, time_dimension_specs=queried_standard_specs + # ) + for custom_spec in queried_custom_specs: + time_spine_node = JoinToCustomGranularityNode.create( + parent_node=time_spine_node, time_dimension_spec=custom_spec + ) + else: + # TODO: support multiple time spines here. Build node on the one with the smallest base grain. + # Then, pass custom_granularity_specs into _build_pre_aggregation_plan if they aren't satisfied by smallest time spine. + time_spine_source = self._choose_time_spine_source(required_time_spine_specs) + read_node = self._choose_time_spine_read_node(time_spine_source) + time_spine_data_set = self._node_data_set_resolver.get_output_data_set(read_node) + + # Change the column aliases to match the specs that were requested in the query. + time_spine_node = AliasSpecsNode.create( + parent_node=read_node, + change_specs=tuple( + SpecToAlias( + input_spec=time_spine_data_set.instance_from_time_dimension_grain_and_date_part( + time_granularity_name=required_spec.time_granularity.name, date_part=required_spec.date_part + ).spec, + output_spec=required_spec, + ) + for required_spec in required_time_spine_specs + ), + ) - # If the base grain of the time spine isn't selected, it will have duplicate rows that need deduping. - should_dedupe = ExpandedTimeGranularity.from_time_granularity(time_spine_source.base_granularity) not in { - spec.time_granularity for spec in queried_time_spine_specs - } + # If the base grain of the time spine isn't selected, it will have duplicate rows that need deduping. + should_dedupe = ExpandedTimeGranularity.from_time_granularity(time_spine_source.base_granularity) not in { + spec.time_granularity for spec in queried_time_spine_specs + } + + # -- JoinToCustomGranularityNode -- if needed to support another custom grain not covered by initial time spine return self._build_pre_aggregation_plan( source_node=time_spine_node, diff --git a/metricflow/dataflow/dataflow_plan_visitor.py b/metricflow/dataflow/dataflow_plan_visitor.py index 412170a53..3fcd9ec8d 100644 --- a/metricflow/dataflow/dataflow_plan_visitor.py +++ b/metricflow/dataflow/dataflow_plan_visitor.py @@ -15,6 +15,7 @@ from metricflow.dataflow.nodes.combine_aggregated_outputs import CombineAggregatedOutputsNode from metricflow.dataflow.nodes.compute_metrics import ComputeMetricsNode from metricflow.dataflow.nodes.constrain_time import ConstrainTimeRangeNode + from metricflow.dataflow.nodes.custom_granularity_bounds import CustomGranularityBoundsNode from metricflow.dataflow.nodes.filter_elements import FilterElementsNode from metricflow.dataflow.nodes.join_conversion_events import JoinConversionEventsNode from metricflow.dataflow.nodes.join_over_time import JoinOverTimeRangeNode @@ -126,6 +127,10 @@ def visit_join_to_custom_granularity_node(self, node: JoinToCustomGranularityNod def visit_alias_specs_node(self, node: AliasSpecsNode) -> VisitorOutputT: # noqa: D102 raise NotImplementedError + @abstractmethod + def visit_custom_granularity_bounds_node(self, node: CustomGranularityBoundsNode) -> VisitorOutputT: # noqa: D102 + raise NotImplementedError + class DataflowPlanNodeVisitorWithDefaultHandler(DataflowPlanNodeVisitor[VisitorOutputT], Generic[VisitorOutputT]): """Similar to `DataflowPlanNodeVisitor`, but with an abstract default handler that gets called for each node. @@ -222,3 +227,7 @@ def visit_join_to_custom_granularity_node(self, node: JoinToCustomGranularityNod @override def visit_alias_specs_node(self, node: AliasSpecsNode) -> VisitorOutputT: # noqa: D102 return self._default_handler(node) + + @override + def visit_custom_granularity_bounds_node(self, node: CustomGranularityBoundsNode) -> VisitorOutputT: # noqa: D102 + return self._default_handler(node) diff --git a/metricflow/dataflow/nodes/custom_granularity_bounds.py b/metricflow/dataflow/nodes/custom_granularity_bounds.py new file mode 100644 index 000000000..5dbde2a88 --- /dev/null +++ b/metricflow/dataflow/nodes/custom_granularity_bounds.py @@ -0,0 +1,64 @@ +from __future__ import annotations + +from abc import ABC +from dataclasses import dataclass +from typing import Sequence + +from metricflow_semantics.dag.id_prefix import IdPrefix, StaticIdPrefix +from metricflow_semantics.dag.mf_dag import DisplayedProperty +from metricflow_semantics.visitor import VisitorOutputT + +from metricflow.dataflow.dataflow_plan import DataflowPlanNode +from metricflow.dataflow.dataflow_plan_visitor import DataflowPlanNodeVisitor + + +@dataclass(frozen=True, eq=False) +class CustomGranularityBoundsNode(DataflowPlanNode, ABC): + """Calculate the start and end of a custom granularity period and each row number within that period.""" + + custom_granularity_name: str + + def __post_init__(self) -> None: # noqa: D105 + super().__post_init__() + assert len(self.parent_nodes) == 1 + + @staticmethod + def create( # noqa: D102 + parent_node: DataflowPlanNode, custom_granularity_name: str + ) -> CustomGranularityBoundsNode: + return CustomGranularityBoundsNode(parent_nodes=(parent_node,), custom_granularity_name=custom_granularity_name) + + @classmethod + def id_prefix(cls) -> IdPrefix: # noqa: D102 + return StaticIdPrefix.DATAFLOW_NODE_CUSTOM_GRANULARITY_BOUNDS_ID_PREFIX + + def accept(self, visitor: DataflowPlanNodeVisitor[VisitorOutputT]) -> VisitorOutputT: # noqa: D102 + return visitor.visit_custom_granularity_bounds_node(self) + + @property + def description(self) -> str: # noqa: D102 + return """Calculate Custom Granularity Bounds""" + + @property + def displayed_properties(self) -> Sequence[DisplayedProperty]: # noqa: D102 + return tuple(super().displayed_properties) + ( + DisplayedProperty("custom_granularity_name", self.custom_granularity_name), + ) + + @property + def parent_node(self) -> DataflowPlanNode: # noqa: D102 + return self.parent_nodes[0] + + def functionally_identical(self, other_node: DataflowPlanNode) -> bool: # noqa: D102 + return ( + isinstance(other_node, self.__class__) + and other_node.custom_granularity_name == self.custom_granularity_name + ) + + def with_new_parents( # noqa: D102 + self, new_parent_nodes: Sequence[DataflowPlanNode] + ) -> CustomGranularityBoundsNode: + assert len(new_parent_nodes) == 1 + return CustomGranularityBoundsNode.create( + parent_node=new_parent_nodes[0], custom_granularity_name=self.custom_granularity_name + ) diff --git a/metricflow/dataflow/nodes/offset_by_custom_granularity.py b/metricflow/dataflow/nodes/offset_by_custom_granularity.py new file mode 100644 index 000000000..b36018dbf --- /dev/null +++ b/metricflow/dataflow/nodes/offset_by_custom_granularity.py @@ -0,0 +1,68 @@ +from __future__ import annotations + +from abc import ABC +from dataclasses import dataclass +from typing import Sequence + +from metricflow_semantics.dag.id_prefix import IdPrefix, StaticIdPrefix +from metricflow_semantics.dag.mf_dag import DisplayedProperty +from metricflow_semantics.visitor import VisitorOutputT + +from metricflow.dataflow.dataflow_plan import DataflowPlanNode +from metricflow.dataflow.dataflow_plan_visitor import DataflowPlanNodeVisitor +from metricflow.dataflow.nodes.custom_granularity_bounds import CustomGranularityBoundsNode +from dbt_semantic_interfaces.protocols.metric import MetricTimeWindow + +stuff + + +@dataclass(frozen=True, eq=False) +class OffsetByCustomGranularityNode(DataflowPlanNode, ABC): + """For a given custom grain, offset its base grain by the requested number of custom grain periods. + + Only accepts CustomGranularityBoundsNode as parent node. + """ + + offset_window: MetricTimeWindow + + def __post_init__(self) -> None: # noqa: D105 + super().__post_init__() + assert len(self.parent_nodes) == 1 + if not isinstance(self.parent_node, CustomGranularityBoundsNode): + raise RuntimeError( + "OffsetByCustomGranularityNode only accepts CustomGranularityBoundsNode as a parent node." + ) + + @staticmethod + def create( # noqa: D102 + parent_node: CustomGranularityBoundsNode, offset_window: MetricTimeWindow + ) -> OffsetByCustomGranularityNode: + return OffsetByCustomGranularityNode(parent_nodes=(parent_node,), offset_window=offset_window) + + @classmethod + def id_prefix(cls) -> IdPrefix: # noqa: D102 + return StaticIdPrefix.DATAFLOW_NODE_OFFSET_BY_CUSTOMG_GRANULARITY_ID_PREFIX + + def accept(self, visitor: DataflowPlanNodeVisitor[VisitorOutputT]) -> VisitorOutputT: # noqa: D102 + return visitor.visit_offset_by_custom_granularity_node(self) + + @property + def description(self) -> str: # noqa: D102 + return """Offset Base Granularity By Custom Granularity Period(s)""" + + @property + def displayed_properties(self) -> Sequence[DisplayedProperty]: # noqa: D102 + return tuple(super().displayed_properties) + (DisplayedProperty("offset_window", self.offset_window),) + + @property + def parent_node(self) -> DataflowPlanNode: # noqa: D102 + return self.parent_nodes[0] + + def functionally_identical(self, other_node: DataflowPlanNode) -> bool: # noqa: D102 + return isinstance(other_node, self.__class__) and other_node.offset_window == self.offset_window + + def with_new_parents( # noqa: D102 + self, new_parent_nodes: Sequence[DataflowPlanNode] + ) -> OffsetByCustomGranularityNode: + assert len(new_parent_nodes) == 1 + return OffsetByCustomGranularityNode.create(parent_node=new_parent_nodes[0], offset_window=self.offset_window) diff --git a/metricflow/dataflow/optimizer/predicate_pushdown_optimizer.py b/metricflow/dataflow/optimizer/predicate_pushdown_optimizer.py index 223964af4..0c21ff612 100644 --- a/metricflow/dataflow/optimizer/predicate_pushdown_optimizer.py +++ b/metricflow/dataflow/optimizer/predicate_pushdown_optimizer.py @@ -23,6 +23,7 @@ from metricflow.dataflow.nodes.combine_aggregated_outputs import CombineAggregatedOutputsNode from metricflow.dataflow.nodes.compute_metrics import ComputeMetricsNode from metricflow.dataflow.nodes.constrain_time import ConstrainTimeRangeNode +from metricflow.dataflow.nodes.custom_granularity_bounds import CustomGranularityBoundsNode from metricflow.dataflow.nodes.filter_elements import FilterElementsNode from metricflow.dataflow.nodes.join_conversion_events import JoinConversionEventsNode from metricflow.dataflow.nodes.join_over_time import JoinOverTimeRangeNode @@ -472,6 +473,11 @@ def visit_join_to_custom_granularity_node( # noqa: D102 def visit_alias_specs_node(self, node: AliasSpecsNode) -> OptimizeBranchResult: # noqa: D102 raise NotImplementedError + def visit_custom_granularity_bounds_node( # noqa: D102 + self, node: CustomGranularityBoundsNode + ) -> OptimizeBranchResult: + raise NotImplementedError + def visit_join_on_entities_node(self, node: JoinOnEntitiesNode) -> OptimizeBranchResult: """Handles pushdown state propagation for the standard join node type. diff --git a/metricflow/dataflow/optimizer/source_scan/cm_branch_combiner.py b/metricflow/dataflow/optimizer/source_scan/cm_branch_combiner.py index 233629e7a..e2db90d08 100644 --- a/metricflow/dataflow/optimizer/source_scan/cm_branch_combiner.py +++ b/metricflow/dataflow/optimizer/source_scan/cm_branch_combiner.py @@ -17,6 +17,7 @@ from metricflow.dataflow.nodes.combine_aggregated_outputs import CombineAggregatedOutputsNode from metricflow.dataflow.nodes.compute_metrics import ComputeMetricsNode from metricflow.dataflow.nodes.constrain_time import ConstrainTimeRangeNode +from metricflow.dataflow.nodes.custom_granularity_bounds import CustomGranularityBoundsNode from metricflow.dataflow.nodes.filter_elements import FilterElementsNode from metricflow.dataflow.nodes.join_conversion_events import JoinConversionEventsNode from metricflow.dataflow.nodes.join_over_time import JoinOverTimeRangeNode @@ -472,3 +473,9 @@ def visit_min_max_node(self, node: MinMaxNode) -> ComputeMetricsBranchCombinerRe def visit_alias_specs_node(self, node: AliasSpecsNode) -> ComputeMetricsBranchCombinerResult: # noqa: D102 self._log_visit_node_type(node) return self._default_handler(node) + + def visit_custom_granularity_bounds_node( # noqa: D102 + self, node: CustomGranularityBoundsNode + ) -> ComputeMetricsBranchCombinerResult: + self._log_visit_node_type(node) + return self._default_handler(node) diff --git a/metricflow/dataflow/optimizer/source_scan/source_scan_optimizer.py b/metricflow/dataflow/optimizer/source_scan/source_scan_optimizer.py index c84035335..b2fa4b5f7 100644 --- a/metricflow/dataflow/optimizer/source_scan/source_scan_optimizer.py +++ b/metricflow/dataflow/optimizer/source_scan/source_scan_optimizer.py @@ -19,6 +19,7 @@ from metricflow.dataflow.nodes.combine_aggregated_outputs import CombineAggregatedOutputsNode from metricflow.dataflow.nodes.compute_metrics import ComputeMetricsNode from metricflow.dataflow.nodes.constrain_time import ConstrainTimeRangeNode +from metricflow.dataflow.nodes.custom_granularity_bounds import CustomGranularityBoundsNode from metricflow.dataflow.nodes.filter_elements import FilterElementsNode from metricflow.dataflow.nodes.join_conversion_events import JoinConversionEventsNode from metricflow.dataflow.nodes.join_over_time import JoinOverTimeRangeNode @@ -356,3 +357,9 @@ def visit_min_max_node(self, node: MinMaxNode) -> OptimizeBranchResult: # noqa: def visit_alias_specs_node(self, node: AliasSpecsNode) -> OptimizeBranchResult: # noqa: D102 self._log_visit_node_type(node) return self._default_base_output_handler(node) + + def visit_custom_granularity_bounds_node( # noqa: D102 + self, node: CustomGranularityBoundsNode + ) -> OptimizeBranchResult: + self._log_visit_node_type(node) + return self._default_base_output_handler(node) diff --git a/metricflow/dataset/sql_dataset.py b/metricflow/dataset/sql_dataset.py index afa559387..7e6677edb 100644 --- a/metricflow/dataset/sql_dataset.py +++ b/metricflow/dataset/sql_dataset.py @@ -4,6 +4,7 @@ from typing import List, Optional, Sequence, Tuple from dbt_semantic_interfaces.references import SemanticModelReference +from dbt_semantic_interfaces.type_enums import DatePart from metricflow_semantics.assert_one_arg import assert_exactly_one_arg_set from metricflow_semantics.instances import EntityInstance, InstanceSet, MdoInstance, TimeDimensionInstance from metricflow_semantics.mf_logging.lazy_formattable import LazyFormat @@ -165,18 +166,18 @@ def instance_for_spec(self, spec: InstanceSpec) -> MdoInstance: ) def instance_from_time_dimension_grain_and_date_part( - self, time_dimension_spec: TimeDimensionSpec + self, time_granularity_name: str, date_part: Optional[DatePart] ) -> TimeDimensionInstance: - """Find instance in dataset that matches the grain and date part of the given time dimension spec.""" + """Find instance in dataset that matches the given grain and date part.""" for time_dimension_instance in self.instance_set.time_dimension_instances: if ( - time_dimension_instance.spec.time_granularity == time_dimension_spec.time_granularity - and time_dimension_instance.spec.date_part == time_dimension_spec.date_part + time_dimension_instance.spec.time_granularity.name == time_granularity_name + and time_dimension_instance.spec.date_part == date_part ): return time_dimension_instance raise RuntimeError( - f"Did not find a time dimension instance with matching grain and date part for spec: {time_dimension_spec}\n" + f"Did not find a time dimension instance with grain '{time_granularity_name}' and date part {date_part}\n" f"Instances available: {self.instance_set.time_dimension_instances}" ) diff --git a/metricflow/execution/dataflow_to_execution.py b/metricflow/execution/dataflow_to_execution.py index b5369f735..c192234ba 100644 --- a/metricflow/execution/dataflow_to_execution.py +++ b/metricflow/execution/dataflow_to_execution.py @@ -16,6 +16,7 @@ from metricflow.dataflow.nodes.combine_aggregated_outputs import CombineAggregatedOutputsNode from metricflow.dataflow.nodes.compute_metrics import ComputeMetricsNode from metricflow.dataflow.nodes.constrain_time import ConstrainTimeRangeNode +from metricflow.dataflow.nodes.custom_granularity_bounds import CustomGranularityBoundsNode from metricflow.dataflow.nodes.filter_elements import FilterElementsNode from metricflow.dataflow.nodes.join_conversion_events import JoinConversionEventsNode from metricflow.dataflow.nodes.join_over_time import JoinOverTimeRangeNode @@ -205,3 +206,7 @@ def visit_join_to_custom_granularity_node(self, node: JoinToCustomGranularityNod @override def visit_alias_specs_node(self, node: AliasSpecsNode) -> ConvertToExecutionPlanResult: raise NotImplementedError + + @override + def visit_custom_granularity_bounds_node(self, node: CustomGranularityBoundsNode) -> ConvertToExecutionPlanResult: + raise NotImplementedError diff --git a/metricflow/plan_conversion/dataflow_to_sql.py b/metricflow/plan_conversion/dataflow_to_sql.py index 7e40df875..ca711f880 100644 --- a/metricflow/plan_conversion/dataflow_to_sql.py +++ b/metricflow/plan_conversion/dataflow_to_sql.py @@ -6,6 +6,7 @@ from typing import Callable, Dict, FrozenSet, List, Optional, Sequence, Set, Tuple, TypeVar from dbt_semantic_interfaces.enum_extension import assert_values_exhausted +from dbt_semantic_interfaces.naming.keywords import DUNDER from dbt_semantic_interfaces.protocols.metric import MetricInputMeasure, MetricType from dbt_semantic_interfaces.references import MetricModelReference, SemanticModelElementReference from dbt_semantic_interfaces.type_enums.aggregation_type import AggregationType @@ -38,8 +39,10 @@ from metricflow_semantics.specs.spec_set import InstanceSpecSet from metricflow_semantics.specs.where_filter.where_filter_spec import WhereFilterSpec from metricflow_semantics.sql.sql_exprs import ( + SqlAddTimeExpression, SqlAggregateFunctionExpression, SqlBetweenExpression, + SqlCaseExpression, SqlColumnReference, SqlColumnReferenceExpression, SqlComparison, @@ -77,6 +80,7 @@ from metricflow.dataflow.nodes.combine_aggregated_outputs import CombineAggregatedOutputsNode from metricflow.dataflow.nodes.compute_metrics import ComputeMetricsNode from metricflow.dataflow.nodes.constrain_time import ConstrainTimeRangeNode +from metricflow.dataflow.nodes.custom_granularity_bounds import CustomGranularityBoundsNode from metricflow.dataflow.nodes.filter_elements import FilterElementsNode from metricflow.dataflow.nodes.join_conversion_events import JoinConversionEventsNode from metricflow.dataflow.nodes.join_over_time import JoinOverTimeRangeNode @@ -85,6 +89,7 @@ from metricflow.dataflow.nodes.join_to_time_spine import JoinToTimeSpineNode from metricflow.dataflow.nodes.metric_time_transform import MetricTimeDimensionTransformNode from metricflow.dataflow.nodes.min_max import MinMaxNode +from metricflow.dataflow.nodes.offset_by_custom_granularity import OffsetByCustomGranularityNode from metricflow.dataflow.nodes.order_by_limit import OrderByLimitNode from metricflow.dataflow.nodes.read_sql_source import ReadSqlSourceNode from metricflow.dataflow.nodes.semi_additive_join import SemiAdditiveJoinNode @@ -1827,7 +1832,7 @@ def visit_join_conversion_events_node(self, node: JoinConversionEventsNode) -> S def visit_window_reaggregation_node(self, node: WindowReaggregationNode) -> SqlDataSet: # noqa: D102 from_data_set = node.parent_node.accept(self) - parent_instance_set = from_data_set.instance_set # remove order by col + parent_instance_set = from_data_set.instance_set parent_data_set_alias = self._next_unique_table_alias() metric_instance = None @@ -1954,6 +1959,263 @@ def strip_time_from_dt(ts: dt.datetime) -> dt.datetime: ), ) + def visit_custom_granularity_bounds_node(self, node: CustomGranularityBoundsNode) -> SqlDataSet: # noqa: D102 + parent_data_set = node.parent_node.accept(self) + parent_instance_set = parent_data_set.instance_set + parent_data_set_alias = self._next_unique_table_alias() + + custom_granularity_name = node.custom_granularity_name + time_spine = self._get_time_spine_for_custom_granularity(custom_granularity_name) + custom_grain_instance_from_parent = parent_data_set.instance_from_time_dimension_grain_and_date_part( + time_granularity_name=custom_granularity_name, date_part=None + ) + base_grain_instance_from_parent = parent_data_set.instance_from_time_dimension_grain_and_date_part( + time_granularity_name=time_spine.base_granularity.value, date_part=None + ) + custom_column_expr = SqlColumnReferenceExpression.from_table_and_column_names( + table_alias=parent_data_set_alias, + column_name=custom_grain_instance_from_parent.associated_column.column_name, + ) + base_column_expr = SqlColumnReferenceExpression.from_table_and_column_names( + table_alias=parent_data_set_alias, column_name=base_grain_instance_from_parent.associated_column.column_name + ) + + new_instances: Tuple[TimeDimensionInstance, ...] = tuple() + new_select_columns: Tuple[SqlSelectColumn, ...] = tuple() + + # Build columns that get start and end of the custom grain period. + # Ex: "FIRST_VALUE(ds) OVER (PARTITION BY martian_day ORDER BY ds) AS ds__fiscal_quarter__first_value" + for window_func in (SqlWindowFunction.FIRST_VALUE, SqlWindowFunction.LAST_VALUE): + new_instance = custom_grain_instance_from_parent.with_new_spec( + new_spec=custom_grain_instance_from_parent.spec.with_window_function(window_func), + column_association_resolver=self._column_association_resolver, + ) + select_column = SqlSelectColumn( + expr=SqlWindowFunctionExpression.create( + sql_function=window_func, + sql_function_args=(base_column_expr,), + partition_by_args=(custom_column_expr,), + order_by_args=(SqlWindowOrderByArgument(base_column_expr),), + ), + column_alias=new_instance.associated_column.column_name, + ) + new_instances += (new_instance,) + new_select_columns += (select_column,) + + # Build a column that tracks the row number for the base grain column within the custom grain period. + # This will be offset by 1 to represent the number of base grain periods since the start of the custom grain period. + # Ex: "ROW_NUMBER() OVER (PARTITION BY martian_day ORDER BY ds) AS ds__day__row_number" + new_instance = base_grain_instance_from_parent.with_new_spec( + new_spec=base_grain_instance_from_parent.spec.with_window_function(window_func), + column_association_resolver=self._column_association_resolver, + ) + window_func_expr = SqlWindowFunctionExpression.create( + sql_function=SqlWindowFunction.ROW_NUMBER, + partition_by_args=(custom_column_expr,), + order_by_args=(SqlWindowOrderByArgument(base_column_expr),), + ) + new_select_column = SqlSelectColumn( + expr=window_func_expr, + column_alias=new_instance.associated_column.column_name, + ) + new_instances += (new_instance,) + new_select_columns += (new_select_column,) + + return SqlDataSet( + instance_set=InstanceSet.merge([InstanceSet(time_dimension_instances=new_instances), parent_instance_set]), + sql_select_node=SqlSelectStatementNode.create( + description=node.description, + select_columns=parent_data_set.checked_sql_select_node.select_columns + new_select_columns, + from_source=parent_data_set.checked_sql_select_node, + from_source_alias=parent_data_set_alias, + ), + ) + + def visit_offset_by_custom_granularity_node(self, node: OffsetByCustomGranularityNode) -> SqlDataSet: + """For a given custom grain, offset its base grain by the requested number of custom grain periods. + + This node requires a CustomGranularityBoundsNode as a parent, which will have calculated rows for the first and last + values of each custom grain period, as well as the row number for the base grain within each custom grain period. This + will be used as a subquery multiple times in this node, so optimizers should turn it into a CTE. + + Example: if the custom grain is `fiscal_quarter` with a base grain of DAY and we're offsetting by 1 period, the + output SQL should look something like this: + + SELECT + fiscal_quarter, + CASE + WHEN DATEADD(day, ds__day__row_number - 1, ds__fiscal_quarter__first_value__offset) <= ds__fiscal_quarter__last_value__offset + THEN DATEADD(day, ds__day__row_number - 1, ds__fiscal_quarter__first_value__offset) + ELSE ds__fiscal_quarter__last_value__offset + END AS date_day__offset + FROM custom_granularity_bounds_node + INNER JOIN ( + SELECT + fiscal_quarter, + LAG(ds__fiscal_quarter__first_value, 1) OVER (ORDER BY fiscal_quarter) AS ds__fiscal_quarter__first_value__offset, + LAG(ds__fiscal_quarter__last_value, 1) OVER (ORDER BY fiscal_quarter) AS ds__fiscal_quarter__last_value__offset + FROM ( + SELECT + fiscal_quarter, + ds__fiscal_quarter__first_value, + ds__fiscal_quarter__last_value + FROM custom_granularity_bounds_node + GROUP BY + fiscal_quarter, + ds__fiscal_quarter__first_value, + ds__fiscal_quarter__last_value + ) subq_1 + ) subq_2 ON subq_2.fiscal_quarter = custom_granularity_bounds_node.fiscal_quarter + """ + parent_data_set = node.parent_node.accept(self) + parent_instance_set = parent_data_set.instance_set + parent_data_set_alias = self._next_unique_table_alias() + offset_window = node.offset_window + custom_grain_name = offset_window.granularity + base_grain = ExpandedTimeGranularity.from_time_granularity( + self._get_time_spine_for_custom_granularity(custom_grain_name).base_granularity + ) + + # Find the required instances in the parent data set. + first_value_instance: Optional[TimeDimensionInstance] = None + last_value_instance: Optional[TimeDimensionInstance] = None + row_number_instance: Optional[TimeDimensionInstance] = None + custom_grain_instance: Optional[TimeDimensionInstance] = None + base_grain_instance: Optional[TimeDimensionInstance] = None + for instance in parent_instance_set.time_dimension_instances: + if instance.spec.window_function is SqlWindowFunction.FIRST_VALUE: + first_value_instance = instance + elif instance.spec.window_function is SqlWindowFunction.LAST_VALUE: + last_value_instance = instance + elif instance.spec.window_function is SqlWindowFunction.ROW_NUMBER: + row_number_instance = instance + elif instance.spec.time_granularity.name == custom_grain_name: + custom_grain_instance = instance + elif instance.spec.time_granularity == base_grain: + base_grain_instance = instance + if ( + custom_grain_instance + and base_grain_instance + and first_value_instance + and last_value_instance + and row_number_instance + ): + break + assert ( + custom_grain_instance + and base_grain_instance + and first_value_instance + and last_value_instance + and row_number_instance + ), ( + "Did not find all required time dimension instances in parent data set for OffsetByCustomGranularityNode. " + f"This indicates internal misconfiguration. Got custom grain instance: {custom_grain_instance}; base grain " + f"instance: {base_grain_instance}; first value instance: {first_value_instance}; last value instance: " + f"{last_value_instance}; row number instance: {row_number_instance}\n" + f"Available instances:{parent_instance_set.time_dimension_instances}." + ) + + # First, build a subquery that gets unique rows for the custom grain, first value, and last value columns. + unique_columns = tuple( + SqlSelectColumn.from_table_and_column_names( + table_alias=parent_data_set_alias, column_name=instance.associated_column.column_name + ) + for instance in (custom_grain_instance, first_value_instance, last_value_instance) + ) + unique_rows_subquery = SqlSelectStatementNode.create( + description="Get Distinct Custom Grain Bounds Values", + select_columns=unique_columns, + from_source=parent_data_set.checked_sql_select_node, + from_source_alias=parent_data_set_alias, + group_bys=unique_columns, + ) + unique_rows_subquery_alias = self._next_unique_table_alias() + + # Next, build a subquery that offsets the first and last value columns. + custom_grain_column_name = custom_grain_instance.associated_column.column_name + custom_grain_column = SqlSelectColumn.from_table_and_column_names( + column_name=custom_grain_column_name, table_alias=unique_rows_subquery_alias + ) + first_value_offset_column, last_value_offset_column = tuple( + SqlSelectColumn( + expr=SqlWindowFunctionExpression.create( + sql_function=SqlWindowFunction.LAG, + sql_function_args=( + SqlColumnReferenceExpression.from_table_and_column_names( + column_name=instance.associated_column.column_name, table_alias=unique_rows_subquery_alias + ), + SqlStringExpression.create(str(node.offset_window.count)), + ), + order_by_args=(SqlWindowOrderByArgument(custom_grain_column.expr),), + ), + column_alias=f"{instance.associated_column.column_name}{DUNDER}offset", + ) + for instance in (first_value_instance, last_value_instance) + ) + offset_bounds_subquery_alias = self._next_unique_table_alias() + offset_bounds_subquery = SqlSelectStatementNode.create( + description="Offset Custom Granularity Bounds", + select_columns=(custom_grain_column, first_value_offset_column, last_value_offset_column), + from_source=unique_rows_subquery, + from_source_alias=unique_rows_subquery_alias, + ) + offset_bounds_subquery_alias = self._next_unique_table_alias() + + # Offset the base column by the requested window. If the offset date is not within the offset custom grain period, + # default to the last value in that period. + new_custom_grain_column = SqlSelectColumn.from_table_and_column_names( + column_name=custom_grain_column_name, table_alias=parent_data_set_alias + ) + first_value_offset_expr, last_value_offset_expr = [ + SqlColumnReferenceExpression.from_table_and_column_names( + column_name=offset_column.column_alias, table_alias=offset_bounds_subquery_alias + ) + for offset_column in (first_value_offset_column, last_value_offset_column) + ] + add_row_number_to_first_value_expr = SqlAddTimeExpression.create( + arg=first_value_offset_expr, + count_expr=SqlColumnReferenceExpression.from_table_and_column_names( + table_alias=parent_data_set_alias, column_name=row_number_instance.associated_column.column_name + ), + granularity=base_grain.base_granularity, + ) + is_below_last_value_expr = SqlComparisonExpression.create( + left_expr=add_row_number_to_first_value_expr, + comparison=SqlComparison.LESS_THAN_OR_EQUALS, + right_expr=add_row_number_to_first_value_expr, + ) + offset_base_column = SqlSelectColumn( + expr=SqlCaseExpression.create( + when_to_then_exprs={is_below_last_value_expr: add_row_number_to_first_value_expr}, + else_expr=last_value_offset_expr, + ), + column_alias=base_grain_instance.associated_column.column_name, + ) + join_desc = SqlJoinDescription( + right_source=offset_bounds_subquery, + right_source_alias=offset_bounds_subquery_alias, + join_type=SqlJoinType.INNER, + on_condition=SqlComparisonExpression.create( + left_expr=SqlColumnReferenceExpression.from_table_and_column_names( + table_alias=parent_data_set_alias, column_name=custom_grain_column_name + ), + comparison=SqlComparison.EQUALS, + right_expr=SqlColumnReferenceExpression.from_table_and_column_names( + table_alias=offset_bounds_subquery_alias, column_name=custom_grain_column_name + ), + ), + ) + return SqlDataSet( + instance_set=InstanceSet(time_dimension_instances=(custom_grain_instance, base_grain_instance)), + sql_select_node=SqlSelectStatementNode.create( + description=node.description, + select_columns=(new_custom_grain_column, offset_base_column), + from_source=parent_data_set.checked_sql_select_node, + from_source_alias=parent_data_set_alias, + join_descs=(join_desc,), + ), + ) + class DataflowNodeToSqlCteVisitor(DataflowNodeToSqlSubqueryVisitor): """Similar to `DataflowNodeToSqlSubqueryVisitor`, except that this converts specific nodes to CTEs. @@ -2149,5 +2411,11 @@ def visit_join_to_custom_granularity_node(self, node: JoinToCustomGranularityNod def visit_alias_specs_node(self, node: AliasSpecsNode) -> SqlDataSet: # noqa: D102 return self._default_handler(node=node, node_to_select_subquery_function=super().visit_alias_specs_node) + @override + def visit_custom_granularity_bounds_node(self, node: CustomGranularityBoundsNode) -> SqlDataSet: # noqa: D102 + return self._default_handler( + node=node, node_to_select_subquery_function=super().visit_custom_granularity_bounds_node + ) + DataflowNodeT = TypeVar("DataflowNodeT", bound=DataflowPlanNode) diff --git a/metricflow/plan_conversion/sql_join_builder.py b/metricflow/plan_conversion/sql_join_builder.py index f80cdf228..682599ab6 100644 --- a/metricflow/plan_conversion/sql_join_builder.py +++ b/metricflow/plan_conversion/sql_join_builder.py @@ -535,7 +535,7 @@ def make_join_to_time_spine_join_description( left_expr: SqlExpressionNode = SqlColumnReferenceExpression.create( col_ref=SqlColumnReference(table_alias=time_spine_alias, column_name=agg_time_dimension_column_name) ) - if node.offset_window: + if node.offset_window: # and not node.offset_window.granularity.is_custom_granularity: left_expr = SqlSubtractTimeIntervalExpression.create( arg=left_expr, count=node.offset_window.count, diff --git a/metricflow/sql/optimizer/rewriting_sub_query_reducer.py b/metricflow/sql/optimizer/rewriting_sub_query_reducer.py index 6d54f6ff4..63766ca7a 100644 --- a/metricflow/sql/optimizer/rewriting_sub_query_reducer.py +++ b/metricflow/sql/optimizer/rewriting_sub_query_reducer.py @@ -586,7 +586,7 @@ def _rewrite_node_with_join(self, node: SqlSelectStatementNode) -> SqlSelectStat @override def visit_cte_node(self, node: SqlCteNode) -> SqlQueryPlanNode: - raise NotImplementedError + return node def visit_select_statement_node(self, node: SqlSelectStatementNode) -> SqlQueryPlanNode: # noqa: D102 node_with_reduced_parents = self._reduce_parents(node) diff --git a/metricflow/sql/optimizer/tag_required_column_aliases.py b/metricflow/sql/optimizer/tag_required_column_aliases.py index b75f9d2d1..656fd14ac 100644 --- a/metricflow/sql/optimizer/tag_required_column_aliases.py +++ b/metricflow/sql/optimizer/tag_required_column_aliases.py @@ -131,6 +131,7 @@ def _tag_potential_cte_node(self, table_name: str, column_aliases: Set[str]) -> def visit_select_statement_node(self, node: SqlSelectStatementNode) -> None: """Based on required column aliases for this SELECT, figure out required column aliases in parents.""" initial_required_column_aliases_in_this_node = self._current_required_column_alias_mapping.get_aliases(node) + print(self._current_required_column_alias_mapping._node_to_tagged_aliases[node]) # If this SELECT statement uses DISTINCT, all columns are required as removing them would change the meaning of # the query. @@ -161,10 +162,10 @@ def visit_select_statement_node(self, node: SqlSelectStatementNode) -> None: if select_column.column_alias in updated_required_column_aliases_in_this_node ) - if len(required_select_columns_in_this_node) == 0: - raise RuntimeError( - "No columns are required in this node - this indicates a bug in this visitor or in the inputs." - ) + # if len(required_select_columns_in_this_node) == 0: + # raise RuntimeError( + # "No columns are required in this node - this indicates a bug in this visitor or in the inputs." + # ) # It's possible for `required_select_columns_in_this_node` to be empty because we traverse through the ancestors # of a CTE node whenever a CTE node is updated. See `test_multi_child_pruning`. diff --git a/metricflow/sql/render/expr_renderer.py b/metricflow/sql/render/expr_renderer.py index f7cac9efb..b0461a1aa 100644 --- a/metricflow/sql/render/expr_renderer.py +++ b/metricflow/sql/render/expr_renderer.py @@ -16,6 +16,7 @@ SqlAddTimeExpression, SqlAggregateFunctionExpression, SqlBetweenExpression, + SqlCaseExpression, SqlCastToTimestampExpression, SqlColumnAliasReferenceExpression, SqlColumnReferenceExpression, @@ -438,3 +439,18 @@ def visit_generate_uuid_expr(self, node: SqlGenerateUuidExpression) -> SqlExpres sql="UUID()", bind_parameter_set=SqlBindParameterSet(), ) + + def visit_case_expr(self, node: SqlCaseExpression) -> SqlExpressionRenderResult: # noqa: D102 + sql = "CASE\n" + for when, then in node.when_to_then_exprs.items(): + sql += indent( + f"WHEN {self.render_sql_expr(when).sql} THEN {self.render_sql_expr(then).sql}\n", + indent_prefix=SqlRenderingConstants.INDENT, + ) + if node.else_expr: + sql += indent( + f"ELSE {self.render_sql_expr(node.else_expr).sql}\n", + indent_prefix=SqlRenderingConstants.INDENT, + ) + sql += "END" + return SqlExpressionRenderResult(sql=sql, bind_parameter_set=SqlBindParameterSet()) diff --git a/metricflow/sql/sql_plan.py b/metricflow/sql/sql_plan.py index a01eb7a2f..6a75f3015 100644 --- a/metricflow/sql/sql_plan.py +++ b/metricflow/sql/sql_plan.py @@ -9,7 +9,7 @@ from metricflow_semantics.dag.id_prefix import IdPrefix, StaticIdPrefix from metricflow_semantics.dag.mf_dag import DagId, DagNode, DisplayedProperty, MetricFlowDag -from metricflow_semantics.sql.sql_exprs import SqlExpressionNode +from metricflow_semantics.sql.sql_exprs import SqlColumnReferenceExpression, SqlExpressionNode from metricflow_semantics.sql.sql_join_type import SqlJoinType from metricflow_semantics.sql.sql_table import SqlTable from metricflow_semantics.visitor import VisitorOutputT @@ -102,6 +102,16 @@ class SqlSelectColumn: # Always require a column alias for simplicity. column_alias: str + @staticmethod + def from_table_and_column_names(table_alias: str, column_name: str) -> SqlSelectColumn: + """Create a column that selects a column from a table by name.""" + return SqlSelectColumn( + expr=SqlColumnReferenceExpression.from_table_and_column_names( + column_name=column_name, table_alias=table_alias + ), + column_alias=column_name, + ) + @dataclass(frozen=True) class SqlJoinDescription: diff --git a/tests_metricflow/dataflow/optimizer/source_scan/test_source_scan_optimizer.py b/tests_metricflow/dataflow/optimizer/source_scan/test_source_scan_optimizer.py index 05770806a..a396ae104 100644 --- a/tests_metricflow/dataflow/optimizer/source_scan/test_source_scan_optimizer.py +++ b/tests_metricflow/dataflow/optimizer/source_scan/test_source_scan_optimizer.py @@ -24,6 +24,7 @@ from metricflow.dataflow.nodes.combine_aggregated_outputs import CombineAggregatedOutputsNode from metricflow.dataflow.nodes.compute_metrics import ComputeMetricsNode from metricflow.dataflow.nodes.constrain_time import ConstrainTimeRangeNode +from metricflow.dataflow.nodes.custom_granularity_bounds import CustomGranularityBoundsNode from metricflow.dataflow.nodes.filter_elements import FilterElementsNode from metricflow.dataflow.nodes.join_conversion_events import JoinConversionEventsNode from metricflow.dataflow.nodes.join_over_time import JoinOverTimeRangeNode @@ -114,6 +115,9 @@ def visit_join_to_custom_granularity_node(self, node: JoinToCustomGranularityNod def visit_alias_specs_node(self, node: AliasSpecsNode) -> int: # noqa: D102 return self._sum_parents(node) + def visit_custom_granularity_bounds_node(self, node: CustomGranularityBoundsNode) -> int: # noqa: D102 + return self._sum_parents(node) + def count_source_nodes(self, dataflow_plan: DataflowPlan) -> int: # noqa: D102 return dataflow_plan.sink_node.accept(self) diff --git a/tests_metricflow/integration/test_cases/itest_granularity.yaml b/tests_metricflow/integration/test_cases/itest_granularity.yaml index b83cbeb03..e32a6cf99 100644 --- a/tests_metricflow/integration/test_cases/itest_granularity.yaml +++ b/tests_metricflow/integration/test_cases/itest_granularity.yaml @@ -961,3 +961,12 @@ integration_test: GROUP BY subq_2.martian_day ) subq_5 ON subq_6.metric_time__martian_day = subq_5.metric_time__martian_day +--- +integration_test: + name: custom_offset_window + description: Test querying a metric with a custom offset window + model: SIMPLE_MODEL + metrics: ["bookings_offset_one_martian_day"] + group_bys: ["metric_time__day"] # TODO: add other standard grains + custom grain + check_query: | + SELECT 1 diff --git a/tests_metricflow/integration/test_configured_cases.py b/tests_metricflow/integration/test_configured_cases.py index 027834d05..791c0b1f9 100644 --- a/tests_metricflow/integration/test_configured_cases.py +++ b/tests_metricflow/integration/test_configured_cases.py @@ -313,6 +313,7 @@ def test_case( ) actual = query_result.result_df + assert 0, query_result.sql expected = sql_client.query( jinja2.Template( diff --git a/tests_metricflow/query_rendering/test_custom_granularity.py b/tests_metricflow/query_rendering/test_custom_granularity.py index 4043c7b97..a87cbc620 100644 --- a/tests_metricflow/query_rendering/test_custom_granularity.py +++ b/tests_metricflow/query_rendering/test_custom_granularity.py @@ -610,3 +610,27 @@ def test_join_to_timespine_metric_with_custom_granularity_filter_not_in_group_by dataflow_plan_builder=dataflow_plan_builder, query_spec=query_spec, ) + + +@pytest.mark.sql_engine_snapshot +def test_custom_offset_window( # noqa: D103 + request: FixtureRequest, + mf_test_configuration: MetricFlowTestConfiguration, + dataflow_plan_builder: DataflowPlanBuilder, + dataflow_to_sql_converter: DataflowToSqlQueryPlanConverter, + sql_client: SqlClient, + query_parser: MetricFlowQueryParser, +) -> None: + query_spec = query_parser.parse_and_validate_query( + metric_names=("bookings_offset_one_martian_day",), + group_by_names=("metric_time__day",), + ).query_spec + + render_and_check( + request=request, + mf_test_configuration=mf_test_configuration, + dataflow_to_sql_converter=dataflow_to_sql_converter, + sql_client=sql_client, + dataflow_plan_builder=dataflow_plan_builder, + query_spec=query_spec, + ) diff --git a/tests_metricflow/snapshots/test_custom_granularity.py/SqlQueryPlan/DuckDB/test_custom_offset_window__plan0.sql b/tests_metricflow/snapshots/test_custom_granularity.py/SqlQueryPlan/DuckDB/test_custom_offset_window__plan0.sql new file mode 100644 index 000000000..d5637e260 --- /dev/null +++ b/tests_metricflow/snapshots/test_custom_granularity.py/SqlQueryPlan/DuckDB/test_custom_offset_window__plan0.sql @@ -0,0 +1,469 @@ +test_name: test_custom_offset_window +test_filename: test_custom_granularity.py +sql_engine: DuckDB +--- +-- Compute Metrics via Expressions +SELECT + subq_12.metric_time__day + , bookings AS bookings_offset_one_martian_day +FROM ( + -- Compute Metrics via Expressions + SELECT + subq_11.metric_time__day + , subq_11.bookings + FROM ( + -- Aggregate Measures + SELECT + subq_10.metric_time__day + , SUM(subq_10.bookings) AS bookings + FROM ( + -- Pass Only Elements: ['bookings', 'metric_time__day'] + SELECT + subq_9.metric_time__day + , subq_9.bookings + FROM ( + -- Join to Time Spine Dataset + SELECT + subq_1.ds__day AS ds__day + , subq_1.ds__week AS ds__week + , subq_1.ds__month AS ds__month + , subq_1.ds__quarter AS ds__quarter + , subq_1.ds__year AS ds__year + , subq_1.ds__extract_year AS ds__extract_year + , subq_1.ds__extract_quarter AS ds__extract_quarter + , subq_1.ds__extract_month AS ds__extract_month + , subq_1.ds__extract_day AS ds__extract_day + , subq_1.ds__extract_dow AS ds__extract_dow + , subq_1.ds__extract_doy AS ds__extract_doy + , subq_1.ds_partitioned__day AS ds_partitioned__day + , subq_1.ds_partitioned__week AS ds_partitioned__week + , subq_1.ds_partitioned__month AS ds_partitioned__month + , subq_1.ds_partitioned__quarter AS ds_partitioned__quarter + , subq_1.ds_partitioned__year AS ds_partitioned__year + , subq_1.ds_partitioned__extract_year AS ds_partitioned__extract_year + , subq_1.ds_partitioned__extract_quarter AS ds_partitioned__extract_quarter + , subq_1.ds_partitioned__extract_month AS ds_partitioned__extract_month + , subq_1.ds_partitioned__extract_day AS ds_partitioned__extract_day + , subq_1.ds_partitioned__extract_dow AS ds_partitioned__extract_dow + , subq_1.ds_partitioned__extract_doy AS ds_partitioned__extract_doy + , subq_1.paid_at__day AS paid_at__day + , subq_1.paid_at__week AS paid_at__week + , subq_1.paid_at__month AS paid_at__month + , subq_1.paid_at__quarter AS paid_at__quarter + , subq_1.paid_at__year AS paid_at__year + , subq_1.paid_at__extract_year AS paid_at__extract_year + , subq_1.paid_at__extract_quarter AS paid_at__extract_quarter + , subq_1.paid_at__extract_month AS paid_at__extract_month + , subq_1.paid_at__extract_day AS paid_at__extract_day + , subq_1.paid_at__extract_dow AS paid_at__extract_dow + , subq_1.paid_at__extract_doy AS paid_at__extract_doy + , subq_1.booking__ds__day AS booking__ds__day + , subq_1.booking__ds__week AS booking__ds__week + , subq_1.booking__ds__month AS booking__ds__month + , subq_1.booking__ds__quarter AS booking__ds__quarter + , subq_1.booking__ds__year AS booking__ds__year + , subq_1.booking__ds__extract_year AS booking__ds__extract_year + , subq_1.booking__ds__extract_quarter AS booking__ds__extract_quarter + , subq_1.booking__ds__extract_month AS booking__ds__extract_month + , subq_1.booking__ds__extract_day AS booking__ds__extract_day + , subq_1.booking__ds__extract_dow AS booking__ds__extract_dow + , subq_1.booking__ds__extract_doy AS booking__ds__extract_doy + , subq_1.booking__ds_partitioned__day AS booking__ds_partitioned__day + , subq_1.booking__ds_partitioned__week AS booking__ds_partitioned__week + , subq_1.booking__ds_partitioned__month AS booking__ds_partitioned__month + , subq_1.booking__ds_partitioned__quarter AS booking__ds_partitioned__quarter + , subq_1.booking__ds_partitioned__year AS booking__ds_partitioned__year + , subq_1.booking__ds_partitioned__extract_year AS booking__ds_partitioned__extract_year + , subq_1.booking__ds_partitioned__extract_quarter AS booking__ds_partitioned__extract_quarter + , subq_1.booking__ds_partitioned__extract_month AS booking__ds_partitioned__extract_month + , subq_1.booking__ds_partitioned__extract_day AS booking__ds_partitioned__extract_day + , subq_1.booking__ds_partitioned__extract_dow AS booking__ds_partitioned__extract_dow + , subq_1.booking__ds_partitioned__extract_doy AS booking__ds_partitioned__extract_doy + , subq_1.booking__paid_at__day AS booking__paid_at__day + , subq_1.booking__paid_at__week AS booking__paid_at__week + , subq_1.booking__paid_at__month AS booking__paid_at__month + , subq_1.booking__paid_at__quarter AS booking__paid_at__quarter + , subq_1.booking__paid_at__year AS booking__paid_at__year + , subq_1.booking__paid_at__extract_year AS booking__paid_at__extract_year + , subq_1.booking__paid_at__extract_quarter AS booking__paid_at__extract_quarter + , subq_1.booking__paid_at__extract_month AS booking__paid_at__extract_month + , subq_1.booking__paid_at__extract_day AS booking__paid_at__extract_day + , subq_1.booking__paid_at__extract_dow AS booking__paid_at__extract_dow + , subq_1.booking__paid_at__extract_doy AS booking__paid_at__extract_doy + , subq_1.metric_time__week AS metric_time__week + , subq_1.metric_time__month AS metric_time__month + , subq_1.metric_time__quarter AS metric_time__quarter + , subq_1.metric_time__year AS metric_time__year + , subq_1.metric_time__extract_year AS metric_time__extract_year + , subq_1.metric_time__extract_quarter AS metric_time__extract_quarter + , subq_1.metric_time__extract_month AS metric_time__extract_month + , subq_1.metric_time__extract_day AS metric_time__extract_day + , subq_1.metric_time__extract_dow AS metric_time__extract_dow + , subq_1.metric_time__extract_doy AS metric_time__extract_doy + , subq_8.metric_time__day AS metric_time__day + , subq_1.listing AS listing + , subq_1.guest AS guest + , subq_1.host AS host + , subq_1.booking__listing AS booking__listing + , subq_1.booking__guest AS booking__guest + , subq_1.booking__host AS booking__host + , subq_1.is_instant AS is_instant + , subq_1.booking__is_instant AS booking__is_instant + , subq_1.bookings AS bookings + , subq_1.instant_bookings AS instant_bookings + , subq_1.booking_value AS booking_value + , subq_1.max_booking_value AS max_booking_value + , subq_1.min_booking_value AS min_booking_value + , subq_1.bookers AS bookers + , subq_1.average_booking_value AS average_booking_value + , subq_1.referred_bookings AS referred_bookings + , subq_1.median_booking_value AS median_booking_value + , subq_1.booking_value_p99 AS booking_value_p99 + , subq_1.discrete_booking_value_p99 AS discrete_booking_value_p99 + , subq_1.approximate_continuous_booking_value_p99 AS approximate_continuous_booking_value_p99 + , subq_1.approximate_discrete_booking_value_p99 AS approximate_discrete_booking_value_p99 + FROM ( + -- Pass Only Elements: ['metric_time__day', 'metric_time__day'] + SELECT + subq_7.metric_time__day + FROM ( + -- Calculate Custom Granularity Bounds + SELECT + subq_6.metric_time__day + , DATE_TRUNC('day', subq_6.metric_time__day) AS metric_time__day + FROM ( + -- Calculate Custom Granularity Bounds + SELECT + subq_4.metric_time__martian_day AS metric_time__martian_day + , CASE + WHEN subq_5.ds__martian_day__first_value__offset + INTERVAL subq_3.ds__martian_day__row_number day <= subq_5.ds__martian_day__first_value__offset + INTERVAL subq_3.ds__martian_day__row_number day THEN subq_5.ds__martian_day__first_value__offset + INTERVAL subq_3.ds__martian_day__row_number day + ELSE subq_5.ds__martian_day__last_value__offset + END AS metric_time__day + FROM ( + -- Calculate Custom Granularity Bounds + SELECT + time_spine_src_28006.ds AS ds__day + , DATE_TRUNC('week', time_spine_src_28006.ds) AS ds__week + , DATE_TRUNC('month', time_spine_src_28006.ds) AS ds__month + , DATE_TRUNC('quarter', time_spine_src_28006.ds) AS ds__quarter + , DATE_TRUNC('year', time_spine_src_28006.ds) AS ds__year + , EXTRACT(year FROM time_spine_src_28006.ds) AS ds__extract_year + , EXTRACT(quarter FROM time_spine_src_28006.ds) AS ds__extract_quarter + , EXTRACT(month FROM time_spine_src_28006.ds) AS ds__extract_month + , EXTRACT(day FROM time_spine_src_28006.ds) AS ds__extract_day + , EXTRACT(isodow FROM time_spine_src_28006.ds) AS ds__extract_dow + , EXTRACT(doy FROM time_spine_src_28006.ds) AS ds__extract_doy + , time_spine_src_28006.martian_day AS ds__martian_day + , subq_2.ds__martian_day AS metric_time__martian_day + , FIRST_VALUE(subq_2.ds) OVER ( + PARTITION BY subq_2.ds__martian_day + ORDER BY subq_2.ds + ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING + ) AS ds__martian_day__first_value + , LAST_VALUE(subq_2.ds) OVER ( + PARTITION BY subq_2.ds__martian_day + ORDER BY subq_2.ds + ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING + ) AS ds__martian_day__last_value + , ROW_NUMBER() OVER ( + PARTITION BY subq_2.ds__martian_day + ORDER BY subq_2.ds + ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING + ) AS ds__martian_day__row_number + FROM ( + -- Read From Time Spine 'mf_time_spine' + SELECT + time_spine_src_28006.ds AS ds__day + , DATE_TRUNC('week', time_spine_src_28006.ds) AS ds__week + , DATE_TRUNC('month', time_spine_src_28006.ds) AS ds__month + , DATE_TRUNC('quarter', time_spine_src_28006.ds) AS ds__quarter + , DATE_TRUNC('year', time_spine_src_28006.ds) AS ds__year + , EXTRACT(year FROM time_spine_src_28006.ds) AS ds__extract_year + , EXTRACT(quarter FROM time_spine_src_28006.ds) AS ds__extract_quarter + , EXTRACT(month FROM time_spine_src_28006.ds) AS ds__extract_month + , EXTRACT(day FROM time_spine_src_28006.ds) AS ds__extract_day + , EXTRACT(isodow FROM time_spine_src_28006.ds) AS ds__extract_dow + , EXTRACT(doy FROM time_spine_src_28006.ds) AS ds__extract_doy + , time_spine_src_28006.martian_day AS ds__martian_day + FROM ***************************.mf_time_spine time_spine_src_28006 + ) subq_2 + ) subq_3 + INNER JOIN ( + -- Calculate Custom Granularity Bounds + SELECT + subq_4.metric_time__martian_day + , LAG(subq_4.ds__martian_day__first_value, 1) OVER ( + ORDER BY subq_4.metric_time__martian_day + ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING + ) AS ds__martian_day__first_value__offset + , LAG(subq_4.ds__martian_day__last_value, 1) OVER ( + ORDER BY subq_4.metric_time__martian_day + ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING + ) AS ds__martian_day__last_value__offset + FROM ( + -- Calculate Custom Granularity Bounds + SELECT + subq_3.ds__martian_day__first_value + , subq_3.ds__martian_day__last_value + FROM ( + -- Calculate Custom Granularity Bounds + SELECT + time_spine_src_28006.ds AS ds__day + , DATE_TRUNC('week', time_spine_src_28006.ds) AS ds__week + , DATE_TRUNC('month', time_spine_src_28006.ds) AS ds__month + , DATE_TRUNC('quarter', time_spine_src_28006.ds) AS ds__quarter + , DATE_TRUNC('year', time_spine_src_28006.ds) AS ds__year + , EXTRACT(year FROM time_spine_src_28006.ds) AS ds__extract_year + , EXTRACT(quarter FROM time_spine_src_28006.ds) AS ds__extract_quarter + , EXTRACT(month FROM time_spine_src_28006.ds) AS ds__extract_month + , EXTRACT(day FROM time_spine_src_28006.ds) AS ds__extract_day + , EXTRACT(isodow FROM time_spine_src_28006.ds) AS ds__extract_dow + , EXTRACT(doy FROM time_spine_src_28006.ds) AS ds__extract_doy + , time_spine_src_28006.martian_day AS ds__martian_day + , subq_2.ds__martian_day AS metric_time__martian_day + , FIRST_VALUE(subq_2.ds) OVER ( + PARTITION BY subq_2.ds__martian_day + ORDER BY subq_2.ds + ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING + ) AS ds__martian_day__first_value + , LAST_VALUE(subq_2.ds) OVER ( + PARTITION BY subq_2.ds__martian_day + ORDER BY subq_2.ds + ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING + ) AS ds__martian_day__last_value + , ROW_NUMBER() OVER ( + PARTITION BY subq_2.ds__martian_day + ORDER BY subq_2.ds + ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING + ) AS ds__martian_day__row_number + FROM ( + -- Read From Time Spine 'mf_time_spine' + SELECT + time_spine_src_28006.ds AS ds__day + , DATE_TRUNC('week', time_spine_src_28006.ds) AS ds__week + , DATE_TRUNC('month', time_spine_src_28006.ds) AS ds__month + , DATE_TRUNC('quarter', time_spine_src_28006.ds) AS ds__quarter + , DATE_TRUNC('year', time_spine_src_28006.ds) AS ds__year + , EXTRACT(year FROM time_spine_src_28006.ds) AS ds__extract_year + , EXTRACT(quarter FROM time_spine_src_28006.ds) AS ds__extract_quarter + , EXTRACT(month FROM time_spine_src_28006.ds) AS ds__extract_month + , EXTRACT(day FROM time_spine_src_28006.ds) AS ds__extract_day + , EXTRACT(isodow FROM time_spine_src_28006.ds) AS ds__extract_dow + , EXTRACT(doy FROM time_spine_src_28006.ds) AS ds__extract_doy + , time_spine_src_28006.martian_day AS ds__martian_day + FROM ***************************.mf_time_spine time_spine_src_28006 + ) subq_2 + ) subq_3 + GROUP BY + subq_3.ds__martian_day__first_value + , subq_3.ds__martian_day__last_value + ) subq_4 + ) subq_5 + ON + subq_4.metric_time__martian_day = subq_3.metric_time__martian_day + ) subq_6 + ) subq_7 + ) subq_8 + INNER JOIN ( + -- Metric Time Dimension 'ds' + SELECT + subq_0.ds__day + , subq_0.ds__week + , subq_0.ds__month + , subq_0.ds__quarter + , subq_0.ds__year + , subq_0.ds__extract_year + , subq_0.ds__extract_quarter + , subq_0.ds__extract_month + , subq_0.ds__extract_day + , subq_0.ds__extract_dow + , subq_0.ds__extract_doy + , subq_0.ds_partitioned__day + , subq_0.ds_partitioned__week + , subq_0.ds_partitioned__month + , subq_0.ds_partitioned__quarter + , subq_0.ds_partitioned__year + , subq_0.ds_partitioned__extract_year + , subq_0.ds_partitioned__extract_quarter + , subq_0.ds_partitioned__extract_month + , subq_0.ds_partitioned__extract_day + , subq_0.ds_partitioned__extract_dow + , subq_0.ds_partitioned__extract_doy + , subq_0.paid_at__day + , subq_0.paid_at__week + , subq_0.paid_at__month + , subq_0.paid_at__quarter + , subq_0.paid_at__year + , subq_0.paid_at__extract_year + , subq_0.paid_at__extract_quarter + , subq_0.paid_at__extract_month + , subq_0.paid_at__extract_day + , subq_0.paid_at__extract_dow + , subq_0.paid_at__extract_doy + , subq_0.booking__ds__day + , subq_0.booking__ds__week + , subq_0.booking__ds__month + , subq_0.booking__ds__quarter + , subq_0.booking__ds__year + , subq_0.booking__ds__extract_year + , subq_0.booking__ds__extract_quarter + , subq_0.booking__ds__extract_month + , subq_0.booking__ds__extract_day + , subq_0.booking__ds__extract_dow + , subq_0.booking__ds__extract_doy + , subq_0.booking__ds_partitioned__day + , subq_0.booking__ds_partitioned__week + , subq_0.booking__ds_partitioned__month + , subq_0.booking__ds_partitioned__quarter + , subq_0.booking__ds_partitioned__year + , subq_0.booking__ds_partitioned__extract_year + , subq_0.booking__ds_partitioned__extract_quarter + , subq_0.booking__ds_partitioned__extract_month + , subq_0.booking__ds_partitioned__extract_day + , subq_0.booking__ds_partitioned__extract_dow + , subq_0.booking__ds_partitioned__extract_doy + , subq_0.booking__paid_at__day + , subq_0.booking__paid_at__week + , subq_0.booking__paid_at__month + , subq_0.booking__paid_at__quarter + , subq_0.booking__paid_at__year + , subq_0.booking__paid_at__extract_year + , subq_0.booking__paid_at__extract_quarter + , subq_0.booking__paid_at__extract_month + , subq_0.booking__paid_at__extract_day + , subq_0.booking__paid_at__extract_dow + , subq_0.booking__paid_at__extract_doy + , subq_0.ds__day AS metric_time__day + , subq_0.ds__week AS metric_time__week + , subq_0.ds__month AS metric_time__month + , subq_0.ds__quarter AS metric_time__quarter + , subq_0.ds__year AS metric_time__year + , subq_0.ds__extract_year AS metric_time__extract_year + , subq_0.ds__extract_quarter AS metric_time__extract_quarter + , subq_0.ds__extract_month AS metric_time__extract_month + , subq_0.ds__extract_day AS metric_time__extract_day + , subq_0.ds__extract_dow AS metric_time__extract_dow + , subq_0.ds__extract_doy AS metric_time__extract_doy + , subq_0.listing + , subq_0.guest + , subq_0.host + , subq_0.booking__listing + , subq_0.booking__guest + , subq_0.booking__host + , subq_0.is_instant + , subq_0.booking__is_instant + , subq_0.bookings + , subq_0.instant_bookings + , subq_0.booking_value + , subq_0.max_booking_value + , subq_0.min_booking_value + , subq_0.bookers + , subq_0.average_booking_value + , subq_0.referred_bookings + , subq_0.median_booking_value + , subq_0.booking_value_p99 + , subq_0.discrete_booking_value_p99 + , subq_0.approximate_continuous_booking_value_p99 + , subq_0.approximate_discrete_booking_value_p99 + FROM ( + -- Read Elements From Semantic Model 'bookings_source' + SELECT + 1 AS bookings + , CASE WHEN is_instant THEN 1 ELSE 0 END AS instant_bookings + , bookings_source_src_28000.booking_value + , bookings_source_src_28000.booking_value AS max_booking_value + , bookings_source_src_28000.booking_value AS min_booking_value + , bookings_source_src_28000.guest_id AS bookers + , bookings_source_src_28000.booking_value AS average_booking_value + , bookings_source_src_28000.booking_value AS booking_payments + , CASE WHEN referrer_id IS NOT NULL THEN 1 ELSE 0 END AS referred_bookings + , bookings_source_src_28000.booking_value AS median_booking_value + , bookings_source_src_28000.booking_value AS booking_value_p99 + , bookings_source_src_28000.booking_value AS discrete_booking_value_p99 + , bookings_source_src_28000.booking_value AS approximate_continuous_booking_value_p99 + , bookings_source_src_28000.booking_value AS approximate_discrete_booking_value_p99 + , bookings_source_src_28000.is_instant + , DATE_TRUNC('day', bookings_source_src_28000.ds) AS ds__day + , DATE_TRUNC('week', bookings_source_src_28000.ds) AS ds__week + , DATE_TRUNC('month', bookings_source_src_28000.ds) AS ds__month + , DATE_TRUNC('quarter', bookings_source_src_28000.ds) AS ds__quarter + , DATE_TRUNC('year', bookings_source_src_28000.ds) AS ds__year + , EXTRACT(year FROM bookings_source_src_28000.ds) AS ds__extract_year + , EXTRACT(quarter FROM bookings_source_src_28000.ds) AS ds__extract_quarter + , EXTRACT(month FROM bookings_source_src_28000.ds) AS ds__extract_month + , EXTRACT(day FROM bookings_source_src_28000.ds) AS ds__extract_day + , EXTRACT(isodow FROM bookings_source_src_28000.ds) AS ds__extract_dow + , EXTRACT(doy FROM bookings_source_src_28000.ds) AS ds__extract_doy + , DATE_TRUNC('day', bookings_source_src_28000.ds_partitioned) AS ds_partitioned__day + , DATE_TRUNC('week', bookings_source_src_28000.ds_partitioned) AS ds_partitioned__week + , DATE_TRUNC('month', bookings_source_src_28000.ds_partitioned) AS ds_partitioned__month + , DATE_TRUNC('quarter', bookings_source_src_28000.ds_partitioned) AS ds_partitioned__quarter + , DATE_TRUNC('year', bookings_source_src_28000.ds_partitioned) AS ds_partitioned__year + , EXTRACT(year FROM bookings_source_src_28000.ds_partitioned) AS ds_partitioned__extract_year + , EXTRACT(quarter FROM bookings_source_src_28000.ds_partitioned) AS ds_partitioned__extract_quarter + , EXTRACT(month FROM bookings_source_src_28000.ds_partitioned) AS ds_partitioned__extract_month + , EXTRACT(day FROM bookings_source_src_28000.ds_partitioned) AS ds_partitioned__extract_day + , EXTRACT(isodow FROM bookings_source_src_28000.ds_partitioned) AS ds_partitioned__extract_dow + , EXTRACT(doy FROM bookings_source_src_28000.ds_partitioned) AS ds_partitioned__extract_doy + , DATE_TRUNC('day', bookings_source_src_28000.paid_at) AS paid_at__day + , DATE_TRUNC('week', bookings_source_src_28000.paid_at) AS paid_at__week + , DATE_TRUNC('month', bookings_source_src_28000.paid_at) AS paid_at__month + , DATE_TRUNC('quarter', bookings_source_src_28000.paid_at) AS paid_at__quarter + , DATE_TRUNC('year', bookings_source_src_28000.paid_at) AS paid_at__year + , EXTRACT(year FROM bookings_source_src_28000.paid_at) AS paid_at__extract_year + , EXTRACT(quarter FROM bookings_source_src_28000.paid_at) AS paid_at__extract_quarter + , EXTRACT(month FROM bookings_source_src_28000.paid_at) AS paid_at__extract_month + , EXTRACT(day FROM bookings_source_src_28000.paid_at) AS paid_at__extract_day + , EXTRACT(isodow FROM bookings_source_src_28000.paid_at) AS paid_at__extract_dow + , EXTRACT(doy FROM bookings_source_src_28000.paid_at) AS paid_at__extract_doy + , bookings_source_src_28000.is_instant AS booking__is_instant + , DATE_TRUNC('day', bookings_source_src_28000.ds) AS booking__ds__day + , DATE_TRUNC('week', bookings_source_src_28000.ds) AS booking__ds__week + , DATE_TRUNC('month', bookings_source_src_28000.ds) AS booking__ds__month + , DATE_TRUNC('quarter', bookings_source_src_28000.ds) AS booking__ds__quarter + , DATE_TRUNC('year', bookings_source_src_28000.ds) AS booking__ds__year + , EXTRACT(year FROM bookings_source_src_28000.ds) AS booking__ds__extract_year + , EXTRACT(quarter FROM bookings_source_src_28000.ds) AS booking__ds__extract_quarter + , EXTRACT(month FROM bookings_source_src_28000.ds) AS booking__ds__extract_month + , EXTRACT(day FROM bookings_source_src_28000.ds) AS booking__ds__extract_day + , EXTRACT(isodow FROM bookings_source_src_28000.ds) AS booking__ds__extract_dow + , EXTRACT(doy FROM bookings_source_src_28000.ds) AS booking__ds__extract_doy + , DATE_TRUNC('day', bookings_source_src_28000.ds_partitioned) AS booking__ds_partitioned__day + , DATE_TRUNC('week', bookings_source_src_28000.ds_partitioned) AS booking__ds_partitioned__week + , DATE_TRUNC('month', bookings_source_src_28000.ds_partitioned) AS booking__ds_partitioned__month + , DATE_TRUNC('quarter', bookings_source_src_28000.ds_partitioned) AS booking__ds_partitioned__quarter + , DATE_TRUNC('year', bookings_source_src_28000.ds_partitioned) AS booking__ds_partitioned__year + , EXTRACT(year FROM bookings_source_src_28000.ds_partitioned) AS booking__ds_partitioned__extract_year + , EXTRACT(quarter FROM bookings_source_src_28000.ds_partitioned) AS booking__ds_partitioned__extract_quarter + , EXTRACT(month FROM bookings_source_src_28000.ds_partitioned) AS booking__ds_partitioned__extract_month + , EXTRACT(day FROM bookings_source_src_28000.ds_partitioned) AS booking__ds_partitioned__extract_day + , EXTRACT(isodow FROM bookings_source_src_28000.ds_partitioned) AS booking__ds_partitioned__extract_dow + , EXTRACT(doy FROM bookings_source_src_28000.ds_partitioned) AS booking__ds_partitioned__extract_doy + , DATE_TRUNC('day', bookings_source_src_28000.paid_at) AS booking__paid_at__day + , DATE_TRUNC('week', bookings_source_src_28000.paid_at) AS booking__paid_at__week + , DATE_TRUNC('month', bookings_source_src_28000.paid_at) AS booking__paid_at__month + , DATE_TRUNC('quarter', bookings_source_src_28000.paid_at) AS booking__paid_at__quarter + , DATE_TRUNC('year', bookings_source_src_28000.paid_at) AS booking__paid_at__year + , EXTRACT(year FROM bookings_source_src_28000.paid_at) AS booking__paid_at__extract_year + , EXTRACT(quarter FROM bookings_source_src_28000.paid_at) AS booking__paid_at__extract_quarter + , EXTRACT(month FROM bookings_source_src_28000.paid_at) AS booking__paid_at__extract_month + , EXTRACT(day FROM bookings_source_src_28000.paid_at) AS booking__paid_at__extract_day + , EXTRACT(isodow FROM bookings_source_src_28000.paid_at) AS booking__paid_at__extract_dow + , EXTRACT(doy FROM bookings_source_src_28000.paid_at) AS booking__paid_at__extract_doy + , bookings_source_src_28000.listing_id AS listing + , bookings_source_src_28000.guest_id AS guest + , bookings_source_src_28000.host_id AS host + , bookings_source_src_28000.listing_id AS booking__listing + , bookings_source_src_28000.guest_id AS booking__guest + , bookings_source_src_28000.host_id AS booking__host + FROM ***************************.fct_bookings bookings_source_src_28000 + ) subq_0 + ) subq_1 + ON + subq_8.metric_time__day = subq_1.metric_time__day + ) subq_9 + ) subq_10 + GROUP BY + subq_10.metric_time__day + ) subq_11 +) subq_12 diff --git a/tests_metricflow/snapshots/test_custom_granularity.py/SqlQueryPlan/DuckDB/test_custom_offset_window__plan0_optimized.sql b/tests_metricflow/snapshots/test_custom_granularity.py/SqlQueryPlan/DuckDB/test_custom_offset_window__plan0_optimized.sql new file mode 100644 index 000000000..93b2e2909 --- /dev/null +++ b/tests_metricflow/snapshots/test_custom_granularity.py/SqlQueryPlan/DuckDB/test_custom_offset_window__plan0_optimized.sql @@ -0,0 +1,98 @@ +test_name: test_custom_offset_window +test_filename: test_custom_granularity.py +sql_engine: DuckDB +--- +-- Compute Metrics via Expressions +SELECT + metric_time__day + , bookings AS bookings_offset_one_martian_day +FROM ( + -- Join to Time Spine Dataset + -- Pass Only Elements: ['bookings', 'metric_time__day'] + -- Aggregate Measures + -- Compute Metrics via Expressions + SELECT + subq_21.metric_time__day AS metric_time__day + , SUM(subq_14.bookings) AS bookings + FROM ( + -- Calculate Custom Granularity Bounds + -- Calculate Custom Granularity Bounds + -- Pass Only Elements: ['metric_time__day', 'metric_time__day'] + SELECT + DATE_TRUNC('day', CASE + WHEN subq_18.ds__martian_day__first_value__offset + INTERVAL subq_16.ds__martian_day__row_number day <= subq_18.ds__martian_day__first_value__offset + INTERVAL subq_16.ds__martian_day__row_number day THEN subq_18.ds__martian_day__first_value__offset + INTERVAL subq_16.ds__martian_day__row_number day + ELSE subq_18.ds__martian_day__last_value__offset + END) AS metric_time__day + FROM ***************************.mf_time_spine time_spine_src_28006 + INNER JOIN ( + -- Calculate Custom Granularity Bounds + SELECT + metric_time__martian_day + , LAG(ds__martian_day__first_value, 1) OVER ( + ORDER BY metric_time__martian_day + ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING + ) AS ds__martian_day__first_value__offset + , LAG(ds__martian_day__last_value, 1) OVER ( + ORDER BY metric_time__martian_day + ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING + ) AS ds__martian_day__last_value__offset + FROM ( + -- Calculate Custom Granularity Bounds + SELECT + ds__martian_day__first_value + , ds__martian_day__last_value + FROM ( + -- Read From Time Spine 'mf_time_spine' + -- Calculate Custom Granularity Bounds + SELECT + ds AS ds__day + , DATE_TRUNC('week', ds) AS ds__week + , DATE_TRUNC('month', ds) AS ds__month + , DATE_TRUNC('quarter', ds) AS ds__quarter + , DATE_TRUNC('year', ds) AS ds__year + , EXTRACT(year FROM ds) AS ds__extract_year + , EXTRACT(quarter FROM ds) AS ds__extract_quarter + , EXTRACT(month FROM ds) AS ds__extract_month + , EXTRACT(day FROM ds) AS ds__extract_day + , EXTRACT(isodow FROM ds) AS ds__extract_dow + , EXTRACT(doy FROM ds) AS ds__extract_doy + , martian_day AS ds__martian_day + , martian_day AS metric_time__martian_day + , FIRST_VALUE(ds) OVER ( + PARTITION BY martian_day + ORDER BY ds + ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING + ) AS ds__martian_day__first_value + , LAST_VALUE(ds) OVER ( + PARTITION BY martian_day + ORDER BY ds + ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING + ) AS ds__martian_day__last_value + , ROW_NUMBER() OVER ( + PARTITION BY martian_day + ORDER BY ds + ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING + ) AS ds__martian_day__row_number + FROM ***************************.mf_time_spine time_spine_src_28006 + ) subq_16 + GROUP BY + ds__martian_day__first_value + , ds__martian_day__last_value + ) subq_17 + ) subq_18 + ON + subq_17.metric_time__martian_day = time_spine_src_28006.martian_day + ) subq_21 + INNER JOIN ( + -- Read Elements From Semantic Model 'bookings_source' + -- Metric Time Dimension 'ds' + SELECT + DATE_TRUNC('day', ds) AS metric_time__day + , 1 AS bookings + FROM ***************************.fct_bookings bookings_source_src_28000 + ) subq_14 + ON + subq_21.metric_time__day = subq_14.metric_time__day + GROUP BY + subq_21.metric_time__day +) subq_25 diff --git a/x.sql b/x.sql new file mode 100644 index 000000000..918316414 --- /dev/null +++ b/x.sql @@ -0,0 +1,95 @@ +-- Grouping by a grain that is NOT the same as the custom grain used in the offset window +-------------------------------------------------- +-- Use the base grain of the custom grain's time spine in all initial subqueries, apply DATE_TRUNC in final query +-- This also works for custom grain, since we can just join it to the final subquery like usual. +-- Also works if there are multiple grains in the group by + +-- TODO: get CTE working somehow. Might need a separate DFP node. +with cte as ( + SELECT + date_day, + fiscal_quarter + , first_value(date_day) over (partition by fiscal_quarter order by date_day) as ds__fiscal_quarter__first_value + , last_value(date_day) over (partition by fiscal_quarter order by date_day) as ds__fiscal_quarter__last_value + , row_number() over (partition by fiscal_quarter order by date_day) as ds__day__row_number + FROM ANALYTICS_DEV.DBT_JSTEIN.ALL_DAYS +) + +SELECT + metric_time__week, + metric_time__fiscal_year, + SUM(total_price) AS revenue_last_fiscal_quarter +FROM ANALYTICS_DEV.DBT_JSTEIN.STG_SALESFORCE__ORDER_ITEMS +INNER JOIN ( + -- ApplyStandardGranularityNode + -- Also AliasSpecsNode here?? + SELECT + offset_by_custom_grain.date_day AS metric_time__day, -- This alias is only needed if it was requested in the query + DATE_TRUNC(week, offset_by_custom_grain.date_day) AS metric_time__week, + fiscal_year AS metric_time__fiscal_year + FROM ( + -- OffsetByCustomGranularityNode + select + fiscal_quarter + , case + when dateadd(day, ds__day__row_number - 1, fiscal_quarter_start__offset_by_1) <= fiscal_quarter_end__offset_by_1 + then dateadd(day, ds__day__row_number - 1, fiscal_quarter_start__offset_by_1) + else fiscal_quarter_end__offset_by_1 + end as date_day + from cte -- CustomGranularityBoundsNode + inner join ( + -- OffsetCustomGranularityBoundsNode + select + fiscal_quarter, + lag(ds__fiscal_quarter__first_value, 1) over (order by fiscal_quarter) as fiscal_quarter_start__offset_by_1, + lag(ds__fiscal_quarter__last_value, 1) over (order by fiscal_quarter) as fiscal_quarter_end__offset_by_1 + from ( + -- FilterEelementsNode + select + fiscal_quarter, + ds__fiscal_quarter__first_value, + ds__fiscal_quarter__last_value + from cte -- CustomGranularityBoundsNode + GROUP BY 1, 2, 3 + ) ts_distinct + ) ts_with_offset_intervals USING (fiscal_quarter) + ) as offset_by_custom_grain + -- JoinToCustomGranularityNode + LEFT JOIN ANALYTICS_DEV.DBT_JSTEIN.ALL_DAYS custom ON custom.date_day = offset_by_custom_grain.date_day +) ts_offset_dates ON ts_offset_dates.date_day = DATE_TRUNC(day, created_at)::date -- always join on base time spine column +GROUP BY 1, 2 +ORDER BY 1, 2; + + + + + + +-- Grouping by the just same custom grain as what's used in the offset window (and only that grain) +-------------------------------------------------- +-- Could follow the same SQL as above, but this would be a more optimized version (they appear to give the same results) +-- This is likely to be most common for period over period, so it might be good to optimize it + + +SELECT -- existing nodes! + metric_time__fiscal_quarter, + SUM(total_price) AS revenue +FROM ANALYTICS_DEV.DBT_JSTEIN.STG_SALESFORCE__ORDER_ITEMS +LEFT JOIN ( -- JoinToTimeSpineNode, no offset, join on custom grain spec + SELECT + -- JoinToTimeSpineNode + -- TransformTimeDimensionsNode?? + date_day, + fiscal_quarter_offset AS metric_time__fiscal_quarter + FROM ANALYTICS_DEV.DBT_JSTEIN.ALL_DAYS + INNER JOIN ( + -- OffsetCustomGranularityNode + SELECT + fiscal_quarter + , lag(fiscal_quarter, 1) OVER (ORDER BY fiscal_quarter) as fiscal_quarter_offset + FROM ANALYTICS_DEV.DBT_JSTEIN.ALL_DAYS + GROUP BY 1 + ) ts_offset_dates USING (fiscal_quarter) +) ts ON date_day = DATE_TRUNC(day, created_at)::date +GROUP BY 1 +ORDER BY 1;