Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Querying with DATE PART #772

Merged
merged 14 commits into from
Sep 19, 2023
1 change: 1 addition & 0 deletions metricflow/engine/metricflow_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,7 @@ def _create_execution_plan(self, mf_query_request: MetricFlowQueryRequest) -> Me
query_spec = self._query_parser.parse_and_validate_query(
metric_names=mf_query_request.metric_names,
group_by_names=mf_query_request.group_by_names,
group_by=mf_query_request.group_by,
courtneyholcomb marked this conversation as resolved.
Show resolved Hide resolved
limit=mf_query_request.limit,
time_constraint_start=mf_query_request.time_constraint_start,
time_constraint_end=mf_query_request.time_constraint_end,
Expand Down
11 changes: 9 additions & 2 deletions metricflow/plan_conversion/instance_converters.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
SqlFunctionExpression,
)
from metricflow.sql.sql_plan import SqlSelectColumn
from metricflow.time.date_part import DatePart

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -262,6 +263,7 @@ class _DimensionValidityParams:

dimension_name: str
time_granularity: TimeGranularity
date_part: Optional[DatePart] = None


class CreateValidityWindowJoinDescription(InstanceSetTransform[Optional[ValidityWindowJoinDescription]]):
Expand Down Expand Up @@ -324,12 +326,16 @@ def transform(self, instance_set: InstanceSet) -> Optional[ValidityWindowJoinDes
start_specs = [
spec
for spec in specs
if spec.element_name == start_dim.dimension_name and spec.time_granularity == start_dim.time_granularity
if spec.element_name == start_dim.dimension_name
and spec.time_granularity == start_dim.time_granularity
and spec.date_part == start_dim.date_part
]
end_specs = [
spec
for spec in specs
if spec.element_name == end_dim.dimension_name and spec.time_granularity == end_dim.time_granularity
if spec.element_name == end_dim.dimension_name
and spec.time_granularity == end_dim.time_granularity
and spec.date_part == end_dim.date_part
]
linkless_start_specs = {spec.without_entity_links for spec in start_specs}
linkless_end_specs = {spec.without_entity_links for spec in end_specs}
Expand Down Expand Up @@ -401,6 +407,7 @@ def transform(self, instance_set: InstanceSet) -> InstanceSet: # noqa: D
+ time_dimension_instance.spec.entity_links
),
time_granularity=time_dimension_instance.spec.time_granularity,
date_part=time_dimension_instance.spec.date_part,
)
time_dimension_instances_with_additional_link.append(
TimeDimensionInstance(
Expand Down
96 changes: 82 additions & 14 deletions metricflow/query/query_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,12 @@ def _get_group_by_names(
return (
group_by_names
if group_by_names
else [f"{g.name}__{g.grain}" if g.grain else g.name for g in group_by]
else [
StructuredLinkableSpecName(
entity_link_names=(), element_name=g.name, time_granularity=g.grain, date_part=g.date_part
).qualified_name
for g in group_by
]
if group_by
else []
)
Expand Down Expand Up @@ -343,7 +348,6 @@ def _parse_and_validate_query(
time_granularity: Optional[TimeGranularity] = None,
) -> MetricFlowQuerySpec:
metric_names = self._get_metric_names(metric_names, metrics)
group_by_names = self._get_group_by_names(group_by_names, group_by)
where_filter = self._get_where_filter(where_constraint, where_constraint_str)
order = self._get_order(order, order_by)

Expand Down Expand Up @@ -393,7 +397,9 @@ def _parse_and_validate_query(
# If the time constraint is all time, just ignore and not render
time_constraint = None

requested_linkable_specs = self._parse_linkable_element_names(group_by_names, metric_references)
requested_linkable_specs = self._parse_linkable_elements(
qualified_linkable_names=group_by_names, linkable_elements=group_by, metric_references=metric_references
)
where_filter_spec: Optional[WhereFilterSpec] = None
if where_filter is not None:
try:
Expand Down Expand Up @@ -427,6 +433,7 @@ def _parse_and_validate_query(
self._validate_no_time_dimension_query(metric_references=metric_references)

self._time_granularity_solver.validate_time_granularity(metric_references, time_dimension_specs)
self._validate_date_part(metric_references, time_dimension_specs)

order_by_specs = self._parse_order_by(order or [], partial_time_dimension_spec_replacements)

Expand All @@ -436,8 +443,9 @@ def _parse_and_validate_query(
for metric_reference in metric_references:
metric = self._metric_lookup.get_metric(metric_reference)
if metric.filter is not None:
group_by_specs_for_one_metric = self._parse_linkable_element_names(
group_by_specs_for_one_metric = self._parse_linkable_elements(
qualified_linkable_names=group_by_names,
linkable_elements=group_by,
metric_references=(metric_reference,),
)

Expand Down Expand Up @@ -529,6 +537,34 @@ def _validate_order_by_specs(
):
raise InvalidQueryException(f"Order by item {order_by_spec} not in the query")

def _validate_date_part(
self, metric_references: Sequence[MetricReference], time_dimension_specs: Sequence[TimeDimensionSpec]
) -> None:
"""Validate that date parts can be used for metrics.

TODO: figure out expected behavior for date part with these types of metrics.
"""
date_part_requested = False
for time_dimension_spec in time_dimension_specs:
if time_dimension_spec.date_part:
date_part_requested = True
if time_dimension_spec.date_part.to_int() < time_dimension_spec.time_granularity.to_int():
raise RequestTimeGranularityException(
f"Date part {time_dimension_spec.date_part.name} is not compatible with time granularity "
f"{time_dimension_spec.time_granularity.name}."
courtneyholcomb marked this conversation as resolved.
Show resolved Hide resolved
)
if date_part_requested:
for metric_reference in metric_references:
metric = self._metric_lookup.get_metric(metric_reference)
if metric.type == MetricType.CUMULATIVE:
raise UnableToSatisfyQueryError("Cannot extract date part for cumulative metrics.")
courtneyholcomb marked this conversation as resolved.
Show resolved Hide resolved
elif metric.type == MetricType.DERIVED:
for input_metric in metric.type_params.metrics or []:
if input_metric.offset_to_grain:
raise UnableToSatisfyQueryError(
"Cannot extract date part for metrics with offset_to_grain."
)

def _adjust_time_range_constraint(
self,
metric_references: Sequence[MetricReference],
Expand Down Expand Up @@ -643,59 +679,90 @@ def _parse_metric_names(
metric_references.extend(list(input_metrics))
return tuple(metric_references)

def _parse_linkable_element_names(
def _parse_linkable_elements(
self,
qualified_linkable_names: Sequence[str],
metric_references: Sequence[MetricReference],
qualified_linkable_names: Optional[Sequence[str]] = None,
linkable_elements: Optional[Sequence[QueryParameter]] = None,
) -> QueryTimeLinkableSpecSet:
"""Convert the linkable spec names into the respective specification objects."""
qualified_linkable_names = [x.lower() for x in qualified_linkable_names]
assert not (qualified_linkable_names and linkable_elements)
courtneyholcomb marked this conversation as resolved.
Show resolved Hide resolved

structured_names: List[StructuredLinkableSpecName] = []
if qualified_linkable_names:
qualified_linkable_names = [x.lower() for x in qualified_linkable_names]
structured_names = [StructuredLinkableSpecName.from_name(name) for name in qualified_linkable_names]
courtneyholcomb marked this conversation as resolved.
Show resolved Hide resolved
elif linkable_elements:
for linkable_element in linkable_elements:
parsed_name = StructuredLinkableSpecName.from_name(linkable_element.name)
if parsed_name.time_granularity:
courtneyholcomb marked this conversation as resolved.
Show resolved Hide resolved
raise ValueError(
"Time granularity must be passed in the `grain` attribute for `group_by` query param."
courtneyholcomb marked this conversation as resolved.
Show resolved Hide resolved
)
structured_name = StructuredLinkableSpecName(
entity_link_names=parsed_name.entity_link_names,
element_name=parsed_name.element_name,
time_granularity=linkable_element.grain,
date_part=linkable_element.date_part,
)
structured_names.append(structured_name)

dimension_specs = []
time_dimension_specs = []
partial_time_dimension_specs = []
entity_specs = []

for qualified_name in qualified_linkable_names:
structured_name = StructuredLinkableSpecName.from_name(qualified_name)
for structured_name in structured_names:
element_name = structured_name.element_name
entity_links = tuple(EntityReference(element_name=x) for x in structured_name.entity_link_names)
# Create the spec based on the type of element referenced.
if TimeDimensionReference(element_name=element_name) in self._known_time_dimension_element_references:
if structured_name.time_granularity:
if structured_name.time_granularity and not structured_name.date_part:
time_dimension_specs.append(
TimeDimensionSpec(
element_name=element_name,
entity_links=entity_links,
time_granularity=structured_name.time_granularity,
)
)
# If date part is passed, remove requested granularity (to be overridden with default).
else:
partial_time_dimension_specs.append(
PartialTimeDimensionSpec(
element_name=element_name,
entity_links=entity_links,
date_part=structured_name.date_part,
)
)
elif DimensionReference(element_name=element_name) in self._known_dimension_element_references:
dimension_specs.append(DimensionSpec(element_name=element_name, entity_links=entity_links))
elif EntityReference(element_name=element_name) in self._known_entity_element_references:
entity_specs.append(EntitySpec(element_name=element_name, entity_links=entity_links))
else:
valid_group_bys_for_metrics = self._metric_lookup.element_specs_for_metrics(list(metric_references))
valid_group_by_names_for_metrics = sorted(
x.qualified_name for x in self._metric_lookup.element_specs_for_metrics(list(metric_references))
list(
set(
x.qualified_name if qualified_linkable_names else x.element_name
tlento marked this conversation as resolved.
Show resolved Hide resolved
for x in valid_group_bys_for_metrics
)
)
)

# If requested by name, show qualified name. If requested as object, show element name.
display_name = structured_name.qualified_name if qualified_linkable_names else element_name
courtneyholcomb marked this conversation as resolved.
Show resolved Hide resolved
suggestions = {
f"Suggestions for '{qualified_name}'": pformat_big_objects(
f"Suggestions for '{display_name}'": pformat_big_objects(
MetricFlowQueryParser._top_fuzzy_matches(
item=qualified_name,
item=display_name,
candidate_items=valid_group_by_names_for_metrics,
)
)
}
raise UnableToSatisfyQueryError(
f"Unknown element name '{element_name}' in dimension name '{qualified_name}'",
f"Unknown element name '{element_name}' in dimension name '{display_name}'"
if qualified_linkable_names
else f"Unknown dimension {element_name}",
context=suggestions,
)

Expand Down Expand Up @@ -795,6 +862,7 @@ def _parse_order_by(
element_name=parsed_name.element_name,
entity_links=entity_links,
time_granularity=parsed_name.time_granularity,
date_part=parsed_name.date_part,
),
descending=descending,
)
Expand Down
12 changes: 12 additions & 0 deletions metricflow/specs/query_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

from dbt_semantic_interfaces.type_enums import TimeGranularity

from metricflow.time.date_part import DatePart


class QueryInterfaceMetric(Protocol):
"""Metric in the query interface."""
Expand All @@ -27,6 +29,11 @@ def grain(self) -> Optional[TimeGranularity]:
"""The time granularity."""
raise NotImplementedError

@property
def date_part(self) -> Optional[DatePart]:
"""Date part to extract from the dimension."""
raise NotImplementedError


class QueryInterfaceDimension(Protocol):
"""Represents the interface for Dimension in the query interface."""
Expand All @@ -39,6 +46,10 @@ def alias(self, _alias: str) -> QueryInterfaceDimension:
"""Renaming the column."""
raise NotImplementedError

def date_part(self, _date_part: str) -> QueryInterfaceDimension:
"""Date part to extract from the dimension."""
raise NotImplementedError


class QueryInterfaceDimensionFactory(Protocol):
"""Creates a Dimension for the query interface.
Expand Down Expand Up @@ -67,6 +78,7 @@ def create(
self,
time_dimension_name: str,
time_granularity_name: str,
date_part_name: Optional[str] = None,
entity_path: Sequence[str] = (),
) -> QueryInterfaceTimeDimension:
"""Create a TimeDimension."""
Expand Down
4 changes: 4 additions & 0 deletions metricflow/specs/where_filter_dimension.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ def grain(self, _grain: str) -> QueryInterfaceDimension:
"""The time granularity."""
raise NotImplementedError

def date_part(self, _date_part: str) -> QueryInterfaceDimension:
"""The date_part requested to extract."""
raise NotImplementedError

def alias(self, _alias: str) -> QueryInterfaceDimension:
"""Renaming the column."""
raise NotImplementedError
Expand Down
6 changes: 3 additions & 3 deletions metricflow/specs/where_filter_entity.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,15 @@
from typing_extensions import override

from metricflow.specs.column_assoc import ColumnAssociationResolver
from metricflow.specs.query_interface import QueryInterfaceDimension, QueryInterfaceEntityFactory
from metricflow.specs.query_interface import QueryInterfaceEntity, QueryInterfaceEntityFactory
from metricflow.specs.specs import EntitySpec


class WhereFilterEntity(ProtocolHint[QueryInterfaceDimension]):
class WhereFilterEntity(ProtocolHint[QueryInterfaceEntity]):
"""An entity that is passed in through the where filter parameter."""

@override
def _implements_protocol(self) -> QueryInterfaceDimension:
def _implements_protocol(self) -> QueryInterfaceEntity:
return self

def __init__(self, column_name: str): # noqa
Expand Down
14 changes: 12 additions & 2 deletions metricflow/specs/where_filter_time_dimension.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from __future__ import annotations

from typing import List, Sequence
from typing import List, Optional, Sequence

from dbt_semantic_interfaces.call_parameter_sets import FilterCallParameterSets, TimeDimensionCallParameterSet
from dbt_semantic_interfaces.naming.dundered import DunderedNameFormatter
Expand Down Expand Up @@ -28,6 +28,10 @@ def grain(self, _grain: str) -> WhereFilterTimeDimension:
"""The time granularity."""
raise NotImplementedError

def date_part(self, _date_part: str) -> WhereFilterTimeDimension:
"""Requested date_part to extract."""
raise NotImplementedError

def alias(self, _alias: str) -> WhereFilterTimeDimension:
"""Renaming the column."""
raise NotImplementedError
Expand Down Expand Up @@ -60,7 +64,11 @@ def __init__( # noqa
self.time_dimension_specs: List[TimeDimensionSpec] = []

def create(
self, time_dimension_name: str, time_granularity_name: str, entity_path: Sequence[str] = ()
self,
time_dimension_name: str,
time_granularity_name: str,
date_part_name: Optional[str] = None,
entity_path: Sequence[str] = (),
) -> WhereFilterTimeDimension:
"""Create a WhereFilterTimeDimension."""
structured_name = DunderedNameFormatter.parse_name(time_dimension_name)
Expand All @@ -86,4 +94,6 @@ def _convert_to_time_dimension_spec(
element_name=parameter_set.time_dimension_reference.element_name,
entity_links=parameter_set.entity_path,
time_granularity=parameter_set.time_granularity,
# TODO: add date_part to TimeDimensionCallParameterSet in DSI
# date_part=parameter_set.date_part,
courtneyholcomb marked this conversation as resolved.
Show resolved Hide resolved
)
3 changes: 3 additions & 0 deletions metricflow/time/time_granularity_solver.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from metricflow.specs.specs import (
TimeDimensionSpec,
)
from metricflow.time.date_part import DatePart
from metricflow.time.time_granularity import (
adjust_to_end_of_period,
adjust_to_start_of_period,
Expand All @@ -38,6 +39,7 @@ class PartialTimeDimensionSpec:

element_name: str
entity_links: Tuple[EntityReference, ...]
date_part: Optional[DatePart] = None


@dataclass(frozen=True)
Expand Down Expand Up @@ -123,6 +125,7 @@ def resolve_granularity_for_partial_time_dimension_specs(
element_name=partial_time_dimension_spec.element_name,
entity_links=partial_time_dimension_spec.entity_links,
time_granularity=minimum_time_granularity,
date_part=partial_time_dimension_spec.date_part,
)
else:
raise RequestTimeGranularityException(
Expand Down