feat: Metricflow and dbt Core #265

Merged (3 commits) on Mar 8, 2024
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
@@ -4,7 +4,7 @@ repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v3.4.0
hooks:
- id: check-added-large-files
#- id: check-added-large-files
- id: check-ast
- id: check-json
- id: check-merge-conflict
80 changes: 72 additions & 8 deletions src/preset_cli/cli/superset/sync/dbt/command.py
@@ -2,11 +2,13 @@
A command to sync dbt models/metrics to Superset and charts/dashboards back as exposures.
"""

import logging
import os.path
import subprocess
import sys
import warnings
from pathlib import Path
from typing import Dict, List, Optional, Tuple
from typing import Any, Dict, List, Optional, Tuple

import click
import yaml
@@ -17,6 +19,7 @@
JobSchema,
MetricSchema,
MFMetricWithSQLSchema,
MFSQLEngine,
ModelSchema,
)
from preset_cli.api.clients.superset import SupersetClient
@@ -31,6 +34,8 @@
)
from preset_cli.exceptions import DatabaseNotFoundError

_logger = logging.getLogger(__name__)


@click.command()
@click.argument("file", type=click.Path(exists=True, resolve_path=True))
@@ -180,6 +185,10 @@ def dbt_core( # pylint: disable=too-many-arguments, too-many-branches, too-many
with open(manifest, encoding="utf-8") as input_:
configs = yaml.load(input_, Loader=yaml.SafeLoader)

with open(profiles, encoding="utf-8") as input_:
config = yaml.safe_load(input_)
dialect = MFSQLEngine(config[project]["outputs"][target]["type"].upper())

model_schema = ModelSchema()
models = []
for config in configs["nodes"].values():
@@ -200,14 +209,18 @@ def dbt_core( # pylint: disable=too-many-arguments, too-many-branches, too-many
]
else:
og_metrics = []
sl_metrics = []
metric_schema = MetricSchema()
for config in configs["metrics"].values():
# conform to the same schema that dbt Cloud uses for metrics
config["dependsOn"] = config.pop("depends_on")["nodes"]
config["uniqueId"] = config.pop("unique_id")
og_metrics.append(metric_schema.load(config))
if "calculation_method" in config or "sql" in config:
# conform to the same schema that dbt Cloud uses for metrics
config["dependsOn"] = config.pop("depends_on")["nodes"]
config["uniqueId"] = config.pop("unique_id")
og_metrics.append(metric_schema.load(config))
elif sl_metric := get_sl_metric(config, model_map, dialect):
sl_metrics.append(sl_metric)

superset_metrics = get_superset_metrics_per_model(og_metrics)
superset_metrics = get_superset_metrics_per_model(og_metrics, sl_metrics)

try:
database = sync_database(
@@ -338,7 +351,58 @@ def get_job(
raise ValueError(f"Job {job_id} not available")


def process_sl_metrics(
def get_sl_metric(
metric: Dict[str, Any],
model_map: Dict[ModelKey, ModelSchema],
dialect: MFSQLEngine,
) -> Optional[MFMetricWithSQLSchema]:
"""
Compute a SL metric using the ``mf`` CLI.
"""
mf_metric_schema = MFMetricWithSQLSchema()

command = ["mf", "query", "--explain", "--metrics", metric["name"]]
try:
_logger.info(
"Using `mf` command to retrieve SQL syntax for metric %s",
metric["name"],
)
result = subprocess.run(command, capture_output=True, text=True, check=True)
except FileNotFoundError:
_logger.warning(
"`mf` command not found, if you're using Metricflow make sure you have it "
"installed in order to sync metrics",
)
return None
except subprocess.CalledProcessError:
_logger.warning(
"Could not generate SQL for metric %s (this happens for some metrics)",
metric["name"],
)
return None

output = result.stdout.strip()
start = output.find("SELECT")
sql = output[start:]

models = get_models_from_sql(sql, dialect, model_map)
if len(models) > 1:
return None
Comment on lines +388 to +390 (Contributor): I'm working on this PR #266 and could potentially further improve it to also list, at the end of the execution, any failed metric (according to the limitations we currently have).

model = models[0]

return mf_metric_schema.load(
{
"name": metric["name"],
"type": metric["type"],
"description": metric["description"],
"sql": sql,
"dialect": dialect.value,
"model": model["unique_id"],
},
)


def fetch_sl_metrics(
dbt_client: DBTClient,
environment_id: int,
model_map: Dict[ModelKey, ModelSchema],
@@ -498,7 +562,7 @@ def dbt_cloud( # pylint: disable=too-many-arguments, too-many-locals
model_map = {ModelKey(model["schema"], model["name"]): model for model in models}

og_metrics = dbt_client.get_og_metrics(job["id"])
sl_metrics = process_sl_metrics(dbt_client, job["environment_id"], model_map)
sl_metrics = fetch_sl_metrics(dbt_client, job["environment_id"], model_map)
superset_metrics = get_superset_metrics_per_model(og_metrics, sl_metrics)

if exposures_only:
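Note on the new dbt_core flow above: the command now reads profiles.yml to learn the warehouse dialect, and shells out to the MetricFlow CLI to obtain the SQL behind each semantic-layer metric, keeping everything from the first SELECT in the output. A minimal standalone sketch of those two steps follows; the profile, target, and metric names are hypothetical, and the parsing mirrors the PR's assumption that the generated SQL runs from the first SELECT to the end of the output.

import subprocess
from typing import Optional

import yaml


def read_dialect(profiles_path: str, project: str, target: str) -> str:
    # Read the warehouse type (e.g. BIGQUERY, SNOWFLAKE) for the active target.
    with open(profiles_path, encoding="utf-8") as input_:
        config = yaml.safe_load(input_)
    return config[project]["outputs"][target]["type"].upper()


def explain_metric_sql(metric_name: str) -> Optional[str]:
    # Ask MetricFlow for the SQL behind a metric; like get_sl_metric, slice
    # from the first SELECT and bail out if the CLI is missing or cannot
    # compile the metric.
    command = ["mf", "query", "--explain", "--metrics", metric_name]
    try:
        result = subprocess.run(command, capture_output=True, text=True, check=True)
    except (FileNotFoundError, subprocess.CalledProcessError):
        return None
    output = result.stdout.strip()
    start = output.find("SELECT")
    return output[start:] if start != -1 else None


if __name__ == "__main__":
    print(read_dialect("profiles.yml", "my_project", "dev"))  # hypothetical names
    print(explain_metric_sql("revenue"))  # hypothetical metric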
4 changes: 3 additions & 1 deletion src/preset_cli/cli/superset/sync/dbt/metrics.py
@@ -51,10 +51,12 @@ def get_metric_expression(unique_id: str, metrics: Dict[str, MetricSchema]) -> s
# dbt >= 1.3
type_ = metric["calculation_method"]
sql = metric["expression"]
else:
elif "sql" in metric:
# dbt < 1.3
type_ = metric["type"]
sql = metric["sql"]
else:
raise Exception(f"Unable to generate metric expression from: {metric}")

if metric.get("filters"):
sql = apply_filters(sql, metric["filters"])
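The change above turns the dbt < 1.3 branch into an explicit elif, so a metric dict that matches neither schema now raises a descriptive error where it would previously have hit a bare KeyError. A condensed sketch of the branching, with made-up metric dicts:

from typing import Any, Dict


def pick_expression(metric: Dict[str, Any]) -> str:
    # dbt >= 1.3 stores the aggregation as calculation_method/expression,
    # dbt < 1.3 as type/sql; anything else is unsupported here.
    if "calculation_method" in metric:
        return metric["expression"]
    if "sql" in metric:
        return metric["sql"]
    raise Exception(f"Unable to generate metric expression from: {metric}")


assert pick_expression({"calculation_method": "sum", "expression": "price"}) == "price"
assert pick_expression({"type": "sum", "sql": "price"}) == "price"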