From 605574e15af292051f33a21debb62fd138b0984f Mon Sep 17 00:00:00 2001 From: pankajastro Date: Wed, 18 Dec 2024 19:24:53 +0530 Subject: [PATCH] Add tests --- cosmos/dbt/parser/output.py | 12 +++++------ dev/dags/example_source_rendering.py | 1 + tests/dbt/parser/test_output.py | 22 +++++++++++++++++++ tests/operators/test_local.py | 32 ++++++++++++++++++++++++++++ 4 files changed, 61 insertions(+), 6 deletions(-) diff --git a/cosmos/dbt/parser/output.py b/cosmos/dbt/parser/output.py index 06def6b28..0bf86ccb8 100644 --- a/cosmos/dbt/parser/output.py +++ b/cosmos/dbt/parser/output.py @@ -54,17 +54,17 @@ def parse_number_of_warnings_dbt_runner(result: dbtRunnerResult) -> int: def extract_freshness_warn_msg(result: FullOutputSubprocessResult) -> Tuple[List[str], List[str]]: log_list = result.full_output - test_names = [] - test_results = [] + node_names = [] + node_results = [] for line in log_list: if DBT_FRESHNESS_WARN_MSG in line: - test_name = line.split(DBT_FRESHNESS_WARN_MSG)[1].split(" ")[1] - test_names.append(test_name) - test_results.append(line) + node_name = line.split(DBT_FRESHNESS_WARN_MSG)[1].split(" ")[1] + node_names.append(node_name) + node_results.append(line) - return test_names, test_results + return node_names, node_results def extract_log_issues(log_list: List[str]) -> Tuple[List[str], List[str]]: diff --git a/dev/dags/example_source_rendering.py b/dev/dags/example_source_rendering.py index 07f757dd6..716d634c7 100644 --- a/dev/dags/example_source_rendering.py +++ b/dev/dags/example_source_rendering.py @@ -40,4 +40,5 @@ catchup=False, dag_id="source_rendering_dag", default_args={"retries": 2}, + on_warning_callback=lambda context: print(context), ) diff --git a/tests/dbt/parser/test_output.py b/tests/dbt/parser/test_output.py index 4fe11669f..fd19f8bbc 100644 --- a/tests/dbt/parser/test_output.py +++ b/tests/dbt/parser/test_output.py @@ -6,10 +6,12 @@ from cosmos.dbt.parser.output import ( extract_dbt_runner_issues, + extract_freshness_warn_msg, extract_log_issues, parse_number_of_warnings_dbt_runner, parse_number_of_warnings_subprocess, ) +from cosmos.hooks.subprocess import FullOutputSubprocessResult @pytest.mark.parametrize( @@ -112,3 +114,23 @@ def test_extract_dbt_runner_issues_with_status_levels(): assert node_names == ["node1", "node2"] assert node_results == ["An error message", "A failure message"] + + +def test_extract_freshness_warn_msg(): + result = FullOutputSubprocessResult( + full_output=[ + "Info: some other log message", + "INFO - 11:50:42 1 of 1 WARN freshness of postgres_db.raw_orders ................................ [WARN in 0.01s]", + "INFO - 11:50:42", + "INFO - 11:50:42 Finished running 1 source in 0 hours 0 minutes and 0.04 seconds (0.04s).", + "INFO - 11:50:42 Done.", + ], + output="INFO - 11:50:42 Done.", + exit_code=0, + ) + node_names, node_results = extract_freshness_warn_msg(result) + + assert node_names == ["postgres_db.raw_orders"] + assert node_results == [ + "INFO - 11:50:42 1 of 1 WARN freshness of postgres_db.raw_orders ................................ [WARN in 0.01s]" + ] diff --git a/tests/operators/test_local.py b/tests/operators/test_local.py index 464ca2e92..5ea3d423d 100644 --- a/tests/operators/test_local.py +++ b/tests/operators/test_local.py @@ -1159,6 +1159,38 @@ def test_store_freshness_not_store_compiled_sql(mock_context, mock_session): assert instance.freshness == "" +@pytest.mark.parametrize( + "invocation_mode, expected_extract_function", + [ + (InvocationMode.SUBPROCESS, "extract_freshness_warn_msg"), + (InvocationMode.DBT_RUNNER, "extract_dbt_runner_issues"), + ], +) +def test_handle_warnings(invocation_mode, expected_extract_function, mock_context): + result = MagicMock() + + instance = DbtSourceLocalOperator( + task_id="test", + profile_config=None, + project_dir="my/dir", + on_warning_callback=lambda context: print(context), + invocation_mode=invocation_mode, + ) + + with patch(f"cosmos.operators.local.{expected_extract_function}") as mock_extract_issues, patch.object( + instance, "on_warning_callback" + ) as mock_on_warning_callback: + mock_extract_issues.return_value = (["test_name1", "test_name2"], ["test_name1", "test_name2"]) + + instance._handle_warnings(result, mock_context) + + mock_extract_issues.assert_called_once_with(result) + + mock_on_warning_callback.assert_called_once_with( + {**mock_context, "test_names": ["test_name1", "test_name2"], "test_results": ["test_name1", "test_name2"]} + ) + + def test_dbt_compile_local_operator_initialisation(): operator = DbtCompileLocalOperator( task_id="fake-task",