Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
courtneyholcomb committed Dec 16, 2024
1 parent ec58d8f commit a3deabd
Show file tree
Hide file tree
Showing 13 changed files with 826 additions and 231 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ def visit_time_dimension_spec(self, time_dimension_spec: TimeDimensionSpec) -> C
if time_dimension_spec.aggregation_state
else ""
)
+ (
f"{DUNDER}{time_dimension_spec.window_function.value.lower()}"
if time_dimension_spec.window_function
else ""
)
)

def visit_entity_spec(self, entity_spec: EntitySpec) -> ColumnAssociation: # noqa: D102
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,7 @@ def test_min_queryable_time_granularity_for_different_agg_time_grains( # noqa:
def test_custom_offset_window_for_metric(
simple_semantic_manifest_lookup: SemanticManifestLookup,
) -> None:
"""Test offset window with custom grain supplied.
TODO: As of now, the functionality of an offset window with a custom grain is not supported in MF.
This test is added to show that at least the parsing is successful using a custom grain offset window.
Once support for that is added in MF + relevant tests, this test can be removed.
"""
"""Test offset window with custom grain supplied."""
metric = simple_semantic_manifest_lookup.metric_lookup.get_metric(MetricReference("bookings_offset_martian_day"))

assert len(metric.input_metrics) == 1
Expand Down
39 changes: 27 additions & 12 deletions metricflow/dataflow/builder/dataflow_plan_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -668,7 +668,13 @@ def _build_derived_metric_output_node(
time_spine_node=time_spine_node,
requested_agg_time_dimension_specs=queried_agg_time_dimension_specs,
join_on_time_dimension_spec=self._sort_by_base_granularity(queried_agg_time_dimension_specs)[0],
offset_window=metric_spec.offset_window,
offset_window=(
metric_spec.offset_window
if metric_spec.offset_window
and metric_spec.offset_window.granularity
not in self._semantic_model_lookup.custom_granularity_names
else None
),
offset_to_grain=metric_spec.offset_to_grain,
join_type=SqlJoinType.INNER,
)
Expand Down Expand Up @@ -1662,7 +1668,13 @@ def _build_aggregated_measure_from_measure_source_node(
time_spine_node=time_spine_node,
requested_agg_time_dimension_specs=base_queried_agg_time_dimension_specs,
join_on_time_dimension_spec=join_on_time_dimension_spec,
offset_window=before_aggregation_time_spine_join_description.offset_window,
offset_window=(
before_aggregation_time_spine_join_description.offset_window
if before_aggregation_time_spine_join_description.offset_window
and before_aggregation_time_spine_join_description.offset_window.granularity
not in self._semantic_model_lookup.custom_granularity_names
else None
),
offset_to_grain=before_aggregation_time_spine_join_description.offset_to_grain,
join_type=before_aggregation_time_spine_join_description.join_type,
)
Expand Down Expand Up @@ -1879,7 +1891,7 @@ def _build_time_spine_node(
required_time_spine_specs = required_time_spine_spec_set.time_dimension_specs

should_dedupe = False
if offset_window and offset_window in self._semantic_model_lookup.custom_granularity_names:
if offset_window and offset_window.granularity in self._semantic_model_lookup.custom_granularity_names:
# TODO: decide whether a set is the right container for the queried grains here.
all_queried_grains: Set[ExpandedTimeGranularity] = set()
queried_custom_specs: Tuple[TimeDimensionSpec, ...] = ()
Expand All @@ -1899,7 +1911,7 @@ def _build_time_spine_node(
# TODO: make sure this checks the correct granularity type once DSI (dbt_semantic_interfaces) is updated.
if {spec.time_granularity for spec in queried_time_spine_specs} == {offset_window.granularity}:
# If querying with only the same grain as is used in the offset_window, can use a simpler plan.
# offset_node = OffsetCustomGranularityNode.create(
# offset_node = OffsetCustomGranularityWithMatchingGrainNode.create(
# parent_node=time_spine_read_node, offset_window=offset_window
# )
# time_spine_node: DataflowPlanNode = JoinToTimeSpineNode.create(
Expand All @@ -1912,15 +1924,18 @@ def _build_time_spine_node(
# )
pass
else:
time_spine_node: DataflowPlanNode = CustomGranularityBoundsNode.create(
bounds_node = CustomGranularityBoundsNode.create(
parent_node=time_spine_read_node,
offset_window=offset_window,
requested_time_spine_specs=required_time_spine_specs,
custom_granularity_name=custom_granularity_name,
)
# if queried_standard_specs:
# time_spine_node = ApplyStandardGranularityNode.create(
# parent_node=time_spine_node, time_dimension_specs=queried_standard_specs
# )
# TODO: assert in post-init that this can only take a CustomGranularityBoundsNode
time_spine_node = OffsetCustomGranularityNode.create(
parent_node=bounds_node, required_time_spine_specs=required_time_spine_specs
)
if queried_standard_specs:
time_spine_node = ApplyStandardGranularityNode.create(
parent_node=time_spine_node, time_dimension_specs=queried_standard_specs
)
# TODO(later): check whether this join is needed when the queried grain is the same grain used in the offset window.
for custom_spec in queried_custom_specs:
time_spine_node = JoinToCustomGranularityNode.create(
Expand All @@ -1939,7 +1954,7 @@ def _build_time_spine_node(
change_specs=tuple(
SpecToAlias(
input_spec=time_spine_data_set.instance_from_time_dimension_grain_and_date_part(
time_granularity=required_spec.time_granularity, date_part=required_spec.date_part
time_granularity_name=required_spec.time_granularity.name, date_part=required_spec.date_part
).spec,
output_spec=required_spec,
)
Expand Down
30 changes: 8 additions & 22 deletions metricflow/dataflow/nodes/custom_granularity_bounds.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,10 @@

from abc import ABC
from dataclasses import dataclass
from typing import Sequence, Tuple
from typing import Sequence

from dbt_semantic_interfaces.protocols.metric import MetricTimeWindow
from metricflow_semantics.dag.id_prefix import IdPrefix, StaticIdPrefix
from metricflow_semantics.dag.mf_dag import DisplayedProperty
from metricflow_semantics.specs.time_dimension_spec import TimeDimensionSpec
from metricflow_semantics.visitor import VisitorOutputT

from metricflow.dataflow.dataflow_plan import DataflowPlanNode
Expand All @@ -19,24 +17,17 @@
class CustomGranularityBoundsNode(DataflowPlanNode, ABC):
"""Calculate the start and end of a custom granularity period and each row number within that period."""

offset_window: MetricTimeWindow
requested_time_spine_specs: Tuple[TimeDimensionSpec, ...]
custom_granularity_name: str

def __post_init__(self) -> None: # noqa: D105
super().__post_init__()
assert len(self.parent_nodes) == 1

@staticmethod
def create( # noqa: D102
parent_node: DataflowPlanNode,
offset_window: MetricTimeWindow,
requested_time_spine_specs: Tuple[TimeDimensionSpec, ...],
parent_node: DataflowPlanNode, custom_granularity_name: str
) -> CustomGranularityBoundsNode:
return CustomGranularityBoundsNode(
parent_nodes=(parent_node,),
offset_window=offset_window,
requested_time_spine_specs=requested_time_spine_specs,
)
return CustomGranularityBoundsNode(parent_nodes=(parent_node,), custom_granularity_name=custom_granularity_name)

@classmethod
def id_prefix(cls) -> IdPrefix: # noqa: D102
Expand All @@ -51,10 +42,8 @@ def description(self) -> str: # noqa: D102

@property
def displayed_properties(self) -> Sequence[DisplayedProperty]: # noqa: D102
return (
tuple(super().displayed_properties)
+ (DisplayedProperty("offset_window", self.offset_window),)
+ (DisplayedProperty("requested_time_spine_specs", self.requested_time_spine_specs),)
return tuple(super().displayed_properties) + (
DisplayedProperty("custom_granularity_name", self.custom_granularity_name),
)

@property
Expand All @@ -64,16 +53,13 @@ def parent_node(self) -> DataflowPlanNode: # noqa: D102
def functionally_identical(self, other_node: DataflowPlanNode) -> bool: # noqa: D102
return (
isinstance(other_node, self.__class__)
and other_node.offset_window == self.offset_window
and self.requested_time_spine_specs == other_node.requested_time_spine_specs
and other_node.custom_granularity_name == self.custom_granularity_name
)

def with_new_parents( # noqa: D102
self, new_parent_nodes: Sequence[DataflowPlanNode]
) -> CustomGranularityBoundsNode:
assert len(new_parent_nodes) == 1
return CustomGranularityBoundsNode.create(
parent_node=new_parent_nodes[0],
offset_window=self.offset_window,
requested_time_spine_specs=self.requested_time_spine_specs,
parent_node=new_parent_nodes[0], custom_granularity_name=self.custom_granularity_name
)
7 changes: 3 additions & 4 deletions metricflow/dataset/sql_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
from metricflow_semantics.specs.entity_spec import EntitySpec
from metricflow_semantics.specs.instance_spec import InstanceSpec
from metricflow_semantics.specs.time_dimension_spec import TimeDimensionSpec
from metricflow_semantics.time.granularity import ExpandedTimeGranularity
from typing_extensions import override

from metricflow.dataset.dataset_classes import DataSet
Expand Down Expand Up @@ -167,18 +166,18 @@ def instance_for_spec(self, spec: InstanceSpec) -> MdoInstance:
)

def instance_from_time_dimension_grain_and_date_part(
self, time_granularity: ExpandedTimeGranularity, date_part: Optional[DatePart]
self, time_granularity_name: str, date_part: Optional[DatePart]
) -> TimeDimensionInstance:
"""Find instance in dataset that matches the given grain and date part."""
for time_dimension_instance in self.instance_set.time_dimension_instances:
if (
time_dimension_instance.spec.time_granularity == time_granularity
time_dimension_instance.spec.time_granularity.name == time_granularity_name
and time_dimension_instance.spec.date_part == date_part
):
return time_dimension_instance

raise RuntimeError(
f"Did not find a time dimension instance with grain {time_granularity} and date part {date_part}\n"
f"Did not find a time dimension instance with grain '{time_granularity_name}' and date part {date_part}\n"
f"Instances available: {self.instance_set.time_dimension_instances}"
)

Expand Down
Loading

0 comments on commit a3deabd

Please sign in to comment.