diff --git a/cosmos/__init__.py b/cosmos/__init__.py index c98a14402..3ef523f7a 100644 --- a/cosmos/__init__.py +++ b/cosmos/__init__.py @@ -5,7 +5,7 @@ Contains dags, task groups, and operators. """ -__version__ = "1.5.1" +__version__ = "1.6.0a6" from cosmos.airflow.dag import DbtDag diff --git a/cosmos/cache.py b/cosmos/cache.py index 102186423..16ca7709d 100644 --- a/cosmos/cache.py +++ b/cosmos/cache.py @@ -17,6 +17,7 @@ from airflow.models.dag import DAG from airflow.utils.session import provide_session from airflow.utils.task_group import TaskGroup +from airflow.version import version as airflow_version from sqlalchemy import select from sqlalchemy.orm import Session @@ -25,22 +26,66 @@ DBT_MANIFEST_FILE_NAME, DBT_TARGET_DIR_NAME, DEFAULT_PROFILES_FILE_NAME, + FILE_SCHEME_AIRFLOW_DEFAULT_CONN_ID_MAP, PACKAGE_LOCKFILE_YML, ) from cosmos.dbt.project import get_partial_parse_path +from cosmos.exceptions import CosmosValueError from cosmos.log import get_logger from cosmos.settings import ( + AIRFLOW_IO_AVAILABLE, cache_dir, dbt_profile_cache_dir_name, enable_cache, enable_cache_package_lockfile, enable_cache_profile, + remote_cache_dir_conn_id, ) +from cosmos.settings import remote_cache_dir as settings_remote_cache_dir logger = get_logger(__name__) VAR_KEY_CACHE_PREFIX = "cosmos_cache__" +def _configure_remote_cache_dir() -> Path | None: + """Configure the remote cache dir if it is provided.""" + if not settings_remote_cache_dir: + return None + + _configured_cache_dir = None + + cache_dir_str = str(settings_remote_cache_dir) + + remote_cache_conn_id = remote_cache_dir_conn_id + if not remote_cache_conn_id: + cache_dir_schema = cache_dir_str.split("://")[0] + remote_cache_conn_id = FILE_SCHEME_AIRFLOW_DEFAULT_CONN_ID_MAP.get(cache_dir_schema, None) # type: ignore[assignment] + if remote_cache_conn_id is None: + return _configured_cache_dir + + if not AIRFLOW_IO_AVAILABLE: + raise CosmosValueError( + f"You're trying to specify remote cache_dir {cache_dir_str}, but the required " + f"Object Storage feature is unavailable in Airflow version {airflow_version}. Please upgrade to " + "Airflow 2.8 or later." + ) + + from airflow.io.path import ObjectStoragePath + + _configured_cache_dir = ObjectStoragePath(cache_dir_str, conn_id=remote_cache_conn_id) + + if not _configured_cache_dir.exists(): # type: ignore[no-untyped-call] + # TODO: Check if we should raise an error instead in case the provided path does not exist. + _configured_cache_dir.mkdir(parents=True, exist_ok=True) + + # raise CosmosValueError( + # f"remote_cache_path `{cache_dir_str}` does not exist or is not accessible using " + # f"remote_cache_conn_id `{remote_cache_conn_id}`" + # ) + + return _configured_cache_dir + + def _get_airflow_metadata(dag: DAG, task_group: TaskGroup | None) -> dict[str, str | None]: dag_id = None task_group_id = None @@ -366,6 +411,64 @@ def delete_unused_dbt_ls_cache( return deleted_cosmos_variables +# TODO: Add integration tests once remote cache is supported in the CI pipeline +@provide_session +def delete_unused_dbt_ls_remote_cache_files( # pragma: no cover + max_age_last_usage: timedelta = timedelta(days=30), session: Session | None = None +) -> int: + """ + Delete Cosmos cache stored in remote storage based on the last execution of their associated DAGs. + """ + if session is None: + return 0 + + logger.info(f"Delete the Cosmos cache stored remotely that hasn't been used for {max_age_last_usage}") + cosmos_dags_ids_remote_cache_files = defaultdict(list) + + configured_remote_cache_dir = _configure_remote_cache_dir() + if not configured_remote_cache_dir: + logger.info( + "No remote cache directory configured. Skipping the deletion of the dbt ls cache files in remote storage." + ) + return 0 + + dirs = [obj for obj in configured_remote_cache_dir.iterdir() if obj.is_dir()] + files = [f for label in dirs for f in label.iterdir() if f.is_file()] + + total_cosmos_remote_cache_files = 0 + for file in files: + prefix_path = (configured_remote_cache_dir / VAR_KEY_CACHE_PREFIX).as_uri() + if file.as_uri().startswith(prefix_path): + with file.open("r") as fp: + cache_dict = json.load(fp) + cosmos_dags_ids_remote_cache_files[cache_dict["dag_id"]].append(file) + total_cosmos_remote_cache_files += 1 + + deleted_cosmos_remote_cache_files = 0 + + for dag_id, files in cosmos_dags_ids_remote_cache_files.items(): + last_dag_run = ( + session.query(DagRun) + .filter( + DagRun.dag_id == dag_id, + ) + .order_by(DagRun.execution_date.desc()) + .first() + ) + if last_dag_run and last_dag_run.execution_date < (datetime.now(timezone.utc) - max_age_last_usage): + for file in files: + logger.info(f"Removing the dbt ls cache remote file {file}") + file.unlink() + deleted_cosmos_remote_cache_files += 1 + logger.info( + "Deleted %s/%s dbt ls cache files in remote storage.", + deleted_cosmos_remote_cache_files, + total_cosmos_remote_cache_files, + ) + + return deleted_cosmos_remote_cache_files + + def is_profile_cache_enabled() -> bool: """Return True if global and profile cache is enable""" return enable_cache and enable_cache_profile diff --git a/cosmos/dbt/graph.py b/cosmos/dbt/graph.py index cbe2741f1..1c0237e8f 100644 --- a/cosmos/dbt/graph.py +++ b/cosmos/dbt/graph.py @@ -19,6 +19,7 @@ from cosmos import cache, settings from cosmos.cache import ( + _configure_remote_cache_dir, _copy_cached_package_lockfile_to_project, _get_latest_cached_package_lockfile, is_cache_package_lockfile_enabled, @@ -323,7 +324,22 @@ def save_dbt_ls_cache(self, dbt_ls_output: str) -> None: "last_modified": datetime.datetime.now(datetime.timezone.utc).isoformat(), **self.airflow_metadata, } - Variable.set(self.dbt_ls_cache_key, cache_dict, serialize_json=True) + remote_cache_dir = _configure_remote_cache_dir() + if remote_cache_dir: + remote_cache_key_path = remote_cache_dir / self.dbt_ls_cache_key / "dbt_ls_cache.json" + with remote_cache_key_path.open("w") as fp: + json.dump(cache_dict, fp) + else: + Variable.set(self.dbt_ls_cache_key, cache_dict, serialize_json=True) + + def _get_dbt_ls_remote_cache(self, remote_cache_dir: Path) -> dict[str, str]: + """Loads the remote cache for dbt ls.""" + cache_dict: dict[str, str] = {} + remote_cache_key_path = remote_cache_dir / self.dbt_ls_cache_key / "dbt_ls_cache.json" + if remote_cache_key_path.exists(): + with remote_cache_key_path.open("r") as fp: + cache_dict = json.load(fp) + return cache_dict def get_dbt_ls_cache(self) -> dict[str, str]: """ @@ -338,7 +354,12 @@ def get_dbt_ls_cache(self) -> dict[str, str]: """ cache_dict: dict[str, str] = {} try: - cache_dict = Variable.get(self.dbt_ls_cache_key, deserialize_json=True) + remote_cache_dir = _configure_remote_cache_dir() + cache_dict = ( + self._get_dbt_ls_remote_cache(remote_cache_dir) + if remote_cache_dir + else Variable.get(self.dbt_ls_cache_key, deserialize_json=True) + ) except (json.decoder.JSONDecodeError, KeyError): return cache_dict else: diff --git a/cosmos/settings.py b/cosmos/settings.py index c9028f7da..43abc8897 100644 --- a/cosmos/settings.py +++ b/cosmos/settings.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import os import tempfile from pathlib import Path @@ -7,7 +9,10 @@ from airflow.version import version as airflow_version from packaging.version import Version -from cosmos.constants import DEFAULT_COSMOS_CACHE_DIR_NAME, DEFAULT_OPENLINEAGE_NAMESPACE +from cosmos.constants import ( + DEFAULT_COSMOS_CACHE_DIR_NAME, + DEFAULT_OPENLINEAGE_NAMESPACE, +) # In MacOS users may want to set the envvar `TMPDIR` if they do not want the value of the temp directory to change DEFAULT_CACHE_DIR = Path(tempfile.gettempdir(), DEFAULT_COSMOS_CACHE_DIR_NAME) @@ -24,6 +29,11 @@ dbt_profile_cache_dir_name = conf.get("cosmos", "profile_cache_dir_name", fallback="profile") virtualenv_max_retries_lock = conf.getint("cosmos", "virtualenv_max_retries_lock", fallback=120) +# Experimentally adding `remote_cache_dir` as a separate entity in the Cosmos 1.6 release to gather feedback. +# This will be merged with the `cache_dir` config parameter in upcoming releases. +remote_cache_dir = conf.get("cosmos", "remote_cache_dir", fallback=None) +remote_cache_dir_conn_id = conf.get("cosmos", "remote_cache_dir_conn_id", fallback=None) + try: LINEAGE_NAMESPACE = conf.get("openlineage", "namespace") except airflow.exceptions.AirflowConfigException: diff --git a/dev/dags/example_cosmos_cleanup_dag.py b/dev/dags/example_cosmos_cleanup_dag.py index c93bdf002..1d37589a0 100644 --- a/dev/dags/example_cosmos_cleanup_dag.py +++ b/dev/dags/example_cosmos_cleanup_dag.py @@ -8,7 +8,7 @@ from airflow.decorators import dag, task -from cosmos.cache import delete_unused_dbt_ls_cache +from cosmos.cache import delete_unused_dbt_ls_cache, delete_unused_dbt_ls_remote_cache_files @dag( @@ -28,6 +28,15 @@ def clear_db_ls_cache(session=None): clear_db_ls_cache() + @task() + def clear_db_ls_remote_cache(session=None): + """ + Delete the dbt ls remote cache files that have not been used for the last five days. + """ + delete_unused_dbt_ls_remote_cache_files(max_age_last_usage=timedelta(days=5)) + + clear_db_ls_remote_cache() + # [END cache_example] diff --git a/docs/configuration/caching.rst b/docs/configuration/caching.rst index 7dba9933f..cc518f748 100644 --- a/docs/configuration/caching.rst +++ b/docs/configuration/caching.rst @@ -19,7 +19,7 @@ It is possible to turn off any cache in Cosmos by exporting the environment vari Disabling individual types of cache in Cosmos is also possible, as explained below. Caching the dbt ls output -~~~~~~~~~~~~~ +~~~~~~~~~~~~~~~~~~~~~~~~~ (Introduced in Cosmos 1.5) @@ -29,13 +29,24 @@ also the tasks queueing time. Cosmos 1.5 introduced a feature to mitigate the performance issue associated with ``LoadMode.DBT_LS`` by caching the output of this command as an `Airflow Variable `_. -Based on an initial `analysis `_, enabling this setting reduced some DAGs ask queueing from 30s to 0s. Additionally, some users `reported improvements of 84% `_ in the DAG run time. +Based on an initial `analysis `_, enabling this setting reduced some DAGs task queueing from 30s to 0s. Additionally, some users `reported improvements of 84% `_ in the DAG run time. This feature is on by default. To turn it off, export the following environment variable: ``AIRFLOW__COSMOS__ENABLE_CACHE_DBT_LS=0``. +(Introduced in Cosmos 1.6 - Experimental feature) + +Starting with Cosmos 1.6.0, users can also set a remote directory path to store this cache instead of using Airflow +Variables. To do so, you need to configure a remote cache directory. See :ref:`remote_cache_dir` and +:ref:`remote_cache_dir_conn_id` for more information. This is an experimental feature introduced in 1.6.0 to gather +user feedback. The ``remote_cache_dir`` will eventually be merged into the :ref:`cache_dir` setting in upcoming +releases. + **How the cache is refreshed** -Users can purge or delete the cache via Airflow UI by identifying and deleting the cache key. +If using the default Variables cache approach, users can purge or delete the cache via Airflow UI by identifying and +deleting the cache key. In case you're using the alternative approach by setting the ``remote_cache_dir`` introduced +in Cosmos 1.6.0, you can delete the cache by removing the specific files by identifying them using your configured path +in the remote store. Cosmos will refresh the cache in a few circumstances: diff --git a/docs/configuration/cosmos-conf.rst b/docs/configuration/cosmos-conf.rst index a15f3ce70..95a4adcad 100644 --- a/docs/configuration/cosmos-conf.rst +++ b/docs/configuration/cosmos-conf.rst @@ -101,6 +101,31 @@ This page lists all available Airflow configurations that affect ``astronomer-co - Default: 120 - Environment Variable: ``AIRFLOW__COSMOS__VIRTUALENV_MAX_RETRIES_LOCK`` +.. _remote_cache_dir: + +`remote_cache_dir`_: + The remote directory to store the dbt cache. Starting with Cosmos 1.6.0, you can store the `dbt ls` output as cache + in a remote location (an alternative to the Variable cache approach released previously since Cosmos 1.5.0) + using this configuration. The value for the remote cache directory can be any of the schemes that are supported by + the `Airflow Object Store `_ + feature introduced in Airflow 2.8.0 (e.g. ``s3://your_s3_bucket/cache_dir/``, ``gs://your_gs_bucket/cache_dir/``, + ``abfs://your_azure_container/cache_dir``, etc.) + + This is an experimental feature available since Cosmos 1.6 to gather user feedback and will be merged into the + ``cache_dir`` setting in upcoming releases. + + - Default: ``None`` + - Environment Variable: ``AIRFLOW__COSMOS__REMOTE_CACHE_DIR`` + +.. _remote_cache_dir_conn_id: + +`remote_cache_dir_conn_id`_: + The connection ID for the remote cache directory. If this is not set, the default Airflow connection ID identified + for the scheme will be used. + + - Default: ``None`` + - Environment Variable: ``AIRFLOW__COSMOS__REMOTE_CACHE_DIR_CONN_ID`` + [openlineage] ~~~~~~~~~~~~~ diff --git a/tests/dbt/test_graph.py b/tests/dbt/test_graph.py index 0a1942270..4174c9a2d 100644 --- a/tests/dbt/test_graph.py +++ b/tests/dbt/test_graph.py @@ -1,9 +1,12 @@ +import base64 import importlib +import json import logging import os import shutil import sys import tempfile +import zlib from datetime import datetime from pathlib import Path from subprocess import PIPE, Popen @@ -24,6 +27,7 @@ run_command, ) from cosmos.profiles import PostgresUserPasswordProfileMapping +from cosmos.settings import AIRFLOW_IO_AVAILABLE DBT_PROJECTS_ROOT_DIR = Path(__file__).parent.parent.parent / "dev/dags/dbt" DBT_PROJECT_NAME = "jaffle_shop" @@ -1628,3 +1632,65 @@ def test_should_use_dbt_ls_cache(enable_cache, enable_cache_dbt_ls, cache_id, sh graph = DbtGraph(cache_identifier=cache_id, project=ProjectConfig(dbt_project_path="/tmp")) graph.should_use_dbt_ls_cache.cache_clear() assert graph.should_use_dbt_ls_cache() == should_use + + +@pytest.mark.skipif(not AIRFLOW_IO_AVAILABLE, reason="Airflow did not have Object Storage until the 2.8 release") +@patch("airflow.io.path.ObjectStoragePath") +@patch("cosmos.config.ProjectConfig") +@patch("cosmos.dbt.graph._configure_remote_cache_dir") +def test_save_dbt_ls_cache_remote_cache_dir( + mock_configure_remote_cache_dir, mock_project_config, mock_object_storage_path +): + mock_remote_cache_dir_path = mock_object_storage_path.return_value + mock_remote_cache_dir_path.exists.return_value = True + + mock_configure_remote_cache_dir.return_value = mock_remote_cache_dir_path + + dbt_ls_output = "sample dbt ls output" + mock_project_config.dbt_vars = {"var1": "value1"} + mock_project_config.env_vars = {"var1": "value1"} + mock_project_config._calculate_dbt_ls_cache_current_version.return_value = "mock_version" + dbt_graph = DbtGraph(project=mock_project_config) + + dbt_graph.save_dbt_ls_cache(dbt_ls_output) + + mock_remote_cache_key_path = mock_remote_cache_dir_path / dbt_graph.dbt_ls_cache_key / "dbt_ls_cache.json" + mock_remote_cache_key_path.open.assert_called_once_with("w") + + +@pytest.mark.skipif(not AIRFLOW_IO_AVAILABLE, reason="Airflow did not have Object Storage until the 2.8 release") +@patch("airflow.io.path.ObjectStoragePath") +@patch("cosmos.config.ProjectConfig") +@patch("cosmos.dbt.graph._configure_remote_cache_dir") +def test_get_dbt_ls_cache_remote_cache_dir( + mock_configure_remote_cache_dir, mock_project_config, mock_object_storage_path +): + mock_remote_cache_dir_path = mock_object_storage_path.return_value + mock_remote_cache_dir_path.exists.return_value = True + mock_configure_remote_cache_dir.return_value = mock_remote_cache_dir_path + + dbt_ls_output = "sample dbt ls output" + compressed_data = zlib.compress(dbt_ls_output.encode("utf-8")) + encoded_data = base64.b64encode(compressed_data).decode("utf-8") + + cache_dict = { + "version": "cache-version", + "dbt_ls_compressed": encoded_data, + "last_modified": "2024-08-13T12:34:56Z", + } + + mock_remote_cache_key_path = mock_remote_cache_dir_path / "some_cache_key" / "dbt_ls_cache.json" + mock_remote_cache_key_path.exists.return_value = True + mock_remote_cache_key_path.open.return_value.__enter__.return_value.read.return_value = json.dumps(cache_dict) + + dbt_graph = DbtGraph(project=mock_project_config) + + result = dbt_graph.get_dbt_ls_cache() + + expected_result = { + "version": "cache-version", + "dbt_ls": dbt_ls_output, + "last_modified": "2024-08-13T12:34:56Z", + } + + assert result == expected_result diff --git a/tests/test_cache.py b/tests/test_cache.py index b9ab087a7..c4c7e45c9 100644 --- a/tests/test_cache.py +++ b/tests/test_cache.py @@ -4,7 +4,7 @@ import time from datetime import datetime, timedelta, timezone from pathlib import Path -from unittest.mock import call, patch +from unittest.mock import MagicMock, call, patch import pytest from airflow import DAG @@ -13,6 +13,7 @@ from airflow.utils.task_group import TaskGroup from cosmos.cache import ( + _configure_remote_cache_dir, _copy_partial_parse_to_project, _create_cache_identifier, _create_folder_version_hash, @@ -27,8 +28,14 @@ is_cache_package_lockfile_enabled, is_profile_cache_enabled, ) -from cosmos.constants import DBT_PARTIAL_PARSE_FILE_NAME, DBT_TARGET_DIR_NAME, DEFAULT_PROFILES_FILE_NAME -from cosmos.settings import dbt_profile_cache_dir_name +from cosmos.constants import ( + DBT_PARTIAL_PARSE_FILE_NAME, + DBT_TARGET_DIR_NAME, + DEFAULT_PROFILES_FILE_NAME, + _default_s3_conn, +) +from cosmos.exceptions import CosmosValueError +from cosmos.settings import AIRFLOW_IO_AVAILABLE, dbt_profile_cache_dir_name START_DATE = datetime(2024, 4, 16) example_dag = DAG("dag", start_date=START_DATE) @@ -406,3 +413,55 @@ def test_get_latest_cached_lockfile_with_no_cache(mock_get_sha): # Test case where there is a cached file result = _get_latest_cached_package_lockfile(project_dir) assert result.exists() + + +@patch("cosmos.cache.settings_remote_cache_dir", new=None) +def test_remote_cache_path_initialization_no_remote_cache_dir(): + configured_remote_cache_dir = _configure_remote_cache_dir() + assert configured_remote_cache_dir is None + + +@patch("cosmos.cache.settings_remote_cache_dir", new="s3://some-bucket/cache") +@patch("cosmos.cache.AIRFLOW_IO_AVAILABLE", new=False) +def test_remote_cache_path_initialization_object_storage_unavailable_on_earlier_airflow_versions(): + with pytest.raises(CosmosValueError, match="Object Storage feature is unavailable"): + _configure_remote_cache_dir() + + +@pytest.mark.skipif(not AIRFLOW_IO_AVAILABLE, reason="Airflow did not have Object Storage until the 2.8 release") +@patch("cosmos.cache.settings_remote_cache_dir", new="s3://some-bucket/cache") +@patch("airflow.io.path.ObjectStoragePath") +def test_remote_cache_path_initialization_path_available_default_connection(mock_object_storage_path): + mock_cache_dir_path = MagicMock() + mock_cache_dir_path.exists.return_value = True + mock_object_storage_path.return_value = mock_cache_dir_path + + configured_remote_cache_dir = _configure_remote_cache_dir() + mock_object_storage_path.assert_called_with("s3://some-bucket/cache", conn_id=_default_s3_conn) + assert configured_remote_cache_dir == mock_cache_dir_path + + +@pytest.mark.skipif(not AIRFLOW_IO_AVAILABLE, reason="Airflow did not have Object Storage until the 2.8 release") +@patch("cosmos.cache.settings_remote_cache_dir", new="s3://some-bucket/cache") +@patch("airflow.io.path.ObjectStoragePath") +def test_remote_cache_dir_initialization_path_not_exist_creates_path(mock_object_storage_path): + mock_cache_dir_path = MagicMock() + mock_cache_dir_path.exists.return_value = False + mock_object_storage_path.return_value = mock_cache_dir_path + + _ = _configure_remote_cache_dir() + mock_cache_dir_path.mkdir.assert_called_once_with(parents=True, exist_ok=True) + + +@pytest.mark.skipif(not AIRFLOW_IO_AVAILABLE, reason="Airflow did not have Object Storage until the 2.8 release") +@patch("cosmos.cache.settings_remote_cache_dir", new="s3://some-bucket/cache") +@patch("cosmos.cache.remote_cache_dir_conn_id", new="my_conn_id") +@patch("airflow.io.path.ObjectStoragePath") +def test_remote_cache_path_initialization_with_conn_id(mock_object_storage_path): + mock_cache_path = MagicMock() + mock_cache_path.exists.return_value = True + mock_object_storage_path.return_value = mock_cache_path + + configured_remote_cache_dir = _configure_remote_cache_dir() + mock_object_storage_path.assert_called_with("s3://some-bucket/cache", conn_id="my_conn_id") + assert configured_remote_cache_dir == mock_cache_path