Skip to content

Commit

Permalink
Merge branch 'master' into staging/br_anp_precos_combustiveis
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] authored Sep 28, 2023
2 parents a44742f + f9adb9b commit 8d71c43
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 5 deletions.
3 changes: 3 additions & 0 deletions pipelines/utils/execute_dbt_model/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
table_id = Parameter("table_id")
mode = Parameter("mode", default="dev", required=False)
dbt_alias = Parameter("dbt_alias", default=False, required=False)
dbt_command = Parameter("dbt_command", default="run", required=False)

################# ####################
#
Expand All @@ -39,7 +40,9 @@
table_id=table_id,
dbt_alias=dbt_alias,
sync=True,
dbt_command=dbt_command,
)


run_dbt_model_flow.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
run_dbt_model_flow.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
36 changes: 31 additions & 5 deletions pipelines/utils/execute_dbt_model/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

from pipelines.constants import constants
from pipelines.utils.execute_dbt_model.utils import get_dbt_client
from pipelines.utils.utils import log


@task(
Expand Down Expand Up @@ -42,15 +43,40 @@ def run_dbt_model(
dataset_id: str,
table_id: str,
dbt_alias: bool,
dbt_command: str,
sync: bool = True,
):
"""
Run a DBT model.
"""
if dbt_command not in ["run", "test", "run and test", "run/test"]:
raise ValueError(f"Invalid dbt_command: {dbt_command}")

if dbt_alias:
table_id = f"{dataset_id}__{table_id}"
dbt_client.cli(
f"run --models {dataset_id}.{table_id}",
sync=sync,
logs=True,
)

if "run" in dbt_command:
logs_dict = dbt_client.cli(
f"run --models {dataset_id}.{table_id}",
sync=sync,
logs=True,
)
for event in logs_dict["result"]["logs"]:
if event["levelname"] == "INFO":
log(event["message"])
if event["levelname"] == "DEBUG":
if "On model" in event["message"]:
log(event["message"])

if "test" in dbt_command:
logs_dict = dbt_client.cli(
f"test --models {dataset_id}.{table_id}",
sync=sync,
logs=True,
)
for event in logs_dict["result"]["logs"]:
if event["levelname"] == "INFO":
log(event["message"])
if event["levelname"] == "DEBUG":
if "On model" in event["message"]:
log(event["message"])

0 comments on commit 8d71c43

Please sign in to comment.