From 0dffc2c6fb3ba4c92e90cfca253e2337f2d0b822 Mon Sep 17 00:00:00 2001 From: pankajastro Date: Wed, 18 Dec 2024 00:17:16 +0530 Subject: [PATCH 1/5] Add warning callback on source freshness --- cosmos/airflow/graph.py | 9 ++++++++- cosmos/core/airflow.py | 6 ++++++ cosmos/dbt/parser/output.py | 20 +++++++++++++++++++ cosmos/operators/local.py | 40 ++++++++++++++++++++++++++++++++++++- 4 files changed, 73 insertions(+), 2 deletions(-) diff --git a/cosmos/airflow/graph.py b/cosmos/airflow/graph.py index 4848c45c5..869c2087c 100644 --- a/cosmos/airflow/graph.py +++ b/cosmos/airflow/graph.py @@ -165,6 +165,7 @@ def create_task_metadata( use_task_group: bool = False, source_rendering_behavior: SourceRenderingBehavior = SourceRenderingBehavior.NONE, test_behavior: TestBehavior = TestBehavior.AFTER_ALL, + on_warning_callback: Callable[..., Any] | None = None, ) -> TaskMetadata | None: """ Create the metadata that will be used to instantiate the Airflow Task used to run the Dbt node. @@ -176,6 +177,7 @@ def create_task_metadata( :param dbt_dag_task_group_identifier: Identifier to refer to the DbtDAG or DbtTaskGroup in the DAG. :param use_task_group: It determines whether to use the name as a prefix for the task id or not. If it is False, then use the name as a prefix for the task id, otherwise do not. + :param on_warning_callback: :returns: The metadata necessary to instantiate the source dbt node as an Airflow task. """ dbt_resource_to_class = create_dbt_resource_to_class(test_behavior) @@ -183,10 +185,11 @@ def create_task_metadata( args = {**args, **{"models": node.resource_name}} if DbtResourceType(node.resource_type) in DEFAULT_DBT_RESOURCES and node.resource_type in dbt_resource_to_class: - extra_context = { + extra_context: dict[str, Any] = { "dbt_node_config": node.context_dict, "dbt_dag_task_group_identifier": dbt_dag_task_group_identifier, } + if test_behavior == TestBehavior.BUILD and node.resource_type in SUPPORTED_BUILD_RESOURCES: task_id = f"{node.name}_{node.resource_type.value}_build" elif node.resource_type == DbtResourceType.MODEL: @@ -195,6 +198,9 @@ def create_task_metadata( else: task_id = f"{node.name}_run" elif node.resource_type == DbtResourceType.SOURCE: + # if on_warning_callback is not None: + extra_context["on_warning_callback"] = on_warning_callback + if (source_rendering_behavior == SourceRenderingBehavior.NONE) or ( source_rendering_behavior == SourceRenderingBehavior.WITH_TESTS_OR_FRESHNESS and node.has_freshness is False @@ -262,6 +268,7 @@ def generate_task_or_group( use_task_group=use_task_group, source_rendering_behavior=source_rendering_behavior, test_behavior=test_behavior, + on_warning_callback=on_warning_callback, ) # In most cases, we'll map one DBT node to one Airflow task diff --git a/cosmos/core/airflow.py b/cosmos/core/airflow.py index e25404aed..057056d36 100644 --- a/cosmos/core/airflow.py +++ b/cosmos/core/airflow.py @@ -35,6 +35,12 @@ def get_airflow_task(task: Task, dag: DAG, task_group: TaskGroup | None = None) for k, v in task.airflow_task_config.items(): task_kwargs[k] = v + # Set the on_warning_callback of source node in task_kwargs + on_warning_callback = task.extra_context.get("on_warning_callback") + if on_warning_callback is not None: + task_kwargs["on_warning_callback"] = on_warning_callback + del task.extra_context["on_warning_callback"] + airflow_task = Operator( task_id=task.id, dag=dag, diff --git a/cosmos/dbt/parser/output.py b/cosmos/dbt/parser/output.py index 3ff377941..4232ab380 100644 --- a/cosmos/dbt/parser/output.py +++ b/cosmos/dbt/parser/output.py @@ -11,6 +11,7 @@ DBT_NO_TESTS_MSG = "Nothing to do" DBT_WARN_MSG = "WARN" +DBT_FRESHNESS_WARN_MSG = "WARN freshness of" def parse_number_of_warnings_subprocess(result: FullOutputSubprocessResult) -> int: @@ -39,6 +40,10 @@ def parse_number_of_warnings_subprocess(result: FullOutputSubprocessResult) -> i return num +def parse_number_of_freshness_warnings_subprocess(result: FullOutputSubprocessResult) -> int: + return sum(1 for line in result.full_output if DBT_FRESHNESS_WARN_MSG in line) + + def parse_number_of_warnings_dbt_runner(result: dbtRunnerResult) -> int: """Parses a dbt runner result and returns the number of warnings found. This only works for dbtRunnerResult from invoking dbt build, compile, run, seed, snapshot, test, or run-operation. @@ -50,6 +55,21 @@ def parse_number_of_warnings_dbt_runner(result: dbtRunnerResult) -> int: return num +def extract_freshness_warn_issue(log_list: List[str]) -> Tuple[List[str], List[str]]: + + test_names = [] + test_results = [] + + for line in log_list: + + if DBT_FRESHNESS_WARN_MSG in line: + test_name = line.split(DBT_FRESHNESS_WARN_MSG)[1].split(" ")[1] + test_names.append(test_name) + test_results.append(line) + + return test_names, test_results + + def extract_log_issues(log_list: List[str]) -> Tuple[List[str], List[str]]: """ Extracts warning messages from the log list and returns them as a formatted string. diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py index 2a56c33e3..0fa8da23a 100644 --- a/cosmos/operators/local.py +++ b/cosmos/operators/local.py @@ -57,7 +57,9 @@ ) from cosmos.dbt.parser.output import ( extract_dbt_runner_issues, + extract_freshness_warn_issue, extract_log_issues, + parse_number_of_freshness_warnings_subprocess, parse_number_of_warnings_dbt_runner, parse_number_of_warnings_subprocess, ) @@ -706,8 +708,44 @@ class DbtSourceLocalOperator(DbtSourceMixin, DbtLocalBaseOperator): Executes a dbt source freshness command. """ - def __init__(self, *args: Any, **kwargs: Any) -> None: + def __init__(self, *args: Any, on_warning_callback: Callable[..., Any] | None = None, **kwargs: Any) -> None: super().__init__(*args, **kwargs) + self.on_warning_callback = on_warning_callback + self.extract_issues: Callable[..., tuple[list[str], list[str]]] + self.parse_number_of_warnings: Callable[..., int] + + def _set_test_result_parsing_methods(self) -> None: + """Sets the extract_issues and parse_number_of_warnings methods based on the invocation mode.""" + if self.invocation_mode == InvocationMode.SUBPROCESS: + self.extract_issues = extract_freshness_warn_issue + self.parse_number_of_warnings = parse_number_of_freshness_warnings_subprocess + # TODO: FIXME + # elif self.invocation_mode == InvocationMode.DBT_RUNNER: + # self.extract_issues = extract_dbt_runner_issues + # self.parse_number_of_warnings = parse_number_of_warnings_dbt_runner + + def _handle_warnings(self, result: FullOutputSubprocessResult | dbtRunnerResult, context: Context) -> None: + """ + Handles warnings by extracting log issues, creating additional context, and calling the + on_warning_callback with the updated context. + + :param result: The result object from the build and run command. + :param context: The original airflow context in which the build and run command was executed. + """ + test_names, test_results = self.extract_issues(result.full_output) + + warning_context = dict(context) + warning_context["test_names"] = test_names + warning_context["test_results"] = test_results + + self.on_warning_callback and self.on_warning_callback(warning_context) + + def execute(self, context: Context) -> None: + result = self.build_and_run_cmd(context=context, cmd_flags=self.add_cmd_flags()) + self._set_test_result_parsing_methods() + number_of_warnings = self.parse_number_of_warnings(result) # type: ignore + if self.on_warning_callback and number_of_warnings > 0: + self._handle_warnings(result, context) class DbtRunLocalOperator(DbtRunMixin, DbtLocalBaseOperator): From c951912e5f6bff411703bc214a7bdf344dfe3b77 Mon Sep 17 00:00:00 2001 From: pankajastro Date: Wed, 18 Dec 2024 17:23:43 +0530 Subject: [PATCH 2/5] cleanup --- cosmos/airflow/graph.py | 1 - cosmos/core/airflow.py | 4 ++-- cosmos/dbt/parser/output.py | 7 ++----- cosmos/operators/local.py | 25 ++++++++----------------- 4 files changed, 12 insertions(+), 25 deletions(-) diff --git a/cosmos/airflow/graph.py b/cosmos/airflow/graph.py index 869c2087c..249a2d0ee 100644 --- a/cosmos/airflow/graph.py +++ b/cosmos/airflow/graph.py @@ -198,7 +198,6 @@ def create_task_metadata( else: task_id = f"{node.name}_run" elif node.resource_type == DbtResourceType.SOURCE: - # if on_warning_callback is not None: extra_context["on_warning_callback"] = on_warning_callback if (source_rendering_behavior == SourceRenderingBehavior.NONE) or ( diff --git a/cosmos/core/airflow.py b/cosmos/core/airflow.py index 057056d36..f26568ee7 100644 --- a/cosmos/core/airflow.py +++ b/cosmos/core/airflow.py @@ -37,9 +37,9 @@ def get_airflow_task(task: Task, dag: DAG, task_group: TaskGroup | None = None) # Set the on_warning_callback of source node in task_kwargs on_warning_callback = task.extra_context.get("on_warning_callback") - if on_warning_callback is not None: + if on_warning_callback: task_kwargs["on_warning_callback"] = on_warning_callback - del task.extra_context["on_warning_callback"] + task.extra_context.pop("on_warning_callback", None) airflow_task = Operator( task_id=task.id, diff --git a/cosmos/dbt/parser/output.py b/cosmos/dbt/parser/output.py index 4232ab380..06def6b28 100644 --- a/cosmos/dbt/parser/output.py +++ b/cosmos/dbt/parser/output.py @@ -40,10 +40,6 @@ def parse_number_of_warnings_subprocess(result: FullOutputSubprocessResult) -> i return num -def parse_number_of_freshness_warnings_subprocess(result: FullOutputSubprocessResult) -> int: - return sum(1 for line in result.full_output if DBT_FRESHNESS_WARN_MSG in line) - - def parse_number_of_warnings_dbt_runner(result: dbtRunnerResult) -> int: """Parses a dbt runner result and returns the number of warnings found. This only works for dbtRunnerResult from invoking dbt build, compile, run, seed, snapshot, test, or run-operation. @@ -55,7 +51,8 @@ def parse_number_of_warnings_dbt_runner(result: dbtRunnerResult) -> int: return num -def extract_freshness_warn_issue(log_list: List[str]) -> Tuple[List[str], List[str]]: +def extract_freshness_warn_msg(result: FullOutputSubprocessResult) -> Tuple[List[str], List[str]]: + log_list = result.full_output test_names = [] test_results = [] diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py index 0fa8da23a..dee00114c 100644 --- a/cosmos/operators/local.py +++ b/cosmos/operators/local.py @@ -57,9 +57,8 @@ ) from cosmos.dbt.parser.output import ( extract_dbt_runner_issues, - extract_freshness_warn_issue, + extract_freshness_warn_msg, extract_log_issues, - parse_number_of_freshness_warnings_subprocess, parse_number_of_warnings_dbt_runner, parse_number_of_warnings_subprocess, ) @@ -712,17 +711,6 @@ def __init__(self, *args: Any, on_warning_callback: Callable[..., Any] | None = super().__init__(*args, **kwargs) self.on_warning_callback = on_warning_callback self.extract_issues: Callable[..., tuple[list[str], list[str]]] - self.parse_number_of_warnings: Callable[..., int] - - def _set_test_result_parsing_methods(self) -> None: - """Sets the extract_issues and parse_number_of_warnings methods based on the invocation mode.""" - if self.invocation_mode == InvocationMode.SUBPROCESS: - self.extract_issues = extract_freshness_warn_issue - self.parse_number_of_warnings = parse_number_of_freshness_warnings_subprocess - # TODO: FIXME - # elif self.invocation_mode == InvocationMode.DBT_RUNNER: - # self.extract_issues = extract_dbt_runner_issues - # self.parse_number_of_warnings = parse_number_of_warnings_dbt_runner def _handle_warnings(self, result: FullOutputSubprocessResult | dbtRunnerResult, context: Context) -> None: """ @@ -732,7 +720,12 @@ def _handle_warnings(self, result: FullOutputSubprocessResult | dbtRunnerResult, :param result: The result object from the build and run command. :param context: The original airflow context in which the build and run command was executed. """ - test_names, test_results = self.extract_issues(result.full_output) + if self.invocation_mode == InvocationMode.SUBPROCESS: + self.extract_issues = extract_freshness_warn_msg + elif self.invocation_mode == InvocationMode.DBT_RUNNER: + self.extract_issues = extract_dbt_runner_issues + + test_names, test_results = self.extract_issues(result) warning_context = dict(context) warning_context["test_names"] = test_names @@ -742,9 +735,7 @@ def _handle_warnings(self, result: FullOutputSubprocessResult | dbtRunnerResult, def execute(self, context: Context) -> None: result = self.build_and_run_cmd(context=context, cmd_flags=self.add_cmd_flags()) - self._set_test_result_parsing_methods() - number_of_warnings = self.parse_number_of_warnings(result) # type: ignore - if self.on_warning_callback and number_of_warnings > 0: + if self.on_warning_callback: self._handle_warnings(result, context) From 7f02837a2613e97a92157deb97a20a50b09bb83c Mon Sep 17 00:00:00 2001 From: pankajastro Date: Wed, 18 Dec 2024 19:24:53 +0530 Subject: [PATCH 3/5] Add tests --- cosmos/dbt/parser/output.py | 12 +++++------ dev/dags/example_source_rendering.py | 1 + tests/dbt/parser/test_output.py | 22 +++++++++++++++++++ tests/operators/test_local.py | 32 ++++++++++++++++++++++++++++ 4 files changed, 61 insertions(+), 6 deletions(-) diff --git a/cosmos/dbt/parser/output.py b/cosmos/dbt/parser/output.py index 06def6b28..0bf86ccb8 100644 --- a/cosmos/dbt/parser/output.py +++ b/cosmos/dbt/parser/output.py @@ -54,17 +54,17 @@ def parse_number_of_warnings_dbt_runner(result: dbtRunnerResult) -> int: def extract_freshness_warn_msg(result: FullOutputSubprocessResult) -> Tuple[List[str], List[str]]: log_list = result.full_output - test_names = [] - test_results = [] + node_names = [] + node_results = [] for line in log_list: if DBT_FRESHNESS_WARN_MSG in line: - test_name = line.split(DBT_FRESHNESS_WARN_MSG)[1].split(" ")[1] - test_names.append(test_name) - test_results.append(line) + node_name = line.split(DBT_FRESHNESS_WARN_MSG)[1].split(" ")[1] + node_names.append(node_name) + node_results.append(line) - return test_names, test_results + return node_names, node_results def extract_log_issues(log_list: List[str]) -> Tuple[List[str], List[str]]: diff --git a/dev/dags/example_source_rendering.py b/dev/dags/example_source_rendering.py index 07f757dd6..716d634c7 100644 --- a/dev/dags/example_source_rendering.py +++ b/dev/dags/example_source_rendering.py @@ -40,4 +40,5 @@ catchup=False, dag_id="source_rendering_dag", default_args={"retries": 2}, + on_warning_callback=lambda context: print(context), ) diff --git a/tests/dbt/parser/test_output.py b/tests/dbt/parser/test_output.py index 4fe11669f..fd19f8bbc 100644 --- a/tests/dbt/parser/test_output.py +++ b/tests/dbt/parser/test_output.py @@ -6,10 +6,12 @@ from cosmos.dbt.parser.output import ( extract_dbt_runner_issues, + extract_freshness_warn_msg, extract_log_issues, parse_number_of_warnings_dbt_runner, parse_number_of_warnings_subprocess, ) +from cosmos.hooks.subprocess import FullOutputSubprocessResult @pytest.mark.parametrize( @@ -112,3 +114,23 @@ def test_extract_dbt_runner_issues_with_status_levels(): assert node_names == ["node1", "node2"] assert node_results == ["An error message", "A failure message"] + + +def test_extract_freshness_warn_msg(): + result = FullOutputSubprocessResult( + full_output=[ + "Info: some other log message", + "INFO - 11:50:42 1 of 1 WARN freshness of postgres_db.raw_orders ................................ [WARN in 0.01s]", + "INFO - 11:50:42", + "INFO - 11:50:42 Finished running 1 source in 0 hours 0 minutes and 0.04 seconds (0.04s).", + "INFO - 11:50:42 Done.", + ], + output="INFO - 11:50:42 Done.", + exit_code=0, + ) + node_names, node_results = extract_freshness_warn_msg(result) + + assert node_names == ["postgres_db.raw_orders"] + assert node_results == [ + "INFO - 11:50:42 1 of 1 WARN freshness of postgres_db.raw_orders ................................ [WARN in 0.01s]" + ] diff --git a/tests/operators/test_local.py b/tests/operators/test_local.py index 464ca2e92..5ea3d423d 100644 --- a/tests/operators/test_local.py +++ b/tests/operators/test_local.py @@ -1159,6 +1159,38 @@ def test_store_freshness_not_store_compiled_sql(mock_context, mock_session): assert instance.freshness == "" +@pytest.mark.parametrize( + "invocation_mode, expected_extract_function", + [ + (InvocationMode.SUBPROCESS, "extract_freshness_warn_msg"), + (InvocationMode.DBT_RUNNER, "extract_dbt_runner_issues"), + ], +) +def test_handle_warnings(invocation_mode, expected_extract_function, mock_context): + result = MagicMock() + + instance = DbtSourceLocalOperator( + task_id="test", + profile_config=None, + project_dir="my/dir", + on_warning_callback=lambda context: print(context), + invocation_mode=invocation_mode, + ) + + with patch(f"cosmos.operators.local.{expected_extract_function}") as mock_extract_issues, patch.object( + instance, "on_warning_callback" + ) as mock_on_warning_callback: + mock_extract_issues.return_value = (["test_name1", "test_name2"], ["test_name1", "test_name2"]) + + instance._handle_warnings(result, mock_context) + + mock_extract_issues.assert_called_once_with(result) + + mock_on_warning_callback.assert_called_once_with( + {**mock_context, "test_names": ["test_name1", "test_name2"], "test_results": ["test_name1", "test_name2"]} + ) + + def test_dbt_compile_local_operator_initialisation(): operator = DbtCompileLocalOperator( task_id="fake-task", From 76c15f2b6585b1c3ce5d65e178a2d298e1b87a00 Mon Sep 17 00:00:00 2001 From: pankajastro Date: Thu, 19 Dec 2024 15:51:28 +0530 Subject: [PATCH 4/5] Add docs --- cosmos/airflow/graph.py | 3 ++- cosmos/core/airflow.py | 1 - dev/dags/example_source_rendering.py | 3 +++ docs/configuration/source-nodes-rendering.rst | 13 +++++++++++++ 4 files changed, 18 insertions(+), 2 deletions(-) diff --git a/cosmos/airflow/graph.py b/cosmos/airflow/graph.py index 249a2d0ee..09d9a51cd 100644 --- a/cosmos/airflow/graph.py +++ b/cosmos/airflow/graph.py @@ -177,7 +177,8 @@ def create_task_metadata( :param dbt_dag_task_group_identifier: Identifier to refer to the DbtDAG or DbtTaskGroup in the DAG. :param use_task_group: It determines whether to use the name as a prefix for the task id or not. If it is False, then use the name as a prefix for the task id, otherwise do not. - :param on_warning_callback: + :param on_warning_callback: A callback function called on warnings with additional Context variables “test_names” + and “test_results” of type List. This is param available for dbt test and dbt source freshness command. :returns: The metadata necessary to instantiate the source dbt node as an Airflow task. """ dbt_resource_to_class = create_dbt_resource_to_class(test_behavior) diff --git a/cosmos/core/airflow.py b/cosmos/core/airflow.py index f26568ee7..be8fd5b7c 100644 --- a/cosmos/core/airflow.py +++ b/cosmos/core/airflow.py @@ -35,7 +35,6 @@ def get_airflow_task(task: Task, dag: DAG, task_group: TaskGroup | None = None) for k, v in task.airflow_task_config.items(): task_kwargs[k] = v - # Set the on_warning_callback of source node in task_kwargs on_warning_callback = task.extra_context.get("on_warning_callback") if on_warning_callback: task_kwargs["on_warning_callback"] = on_warning_callback diff --git a/dev/dags/example_source_rendering.py b/dev/dags/example_source_rendering.py index 716d634c7..2be1cda62 100644 --- a/dev/dags/example_source_rendering.py +++ b/dev/dags/example_source_rendering.py @@ -23,6 +23,8 @@ ), ) +# [START cosmos_source_node_example] + source_rendering_dag = DbtDag( # dbt/cosmos-specific parameters project_config=ProjectConfig( @@ -42,3 +44,4 @@ default_args={"retries": 2}, on_warning_callback=lambda context: print(context), ) +# [END cosmos_source_node_example] diff --git a/docs/configuration/source-nodes-rendering.rst b/docs/configuration/source-nodes-rendering.rst index ae1417361..2593c5b72 100644 --- a/docs/configuration/source-nodes-rendering.rst +++ b/docs/configuration/source-nodes-rendering.rst @@ -34,3 +34,16 @@ Example: source_rendering_behavior=SourceRenderingBehavior.WITH_TESTS_OR_FRESHNESS, ) ) + + +on_warning_callback Callback +~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +The ``on_warning_callback`` is a callback parameter available on the ``DbtSourceLocalOperator``. This callback is triggered when a warning occurs during the execution of the ``dbt source freshness`` command. The callback accepts the task context, which includes additional parameters: test_names and test_results + +Example: + +.. literalinclude:: ../../dev/dags/example_source_rendering.py/ + :language: python + :start-after: [START cosmos_source_node_example] + :end-before: [END cosmos_source_node_example] From 8d7ad6637103772753f3e2a61096f2a5d48f0a9a Mon Sep 17 00:00:00 2001 From: pankajastro Date: Thu, 19 Dec 2024 17:31:15 +0530 Subject: [PATCH 5/5] Apply review suggestions --- cosmos/airflow/graph.py | 2 +- cosmos/core/airflow.py | 5 ----- 2 files changed, 1 insertion(+), 6 deletions(-) diff --git a/cosmos/airflow/graph.py b/cosmos/airflow/graph.py index 09d9a51cd..c5eeb5d88 100644 --- a/cosmos/airflow/graph.py +++ b/cosmos/airflow/graph.py @@ -199,7 +199,7 @@ def create_task_metadata( else: task_id = f"{node.name}_run" elif node.resource_type == DbtResourceType.SOURCE: - extra_context["on_warning_callback"] = on_warning_callback + args["on_warning_callback"] = on_warning_callback if (source_rendering_behavior == SourceRenderingBehavior.NONE) or ( source_rendering_behavior == SourceRenderingBehavior.WITH_TESTS_OR_FRESHNESS diff --git a/cosmos/core/airflow.py b/cosmos/core/airflow.py index be8fd5b7c..e25404aed 100644 --- a/cosmos/core/airflow.py +++ b/cosmos/core/airflow.py @@ -35,11 +35,6 @@ def get_airflow_task(task: Task, dag: DAG, task_group: TaskGroup | None = None) for k, v in task.airflow_task_config.items(): task_kwargs[k] = v - on_warning_callback = task.extra_context.get("on_warning_callback") - if on_warning_callback: - task_kwargs["on_warning_callback"] = on_warning_callback - task.extra_context.pop("on_warning_callback", None) - airflow_task = Operator( task_id=task.id, dag=dag,