diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 7acebacc4..85a22f5f2 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -43,14 +43,14 @@ repos: - id: remove-tabs exclude: ^docs/make.bat$|^docs/Makefile$|^dev/dags/dbt/jaffle_shop/seeds/raw_orders.csv$ - repo: https://github.com/asottile/pyupgrade - rev: v3.10.1 + rev: v3.13.0 hooks: - id: pyupgrade args: - --py37-plus - --keep-runtime-typing - repo: https://github.com/astral-sh/ruff-pre-commit - rev: v0.0.288 + rev: v0.0.291 hooks: - id: ruff args: diff --git a/cosmos/__init__.py b/cosmos/__init__.py index c0f00e3a5..fc58a7b99 100644 --- a/cosmos/__init__.py +++ b/cosmos/__init__.py @@ -9,15 +9,15 @@ from cosmos.airflow.dag import DbtDag from cosmos.airflow.task_group import DbtTaskGroup -from cosmos.constants import LoadMode, TestBehavior, ExecutionMode -from cosmos.operators.lazy_load import MissingPackage from cosmos.config import ( ProjectConfig, ProfileConfig, ExecutionConfig, RenderConfig, ) - +from cosmos.constants import LoadMode, TestBehavior, ExecutionMode +from cosmos.log import get_logger +from cosmos.operators.lazy_load import MissingPackage from cosmos.operators.local import ( DbtDepsLocalOperator, DbtLSLocalOperator, @@ -28,6 +28,8 @@ DbtTestLocalOperator, ) +logger = get_logger() + try: from cosmos.operators.docker import ( DbtLSDockerOperator, @@ -57,7 +59,8 @@ DbtSnapshotKubernetesOperator, DbtTestKubernetesOperator, ) -except ImportError: +except ImportError as error: + logger.exception(error) DbtLSKubernetesOperator = MissingPackage( "cosmos.operators.kubernetes.DbtLSKubernetesOperator", "kubernetes", diff --git a/cosmos/airflow/graph.py b/cosmos/airflow/graph.py index d03af5f1f..caf992b78 100644 --- a/cosmos/airflow/graph.py +++ b/cosmos/airflow/graph.py @@ -83,7 +83,7 @@ def create_test_task_metadata( def create_task_metadata( - node: DbtNode, execution_mode: ExecutionMode, args: dict[str, Any], use_name_as_task_id_prefix: bool = True + node: DbtNode, execution_mode: ExecutionMode, args: dict[str, Any], use_task_group: bool = False ) -> TaskMetadata | None: """ Create the metadata that will be used to instantiate the Airflow Task used to run the Dbt node. @@ -106,9 +106,9 @@ def create_task_metadata( if hasattr(node.resource_type, "value") and node.resource_type in dbt_resource_to_class: if node.resource_type == DbtResourceType.MODEL: - if use_name_as_task_id_prefix: - task_id = f"{node.name}_run" - else: + task_id = f"{node.name}_run" + + if use_task_group is True: task_id = "run" else: task_id = f"{node.name}_{node.resource_type.value}" @@ -167,14 +167,17 @@ def build_airflow_graph( # The exception are the test nodes, since it would be too slow to run test tasks individually. 
# If test_behaviour=="after_each", each model task will be bundled with a test task, using TaskGroup for node_id, node in nodes.items(): + use_task_group = ( + node.resource_type == DbtResourceType.MODEL + and test_behavior == TestBehavior.AFTER_EACH + and node.has_test is True + ) task_meta = create_task_metadata( - node=node, - execution_mode=execution_mode, - args=task_args, - use_name_as_task_id_prefix=test_behavior != TestBehavior.AFTER_EACH, + node=node, execution_mode=execution_mode, args=task_args, use_task_group=use_task_group ) + if task_meta and node.resource_type != DbtResourceType.TEST: - if node.resource_type == DbtResourceType.MODEL and test_behavior == TestBehavior.AFTER_EACH: + if use_task_group is True: with TaskGroup(dag=dag, group_id=node.name, parent_group=task_group) as model_task_group: task = create_airflow_task(task_meta, dag, task_group=model_task_group) test_meta = create_test_task_metadata( diff --git a/cosmos/config.py b/cosmos/config.py index c249c27cc..c61a5a712 100644 --- a/cosmos/config.py +++ b/cosmos/config.py @@ -81,7 +81,7 @@ def validate_project(self) -> None: } for name, path in mandatory_paths.items(): if path is None or not Path(path).exists(): - raise CosmosValueError(f"Could not find {name} at {project_yml_path}") + raise CosmosValueError(f"Could not find {name} at {path}") def is_manifest_available(self) -> bool: """ diff --git a/cosmos/converter.py b/cosmos/converter.py index 05b523a1d..c97d9274e 100644 --- a/cosmos/converter.py +++ b/cosmos/converter.py @@ -154,7 +154,6 @@ def __init__( "profile_config": profile_config, "emit_datasets": emit_datasets, } - if dbt_executable_path: task_args["dbt_executable_path"] = dbt_executable_path diff --git a/cosmos/dbt/graph.py b/cosmos/dbt/graph.py index 07a55ad79..1ad6c1737 100644 --- a/cosmos/dbt/graph.py +++ b/cosmos/dbt/graph.py @@ -50,6 +50,7 @@ class DbtNode: file_path: Path tags: list[str] = field(default_factory=lambda: []) config: dict[str, Any] = field(default_factory=lambda: {}) + has_test: bool = False class DbtGraph: @@ -193,9 +194,10 @@ def load_via_dbt_ls(self) -> None: env=env, ) stdout, stderr = process.communicate() + returncode = process.returncode logger.debug("dbt deps output: %s", stdout) - if stderr or "Error" in stdout: + if returncode or "Error" in stdout: details = stderr or stdout raise CosmosLoadDbtException(f"Unable to run dbt deps command due to the error:\n{details}") @@ -222,6 +224,7 @@ def load_via_dbt_ls(self) -> None: ) stdout, stderr = process.communicate() + returncode = process.returncode logger.debug("dbt output: %s", stdout) log_filepath = log_dir / DBT_LOG_FILENAME @@ -231,14 +234,14 @@ def load_via_dbt_ls(self) -> None: for line in logfile: logger.debug(line.strip()) - if stderr or "Error" in stdout: - if 'Run "dbt deps" to install package dependencies' in stdout: - raise CosmosLoadDbtException( - "Unable to run dbt ls command due to missing dbt_packages. Set render_config.dbt_deps=True." - ) - else: - details = stderr or stdout - raise CosmosLoadDbtException(f"Unable to run dbt ls command due to the error:\n{details}") + if 'Run "dbt deps" to install package dependencies' in stdout: + raise CosmosLoadDbtException( + "Unable to run dbt ls command due to missing dbt_packages. Set render_config.dbt_deps=True." 
+ ) + + if returncode or "Error" in stdout: + details = stderr or stdout + raise CosmosLoadDbtException(f"Unable to run dbt ls command due to the error:\n{details}") nodes = {} for line in stdout.split("\n"): @@ -262,6 +265,8 @@ def load_via_dbt_ls(self) -> None: self.nodes = nodes self.filtered_nodes = nodes + self.update_node_dependency() + logger.info("Total nodes: %i", len(self.nodes)) logger.info("Total filtered nodes: %i", len(self.nodes)) @@ -306,6 +311,8 @@ def load_via_custom_parser(self) -> None: project_dir=self.project.dir, nodes=nodes, select=self.select, exclude=self.exclude ) + self.update_node_dependency() + logger.info("Total nodes: %i", len(self.nodes)) logger.info("Total filtered nodes: %i", len(self.nodes)) @@ -335,11 +342,28 @@ def load_from_dbt_manifest(self) -> None: tags=node_dict["tags"], config=node_dict["config"], ) + nodes[node.unique_id] = node self.nodes = nodes self.filtered_nodes = select_nodes( project_dir=self.project.dir, nodes=nodes, select=self.select, exclude=self.exclude ) + + self.update_node_dependency() + logger.info("Total nodes: %i", len(self.nodes)) logger.info("Total filtered nodes: %i", len(self.nodes)) + + def update_node_dependency(self) -> None: + """ + This will update the property `has_test` if the node has a dbt test + + Updates in-place: + * self.filtered_nodes + """ + for _, node in self.filtered_nodes.items(): + if node.resource_type == DbtResourceType.TEST: + for node_id in node.depends_on: + if node_id in self.filtered_nodes: + self.filtered_nodes[node_id].has_test = True diff --git a/cosmos/operators/base.py b/cosmos/operators/base.py index b61dbd5fb..d43a2d241 100644 --- a/cosmos/operators/base.py +++ b/cosmos/operators/base.py @@ -33,6 +33,7 @@ class DbtBaseOperator(BaseOperator): :param cache_selected_only: :param no_version_check: dbt optional argument - If set, skip ensuring dbt's version matches the one specified in the dbt_project.yml file ('require-dbt-version') + :param emit_datasets: Enable emitting inlets and outlets during task execution :param fail_fast: dbt optional argument to make dbt exit immediately if a single resource fails to build. :param quiet: dbt optional argument to show only error logs in stdout :param warn_error: dbt optional argument to convert dbt warnings into errors @@ -87,6 +88,7 @@ def __init__( selector: str | None = None, vars: dict[str, str] | None = None, models: str | None = None, + emit_datasets: bool = True, cache_selected_only: bool = False, no_version_check: bool = False, fail_fast: bool = False, @@ -112,6 +114,7 @@ def __init__( self.selector = selector self.vars = vars self.models = models + self.emit_datasets = emit_datasets self.cache_selected_only = cache_selected_only self.no_version_check = no_version_check self.fail_fast = fail_fast diff --git a/cosmos/operators/kubernetes.py b/cosmos/operators/kubernetes.py index 38ca47452..996bbc9dd 100644 --- a/cosmos/operators/kubernetes.py +++ b/cosmos/operators/kubernetes.py @@ -13,17 +13,22 @@ logger = get_logger(__name__) -# kubernetes is an optional dependency, so we need to check if it's installed try: + # apache-airflow-providers-cncf-kubernetes >= 7.4.0 from airflow.providers.cncf.kubernetes.backcompat.backwards_compat_converters import ( convert_env_vars, ) from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator except ImportError: - raise ImportError( - "Could not import KubernetesPodOperator. 
Ensure you've installed the Kubernetes provider " - "separately or with with `pip install astronomer-cosmos[...,kubernetes]`." - ) + try: + # apache-airflow-providers-cncf-kubernetes < 7.4.0 + from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator + except ImportError as error: + logger.exception(error) + raise ImportError( + "Could not import KubernetesPodOperator. Ensure you've installed the Kubernetes provider " + "separately or with `pip install astronomer-cosmos[...,kubernetes]`." + ) class DbtKubernetesBaseOperator(KubernetesPodOperator, DbtBaseOperator): # type: ignore @@ -70,6 +75,9 @@ def build_kube_args(self, context: Context, cmd_flags: list[str] | None = None) if self.profile_config.target_name: dbt_cmd.extend(["--target", self.profile_config.target_name]) + if self.project_dir: + dbt_cmd.extend(["--project-dir", str(self.project_dir)]) + # set env vars self.build_env_args(env_vars) self.arguments = dbt_cmd diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py index 4888583bb..52428753e 100644 --- a/cosmos/operators/local.py +++ b/cosmos/operators/local.py @@ -40,9 +40,7 @@ FullOutputSubprocessHook, FullOutputSubprocessResult, ) -from cosmos.dbt.parser.output import ( - extract_log_issues, - ) +from cosmos.dbt.parser.output import extract_log_issues, parse_output logger = get_logger(__name__) @@ -82,6 +80,7 @@ class DbtLocalBaseOperator(DbtBaseOperator): :param profile_name: A name to use for the dbt profile. If not provided, and no profile target is found in your project's dbt_project.yml, "cosmos_profile" is used. :param install_deps: If true, install dependencies before running the command + :param emit_datasets: If true, the operator will set inlets and outlets :param callback: A callback function called on after a dbt run with a path to the dbt project directory. :param target_name: A name to use for the dbt target. If not provided, and no target is found in your project's dbt_project.yml, "cosmos_target" is used. 
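The cosmos/operators/kubernetes.py changes above do two things: they make the KubernetesPodOperator import tolerant of both provider layouts, and they pass the dbt project directory to the pod explicitly. Condensed to its core, the fallback amounts to the following sketch (same module paths as in the patch; the logging and error message are omitted here):

try:
    # apache-airflow-providers-cncf-kubernetes >= 7.4.0 ships the operator under operators.pod
    from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
except ImportError:
    # older provider releases keep it under operators.kubernetes_pod
    from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

With the new --project-dir flag, the rendered pod arguments now end in "--project-dir <project_dir>", which is what the updated expectations in tests/operators/test_kubernetes.py assert further down.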
@@ -99,7 +98,6 @@ def __init__( install_deps: bool = False, callback: Callable[[str], None] | None = None, should_store_compiled_sql: bool = True, - emit_datasets: bool = True, **kwargs: Any, ) -> None: self.profile_config = profile_config @@ -107,7 +105,6 @@ def __init__( self.callback = callback self.compiled_sql = "" self.should_store_compiled_sql = should_store_compiled_sql - self.emit_datasets = emit_datasets self.openlineage_events_completes: list[RunEvent] = [] super().__init__(**kwargs) @@ -277,7 +274,7 @@ def calculate_openlineage_events_completes( try: events = openlineage_processor.parse() self.openlineage_events_completes = events.completes - except (FileNotFoundError, NotImplementedError): + except (FileNotFoundError, NotImplementedError, ValueError): logger.debug("Unable to parse OpenLineage events", stack_info=True) def get_datasets(self, source: Literal["inputs", "outputs"]) -> list[Dataset]: @@ -350,11 +347,12 @@ def get_openlineage_facets_on_complete(self, task_instance: TaskInstance) -> Ope job_facets=job_facets, ) - def build_and_run_cmd(self, context: Context, cmd_flags: list[str] | None = None) -> None: + def build_and_run_cmd(self, context: Context, cmd_flags: list[str] | None = None) -> FullOutputSubprocessResult: dbt_cmd, env = self.build_cmd(context=context, cmd_flags=cmd_flags) dbt_cmd = dbt_cmd or [] result = self.run_command(cmd=dbt_cmd, env=env, context=context) logger.info(result.output) + return result def execute(self, context: Context) -> None: self.build_and_run_cmd(context=context) @@ -461,20 +459,6 @@ def __init__( self.base_cmd = ["test"] self.on_warning_callback = on_warning_callback - def _should_run_tests( - self, - result: FullOutputSubprocessResult, - no_tests_message: str = "Nothing to do", - ) -> bool: - """ - Check if any tests are defined to run in the DAG. If tests are defined - and on_warning_callback is set, then function returns True. - - :param result: The output from the build and run command. - """ - - return self.on_warning_callback is not None and no_tests_message not in result.output - def _handle_warnings(self, result: FullOutputSubprocessResult, context: Context) -> None: """ Handles warnings by extracting log issues, creating additional context, and calling the @@ -492,6 +476,13 @@ def _handle_warnings(self, result: FullOutputSubprocessResult, context: Context) if self.on_warning_callback: self.on_warning_callback(warning_context) + def execute(self, context: Context) -> None: + result = self.build_and_run_cmd(context=context) + if self.on_warning_callback and "WARN" in result.output: + warnings = parse_output(result, "WARN") + if warnings > 0: + self._handle_warnings(result, context) + class DbtRunOperationLocalOperator(DbtLocalBaseOperator): """ diff --git a/docs/_static/jaffle_shop_k8s_dag_run.png b/docs/_static/jaffle_shop_k8s_dag_run.png index f2ee1a5e2..e41db8b81 100644 Binary files a/docs/_static/jaffle_shop_k8s_dag_run.png and b/docs/_static/jaffle_shop_k8s_dag_run.png differ diff --git a/docs/getting_started/dbt-airflow-concepts.rst b/docs/getting_started/dbt-airflow-concepts.rst index 8dfe00582..291cec19c 100644 --- a/docs/getting_started/dbt-airflow-concepts.rst +++ b/docs/getting_started/dbt-airflow-concepts.rst @@ -10,28 +10,18 @@ differences, they also share similar concepts. This page aims to list some of these concepts and help those who may be new to Airflow or dbt and are considering to use Cosmos. +.. 
table:: + :align: left + :widths: auto -+----------------+--------------+---------------------------------------------------------------------------------+-----------------------------------------------------------------------------+--------------------------------------------------------------------------------------+ -| Airflow naming | dbt naming | Description | Differences | References | -+================+==============+=================================================================================+=============================================================================+======================================================================================+ -| DAG | Workflow | Pipeline (Direct Acyclic Graph) that contains a group of steps | Airflow expects upstream tasks to have passed to run downstream tasks. | https://airflow.apache.org/docs/apache-airflow/2.7.1/core-concepts/dags.html | -| | | | dbt can run a subset of tasks assuming upstream tasks were run. | https://docs.getdbt.com/docs/introduction | -+----------------+--------------+---------------------------------------------------------------------------------+-----------------------------------------------------------------------------+--------------------------------------------------------------------------------------+ -| Task | Node | Step within a pipeline (DAG or workflow) | In dbt, these are usually transformations that run on a remote database. | https://docs.getdbt.com/reference/node-selection/syntax | -| | | | In Airflow, steps can be anything, running locally in Airflow or remotely. | https://airflow.apache.org/docs/apache-airflow/2.7.1/core-concepts/tasks.html | -+----------------+--------------+---------------------------------------------------------------------------------+-----------------------------------------------------------------------------+--------------------------------------------------------------------------------------+ -| Language | Language | Programming or declarative language used to define pipelines and steps. | In dbt, users write SQL, YML and Python to define the steps of a pipeline. | https://docs.getdbt.com/docs/introduction#dbt-optimizes-your-workflow | -| | | | Airflow expects steps and pipelines are written in Python. | https://airflow.apache.org/docs/apache-airflow/stable/public-airflow-interface.html | -+----------------+--------------+---------------------------------------------------------------------------------+-----------------------------------------------------------------------------+--------------------------------------------------------------------------------------+ -| Variables | Variables | Key-value configuration that can be used in steps and avoids hard-coded values | | https://docs.getdbt.com/docs/build/project-variables | -| | | | | https://airflow.apache.org/docs/apache-airflow/2.7.1/core-concepts/variables.html | -+----------------+--------------+---------------------------------------------------------------------------------+-----------------------------------------------------------------------------+--------------------------------------------------------------------------------------+ -| Templating | Macros | Jinja templating used to access variables, configuration and reference steps | dbt encourages using jinja templating for control structures (if and for). | https://docs.getdbt.com/docs/build/jinja-macros | -| | | | Native in Airflow/Python, used to define variables, macros and filters. 
| https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html | -+----------------+--------------+---------------------------------------------------------------------------------+-----------------------------------------------------------------------------+--------------------------------------------------------------------------------------+ -| Connection | Profile | Configuration to connect to databases or other services | | https://airflow.apache.org/docs/apache-airflow/stable/howto/connection.html | -| | | | | https://docs.getdbt.com/docs/core/connect-data-platform/connection-profiles | -+----------------+--------------+---------------------------------------------------------------------------------+-----------------------------------------------------------------------------+--------------------------------------------------------------------------------------+ -| Providers | Adapter | Additional Python libraries that support specific databases or services | | https://airflow.apache.org/docs/apache-airflow-providers/ | -| | | | | https://docs.getdbt.com/guides/dbt-ecosystem/adapter-development/1-what-are-adapters | -+----------------+--------------+---------------------------------------------------------------------------------+-----------------------------------------------------------------------------+--------------------------------------------------------------------------------------+ + =================================================================================================== ==================================================================================================== ==================================================================================== ====================================================================================================================================================== + Airflow naming dbt naming Description Differences + =================================================================================================== ==================================================================================================== ==================================================================================== ====================================================================================================================================================== + `DAG `_ `Workflow `_ Pipeline (Direct Acyclic Graph) that contains a group of steps Airflow expects upstream tasks to have passed to run downstream tasks. dbt can run a subset of tasks assuming upstream tasks were run. + `Task `_ `Node `_ Step within a pipeline (DAG or workflow) In dbt, these are usually transformations that run on a remote database. In Airflow, steps can be anything, running locally in Airflow or remotely. + `Language `_ `Language `_ Programming or declarative language used to define pipelines and steps. In dbt, users write SQL, YML and Python to define the steps of a pipeline. Airflow expects steps and pipelines are written in Python. + `Variables `_ `Variables `_ Key-value configuration that can be used in steps and avoids hard-coded values + `Templating `_ `Macros `_ Jinja templating used to access variables, configuration and reference steps dbt encourages using jinja templating for control structures (if and for). Native in Airflow/Python, used to define variables, macros and filters. 
+ `Connection `_ `Profile `_ Configuration to connect to databases or other services + `Providers `_ `Adapter `_ Additional Python libraries that support specific databases or services + =================================================================================================== ==================================================================================================== ==================================================================================== ====================================================================================================================================================== diff --git a/docs/getting_started/kubernetes.rst b/docs/getting_started/kubernetes.rst index 9a500a047..6d2368997 100644 --- a/docs/getting_started/kubernetes.rst +++ b/docs/getting_started/kubernetes.rst @@ -30,20 +30,27 @@ For instance, .. code-block:: text - DbtTaskGroup( - ... + run_models = DbtTaskGroup( + profile_config=ProfileConfig( + profile_name="postgres_profile", + target_name="dev", + profile_mapping=PostgresUserPasswordProfileMapping( + conn_id="postgres_default", + profile_args={ + "schema": "public", + }, + ), + ), + project_config=ProjectConfig(PROJECT_DIR), + execution_config=ExecutionConfig( + execution_mode=ExecutionMode.KUBERNETES, + ), operator_args={ - "queue": "kubernetes", - "image": "dbt-jaffle-shop:1.0.0", - "image_pull_policy": "Always", + "image": DBT_IMAGE, "get_logs": True, "is_delete_operator_pod": False, - "namespace": "default", - "env_vars": { - ... - }, + "secrets": [postgres_password_secret, postgres_host_secret], }, - execution_mode="kubernetes", ) Step-by-step instructions @@ -53,7 +60,7 @@ Using installed `Kind `_, you can setup a local kuber .. code-block:: bash - kind cluster create + kind create cluster Deploy a Postgres pod to Kind using `Helm `_ diff --git a/tests/airflow/test_graph.py b/tests/airflow/test_graph.py index 7b539bb5b..bd3777209 100644 --- a/tests/airflow/test_graph.py +++ b/tests/airflow/test_graph.py @@ -36,6 +36,7 @@ file_path=SAMPLE_PROJ_PATH / "gen2/models/parent.sql", tags=["has_child"], config={"materialized": "view"}, + has_test=True, ) test_parent_node = DbtNode( name="test_parent", unique_id="test_parent", resource_type=DbtResourceType.TEST, depends_on=["parent"], file_path="" @@ -49,15 +50,8 @@ tags=["nightly"], config={"materialized": "table"}, ) -test_child_node = DbtNode( - name="test_child", - unique_id="test_child", - resource_type=DbtResourceType.TEST, - depends_on=["child"], - file_path="", -) -sample_nodes_list = [parent_seed, parent_node, test_parent_node, child_node, test_child_node] +sample_nodes_list = [parent_seed, parent_node, test_parent_node, child_node] sample_nodes = {node.unique_id: node for node in sample_nodes_list} @@ -93,21 +87,18 @@ def test_build_airflow_graph_with_after_each(): "seed_parent_seed", "parent.run", "parent.test", - "child.run", - "child.test", + "child_run", ] + assert topological_sort == expected_sort task_groups = dag.task_group_dict - assert len(task_groups) == 2 + assert len(task_groups) == 1 assert task_groups["parent"].upstream_task_ids == {"seed_parent_seed"} assert list(task_groups["parent"].children.keys()) == ["parent.run", "parent.test"] - assert task_groups["child"].upstream_task_ids == {"parent.test"} - assert list(task_groups["child"].children.keys()) == ["child.run", "child.test"] - assert len(dag.leaves) == 1 - assert dag.leaves[0].task_id == "child.test" + assert dag.leaves[0].task_id == "child_run" @pytest.mark.skipif( @@ -231,7 +222,7 @@ def 
test_create_task_metadata_model(caplog): assert metadata.arguments == {"models": "my_model"} -def test_create_task_metadata_model_use_name_as_task_id_prefix(caplog): +def test_create_task_metadata_model_use_task_group(caplog): child_node = DbtNode( name="my_model", unique_id="my_folder.my_model", @@ -241,14 +232,12 @@ def test_create_task_metadata_model_use_name_as_task_id_prefix(caplog): tags=[], config={}, ) - metadata = create_task_metadata( - child_node, execution_mode=ExecutionMode.LOCAL, args={}, use_name_as_task_id_prefix=False - ) + metadata = create_task_metadata(child_node, execution_mode=ExecutionMode.LOCAL, args={}, use_task_group=True) assert metadata.id == "run" -@pytest.mark.parametrize("use_name_as_task_id_prefix", (None, True, False)) -def test_create_task_metadata_seed(caplog, use_name_as_task_id_prefix): +@pytest.mark.parametrize("use_task_group", (None, True, False)) +def test_create_task_metadata_seed(caplog, use_task_group): sample_node = DbtNode( name="my_seed", unique_id="my_folder.my_seed", @@ -258,14 +247,14 @@ def test_create_task_metadata_seed(caplog, use_name_as_task_id_prefix): tags=[], config={}, ) - if use_name_as_task_id_prefix is None: + if use_task_group is None: metadata = create_task_metadata(sample_node, execution_mode=ExecutionMode.DOCKER, args={}) else: metadata = create_task_metadata( sample_node, execution_mode=ExecutionMode.DOCKER, args={}, - use_name_as_task_id_prefix=use_name_as_task_id_prefix, + use_task_group=use_task_group, ) assert metadata.id == "my_seed_seed" assert metadata.operator_class == "cosmos.operators.docker.DbtSeedDockerOperator" diff --git a/tests/dbt/test_graph.py b/tests/dbt/test_graph.py index 3a2a3eaa9..317dce0bb 100644 --- a/tests/dbt/test_graph.py +++ b/tests/dbt/test_graph.py @@ -6,8 +6,8 @@ import pytest from cosmos.config import ProfileConfig -from cosmos.constants import ExecutionMode, DbtResourceType -from cosmos.dbt.graph import DbtGraph, LoadMode, CosmosLoadDbtException +from cosmos.constants import DbtResourceType, ExecutionMode +from cosmos.dbt.graph import CosmosLoadDbtException, DbtGraph, LoadMode from cosmos.dbt.project import DbtProject from cosmos.profiles import PostgresUserPasswordProfileMapping @@ -132,6 +132,7 @@ def test_load( @patch("cosmos.dbt.graph.Popen") def test_load_via_dbt_ls_does_not_create_target_logs_in_original_folder(mock_popen, tmp_dbt_project_dir): mock_popen().communicate.return_value = ("", "") + mock_popen().returncode = 0 assert not (tmp_dbt_project_dir / "target").exists() assert not (tmp_dbt_project_dir / "logs").exists() @@ -276,6 +277,53 @@ def test_load_via_dbt_ls_without_dbt_deps(): assert err_info.value.args[0] == expected +@pytest.mark.integration +@patch("cosmos.dbt.graph.Popen") +def test_load_via_dbt_ls_with_zero_returncode_and_non_empty_stderr(mock_popen, tmp_dbt_project_dir): + mock_popen().communicate.return_value = ("", "Some stderr warnings") + mock_popen().returncode = 0 + + dbt_project = DbtProject(name=DBT_PIPELINE_NAME, root_dir=tmp_dbt_project_dir) + dbt_graph = DbtGraph( + project=dbt_project, + profile_config=ProfileConfig( + profile_name="default", + target_name="default", + profile_mapping=PostgresUserPasswordProfileMapping( + conn_id="airflow_db", + profile_args={"schema": "public"}, + ), + ), + ) + + dbt_graph.load_via_dbt_ls() # does not raise exception + + +@pytest.mark.integration +@patch("cosmos.dbt.graph.Popen") +def test_load_via_dbt_ls_with_non_zero_returncode(mock_popen): + mock_popen().communicate.return_value = ("", "Some stderr message") + 
mock_popen().returncode = 1 + + dbt_project = DbtProject(name="jaffle_shop", root_dir=DBT_PROJECTS_ROOT_DIR) + dbt_graph = DbtGraph( + project=dbt_project, + profile_config=ProfileConfig( + profile_name="default", + target_name="default", + profile_mapping=PostgresUserPasswordProfileMapping( + conn_id="airflow_db", + profile_args={"schema": "public"}, + ), + ), + ) + with pytest.raises(CosmosLoadDbtException) as err_info: + dbt_graph.load_via_dbt_ls() + + expected = "Unable to run dbt deps command due to the error:\nSome stderr message" + assert err_info.value.args[0] == expected + + @pytest.mark.integration @patch("cosmos.dbt.graph.Popen.communicate", return_value=("Some Runtime Error", "")) def test_load_via_dbt_ls_with_runtime_error_in_stdout(mock_popen_communicate): @@ -294,6 +342,7 @@ def test_load_via_dbt_ls_with_runtime_error_in_stdout(mock_popen_communicate): ) with pytest.raises(CosmosLoadDbtException) as err_info: dbt_graph.load_via_dbt_ls() + expected = "Unable to run dbt deps command due to the error:\nSome Runtime Error" assert err_info.value.args[0] == expected mock_popen_communicate.assert_called_once() @@ -312,3 +361,32 @@ def test_load_via_load_via_custom_parser(pipeline_name): assert dbt_graph.nodes == dbt_graph.filtered_nodes # the custom parser does not add dbt test nodes assert len(dbt_graph.nodes) == 8 + + +@patch("cosmos.dbt.graph.DbtGraph.update_node_dependency", return_value=None) +def test_update_node_dependency_called(mock_update_node_dependency): + dbt_project = DbtProject(name="jaffle_shop", root_dir=DBT_PROJECTS_ROOT_DIR, manifest_path=SAMPLE_MANIFEST) + dbt_graph = DbtGraph(project=dbt_project) + dbt_graph.load() + + assert mock_update_node_dependency.called + + +def test_update_node_dependency_target_exist(): + dbt_project = DbtProject(name="jaffle_shop", root_dir=DBT_PROJECTS_ROOT_DIR, manifest_path=SAMPLE_MANIFEST) + dbt_graph = DbtGraph(project=dbt_project) + dbt_graph.load() + + for _, nodes in dbt_graph.nodes.items(): + if nodes.resource_type == DbtResourceType.TEST: + for node_id in nodes.depends_on: + assert dbt_graph.nodes[node_id].has_test is True + + +def test_update_node_dependency_test_not_exist(): + dbt_project = DbtProject(name="jaffle_shop", root_dir=DBT_PROJECTS_ROOT_DIR, manifest_path=SAMPLE_MANIFEST) + dbt_graph = DbtGraph(project=dbt_project, exclude=["config.materialized:test"]) + dbt_graph.load_from_dbt_manifest() + + for _, nodes in dbt_graph.filtered_nodes.items(): + assert nodes.has_test is False diff --git a/tests/operators/test_kubernetes.py b/tests/operators/test_kubernetes.py index 67d3ff137..839a6db54 100644 --- a/tests/operators/test_kubernetes.py +++ b/tests/operators/test_kubernetes.py @@ -98,6 +98,8 @@ def test_dbt_kubernetes_build_command(): "end_time: '{{ data_interval_end.strftime(''%Y%m%d%H%M%S'') }}'\n" "start_time: '{{ data_interval_start.strftime(''%Y%m%d%H%M%S'') }}'\n", "--no-version-check", + "--project-dir", + "my/dir", ] @@ -150,6 +152,8 @@ def test_created_pod(test_hook): "data_interval_start.strftime(''%Y%m%d%H%M%S'') " "}}'\n", "--no-version-check", + "--project-dir", + "my/dir", ], "command": [], "env": [{"name": "FOO", "value": "BAR", "value_from": None}], diff --git a/tests/operators/test_local.py b/tests/operators/test_local.py index 94b0f8e27..728eea079 100644 --- a/tests/operators/test_local.py +++ b/tests/operators/test_local.py @@ -1,3 +1,6 @@ +import os +import shutil +import tempfile from pathlib import Path from unittest.mock import MagicMock, patch @@ -28,6 +31,9 @@ DBT_PROJ_DIR = 
Path(__file__).parent.parent.parent / "dev/dags/dbt/jaffle_shop" +MINI_DBT_PROJ_DIR = Path(__file__).parent.parent / "sample/mini" +MINI_DBT_PROJ_DIR_FAILING_SCHEMA = MINI_DBT_PROJ_DIR / "schema_failing_test.yml" +MINI_DBT_PROJ_PROFILE = MINI_DBT_PROJ_DIR / "profiles.yml" profile_config = ProfileConfig( profile_name="default", @@ -44,6 +50,20 @@ ), ) +mini_profile_config = ProfileConfig(profile_name="mini", target_name="dev", profiles_yml_filepath=MINI_DBT_PROJ_PROFILE) + + +@pytest.fixture +def failing_test_dbt_project(tmp_path): + tmp_dir = tempfile.TemporaryDirectory() + tmp_dir_path = Path(tmp_dir.name) / "mini" + shutil.copytree(MINI_DBT_PROJ_DIR, tmp_dir_path) + target_schema = tmp_dir_path / "models/schema.yml" + target_schema.exists() and os.remove(target_schema) + shutil.copy(MINI_DBT_PROJ_DIR_FAILING_SCHEMA, target_schema) + yield tmp_dir_path + tmp_dir.cleanup() + def test_dbt_base_operator_add_global_flags() -> None: dbt_base_operator = DbtLocalBaseOperator( @@ -167,6 +187,7 @@ def test_run_operator_dataset_inlets_and_outlets(): task_id="run", dbt_cmd_flags=["--models", "stg_customers"], install_deps=True, + append_env=True, ) test_operator = DbtTestLocalOperator( profile_config=real_profile_config, @@ -174,6 +195,7 @@ def test_run_operator_dataset_inlets_and_outlets(): task_id="test", dbt_cmd_flags=["--models", "stg_customers"], install_deps=True, + append_env=True, ) run_operator run_test_dag(dag) @@ -183,6 +205,52 @@ def test_run_operator_dataset_inlets_and_outlets(): assert test_operator.outlets == [] +@pytest.mark.integration +def test_run_test_operator_with_callback(failing_test_dbt_project): + on_warning_callback = MagicMock() + + with DAG("test-id-2", start_date=datetime(2022, 1, 1)) as dag: + run_operator = DbtSeedLocalOperator( + profile_config=mini_profile_config, + project_dir=failing_test_dbt_project, + task_id="run", + append_env=True, + ) + test_operator = DbtTestLocalOperator( + profile_config=mini_profile_config, + project_dir=failing_test_dbt_project, + task_id="test", + append_env=True, + on_warning_callback=on_warning_callback, + ) + run_operator >> test_operator + run_test_dag(dag) + assert on_warning_callback.called + + +@pytest.mark.integration +def test_run_test_operator_without_callback(): + on_warning_callback = MagicMock() + + with DAG("test-id-3", start_date=datetime(2022, 1, 1)) as dag: + run_operator = DbtSeedLocalOperator( + profile_config=mini_profile_config, + project_dir=MINI_DBT_PROJ_DIR, + task_id="run", + append_env=True, + ) + test_operator = DbtTestLocalOperator( + profile_config=mini_profile_config, + project_dir=MINI_DBT_PROJ_DIR, + task_id="test", + append_env=True, + on_warning_callback=on_warning_callback, + ) + run_operator >> test_operator + run_test_dag(dag) + assert not on_warning_callback.called + + @pytest.mark.integration def test_run_operator_emits_events(): class MockRun: diff --git a/tests/sample/mini/dbt_project.yml b/tests/sample/mini/dbt_project.yml new file mode 100644 index 000000000..eaa39d188 --- /dev/null +++ b/tests/sample/mini/dbt_project.yml @@ -0,0 +1,20 @@ +name: 'mini' + +config-version: 2 +version: '0.1' + +profile: 'mini' + +model-paths: ["models"] +seed-paths: ["seeds"] +test-paths: ["tests"] +analysis-paths: ["analysis"] +macro-paths: ["macros"] + +target-path: "target" +clean-targets: + - "target" + - "dbt_modules" + - "logs" + +require-dbt-version: [">=1.0.0", "<2.0.0"] diff --git a/tests/sample/mini/models/.gitkeep b/tests/sample/mini/models/.gitkeep new file mode 100644 index 000000000..e69de29bb diff 
--git a/tests/sample/mini/profiles.yml b/tests/sample/mini/profiles.yml new file mode 100644 index 000000000..0c53a3894 --- /dev/null +++ b/tests/sample/mini/profiles.yml @@ -0,0 +1,12 @@ +mini: + target: dev + outputs: + dev: + type: postgres + host: "{{ env_var('POSTGRES_HOST') }}" + user: "{{ env_var('POSTGRES_USER') }}" + password: "{{ env_var('POSTGRES_PASSWORD') }}" + port: "{{ env_var('POSTGRES_PORT') | int }}" + dbname: "{{ env_var('POSTGRES_DB') }}" + schema: "{{ env_var('POSTGRES_SCHEMA') }}" + threads: 4 diff --git a/tests/sample/mini/schema_failing_test.yml b/tests/sample/mini/schema_failing_test.yml new file mode 100644 index 000000000..d97d1733d --- /dev/null +++ b/tests/sample/mini/schema_failing_test.yml @@ -0,0 +1,18 @@ +version: 2 + +seeds: + + - name: mini_orders + description: This table has basic information about orders, as well as some derived facts based on payments + + columns: + + - name: status + description: 'Order status' + tests: + - accepted_values: + # this will intentionally fail, since the seed has other values for this column + values: ['placed'] + config: + severity: warn + warn_if: ">1" diff --git a/tests/sample/mini/seeds/.gitkeep b/tests/sample/mini/seeds/.gitkeep new file mode 100644 index 000000000..e69de29bb diff --git a/tests/sample/mini/seeds/mini_orders.csv b/tests/sample/mini/seeds/mini_orders.csv new file mode 100644 index 000000000..31ac98589 --- /dev/null +++ b/tests/sample/mini/seeds/mini_orders.csv @@ -0,0 +1,10 @@ +id,user_id,order_date,status +1,1,2018-01-01,returned +2,3,2018-01-02,completed +3,22,2018-01-26,return_pending +4,9,2018-03-17,shipped +75,69,2018-03-18,completed +76,25,2018-03-20,completed +77,35,2018-03-21,shipped +78,90,2018-03-23,shipped +6,68,2018-03-26,placed diff --git a/tests/sample/mini/snapshots/.gitkeep b/tests/sample/mini/snapshots/.gitkeep new file mode 100644 index 000000000..e69de29bb diff --git a/tests/sample/profiles.yml b/tests/sample/profiles.yml new file mode 100644 index 000000000..359c1e6eb --- /dev/null +++ b/tests/sample/profiles.yml @@ -0,0 +1,12 @@ +default: + target: dev + outputs: + dev: + type: postgres + host: "localhost" + user: "postgres" + password: "postgres" + port: 5432 + dbname: "postgres" + schema: "public" + threads: 4 diff --git a/tests/test_converter.py b/tests/test_converter.py index 99d8c32ac..0d321730a 100644 --- a/tests/test_converter.py +++ b/tests/test_converter.py @@ -1,8 +1,17 @@ +from pathlib import Path + +from unittest.mock import patch import pytest + +from cosmos.converter import DbtToAirflowConverter, validate_arguments +from cosmos.constants import DbtResourceType, ExecutionMode +from cosmos.config import ProjectConfig, ProfileConfig, ExecutionConfig, RenderConfig +from cosmos.dbt.graph import DbtNode from cosmos.exceptions import CosmosValueError -from cosmos.converter import validate_arguments +SAMPLE_PROFILE_YML = Path(__file__).parent / "sample/profiles.yml" +SAMPLE_DBT_PROJECT = Path(__file__).parent / "sample/" @pytest.mark.parametrize("argument_key", ["tags", "paths"]) @@ -16,3 +25,45 @@ def test_validate_arguments_tags(argument_key): validate_arguments(select, exclude, profile_args, task_args) expected = f"Can't specify the same {selector_name} in `select` and `exclude`: {{'b'}}" assert err.value.args[0] == expected + + +parent_seed = DbtNode( + name="seed_parent", + unique_id="seed_parent", + resource_type=DbtResourceType.SEED, + depends_on=[], + file_path="", +) +nodes = {"seed_parent": parent_seed} + + +@pytest.mark.parametrize( + 
"execution_mode,operator_args", + [ + (ExecutionMode.KUBERNETES, {}), + # (ExecutionMode.DOCKER, {"image": "sample-image"}), + ], +) +@patch("cosmos.converter.DbtGraph.filtered_nodes", nodes) +@patch("cosmos.converter.DbtGraph.load") +def test_converter_creates_dag_with_seed(mock_load_dbt_graph, execution_mode, operator_args): + """ + This test will raise exceptions if we are trying to pass incorrect arguments to operator constructors. + """ + project_config = ProjectConfig(dbt_project_path=SAMPLE_DBT_PROJECT) + execution_config = ExecutionConfig(execution_mode=execution_mode) + render_config = RenderConfig(emit_datasets=True) + profile_config = ProfileConfig( + profile_name="my_profile_name", + target_name="my_target_name", + profiles_yml_filepath=SAMPLE_PROFILE_YML, + ) + converter = DbtToAirflowConverter( + nodes=nodes, + project_config=project_config, + profile_config=profile_config, + execution_config=execution_config, + render_config=render_config, + operator_args=operator_args, + ) + assert converter