Skip to content

Commit

Permalink
Include Time Spine Nodes in Dataflow Plan (#1548)
Browse files Browse the repository at this point in the history
Previously, time spine nodes were built on the fly in the dataflow to
SQL logic. This came with several limitations, including:
- We can't apply where constraints and time constraints using the
standard dataflow plan logic. This limitation was the main driver for
this PR stack.
- We can't track time spines as source nodes. We might want to add these
to the dbt DAG at some point.

This PR adds logic to build time spine nodes in the dataflow plan,
instead.
  • Loading branch information
courtneyholcomb authored Dec 11, 2024
1 parent 50c1c5a commit 7782067
Show file tree
Hide file tree
Showing 804 changed files with 59,216 additions and 31,423 deletions.
1 change: 1 addition & 0 deletions metricflow-semantics/metricflow_semantics/dag/id_prefix.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ class StaticIdPrefix(IdPrefix, Enum, metaclass=EnumMetaClassHelper):
DATAFLOW_NODE_ADD_UUID_COLUMN_PREFIX = "auid"
DATAFLOW_NODE_JOIN_CONVERSION_EVENTS_PREFIX = "jce"
DATAFLOW_NODE_WINDOW_REAGGREGATION_ID_PREFIX = "wr"
DATAFLOW_NODE_ALIAS_SPECS_ID_PREFIX = "as"

SQL_EXPR_COLUMN_REFERENCE_ID_PREFIX = "cr"
SQL_EXPR_COMPARISON_ID_PREFIX = "cmp"
Expand Down
74 changes: 74 additions & 0 deletions metricflow-semantics/metricflow_semantics/instances.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ def accept(self, visitor: InstanceVisitor[VisitorOutputT]) -> VisitorOutputT:
"""See Visitable."""
raise NotImplementedError()

def with_new_spec(self, new_spec: SpecT, column_association_resolver: ColumnAssociationResolver) -> MdoInstance:
"""Returns a new instance with the spec replaced."""
raise NotImplementedError()


class LinkableInstance(MdoInstance, Generic[SpecT]):
"""An MdoInstance whose spec is linkable (i.e., it can have entity links)."""
Expand Down Expand Up @@ -105,6 +109,17 @@ class MeasureInstance(MdoInstance[MeasureSpec], SemanticModelElementInstance):
def accept(self, visitor: InstanceVisitor[VisitorOutputT]) -> VisitorOutputT: # noqa: D102
return visitor.visit_measure_instance(self)

def with_new_spec(
self, new_spec: MeasureSpec, column_association_resolver: ColumnAssociationResolver
) -> MeasureInstance:
"""Returns a new instance with the spec replaced."""
return MeasureInstance(
associated_columns=(column_association_resolver.resolve_spec(new_spec),),
defined_from=self.defined_from,
spec=new_spec,
aggregation_state=self.aggregation_state,
)


@dataclass(frozen=True)
class DimensionInstance(LinkableInstance[DimensionSpec], SemanticModelElementInstance): # noqa: D101
Expand All @@ -125,6 +140,16 @@ def with_entity_prefix(
spec=transformed_spec,
)

def with_new_spec(
self, new_spec: DimensionSpec, column_association_resolver: ColumnAssociationResolver
) -> DimensionInstance:
"""Returns a new instance with the spec replaced."""
return DimensionInstance(
associated_columns=(column_association_resolver.resolve_spec(new_spec),),
defined_from=self.defined_from,
spec=new_spec,
)


@dataclass(frozen=True)
class TimeDimensionInstance(LinkableInstance[TimeDimensionSpec], SemanticModelElementInstance): # noqa: D101
Expand All @@ -151,6 +176,16 @@ def with_new_defined_from(self, defined_from: Sequence[SemanticModelElementRefer
associated_columns=self.associated_columns, defined_from=tuple(defined_from), spec=self.spec
)

def with_new_spec(
self, new_spec: TimeDimensionSpec, column_association_resolver: ColumnAssociationResolver
) -> TimeDimensionInstance:
"""Returns a new instance with the spec replaced."""
return TimeDimensionInstance(
associated_columns=(column_association_resolver.resolve_spec(new_spec),),
defined_from=self.defined_from,
spec=new_spec,
)


@dataclass(frozen=True)
class EntityInstance(LinkableInstance[EntitySpec], SemanticModelElementInstance): # noqa: D101
Expand All @@ -171,6 +206,16 @@ def with_entity_prefix(
spec=transformed_spec,
)

def with_new_spec(
self, new_spec: EntitySpec, column_association_resolver: ColumnAssociationResolver
) -> EntityInstance:
"""Returns a new instance with the spec replaced."""
return EntityInstance(
associated_columns=(column_association_resolver.resolve_spec(new_spec),),
defined_from=self.defined_from,
spec=new_spec,
)


@dataclass(frozen=True)
class GroupByMetricInstance(LinkableInstance[GroupByMetricSpec], SerializableDataclass): # noqa: D101
Expand All @@ -192,6 +237,16 @@ def with_entity_prefix(
spec=transformed_spec,
)

def with_new_spec(
self, new_spec: GroupByMetricSpec, column_association_resolver: ColumnAssociationResolver
) -> GroupByMetricInstance:
"""Returns a new instance with the spec replaced."""
return GroupByMetricInstance(
associated_columns=(column_association_resolver.resolve_spec(new_spec),),
defined_from=self.defined_from,
spec=new_spec,
)


@dataclass(frozen=True)
class MetricInstance(MdoInstance[MetricSpec], SerializableDataclass): # noqa: D101
Expand All @@ -202,6 +257,16 @@ class MetricInstance(MdoInstance[MetricSpec], SerializableDataclass): # noqa: D
def accept(self, visitor: InstanceVisitor[VisitorOutputT]) -> VisitorOutputT: # noqa: D102
return visitor.visit_metric_instance(self)

def with_new_spec(
self, new_spec: MetricSpec, column_association_resolver: ColumnAssociationResolver
) -> MetricInstance:
"""Returns a new instance with the spec replaced."""
return MetricInstance(
associated_columns=(column_association_resolver.resolve_spec(new_spec),),
defined_from=self.defined_from,
spec=new_spec,
)


@dataclass(frozen=True)
class MetadataInstance(MdoInstance[MetadataSpec], SerializableDataclass): # noqa: D101
Expand All @@ -211,6 +276,15 @@ class MetadataInstance(MdoInstance[MetadataSpec], SerializableDataclass): # noq
def accept(self, visitor: InstanceVisitor[VisitorOutputT]) -> VisitorOutputT: # noqa: D102
return visitor.visit_metadata_instance(self)

def with_new_spec(
self, new_spec: MetadataSpec, column_association_resolver: ColumnAssociationResolver
) -> MetadataInstance:
"""Returns a new instance with the spec replaced."""
return MetadataInstance(
associated_columns=(column_association_resolver.resolve_spec(new_spec),),
spec=new_spec,
)


# Output type of transform function
TransformOutputT = TypeVar("TransformOutputT")
Expand Down
Loading

0 comments on commit 7782067

Please sign in to comment.