Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Generate synthetic manifests for performance tests #1495

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@
import logging
from typing import Dict, List, Optional, Sequence, Set

from dbt_semantic_interfaces.protocols.dimension import Dimension
from dbt_semantic_interfaces.protocols.entity import Entity
from dbt_semantic_interfaces.protocols.measure import Measure
from dbt_semantic_interfaces.protocols.semantic_manifest import SemanticManifest
from dbt_semantic_interfaces.protocols.semantic_model import SemanticModel
from dbt_semantic_interfaces.references import (
Expand All @@ -16,22 +14,21 @@
SemanticModelReference,
TimeDimensionReference,
)
from dbt_semantic_interfaces.type_enums import AggregationType, DimensionType, TimeGranularity
from dbt_semantic_interfaces.type_enums import DimensionType

from metricflow_semantics.errors.error_classes import InvalidSemanticModelError
from metricflow_semantics.mf_logging.lazy_formattable import LazyFormat
from metricflow_semantics.model.semantics.dimension_lookup import DimensionLookup
from metricflow_semantics.model.semantics.element_group import ElementGrouper
from metricflow_semantics.model.semantics.measure_lookup import MeasureLookup
from metricflow_semantics.model.semantics.semantic_model_helper import SemanticModelHelper
from metricflow_semantics.model.spec_converters import MeasureConverter
from metricflow_semantics.naming.linkable_spec_name import StructuredLinkableSpecName
from metricflow_semantics.specs.dimension_spec import DimensionSpec
from metricflow_semantics.specs.entity_spec import EntitySpec
from metricflow_semantics.specs.instance_spec import LinkableInstanceSpec
from metricflow_semantics.specs.measure_spec import MeasureSpec
from metricflow_semantics.specs.non_additive_dimension_spec import NonAdditiveDimensionSpec
from metricflow_semantics.specs.time_dimension_spec import DEFAULT_TIME_GRANULARITY, TimeDimensionSpec
from metricflow_semantics.specs.time_dimension_spec import TimeDimensionSpec
from metricflow_semantics.time.granularity import ExpandedTimeGranularity

logger = logging.getLogger(__name__)
Expand All @@ -48,8 +45,6 @@ def __init__(self, model: SemanticManifest, custom_granularities: Dict[str, Expa
"""
self._custom_granularities = custom_granularities
self._measure_index: Dict[MeasureReference, SemanticModel] = {}
self._measure_aggs: Dict[MeasureReference, AggregationType] = {}
self._measure_agg_time_dimension: Dict[MeasureReference, TimeDimensionReference] = {}
self._measure_non_additive_dimension_specs: Dict[MeasureReference, NonAdditiveDimensionSpec] = {}
self._dimension_index: Dict[DimensionReference, List[SemanticModel]] = {}
self._entity_index: Dict[EntityReference, List[SemanticModel]] = {}
Expand All @@ -66,40 +61,13 @@ def __init__(self, model: SemanticManifest, custom_granularities: Dict[str, Expa
for semantic_model in sorted_semantic_models:
self._add_semantic_model(semantic_model)

# Cache for defined time granularity.
self._time_dimension_to_defined_time_granularity: Dict[TimeDimensionReference, TimeGranularity] = {}

# Cache for agg. time dimension for measure.
self._measure_reference_to_agg_time_dimension_specs: Dict[MeasureReference, Sequence[TimeDimensionSpec]] = {}

self._measure_lookup = MeasureLookup(sorted_semantic_models, custom_granularities)
self._dimension_lookup = DimensionLookup(sorted_semantic_models)

def get_dimension_references(self) -> Sequence[DimensionReference]:
"""Retrieve all dimension references from the collection of semantic models."""
return tuple(self._dimension_index.keys())

def get_dimension(self, dimension_reference: DimensionReference) -> Dimension:
"""Retrieves a full dimension object by name."""
# If the reference passed is a TimeDimensionReference, convert to DimensionReference.
dimension_reference = DimensionReference(dimension_reference.element_name)

semantic_models = self._dimension_index.get(dimension_reference)
if not semantic_models:
raise ValueError(
f"Could not find dimension with name '{dimension_reference.element_name}' in configured semantic models"
)

return SemanticModelHelper.get_dimension_from_semantic_model(
# Dimension object should match across semantic models, so just use the first semantic model.
semantic_model=semantic_models[0],
dimension_reference=dimension_reference,
)

def get_time_dimension(self, time_dimension_reference: TimeDimensionReference) -> Dimension:
"""Retrieves a full dimension object by name."""
return self.get_dimension(dimension_reference=time_dimension_reference.dimension_reference)

@property
def measure_references(self) -> Sequence[MeasureReference]:
"""Return all measure references from the collection of semantic models."""
Expand All @@ -113,35 +81,10 @@ def non_additive_dimension_specs_by_measure(self) -> Dict[MeasureReference, NonA
"""
return self._measure_non_additive_dimension_specs

def get_measure(self, measure_reference: MeasureReference) -> Measure:
"""Retrieve the measure model object associated with the measure reference."""
if measure_reference not in self._measure_index:
raise ValueError(f"Could not find measure with name ({measure_reference}) in configured semantic models")

return SemanticModelHelper.get_measure_from_semantic_model(
semantic_model=self.get_semantic_model_for_measure(measure_reference), measure_reference=measure_reference
)

def get_entity_references(self) -> Sequence[EntityReference]:
"""Retrieve all entity references from the collection of semantic models."""
return list(self._entity_index.keys())

def get_semantic_model_for_measure(self, measure_reference: MeasureReference) -> SemanticModel: # noqa: D102
semantic_model = self._measure_index.get(measure_reference)
assert semantic_model, (
f"Semantic model not found for measure: {repr(measure_reference)}. "
f"This indicates either internal misconfiguration or that the measure does not exist."
)
return semantic_model

def get_agg_time_dimension_for_measure(self, measure_reference: MeasureReference) -> TimeDimensionReference:
"""Retrieves the aggregate time dimension that is associated with the measure reference.

This is the time dimension along which the measure will be aggregated when a metric built on this measure
is queried with metric_time.
"""
return self._measure_agg_time_dimension[measure_reference]

def get_entity_in_semantic_model(self, ref: SemanticModelElementReference) -> Optional[Entity]:
"""Retrieve the entity matching the element -> semantic model mapping, if any."""
semantic_model = self.get_by_reference(ref.semantic_model_reference)
Expand All @@ -165,13 +108,6 @@ def _add_semantic_model(self, semantic_model: SemanticModel) -> None:
if semantic_model.reference in self._semantic_model_reference_to_semantic_model:
errors.append(f"Semantic model {semantic_model.reference} already added.")

for measure in semantic_model.measures:
if measure.reference in self._measure_aggs and self._measure_aggs[measure.reference] != measure.agg:
errors.append(
f"Conflicting aggregation (agg) for measure {measure.reference}. Currently registered as "
f"{self._measure_aggs[measure.reference]} but got {measure.agg}."
)

if len(errors) > 0:
raise InvalidSemanticModelError(f"Error adding {semantic_model.reference}. Got errors: {errors}")

Expand All @@ -180,7 +116,6 @@ def _add_semantic_model(self, semantic_model: SemanticModel) -> None:
]()

for measure in semantic_model.measures:
self._measure_aggs[measure.reference] = measure.agg
self._measure_index[measure.reference] = semantic_model
agg_time_dimension_reference = semantic_model.checked_agg_time_dimension_for_measure(measure.reference)

Expand Down Expand Up @@ -209,9 +144,6 @@ def _add_semantic_model(self, semantic_model: SemanticModel) -> None:
),
value=MeasureConverter.convert_to_measure_spec(measure=measure),
)
self._measure_agg_time_dimension[measure.reference] = TimeDimensionReference(
element_name=agg_time_dimension.name
)

if measure.non_additive_dimension:
non_additive_dimension_spec = NonAdditiveDimensionSpec(
Expand Down Expand Up @@ -276,52 +208,6 @@ def get_element_spec_for_name(self, element_name: str) -> LinkableInstanceSpec:
else:
raise ValueError(f"Unable to find linkable element {element_name} in manifest")

def get_agg_time_dimension_specs_for_measure(
self, measure_reference: MeasureReference
) -> Sequence[TimeDimensionSpec]:
"""Get the agg time dimension specs that can be used in place of metric time for this measure."""
result = self._measure_reference_to_agg_time_dimension_specs.get(measure_reference)
if result is not None:
return result

result = self._get_agg_time_dimension_specs_for_measure(measure_reference)
self._measure_reference_to_agg_time_dimension_specs[measure_reference] = result
return result

def _get_agg_time_dimension_specs_for_measure(
self, measure_reference: MeasureReference
) -> Sequence[TimeDimensionSpec]:
agg_time_dimension = self.get_agg_time_dimension_for_measure(measure_reference)
# A measure's agg_time_dimension is required to be in the same semantic model as the measure,
# so we can assume the same semantic model for both measure and dimension.
semantic_model = self.get_semantic_model_for_measure(measure_reference)
entity_link = SemanticModelHelper.resolved_primary_entity(semantic_model)
return TimeDimensionSpec.generate_possible_specs_for_time_dimension(
time_dimension_reference=agg_time_dimension,
entity_links=(entity_link,),
custom_granularities=self._custom_granularities,
)

def get_defined_time_granularity(self, time_dimension_reference: TimeDimensionReference) -> TimeGranularity:
"""Time granularity from the time dimension's YAML definition. If not set, defaults to DAY."""
result = self._time_dimension_to_defined_time_granularity.get(time_dimension_reference)

if result is not None:
return result

result = self._get_defined_time_granularity(time_dimension_reference)
self._time_dimension_to_defined_time_granularity[time_dimension_reference] = result
return result

def _get_defined_time_granularity(self, time_dimension_reference: TimeDimensionReference) -> TimeGranularity:
time_dimension = self.get_dimension(time_dimension_reference)

defined_time_granularity = DEFAULT_TIME_GRANULARITY
if time_dimension.type_params and time_dimension.type_params.time_granularity:
defined_time_granularity = time_dimension.type_params.time_granularity

return defined_time_granularity

@property
def measure_lookup(self) -> MeasureLookup: # noqa: D102
return self._measure_lookup
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,28 +77,6 @@ def test_get_names( # noqa: D103
)


def test_get_elements(semantic_model_lookup: SemanticModelLookup) -> None: # noqa: D103
for dimension_reference in semantic_model_lookup.get_dimension_references():
assert (
semantic_model_lookup.get_dimension(dimension_reference=dimension_reference).reference
== dimension_reference
)
for measure_reference in semantic_model_lookup.measure_references:
measure_reference = MeasureReference(element_name=measure_reference.element_name)
assert semantic_model_lookup.get_measure(measure_reference=measure_reference).reference == measure_reference


def test_get_semantic_model_for_measure(semantic_model_lookup: SemanticModelLookup) -> None: # noqa: D103
bookings_source = semantic_model_lookup.get_semantic_model_for_measure(MeasureReference(element_name="bookings"))
assert bookings_source.name == "bookings_source"

views_source = semantic_model_lookup.get_semantic_model_for_measure(MeasureReference(element_name="views"))
assert views_source.name == "views_source"

listings_source = semantic_model_lookup.get_semantic_model_for_measure(MeasureReference(element_name="listings"))
assert listings_source.name == "listings_latest"


def test_local_linked_elements_for_metric( # noqa: D103
request: FixtureRequest, mf_test_configuration: MetricFlowTestConfiguration, metric_lookup: MetricLookup
) -> None:
Expand Down Expand Up @@ -239,12 +217,3 @@ def test_get_valid_agg_time_dimensions_for_metric( # noqa: D103
assert metric_agg_time_dim.reference == measure_agg_time_dims[0]
else:
assert len(metric_agg_time_dims) == 0


def test_get_agg_time_dimension_specs_for_measure(semantic_model_lookup: SemanticModelLookup) -> None: # noqa: D103
for measure_name in ["bookings", "views", "listings"]:
measure_reference = MeasureReference(measure_name)
agg_time_dim_specs = semantic_model_lookup.get_agg_time_dimension_specs_for_measure(measure_reference)
agg_time_dim_reference = semantic_model_lookup.get_agg_time_dimension_for_measure(measure_reference)
for spec in agg_time_dim_specs:
assert spec.reference == agg_time_dim_reference
Empty file.
36 changes: 36 additions & 0 deletions tests_metricflow/performance/categorical_dimension_generator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from __future__ import annotations

from functools import cached_property

from tests_metricflow.performance.synthetic_manifest_parameter_set import SyntheticManifestParameterSet


class CategoricalDimensionGenerator:
"""Helps generate the categorical dimensions in the semantic manifest.

The index for the dimension refers to the index when all unique dimensions in the semantic manifest are enumerated.
"""

def __init__(self, parameter_set: SyntheticManifestParameterSet) -> None: # noqa: D107
self._parameter_set = parameter_set

def get_dimension_name(self, dimension_index: int) -> str: # noqa: D102
"""Return the name of the dimension for the given index."""
return f"dimension_{dimension_index:03}"

@cached_property
def unique_dimension_count(self) -> int: # noqa: D102
return (
self._parameter_set.categorical_dimensions_per_semantic_model
* self._parameter_set.dimension_semantic_model_count
)

def get_next_wrapped_index(self, dimension_index: int) -> int:
"""Return the next valid dimension index, wrapping back to 0 if it reaches the last index."""
if dimension_index < 0:
raise ValueError(f"{dimension_index=} should be > 0")

if dimension_index >= self.unique_dimension_count:
raise ValueError(f"{dimension_index=} should be < {self.unique_dimension_count}")

return (dimension_index + 1) % self.unique_dimension_count
Loading
Loading