From 3eb5d08a0ff33a9253c9c8f4f0046c90c30cb751 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Tue, 17 Dec 2024 14:53:17 +0000 Subject: [PATCH 01/46] Add telemetry module and tests --- cosmos/__init__.py | 2 +- cosmos/constants.py | 4 ++ cosmos/settings.py | 22 ++++++-- cosmos/telemetry.py | 75 ++++++++++++++++++++++++++ tests/test_telemetry.py | 113 ++++++++++++++++++++++++++++++++++++++++ 5 files changed, 212 insertions(+), 4 deletions(-) create mode 100644 cosmos/telemetry.py create mode 100644 tests/test_telemetry.py diff --git a/cosmos/__init__.py b/cosmos/__init__.py index 884a90659..6cf3c2f9b 100644 --- a/cosmos/__init__.py +++ b/cosmos/__init__.py @@ -6,7 +6,7 @@ Contains dags, task groups, and operators. """ -__version__ = "1.7.1" +__version__: str = "1.8.0a4" from cosmos.airflow.dag import DbtDag diff --git a/cosmos/constants.py b/cosmos/constants.py index 8378e8d10..ee2099800 100644 --- a/cosmos/constants.py +++ b/cosmos/constants.py @@ -160,3 +160,7 @@ def _missing_value_(cls, value): # type: ignore TESTABLE_DBT_RESOURCES = {DbtResourceType.MODEL, DbtResourceType.SOURCE, DbtResourceType.SNAPSHOT, DbtResourceType.SEED} DBT_COMPILE_TASK_ID = "dbt_compile" + +TELEMETRY_URL = "https://astronomer.gateway.scarf.sh/astronomer-cosmos/{telemetry_version}/{cosmos_version}/{airflow_version}/{python_version}/{platform_system}/{platform_machine}/{event_type}/{status}/{dag_hash}/{task_count}" +TELEMETRY_VERSION = "v1" +TELEMETRY_TIMEOUT = 5.0 diff --git a/cosmos/settings.py b/cosmos/settings.py index 5b24321c8..ba9da106a 100644 --- a/cosmos/settings.py +++ b/cosmos/settings.py @@ -37,12 +37,28 @@ remote_target_path = conf.get("cosmos", "remote_target_path", fallback=None) remote_target_path_conn_id = conf.get("cosmos", "remote_target_path_conn_id", fallback=None) +AIRFLOW_IO_AVAILABLE = Version(airflow_version) >= Version("2.8.0") + +# The following environment variable is populated in Astro Cloud +in_astro_cloud = os.getenv("ASTRONOMER_ENVIRONMENT") == "cloud" + try: LINEAGE_NAMESPACE = conf.get("openlineage", "namespace") except airflow.exceptions.AirflowConfigException: LINEAGE_NAMESPACE = os.getenv("OPENLINEAGE_NAMESPACE", DEFAULT_OPENLINEAGE_NAMESPACE) -AIRFLOW_IO_AVAILABLE = Version(airflow_version) >= Version("2.8.0") -# The following environment variable is populated in Astro Cloud -in_astro_cloud = os.getenv("ASTRONOMER_ENVIRONMENT") == "cloud" +def convert_to_boolean(value: str | None) -> bool: + """ + Convert a string that represents a boolean to a Python boolean. + """ + value = str(value).lower().strip() + if value in ("f", "false", "0", "", "none"): + return False + return True + + +# Telemetry-related settings +enable_telemetry = conf.getboolean("cosmos", "enable_telemetry", fallback=True) +do_not_track = convert_to_boolean(os.getenv("DO_NOT_TRACK")) +no_analytics = convert_to_boolean(os.getenv("SCARF_NO_ANALYTICS")) diff --git a/cosmos/telemetry.py b/cosmos/telemetry.py new file mode 100644 index 000000000..4b5dd4abc --- /dev/null +++ b/cosmos/telemetry.py @@ -0,0 +1,75 @@ +from __future__ import annotations + +import logging +import platform +from urllib import parse +from urllib.parse import urlencode + +import httpx +from airflow import __version__ as airflow_version + +import cosmos +from cosmos import constants, settings + + +def should_emit() -> bool: + """ + Identify if telemetry metrics should be emitted or not. + """ + return settings.enable_telemetry and not settings.do_not_track and not settings.no_analytics + + +def collect_standard_usage_metrics() -> dict[str, object]: + """ + Return standard telemetry metrics. + """ + metrics = { + "cosmos_version": cosmos.__version__, # type: ignore[attr-defined] + "airflow_version": parse.quote(airflow_version), + "python_version": platform.python_version(), + "platform_system": platform.system(), + "platform_machine": platform.machine(), + "variables": {}, + } + return metrics + + +def emit_usage_metrics(metrics: dict[str, object]) -> bool: + """ + Emit desired telemetry metrics to remote telemetry endpoint. + + The metrics must contain the necessary fields to build the TELEMETRY_URL. + """ + query_string = urlencode(metrics) + telemetry_url = constants.TELEMETRY_URL.format( + **metrics, telemetry_version=constants.TELEMETRY_VERSION, query_string=query_string + ) + logging.debug("Telemetry is enabled. Emitting the following usage metrics to %s: %s", telemetry_url, metrics) + response = httpx.get(telemetry_url, timeout=constants.TELEMETRY_TIMEOUT, follow_redirects=True) + if not response.is_success: + logging.warning( + "Unable to emit usage metrics to %s. Status code: %s. Message: %s", + telemetry_url, + response.status_code, + response.text, + ) + return response.is_success + + +def emit_usage_metrics_if_enabled(event_type: str, additional_metrics: dict[str, object]) -> bool: + """ + Checks if telemetry should be emitted, fetch standard metrics, complement with custom metrics + and emit them to remote telemetry endpoint. + + :returns: If the event was successfully sent to the telemetry backend or not. + """ + if should_emit(): + metrics = collect_standard_usage_metrics() + metrics["event_type"] = event_type + metrics["variables"].update(additional_metrics) # type: ignore[attr-defined] + metrics.update(additional_metrics) + is_success = emit_usage_metrics(metrics) + return is_success + else: + logging.debug("Telemetry is disabled. To enable it, export AIRFLOW__COSMOS__ENABLE_TELEMETRY=True.") + return False diff --git a/tests/test_telemetry.py b/tests/test_telemetry.py new file mode 100644 index 000000000..e238dbbab --- /dev/null +++ b/tests/test_telemetry.py @@ -0,0 +1,113 @@ +import logging +from unittest.mock import patch + +import pytest + +from cosmos import telemetry + + +def test_should_emit_is_true_by_default(): + assert telemetry.should_emit() + + +@patch("cosmos.settings.enable_telemetry", True) +def test_should_emit_is_true_when_only_enable_telemetry_is_true(): + assert telemetry.should_emit() + + +@patch("cosmos.settings.do_not_track", True) +def test_should_emit_is_false_when_do_not_track(): + assert not telemetry.should_emit() + + +@patch("cosmos.settings.no_analytics", True) +def test_should_emit_is_false_when_no_analytics(): + assert not telemetry.should_emit() + + +def test_collect_standard_usage_metrics(): + metrics = telemetry.collect_standard_usage_metrics() + expected_keus = [ + "airflow_version", + "cosmos_version", + "platform_machine", + "platform_system", + "python_version", + "variables", + ] + assert sorted(metrics.keys()) == expected_keus + + +class MockFailedResponse: + is_success = False + status_code = "404" + text = "Non existent URL" + + +@patch("cosmos.telemetry.httpx.get", return_value=MockFailedResponse()) +def test_emit_usage_metrics_fails(mock_httpx_get, caplog): + sample_metrics = { + "cosmos_version": "1.8.0a4", + "airflow_version": "2.10.1", + "python_version": "3.11", + "platform_system": "darwin", + "platform_machine": "amd64", + "event_type": "dag_run", + "status": "success", + "dag_hash": "d151d1fa2f03270ea116cc7494f2c591", + "task_count": 3, + } + is_success = telemetry.emit_usage_metrics(sample_metrics) + mock_httpx_get.assert_called_once_with( + f"""https://astronomer.gateway.scarf.sh/astronomer-cosmos/v2/1.8.0a4/2.10.1/3.11/darwin/amd64/dag_run/success/d151d1fa2f03270ea116cc7494f2c591/3""", + timeout=5.0, + follow_redirects=True, + ) + assert not is_success + log_msg = f"""Unable to emit usage metrics to https://astronomer.gateway.scarf.sh/astronomer-cosmos/v2/1.8.0a4/2.10.1/3.11/darwin/amd64/dag_run/success/d151d1fa2f03270ea116cc7494f2c591/3. Status code: 404. Message: Non existent URL""" + assert caplog.text.startswith("WARNING") + assert log_msg in caplog.text + + +@pytest.mark.integration +def test_emit_usage_metrics_succeeds(caplog): + caplog.set_level(logging.DEBUG) + sample_metrics = { + "cosmos_version": "1.8.0a4", + "airflow_version": "2.10.1", + "python_version": "3.11", + "platform_system": "darwin", + "platform_machine": "amd64", + "event_type": "dag_run", + "status": "success", + "dag_hash": "d151d1fa2f03270ea116cc7494f2c591", + "task_count": 3, + } + is_success = telemetry.emit_usage_metrics(sample_metrics) + assert is_success + assert caplog.text.startswith("DEBUG") + assert "Telemetry is enabled. Emitting the following usage metrics to" in caplog.text + + +@patch("cosmos.telemetry.should_emit", return_value=False) +def test_emit_usage_metrics_if_enabled_fails(mock_should_emit, caplog): + caplog.set_level(logging.DEBUG) + assert not telemetry.emit_usage_metrics_if_enabled("any", {}) + assert caplog.text.startswith("DEBUG") + assert "Telemetry is disabled. To enable it, export AIRFLOW__COSMOS__ENABLE_TELEMETRY=True." in caplog.text + + +@patch("cosmos.telemetry.should_emit", return_value=True) +@patch("cosmos.telemetry.collect_standard_usage_metrics", return_value={"k1": "v1", "k2": "v2", "variables": {}}) +@patch("cosmos.telemetry.emit_usage_metrics") +def test_emit_usage_metrics_if_enabled_succeeds( + mock_emit_usage_metrics, mock_collect_standard_usage_metrics, mock_should_emit +): + assert telemetry.emit_usage_metrics_if_enabled("any", {"k2": "v2"}) + mock_emit_usage_metrics.assert_called_once() + assert mock_emit_usage_metrics.call_args.args[0] == { + "k1": "v1", + "k2": "v2", + "event_type": "any", + "variables": {"k2": "v2"}, + } From 6e0c6f5051213f753ab619b179c2d036d772b483 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Tue, 17 Dec 2024 15:18:38 +0000 Subject: [PATCH 02/46] Emit telemetry via Airflow listener --- cosmos/constants.py | 2 +- cosmos/listeners/__init__.py | 0 cosmos/listeners/dag_run.py | 85 ++++++++++++++++++++++++++++++++++++ cosmos/plugin/__init__.py | 2 + 4 files changed, 88 insertions(+), 1 deletion(-) create mode 100644 cosmos/listeners/__init__.py create mode 100644 cosmos/listeners/dag_run.py diff --git a/cosmos/constants.py b/cosmos/constants.py index ee2099800..51951d259 100644 --- a/cosmos/constants.py +++ b/cosmos/constants.py @@ -161,6 +161,6 @@ def _missing_value_(cls, value): # type: ignore DBT_COMPILE_TASK_ID = "dbt_compile" -TELEMETRY_URL = "https://astronomer.gateway.scarf.sh/astronomer-cosmos/{telemetry_version}/{cosmos_version}/{airflow_version}/{python_version}/{platform_system}/{platform_machine}/{event_type}/{status}/{dag_hash}/{task_count}" +TELEMETRY_URL = "https://astronomer.gateway.scarf.sh/astronomer-cosmos/{telemetry_version}/{cosmos_version}/{airflow_version}/{python_version}/{platform_system}/{platform_machine}/{event_type}/{status}/{dag_hash}/{cosmos_task_count}" TELEMETRY_VERSION = "v1" TELEMETRY_TIMEOUT = 5.0 diff --git a/cosmos/listeners/__init__.py b/cosmos/listeners/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/cosmos/listeners/dag_run.py b/cosmos/listeners/dag_run.py new file mode 100644 index 000000000..d702c727e --- /dev/null +++ b/cosmos/listeners/dag_run.py @@ -0,0 +1,85 @@ +from __future__ import annotations + +import functools + +from airflow.listeners import hookimpl +from airflow.models.dag import DAG +from airflow.models.dagrun import DagRun + +from cosmos import telemetry +from cosmos.airflow.dag import DbtDag +from cosmos.airflow.task_group import DbtTaskGroup + + +class EventStatus: + SUCCESS = "success" + FAILED = "failed" + + +DAG_RUN = "dag_run" + + +@functools.lru_cache() +def is_cosmos_dag(dag: DAG) -> bool: + if isinstance(dag, DbtDag): + return True + return False + + +@functools.lru_cache() +def total_cosmos_task_groups(dag: DAG) -> int: + cosmos_task_groups = 0 + for group_id, task_group in dag.task_group_dict.items(): + if isinstance(task_group, DbtTaskGroup): + cosmos_task_groups += 1 + return cosmos_task_groups + + +@functools.lru_cache() +def total_cosmos_tasks(dag: DAG) -> int: + cosmos_tasks = 0 + for task in dag.tasks: + task_class = type(task) + task_module = task_class.__module__ + if task_module.startswith("cosmos."): + cosmos_tasks += 1 + return cosmos_tasks + + +@functools.lru_cache() +def uses_cosmos(dag: DAG) -> bool: + return bool(is_cosmos_dag(dag) or total_cosmos_task_groups(dag) or total_cosmos_tasks(dag)) + + +@hookimpl +def on_dag_run_success(dag_run: DagRun, msg: str) -> None: + dag = dag_run.get_dag() + if not uses_cosmos(dag): + return + additional_telemetry_metrics = { + "dag_hash": dag_run.dag_hash, + "status": EventStatus.SUCCESS, + "task_count": len(dag.task_ids), + "cosmos_task_count": total_cosmos_tasks(dag), + "cosmos_task_groups_count": total_cosmos_task_groups(dag), + "is_cosmos_dag": is_cosmos_dag(dag), + } + + telemetry.emit_usage_metrics_if_enabled(DAG_RUN, additional_telemetry_metrics) + + +@hookimpl +def on_dag_run_failed(dag_run: DagRun, msg: str) -> None: + dag = dag_run.get_dag() + if not uses_cosmos(dag): + return + additional_telemetry_metrics = { + "dag_hash": dag_run.dag_hash, + "status": EventStatus.FAILED, + "task_count": len(dag.task_ids), + "cosmos_task_count": total_cosmos_tasks(dag), + "cosmos_task_groups_count": total_cosmos_task_groups(dag), + "is_cosmos_dag": is_cosmos_dag(dag), + } + + telemetry.emit_usage_metrics_if_enabled(DAG_RUN, additional_telemetry_metrics) diff --git a/cosmos/plugin/__init__.py b/cosmos/plugin/__init__.py index 5997a5fe3..496369121 100644 --- a/cosmos/plugin/__init__.py +++ b/cosmos/plugin/__init__.py @@ -10,6 +10,7 @@ from flask import abort, url_for from flask_appbuilder import AppBuilder, expose +from cosmos.listeners import dag_run from cosmos.settings import dbt_docs_conn_id, dbt_docs_dir, dbt_docs_index_file_name, in_astro_cloud if in_astro_cloud: @@ -269,3 +270,4 @@ class CosmosPlugin(AirflowPlugin): "href": conf.get("webserver", "base_url") + "/cosmos/dbt_docs", } appbuilder_views = [item] + listeners = [dag_run] From c124b79051608fe830ca470a209b62e31d16818f Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Wed, 18 Dec 2024 10:14:53 +0000 Subject: [PATCH 03/46] Change version --- cosmos/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cosmos/__init__.py b/cosmos/__init__.py index 6cf3c2f9b..c19d1a8e2 100644 --- a/cosmos/__init__.py +++ b/cosmos/__init__.py @@ -6,7 +6,7 @@ Contains dags, task groups, and operators. """ -__version__: str = "1.8.0a4" +__version__: str = "1.7.2" from cosmos.airflow.dag import DbtDag From ee8e9286e887396b2506d759306003944261ec00 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Wed, 18 Dec 2024 15:12:57 +0000 Subject: [PATCH 04/46] Try to fix version issue in CI when running hatch type-check --- cosmos/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cosmos/__init__.py b/cosmos/__init__.py index c19d1a8e2..f8bdc613f 100644 --- a/cosmos/__init__.py +++ b/cosmos/__init__.py @@ -6,7 +6,7 @@ Contains dags, task groups, and operators. """ -__version__: str = "1.7.2" +__version__ = "1.7.2" from cosmos.airflow.dag import DbtDag From bbcce4ac4999dea6e6556cfeb79fea73ddf200a1 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Wed, 18 Dec 2024 15:16:02 +0000 Subject: [PATCH 05/46] Fix unit test --- tests/test_telemetry.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_telemetry.py b/tests/test_telemetry.py index e238dbbab..f4c9606b5 100644 --- a/tests/test_telemetry.py +++ b/tests/test_telemetry.py @@ -81,7 +81,7 @@ def test_emit_usage_metrics_succeeds(caplog): "event_type": "dag_run", "status": "success", "dag_hash": "d151d1fa2f03270ea116cc7494f2c591", - "task_count": 3, + "cosmos_task_count": 3, } is_success = telemetry.emit_usage_metrics(sample_metrics) assert is_success From c484e500c9361f3194f2540a207bd276669ffec7 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Wed, 18 Dec 2024 15:29:15 +0000 Subject: [PATCH 06/46] Fix unit test --- tests/test_telemetry.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/test_telemetry.py b/tests/test_telemetry.py index f4c9606b5..d074a9bff 100644 --- a/tests/test_telemetry.py +++ b/tests/test_telemetry.py @@ -55,16 +55,16 @@ def test_emit_usage_metrics_fails(mock_httpx_get, caplog): "event_type": "dag_run", "status": "success", "dag_hash": "d151d1fa2f03270ea116cc7494f2c591", - "task_count": 3, + "cosmos_task_count": 3, } is_success = telemetry.emit_usage_metrics(sample_metrics) mock_httpx_get.assert_called_once_with( - f"""https://astronomer.gateway.scarf.sh/astronomer-cosmos/v2/1.8.0a4/2.10.1/3.11/darwin/amd64/dag_run/success/d151d1fa2f03270ea116cc7494f2c591/3""", + f"""https://astronomer.gateway.scarf.sh/astronomer-cosmos/v1/1.8.0a4/2.10.1/3.11/darwin/amd64/dag_run/success/d151d1fa2f03270ea116cc7494f2c591/3""", timeout=5.0, follow_redirects=True, ) assert not is_success - log_msg = f"""Unable to emit usage metrics to https://astronomer.gateway.scarf.sh/astronomer-cosmos/v2/1.8.0a4/2.10.1/3.11/darwin/amd64/dag_run/success/d151d1fa2f03270ea116cc7494f2c591/3. Status code: 404. Message: Non existent URL""" + log_msg = f"""Unable to emit usage metrics to https://astronomer.gateway.scarf.sh/astronomer-cosmos/v1/1.8.0a4/2.10.1/3.11/darwin/amd64/dag_run/success/d151d1fa2f03270ea116cc7494f2c591/3. Status code: 404. Message: Non existent URL""" assert caplog.text.startswith("WARNING") assert log_msg in caplog.text From c7d282f491c513ae83cc98a589180c82c8d9946f Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Thu, 19 Dec 2024 10:47:18 +0000 Subject: [PATCH 07/46] Expose more metrics in Scarf --- cosmos/constants.py | 2 +- .../{dag_run.py => dag_run_listener.py} | 51 +++++++++++++------ cosmos/plugin/__init__.py | 4 +- tests/test_telemetry.py | 14 +++-- 4 files changed, 48 insertions(+), 23 deletions(-) rename cosmos/listeners/{dag_run.py => dag_run_listener.py} (59%) diff --git a/cosmos/constants.py b/cosmos/constants.py index 51951d259..5104e9760 100644 --- a/cosmos/constants.py +++ b/cosmos/constants.py @@ -161,6 +161,6 @@ def _missing_value_(cls, value): # type: ignore DBT_COMPILE_TASK_ID = "dbt_compile" -TELEMETRY_URL = "https://astronomer.gateway.scarf.sh/astronomer-cosmos/{telemetry_version}/{cosmos_version}/{airflow_version}/{python_version}/{platform_system}/{platform_machine}/{event_type}/{status}/{dag_hash}/{cosmos_task_count}" +TELEMETRY_URL = "https://astronomer.gateway.scarf.sh/astronomer-cosmos/{telemetry_version}/{cosmos_version}/{airflow_version}/{python_version}/{platform_system}/{platform_machine}/{event_type}/{status}/{dag_hash}/{is_cosmos_dag}/{cosmos_task_groups_count}/{task_count}/{cosmos_task_count}" TELEMETRY_VERSION = "v1" TELEMETRY_TIMEOUT = 5.0 diff --git a/cosmos/listeners/dag_run.py b/cosmos/listeners/dag_run_listener.py similarity index 59% rename from cosmos/listeners/dag_run.py rename to cosmos/listeners/dag_run_listener.py index d702c727e..b65c6f409 100644 --- a/cosmos/listeners/dag_run.py +++ b/cosmos/listeners/dag_run_listener.py @@ -1,6 +1,9 @@ from __future__ import annotations import functools +import time +from contextlib import contextmanager +from typing import Generator from airflow.listeners import hookimpl from airflow.models.dag import DAG @@ -9,6 +12,17 @@ from cosmos import telemetry from cosmos.airflow.dag import DbtDag from cosmos.airflow.task_group import DbtTaskGroup +from cosmos.log import get_logger + +logger = get_logger(__name__) + + +@contextmanager +def measure_time() -> Generator[None, None, None]: + start = time.perf_counter() + yield + end = time.perf_counter() + logger.info(f"DAG listener metrics collection time: {end - start:.6f} seconds") class EventStatus: @@ -54,16 +68,19 @@ def uses_cosmos(dag: DAG) -> bool: @hookimpl def on_dag_run_success(dag_run: DagRun, msg: str) -> None: dag = dag_run.get_dag() + if not uses_cosmos(dag): return - additional_telemetry_metrics = { - "dag_hash": dag_run.dag_hash, - "status": EventStatus.SUCCESS, - "task_count": len(dag.task_ids), - "cosmos_task_count": total_cosmos_tasks(dag), - "cosmos_task_groups_count": total_cosmos_task_groups(dag), - "is_cosmos_dag": is_cosmos_dag(dag), - } + + with measure_time(): + additional_telemetry_metrics = { + "dag_hash": dag_run.dag_hash, + "status": EventStatus.SUCCESS, + "task_count": len(dag.task_ids), + "cosmos_task_count": total_cosmos_tasks(dag), + "cosmos_task_groups_count": total_cosmos_task_groups(dag), + "is_cosmos_dag": is_cosmos_dag(dag), + } telemetry.emit_usage_metrics_if_enabled(DAG_RUN, additional_telemetry_metrics) @@ -73,13 +90,15 @@ def on_dag_run_failed(dag_run: DagRun, msg: str) -> None: dag = dag_run.get_dag() if not uses_cosmos(dag): return - additional_telemetry_metrics = { - "dag_hash": dag_run.dag_hash, - "status": EventStatus.FAILED, - "task_count": len(dag.task_ids), - "cosmos_task_count": total_cosmos_tasks(dag), - "cosmos_task_groups_count": total_cosmos_task_groups(dag), - "is_cosmos_dag": is_cosmos_dag(dag), - } + + with measure_time(): + additional_telemetry_metrics = { + "dag_hash": dag_run.dag_hash, + "status": EventStatus.FAILED, + "task_count": len(dag.task_ids), + "cosmos_task_count": total_cosmos_tasks(dag), + "cosmos_task_groups_count": total_cosmos_task_groups(dag), + "is_cosmos_dag": is_cosmos_dag(dag), + } telemetry.emit_usage_metrics_if_enabled(DAG_RUN, additional_telemetry_metrics) diff --git a/cosmos/plugin/__init__.py b/cosmos/plugin/__init__.py index 496369121..4bbea4fa2 100644 --- a/cosmos/plugin/__init__.py +++ b/cosmos/plugin/__init__.py @@ -10,7 +10,7 @@ from flask import abort, url_for from flask_appbuilder import AppBuilder, expose -from cosmos.listeners import dag_run +from cosmos.listeners import dag_run_listener from cosmos.settings import dbt_docs_conn_id, dbt_docs_dir, dbt_docs_index_file_name, in_astro_cloud if in_astro_cloud: @@ -270,4 +270,4 @@ class CosmosPlugin(AirflowPlugin): "href": conf.get("webserver", "base_url") + "/cosmos/dbt_docs", } appbuilder_views = [item] - listeners = [dag_run] + listeners = [dag_run_listener] diff --git a/tests/test_telemetry.py b/tests/test_telemetry.py index d074a9bff..82760b0d8 100644 --- a/tests/test_telemetry.py +++ b/tests/test_telemetry.py @@ -55,16 +55,19 @@ def test_emit_usage_metrics_fails(mock_httpx_get, caplog): "event_type": "dag_run", "status": "success", "dag_hash": "d151d1fa2f03270ea116cc7494f2c591", + "is_cosmos_dag": True, + "cosmos_task_groups_count": 0, + "task_count": 3, "cosmos_task_count": 3, } is_success = telemetry.emit_usage_metrics(sample_metrics) mock_httpx_get.assert_called_once_with( - f"""https://astronomer.gateway.scarf.sh/astronomer-cosmos/v1/1.8.0a4/2.10.1/3.11/darwin/amd64/dag_run/success/d151d1fa2f03270ea116cc7494f2c591/3""", + f"""https://astronomer.gateway.scarf.sh/astronomer-cosmos/v1/1.8.0a4/2.10.1/3.11/darwin/amd64/dag_run/success/d151d1fa2f03270ea116cc7494f2c591/True/0/3/3""", timeout=5.0, follow_redirects=True, ) assert not is_success - log_msg = f"""Unable to emit usage metrics to https://astronomer.gateway.scarf.sh/astronomer-cosmos/v1/1.8.0a4/2.10.1/3.11/darwin/amd64/dag_run/success/d151d1fa2f03270ea116cc7494f2c591/3. Status code: 404. Message: Non existent URL""" + log_msg = f"""Unable to emit usage metrics to https://astronomer.gateway.scarf.sh/astronomer-cosmos/v1/1.8.0a4/2.10.1/3.11/darwin/amd64/dag_run/success/d151d1fa2f03270ea116cc7494f2c591/True/0/3/3. Status code: 404. Message: Non existent URL""" assert caplog.text.startswith("WARNING") assert log_msg in caplog.text @@ -80,8 +83,11 @@ def test_emit_usage_metrics_succeeds(caplog): "platform_machine": "amd64", "event_type": "dag_run", "status": "success", - "dag_hash": "d151d1fa2f03270ea116cc7494f2c591", - "cosmos_task_count": 3, + "dag_hash": "dag-hash-ci", + "is_cosmos_dag": False, + "cosmos_task_groups_count": 1, + "task_count": 33, + "cosmos_task_count": 33, } is_success = telemetry.emit_usage_metrics(sample_metrics) assert is_success From e460db29ac50751cbc39b5a53d0e6b8cff05afee Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Thu, 19 Dec 2024 11:16:08 +0000 Subject: [PATCH 08/46] Fix spelling --- tests/test_telemetry.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_telemetry.py b/tests/test_telemetry.py index 82760b0d8..d2ff24984 100644 --- a/tests/test_telemetry.py +++ b/tests/test_telemetry.py @@ -27,7 +27,7 @@ def test_should_emit_is_false_when_no_analytics(): def test_collect_standard_usage_metrics(): metrics = telemetry.collect_standard_usage_metrics() - expected_keus = [ + expected_keys = [ "airflow_version", "cosmos_version", "platform_machine", @@ -35,7 +35,7 @@ def test_collect_standard_usage_metrics(): "python_version", "variables", ] - assert sorted(metrics.keys()) == expected_keus + assert sorted(metrics.keys()) == expected_keys class MockFailedResponse: From 0300809d2c68ad0c29480c98196d2849f6ec6d12 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Thu, 19 Dec 2024 11:42:46 +0000 Subject: [PATCH 09/46] Update cosmos/constants.py --- cosmos/constants.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cosmos/constants.py b/cosmos/constants.py index 5104e9760..0d47e82cc 100644 --- a/cosmos/constants.py +++ b/cosmos/constants.py @@ -163,4 +163,4 @@ def _missing_value_(cls, value): # type: ignore TELEMETRY_URL = "https://astronomer.gateway.scarf.sh/astronomer-cosmos/{telemetry_version}/{cosmos_version}/{airflow_version}/{python_version}/{platform_system}/{platform_machine}/{event_type}/{status}/{dag_hash}/{is_cosmos_dag}/{cosmos_task_groups_count}/{task_count}/{cosmos_task_count}" TELEMETRY_VERSION = "v1" -TELEMETRY_TIMEOUT = 5.0 +TELEMETRY_TIMEOUT = 1.0 From e8c1a26088f1d14d68c29f7c3556a1f2610aa32c Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Thu, 19 Dec 2024 11:29:13 +0000 Subject: [PATCH 10/46] Add docs about telemetry --- PRIVACY_NOTICE.rst | 42 ++++++++++++++++++++++++++++++++++++++++++ README.rst | 4 +++- docs/index.rst | 6 +++++- 3 files changed, 50 insertions(+), 2 deletions(-) create mode 100644 PRIVACY_NOTICE.rst diff --git a/PRIVACY_NOTICE.rst b/PRIVACY_NOTICE.rst new file mode 100644 index 000000000..49cdbaa8a --- /dev/null +++ b/PRIVACY_NOTICE.rst @@ -0,0 +1,42 @@ +Privacy Notice +============== + +This project follows the `Privacy Policy of Astronomer `_. + +Collection of Data +------------------ + +Astronomer Cosmos integrates `Scarf `_ to collect basic telemetry data during operation. +This data assists the project maintainers in better understanding how Cosmos is used. +Insights gained from this telemetry are critical for prioritizing patches, minor releases, and +security fixes. Additionally, this information supports key decisions related to the development road map. + +Deployments and individual users can opt-out of analytics by setting the configuration: + + +.. code-block:: + + [cosmos] enable_telemetry False + + +As described in the `official documentation `_, it is also possible to opt out by setting one of the following environment variables: + +.. code-block:: + + DO_NOT_TRACK=True + SCARF_NO_ANALYTICS=True + + +In addition to Scarf's default data collection, DAG Factory collects the following information: + +- Cosmos version +- Airflow version +- Python version +- Operating system & machine architecture +- Event type +- DAG that uses Cosmos hash +- Total tasks in DAGs that use Cosmos +- Total Cosmos tasks +- Total Cosmos task groups + +No user-identifiable information (IP included) is stored in Scarf. diff --git a/README.rst b/README.rst index 7eb32bcac..e35b8a913 100644 --- a/README.rst +++ b/README.rst @@ -82,7 +82,9 @@ _______ Privacy Notice ______________ -This project follows `Astronomer's Privacy Policy `_ +The application and this website collect telemetry to support the project's development. These can be disabled by the end-users. + +Read the `Privacy Notice `_ to learn more about it. .. Tracking pixel for Scarf diff --git a/docs/index.rst b/docs/index.rst index e788bd04c..7a56b8df7 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -137,10 +137,14 @@ _______ `Apache License 2.0 `_ + Privacy Notice ______________ -This project follows `Astronomer's Privacy Policy `_ +The application and this website collect telemetry to support the project's development. These can be disabled by the end-users. + +Read the `Privacy Notice `_ to learn more about it. + .. Tracking pixel for Scarf .. raw:: html From 44387b3bcccf26261f8c4e1cbeb3c5d15374cb0a Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Thu, 19 Dec 2024 11:45:57 +0000 Subject: [PATCH 11/46] Update cosmos/__init__.py --- cosmos/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cosmos/__init__.py b/cosmos/__init__.py index f8bdc613f..884a90659 100644 --- a/cosmos/__init__.py +++ b/cosmos/__init__.py @@ -6,7 +6,7 @@ Contains dags, task groups, and operators. """ -__version__ = "1.7.2" +__version__ = "1.7.1" from cosmos.airflow.dag import DbtDag From 3ba1580d85e08d7f31a4640ef45fe78a5b11b82d Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Thu, 19 Dec 2024 11:56:52 +0000 Subject: [PATCH 12/46] Add debug messages --- cosmos/listeners/dag_run_listener.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cosmos/listeners/dag_run_listener.py b/cosmos/listeners/dag_run_listener.py index b65c6f409..0a1a0025a 100644 --- a/cosmos/listeners/dag_run_listener.py +++ b/cosmos/listeners/dag_run_listener.py @@ -67,8 +67,8 @@ def uses_cosmos(dag: DAG) -> bool: @hookimpl def on_dag_run_success(dag_run: DagRun, msg: str) -> None: + logger.info("The on_dag_run_success was called") dag = dag_run.get_dag() - if not uses_cosmos(dag): return @@ -87,6 +87,7 @@ def on_dag_run_success(dag_run: DagRun, msg: str) -> None: @hookimpl def on_dag_run_failed(dag_run: DagRun, msg: str) -> None: + logger.info("The on_dag_run_failed was called") dag = dag_run.get_dag() if not uses_cosmos(dag): return From 79650e43ce9be4d1fb04fea28ff392eb54562160 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Thu, 19 Dec 2024 11:58:08 +0000 Subject: [PATCH 13/46] Add more logs --- cosmos/listeners/dag_run_listener.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cosmos/listeners/dag_run_listener.py b/cosmos/listeners/dag_run_listener.py index 0a1a0025a..893ab9180 100644 --- a/cosmos/listeners/dag_run_listener.py +++ b/cosmos/listeners/dag_run_listener.py @@ -68,6 +68,7 @@ def uses_cosmos(dag: DAG) -> bool: @hookimpl def on_dag_run_success(dag_run: DagRun, msg: str) -> None: logger.info("The on_dag_run_success was called") + print("The on_dag_run_success was called") dag = dag_run.get_dag() if not uses_cosmos(dag): return @@ -88,6 +89,7 @@ def on_dag_run_success(dag_run: DagRun, msg: str) -> None: @hookimpl def on_dag_run_failed(dag_run: DagRun, msg: str) -> None: logger.info("The on_dag_run_failed was called") + print("The on_dag_run_failed was called") dag = dag_run.get_dag() if not uses_cosmos(dag): return From 4de8c62a3ab12ce22dff945e461541c56d9cbd9f Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Thu, 19 Dec 2024 12:11:32 +0000 Subject: [PATCH 14/46] Fix import [2024-12-19T12:09:23.654+0000] {plugins_manager.py:266} ERROR - Failed to import plugin cosmos Traceback (most recent call last): File "/usr/local/lib/python3.12/site-packages/airflow/plugins_manager.py", line 258, in load_entrypoint_plugins plugin_class = entry_point.load() ^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.12/importlib/metadata/__init__.py", line 205, in load module = import_module(match.group('module')) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.12/importlib/__init__.py", line 90, in import_module return _bootstrap._gcd_import(name[level:], package, level) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "", line 1387, in _gcd_import File "", line 1360, in _find_and_load File "", line 1331, in _find_and_load_unlocked File "", line 935, in _load_unlocked File "", line 995, in exec_module File "", line 488, in _call_with_frames_removed File "/usr/local/airflow/src/astronomer-cosmos/cosmos/plugin/__init__.py", line 13, in from cosmos.listeners import dag_run_listener File "/usr/local/airflow/src/astronomer-cosmos/cosmos/listeners/dag_run_listener.py", line 13, in from cosmos.airflow.dag import DbtDag ImportError: cannot import name 'DbtDag' from partially initialized module 'cosmos.airflow.dag' (most likely due to a circular import) (/usr/local/airflow/src/astronomer-cosmos/cosmos/airflow/dag.py) --- cosmos/listeners/dag_run_listener.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/cosmos/listeners/dag_run_listener.py b/cosmos/listeners/dag_run_listener.py index 893ab9180..ec324bf47 100644 --- a/cosmos/listeners/dag_run_listener.py +++ b/cosmos/listeners/dag_run_listener.py @@ -10,8 +10,6 @@ from airflow.models.dagrun import DagRun from cosmos import telemetry -from cosmos.airflow.dag import DbtDag -from cosmos.airflow.task_group import DbtTaskGroup from cosmos.log import get_logger logger = get_logger(__name__) @@ -35,6 +33,8 @@ class EventStatus: @functools.lru_cache() def is_cosmos_dag(dag: DAG) -> bool: + from cosmos.airflow.dag import DbtDag + if isinstance(dag, DbtDag): return True return False @@ -42,6 +42,8 @@ def is_cosmos_dag(dag: DAG) -> bool: @functools.lru_cache() def total_cosmos_task_groups(dag: DAG) -> int: + from cosmos.airflow.task_group import DbtTaskGroup + cosmos_task_groups = 0 for group_id, task_group in dag.task_group_dict.items(): if isinstance(task_group, DbtTaskGroup): From 5b0207a959d3d4811e14c30c90260174ed14fbde Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Thu, 19 Dec 2024 12:12:31 +0000 Subject: [PATCH 15/46] Change version for testing --- cosmos/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cosmos/__init__.py b/cosmos/__init__.py index 884a90659..0cf84f564 100644 --- a/cosmos/__init__.py +++ b/cosmos/__init__.py @@ -6,7 +6,7 @@ Contains dags, task groups, and operators. """ -__version__ = "1.7.1" +__version__ = "1.8.0a5" from cosmos.airflow.dag import DbtDag From a84dc7cabc60689cfd6d422f71a1da560d68ddc7 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Thu, 19 Dec 2024 13:54:13 +0000 Subject: [PATCH 16/46] Add more logs to troubleshoot --- cosmos/listeners/dag_run_listener.py | 6 ++++-- cosmos/telemetry.py | 4 ++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/cosmos/listeners/dag_run_listener.py b/cosmos/listeners/dag_run_listener.py index ec324bf47..ee553a975 100644 --- a/cosmos/listeners/dag_run_listener.py +++ b/cosmos/listeners/dag_run_listener.py @@ -70,7 +70,7 @@ def uses_cosmos(dag: DAG) -> bool: @hookimpl def on_dag_run_success(dag_run: DagRun, msg: str) -> None: logger.info("The on_dag_run_success was called") - print("The on_dag_run_success was called") + dag = dag_run.get_dag() if not uses_cosmos(dag): return @@ -86,12 +86,13 @@ def on_dag_run_success(dag_run: DagRun, msg: str) -> None: } telemetry.emit_usage_metrics_if_enabled(DAG_RUN, additional_telemetry_metrics) + logger.info("Completed on_dag_run_success") @hookimpl def on_dag_run_failed(dag_run: DagRun, msg: str) -> None: logger.info("The on_dag_run_failed was called") - print("The on_dag_run_failed was called") + dag = dag_run.get_dag() if not uses_cosmos(dag): return @@ -107,3 +108,4 @@ def on_dag_run_failed(dag_run: DagRun, msg: str) -> None: } telemetry.emit_usage_metrics_if_enabled(DAG_RUN, additional_telemetry_metrics) + logger.info("Completed on_dag_run_failed") diff --git a/cosmos/telemetry.py b/cosmos/telemetry.py index 4b5dd4abc..6ae452023 100644 --- a/cosmos/telemetry.py +++ b/cosmos/telemetry.py @@ -44,7 +44,7 @@ def emit_usage_metrics(metrics: dict[str, object]) -> bool: telemetry_url = constants.TELEMETRY_URL.format( **metrics, telemetry_version=constants.TELEMETRY_VERSION, query_string=query_string ) - logging.debug("Telemetry is enabled. Emitting the following usage metrics to %s: %s", telemetry_url, metrics) + logging.info("Telemetry is enabled. Emitting the following usage metrics to %s: %s", telemetry_url, metrics) response = httpx.get(telemetry_url, timeout=constants.TELEMETRY_TIMEOUT, follow_redirects=True) if not response.is_success: logging.warning( @@ -71,5 +71,5 @@ def emit_usage_metrics_if_enabled(event_type: str, additional_metrics: dict[str, is_success = emit_usage_metrics(metrics) return is_success else: - logging.debug("Telemetry is disabled. To enable it, export AIRFLOW__COSMOS__ENABLE_TELEMETRY=True.") + logging.info("Telemetry is disabled. To enable it, export AIRFLOW__COSMOS__ENABLE_TELEMETRY=True.") return False From 585f2096590ba0bde20515fe74d1d755d7a31d2f Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Thu, 19 Dec 2024 13:55:21 +0000 Subject: [PATCH 17/46] Bump the version to see changes in scarf --- cosmos/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cosmos/__init__.py b/cosmos/__init__.py index 0cf84f564..bf3041d2b 100644 --- a/cosmos/__init__.py +++ b/cosmos/__init__.py @@ -6,7 +6,7 @@ Contains dags, task groups, and operators. """ -__version__ = "1.8.0a5" +__version__ = "1.8.0a6" from cosmos.airflow.dag import DbtDag From aac353ef68ab1d621ab6ff63e0a3c331b5b8679e Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Thu, 19 Dec 2024 14:05:20 +0000 Subject: [PATCH 18/46] Stop using context manager to measure time of emitting telemetry --- cosmos/listeners/dag_run_listener.py | 45 ++++++++++------------------ 1 file changed, 16 insertions(+), 29 deletions(-) diff --git a/cosmos/listeners/dag_run_listener.py b/cosmos/listeners/dag_run_listener.py index ee553a975..c103a5e25 100644 --- a/cosmos/listeners/dag_run_listener.py +++ b/cosmos/listeners/dag_run_listener.py @@ -1,9 +1,6 @@ from __future__ import annotations import functools -import time -from contextlib import contextmanager -from typing import Generator from airflow.listeners import hookimpl from airflow.models.dag import DAG @@ -15,14 +12,6 @@ logger = get_logger(__name__) -@contextmanager -def measure_time() -> Generator[None, None, None]: - start = time.perf_counter() - yield - end = time.perf_counter() - logger.info(f"DAG listener metrics collection time: {end - start:.6f} seconds") - - class EventStatus: SUCCESS = "success" FAILED = "failed" @@ -75,15 +64,14 @@ def on_dag_run_success(dag_run: DagRun, msg: str) -> None: if not uses_cosmos(dag): return - with measure_time(): - additional_telemetry_metrics = { - "dag_hash": dag_run.dag_hash, - "status": EventStatus.SUCCESS, - "task_count": len(dag.task_ids), - "cosmos_task_count": total_cosmos_tasks(dag), - "cosmos_task_groups_count": total_cosmos_task_groups(dag), - "is_cosmos_dag": is_cosmos_dag(dag), - } + additional_telemetry_metrics = { + "dag_hash": dag_run.dag_hash, + "status": EventStatus.SUCCESS, + "task_count": len(dag.task_ids), + "cosmos_task_count": total_cosmos_tasks(dag), + "cosmos_task_groups_count": total_cosmos_task_groups(dag), + "is_cosmos_dag": is_cosmos_dag(dag), + } telemetry.emit_usage_metrics_if_enabled(DAG_RUN, additional_telemetry_metrics) logger.info("Completed on_dag_run_success") @@ -97,15 +85,14 @@ def on_dag_run_failed(dag_run: DagRun, msg: str) -> None: if not uses_cosmos(dag): return - with measure_time(): - additional_telemetry_metrics = { - "dag_hash": dag_run.dag_hash, - "status": EventStatus.FAILED, - "task_count": len(dag.task_ids), - "cosmos_task_count": total_cosmos_tasks(dag), - "cosmos_task_groups_count": total_cosmos_task_groups(dag), - "is_cosmos_dag": is_cosmos_dag(dag), - } + additional_telemetry_metrics = { + "dag_hash": dag_run.dag_hash, + "status": EventStatus.FAILED, + "task_count": len(dag.task_ids), + "cosmos_task_count": total_cosmos_tasks(dag), + "cosmos_task_groups_count": total_cosmos_task_groups(dag), + "is_cosmos_dag": is_cosmos_dag(dag), + } telemetry.emit_usage_metrics_if_enabled(DAG_RUN, additional_telemetry_metrics) logger.info("Completed on_dag_run_failed") From a85192069484d0878d5394e66d0dba2f2ae68a57 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Thu, 19 Dec 2024 14:21:18 +0000 Subject: [PATCH 19/46] troubleshot --- cosmos/listeners/dag_run_listener.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cosmos/listeners/dag_run_listener.py b/cosmos/listeners/dag_run_listener.py index c103a5e25..ec5c1028b 100644 --- a/cosmos/listeners/dag_run_listener.py +++ b/cosmos/listeners/dag_run_listener.py @@ -24,6 +24,7 @@ class EventStatus: def is_cosmos_dag(dag: DAG) -> bool: from cosmos.airflow.dag import DbtDag + logger.info(f"is_cosmos_dag: {isinstance(dag, DbtDag)}") if isinstance(dag, DbtDag): return True return False @@ -62,6 +63,7 @@ def on_dag_run_success(dag_run: DagRun, msg: str) -> None: dag = dag_run.get_dag() if not uses_cosmos(dag): + logger.info("The DAG does not use Cosmos") return additional_telemetry_metrics = { From adc54d1a7c51b51da708ccb1b75189026268fad1 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Thu, 19 Dec 2024 14:30:40 +0000 Subject: [PATCH 20/46] Troubleshoot --- cosmos/listeners/dag_run_listener.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/cosmos/listeners/dag_run_listener.py b/cosmos/listeners/dag_run_listener.py index ec5c1028b..89535132a 100644 --- a/cosmos/listeners/dag_run_listener.py +++ b/cosmos/listeners/dag_run_listener.py @@ -20,17 +20,23 @@ class EventStatus: DAG_RUN = "dag_run" -@functools.lru_cache() def is_cosmos_dag(dag: DAG) -> bool: + import inspect + from cosmos.airflow.dag import DbtDag - logger.info(f"is_cosmos_dag: {isinstance(dag, DbtDag)}") + dag_class = dag.__class__ + dag_module = inspect.getmodule(dag_class) + + logger.info( + f"is_cosmos_dag ({dag}, {DbtDag}, {dag_class and dag_class.__name__}, {dag_module and dag_module.__name__}): {isinstance(dag, DbtDag)}" + ) + return True if isinstance(dag, DbtDag): return True return False -@functools.lru_cache() def total_cosmos_task_groups(dag: DAG) -> int: from cosmos.airflow.task_group import DbtTaskGroup @@ -41,7 +47,6 @@ def total_cosmos_task_groups(dag: DAG) -> int: return cosmos_task_groups -@functools.lru_cache() def total_cosmos_tasks(dag: DAG) -> int: cosmos_tasks = 0 for task in dag.tasks: From 1de1083476475949d0445ab22acb298a043bc0a0 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Thu, 19 Dec 2024 15:42:32 +0000 Subject: [PATCH 21/46] Upgrade telemetry --- cosmos/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cosmos/__init__.py b/cosmos/__init__.py index bf3041d2b..9feb20b7f 100644 --- a/cosmos/__init__.py +++ b/cosmos/__init__.py @@ -6,7 +6,7 @@ Contains dags, task groups, and operators. """ -__version__ = "1.8.0a6" +__version__ = "1.8.0a7" from cosmos.airflow.dag import DbtDag From 15c1688d41f40a34f37b38c13d9477f83a7b014b Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Thu, 19 Dec 2024 16:34:36 +0000 Subject: [PATCH 22/46] Improve test coverage --- cosmos/listeners/dag_run_listener.py | 16 +--- cosmos/telemetry.py | 4 +- tests/listeners/test_dag_run_listener.py | 94 ++++++++++++++++++++++++ tests/test_telemetry.py | 2 +- 4 files changed, 99 insertions(+), 17 deletions(-) create mode 100644 tests/listeners/test_dag_run_listener.py diff --git a/cosmos/listeners/dag_run_listener.py b/cosmos/listeners/dag_run_listener.py index 89535132a..7af936b04 100644 --- a/cosmos/listeners/dag_run_listener.py +++ b/cosmos/listeners/dag_run_listener.py @@ -21,28 +21,16 @@ class EventStatus: def is_cosmos_dag(dag: DAG) -> bool: - import inspect - - from cosmos.airflow.dag import DbtDag - - dag_class = dag.__class__ - dag_module = inspect.getmodule(dag_class) - - logger.info( - f"is_cosmos_dag ({dag}, {DbtDag}, {dag_class and dag_class.__name__}, {dag_module and dag_module.__name__}): {isinstance(dag, DbtDag)}" - ) - return True - if isinstance(dag, DbtDag): + if dag.__class__.__module__.startswith("cosmos."): return True return False def total_cosmos_task_groups(dag: DAG) -> int: - from cosmos.airflow.task_group import DbtTaskGroup cosmos_task_groups = 0 for group_id, task_group in dag.task_group_dict.items(): - if isinstance(task_group, DbtTaskGroup): + if task_group.__class__.__module__.startswith("cosmos."): cosmos_task_groups += 1 return cosmos_task_groups diff --git a/cosmos/telemetry.py b/cosmos/telemetry.py index 6ae452023..4b5dd4abc 100644 --- a/cosmos/telemetry.py +++ b/cosmos/telemetry.py @@ -44,7 +44,7 @@ def emit_usage_metrics(metrics: dict[str, object]) -> bool: telemetry_url = constants.TELEMETRY_URL.format( **metrics, telemetry_version=constants.TELEMETRY_VERSION, query_string=query_string ) - logging.info("Telemetry is enabled. Emitting the following usage metrics to %s: %s", telemetry_url, metrics) + logging.debug("Telemetry is enabled. Emitting the following usage metrics to %s: %s", telemetry_url, metrics) response = httpx.get(telemetry_url, timeout=constants.TELEMETRY_TIMEOUT, follow_redirects=True) if not response.is_success: logging.warning( @@ -71,5 +71,5 @@ def emit_usage_metrics_if_enabled(event_type: str, additional_metrics: dict[str, is_success = emit_usage_metrics(metrics) return is_success else: - logging.info("Telemetry is disabled. To enable it, export AIRFLOW__COSMOS__ENABLE_TELEMETRY=True.") + logging.debug("Telemetry is disabled. To enable it, export AIRFLOW__COSMOS__ENABLE_TELEMETRY=True.") return False diff --git a/tests/listeners/test_dag_run_listener.py b/tests/listeners/test_dag_run_listener.py new file mode 100644 index 000000000..173df76a3 --- /dev/null +++ b/tests/listeners/test_dag_run_listener.py @@ -0,0 +1,94 @@ +from datetime import datetime +from pathlib import Path + +from airflow.models import DAG + +from cosmos import DbtRunLocalOperator, ProfileConfig, ProjectConfig +from cosmos.airflow.dag import DbtDag +from cosmos.airflow.task_group import DbtTaskGroup +from cosmos.listeners.dag_run_listener import is_cosmos_dag, total_cosmos_task_groups, total_cosmos_tasks, uses_cosmos +from cosmos.profiles import PostgresUserPasswordProfileMapping + +DBT_ROOT_PATH = Path(__file__).parent.parent.parent / "dev/dags/dbt" +DBT_PROJECT_NAME = "jaffle_shop" + +profile_config = ProfileConfig( + profile_name="default", + target_name="dev", + profile_mapping=PostgresUserPasswordProfileMapping( + conn_id="example_conn", + profile_args={"schema": "public"}, + disable_event_tracking=True, + ), +) + + +def test_is_cosmos_dag_is_true(): + dag = DbtDag( + project_config=ProjectConfig( + DBT_ROOT_PATH / "jaffle_shop", + ), + profile_config=profile_config, + start_date=datetime(2023, 1, 1), + dag_id="basic_cosmos_dag", + ) + + assert is_cosmos_dag(dag) + assert total_cosmos_task_groups(dag) == 0 + assert uses_cosmos(dag) + + +def test_total_cosmos_task_groups(): + with DAG("test-id-dbt-compile", start_date=datetime(2022, 1, 1)) as dag: + _ = DbtTaskGroup( + project_config=ProjectConfig( + DBT_ROOT_PATH / "jaffle_shop", + ), + profile_config=profile_config, + ) + + assert not is_cosmos_dag(dag) + assert total_cosmos_task_groups(dag) == 1 + assert uses_cosmos(dag) + + +def test_total_cosmos_tasks_in_task_group(): + with DAG("test-id-dbt-compile", start_date=datetime(2022, 1, 1)) as dag: + _ = DbtTaskGroup( + project_config=ProjectConfig( + DBT_ROOT_PATH / "jaffle_shop", + ), + profile_config=profile_config, + ) + + assert total_cosmos_tasks(dag) == 13 + assert uses_cosmos(dag) + + +def test_total_cosmos_tasks_is_one(): + + with DAG("test-id-1", start_date=datetime(2022, 1, 1)) as dag: + run_operator = DbtRunLocalOperator( + profile_config=profile_config, + project_dir=DBT_ROOT_PATH / "jaffle_shop", + task_id="run", + install_deps=True, + append_env=True, + ) + run_operator + + assert not is_cosmos_dag(dag) + assert total_cosmos_task_groups(dag) == 0 + assert total_cosmos_tasks(dag) == 1 + assert uses_cosmos(dag) + + +def test_not_cosmos_dag(): + + with DAG("test-id-1", start_date=datetime(2022, 1, 1)) as dag: + pass + + assert not is_cosmos_dag(dag) + assert total_cosmos_task_groups(dag) == 0 + assert total_cosmos_tasks(dag) == 0 + assert not uses_cosmos(dag) diff --git a/tests/test_telemetry.py b/tests/test_telemetry.py index d2ff24984..be9f5cbfa 100644 --- a/tests/test_telemetry.py +++ b/tests/test_telemetry.py @@ -63,7 +63,7 @@ def test_emit_usage_metrics_fails(mock_httpx_get, caplog): is_success = telemetry.emit_usage_metrics(sample_metrics) mock_httpx_get.assert_called_once_with( f"""https://astronomer.gateway.scarf.sh/astronomer-cosmos/v1/1.8.0a4/2.10.1/3.11/darwin/amd64/dag_run/success/d151d1fa2f03270ea116cc7494f2c591/True/0/3/3""", - timeout=5.0, + timeout=1.0, follow_redirects=True, ) assert not is_success From 8919ef0d3b4ed96b614eaea3cd65a1c2944e75d5 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Thu, 19 Dec 2024 17:28:17 +0000 Subject: [PATCH 23/46] Fix unit test --- tests/listeners/test_dag_run_listener.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/listeners/test_dag_run_listener.py b/tests/listeners/test_dag_run_listener.py index 173df76a3..1729d592a 100644 --- a/tests/listeners/test_dag_run_listener.py +++ b/tests/listeners/test_dag_run_listener.py @@ -1,6 +1,7 @@ from datetime import datetime from pathlib import Path +import pytest from airflow.models import DAG from cosmos import DbtRunLocalOperator, ProfileConfig, ProjectConfig @@ -23,6 +24,7 @@ ) +@pytest.mark.integration def test_is_cosmos_dag_is_true(): dag = DbtDag( project_config=ProjectConfig( @@ -38,6 +40,7 @@ def test_is_cosmos_dag_is_true(): assert uses_cosmos(dag) +@pytest.mark.integration def test_total_cosmos_task_groups(): with DAG("test-id-dbt-compile", start_date=datetime(2022, 1, 1)) as dag: _ = DbtTaskGroup( @@ -52,6 +55,7 @@ def test_total_cosmos_task_groups(): assert uses_cosmos(dag) +@pytest.mark.integration def test_total_cosmos_tasks_in_task_group(): with DAG("test-id-dbt-compile", start_date=datetime(2022, 1, 1)) as dag: _ = DbtTaskGroup( From aaa19dae1eb7f592ff3e43cd5bb75ef2e27db748 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Fri, 20 Dec 2024 05:44:56 +0000 Subject: [PATCH 24/46] Change log levels to analyse behaviour in Astro CLI --- cosmos/listeners/dag_run_listener.py | 4 ---- cosmos/telemetry.py | 4 ++-- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/cosmos/listeners/dag_run_listener.py b/cosmos/listeners/dag_run_listener.py index 7af936b04..62328de3c 100644 --- a/cosmos/listeners/dag_run_listener.py +++ b/cosmos/listeners/dag_run_listener.py @@ -1,7 +1,5 @@ from __future__ import annotations -import functools - from airflow.listeners import hookimpl from airflow.models.dag import DAG from airflow.models.dagrun import DagRun @@ -27,7 +25,6 @@ def is_cosmos_dag(dag: DAG) -> bool: def total_cosmos_task_groups(dag: DAG) -> int: - cosmos_task_groups = 0 for group_id, task_group in dag.task_group_dict.items(): if task_group.__class__.__module__.startswith("cosmos."): @@ -45,7 +42,6 @@ def total_cosmos_tasks(dag: DAG) -> int: return cosmos_tasks -@functools.lru_cache() def uses_cosmos(dag: DAG) -> bool: return bool(is_cosmos_dag(dag) or total_cosmos_task_groups(dag) or total_cosmos_tasks(dag)) diff --git a/cosmos/telemetry.py b/cosmos/telemetry.py index 4b5dd4abc..6ae452023 100644 --- a/cosmos/telemetry.py +++ b/cosmos/telemetry.py @@ -44,7 +44,7 @@ def emit_usage_metrics(metrics: dict[str, object]) -> bool: telemetry_url = constants.TELEMETRY_URL.format( **metrics, telemetry_version=constants.TELEMETRY_VERSION, query_string=query_string ) - logging.debug("Telemetry is enabled. Emitting the following usage metrics to %s: %s", telemetry_url, metrics) + logging.info("Telemetry is enabled. Emitting the following usage metrics to %s: %s", telemetry_url, metrics) response = httpx.get(telemetry_url, timeout=constants.TELEMETRY_TIMEOUT, follow_redirects=True) if not response.is_success: logging.warning( @@ -71,5 +71,5 @@ def emit_usage_metrics_if_enabled(event_type: str, additional_metrics: dict[str, is_success = emit_usage_metrics(metrics) return is_success else: - logging.debug("Telemetry is disabled. To enable it, export AIRFLOW__COSMOS__ENABLE_TELEMETRY=True.") + logging.info("Telemetry is disabled. To enable it, export AIRFLOW__COSMOS__ENABLE_TELEMETRY=True.") return False From fc7b502b8733f502834ea5599751d97ceb6975d4 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Fri, 20 Dec 2024 05:48:25 +0000 Subject: [PATCH 25/46] Change logger --- cosmos/telemetry.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/cosmos/telemetry.py b/cosmos/telemetry.py index 6ae452023..95baab2ed 100644 --- a/cosmos/telemetry.py +++ b/cosmos/telemetry.py @@ -1,6 +1,5 @@ from __future__ import annotations -import logging import platform from urllib import parse from urllib.parse import urlencode @@ -10,6 +9,9 @@ import cosmos from cosmos import constants, settings +from cosmos.log import get_logger + +logger = get_logger(__name__) def should_emit() -> bool: @@ -44,10 +46,10 @@ def emit_usage_metrics(metrics: dict[str, object]) -> bool: telemetry_url = constants.TELEMETRY_URL.format( **metrics, telemetry_version=constants.TELEMETRY_VERSION, query_string=query_string ) - logging.info("Telemetry is enabled. Emitting the following usage metrics to %s: %s", telemetry_url, metrics) + logger.info("Telemetry is enabled. Emitting the following usage metrics to %s: %s", telemetry_url, metrics) response = httpx.get(telemetry_url, timeout=constants.TELEMETRY_TIMEOUT, follow_redirects=True) if not response.is_success: - logging.warning( + logger.warning( "Unable to emit usage metrics to %s. Status code: %s. Message: %s", telemetry_url, response.status_code, @@ -71,5 +73,5 @@ def emit_usage_metrics_if_enabled(event_type: str, additional_metrics: dict[str, is_success = emit_usage_metrics(metrics) return is_success else: - logging.info("Telemetry is disabled. To enable it, export AIRFLOW__COSMOS__ENABLE_TELEMETRY=True.") + logger.info("Telemetry is disabled. To enable it, export AIRFLOW__COSMOS__ENABLE_TELEMETRY=True.") return False From a1d63eedf4e8f56c7e308007857372d98860b80a Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Fri, 20 Dec 2024 05:52:28 +0000 Subject: [PATCH 26/46] Add more logs --- cosmos/listeners/dag_run_listener.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/cosmos/listeners/dag_run_listener.py b/cosmos/listeners/dag_run_listener.py index 62328de3c..752f1a0f4 100644 --- a/cosmos/listeners/dag_run_listener.py +++ b/cosmos/listeners/dag_run_listener.py @@ -19,6 +19,7 @@ class EventStatus: def is_cosmos_dag(dag: DAG) -> bool: + logger.info("is_cosmos_dag: {dag.__class__.__module__}") if dag.__class__.__module__.startswith("cosmos."): return True return False @@ -27,6 +28,7 @@ def is_cosmos_dag(dag: DAG) -> bool: def total_cosmos_task_groups(dag: DAG) -> int: cosmos_task_groups = 0 for group_id, task_group in dag.task_group_dict.items(): + logger.info("total_cosmos_task_groups: {task_group.__class__}") if task_group.__class__.__module__.startswith("cosmos."): cosmos_task_groups += 1 return cosmos_task_groups @@ -35,9 +37,8 @@ def total_cosmos_task_groups(dag: DAG) -> int: def total_cosmos_tasks(dag: DAG) -> int: cosmos_tasks = 0 for task in dag.tasks: - task_class = type(task) - task_module = task_class.__module__ - if task_module.startswith("cosmos."): + logger.info("total_cosmos_task_groups: {task.__class__.__module__}") + if task.__class__.__module__.startswith("cosmos."): cosmos_tasks += 1 return cosmos_tasks From 7ef47f516837d8e68a36a81442ffdd502d1c40af Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Fri, 20 Dec 2024 05:55:18 +0000 Subject: [PATCH 27/46] improve logs --- cosmos/listeners/dag_run_listener.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cosmos/listeners/dag_run_listener.py b/cosmos/listeners/dag_run_listener.py index 752f1a0f4..e10ff1305 100644 --- a/cosmos/listeners/dag_run_listener.py +++ b/cosmos/listeners/dag_run_listener.py @@ -19,7 +19,7 @@ class EventStatus: def is_cosmos_dag(dag: DAG) -> bool: - logger.info("is_cosmos_dag: {dag.__class__.__module__}") + logger.info(f"is_cosmos_dag: {dag.__class__.__module__}") if dag.__class__.__module__.startswith("cosmos."): return True return False @@ -28,7 +28,7 @@ def is_cosmos_dag(dag: DAG) -> bool: def total_cosmos_task_groups(dag: DAG) -> int: cosmos_task_groups = 0 for group_id, task_group in dag.task_group_dict.items(): - logger.info("total_cosmos_task_groups: {task_group.__class__}") + logger.info(f"total_cosmos_task_groups: {task_group.__class__.__module__}") if task_group.__class__.__module__.startswith("cosmos."): cosmos_task_groups += 1 return cosmos_task_groups @@ -37,7 +37,7 @@ def total_cosmos_task_groups(dag: DAG) -> int: def total_cosmos_tasks(dag: DAG) -> int: cosmos_tasks = 0 for task in dag.tasks: - logger.info("total_cosmos_task_groups: {task.__class__.__module__}") + logger.info(f"total_cosmos_task_groups: {task.__class__.__module__}") if task.__class__.__module__.startswith("cosmos."): cosmos_tasks += 1 return cosmos_tasks From 2f3ddb71eccc8468775ad11a486a35e60c6352d8 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Fri, 20 Dec 2024 06:06:25 +0000 Subject: [PATCH 28/46] Log more info --- cosmos/listeners/dag_run_listener.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/cosmos/listeners/dag_run_listener.py b/cosmos/listeners/dag_run_listener.py index e10ff1305..a270057f9 100644 --- a/cosmos/listeners/dag_run_listener.py +++ b/cosmos/listeners/dag_run_listener.py @@ -19,7 +19,7 @@ class EventStatus: def is_cosmos_dag(dag: DAG) -> bool: - logger.info(f"is_cosmos_dag: {dag.__class__.__module__}") + logger.info(f"is_cosmos_dag: {dag.__class__}") if dag.__class__.__module__.startswith("cosmos."): return True return False @@ -28,7 +28,7 @@ def is_cosmos_dag(dag: DAG) -> bool: def total_cosmos_task_groups(dag: DAG) -> int: cosmos_task_groups = 0 for group_id, task_group in dag.task_group_dict.items(): - logger.info(f"total_cosmos_task_groups: {task_group.__class__.__module__}") + logger.info(f"total_cosmos_task_groups: {task_group.__class__}") if task_group.__class__.__module__.startswith("cosmos."): cosmos_task_groups += 1 return cosmos_task_groups @@ -37,7 +37,7 @@ def total_cosmos_task_groups(dag: DAG) -> int: def total_cosmos_tasks(dag: DAG) -> int: cosmos_tasks = 0 for task in dag.tasks: - logger.info(f"total_cosmos_task_groups: {task.__class__.__module__}") + logger.info(f"total_cosmos_task_groups: {task.__class__}") if task.__class__.__module__.startswith("cosmos."): cosmos_tasks += 1 return cosmos_tasks @@ -52,6 +52,8 @@ def on_dag_run_success(dag_run: DagRun, msg: str) -> None: logger.info("The on_dag_run_success was called") dag = dag_run.get_dag() + dag_run.dag_id + if not uses_cosmos(dag): logger.info("The DAG does not use Cosmos") return From 940a1a202cffdc2ca1891c07bac4926d458e95fe Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Fri, 20 Dec 2024 06:07:18 +0000 Subject: [PATCH 29/46] add more logs --- cosmos/listeners/dag_run_listener.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cosmos/listeners/dag_run_listener.py b/cosmos/listeners/dag_run_listener.py index a270057f9..15e2f2e0b 100644 --- a/cosmos/listeners/dag_run_listener.py +++ b/cosmos/listeners/dag_run_listener.py @@ -52,7 +52,7 @@ def on_dag_run_success(dag_run: DagRun, msg: str) -> None: logger.info("The on_dag_run_success was called") dag = dag_run.get_dag() - dag_run.dag_id + logger.info(f"dir: {dir(dag)}") if not uses_cosmos(dag): logger.info("The DAG does not use Cosmos") From 7661ab380cec1c0a2c3c6cbe781cfa6c3babaec8 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Fri, 20 Dec 2024 06:18:25 +0000 Subject: [PATCH 30/46] Add logs --- cosmos/listeners/dag_run_listener.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/cosmos/listeners/dag_run_listener.py b/cosmos/listeners/dag_run_listener.py index 15e2f2e0b..f8020e513 100644 --- a/cosmos/listeners/dag_run_listener.py +++ b/cosmos/listeners/dag_run_listener.py @@ -53,6 +53,12 @@ def on_dag_run_success(dag_run: DagRun, msg: str) -> None: dag = dag_run.get_dag() logger.info(f"dir: {dir(dag)}") + logger.info(f"1: {dag.fileloc}") + logger.info(f"2:{dag.filepath}") + logger.info(f"3: {dag.task_dict}") + logger.info(f"4: {dag.task_group_dict}") + logger.info(f"5: {dag.serialize_to_json()}") + logger.info(f"6: {dag.deserialize_dag()}") if not uses_cosmos(dag): logger.info("The DAG does not use Cosmos") From 7ae61523cd28a3d96aeb9fec9fae2ec56c5803ae Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Fri, 20 Dec 2024 06:28:20 +0000 Subject: [PATCH 31/46] Force to fetch the actual dag, add more debug logs --- cosmos/listeners/dag_run_listener.py | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/cosmos/listeners/dag_run_listener.py b/cosmos/listeners/dag_run_listener.py index f8020e513..762c3dbf3 100644 --- a/cosmos/listeners/dag_run_listener.py +++ b/cosmos/listeners/dag_run_listener.py @@ -52,16 +52,24 @@ def on_dag_run_success(dag_run: DagRun, msg: str) -> None: logger.info("The on_dag_run_success was called") dag = dag_run.get_dag() + logger.info(f"dir: {dir(dag)}") logger.info(f"1: {dag.fileloc}") logger.info(f"2:{dag.filepath}") logger.info(f"3: {dag.task_dict}") logger.info(f"4: {dag.task_group_dict}") - logger.info(f"5: {dag.serialize_to_json()}") - logger.info(f"6: {dag.deserialize_dag()}") + logger.info(f"5: {dag.get_serialized_fields()}") + logger.info(f"6: {dag.to_dict()}") + logger.info(f"7: {dag.deserialize()}") + + if not isinstance(dag, DAG): + from airflow.models import DagBag - if not uses_cosmos(dag): logger.info("The DAG does not use Cosmos") + dag_bag = DagBag(dag_folder=dag.fileloc, include_examples=False) + dag = dag_bag.get_dag(dag_run.dag_id) + + if not uses_cosmos(dag): return additional_telemetry_metrics = { From 5caf6a869d0c8207e7520cf012b06497ee952fff Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Fri, 20 Dec 2024 06:33:44 +0000 Subject: [PATCH 32/46] Force to fetch the actual dag, add more debug logs --- cosmos/listeners/dag_run_listener.py | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/cosmos/listeners/dag_run_listener.py b/cosmos/listeners/dag_run_listener.py index 762c3dbf3..b382b8210 100644 --- a/cosmos/listeners/dag_run_listener.py +++ b/cosmos/listeners/dag_run_listener.py @@ -51,18 +51,17 @@ def uses_cosmos(dag: DAG) -> bool: def on_dag_run_success(dag_run: DagRun, msg: str) -> None: logger.info("The on_dag_run_success was called") - dag = dag_run.get_dag() + # The following is an airflow.serialization.serialized_objects.SerializedDAG instance + serialized_dag = dag_run.get_dag() + serialized_dag - logger.info(f"dir: {dir(dag)}") - logger.info(f"1: {dag.fileloc}") - logger.info(f"2:{dag.filepath}") - logger.info(f"3: {dag.task_dict}") - logger.info(f"4: {dag.task_group_dict}") - logger.info(f"5: {dag.get_serialized_fields()}") - logger.info(f"6: {dag.to_dict()}") - logger.info(f"7: {dag.deserialize()}") + logger.info(f"dir: {dir(serialized_dag)}") + logger.info(f"1: {serialized_dag.fileloc}") + logger.info(f"2:{serialized_dag.filepath}") + logger.info(f"3: {serialized_dag.task_dict}") + logger.info(f"4: {serialized_dag.task_group_dict}") - if not isinstance(dag, DAG): + if not isinstance(serialized_dag, DAG): from airflow.models import DagBag logger.info("The DAG does not use Cosmos") @@ -70,8 +69,11 @@ def on_dag_run_success(dag_run: DagRun, msg: str) -> None: dag = dag_bag.get_dag(dag_run.dag_id) if not uses_cosmos(dag): + logger.info(f"5: {serialized_dag.deserialize()}") return + logger.info(f"5: {serialized_dag.deserialize()}") + additional_telemetry_metrics = { "dag_hash": dag_run.dag_hash, "status": EventStatus.SUCCESS, From 430aec3ee14c5364d416f97da53bf248d836c787 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Fri, 20 Dec 2024 06:37:47 +0000 Subject: [PATCH 33/46] Force to fetch the actual dag, add more debug logs --- cosmos/listeners/dag_run_listener.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/cosmos/listeners/dag_run_listener.py b/cosmos/listeners/dag_run_listener.py index b382b8210..e21897687 100644 --- a/cosmos/listeners/dag_run_listener.py +++ b/cosmos/listeners/dag_run_listener.py @@ -61,14 +61,13 @@ def on_dag_run_success(dag_run: DagRun, msg: str) -> None: logger.info(f"3: {serialized_dag.task_dict}") logger.info(f"4: {serialized_dag.task_group_dict}") - if not isinstance(serialized_dag, DAG): - from airflow.models import DagBag + from airflow.models import DagBag - logger.info("The DAG does not use Cosmos") - dag_bag = DagBag(dag_folder=dag.fileloc, include_examples=False) - dag = dag_bag.get_dag(dag_run.dag_id) + dag_bag = DagBag(dag_folder=dag.fileloc, include_examples=False) + dag = dag_bag.get_dag(dag_run.dag_id) if not uses_cosmos(dag): + logger.info("The DAG does not use Cosmos") logger.info(f"5: {serialized_dag.deserialize()}") return From bc98126ec0a4b7b31f14d577c46398807434aff0 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Fri, 20 Dec 2024 06:44:46 +0000 Subject: [PATCH 34/46] Force to fetch the actual dag, add more debug logs --- cosmos/listeners/dag_run_listener.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/cosmos/listeners/dag_run_listener.py b/cosmos/listeners/dag_run_listener.py index e21897687..ec798f2bc 100644 --- a/cosmos/listeners/dag_run_listener.py +++ b/cosmos/listeners/dag_run_listener.py @@ -53,7 +53,6 @@ def on_dag_run_success(dag_run: DagRun, msg: str) -> None: # The following is an airflow.serialization.serialized_objects.SerializedDAG instance serialized_dag = dag_run.get_dag() - serialized_dag logger.info(f"dir: {dir(serialized_dag)}") logger.info(f"1: {serialized_dag.fileloc}") @@ -63,7 +62,7 @@ def on_dag_run_success(dag_run: DagRun, msg: str) -> None: from airflow.models import DagBag - dag_bag = DagBag(dag_folder=dag.fileloc, include_examples=False) + dag_bag = DagBag(dag_folder=serialized_dag.fileloc, include_examples=False) dag = dag_bag.get_dag(dag_run.dag_id) if not uses_cosmos(dag): From 41a92935883bb830dca8415870a8d415866d6449 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Fri, 20 Dec 2024 08:13:41 +0000 Subject: [PATCH 35/46] debugging --- cosmos/listeners/dag_run_listener.py | 20 ++++++++++---------- tests/perf/test_performance.py | 5 +++-- 2 files changed, 13 insertions(+), 12 deletions(-) diff --git a/cosmos/listeners/dag_run_listener.py b/cosmos/listeners/dag_run_listener.py index ec798f2bc..1712e4250 100644 --- a/cosmos/listeners/dag_run_listener.py +++ b/cosmos/listeners/dag_run_listener.py @@ -49,12 +49,14 @@ def uses_cosmos(dag: DAG) -> bool: @hookimpl def on_dag_run_success(dag_run: DagRun, msg: str) -> None: - logger.info("The on_dag_run_success was called") + logger.info(f"dir: {dir(dag_run.dag)}") + logger.info("The on_dag_run_success was called") # The following is an airflow.serialization.serialized_objects.SerializedDAG instance serialized_dag = dag_run.get_dag() + logger.info(f" are they equal? {serialized_dag == dag_run.dag}") - logger.info(f"dir: {dir(serialized_dag)}") + logger.info(f"__dict__: {serialized_dag.__dict__}") logger.info(f"1: {serialized_dag.fileloc}") logger.info(f"2:{serialized_dag.filepath}") logger.info(f"3: {serialized_dag.task_dict}") @@ -64,21 +66,19 @@ def on_dag_run_success(dag_run: DagRun, msg: str) -> None: dag_bag = DagBag(dag_folder=serialized_dag.fileloc, include_examples=False) dag = dag_bag.get_dag(dag_run.dag_id) + logger.info(f"dag: {dag}") - if not uses_cosmos(dag): + if not uses_cosmos(serialized_dag): logger.info("The DAG does not use Cosmos") - logger.info(f"5: {serialized_dag.deserialize()}") return - logger.info(f"5: {serialized_dag.deserialize()}") - additional_telemetry_metrics = { "dag_hash": dag_run.dag_hash, "status": EventStatus.SUCCESS, - "task_count": len(dag.task_ids), - "cosmos_task_count": total_cosmos_tasks(dag), - "cosmos_task_groups_count": total_cosmos_task_groups(dag), - "is_cosmos_dag": is_cosmos_dag(dag), + "task_count": len(serialized_dag.task_ids), + "cosmos_task_count": total_cosmos_tasks(serialized_dag), + "cosmos_task_groups_count": total_cosmos_task_groups(serialized_dag), + "is_cosmos_dag": is_cosmos_dag(serialized_dag), } telemetry.emit_usage_metrics_if_enabled(DAG_RUN, additional_telemetry_metrics) diff --git a/tests/perf/test_performance.py b/tests/perf/test_performance.py index 995c33a74..13b70be1a 100644 --- a/tests/perf/test_performance.py +++ b/tests/perf/test_performance.py @@ -82,9 +82,10 @@ def generate_project( yield project_path finally: + pass # clean up the models in the project_path / models directory - for model in models_dir.iterdir(): - model.unlink() + # for model in models_dir.iterdir(): + # model.unlink() @pytest.mark.perf From 47fb46c56277e111cfbee246e119608f456a77f4 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Fri, 20 Dec 2024 08:22:16 +0000 Subject: [PATCH 36/46] Attempt to work around RuntimeError: UNEXPECTED COMMIT - THIS WILL BREAK HA LOCKS --- cosmos/listeners/dag_run_listener.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/cosmos/listeners/dag_run_listener.py b/cosmos/listeners/dag_run_listener.py index 1712e4250..2b0b60698 100644 --- a/cosmos/listeners/dag_run_listener.py +++ b/cosmos/listeners/dag_run_listener.py @@ -47,8 +47,12 @@ def uses_cosmos(dag: DAG) -> bool: return bool(is_cosmos_dag(dag) or total_cosmos_task_groups(dag) or total_cosmos_tasks(dag)) +from airflow.utils.session import provide_session + + +@provide_session @hookimpl -def on_dag_run_success(dag_run: DagRun, msg: str) -> None: +def on_dag_run_success(dag_run: DagRun, msg: str, session=None) -> None: logger.info(f"dir: {dir(dag_run.dag)}") logger.info("The on_dag_run_success was called") From 6dac09a63709e83e5d46efe804e7d57963f7bda3 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Fri, 20 Dec 2024 08:36:20 +0000 Subject: [PATCH 37/46] Remove DagBag from listener --- cosmos/listeners/dag_run_listener.py | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/cosmos/listeners/dag_run_listener.py b/cosmos/listeners/dag_run_listener.py index 2b0b60698..ddac1a540 100644 --- a/cosmos/listeners/dag_run_listener.py +++ b/cosmos/listeners/dag_run_listener.py @@ -47,12 +47,9 @@ def uses_cosmos(dag: DAG) -> bool: return bool(is_cosmos_dag(dag) or total_cosmos_task_groups(dag) or total_cosmos_tasks(dag)) -from airflow.utils.session import provide_session - - -@provide_session +# @provide_session @hookimpl -def on_dag_run_success(dag_run: DagRun, msg: str, session=None) -> None: +def on_dag_run_success(dag_run: DagRun, msg: str) -> None: logger.info(f"dir: {dir(dag_run.dag)}") logger.info("The on_dag_run_success was called") @@ -66,11 +63,11 @@ def on_dag_run_success(dag_run: DagRun, msg: str, session=None) -> None: logger.info(f"3: {serialized_dag.task_dict}") logger.info(f"4: {serialized_dag.task_group_dict}") - from airflow.models import DagBag + # from airflow.models import DagBag - dag_bag = DagBag(dag_folder=serialized_dag.fileloc, include_examples=False) - dag = dag_bag.get_dag(dag_run.dag_id) - logger.info(f"dag: {dag}") + # dag_bag = DagBag(dag_folder=serialized_dag.fileloc, include_examples=False) + # dag = dag_bag.get_dag(dag_run.dag_id) + # logger.info(f"dag: {dag}") if not uses_cosmos(serialized_dag): logger.info("The DAG does not use Cosmos") From f06f86a3d937e509a6d9c4329d5684f44db7158d Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Fri, 20 Dec 2024 08:50:33 +0000 Subject: [PATCH 38/46] Change logic to iterate over dag tasks --- cosmos/listeners/dag_run_listener.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cosmos/listeners/dag_run_listener.py b/cosmos/listeners/dag_run_listener.py index ddac1a540..7f9bc5a6f 100644 --- a/cosmos/listeners/dag_run_listener.py +++ b/cosmos/listeners/dag_run_listener.py @@ -36,8 +36,9 @@ def total_cosmos_task_groups(dag: DAG) -> int: def total_cosmos_tasks(dag: DAG) -> int: cosmos_tasks = 0 - for task in dag.tasks: + for task in dag.task_dict.values(): logger.info(f"total_cosmos_task_groups: {task.__class__}") + logger.info(f"type: {type(task)}") if task.__class__.__module__.startswith("cosmos."): cosmos_tasks += 1 return cosmos_tasks From 5b4fda230a83a598350db68f20658fecc4d0fb14 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Fri, 20 Dec 2024 08:55:36 +0000 Subject: [PATCH 39/46] Change logic to iterate over dag tasks --- cosmos/listeners/dag_run_listener.py | 1 + 1 file changed, 1 insertion(+) diff --git a/cosmos/listeners/dag_run_listener.py b/cosmos/listeners/dag_run_listener.py index 7f9bc5a6f..9addb6622 100644 --- a/cosmos/listeners/dag_run_listener.py +++ b/cosmos/listeners/dag_run_listener.py @@ -39,6 +39,7 @@ def total_cosmos_tasks(dag: DAG) -> int: for task in dag.task_dict.values(): logger.info(f"total_cosmos_task_groups: {task.__class__}") logger.info(f"type: {type(task)}") + logger.info(f"__dict__: {task.__dict__}") if task.__class__.__module__.startswith("cosmos."): cosmos_tasks += 1 return cosmos_tasks From 93da409f12b9576eb65cfc55cc583d6650a639bb Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Fri, 20 Dec 2024 09:07:29 +0000 Subject: [PATCH 40/46] Improve identifying COsmos task class --- cosmos/listeners/dag_run_listener.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/cosmos/listeners/dag_run_listener.py b/cosmos/listeners/dag_run_listener.py index 9addb6622..951b77a61 100644 --- a/cosmos/listeners/dag_run_listener.py +++ b/cosmos/listeners/dag_run_listener.py @@ -40,7 +40,9 @@ def total_cosmos_tasks(dag: DAG) -> int: logger.info(f"total_cosmos_task_groups: {task.__class__}") logger.info(f"type: {type(task)}") logger.info(f"__dict__: {task.__dict__}") - if task.__class__.__module__.startswith("cosmos."): + logger.info(f"dir: {dir(task)}") + task_module = getattr(task, "_task_module", None) or task.__class__.__module__ + if task_module.startswith("cosmos."): cosmos_tasks += 1 return cosmos_tasks From ac937148bc4f40546697c94034362f38c699cc0d Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Fri, 20 Dec 2024 10:29:08 +0000 Subject: [PATCH 41/46] Clean up working telemetry --- cosmos/constants.py | 2 +- cosmos/listeners/dag_run_listener.py | 81 ++++++++--------------- cosmos/telemetry.py | 4 +- tests/listeners/test_dag_run_listener.py | 83 ++++++++++++++++-------- tests/test_telemetry.py | 8 +-- 5 files changed, 88 insertions(+), 90 deletions(-) diff --git a/cosmos/constants.py b/cosmos/constants.py index 0d47e82cc..0513d50d2 100644 --- a/cosmos/constants.py +++ b/cosmos/constants.py @@ -161,6 +161,6 @@ def _missing_value_(cls, value): # type: ignore DBT_COMPILE_TASK_ID = "dbt_compile" -TELEMETRY_URL = "https://astronomer.gateway.scarf.sh/astronomer-cosmos/{telemetry_version}/{cosmos_version}/{airflow_version}/{python_version}/{platform_system}/{platform_machine}/{event_type}/{status}/{dag_hash}/{is_cosmos_dag}/{cosmos_task_groups_count}/{task_count}/{cosmos_task_count}" +TELEMETRY_URL = "https://astronomer.gateway.scarf.sh/astronomer-cosmos/{telemetry_version}/{cosmos_version}/{airflow_version}/{python_version}/{platform_system}/{platform_machine}/{event_type}/{status}/{dag_hash}/{task_count}/{cosmos_task_count}" TELEMETRY_VERSION = "v1" TELEMETRY_TIMEOUT = 1.0 diff --git a/cosmos/listeners/dag_run_listener.py b/cosmos/listeners/dag_run_listener.py index 951b77a61..0314c3474 100644 --- a/cosmos/listeners/dag_run_listener.py +++ b/cosmos/listeners/dag_run_listener.py @@ -18,63 +18,36 @@ class EventStatus: DAG_RUN = "dag_run" -def is_cosmos_dag(dag: DAG) -> bool: - logger.info(f"is_cosmos_dag: {dag.__class__}") - if dag.__class__.__module__.startswith("cosmos."): - return True - return False - - -def total_cosmos_task_groups(dag: DAG) -> int: - cosmos_task_groups = 0 - for group_id, task_group in dag.task_group_dict.items(): - logger.info(f"total_cosmos_task_groups: {task_group.__class__}") - if task_group.__class__.__module__.startswith("cosmos."): - cosmos_task_groups += 1 - return cosmos_task_groups - - def total_cosmos_tasks(dag: DAG) -> int: + """ + Identify if there are any Cosmos DAGs on a given serialized `airflow.serialization.serialized_objects.SerializedDAG`. + + The approach is naive, from the perspective it does not take into account subclasses, but it is inexpensive and + works. + """ cosmos_tasks = 0 for task in dag.task_dict.values(): - logger.info(f"total_cosmos_task_groups: {task.__class__}") - logger.info(f"type: {type(task)}") - logger.info(f"__dict__: {task.__dict__}") - logger.info(f"dir: {dir(task)}") + # In a real Airflow deployment, the following `task` is an instance of + # `airflow.serialization.serialized_objects.SerializedBaseOperator` + # and the only reference to Cosmos is in the _task_module. + # It is suboptimal, but works as of Airflow 2.10 task_module = getattr(task, "_task_module", None) or task.__class__.__module__ if task_module.startswith("cosmos."): cosmos_tasks += 1 return cosmos_tasks -def uses_cosmos(dag: DAG) -> bool: - return bool(is_cosmos_dag(dag) or total_cosmos_task_groups(dag) or total_cosmos_tasks(dag)) - - # @provide_session @hookimpl def on_dag_run_success(dag_run: DagRun, msg: str) -> None: - - logger.info(f"dir: {dir(dag_run.dag)}") - logger.info("The on_dag_run_success was called") - # The following is an airflow.serialization.serialized_objects.SerializedDAG instance + logger.debug("Running on_dag_run_success") + # In a real Airflow deployment, the following `serialized_dag` is an instance of + # `airflow.serialization.serialized_objects.SerializedDAG` + # and it is not a subclass of DbtDag, nor contain any references to Cosmos serialized_dag = dag_run.get_dag() - logger.info(f" are they equal? {serialized_dag == dag_run.dag}") - - logger.info(f"__dict__: {serialized_dag.__dict__}") - logger.info(f"1: {serialized_dag.fileloc}") - logger.info(f"2:{serialized_dag.filepath}") - logger.info(f"3: {serialized_dag.task_dict}") - logger.info(f"4: {serialized_dag.task_group_dict}") - - # from airflow.models import DagBag - - # dag_bag = DagBag(dag_folder=serialized_dag.fileloc, include_examples=False) - # dag = dag_bag.get_dag(dag_run.dag_id) - # logger.info(f"dag: {dag}") - if not uses_cosmos(serialized_dag): - logger.info("The DAG does not use Cosmos") + if not total_cosmos_tasks(serialized_dag): + logger.debug("The DAG does not use Cosmos") return additional_telemetry_metrics = { @@ -82,30 +55,30 @@ def on_dag_run_success(dag_run: DagRun, msg: str) -> None: "status": EventStatus.SUCCESS, "task_count": len(serialized_dag.task_ids), "cosmos_task_count": total_cosmos_tasks(serialized_dag), - "cosmos_task_groups_count": total_cosmos_task_groups(serialized_dag), - "is_cosmos_dag": is_cosmos_dag(serialized_dag), } telemetry.emit_usage_metrics_if_enabled(DAG_RUN, additional_telemetry_metrics) - logger.info("Completed on_dag_run_success") + logger.debug("Completed on_dag_run_success") @hookimpl def on_dag_run_failed(dag_run: DagRun, msg: str) -> None: - logger.info("The on_dag_run_failed was called") + logger.debug("Running on_dag_run_failed") + # In a real Airflow deployment, the following `serialized_dag` is an instance of + # `airflow.serialization.serialized_objects.SerializedDAG` + # and it is not a subclass of DbtDag, nor contain any references to Cosmos + serialized_dag = dag_run.get_dag() - dag = dag_run.get_dag() - if not uses_cosmos(dag): + if not total_cosmos_tasks(serialized_dag): + logger.debug("The DAG does not use Cosmos") return additional_telemetry_metrics = { "dag_hash": dag_run.dag_hash, "status": EventStatus.FAILED, - "task_count": len(dag.task_ids), - "cosmos_task_count": total_cosmos_tasks(dag), - "cosmos_task_groups_count": total_cosmos_task_groups(dag), - "is_cosmos_dag": is_cosmos_dag(dag), + "task_count": len(serialized_dag.task_ids), + "cosmos_task_count": total_cosmos_tasks(serialized_dag), } telemetry.emit_usage_metrics_if_enabled(DAG_RUN, additional_telemetry_metrics) - logger.info("Completed on_dag_run_failed") + logger.debug("Completed on_dag_run_failed") diff --git a/cosmos/telemetry.py b/cosmos/telemetry.py index 95baab2ed..0e267b28b 100644 --- a/cosmos/telemetry.py +++ b/cosmos/telemetry.py @@ -46,7 +46,7 @@ def emit_usage_metrics(metrics: dict[str, object]) -> bool: telemetry_url = constants.TELEMETRY_URL.format( **metrics, telemetry_version=constants.TELEMETRY_VERSION, query_string=query_string ) - logger.info("Telemetry is enabled. Emitting the following usage metrics to %s: %s", telemetry_url, metrics) + logger.debug("Telemetry is enabled. Emitting the following usage metrics to %s: %s", telemetry_url, metrics) response = httpx.get(telemetry_url, timeout=constants.TELEMETRY_TIMEOUT, follow_redirects=True) if not response.is_success: logger.warning( @@ -73,5 +73,5 @@ def emit_usage_metrics_if_enabled(event_type: str, additional_metrics: dict[str, is_success = emit_usage_metrics(metrics) return is_success else: - logger.info("Telemetry is disabled. To enable it, export AIRFLOW__COSMOS__ENABLE_TELEMETRY=True.") + logger.debug("Telemetry is disabled. To enable it, export AIRFLOW__COSMOS__ENABLE_TELEMETRY=True.") return False diff --git a/tests/listeners/test_dag_run_listener.py b/tests/listeners/test_dag_run_listener.py index 1729d592a..a547f20ad 100644 --- a/tests/listeners/test_dag_run_listener.py +++ b/tests/listeners/test_dag_run_listener.py @@ -1,13 +1,17 @@ +import logging +import uuid from datetime import datetime from pathlib import Path +from unittest.mock import patch import pytest from airflow.models import DAG +from airflow.utils.state import State from cosmos import DbtRunLocalOperator, ProfileConfig, ProjectConfig from cosmos.airflow.dag import DbtDag from cosmos.airflow.task_group import DbtTaskGroup -from cosmos.listeners.dag_run_listener import is_cosmos_dag, total_cosmos_task_groups, total_cosmos_tasks, uses_cosmos +from cosmos.listeners.dag_run_listener import on_dag_run_failed, on_dag_run_success, total_cosmos_tasks from cosmos.profiles import PostgresUserPasswordProfileMapping DBT_ROOT_PATH = Path(__file__).parent.parent.parent / "dev/dags/dbt" @@ -34,25 +38,7 @@ def test_is_cosmos_dag_is_true(): start_date=datetime(2023, 1, 1), dag_id="basic_cosmos_dag", ) - - assert is_cosmos_dag(dag) - assert total_cosmos_task_groups(dag) == 0 - assert uses_cosmos(dag) - - -@pytest.mark.integration -def test_total_cosmos_task_groups(): - with DAG("test-id-dbt-compile", start_date=datetime(2022, 1, 1)) as dag: - _ = DbtTaskGroup( - project_config=ProjectConfig( - DBT_ROOT_PATH / "jaffle_shop", - ), - profile_config=profile_config, - ) - - assert not is_cosmos_dag(dag) - assert total_cosmos_task_groups(dag) == 1 - assert uses_cosmos(dag) + assert total_cosmos_tasks(dag) == 13 @pytest.mark.integration @@ -66,7 +52,6 @@ def test_total_cosmos_tasks_in_task_group(): ) assert total_cosmos_tasks(dag) == 13 - assert uses_cosmos(dag) def test_total_cosmos_tasks_is_one(): @@ -81,10 +66,7 @@ def test_total_cosmos_tasks_is_one(): ) run_operator - assert not is_cosmos_dag(dag) - assert total_cosmos_task_groups(dag) == 0 assert total_cosmos_tasks(dag) == 1 - assert uses_cosmos(dag) def test_not_cosmos_dag(): @@ -92,7 +74,54 @@ def test_not_cosmos_dag(): with DAG("test-id-1", start_date=datetime(2022, 1, 1)) as dag: pass - assert not is_cosmos_dag(dag) - assert total_cosmos_task_groups(dag) == 0 assert total_cosmos_tasks(dag) == 0 - assert not uses_cosmos(dag) + + +@pytest.mark.integration +@patch("cosmos.listeners.dag_run_listener.telemetry.emit_usage_metrics_if_enabled") +def test_on_dag_run_success(mock_emit_usage_metrics_if_enabled, caplog): + caplog.set_level(logging.DEBUG) + + dag = DbtDag( + project_config=ProjectConfig( + DBT_ROOT_PATH / "jaffle_shop", + ), + profile_config=profile_config, + start_date=datetime(2023, 1, 1), + dag_id="basic_cosmos_dag", + ) + run_id = str(uuid.uuid1()) + dag_run = dag.create_dagrun( + state=State.NONE, + run_id=run_id, + ) + + on_dag_run_success(dag_run, msg="test success") + assert "Running on_dag_run_success" in caplog.text + assert "Completed on_dag_run_success" in caplog.text + assert mock_emit_usage_metrics_if_enabled.call_count == 1 + + +@pytest.mark.integration +@patch("cosmos.listeners.dag_run_listener.telemetry.emit_usage_metrics_if_enabled") +def test_on_dag_run_failed(mock_emit_usage_metrics_if_enabled, caplog): + caplog.set_level(logging.DEBUG) + + dag = DbtDag( + project_config=ProjectConfig( + DBT_ROOT_PATH / "jaffle_shop", + ), + profile_config=profile_config, + start_date=datetime(2023, 1, 1), + dag_id="basic_cosmos_dag", + ) + run_id = str(uuid.uuid1()) + dag_run = dag.create_dagrun( + state=State.FAILED, + run_id=run_id, + ) + + on_dag_run_failed(dag_run, msg="test failed") + assert "Running on_dag_run_failed" in caplog.text + assert "Completed on_dag_run_failed" in caplog.text + assert mock_emit_usage_metrics_if_enabled.call_count == 1 diff --git a/tests/test_telemetry.py b/tests/test_telemetry.py index be9f5cbfa..b11caabe1 100644 --- a/tests/test_telemetry.py +++ b/tests/test_telemetry.py @@ -55,19 +55,17 @@ def test_emit_usage_metrics_fails(mock_httpx_get, caplog): "event_type": "dag_run", "status": "success", "dag_hash": "d151d1fa2f03270ea116cc7494f2c591", - "is_cosmos_dag": True, - "cosmos_task_groups_count": 0, "task_count": 3, "cosmos_task_count": 3, } is_success = telemetry.emit_usage_metrics(sample_metrics) mock_httpx_get.assert_called_once_with( - f"""https://astronomer.gateway.scarf.sh/astronomer-cosmos/v1/1.8.0a4/2.10.1/3.11/darwin/amd64/dag_run/success/d151d1fa2f03270ea116cc7494f2c591/True/0/3/3""", + f"""https://astronomer.gateway.scarf.sh/astronomer-cosmos/v1/1.8.0a4/2.10.1/3.11/darwin/amd64/dag_run/success/d151d1fa2f03270ea116cc7494f2c591/3/3""", timeout=1.0, follow_redirects=True, ) assert not is_success - log_msg = f"""Unable to emit usage metrics to https://astronomer.gateway.scarf.sh/astronomer-cosmos/v1/1.8.0a4/2.10.1/3.11/darwin/amd64/dag_run/success/d151d1fa2f03270ea116cc7494f2c591/True/0/3/3. Status code: 404. Message: Non existent URL""" + log_msg = f"""Unable to emit usage metrics to https://astronomer.gateway.scarf.sh/astronomer-cosmos/v1/1.8.0a4/2.10.1/3.11/darwin/amd64/dag_run/success/d151d1fa2f03270ea116cc7494f2c591/3/3. Status code: 404. Message: Non existent URL""" assert caplog.text.startswith("WARNING") assert log_msg in caplog.text @@ -84,8 +82,6 @@ def test_emit_usage_metrics_succeeds(caplog): "event_type": "dag_run", "status": "success", "dag_hash": "dag-hash-ci", - "is_cosmos_dag": False, - "cosmos_task_groups_count": 1, "task_count": 33, "cosmos_task_count": 33, } From 07752ac15c30cab250a97bbd1b81458ef9afcd67 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Fri, 20 Dec 2024 10:30:47 +0000 Subject: [PATCH 42/46] Update privacy notice --- PRIVACY_NOTICE.rst | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/PRIVACY_NOTICE.rst b/PRIVACY_NOTICE.rst index 49cdbaa8a..5dea03268 100644 --- a/PRIVACY_NOTICE.rst +++ b/PRIVACY_NOTICE.rst @@ -34,9 +34,9 @@ In addition to Scarf's default data collection, DAG Factory collects the followi - Python version - Operating system & machine architecture - Event type -- DAG that uses Cosmos hash -- Total tasks in DAGs that use Cosmos -- Total Cosmos tasks -- Total Cosmos task groups +- For DAGs that use Cosmos, the following information is also emitted: + - the DAG hash + - Total tasks + - Total Cosmos tasks No user-identifiable information (IP included) is stored in Scarf. From 7dc9f96d571b09af1a1bb66e1fbcadd675443e75 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Fri, 20 Dec 2024 10:50:22 +0000 Subject: [PATCH 43/46] Fix privacy notice --- PRIVACY_NOTICE.rst | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/PRIVACY_NOTICE.rst b/PRIVACY_NOTICE.rst index 5dea03268..7477ee795 100644 --- a/PRIVACY_NOTICE.rst +++ b/PRIVACY_NOTICE.rst @@ -27,16 +27,15 @@ As described in the `official documentation Date: Fri, 20 Dec 2024 10:54:55 +0000 Subject: [PATCH 44/46] Update cosmos/__init__.py --- cosmos/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cosmos/__init__.py b/cosmos/__init__.py index 9feb20b7f..884a90659 100644 --- a/cosmos/__init__.py +++ b/cosmos/__init__.py @@ -6,7 +6,7 @@ Contains dags, task groups, and operators. """ -__version__ = "1.8.0a7" +__version__ = "1.7.1" from cosmos.airflow.dag import DbtDag From 574c3bae706ddab980d71cd7675db160b213a6db Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Fri, 20 Dec 2024 10:55:39 +0000 Subject: [PATCH 45/46] Update test_performance.py --- tests/perf/test_performance.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/tests/perf/test_performance.py b/tests/perf/test_performance.py index 13b70be1a..7194d0c92 100644 --- a/tests/perf/test_performance.py +++ b/tests/perf/test_performance.py @@ -82,10 +82,9 @@ def generate_project( yield project_path finally: - pass # clean up the models in the project_path / models directory - # for model in models_dir.iterdir(): - # model.unlink() + for model in models_dir.iterdir(): + model.unlink() @pytest.mark.perf From c57f884df39e68c776651126e23fcf57f91d5e0d Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 20 Dec 2024 10:55:52 +0000 Subject: [PATCH 46/46] =?UTF-8?q?=F0=9F=8E=A8=20[pre-commit.ci]=20Auto=20f?= =?UTF-8?q?ormat=20from=20pre-commit.com=20hooks?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/perf/test_performance.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/perf/test_performance.py b/tests/perf/test_performance.py index 7194d0c92..995c33a74 100644 --- a/tests/perf/test_performance.py +++ b/tests/perf/test_performance.py @@ -84,7 +84,7 @@ def generate_project( finally: # clean up the models in the project_path / models directory for model in models_dir.iterdir(): - model.unlink() + model.unlink() @pytest.mark.perf