From 76c15f2b6585b1c3ce5d65e178a2d298e1b87a00 Mon Sep 17 00:00:00 2001 From: pankajastro Date: Thu, 19 Dec 2024 15:51:28 +0530 Subject: [PATCH] Add docs --- cosmos/airflow/graph.py | 3 ++- cosmos/core/airflow.py | 1 - dev/dags/example_source_rendering.py | 3 +++ docs/configuration/source-nodes-rendering.rst | 13 +++++++++++++ 4 files changed, 18 insertions(+), 2 deletions(-) diff --git a/cosmos/airflow/graph.py b/cosmos/airflow/graph.py index 249a2d0ee..09d9a51cd 100644 --- a/cosmos/airflow/graph.py +++ b/cosmos/airflow/graph.py @@ -177,7 +177,8 @@ def create_task_metadata( :param dbt_dag_task_group_identifier: Identifier to refer to the DbtDAG or DbtTaskGroup in the DAG. :param use_task_group: It determines whether to use the name as a prefix for the task id or not. If it is False, then use the name as a prefix for the task id, otherwise do not. - :param on_warning_callback: + :param on_warning_callback: A callback function called on warnings with additional Context variables “test_names” + and “test_results” of type List. This is param available for dbt test and dbt source freshness command. :returns: The metadata necessary to instantiate the source dbt node as an Airflow task. """ dbt_resource_to_class = create_dbt_resource_to_class(test_behavior) diff --git a/cosmos/core/airflow.py b/cosmos/core/airflow.py index f26568ee7..be8fd5b7c 100644 --- a/cosmos/core/airflow.py +++ b/cosmos/core/airflow.py @@ -35,7 +35,6 @@ def get_airflow_task(task: Task, dag: DAG, task_group: TaskGroup | None = None) for k, v in task.airflow_task_config.items(): task_kwargs[k] = v - # Set the on_warning_callback of source node in task_kwargs on_warning_callback = task.extra_context.get("on_warning_callback") if on_warning_callback: task_kwargs["on_warning_callback"] = on_warning_callback diff --git a/dev/dags/example_source_rendering.py b/dev/dags/example_source_rendering.py index 716d634c7..2be1cda62 100644 --- a/dev/dags/example_source_rendering.py +++ b/dev/dags/example_source_rendering.py @@ -23,6 +23,8 @@ ), ) +# [START cosmos_source_node_example] + source_rendering_dag = DbtDag( # dbt/cosmos-specific parameters project_config=ProjectConfig( @@ -42,3 +44,4 @@ default_args={"retries": 2}, on_warning_callback=lambda context: print(context), ) +# [END cosmos_source_node_example] diff --git a/docs/configuration/source-nodes-rendering.rst b/docs/configuration/source-nodes-rendering.rst index ae1417361..2593c5b72 100644 --- a/docs/configuration/source-nodes-rendering.rst +++ b/docs/configuration/source-nodes-rendering.rst @@ -34,3 +34,16 @@ Example: source_rendering_behavior=SourceRenderingBehavior.WITH_TESTS_OR_FRESHNESS, ) ) + + +on_warning_callback Callback +~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +The ``on_warning_callback`` is a callback parameter available on the ``DbtSourceLocalOperator``. This callback is triggered when a warning occurs during the execution of the ``dbt source freshness`` command. The callback accepts the task context, which includes additional parameters: test_names and test_results + +Example: + +.. literalinclude:: ../../dev/dags/example_source_rendering.py/ + :language: python + :start-after: [START cosmos_source_node_example] + :end-before: [END cosmos_source_node_example]