diff --git a/metricflow/engine/metricflow_engine.py b/metricflow/engine/metricflow_engine.py index afad49e9ae..666821cf07 100644 --- a/metricflow/engine/metricflow_engine.py +++ b/metricflow/engine/metricflow_engine.py @@ -376,7 +376,7 @@ def __init__( self._query_parser = MetricFlowQueryParser( column_association_resolver=self._column_association_resolver, model=self._semantic_manifest_lookup, - source_nodes=source_nodes, + read_nodes=read_nodes, node_output_resolver=node_output_resolver, ) diff --git a/metricflow/query/query_parser.py b/metricflow/query/query_parser.py index 4ae7b16822..1e1827b613 100644 --- a/metricflow/query/query_parser.py +++ b/metricflow/query/query_parser.py @@ -118,13 +118,15 @@ def __init__( # noqa: D self, column_association_resolver: ColumnAssociationResolver, model: SemanticManifestLookup, - source_nodes: Sequence[BaseOutput], + read_nodes: Sequence[BaseOutput], node_output_resolver: DataflowPlanNodeOutputDataSetResolver, ) -> None: self._column_association_resolver = column_association_resolver self._model = model self._metric_lookup = model.metric_lookup self._semantic_model_lookup = model.semantic_model_lookup + self._node_output_resolver = node_output_resolver + self._read_nodes = read_nodes # Set up containers for known element names self._known_entity_element_references = self._semantic_model_lookup.get_entity_references() @@ -404,6 +406,8 @@ def _parse_and_validate_query( self._time_granularity_solver.resolve_granularity_for_partial_time_dimension_specs( metric_references=metric_references, partial_time_dimension_specs=requested_linkable_specs.partial_time_dimension_specs, + read_nodes=self._read_nodes, + node_output_resolver=self._node_output_resolver, ) ) @@ -575,6 +579,8 @@ def _adjust_time_range_constraint( self._time_granularity_solver.resolve_granularity_for_partial_time_dimension_specs( metric_references=metric_references, partial_time_dimension_specs=(partial_metric_time_spec,), + read_nodes=self._read_nodes, + node_output_resolver=self._node_output_resolver, ) ) adjust_to_granularity = partial_time_dimension_spec_to_time_dimension_spec[ @@ -773,7 +779,10 @@ def _verify_resolved_granularity_for_date_part( ensure that the correct value was passed in. """ resolved_granularity = self._time_granularity_solver.find_minimum_granularity_for_partial_time_dimension_spec( - partial_time_dimension_spec=partial_time_dimension_spec, metric_references=metric_references + partial_time_dimension_spec=partial_time_dimension_spec, + metric_references=metric_references, + read_nodes=self._read_nodes, + node_output_resolver=self._node_output_resolver, ) if resolved_granularity != requested_dimension_structured_name.time_granularity: raise RequestTimeGranularityException( diff --git a/metricflow/test/fixtures/model_fixtures.py b/metricflow/test/fixtures/model_fixtures.py index 60e69bdb53..afcf7bf38e 100644 --- a/metricflow/test/fixtures/model_fixtures.py +++ b/metricflow/test/fixtures/model_fixtures.py @@ -61,11 +61,10 @@ def query_parser_from_yaml(yaml_contents: List[YamlConfigFile]) -> MetricFlowQue ).semantic_manifest ) SemanticManifestValidator[SemanticManifest]().checked_validations(semantic_manifest_lookup.semantic_manifest) - source_nodes = _data_set_to_source_nodes(semantic_manifest_lookup, create_data_sets(semantic_manifest_lookup)) return MetricFlowQueryParser( model=semantic_manifest_lookup, column_association_resolver=DunderColumnAssociationResolver(semantic_manifest_lookup), - source_nodes=source_nodes, + read_nodes=list(_data_set_to_read_nodes(create_data_sets(semantic_manifest_lookup)).values()), node_output_resolver=DataflowPlanNodeOutputDataSetResolver( column_association_resolver=DunderColumnAssociationResolver(semantic_manifest_lookup), semantic_manifest_lookup=semantic_manifest_lookup, @@ -241,3 +240,13 @@ def cyclic_join_semantic_manifest_lookup(template_mapping: Dict[str, str]) -> Se """Manifest that contains a potential cycle in the join graph (if not handled properly).""" build_result = load_semantic_manifest("cyclic_join_manifest", template_mapping) return SemanticManifestLookup(build_result.semantic_manifest) + + +@pytest.fixture(scope="session") +def node_output_resolver( # noqa:D + simple_semantic_manifest_lookup: SemanticManifestLookup, +) -> DataflowPlanNodeOutputDataSetResolver: + return DataflowPlanNodeOutputDataSetResolver( + column_association_resolver=DunderColumnAssociationResolver(simple_semantic_manifest_lookup), + semantic_manifest_lookup=simple_semantic_manifest_lookup, + ) diff --git a/metricflow/test/integration/test_cases/itest_dimensions.yaml b/metricflow/test/integration/test_cases/itest_dimensions.yaml index 33ea65c039..c6d5912629 100644 --- a/metricflow/test/integration/test_cases/itest_dimensions.yaml +++ b/metricflow/test/integration/test_cases/itest_dimensions.yaml @@ -145,7 +145,6 @@ integration_test: u.home_state_latest , l.is_lux --- -# TODO: test for dimension with non-day granularity integration_test: name: query_time_dimension_without_granularity description: Query just a time dimension, no granularity specified. Should assume default granularity for dimension. @@ -153,10 +152,22 @@ integration_test: group_bys: [ "verification__ds"] check_query: | SELECT - v.ds__day + v.ds as verification__ds__day FROM {{ source_schema }}.fct_id_verifications v GROUP BY - v.ds__day + v.ds +--- +integration_test: + name: query_non_default_time_dimension_without_granularity + description: Query just a time dimension, no granularity specified. Should assume default granularity for dimension. + model: EXTENDED_DATE_MODEL + group_bys: [ "monthly_ds"] + check_query: | + SELECT + ds AS monthly_ds__month + FROM {{ source_schema }}.fct_bookings_extended_monthly + GROUP BY + ds --- integration_test: name: query_dimension_only_with_constraint diff --git a/metricflow/test/snapshots/test_dataflow_plan_builder.py/DataflowPlan/test_distinct_values_plan_with_join__dfp_0.xml b/metricflow/test/snapshots/test_dataflow_plan_builder.py/DataflowPlan/test_distinct_values_plan_with_join__dfp_0.xml new file mode 100644 index 0000000000..7b51c3e7e6 --- /dev/null +++ b/metricflow/test/snapshots/test_dataflow_plan_builder.py/DataflowPlan/test_distinct_values_plan_with_join__dfp_0.xml @@ -0,0 +1,90 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/metricflow/test/time/test_time_granularity_solver.py b/metricflow/test/time/test_time_granularity_solver.py index 025778df10..0fcdf0ce28 100644 --- a/metricflow/test/time/test_time_granularity_solver.py +++ b/metricflow/test/time/test_time_granularity_solver.py @@ -6,9 +6,11 @@ from dbt_semantic_interfaces.references import MetricReference from dbt_semantic_interfaces.type_enums.time_granularity import TimeGranularity +from metricflow.dataflow.builder.node_data_set import DataflowPlanNodeOutputDataSetResolver from metricflow.dataset.dataset import DataSet from metricflow.filters.time_constraint import TimeRangeConstraint from metricflow.model.semantic_manifest_lookup import SemanticManifestLookup +from metricflow.test.fixtures.model_fixtures import ConsistentIdObjectRepository from metricflow.test.time.metric_time_dimension import MTD_SPEC_DAY, MTD_SPEC_MONTH from metricflow.time.time_granularity_solver import ( PartialTimeDimensionSpec, @@ -89,19 +91,31 @@ def test_validate_day_granularity_for_day_and_month_metric( # noqa: D PARTIAL_PTD_SPEC = PartialTimeDimensionSpec(element_name=DataSet.metric_time_dimension_name(), entity_links=()) -def test_granularity_solution_for_day_metric(time_granularity_solver: TimeGranularitySolver) -> None: # noqa: D +def test_granularity_solution_for_day_metric( # noqa: D + time_granularity_solver: TimeGranularitySolver, + node_output_resolver: DataflowPlanNodeOutputDataSetResolver, + consistent_id_object_repository: ConsistentIdObjectRepository, +) -> None: assert time_granularity_solver.resolve_granularity_for_partial_time_dimension_specs( metric_references=[MetricReference(element_name="bookings")], partial_time_dimension_specs=[PARTIAL_PTD_SPEC], + node_output_resolver=node_output_resolver, + read_nodes=list(consistent_id_object_repository.simple_model_read_nodes.values()), ) == { PARTIAL_PTD_SPEC: MTD_SPEC_DAY, } -def test_granularity_solution_for_month_metric(time_granularity_solver: TimeGranularitySolver) -> None: # noqa: D +def test_granularity_solution_for_month_metric( # noqa: D + time_granularity_solver: TimeGranularitySolver, + node_output_resolver: DataflowPlanNodeOutputDataSetResolver, + consistent_id_object_repository: ConsistentIdObjectRepository, +) -> None: assert time_granularity_solver.resolve_granularity_for_partial_time_dimension_specs( metric_references=[MetricReference(element_name="bookings_monthly")], partial_time_dimension_specs=[PARTIAL_PTD_SPEC], + node_output_resolver=node_output_resolver, + read_nodes=list(consistent_id_object_repository.simple_model_read_nodes.values()), ) == { PARTIAL_PTD_SPEC: MTD_SPEC_MONTH, } @@ -109,10 +123,14 @@ def test_granularity_solution_for_month_metric(time_granularity_solver: TimeGran def test_granularity_solution_for_day_and_month_metrics( # noqa: D time_granularity_solver: TimeGranularitySolver, + node_output_resolver: DataflowPlanNodeOutputDataSetResolver, + consistent_id_object_repository: ConsistentIdObjectRepository, ) -> None: assert time_granularity_solver.resolve_granularity_for_partial_time_dimension_specs( metric_references=[MetricReference(element_name="bookings"), MetricReference(element_name="bookings_monthly")], partial_time_dimension_specs=[PARTIAL_PTD_SPEC], + node_output_resolver=node_output_resolver, + read_nodes=list(consistent_id_object_repository.simple_model_read_nodes.values()), ) == {PARTIAL_PTD_SPEC: MTD_SPEC_MONTH} diff --git a/metricflow/time/time_granularity_solver.py b/metricflow/time/time_granularity_solver.py index 004e476f35..1a3bbe62d4 100644 --- a/metricflow/time/time_granularity_solver.py +++ b/metricflow/time/time_granularity_solver.py @@ -14,8 +14,11 @@ ) from dbt_semantic_interfaces.type_enums.time_granularity import TimeGranularity +from metricflow.dataflow.builder.node_data_set import DataflowPlanNodeOutputDataSetResolver +from metricflow.dataflow.dataflow_plan import BaseOutput from metricflow.filters.time_constraint import TimeRangeConstraint from metricflow.model.semantic_manifest_lookup import SemanticManifestLookup +from metricflow.naming.linkable_spec_name import StructuredLinkableSpecName from metricflow.specs.specs import ( TimeDimensionSpec, ) @@ -100,60 +103,100 @@ def resolve_granularity_for_partial_time_dimension_specs( self, metric_references: Sequence[MetricReference], partial_time_dimension_specs: Sequence[PartialTimeDimensionSpec], + read_nodes: Sequence[BaseOutput], + node_output_resolver: DataflowPlanNodeOutputDataSetResolver, ) -> Dict[PartialTimeDimensionSpec, TimeDimensionSpec]: """Figure out the lowest granularity possible for the partially specified time dimension specs. Returns a dictionary that maps how the partial time dimension spec should be turned into a time dimension spec. """ - if not partial_time_dimension_specs: - return {} - - if metric_references: - result: Dict[PartialTimeDimensionSpec, TimeDimensionSpec] = {} - for partial_time_dimension_spec in partial_time_dimension_specs: - minimum_time_granularity = self.find_minimum_granularity_for_partial_time_dimension_spec( - partial_time_dimension_spec=partial_time_dimension_spec, metric_references=metric_references - ) - result[partial_time_dimension_spec] = TimeDimensionSpec( - element_name=partial_time_dimension_spec.element_name, - entity_links=partial_time_dimension_spec.entity_links, - time_granularity=minimum_time_granularity, - date_part=partial_time_dimension_spec.date_part, - ) - return result - else: - raise NotImplementedError # find minimum granularity for time dimension + result: Dict[PartialTimeDimensionSpec, TimeDimensionSpec] = {} + + for partial_time_dimension_spec in partial_time_dimension_specs: + minimum_time_granularity = self.find_minimum_granularity_for_partial_time_dimension_spec( + partial_time_dimension_spec=partial_time_dimension_spec, + metric_references=metric_references, + read_nodes=read_nodes, + node_output_resolver=node_output_resolver, + ) + result[partial_time_dimension_spec] = TimeDimensionSpec( + element_name=partial_time_dimension_spec.element_name, + entity_links=partial_time_dimension_spec.entity_links, + time_granularity=minimum_time_granularity, + date_part=partial_time_dimension_spec.date_part, + ) + return result def find_minimum_granularity_for_partial_time_dimension_spec( - self, partial_time_dimension_spec: PartialTimeDimensionSpec, metric_references: Sequence[MetricReference] + self, + partial_time_dimension_spec: PartialTimeDimensionSpec, + metric_references: Sequence[MetricReference], + read_nodes: Sequence[BaseOutput], + node_output_resolver: DataflowPlanNodeOutputDataSetResolver, ) -> TimeGranularity: """Find minimum granularity allowed for time dimension when queried with given metrics.""" - valid_group_by_elements = self._semantic_manifest_lookup.metric_lookup.linkable_set_for_metrics( - metric_references=metric_references, - ) - minimum_time_granularity: Optional[TimeGranularity] = None - for path_key in valid_group_by_elements.path_key_to_linkable_dimensions: - if ( - path_key.element_name == partial_time_dimension_spec.element_name - and path_key.entity_links == partial_time_dimension_spec.entity_links - and path_key.time_granularity is not None - ): - minimum_time_granularity = ( - path_key.time_granularity - if minimum_time_granularity is None - else min(minimum_time_granularity, path_key.time_granularity) - ) - if not minimum_time_granularity: - raise RequestTimeGranularityException( - f"Unable to resolve the time dimension spec for {partial_time_dimension_spec}. " - f"Valid group by elements are:\n" - f"{pformat_big_objects([spec.qualified_name for spec in valid_group_by_elements.as_spec_set.as_tuple])}" + if metric_references: + valid_group_by_elements = self._semantic_manifest_lookup.metric_lookup.linkable_set_for_metrics( + metric_references=metric_references, ) + for path_key in valid_group_by_elements.path_key_to_linkable_dimensions: + if ( + path_key.element_name == partial_time_dimension_spec.element_name + and path_key.entity_links == partial_time_dimension_spec.entity_links + and path_key.time_granularity is not None + ): + minimum_time_granularity = ( + path_key.time_granularity + if minimum_time_granularity is None + else min(minimum_time_granularity, path_key.time_granularity) + ) + if not minimum_time_granularity: + raise RequestTimeGranularityException( + f"Unable to resolve the time dimension spec for {partial_time_dimension_spec}. " + f"Valid group by elements are:\n" + f"{pformat_big_objects([spec.qualified_name for spec in valid_group_by_elements.as_spec_set.as_tuple])}" + ) + else: + minimum_time_granularity = self.get_min_granularity_for_partial_time_dimension_without_metrics( + read_nodes=read_nodes, + node_output_resolver=node_output_resolver, + partial_time_dimension_spec=partial_time_dimension_spec, + ) + if not minimum_time_granularity: + raise RequestTimeGranularityException( + f"Unable to resolve the time dimension spec for {partial_time_dimension_spec}. " + ) return minimum_time_granularity + def get_min_granularity_for_partial_time_dimension_without_metrics( + self, + read_nodes: Sequence[BaseOutput], + node_output_resolver: DataflowPlanNodeOutputDataSetResolver, + partial_time_dimension_spec: PartialTimeDimensionSpec, + ) -> Optional[TimeGranularity]: + """Find the minimum.""" + granularity_free_qualified_name = StructuredLinkableSpecName( + entity_link_names=tuple( + [entity_link.element_name for entity_link in partial_time_dimension_spec.entity_links] + ), + element_name=partial_time_dimension_spec.element_name, + ).granularity_free_qualified_name + for read_node in read_nodes: + output_data_set = node_output_resolver.get_output_data_set(read_node) + for time_dimension_instance in output_data_set.instance_set.time_dimension_instances: + if time_dimension_instance.spec.date_part: + continue + time_dim_name_without_granularity = StructuredLinkableSpecName.from_name( + time_dimension_instance.spec.qualified_name + ).granularity_free_qualified_name + if time_dim_name_without_granularity == granularity_free_qualified_name: + return time_dimension_instance.spec.time_granularity + + return None + def adjust_time_range_to_granularity( self, time_range_constraint: TimeRangeConstraint, time_granularity: TimeGranularity ) -> TimeRangeConstraint: