diff --git a/metricflow/dataflow/builder/source_node.py b/metricflow/dataflow/builder/source_node.py index a345273dd3..6c930c4fdb 100644 --- a/metricflow/dataflow/builder/source_node.py +++ b/metricflow/dataflow/builder/source_node.py @@ -28,26 +28,23 @@ def create_from_data_sets( source_nodes: List[BaseOutput] = [] for data_set in data_sets: read_node = ReadSqlSourceNode(data_set) - if not with_measures: - source_nodes.append(read_node) - else: - agg_time_dim_to_measures_grouper = ( - self._semantic_manifest_lookup.semantic_model_lookup.get_aggregation_time_dimensions_with_measures( - data_set.semantic_model_reference - ) + agg_time_dim_to_measures_grouper = ( + self._semantic_manifest_lookup.semantic_model_lookup.get_aggregation_time_dimensions_with_measures( + data_set.semantic_model_reference ) + ) - # Dimension sources may not have any measures -> no aggregation time dimensions. - time_dimension_references = agg_time_dim_to_measures_grouper.keys - if len(time_dimension_references) == 0: - source_nodes.append(read_node) - else: - # Splits the measures by distinct aggregate time dimension. - for time_dimension_reference in time_dimension_references: - source_nodes.append( - MetricTimeDimensionTransformNode( - parent_node=read_node, - aggregation_time_dimension_reference=time_dimension_reference, - ) + # Dimension sources may not have any measures -> no aggregation time dimensions. + time_dimension_references = agg_time_dim_to_measures_grouper.keys + if len(time_dimension_references) == 0: + source_nodes.append(read_node) + else: + # Splits the measures by distinct aggregate time dimension. + for time_dimension_reference in time_dimension_references: + source_nodes.append( + MetricTimeDimensionTransformNode( + parent_node=read_node, + aggregation_time_dimension_reference=time_dimension_reference, ) + ) return source_nodes diff --git a/metricflow/query/query_parser.py b/metricflow/query/query_parser.py index 23686aa457..ddf12f0727 100644 --- a/metricflow/query/query_parser.py +++ b/metricflow/query/query_parser.py @@ -30,7 +30,6 @@ from metricflow.naming.linkable_spec_name import StructuredLinkableSpecName from metricflow.query.query_exceptions import InvalidQueryException from metricflow.specs.column_assoc import ColumnAssociationResolver -from metricflow.specs.query_interface import QueryInterfaceMetric, QueryParameter from metricflow.specs.specs import ( DimensionSpec, EntitySpec, @@ -169,16 +168,16 @@ def _top_fuzzy_matches( def parse_and_validate_query( self, metric_names: Optional[Sequence[str]] = None, - metrics: Optional[Sequence[QueryInterfaceMetric]] = None, + metrics: Optional[Sequence[QueryParameterMetric]] = None, group_by_names: Optional[Sequence[str]] = None, - group_by: Optional[Sequence[QueryParameter]] = None, + group_by: Optional[Sequence[QueryParameterDimension]] = None, limit: Optional[int] = None, time_constraint_start: Optional[datetime.datetime] = None, time_constraint_end: Optional[datetime.datetime] = None, where_constraint: Optional[WhereFilter] = None, where_constraint_str: Optional[str] = None, order: Optional[Sequence[str]] = None, - order_by: Optional[Sequence[QueryParameter]] = None, + order_by: Optional[Sequence[QueryParameterDimension]] = None, time_granularity: Optional[TimeGranularity] = None, ) -> MetricFlowQuerySpec: """Parse the query into spec objects, validating them in the process. 
@@ -289,22 +288,8 @@ def _construct_metric_specs_for_query( ) return tuple(metric_specs) - def _get_group_by_names( - self, group_by_names: Optional[Sequence[str]], group_by: Optional[Sequence[QueryParameter]] - ) -> Sequence[str]: - assert not ( - group_by_names and group_by - ), "Both group_by_names and group_by were set, but if a group by is specified you should only use one of these!" - return ( - group_by_names - if group_by_names - else [f"{g.name}__{g.grain}" if g.grain else g.name for g in group_by] - if group_by - else [] - ) - def _get_metric_names( - self, metric_names: Optional[Sequence[str]], metrics: Optional[Sequence[QueryInterfaceMetric]] + self, metric_names: Optional[Sequence[str]], metrics: Optional[Sequence[QueryParameterMetric]] ) -> Sequence[str]: assert_exactly_one_arg_set(metric_names=metric_names, metrics=metrics) return metric_names if metric_names else [m.name for m in metrics] if metrics else [] @@ -321,7 +306,9 @@ def _get_where_filter( PydanticWhereFilter(where_sql_template=where_constraint_str) if where_constraint_str else where_constraint ) - def _get_order(self, order: Optional[Sequence[str]], order_by: Optional[Sequence[QueryParameter]]) -> Sequence[str]: + def _get_order( + self, order: Optional[Sequence[str]], order_by: Optional[Sequence[QueryParameterDimension]] + ) -> Sequence[str]: assert not ( order and order_by ), "Both order_by_names and order_by were set, but if an order by is specified you should only use one of these!" @@ -330,20 +317,19 @@ def _get_order(self, order: Optional[Sequence[str]], order_by: Optional[Sequence def _parse_and_validate_query( self, metric_names: Optional[Sequence[str]] = None, - metrics: Optional[Sequence[QueryInterfaceMetric]] = None, + metrics: Optional[Sequence[QueryParameterMetric]] = None, group_by_names: Optional[Sequence[str]] = None, - group_by: Optional[Sequence[QueryParameter]] = None, + group_by: Optional[Sequence[QueryParameterDimension]] = None, limit: Optional[int] = None, time_constraint_start: Optional[datetime.datetime] = None, time_constraint_end: Optional[datetime.datetime] = None, where_constraint: Optional[WhereFilter] = None, where_constraint_str: Optional[str] = None, order: Optional[Sequence[str]] = None, - order_by: Optional[Sequence[QueryParameter]] = None, + order_by: Optional[Sequence[QueryParameterDimension]] = None, time_granularity: Optional[TimeGranularity] = None, ) -> MetricFlowQuerySpec: metric_names = self._get_metric_names(metric_names, metrics) - group_by_names = self._get_group_by_names(group_by_names, group_by) where_filter = self._get_where_filter(where_constraint, where_constraint_str) order = self._get_order(order, order_by) @@ -393,7 +379,9 @@ def _parse_and_validate_query( # If the time constraint is all time, just ignore and not render time_constraint = None - requested_linkable_specs = self._parse_linkable_element_names(group_by_names, metric_references) + requested_linkable_specs = self._parse_linkable_elements( + qualified_linkable_names=group_by_names, linkable_elements=group_by, metric_references=metric_references + ) where_filter_spec: Optional[WhereFilterSpec] = None if where_filter is not None: try: @@ -427,6 +415,7 @@ def _parse_and_validate_query( self._validate_no_time_dimension_query(metric_references=metric_references) self._time_granularity_solver.validate_time_granularity(metric_references, time_dimension_specs) + self._validate_date_part(metric_references, time_dimension_specs) order_by_specs = self._parse_order_by(order or [], 
partial_time_dimension_spec_replacements) @@ -436,8 +425,9 @@ def _parse_and_validate_query( for metric_reference in metric_references: metric = self._metric_lookup.get_metric(metric_reference) if metric.filter is not None: - group_by_specs_for_one_metric = self._parse_linkable_element_names( + group_by_specs_for_one_metric = self._parse_linkable_elements( qualified_linkable_names=group_by_names, + linkable_elements=group_by, metric_references=(metric_reference,), ) @@ -461,12 +451,11 @@ def _parse_and_validate_query( ) # Validate all of them together. - if metric_references: - self._validate_linkable_specs( - metric_references=metric_references, - all_linkable_specs=requested_linkable_specs_with_requested_filter_specs, - time_dimension_specs=time_dimension_specs, - ) + self._validate_linkable_specs( + metric_references=metric_references, + all_linkable_specs=requested_linkable_specs_with_requested_filter_specs, + time_dimension_specs=time_dimension_specs, + ) self._validate_order_by_specs( order_by_specs=order_by_specs, @@ -530,6 +519,35 @@ def _validate_order_by_specs( ): raise InvalidQueryException(f"Order by item {order_by_spec} not in the query") + def _validate_date_part( + self, metric_references: Sequence[MetricReference], time_dimension_specs: Sequence[TimeDimensionSpec] + ) -> None: + """Validate that date parts can be used for metrics. + + TODO: figure out expected behavior for date part with these types of metrics. + """ + date_part_requested = False + for time_dimension_spec in time_dimension_specs: + if time_dimension_spec.date_part: + date_part_requested = True + if time_dimension_spec.date_part.to_int() < time_dimension_spec.time_granularity.to_int(): + raise RequestTimeGranularityException( + f"Date part {time_dimension_spec.date_part.name} is not compatible with time granularity " + f"{time_dimension_spec.time_granularity.name}. Compatible granularities include: " + f"{[granularity.name for granularity in time_dimension_spec.date_part.compatible_granularities]}" + ) + if date_part_requested: + for metric_reference in metric_references: + metric = self._metric_lookup.get_metric(metric_reference) + if metric.type == MetricType.CUMULATIVE: + raise UnableToSatisfyQueryError("Cannot extract date part for cumulative metrics.") + elif metric.type == MetricType.DERIVED: + for input_metric in metric.type_params.metrics or []: + if input_metric.offset_to_grain: + raise UnableToSatisfyQueryError( + "Cannot extract date part for metrics with offset_to_grain." + ) + def _adjust_time_range_constraint( self, metric_references: Sequence[MetricReference], @@ -644,26 +662,44 @@ def _parse_metric_names( metric_references.extend(list(input_metrics)) return tuple(metric_references) - def _parse_linkable_element_names( + def _parse_linkable_elements( self, - qualified_linkable_names: Sequence[str], metric_references: Sequence[MetricReference], + qualified_linkable_names: Optional[Sequence[str]] = None, + linkable_elements: Optional[Sequence[QueryParameterDimension]] = None, ) -> QueryTimeLinkableSpecSet: """Convert the linkable spec names into the respective specification objects.""" - qualified_linkable_names = [x.lower() for x in qualified_linkable_names] + # TODO: refactor to only support group_by object inputs (removing group_by_names param) + assert not ( + qualified_linkable_names and linkable_elements + ), "Both group_by_names and group_by were set, but if a group by is specified you should only use one of these!" 
+ + structured_names: List[StructuredLinkableSpecName] = [] + if qualified_linkable_names: + qualified_linkable_names = [x.lower() for x in qualified_linkable_names] + structured_names = [StructuredLinkableSpecName.from_name(name) for name in qualified_linkable_names] + elif linkable_elements: + for linkable_element in linkable_elements: + parsed_name = StructuredLinkableSpecName.from_name(linkable_element.name) + structured_name = StructuredLinkableSpecName( + entity_link_names=parsed_name.entity_link_names, + element_name=parsed_name.element_name, + time_granularity=linkable_element.grain, + date_part=linkable_element.date_part, + ) + structured_names.append(structured_name) dimension_specs = [] time_dimension_specs = [] partial_time_dimension_specs = [] entity_specs = [] - for qualified_name in qualified_linkable_names: - structured_name = StructuredLinkableSpecName.from_name(qualified_name) + for structured_name in structured_names: element_name = structured_name.element_name entity_links = tuple(EntityReference(element_name=x) for x in structured_name.entity_link_names) # Create the spec based on the type of element referenced. if TimeDimensionReference(element_name=element_name) in self._known_time_dimension_element_references: - if structured_name.time_granularity: + if structured_name.time_granularity and not structured_name.date_part: time_dimension_specs.append( TimeDimensionSpec( element_name=element_name, @@ -672,31 +708,47 @@ def _parse_linkable_element_names( ) ) else: - partial_time_dimension_specs.append( - PartialTimeDimensionSpec( - element_name=element_name, - entity_links=entity_links, - ) + partial_time_dimension_spec = PartialTimeDimensionSpec( + element_name=element_name, entity_links=entity_links, date_part=structured_name.date_part ) + # If both granularity & date_part are requested, verify requested & resolved granularities match. + if structured_name.time_granularity and structured_name.date_part: + self._verify_resolved_granularity_for_date_part( + requested_dimension_structured_name=structured_name, + partial_time_dimension_spec=partial_time_dimension_spec, + metric_references=metric_references, + ) + partial_time_dimension_specs.append(partial_time_dimension_spec) + elif DimensionReference(element_name=element_name) in self._known_dimension_element_references: dimension_specs.append(DimensionSpec(element_name=element_name, entity_links=entity_links)) elif EntityReference(element_name=element_name) in self._known_entity_element_references: entity_specs.append(EntitySpec(element_name=element_name, entity_links=entity_links)) else: + valid_group_bys_for_metrics = self._metric_lookup.element_specs_for_metrics(list(metric_references)) valid_group_by_names_for_metrics = sorted( - x.qualified_name for x in self._metric_lookup.element_specs_for_metrics(list(metric_references)) + list( + set( + x.qualified_name if qualified_linkable_names else x.element_name + for x in valid_group_bys_for_metrics + ) + ) ) + # If requested by name, show qualified name. If requested as object, show element name. 
+                display_name = structured_name.qualified_name if qualified_linkable_names else element_name
                 suggestions = {
-                    f"Suggestions for '{qualified_name}'": pformat_big_objects(
+                    f"Suggestions for '{display_name}'": pformat_big_objects(
                         MetricFlowQueryParser._top_fuzzy_matches(
-                            item=qualified_name,
+                            item=display_name,
                             candidate_items=valid_group_by_names_for_metrics,
                         )
                     )
                 }
                 raise UnableToSatisfyQueryError(
-                    f"Unknown element name '{element_name}' in dimension name '{qualified_name}'",
+                    f"Unknown element name '{element_name}' in dimension name '{display_name}'"
+                    if qualified_linkable_names
+                    else f"Unknown dimension '{element_name}'",
                     context=suggestions,
                 )
 
@@ -707,6 +759,28 @@ def _parse_linkable_element_names(
             entity_specs=tuple(entity_specs),
         )
 
+    def _verify_resolved_granularity_for_date_part(
+        self,
+        requested_dimension_structured_name: StructuredLinkableSpecName,
+        partial_time_dimension_spec: PartialTimeDimensionSpec,
+        metric_references: Sequence[MetricReference],
+    ) -> None:
+        """Enforce that any granularity value associated with a date part query is the minimum.
+
+        By default, we will always ensure that a date_part query request uses the minimum granularity.
+        However, there are some interfaces where the user must pass in a granularity, so we need a check to
+        ensure that the correct value was passed in.
+        """
+        resolved_granularity = self._time_granularity_solver.find_minimum_granularity_for_partial_time_dimension_spec(
+            partial_time_dimension_spec=partial_time_dimension_spec, metric_references=metric_references
+        )
+        if resolved_granularity != requested_dimension_structured_name.time_granularity:
+            raise RequestTimeGranularityException(
+                f"When applying a date part to dimension '{requested_dimension_structured_name.qualified_name}' with "
+                f"metrics {[metric.element_name for metric in metric_references]}, only {resolved_granularity.name} "
+                "granularity can be used."
+ ) + def _get_invalid_linkable_specs( self, metric_references: Tuple[MetricReference, ...], @@ -796,6 +870,7 @@ def _parse_order_by( element_name=parsed_name.element_name, entity_links=entity_links, time_granularity=parsed_name.time_granularity, + date_part=parsed_name.date_part, ), descending=descending, ) diff --git a/metricflow/test/integration/test_cases/itest_dimensions.yaml b/metricflow/test/integration/test_cases/itest_dimensions.yaml index 6645a8974a..269682b5b1 100644 --- a/metricflow/test/integration/test_cases/itest_dimensions.yaml +++ b/metricflow/test/integration/test_cases/itest_dimensions.yaml @@ -123,14 +123,39 @@ integration_test: check_query: | SELECT u.home_state AS user__home_state - , v.ds AS ds__day + , u.ds AS ds__day + FROM {{ source_schema }}.dim_users u + GROUP BY + u.ds + , u.home_state +--- +integration_test: + name: query_dimensions_from_different_tables + description: Query multiple dimensions without metrics, requiring a join + model: SIMPLE_MODEL + group_bys: ["user__home_state", "verification__ds__day"] + check_query: | + SELECT + u.home_state AS user__home_state + , v.ds AS verification__ds__day FROM {{ source_schema }}.fct_id_verifications v LEFT OUTER JOIN {{ source_schema }}.dim_users u ON u.user_id = v.user_id - AND u.ds = v.ds + GROUP BY + u.home_state + , v.ds +--- +integration_test: + name: query_time_dimension_without_granularity + description: Query just a time dimension, no granularity specified + model: SIMPLE_MODEL + group_bys: [ "verification__ds"] + check_query: | + SELECT + v.ds + FROM {{ source_schema }}.fct_id_verifications v GROUP BY v.ds - , u.home_state --- integration_test: name: query_dimension_only_with_constraint diff --git a/metricflow/time/time_granularity_solver.py b/metricflow/time/time_granularity_solver.py index 8fbf95c601..fa13733f37 100644 --- a/metricflow/time/time_granularity_solver.py +++ b/metricflow/time/time_granularity_solver.py @@ -72,6 +72,9 @@ def validate_time_granularity( e.g. throw an error if "ds__week" is specified for a metric with a time granularity of MONTH. """ + if not metric_references: + return None + valid_group_by_elements = self._semantic_manifest_lookup.metric_lookup.linkable_set_for_metrics( metric_references=metric_references, ) @@ -100,36 +103,39 @@ def resolve_granularity_for_partial_time_dimension_specs( Returns a dictionary that maps how the partial time dimension spec should be turned into a time dimension spec. 
""" - valid_group_by_elements = self._semantic_manifest_lookup.metric_lookup.linkable_set_for_metrics( - metric_references=metric_references, - ) - result: Dict[PartialTimeDimensionSpec, TimeDimensionSpec] = {} - for partial_time_dimension_spec in partial_time_dimension_specs: - minimum_time_granularity: Optional[TimeGranularity] = None - for path_key in valid_group_by_elements.path_key_to_linkable_dimensions: - if ( - path_key.element_name == partial_time_dimension_spec.element_name - and path_key.entity_links == partial_time_dimension_spec.entity_links - and path_key.time_granularity is not None - ): - minimum_time_granularity = ( - path_key.time_granularity - if minimum_time_granularity is None - else min(minimum_time_granularity, path_key.time_granularity) + if metric_references: + valid_group_by_elements = self._semantic_manifest_lookup.metric_lookup.linkable_set_for_metrics( + metric_references=metric_references, + ) + result: Dict[PartialTimeDimensionSpec, TimeDimensionSpec] = {} + for partial_time_dimension_spec in partial_time_dimension_specs: + minimum_time_granularity: Optional[TimeGranularity] = None + for path_key in valid_group_by_elements.path_key_to_linkable_dimensions: + if ( + path_key.element_name == partial_time_dimension_spec.element_name + and path_key.entity_links == partial_time_dimension_spec.entity_links + and path_key.time_granularity is not None + ): + minimum_time_granularity = ( + path_key.time_granularity + if minimum_time_granularity is None + else min(minimum_time_granularity, path_key.time_granularity) + ) + + if minimum_time_granularity is not None: + result[partial_time_dimension_spec] = TimeDimensionSpec( + element_name=partial_time_dimension_spec.element_name, + entity_links=partial_time_dimension_spec.entity_links, + time_granularity=minimum_time_granularity, ) - - if minimum_time_granularity is not None: - result[partial_time_dimension_spec] = TimeDimensionSpec( - element_name=partial_time_dimension_spec.element_name, - entity_links=partial_time_dimension_spec.entity_links, - time_granularity=minimum_time_granularity, - ) - else: - raise RequestTimeGranularityException( - f"Unable to resolve the time dimension spec for {partial_time_dimension_spec}. " - f"Valid group by elements are:\n" - f"{pformat_big_objects([spec.qualified_name for spec in valid_group_by_elements.as_spec_set.as_tuple])}" - ) + else: + raise RequestTimeGranularityException( + f"Unable to resolve the time dimension spec for {partial_time_dimension_spec}. " + f"Valid group by elements are:\n" + f"{pformat_big_objects([spec.qualified_name for spec in valid_group_by_elements.as_spec_set.as_tuple])}" + ) + else: + raise NotImplementedError # find minimum granularity for time dimension return result def adjust_time_range_to_granularity(