diff --git a/CHANGELOG.rst b/CHANGELOG.rst
index f6c714f79..590b864b0 100644
--- a/CHANGELOG.rst
+++ b/CHANGELOG.rst
@@ -11,6 +11,31 @@ Features
 * Add ``DbtDocsGCSOperator`` for uploading dbt docs to GCS by @jbandoro in #616
 
+1.2.4 (2023-11-14)
+------------------
+
+Bug fixes
+
+* Store ``compiled_sql`` even when task fails by @agreenburg in #671
+* Refactor ``LoadMethod.LOCAL`` to use symlinks instead of copying directory by @jbandoro in #660
+* Fix 'Unable to find the dbt executable: dbt' error by @tatiana in #666
+* Fix installing deps when using ``profile_mapping`` & ``ExecutionMode.LOCAL`` by @joppevos in #659
+
+Others
+
+* Docs fix: add execution config to MWAA code example by @ugmuka in #674
+
+
+1.2.3 (2023-11-09)
+------------------
+
+Features
+
+* Add ``ProfileMapping`` for Vertica by @perttus in #540
+* Add ``ProfileMapping`` for Snowflake encrypted private key path by @ivanstillfront in #608
+* Add ``DbtDocsGCSOperator`` for uploading dbt docs to GCS by @jbandoro in #616
+
+
 1.2.2 (2023-11-06)
 ------------------
diff --git a/cosmos/__init__.py b/cosmos/__init__.py
index 18f675750..f4af1bd63 100644
--- a/cosmos/__init__.py
+++ b/cosmos/__init__.py
@@ -5,7 +5,7 @@
 Contains dags, task groups, and operators.
 """
 
-__version__ = "1.3.0a1"
+__version__ = "1.2.4"
 
 from cosmos.airflow.dag import DbtDag
 from cosmos.airflow.task_group import DbtTaskGroup
diff --git a/cosmos/airflow/graph.py b/cosmos/airflow/graph.py
index 3a140a235..af854d4f5 100644
--- a/cosmos/airflow/graph.py
+++ b/cosmos/airflow/graph.py
@@ -83,11 +83,11 @@ def create_test_task_metadata(
         task_args["indirect_selection"] = test_indirect_selection.value
     if node is not None:
         if node.resource_type == DbtResourceType.MODEL:
-            task_args["models"] = node.name
+            task_args["models"] = node.resource_name
         elif node.resource_type == DbtResourceType.SOURCE:
-            task_args["select"] = f"source:{node.unique_id[len('source.'):]}"
+            task_args["select"] = f"source:{node.resource_name}"
         else:  # tested with node.resource_type == DbtResourceType.SEED or DbtResourceType.SNAPSHOT
-            task_args["select"] = node.name
+            task_args["select"] = node.resource_name
     return TaskMetadata(
         id=test_task_name,
         operator_class=calculate_operator_class(
@@ -108,8 +108,8 @@ def create_task_metadata(
    :param execution_mode: Where Cosmos should run each dbt task (e.g. ExecutionMode.LOCAL, ExecutionMode.KUBERNETES).
         Default is ExecutionMode.LOCAL.
     :param args: Arguments to be used to instantiate an Airflow Task
-    :param use_name_as_task_id_prefix: If resource_type is DbtResourceType.MODEL, it determines whether
-        using name as task id prefix or not. If it is True task_id = <name>_run, else task_id=run.
+    :param use_task_group: Determines whether the node name is used as a prefix for the task id.
+        If it is False, task_id = <name>_run; if it is True, task_id = run (the enclosing task group already carries the name).
     :returns: The metadata necessary to instantiate the source dbt node as an Airflow task.
     """
     dbt_resource_to_class = {
         DbtResourceType.MODEL: "DbtRun",
@@ -118,7 +118,7 @@
         DbtResourceType.SEED: "DbtSeed",
         DbtResourceType.TEST: "DbtTest",
     }
-    args = {**args, **{"models": node.name}}
+    args = {**args, **{"models": node.resource_name}}
 
     if DbtResourceType(node.resource_type) in DEFAULT_DBT_RESOURCES and node.resource_type in dbt_resource_to_class:
         if node.resource_type == DbtResourceType.MODEL:
diff --git a/cosmos/dbt/graph.py b/cosmos/dbt/graph.py
index 242bca6f9..40154308b 100644
--- a/cosmos/dbt/graph.py
+++ b/cosmos/dbt/graph.py
@@ -42,7 +42,6 @@ class DbtNode:
     Metadata related to a dbt node (e.g. model, seed, snapshot).
     """
 
-    name: str
     unique_id: str
     resource_type: DbtResourceType
     depends_on: list[str]
@@ -51,6 +50,23 @@
     config: dict[str, Any] = field(default_factory=lambda: {})
     has_test: bool = False
 
+    @property
+    def resource_name(self) -> str:
+        """
+        Use this property to retrieve the resource name for command generation, for instance: ["dbt", "run", "--models", f"{resource_name}"].
+        The unique_id format is defined as [<resource_type>.<package>.<resource_name>](https://docs.getdbt.com/reference/artifacts/manifest-json#resource-details).
+        For a special case like a versioned model, the unique_id follows this pattern: [model.<package>.<resource_name>.<version>](https://github.com/dbt-labs/dbt-core/blob/main/core/dbt/contracts/graph/node_args.py#L26C3-L31)
+        """
+        return self.unique_id.split(".", 2)[2]
+
+    @property
+    def name(self) -> str:
+        """
+        Use this property as the task name or task group name.
+        Replace period (.) with underscore (_) due to versioned models.
+        """
+        return self.resource_name.replace(".", "_")
+
 
 def run_command(command: list[str], tmp_dir: Path, env_vars: dict[str, str]) -> str:
     """Run a command in a subprocess, returning the stdout."""
@@ -89,7 +105,6 @@ def parse_dbt_ls_output(project_path: Path, ls_stdout: str) -> dict[str, DbtNode
             logger.debug("Skipped dbt ls line: %s", line)
         else:
             node = DbtNode(
-                name=node_dict.get("alias", node_dict["name"]),
                 unique_id=node_dict["unique_id"],
                 resource_type=DbtResourceType(node_dict["resource_type"]),
                 depends_on=node_dict.get("depends_on", {}).get("nodes", []),
@@ -195,9 +210,6 @@ def load_via_dbt_ls(self) -> None:
         This is the most accurate way of loading `dbt` projects and filtering them out, since it uses the
         `dbt` command line for both parsing and filtering the nodes.
 
-        Noted that if dbt project contains versioned models, need to use dbt>=1.6.0 instead. Because, as dbt<1.6.0,
-        dbt cli doesn't support select a specific versioned models as stg_customers_v1, customers_v1, ...
-
         Updates in-place:
         * self.nodes
         * self.filtered_nodes
@@ -291,8 +303,7 @@ def load_via_custom_parser(self) -> None:
         for model_name, model in models:
             config = {item.split(":")[0]: item.split(":")[-1] for item in model.config.config_selectors}
             node = DbtNode(
-                name=model_name,
-                unique_id=model_name,
+                unique_id=f"{model.type.value}.{self.project.project_name}.{model_name}",
                 resource_type=DbtResourceType(model.type.value),
                 depends_on=list(model.config.upstream_models),
                 file_path=Path(
@@ -325,9 +336,6 @@ def load_from_dbt_manifest(self) -> None:
         However, since the Manifest does not represent filters, it relies on the Custom Cosmos implementation
         to filter out the nodes relevant to the user (based on self.exclude and self.select).
 
-        Noted that if dbt project contains versioned models, need to use dbt>=1.6.0 instead. Because, as dbt<1.6.0,
-        dbt cli doesn't support select a specific versioned models as stg_customers_v1, customers_v1, ...
-
         Updates in-place:
         * self.nodes
         * self.filtered_nodes
@@ -347,7 +355,6 @@ def load_from_dbt_manifest(self) -> None:
         resources = {**manifest.get("nodes", {}), **manifest.get("sources", {}), **manifest.get("exposures", {})}
         for unique_id, node_dict in resources.items():
             node = DbtNode(
-                name=node_dict.get("alias", node_dict["name"]),
                 unique_id=unique_id,
                 resource_type=DbtResourceType(node_dict["resource_type"]),
                 depends_on=node_dict.get("depends_on", {}).get("nodes", []),
diff --git a/dev/dags/dbt/model_version/models/schema.yml b/dev/dags/dbt/model_version/models/schema.yml
index 40a5a4055..66f1ccedd 100644
--- a/dev/dags/dbt/model_version/models/schema.yml
+++ b/dev/dags/dbt/model_version/models/schema.yml
@@ -37,7 +37,11 @@ models:
           - include: all
             exclude:
               - full_name
+        config:
+          alias: '{{ "customers_" ~ var("division", "USA") ~ "_v1" }}'
       - v: 2
+        config:
+          alias: '{{ "customers_" ~ var("division", "USA") ~ "_v2" }}'
 
   - name: orders
     description: This table has basic information about orders, as well as some derived facts based on payments
diff --git a/tests/airflow/test_graph.py b/tests/airflow/test_graph.py
index 6bc244b6b..35e313fca 100644
--- a/tests/airflow/test_graph.py
+++ b/tests/airflow/test_graph.py
@@ -17,43 +17,57 @@
     generate_task_or_group,
 )
 from cosmos.config import ProfileConfig
-from cosmos.constants import DbtResourceType, ExecutionMode, TestBehavior, TestIndirectSelection
+from cosmos.constants import (
+    DbtResourceType,
+    ExecutionMode,
+    TestBehavior,
+    TestIndirectSelection,
+)
 from cosmos.dbt.graph import DbtNode
 from cosmos.profiles import PostgresUserPasswordProfileMapping
 
 SAMPLE_PROJ_PATH = Path("/home/user/path/dbt-proj/")
 
 parent_seed = DbtNode(
-    name="seed_parent",
-    unique_id="seed_parent",
+    unique_id=f"{DbtResourceType.SEED.value}.{SAMPLE_PROJ_PATH.stem}.seed_parent",
     resource_type=DbtResourceType.SEED,
     depends_on=[],
     file_path="",
 )
 parent_node = DbtNode(
-    name="parent",
-    unique_id="parent",
+    unique_id=f"{DbtResourceType.MODEL.value}.{SAMPLE_PROJ_PATH.stem}.parent",
     resource_type=DbtResourceType.MODEL,
-    depends_on=["seed_parent"],
+    depends_on=[parent_seed.unique_id],
     file_path=SAMPLE_PROJ_PATH / "gen2/models/parent.sql",
     tags=["has_child"],
     config={"materialized": "view"},
     has_test=True,
 )
 test_parent_node = DbtNode(
-    name="test_parent", unique_id="test_parent", resource_type=DbtResourceType.TEST, depends_on=["parent"], file_path=""
+    unique_id=f"{DbtResourceType.TEST.value}.{SAMPLE_PROJ_PATH.stem}.test_parent",
+    resource_type=DbtResourceType.TEST,
+    depends_on=[parent_node.unique_id],
+    file_path="",
 )
 child_node = DbtNode(
-    name="child",
-    unique_id="child",
+    unique_id=f"{DbtResourceType.MODEL.value}.{SAMPLE_PROJ_PATH.stem}.child",
     resource_type=DbtResourceType.MODEL,
-    depends_on=["parent"],
+    depends_on=[parent_node.unique_id],
     file_path=SAMPLE_PROJ_PATH / "gen3/models/child.sql",
     tags=["nightly"],
     config={"materialized": "table"},
 )
 
-sample_nodes_list = [parent_seed, parent_node, test_parent_node, child_node]
+child2_node = DbtNode(
+    unique_id=f"{DbtResourceType.MODEL.value}.{SAMPLE_PROJ_PATH.stem}.child2.v2",
+    resource_type=DbtResourceType.MODEL,
+    depends_on=[parent_node.unique_id],
+    file_path=SAMPLE_PROJ_PATH / "gen3/models/child2_v2.sql",
+    tags=["nightly"],
+    config={"materialized": "table"},
+)
+
+sample_nodes_list = [parent_seed, parent_node, test_parent_node, child_node, child2_node]
 sample_nodes = {node.unique_id: node for node in sample_nodes_list}
 
@@ -91,6 +105,7 @@ def test_build_airflow_graph_with_after_each():
         "parent.run",
"parent.test", "child_run", + "child2_v2_run", ] assert topological_sort == expected_sort @@ -100,15 +115,16 @@ def test_build_airflow_graph_with_after_each(): assert task_groups["parent"].upstream_task_ids == {"seed_parent_seed"} assert list(task_groups["parent"].children.keys()) == ["parent.run", "parent.test"] - assert len(dag.leaves) == 1 + assert len(dag.leaves) == 2 assert dag.leaves[0].task_id == "child_run" + assert dag.leaves[1].task_id == "child2_v2_run" @pytest.mark.parametrize( "node_type,task_suffix", [(DbtResourceType.MODEL, "run"), (DbtResourceType.SEED, "seed"), (DbtResourceType.SNAPSHOT, "snapshot")], ) -def test_create_task_group_for_after_each_supported_nodes(node_type, task_suffix): +def test_create_task_group_for_after_each_supported_nodes(node_type: DbtResourceType, task_suffix): """ dbt test runs tests defined on models, sources, snapshots, and seeds. It expects that you have already created those resources through the appropriate commands. @@ -116,8 +132,7 @@ def test_create_task_group_for_after_each_supported_nodes(node_type, task_suffix """ with DAG("test-task-group-after-each", start_date=datetime(2022, 1, 1)) as dag: node = DbtNode( - name="dbt_node", - unique_id="dbt_node", + unique_id=f"{node_type.value}.{SAMPLE_PROJ_PATH.stem}.dbt_node", resource_type=node_type, file_path=SAMPLE_PROJ_PATH / "gen2/models/parent.sql", tags=["has_child"], @@ -178,7 +193,7 @@ def test_build_airflow_graph_with_after_all(): dbt_project_name="astro_shop", ) topological_sort = [task.task_id for task in dag.topological_sort()] - expected_sort = ["seed_parent_seed", "parent_run", "child_run", "astro_shop_test"] + expected_sort = ["seed_parent_seed", "parent_run", "child_run", "child2_v2_run", "astro_shop_test"] assert topological_sort == expected_sort task_groups = dag.task_group_dict @@ -195,8 +210,7 @@ def test_calculate_operator_class(): def test_calculate_leaves(): grandparent_node = DbtNode( - name="grandparent", - unique_id="grandparent", + unique_id=f"{DbtResourceType.MODEL.value}.{SAMPLE_PROJ_PATH.stem}.grandparent", resource_type=DbtResourceType.MODEL, depends_on=[], file_path="", @@ -204,28 +218,25 @@ def test_calculate_leaves(): config={}, ) parent1_node = DbtNode( - name="parent1", - unique_id="parent1", + unique_id=f"{DbtResourceType.MODEL.value}.{SAMPLE_PROJ_PATH.stem}.parent1", resource_type=DbtResourceType.MODEL, - depends_on=["grandparent"], + depends_on=[grandparent_node.unique_id], file_path="", tags=[], config={}, ) parent2_node = DbtNode( - name="parent2", - unique_id="parent2", + unique_id=f"{DbtResourceType.MODEL.value}.{SAMPLE_PROJ_PATH.stem}.parent2", resource_type=DbtResourceType.MODEL, - depends_on=["grandparent"], + depends_on=[parent1_node.unique_id], file_path="", tags=[], config={}, ) child_node = DbtNode( - name="child", - unique_id="child", + unique_id=f"{DbtResourceType.MODEL.value}.{SAMPLE_PROJ_PATH.stem}.child", resource_type=DbtResourceType.MODEL, - depends_on=["parent1", "parent2"], + depends_on=[parent1_node.unique_id, parent2_node.unique_id], file_path="", tags=[], config={}, @@ -235,14 +246,13 @@ def test_calculate_leaves(): nodes = {node.unique_id: node for node in nodes_list} leaves = calculate_leaves(nodes.keys(), nodes) - assert leaves == ["child"] + assert leaves == [f"{DbtResourceType.MODEL.value}.{SAMPLE_PROJ_PATH.stem}.child"] @patch("cosmos.airflow.graph.logger.propagate", True) def test_create_task_metadata_unsupported(caplog): child_node = DbtNode( - name="unsupported", - unique_id="unsupported", + 
unique_id=f"unsupported.{SAMPLE_PROJ_PATH.stem}.unsupported", resource_type="unsupported", depends_on=[], file_path="", @@ -252,7 +262,7 @@ def test_create_task_metadata_unsupported(caplog): response = create_task_metadata(child_node, execution_mode="", args={}) assert response is None expected_msg = ( - "Unavailable conversion function for (node ). " + "Unavailable conversion function for (node ). " "Define a converter function using render_config.node_converters." ) assert caplog.messages[0] == expected_msg @@ -260,8 +270,7 @@ def test_create_task_metadata_unsupported(caplog): def test_create_task_metadata_model(caplog): child_node = DbtNode( - name="my_model", - unique_id="my_folder.my_model", + unique_id=f"{DbtResourceType.MODEL.value}.my_folder.my_model", resource_type=DbtResourceType.MODEL, depends_on=[], file_path="", @@ -274,10 +283,24 @@ def test_create_task_metadata_model(caplog): assert metadata.arguments == {"models": "my_model"} +def test_create_task_metadata_model_with_versions(caplog): + child_node = DbtNode( + unique_id=f"{DbtResourceType.MODEL.value}.my_folder.my_model.v1", + resource_type=DbtResourceType.MODEL, + depends_on=[], + file_path="", + tags=[], + config={}, + ) + metadata = create_task_metadata(child_node, execution_mode=ExecutionMode.LOCAL, args={}) + assert metadata.id == "my_model_v1_run" + assert metadata.operator_class == "cosmos.operators.local.DbtRunLocalOperator" + assert metadata.arguments == {"models": "my_model.v1"} + + def test_create_task_metadata_model_use_task_group(caplog): child_node = DbtNode( - name="my_model", - unique_id="my_folder.my_model", + unique_id=f"{DbtResourceType.MODEL.value}.my_folder.my_model", resource_type=DbtResourceType.MODEL, depends_on=[], file_path=Path(""), @@ -291,8 +314,7 @@ def test_create_task_metadata_model_use_task_group(caplog): @pytest.mark.parametrize("use_task_group", (None, True, False)) def test_create_task_metadata_seed(caplog, use_task_group): sample_node = DbtNode( - name="my_seed", - unique_id="my_folder.my_seed", + unique_id=f"{DbtResourceType.SEED.value}.my_folder.my_seed", resource_type=DbtResourceType.SEED, depends_on=[], file_path="", @@ -320,8 +342,7 @@ def test_create_task_metadata_seed(caplog, use_task_group): def test_create_task_metadata_snapshot(caplog): sample_node = DbtNode( - name="my_snapshot", - unique_id="my_folder.my_snapshot", + unique_id=f"{DbtResourceType.SNAPSHOT.value}.my_folder.my_snapshot", resource_type=DbtResourceType.SNAPSHOT, depends_on=[], file_path="", @@ -337,22 +358,33 @@ def test_create_task_metadata_snapshot(caplog): @pytest.mark.parametrize( "node_type,node_unique_id,test_indirect_selection,additional_arguments", [ - (DbtResourceType.MODEL, "node_name", TestIndirectSelection.EAGER, {"models": "node_name"}), + ( + DbtResourceType.MODEL, + f"{DbtResourceType.MODEL.value}.my_folder.node_name", + TestIndirectSelection.EAGER, + {"models": "node_name"}, + ), + ( + DbtResourceType.MODEL, + f"{DbtResourceType.MODEL.value}.my_folder.node_name.v1", + TestIndirectSelection.EAGER, + {"models": "node_name.v1"}, + ), ( DbtResourceType.SEED, - "node_name", + f"{DbtResourceType.SEED.value}.my_folder.node_name", TestIndirectSelection.CAUTIOUS, {"select": "node_name", "indirect_selection": "cautious"}, ), ( DbtResourceType.SOURCE, - "source.node_name", + f"{DbtResourceType.SOURCE.value}.my_folder.node_name", TestIndirectSelection.BUILDABLE, {"select": "source:node_name", "indirect_selection": "buildable"}, ), ( DbtResourceType.SNAPSHOT, - "node_name", + 
f"{DbtResourceType.SNAPSHOT.value}.my_folder.node_name", TestIndirectSelection.EMPTY, {"select": "node_name", "indirect_selection": "empty"}, ), @@ -360,7 +392,6 @@ def test_create_task_metadata_snapshot(caplog): ) def test_create_test_task_metadata(node_type, node_unique_id, test_indirect_selection, additional_arguments): sample_node = DbtNode( - name="node_name", unique_id=node_unique_id, resource_type=node_type, depends_on=[], diff --git a/tests/dbt/test_graph.py b/tests/dbt/test_graph.py index 0ad7424c8..3e3218259 100644 --- a/tests/dbt/test_graph.py +++ b/tests/dbt/test_graph.py @@ -12,8 +12,8 @@ DbtGraph, DbtNode, LoadMode, - run_command, parse_dbt_ls_output, + run_command, ) from cosmos.profiles import PostgresUserPasswordProfileMapping @@ -42,6 +42,20 @@ def tmp_dbt_project_dir(): shutil.rmtree(tmp_dir, ignore_errors=True) # delete directory +@pytest.mark.parametrize( + "unique_id,expected_name, expected_select", + [ + ("model.my_project.customers", "customers", "customers"), + ("model.my_project.customers.v1", "customers_v1", "customers.v1"), + ("model.my_project.orders.v2", "orders_v2", "orders.v2"), + ], +) +def test_dbt_node_name_and_select(unique_id, expected_name, expected_select): + node = DbtNode(unique_id=unique_id, resource_type=DbtResourceType.MODEL, depends_on=[], file_path="") + assert node.name == expected_name + assert node.resource_name == expected_select + + @pytest.mark.parametrize( "project_name,manifest_filepath,model_filepath", [(DBT_PROJECT_NAME, SAMPLE_MANIFEST, "customers.sql"), ("jaffle_shop_python", SAMPLE_MANIFEST_PY, "customers.py")], @@ -692,7 +706,6 @@ def test_parse_dbt_ls_output(): expected_nodes = { "fake-unique-id": DbtNode( - name="fake-name", unique_id="fake-unique-id", resource_type=DbtResourceType.MODEL, file_path=Path("fake-project/fake-file-path.sql"), diff --git a/tests/dbt/test_selector.py b/tests/dbt/test_selector.py index 9f6071a20..f7ece6391 100644 --- a/tests/dbt/test_selector.py +++ b/tests/dbt/test_selector.py @@ -39,8 +39,7 @@ def test_is_empty_config(selector_config, paths, tags, config, other, expected): grandparent_node = DbtNode( - name="grandparent", - unique_id="grandparent", + unique_id=f"{DbtResourceType.MODEL.value}.{SAMPLE_PROJ_PATH.stem}.grandparent", resource_type=DbtResourceType.MODEL, depends_on=[], file_path=SAMPLE_PROJ_PATH / "gen1/models/grandparent.sql", @@ -48,8 +47,7 @@ def test_is_empty_config(selector_config, paths, tags, config, other, expected): config={"materialized": "view", "tags": ["has_child"]}, ) parent_node = DbtNode( - name="parent", - unique_id="parent", + unique_id=f"{DbtResourceType.MODEL.value}.{SAMPLE_PROJ_PATH.stem}.parent", resource_type=DbtResourceType.MODEL, depends_on=["grandparent"], file_path=SAMPLE_PROJ_PATH / "gen2/models/parent.sql", @@ -57,8 +55,7 @@ def test_is_empty_config(selector_config, paths, tags, config, other, expected): config={"materialized": "view", "tags": ["has_child", "is_child"]}, ) child_node = DbtNode( - name="child", - unique_id="child", + unique_id=f"{DbtResourceType.MODEL.value}.{SAMPLE_PROJ_PATH.stem}.child", resource_type=DbtResourceType.MODEL, depends_on=["parent"], file_path=SAMPLE_PROJ_PATH / "gen3/models/child.sql", @@ -67,8 +64,7 @@ def test_is_empty_config(selector_config, paths, tags, config, other, expected): ) grandchild_1_test_node = DbtNode( - name="grandchild_1", - unique_id="grandchild_1", + unique_id=f"{DbtResourceType.MODEL.value}.{SAMPLE_PROJ_PATH.stem}.grandchild_1", resource_type=DbtResourceType.MODEL, depends_on=["parent"], 
     file_path=SAMPLE_PROJ_PATH / "gen3/models/grandchild_1.sql",
@@ -77,8 +73,7 @@
 )
 grandchild_2_test_node = DbtNode(
-    name="grandchild_2",
-    unique_id="grandchild_2",
+    unique_id=f"{DbtResourceType.MODEL.value}.{SAMPLE_PROJ_PATH.stem}.grandchild_2",
     resource_type=DbtResourceType.MODEL,
     depends_on=["parent"],
     file_path=SAMPLE_PROJ_PATH / "gen3/models/grandchild_2.sql",
diff --git a/tests/test_converter.py b/tests/test_converter.py
index 4210b24d6..8b5101061 100644
--- a/tests/test_converter.py
+++ b/tests/test_converter.py
@@ -31,8 +31,7 @@ def test_validate_arguments_tags(argument_key):
 
 
 parent_seed = DbtNode(
-    name="seed_parent",
-    unique_id="seed_parent",
+    unique_id=f"{DbtResourceType.SEED}.{SAMPLE_DBT_PROJECT.stem}.seed_parent",
     resource_type=DbtResourceType.SEED,
     depends_on=[],
     file_path="",
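
For reviewers skimming the patch, here is what the two new ``DbtNode`` properties compute (a minimal sketch, not part of the diff; ``PlainNode`` is a hypothetical stand-in that mirrors only the logic added to ``cosmos/dbt/graph.py``):

    from dataclasses import dataclass


    @dataclass
    class PlainNode:
        # dbt unique_id layout: <resource_type>.<package>.<resource_name>[.<version>]
        unique_id: str

        @property
        def resource_name(self) -> str:
            # Strip only the two leading components (resource type and package),
            # so a versioned model keeps its ".vN" suffix for dbt selection.
            return self.unique_id.split(".", 2)[2]

        @property
        def name(self) -> str:
            # Replace "." with "_": Airflow uses dots to namespace task ids inside
            # task groups, so "customers.v1" would be ambiguous as a task name.
            return self.resource_name.replace(".", "_")


    # Mirrors the expectations asserted in test_dbt_node_name_and_select above:
    assert PlainNode("model.my_project.customers").resource_name == "customers"
    assert PlainNode("model.my_project.customers.v1").resource_name == "customers.v1"  # dbt: --models customers.v1
    assert PlainNode("model.my_project.customers.v1").name == "customers_v1"  # Airflow task id prefix: customers_v1_run

Using ``split(".", 2)`` rather than ``rsplit`` is what keeps the version suffix attached to the resource name, matching the ``{"models": "my_model.v1"}`` arguments asserted in the new tests.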