Skip to content

Commit

Permalink
Fix DbtTestLocalOperator when using on_warning_callback (#558)
Browse files Browse the repository at this point in the history
Since 1.1.0, the `on_warning_callback` functionality no longer works. It
worked on 1.0.5
    
Closes: #549
Closes: #545 (the fix was necessary to make the `on_warning_callback`
test pass)

Co-authored-by: Edgaras Navickas <[email protected]>
Co-authored-by: Marco Yuen <[email protected]>
  • Loading branch information
3 people committed Sep 27, 2023
1 parent ef9c11f commit a8a28fa
Show file tree
Hide file tree
Showing 9 changed files with 139 additions and 19 deletions.
30 changes: 11 additions & 19 deletions cosmos/operators/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,7 @@
FullOutputSubprocessHook,
FullOutputSubprocessResult,
)
from cosmos.dbt.parser.output import (
extract_log_issues,
)
from cosmos.dbt.parser.output import extract_log_issues, parse_output

logger = get_logger(__name__)

Expand Down Expand Up @@ -276,7 +274,7 @@ def calculate_openlineage_events_completes(
try:
events = openlineage_processor.parse()
self.openlineage_events_completes = events.completes
except (FileNotFoundError, NotImplementedError):
except (FileNotFoundError, NotImplementedError, ValueError):
logger.debug("Unable to parse OpenLineage events", stack_info=True)

def get_datasets(self, source: Literal["inputs", "outputs"]) -> list[Dataset]:
Expand Down Expand Up @@ -349,11 +347,12 @@ def get_openlineage_facets_on_complete(self, task_instance: TaskInstance) -> Ope
job_facets=job_facets,
)

def build_and_run_cmd(self, context: Context, cmd_flags: list[str] | None = None) -> None:
def build_and_run_cmd(self, context: Context, cmd_flags: list[str] | None = None) -> FullOutputSubprocessResult:
dbt_cmd, env = self.build_cmd(context=context, cmd_flags=cmd_flags)
dbt_cmd = dbt_cmd or []
result = self.run_command(cmd=dbt_cmd, env=env, context=context)
logger.info(result.output)
return result

def execute(self, context: Context) -> None:
self.build_and_run_cmd(context=context)
Expand Down Expand Up @@ -460,20 +459,6 @@ def __init__(
self.base_cmd = ["test"]
self.on_warning_callback = on_warning_callback

def _should_run_tests(
self,
result: FullOutputSubprocessResult,
no_tests_message: str = "Nothing to do",
) -> bool:
"""
Check if any tests are defined to run in the DAG. If tests are defined
and on_warning_callback is set, then function returns True.
:param result: The output from the build and run command.
"""

return self.on_warning_callback is not None and no_tests_message not in result.output

def _handle_warnings(self, result: FullOutputSubprocessResult, context: Context) -> None:
"""
Handles warnings by extracting log issues, creating additional context, and calling the
Expand All @@ -491,6 +476,13 @@ def _handle_warnings(self, result: FullOutputSubprocessResult, context: Context)
if self.on_warning_callback:
self.on_warning_callback(warning_context)

def execute(self, context: Context) -> None:
result = self.build_and_run_cmd(context=context)
if self.on_warning_callback and "WARN" in result.output:
warnings = parse_output(result, "WARN")
if warnings > 0:
self._handle_warnings(result, context)


class DbtRunOperationLocalOperator(DbtLocalBaseOperator):
"""
Expand Down
68 changes: 68 additions & 0 deletions tests/operators/test_local.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import os
import shutil
import tempfile
from pathlib import Path
from unittest.mock import MagicMock, patch

Expand Down Expand Up @@ -28,6 +31,9 @@


DBT_PROJ_DIR = Path(__file__).parent.parent.parent / "dev/dags/dbt/jaffle_shop"
MINI_DBT_PROJ_DIR = Path(__file__).parent.parent / "sample/mini"
MINI_DBT_PROJ_DIR_FAILING_SCHEMA = MINI_DBT_PROJ_DIR / "schema_failing_test.yml"
MINI_DBT_PROJ_PROFILE = MINI_DBT_PROJ_DIR / "profiles.yml"

profile_config = ProfileConfig(
profile_name="default",
Expand All @@ -44,6 +50,20 @@
),
)

mini_profile_config = ProfileConfig(profile_name="mini", target_name="dev", profiles_yml_filepath=MINI_DBT_PROJ_PROFILE)


@pytest.fixture
def failing_test_dbt_project(tmp_path):
tmp_dir = tempfile.TemporaryDirectory()
tmp_dir_path = Path(tmp_dir.name) / "mini"
shutil.copytree(MINI_DBT_PROJ_DIR, tmp_dir_path)
target_schema = tmp_dir_path / "models/schema.yml"
target_schema.exists() and os.remove(target_schema)
shutil.copy(MINI_DBT_PROJ_DIR_FAILING_SCHEMA, target_schema)
yield tmp_dir_path
tmp_dir.cleanup()


def test_dbt_base_operator_add_global_flags() -> None:
dbt_base_operator = DbtLocalBaseOperator(
Expand Down Expand Up @@ -167,13 +187,15 @@ def test_run_operator_dataset_inlets_and_outlets():
task_id="run",
dbt_cmd_flags=["--models", "stg_customers"],
install_deps=True,
append_env=True,
)
test_operator = DbtTestLocalOperator(
profile_config=real_profile_config,
project_dir=DBT_PROJ_DIR,
task_id="test",
dbt_cmd_flags=["--models", "stg_customers"],
install_deps=True,
append_env=True,
)
run_operator
run_test_dag(dag)
Expand All @@ -183,6 +205,52 @@ def test_run_operator_dataset_inlets_and_outlets():
assert test_operator.outlets == []


@pytest.mark.integration
def test_run_test_operator_with_callback(failing_test_dbt_project):
on_warning_callback = MagicMock()

with DAG("test-id-2", start_date=datetime(2022, 1, 1)) as dag:
run_operator = DbtSeedLocalOperator(
profile_config=mini_profile_config,
project_dir=failing_test_dbt_project,
task_id="run",
append_env=True,
)
test_operator = DbtTestLocalOperator(
profile_config=mini_profile_config,
project_dir=failing_test_dbt_project,
task_id="test",
append_env=True,
on_warning_callback=on_warning_callback,
)
run_operator >> test_operator
run_test_dag(dag)
assert on_warning_callback.called


@pytest.mark.integration
def test_run_test_operator_without_callback():
on_warning_callback = MagicMock()

with DAG("test-id-3", start_date=datetime(2022, 1, 1)) as dag:
run_operator = DbtSeedLocalOperator(
profile_config=mini_profile_config,
project_dir=MINI_DBT_PROJ_DIR,
task_id="run",
append_env=True,
)
test_operator = DbtTestLocalOperator(
profile_config=mini_profile_config,
project_dir=MINI_DBT_PROJ_DIR,
task_id="test",
append_env=True,
on_warning_callback=on_warning_callback,
)
run_operator >> test_operator
run_test_dag(dag)
assert not on_warning_callback.called


@pytest.mark.integration
def test_run_operator_emits_events():
class MockRun:
Expand Down
20 changes: 20 additions & 0 deletions tests/sample/mini/dbt_project.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
name: 'mini'

config-version: 2
version: '0.1'

profile: 'mini'

model-paths: ["models"]
seed-paths: ["seeds"]
test-paths: ["tests"]
analysis-paths: ["analysis"]
macro-paths: ["macros"]

target-path: "target"
clean-targets:
- "target"
- "dbt_modules"
- "logs"

require-dbt-version: [">=1.0.0", "<2.0.0"]
Empty file.
12 changes: 12 additions & 0 deletions tests/sample/mini/profiles.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
mini:
target: dev
outputs:
dev:
type: postgres
host: "{{ env_var('POSTGRES_HOST') }}"
user: "{{ env_var('POSTGRES_USER') }}"
password: "{{ env_var('POSTGRES_PASSWORD') }}"
port: "{{ env_var('POSTGRES_PORT') | int }}"
dbname: "{{ env_var('POSTGRES_DB') }}"
schema: "{{ env_var('POSTGRES_SCHEMA') }}"
threads: 4
18 changes: 18 additions & 0 deletions tests/sample/mini/schema_failing_test.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
version: 2

seeds:

- name: mini_orders
description: This table has basic information about orders, as well as some derived facts based on payments

columns:

- name: status
description: 'Order status'
tests:
- accepted_values:
# this will intentionally fail, since the seed has other values for this column
values: ['placed']
config:
severity: warn
warn_if: ">1"
Empty file.
10 changes: 10 additions & 0 deletions tests/sample/mini/seeds/mini_orders.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
id,user_id,order_date,status
1,1,2018-01-01,returned
2,3,2018-01-02,completed
3,22,2018-01-26,return_pending
4,9,2018-03-17,shipped
75,69,2018-03-18,completed
76,25,2018-03-20,completed
77,35,2018-03-21,shipped
78,90,2018-03-23,shipped
6,68,2018-03-26,placed
Empty file.

0 comments on commit a8a28fa

Please sign in to comment.