From 65a7fc78c8d650e915ac3a174f173aa1f63b9789 Mon Sep 17 00:00:00 2001 From: Marco Yuen Date: Tue, 26 Sep 2023 12:54:38 -0400 Subject: [PATCH 1/3] add back on_warning_handle in execute --- cosmos/operators/local.py | 16 ++++++++++++++- tests/operators/test_local.py | 38 +++++++++++++++++++++++++++++++++++ 2 files changed, 53 insertions(+), 1 deletion(-) diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py index 4888583bb..ce69f1435 100644 --- a/cosmos/operators/local.py +++ b/cosmos/operators/local.py @@ -42,6 +42,7 @@ ) from cosmos.dbt.parser.output import ( extract_log_issues, + parse_output, ) logger = get_logger(__name__) @@ -350,11 +351,12 @@ def get_openlineage_facets_on_complete(self, task_instance: TaskInstance) -> Ope job_facets=job_facets, ) - def build_and_run_cmd(self, context: Context, cmd_flags: list[str] | None = None) -> None: + def build_and_run_cmd(self, context: Context, cmd_flags: list[str] | None = None) -> FullOutputSubprocessResult: dbt_cmd, env = self.build_cmd(context=context, cmd_flags=cmd_flags) dbt_cmd = dbt_cmd or [] result = self.run_command(cmd=dbt_cmd, env=env, context=context) logger.info(result.output) + return result def execute(self, context: Context) -> None: self.build_and_run_cmd(context=context) @@ -492,6 +494,18 @@ def _handle_warnings(self, result: FullOutputSubprocessResult, context: Context) if self.on_warning_callback: self.on_warning_callback(warning_context) + def execute(self, context: Context) -> str: + result = self.build_and_run_cmd(context=context) + + if not self._should_run_tests(result): + return result.output + + warnings = parse_output(result, "WARN") + if warnings > 0: + self._handle_warnings(result, context) + + return result.output + class DbtRunOperationLocalOperator(DbtLocalBaseOperator): """ diff --git a/tests/operators/test_local.py b/tests/operators/test_local.py index 94b0f8e27..1b875a51f 100644 --- a/tests/operators/test_local.py +++ b/tests/operators/test_local.py @@ -302,3 +302,41 @@ def test_operator_execute_without_flags(mock_build_and_run_cmd, operator_class): ) task.execute(context={}) mock_build_and_run_cmd.assert_called_once_with(context={}) + + +@patch("cosmos.operators.local.parse_output") +@patch("cosmos.operators.local.DbtLocalBaseOperator.build_and_run_cmd") +def test_test_operator_execute_with_on_warning_callback(mock_build_and_run_cmd, mock_parse_output): + # simulate when there is warning + mock_parse_output.return_value = 1 + + warning_handler = MagicMock() + + test_operator = DbtTestLocalOperator( + profile_config=profile_config, + project_dir="my/dir", + task_id="test", + on_warning_callback=warning_handler + ) + test_operator.execute(context={}) + mock_build_and_run_cmd.assert_called_once() + warning_handler.assert_called_once() + + +@patch("cosmos.operators.local.parse_output") +@patch("cosmos.operators.local.DbtLocalBaseOperator.build_and_run_cmd") +def test_test_operator_execute_without_on_warning_callback(mock_build_and_run_cmd, mock_parse_output): + # simulate when there is no warning + mock_parse_output.return_value = 0 + + warning_handler = MagicMock() + + test_operator = DbtTestLocalOperator( + profile_config=profile_config, + project_dir="my/dir", + task_id="test", + on_warning_callback=warning_handler + ) + test_operator.execute(context={}) + mock_build_and_run_cmd.assert_called_once() + warning_handler.assert_not_called() From 52e7f749f168d916165c8a949f9eabc4e07c761b Mon Sep 17 00:00:00 2001 From: Marco Yuen Date: Tue, 26 Sep 2023 12:58:24 -0400 Subject: [PATCH 2/3] fix type error --- cosmos/operators/local.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py index ce69f1435..fdf1d7d39 100644 --- a/cosmos/operators/local.py +++ b/cosmos/operators/local.py @@ -494,18 +494,16 @@ def _handle_warnings(self, result: FullOutputSubprocessResult, context: Context) if self.on_warning_callback: self.on_warning_callback(warning_context) - def execute(self, context: Context) -> str: + def execute(self, context: Context) -> None: result = self.build_and_run_cmd(context=context) if not self._should_run_tests(result): - return result.output + return warnings = parse_output(result, "WARN") if warnings > 0: self._handle_warnings(result, context) - return result.output - class DbtRunOperationLocalOperator(DbtLocalBaseOperator): """ From e47dbb76df181125ba2b70a2c2d7c906315af6b5 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 26 Sep 2023 17:08:26 +0000 Subject: [PATCH 3/3] =?UTF-8?q?=F0=9F=8E=A8=20[pre-commit.ci]=20Auto=20for?= =?UTF-8?q?mat=20from=20pre-commit.com=20hooks?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/operators/test_local.py | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/tests/operators/test_local.py b/tests/operators/test_local.py index 1b875a51f..fdd77aec3 100644 --- a/tests/operators/test_local.py +++ b/tests/operators/test_local.py @@ -313,10 +313,7 @@ def test_test_operator_execute_with_on_warning_callback(mock_build_and_run_cmd, warning_handler = MagicMock() test_operator = DbtTestLocalOperator( - profile_config=profile_config, - project_dir="my/dir", - task_id="test", - on_warning_callback=warning_handler + profile_config=profile_config, project_dir="my/dir", task_id="test", on_warning_callback=warning_handler ) test_operator.execute(context={}) mock_build_and_run_cmd.assert_called_once() @@ -332,10 +329,7 @@ def test_test_operator_execute_without_on_warning_callback(mock_build_and_run_cm warning_handler = MagicMock() test_operator = DbtTestLocalOperator( - profile_config=profile_config, - project_dir="my/dir", - task_id="test", - on_warning_callback=warning_handler + profile_config=profile_config, project_dir="my/dir", task_id="test", on_warning_callback=warning_handler ) test_operator.execute(context={}) mock_build_and_run_cmd.assert_called_once()