Commit

WIP

courtneyholcomb committed Sep 21, 2023
1 parent 57498ec commit 13bd280
Showing 4 changed files with 201 additions and 98 deletions.
35 changes: 16 additions & 19 deletions metricflow/dataflow/builder/source_node.py
@@ -28,26 +28,23 @@ def create_from_data_sets(
         source_nodes: List[BaseOutput] = []
         for data_set in data_sets:
             read_node = ReadSqlSourceNode(data_set)
-            if not with_measures:
-                source_nodes.append(read_node)
-            else:
-                agg_time_dim_to_measures_grouper = (
-                    self._semantic_manifest_lookup.semantic_model_lookup.get_aggregation_time_dimensions_with_measures(
-                        data_set.semantic_model_reference
-                    )
-                )
+            agg_time_dim_to_measures_grouper = (
+                self._semantic_manifest_lookup.semantic_model_lookup.get_aggregation_time_dimensions_with_measures(
+                    data_set.semantic_model_reference
+                )
+            )

-                # Dimension sources may not have any measures -> no aggregation time dimensions.
-                time_dimension_references = agg_time_dim_to_measures_grouper.keys
-                if len(time_dimension_references) == 0:
-                    source_nodes.append(read_node)
-                else:
-                    # Splits the measures by distinct aggregate time dimension.
-                    for time_dimension_reference in time_dimension_references:
-                        source_nodes.append(
-                            MetricTimeDimensionTransformNode(
-                                parent_node=read_node,
-                                aggregation_time_dimension_reference=time_dimension_reference,
-                            )
-                        )
+            # Dimension sources may not have any measures -> no aggregation time dimensions.
+            time_dimension_references = agg_time_dim_to_measures_grouper.keys
+            if len(time_dimension_references) == 0:
+                source_nodes.append(read_node)
+            else:
+                # Splits the measures by distinct aggregate time dimension.
+                for time_dimension_reference in time_dimension_references:
+                    source_nodes.append(
+                        MetricTimeDimensionTransformNode(
+                            parent_node=read_node,
+                            aggregation_time_dimension_reference=time_dimension_reference,
+                        )
+                    )
         return source_nodes
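For orientation, the hunk above drops the old `with_measures` branch: every data set now fans out into one source node per aggregation time dimension of its measures. A minimal, self-contained sketch of that fan-out behavior, using hypothetical stand-ins (`ReadNode`, `TransformNode`, plain dicts) rather than MetricFlow's actual node and lookup types:

from dataclasses import dataclass
from typing import Dict, List


@dataclass
class ReadNode:  # stand-in for ReadSqlSourceNode
    source: str


@dataclass
class TransformNode:  # stand-in for MetricTimeDimensionTransformNode
    parent: ReadNode
    agg_time_dimension: str


def build_source_nodes(data_sets: Dict[str, List[str]]) -> List[object]:
    # data_sets maps a source name to the aggregation time dimensions of its
    # measures; an empty list models a dimension-only source.
    nodes: List[object] = []
    for source, agg_time_dims in data_sets.items():
        read_node = ReadNode(source)
        if not agg_time_dims:
            nodes.append(read_node)  # dimension source: pass the read node through
        else:
            # One transform node per distinct aggregation time dimension.
            nodes.extend(TransformNode(read_node, dim) for dim in agg_time_dims)
    return nodes


# A measure source with two aggregation time dimensions fans out into two
# nodes; a dimension-only source contributes its read node unchanged.
print(build_source_nodes({"bookings": ["ds", "paid_at"], "countries": []}))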
169 changes: 122 additions & 47 deletions metricflow/query/query_parser.py
@@ -30,7 +30,6 @@
 from metricflow.naming.linkable_spec_name import StructuredLinkableSpecName
 from metricflow.query.query_exceptions import InvalidQueryException
 from metricflow.specs.column_assoc import ColumnAssociationResolver
-from metricflow.specs.query_interface import QueryInterfaceMetric, QueryParameter
 from metricflow.specs.specs import (
     DimensionSpec,
     EntitySpec,
@@ -169,16 +168,16 @@ def parse_and_validate_query(
     def parse_and_validate_query(
         self,
         metric_names: Optional[Sequence[str]] = None,
-        metrics: Optional[Sequence[QueryInterfaceMetric]] = None,
+        metrics: Optional[Sequence[QueryParameterMetric]] = None,
         group_by_names: Optional[Sequence[str]] = None,
-        group_by: Optional[Sequence[QueryParameter]] = None,
+        group_by: Optional[Sequence[QueryParameterDimension]] = None,
         limit: Optional[int] = None,
         time_constraint_start: Optional[datetime.datetime] = None,
         time_constraint_end: Optional[datetime.datetime] = None,
         where_constraint: Optional[WhereFilter] = None,
         where_constraint_str: Optional[str] = None,
         order: Optional[Sequence[str]] = None,
-        order_by: Optional[Sequence[QueryParameter]] = None,
+        order_by: Optional[Sequence[QueryParameterDimension]] = None,
         time_granularity: Optional[TimeGranularity] = None,
     ) -> MetricFlowQuerySpec:
         """Parse the query into spec objects, validating them in the process.
@@ -289,22 +288,8 @@ def _construct_metric_specs_for_query(
         )
         return tuple(metric_specs)

-    def _get_group_by_names(
-        self, group_by_names: Optional[Sequence[str]], group_by: Optional[Sequence[QueryParameter]]
-    ) -> Sequence[str]:
-        assert not (
-            group_by_names and group_by
-        ), "Both group_by_names and group_by were set, but if a group by is specified you should only use one of these!"
-        return (
-            group_by_names
-            if group_by_names
-            else [f"{g.name}__{g.grain}" if g.grain else g.name for g in group_by]
-            if group_by
-            else []
-        )
-
     def _get_metric_names(
-        self, metric_names: Optional[Sequence[str]], metrics: Optional[Sequence[QueryInterfaceMetric]]
+        self, metric_names: Optional[Sequence[str]], metrics: Optional[Sequence[QueryParameterMetric]]
     ) -> Sequence[str]:
         assert_exactly_one_arg_set(metric_names=metric_names, metrics=metrics)
         return metric_names if metric_names else [m.name for m in metrics] if metrics else []
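To make the renamed parameter types concrete: the parser now accepts either name strings or parameter objects, never both at once. A hedged sketch of the two equivalent call styles, where `MetricParam` and `DimensionParam` are illustrative stand-ins for whatever objects implement `QueryParameterMetric` and `QueryParameterDimension` (per the hunks in this file, the parser only reads `.name`, `.grain`, and `.date_part`; string grains stand in for `TimeGranularity` values):

from dataclasses import dataclass
from typing import Optional


@dataclass
class MetricParam:  # stand-in for a QueryParameterMetric implementation
    name: str


@dataclass
class DimensionParam:  # stand-in for a QueryParameterDimension implementation
    name: str
    grain: Optional[str] = None
    date_part: Optional[str] = None


# Equivalent ways to ask for monthly bookings (parser calls shown for shape
# only; constructing a MetricFlowQueryParser is elided):
# parser.parse_and_validate_query(
#     metric_names=["bookings"], group_by_names=["metric_time__month"]
# )
# parser.parse_and_validate_query(
#     metrics=[MetricParam("bookings")],
#     group_by=[DimensionParam("metric_time", grain="month")],
# )
print(MetricParam("bookings"), DimensionParam("metric_time", grain="month"))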
@@ -321,7 +306,9 @@ def _get_where_filter(
             PydanticWhereFilter(where_sql_template=where_constraint_str) if where_constraint_str else where_constraint
         )

-    def _get_order(self, order: Optional[Sequence[str]], order_by: Optional[Sequence[QueryParameter]]) -> Sequence[str]:
+    def _get_order(
+        self, order: Optional[Sequence[str]], order_by: Optional[Sequence[QueryParameterDimension]]
+    ) -> Sequence[str]:
         assert not (
             order and order_by
         ), "Both order_by_names and order_by were set, but if an order by is specified you should only use one of these!"
@@ -330,20 +317,19 @@ def _get_order(
     def _parse_and_validate_query(
         self,
         metric_names: Optional[Sequence[str]] = None,
-        metrics: Optional[Sequence[QueryInterfaceMetric]] = None,
+        metrics: Optional[Sequence[QueryParameterMetric]] = None,
         group_by_names: Optional[Sequence[str]] = None,
-        group_by: Optional[Sequence[QueryParameter]] = None,
+        group_by: Optional[Sequence[QueryParameterDimension]] = None,
         limit: Optional[int] = None,
         time_constraint_start: Optional[datetime.datetime] = None,
         time_constraint_end: Optional[datetime.datetime] = None,
         where_constraint: Optional[WhereFilter] = None,
         where_constraint_str: Optional[str] = None,
         order: Optional[Sequence[str]] = None,
-        order_by: Optional[Sequence[QueryParameter]] = None,
+        order_by: Optional[Sequence[QueryParameterDimension]] = None,
         time_granularity: Optional[TimeGranularity] = None,
     ) -> MetricFlowQuerySpec:
         metric_names = self._get_metric_names(metric_names, metrics)
-        group_by_names = self._get_group_by_names(group_by_names, group_by)
         where_filter = self._get_where_filter(where_constraint, where_constraint_str)
         order = self._get_order(order, order_by)

@@ -393,7 +379,9 @@ def _parse_and_validate_query(
             # If the time constraint is all time, just ignore and not render
             time_constraint = None

-        requested_linkable_specs = self._parse_linkable_element_names(group_by_names, metric_references)
+        requested_linkable_specs = self._parse_linkable_elements(
+            qualified_linkable_names=group_by_names, linkable_elements=group_by, metric_references=metric_references
+        )
         where_filter_spec: Optional[WhereFilterSpec] = None
         if where_filter is not None:
             try:
@@ -427,6 +415,7 @@
         self._validate_no_time_dimension_query(metric_references=metric_references)

         self._time_granularity_solver.validate_time_granularity(metric_references, time_dimension_specs)
+        self._validate_date_part(metric_references, time_dimension_specs)

         order_by_specs = self._parse_order_by(order or [], partial_time_dimension_spec_replacements)

@@ -436,8 +425,9 @@
         for metric_reference in metric_references:
             metric = self._metric_lookup.get_metric(metric_reference)
             if metric.filter is not None:
-                group_by_specs_for_one_metric = self._parse_linkable_element_names(
+                group_by_specs_for_one_metric = self._parse_linkable_elements(
                     qualified_linkable_names=group_by_names,
+                    linkable_elements=group_by,
                     metric_references=(metric_reference,),
                 )

@@ -461,12 +451,11 @@
             )

         # Validate all of them together.
-        if metric_references:
-            self._validate_linkable_specs(
-                metric_references=metric_references,
-                all_linkable_specs=requested_linkable_specs_with_requested_filter_specs,
-                time_dimension_specs=time_dimension_specs,
-            )
+        self._validate_linkable_specs(
+            metric_references=metric_references,
+            all_linkable_specs=requested_linkable_specs_with_requested_filter_specs,
+            time_dimension_specs=time_dimension_specs,
+        )

         self._validate_order_by_specs(
             order_by_specs=order_by_specs,
@@ -530,6 +519,35 @@ def _validate_order_by_specs(
             ):
                 raise InvalidQueryException(f"Order by item {order_by_spec} not in the query")

+    def _validate_date_part(
+        self, metric_references: Sequence[MetricReference], time_dimension_specs: Sequence[TimeDimensionSpec]
+    ) -> None:
+        """Validate that date parts can be used for metrics.
+
+        TODO: figure out expected behavior for date part with these types of metrics.
+        """
+        date_part_requested = False
+        for time_dimension_spec in time_dimension_specs:
+            if time_dimension_spec.date_part:
+                date_part_requested = True
+                if time_dimension_spec.date_part.to_int() < time_dimension_spec.time_granularity.to_int():
+                    raise RequestTimeGranularityException(
+                        f"Date part {time_dimension_spec.date_part.name} is not compatible with time granularity "
+                        f"{time_dimension_spec.time_granularity.name}. Compatible granularities include: "
+                        f"{[granularity.name for granularity in time_dimension_spec.date_part.compatible_granularities]}"
+                    )
+        if date_part_requested:
+            for metric_reference in metric_references:
+                metric = self._metric_lookup.get_metric(metric_reference)
+                if metric.type == MetricType.CUMULATIVE:
+                    raise UnableToSatisfyQueryError("Cannot extract date part for cumulative metrics.")
+                elif metric.type == MetricType.DERIVED:
+                    for input_metric in metric.type_params.metrics or []:
+                        if input_metric.offset_to_grain:
+                            raise UnableToSatisfyQueryError(
+                                "Cannot extract date part for metrics with offset_to_grain."
+                            )
+
     def _adjust_time_range_constraint(
         self,
         metric_references: Sequence[MetricReference],
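The new `_validate_date_part` above rejects a date part finer than the dimension's stored grain (and, separately, bars date parts for cumulative metrics and derived metrics with `offset_to_grain`). A minimal sketch of the grain-compatibility rule, with assumed `IntEnum` values that mirror the `to_int()` comparison; the real enums live in MetricFlow and are only approximated here:

from enum import IntEnum


class Granularity(IntEnum):  # assumed ordering: finer grains map to smaller ints
    DAY = 1
    WEEK = 2
    MONTH = 3
    YEAR = 4


class Part(IntEnum):  # date parts on the same assumed scale
    DOW = 1  # day of week requires day-grained data
    MONTH = 3
    YEAR = 4


def check(part: Part, grain: Granularity) -> None:
    # Mirrors the to_int() comparison above: a date part finer than the
    # stored grain cannot be extracted and must be rejected.
    if part < grain:
        raise ValueError(f"Date part {part.name} is not compatible with time granularity {grain.name}.")


check(Part.MONTH, Granularity.DAY)  # fine: months are recoverable from daily data
try:
    check(Part.DOW, Granularity.MONTH)  # weekday information is lost at month grain
except ValueError as err:
    print(err)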
@@ -644,26 +662,44 @@ def _parse_metric_names(
         metric_references.extend(list(input_metrics))
         return tuple(metric_references)

-    def _parse_linkable_element_names(
+    def _parse_linkable_elements(
         self,
-        qualified_linkable_names: Sequence[str],
         metric_references: Sequence[MetricReference],
+        qualified_linkable_names: Optional[Sequence[str]] = None,
+        linkable_elements: Optional[Sequence[QueryParameterDimension]] = None,
     ) -> QueryTimeLinkableSpecSet:
         """Convert the linkable spec names into the respective specification objects."""
-        qualified_linkable_names = [x.lower() for x in qualified_linkable_names]
+        # TODO: refactor to only support group_by object inputs (removing group_by_names param)
+        assert not (
+            qualified_linkable_names and linkable_elements
+        ), "Both group_by_names and group_by were set, but if a group by is specified you should only use one of these!"
+
+        structured_names: List[StructuredLinkableSpecName] = []
+        if qualified_linkable_names:
+            qualified_linkable_names = [x.lower() for x in qualified_linkable_names]
+            structured_names = [StructuredLinkableSpecName.from_name(name) for name in qualified_linkable_names]
+        elif linkable_elements:
+            for linkable_element in linkable_elements:
+                parsed_name = StructuredLinkableSpecName.from_name(linkable_element.name)
+                structured_name = StructuredLinkableSpecName(
+                    entity_link_names=parsed_name.entity_link_names,
+                    element_name=parsed_name.element_name,
+                    time_granularity=linkable_element.grain,
+                    date_part=linkable_element.date_part,
+                )
+                structured_names.append(structured_name)

         dimension_specs = []
         time_dimension_specs = []
         partial_time_dimension_specs = []
         entity_specs = []

-        for qualified_name in qualified_linkable_names:
-            structured_name = StructuredLinkableSpecName.from_name(qualified_name)
+        for structured_name in structured_names:
             element_name = structured_name.element_name
             entity_links = tuple(EntityReference(element_name=x) for x in structured_name.entity_link_names)
             # Create the spec based on the type of element referenced.
             if TimeDimensionReference(element_name=element_name) in self._known_time_dimension_element_references:
-                if structured_name.time_granularity:
+                if structured_name.time_granularity and not structured_name.date_part:
                     time_dimension_specs.append(
                         TimeDimensionSpec(
                             element_name=element_name,
@@ -672,31 +708,47 @@ def _parse_linkable_element_names(
                         )
                     )
                 else:
-                    partial_time_dimension_specs.append(
-                        PartialTimeDimensionSpec(
-                            element_name=element_name,
-                            entity_links=entity_links,
-                        )
+                    partial_time_dimension_spec = PartialTimeDimensionSpec(
+                        element_name=element_name, entity_links=entity_links, date_part=structured_name.date_part
                     )
+                    # If both granularity & date_part are requested, verify requested & resolved granularities match.
+                    if structured_name.time_granularity and structured_name.date_part:
+                        self._verify_resolved_granularity_for_date_part(
+                            requested_dimension_structured_name=structured_name,
+                            partial_time_dimension_spec=partial_time_dimension_spec,
+                            metric_references=metric_references,
+                        )
+                    partial_time_dimension_specs.append(partial_time_dimension_spec)

             elif DimensionReference(element_name=element_name) in self._known_dimension_element_references:
                 dimension_specs.append(DimensionSpec(element_name=element_name, entity_links=entity_links))
             elif EntityReference(element_name=element_name) in self._known_entity_element_references:
                 entity_specs.append(EntitySpec(element_name=element_name, entity_links=entity_links))
             else:
+                valid_group_bys_for_metrics = self._metric_lookup.element_specs_for_metrics(list(metric_references))
                 valid_group_by_names_for_metrics = sorted(
-                    x.qualified_name for x in self._metric_lookup.element_specs_for_metrics(list(metric_references))
+                    list(
+                        set(
+                            x.qualified_name if qualified_linkable_names else x.element_name
+                            for x in valid_group_bys_for_metrics
+                        )
+                    )
                 )

+                # If requested by name, show qualified name. If requested as object, show element name.
+                display_name = structured_name.qualified_name if qualified_linkable_names else element_name
                 suggestions = {
-                    f"Suggestions for '{qualified_name}'": pformat_big_objects(
+                    f"Suggestions for '{display_name}'": pformat_big_objects(
                         MetricFlowQueryParser._top_fuzzy_matches(
-                            item=qualified_name,
+                            item=display_name,
                            candidate_items=valid_group_by_names_for_metrics,
                         )
                     )
                 }
                 raise UnableToSatisfyQueryError(
-                    f"Unknown element name '{element_name}' in dimension name '{qualified_name}'",
+                    f"Unknown element name '{element_name}' in dimension name '{display_name}'"
+                    if qualified_linkable_names
+                    else f"Unknown dimension {element_name}",
                     context=suggestions,
                 )
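The two hunks above replace pure name parsing with a two-path normalization: qualified strings are parsed, while group-by objects carry their grain and date part directly. A pared-down sketch of the object path, where `StructuredName` and `GroupByParam` are hypothetical simplifications of `StructuredLinkableSpecName` and `QueryParameterDimension` (grain-suffix handling for qualified strings is elided):

from dataclasses import dataclass
from typing import List, Optional


@dataclass
class StructuredName:  # pared-down stand-in for StructuredLinkableSpecName
    entity_link_names: List[str]
    element_name: str
    time_granularity: Optional[str] = None
    date_part: Optional[str] = None

    @staticmethod
    def from_name(name: str) -> "StructuredName":
        # "booking__ds" -> entity link "booking", element "ds"
        parts = name.lower().split("__")
        return StructuredName(entity_link_names=parts[:-1], element_name=parts[-1])


@dataclass
class GroupByParam:  # assumed shape of a QueryParameterDimension
    name: str
    grain: Optional[str] = None
    date_part: Optional[str] = None


def normalize(param: GroupByParam) -> StructuredName:
    # Object path: re-parse the name, then attach grain/date_part from the object.
    parsed = StructuredName.from_name(param.name)
    return StructuredName(
        entity_link_names=parsed.entity_link_names,
        element_name=parsed.element_name,
        time_granularity=param.grain,
        date_part=param.date_part,
    )


print(normalize(GroupByParam("booking__ds", grain="month")))
print(normalize(GroupByParam("metric_time", date_part="dow")))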

@@ -707,6 +759,28 @@ def _parse_linkable_element_names(
             entity_specs=tuple(entity_specs),
         )

+    def _verify_resolved_granularity_for_date_part(
+        self,
+        requested_dimension_structured_name: StructuredLinkableSpecName,
+        partial_time_dimension_spec: PartialTimeDimensionSpec,
+        metric_references: Sequence[MetricReference],
+    ) -> None:
+        """Enforce that any granularity value associated with a date part query is the minimum.
+
+        By default, we will always ensure that a date_part query request uses the minimum granularity.
+        However, there are some interfaces where the user must pass in a granularity, so we need a check to
+        ensure that the correct value was passed in.
+        """
+        resolved_granularity = self._time_granularity_solver.find_minimum_granularity_for_partial_time_dimension_spec(
+            partial_time_dimension_spec=partial_time_dimension_spec, metric_references=metric_references
+        )
+        if resolved_granularity != requested_dimension_structured_name.time_granularity:
+            raise RequestTimeGranularityException(
+                f"When applying a date part to dimension '{requested_dimension_structured_name.qualified_name}' with "
+                f"metrics {[metric.element_name for metric in metric_references]}, only {resolved_granularity.name} "
+                "granularity can be used."
+            )
+
     def _get_invalid_linkable_specs(
         self,
         metric_references: Tuple[MetricReference, ...],
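The docstring above states the contract: with a date part in play, an explicitly passed grain must equal the minimum grain the solver resolves for the queried metrics. A minimal illustration under that assumption (string grains stand in for `TimeGranularity` values, and the resolved minimum would really come from the time-granularity solver):

def verify_grain_for_date_part(requested_grain: str, minimum_grain: str) -> None:
    # Sketch of the equality check above: anything other than the resolved
    # minimum grain is rejected when a date part is requested.
    if requested_grain != minimum_grain:
        raise ValueError(
            f"When applying a date part, only {minimum_grain} granularity can be used "
            f"(got {requested_grain})."
        )


verify_grain_for_date_part("day", "day")  # ok: matches the resolved minimum
try:
    verify_grain_for_date_part("month", "day")
except ValueError as err:
    print(err)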
@@ -796,6 +870,7 @@ def _parse_order_by(
                        element_name=parsed_name.element_name,
                        entity_links=entity_links,
                        time_granularity=parsed_name.time_granularity,
+                        date_part=parsed_name.date_part,
                     ),
                     descending=descending,
                 )