From 41053ed2cf6e1ec601be44995f5f43fc079810b3 Mon Sep 17 00:00:00 2001
From: Pankaj Koti
Date: Fri, 16 Aug 2024 16:25:16 +0530
Subject: [PATCH] Add support to store and fetch dbt ls cache in remote stores
 (#1147)

This PR introduces the functionality to store and retrieve the `dbt ls` output cache in remote storage systems. This enhancement improves the efficiency and scalability of cache management for Cosmos dbt projects that use the `dbt ls` cache option (enabled by default) introduced in PR #1014.

## Key Changes

1. **`dbt ls` Cache Storage in Remote Stores**: Added support to store the `dbt ls` cache as a JSON file in the remote storage path configured in the Airflow settings under the `cosmos` section. The cache is saved under the specified remote storage path and its key carries the `cosmos_cache__` prefix.
2. **Cache Retrieval from Remote Stores**: Implemented logic to check for the cache in the remote storage path before falling back to the Variable cache. If `remote_cache_dir` is specified and the path exists in the remote store, the cache is read from there; if the path does not exist, Cosmos attempts to create it.
3. **Backward Compatibility**: Users can continue using local cache storage through Airflow Variables if a `remote_cache_dir` is not specified.

## Impact

1. **Scalability**: Enables the use of remote, scalable storage systems for dbt cache management.
2. **Performance**: Reduces the load on Airflow's metadata database by offloading cache storage to external systems.
3. **Flexibility**: Lets users choose between local (Airflow metadata using Variables) and remote cache storage based on their infrastructure needs.

## Configuration

To use this feature, set `remote_cache_dir` in the `cosmos` section of the Airflow settings; the path should point to a compatible remote storage location. You can also specify `remote_cache_dir_conn_id`, the Airflow connection used to reach the remote store. If it is not specified, Cosmos identifies the scheme of the given path and uses the default Airflow connection ID for that scheme. A configuration sketch is shown after the Limitations section below.

## Testing

1. Tested with multiple remote storage backends (AWS S3 and Google Cloud Storage) to ensure compatibility and reliability.
2. Verified that cache retrieval falls back to the Variable-based caching approach if `remote_cache_dir` is not configured.

## Documentation

Updated the documentation to include instructions on configuring `remote_cache_dir`.

## Limitations

1. Users must be on Airflow 2.8 or higher, because the underlying Airflow Object Storage feature used to access remote stores was introduced in that version. Attempting to specify a `remote_cache_dir` on an older Airflow version raises an error indicating the version requirement.
2. Tasks spend slightly longer in the queued state (approximately 1-2 seconds, versus 0-1 seconds with the Variable approach) because of the remote storage calls required to retrieve the cache.
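For reference, a minimal sketch of how the settings above could be supplied via environment variables (the bucket path and connection ID are placeholders; the equivalent `remote_cache_dir` / `remote_cache_dir_conn_id` keys can also be set under the `[cosmos]` section of `airflow.cfg`):

```python
# Hypothetical example: enable the experimental remote dbt ls cache.
# The values are placeholders; any scheme supported by the Airflow 2.8+
# Object Storage feature (e.g. s3://, gs://, abfs://) should work here.
import os

os.environ["AIRFLOW__COSMOS__REMOTE_CACHE_DIR"] = "s3://my-bucket/cosmos_dbt_ls_cache/"

# Optional: if omitted, Cosmos falls back to the default Airflow connection ID
# associated with the scheme of the configured path.
os.environ["AIRFLOW__COSMOS__REMOTE_CACHE_DIR_CONN_ID"] = "aws_s3_default"
```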
Closes: #1072 --- cosmos/__init__.py | 2 +- cosmos/cache.py | 103 +++++++++++++++++++++++++ cosmos/dbt/graph.py | 25 +++++- cosmos/settings.py | 12 ++- dev/dags/example_cosmos_cleanup_dag.py | 11 ++- docs/configuration/caching.rst | 17 +++- docs/configuration/cosmos-conf.rst | 25 ++++++ tests/dbt/test_graph.py | 66 ++++++++++++++++ tests/test_cache.py | 65 +++++++++++++++- 9 files changed, 315 insertions(+), 11 deletions(-) diff --git a/cosmos/__init__.py b/cosmos/__init__.py index c98a14402..3ef523f7a 100644 --- a/cosmos/__init__.py +++ b/cosmos/__init__.py @@ -5,7 +5,7 @@ Contains dags, task groups, and operators. """ -__version__ = "1.5.1" +__version__ = "1.6.0a6" from cosmos.airflow.dag import DbtDag diff --git a/cosmos/cache.py b/cosmos/cache.py index 102186423..16ca7709d 100644 --- a/cosmos/cache.py +++ b/cosmos/cache.py @@ -17,6 +17,7 @@ from airflow.models.dag import DAG from airflow.utils.session import provide_session from airflow.utils.task_group import TaskGroup +from airflow.version import version as airflow_version from sqlalchemy import select from sqlalchemy.orm import Session @@ -25,22 +26,66 @@ DBT_MANIFEST_FILE_NAME, DBT_TARGET_DIR_NAME, DEFAULT_PROFILES_FILE_NAME, + FILE_SCHEME_AIRFLOW_DEFAULT_CONN_ID_MAP, PACKAGE_LOCKFILE_YML, ) from cosmos.dbt.project import get_partial_parse_path +from cosmos.exceptions import CosmosValueError from cosmos.log import get_logger from cosmos.settings import ( + AIRFLOW_IO_AVAILABLE, cache_dir, dbt_profile_cache_dir_name, enable_cache, enable_cache_package_lockfile, enable_cache_profile, + remote_cache_dir_conn_id, ) +from cosmos.settings import remote_cache_dir as settings_remote_cache_dir logger = get_logger(__name__) VAR_KEY_CACHE_PREFIX = "cosmos_cache__" +def _configure_remote_cache_dir() -> Path | None: + """Configure the remote cache dir if it is provided.""" + if not settings_remote_cache_dir: + return None + + _configured_cache_dir = None + + cache_dir_str = str(settings_remote_cache_dir) + + remote_cache_conn_id = remote_cache_dir_conn_id + if not remote_cache_conn_id: + cache_dir_schema = cache_dir_str.split("://")[0] + remote_cache_conn_id = FILE_SCHEME_AIRFLOW_DEFAULT_CONN_ID_MAP.get(cache_dir_schema, None) # type: ignore[assignment] + if remote_cache_conn_id is None: + return _configured_cache_dir + + if not AIRFLOW_IO_AVAILABLE: + raise CosmosValueError( + f"You're trying to specify remote cache_dir {cache_dir_str}, but the required " + f"Object Storage feature is unavailable in Airflow version {airflow_version}. Please upgrade to " + "Airflow 2.8 or later." + ) + + from airflow.io.path import ObjectStoragePath + + _configured_cache_dir = ObjectStoragePath(cache_dir_str, conn_id=remote_cache_conn_id) + + if not _configured_cache_dir.exists(): # type: ignore[no-untyped-call] + # TODO: Check if we should raise an error instead in case the provided path does not exist. 
+ _configured_cache_dir.mkdir(parents=True, exist_ok=True) + + # raise CosmosValueError( + # f"remote_cache_path `{cache_dir_str}` does not exist or is not accessible using " + # f"remote_cache_conn_id `{remote_cache_conn_id}`" + # ) + + return _configured_cache_dir + + def _get_airflow_metadata(dag: DAG, task_group: TaskGroup | None) -> dict[str, str | None]: dag_id = None task_group_id = None @@ -366,6 +411,64 @@ def delete_unused_dbt_ls_cache( return deleted_cosmos_variables +# TODO: Add integration tests once remote cache is supported in the CI pipeline +@provide_session +def delete_unused_dbt_ls_remote_cache_files( # pragma: no cover + max_age_last_usage: timedelta = timedelta(days=30), session: Session | None = None +) -> int: + """ + Delete Cosmos cache stored in remote storage based on the last execution of their associated DAGs. + """ + if session is None: + return 0 + + logger.info(f"Delete the Cosmos cache stored remotely that hasn't been used for {max_age_last_usage}") + cosmos_dags_ids_remote_cache_files = defaultdict(list) + + configured_remote_cache_dir = _configure_remote_cache_dir() + if not configured_remote_cache_dir: + logger.info( + "No remote cache directory configured. Skipping the deletion of the dbt ls cache files in remote storage." + ) + return 0 + + dirs = [obj for obj in configured_remote_cache_dir.iterdir() if obj.is_dir()] + files = [f for label in dirs for f in label.iterdir() if f.is_file()] + + total_cosmos_remote_cache_files = 0 + for file in files: + prefix_path = (configured_remote_cache_dir / VAR_KEY_CACHE_PREFIX).as_uri() + if file.as_uri().startswith(prefix_path): + with file.open("r") as fp: + cache_dict = json.load(fp) + cosmos_dags_ids_remote_cache_files[cache_dict["dag_id"]].append(file) + total_cosmos_remote_cache_files += 1 + + deleted_cosmos_remote_cache_files = 0 + + for dag_id, files in cosmos_dags_ids_remote_cache_files.items(): + last_dag_run = ( + session.query(DagRun) + .filter( + DagRun.dag_id == dag_id, + ) + .order_by(DagRun.execution_date.desc()) + .first() + ) + if last_dag_run and last_dag_run.execution_date < (datetime.now(timezone.utc) - max_age_last_usage): + for file in files: + logger.info(f"Removing the dbt ls cache remote file {file}") + file.unlink() + deleted_cosmos_remote_cache_files += 1 + logger.info( + "Deleted %s/%s dbt ls cache files in remote storage.", + deleted_cosmos_remote_cache_files, + total_cosmos_remote_cache_files, + ) + + return deleted_cosmos_remote_cache_files + + def is_profile_cache_enabled() -> bool: """Return True if global and profile cache is enable""" return enable_cache and enable_cache_profile diff --git a/cosmos/dbt/graph.py b/cosmos/dbt/graph.py index f60ac2ece..ad759d675 100644 --- a/cosmos/dbt/graph.py +++ b/cosmos/dbt/graph.py @@ -19,6 +19,7 @@ from cosmos import cache, settings from cosmos.cache import ( + _configure_remote_cache_dir, _copy_cached_package_lockfile_to_project, _get_latest_cached_package_lockfile, is_cache_package_lockfile_enabled, @@ -312,7 +313,22 @@ def save_dbt_ls_cache(self, dbt_ls_output: str) -> None: "last_modified": datetime.datetime.now(datetime.timezone.utc).isoformat(), **self.airflow_metadata, } - Variable.set(self.dbt_ls_cache_key, cache_dict, serialize_json=True) + remote_cache_dir = _configure_remote_cache_dir() + if remote_cache_dir: + remote_cache_key_path = remote_cache_dir / self.dbt_ls_cache_key / "dbt_ls_cache.json" + with remote_cache_key_path.open("w") as fp: + json.dump(cache_dict, fp) + else: + Variable.set(self.dbt_ls_cache_key, cache_dict, 
serialize_json=True) + + def _get_dbt_ls_remote_cache(self, remote_cache_dir: Path) -> dict[str, str]: + """Loads the remote cache for dbt ls.""" + cache_dict: dict[str, str] = {} + remote_cache_key_path = remote_cache_dir / self.dbt_ls_cache_key / "dbt_ls_cache.json" + if remote_cache_key_path.exists(): + with remote_cache_key_path.open("r") as fp: + cache_dict = json.load(fp) + return cache_dict def get_dbt_ls_cache(self) -> dict[str, str]: """ @@ -327,7 +343,12 @@ def get_dbt_ls_cache(self) -> dict[str, str]: """ cache_dict: dict[str, str] = {} try: - cache_dict = Variable.get(self.dbt_ls_cache_key, deserialize_json=True) + remote_cache_dir = _configure_remote_cache_dir() + cache_dict = ( + self._get_dbt_ls_remote_cache(remote_cache_dir) + if remote_cache_dir + else Variable.get(self.dbt_ls_cache_key, deserialize_json=True) + ) except (json.decoder.JSONDecodeError, KeyError): return cache_dict else: diff --git a/cosmos/settings.py b/cosmos/settings.py index fc7eca72a..37fe437c8 100644 --- a/cosmos/settings.py +++ b/cosmos/settings.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import os import tempfile from pathlib import Path @@ -7,7 +9,10 @@ from airflow.version import version as airflow_version from packaging.version import Version -from cosmos.constants import DEFAULT_COSMOS_CACHE_DIR_NAME, DEFAULT_OPENLINEAGE_NAMESPACE +from cosmos.constants import ( + DEFAULT_COSMOS_CACHE_DIR_NAME, + DEFAULT_OPENLINEAGE_NAMESPACE, +) # In MacOS users may want to set the envvar `TMPDIR` if they do not want the value of the temp directory to change DEFAULT_CACHE_DIR = Path(tempfile.gettempdir(), DEFAULT_COSMOS_CACHE_DIR_NAME) @@ -23,6 +28,11 @@ enable_cache_profile = conf.getboolean("cosmos", "enable_cache_profile", fallback=True) dbt_profile_cache_dir_name = conf.get("cosmos", "profile_cache_dir_name", fallback="profile") +# Experimentally adding `remote_cache_dir` as a separate entity in the Cosmos 1.6 release to gather feedback. +# This will be merged with the `cache_dir` config parameter in upcoming releases. +remote_cache_dir = conf.get("cosmos", "remote_cache_dir", fallback=None) +remote_cache_dir_conn_id = conf.get("cosmos", "remote_cache_dir_conn_id", fallback=None) + try: LINEAGE_NAMESPACE = conf.get("openlineage", "namespace") except airflow.exceptions.AirflowConfigException: diff --git a/dev/dags/example_cosmos_cleanup_dag.py b/dev/dags/example_cosmos_cleanup_dag.py index c93bdf002..1d37589a0 100644 --- a/dev/dags/example_cosmos_cleanup_dag.py +++ b/dev/dags/example_cosmos_cleanup_dag.py @@ -8,7 +8,7 @@ from airflow.decorators import dag, task -from cosmos.cache import delete_unused_dbt_ls_cache +from cosmos.cache import delete_unused_dbt_ls_cache, delete_unused_dbt_ls_remote_cache_files @dag( @@ -28,6 +28,15 @@ def clear_db_ls_cache(session=None): clear_db_ls_cache() + @task() + def clear_db_ls_remote_cache(session=None): + """ + Delete the dbt ls remote cache files that have not been used for the last five days. + """ + delete_unused_dbt_ls_remote_cache_files(max_age_last_usage=timedelta(days=5)) + + clear_db_ls_remote_cache() + # [END cache_example] diff --git a/docs/configuration/caching.rst b/docs/configuration/caching.rst index 7dba9933f..cc518f748 100644 --- a/docs/configuration/caching.rst +++ b/docs/configuration/caching.rst @@ -19,7 +19,7 @@ It is possible to turn off any cache in Cosmos by exporting the environment vari Disabling individual types of cache in Cosmos is also possible, as explained below. 
Caching the dbt ls output -~~~~~~~~~~~~~ +~~~~~~~~~~~~~~~~~~~~~~~~~ (Introduced in Cosmos 1.5) @@ -29,13 +29,24 @@ also the tasks queueing time. Cosmos 1.5 introduced a feature to mitigate the performance issue associated with ``LoadMode.DBT_LS`` by caching the output of this command as an `Airflow Variable `_. -Based on an initial `analysis `_, enabling this setting reduced some DAGs ask queueing from 30s to 0s. Additionally, some users `reported improvements of 84% `_ in the DAG run time. +Based on an initial `analysis `_, enabling this setting reduced some DAGs task queueing from 30s to 0s. Additionally, some users `reported improvements of 84% `_ in the DAG run time. This feature is on by default. To turn it off, export the following environment variable: ``AIRFLOW__COSMOS__ENABLE_CACHE_DBT_LS=0``. +(Introduced in Cosmos 1.6 - Experimental feature) + +Starting with Cosmos 1.6.0, users can also set a remote directory path to store this cache instead of using Airflow +Variables. To do so, you need to configure a remote cache directory. See :ref:`remote_cache_dir` and +:ref:`remote_cache_dir_conn_id` for more information. This is an experimental feature introduced in 1.6.0 to gather +user feedback. The ``remote_cache_dir`` will eventually be merged into the :ref:`cache_dir` setting in upcoming +releases. + **How the cache is refreshed** -Users can purge or delete the cache via Airflow UI by identifying and deleting the cache key. +If using the default Variables cache approach, users can purge or delete the cache via the Airflow UI by identifying and +deleting the cache key. If you are using the alternative approach of setting ``remote_cache_dir``, introduced +in Cosmos 1.6.0, you can delete the cache by identifying and removing the corresponding files under your configured path +in the remote store. Cosmos will refresh the cache in a few circumstances: diff --git a/docs/configuration/cosmos-conf.rst b/docs/configuration/cosmos-conf.rst index 5b8ee210b..6b05ec814 100644 --- a/docs/configuration/cosmos-conf.rst +++ b/docs/configuration/cosmos-conf.rst @@ -94,6 +94,31 @@ This page lists all available Airflow configurations that affect ``astronomer-co - Default: ``profile`` - Environment Variable: ``AIRFLOW__COSMOS__PROFILE_CACHE_DIR_NAME`` +.. _remote_cache_dir: + +`remote_cache_dir`_: The remote directory to store the dbt cache. Starting with Cosmos 1.6.0, you can store the ``dbt ls`` output as cache + in a remote location (an alternative to the Variable-based cache approach available since Cosmos 1.5.0) + using this configuration. The value for the remote cache directory can use any of the schemes supported by + the `Airflow Object Store `_ + feature introduced in Airflow 2.8.0 (e.g. ``s3://your_s3_bucket/cache_dir/``, ``gs://your_gs_bucket/cache_dir/``, + ``abfs://your_azure_container/cache_dir``, etc.) + + This is an experimental feature available since Cosmos 1.6 to gather user feedback and will be merged into the + ``cache_dir`` setting in upcoming releases. + + - Default: ``None`` + - Environment Variable: ``AIRFLOW__COSMOS__REMOTE_CACHE_DIR`` + +.. _remote_cache_dir_conn_id: + +`remote_cache_dir_conn_id`_: The connection ID for the remote cache directory. If this is not set, the default Airflow connection ID identified + for the scheme will be used.
+ + - Default: ``None`` + - Environment Variable: ``AIRFLOW__COSMOS__REMOTE_CACHE_DIR_CONN_ID`` + [openlineage] ~~~~~~~~~~~~~ diff --git a/tests/dbt/test_graph.py b/tests/dbt/test_graph.py index d1891169d..879663009 100644 --- a/tests/dbt/test_graph.py +++ b/tests/dbt/test_graph.py @@ -1,9 +1,12 @@ +import base64 import importlib +import json import logging import os import shutil import sys import tempfile +import zlib from datetime import datetime from pathlib import Path from subprocess import PIPE, Popen @@ -24,6 +27,7 @@ run_command, ) from cosmos.profiles import PostgresUserPasswordProfileMapping +from cosmos.settings import AIRFLOW_IO_AVAILABLE DBT_PROJECTS_ROOT_DIR = Path(__file__).parent.parent.parent / "dev/dags/dbt" DBT_PROJECT_NAME = "jaffle_shop" @@ -1628,3 +1632,65 @@ def test_should_use_dbt_ls_cache(enable_cache, enable_cache_dbt_ls, cache_id, sh graph = DbtGraph(cache_identifier=cache_id, project=ProjectConfig(dbt_project_path="/tmp")) graph.should_use_dbt_ls_cache.cache_clear() assert graph.should_use_dbt_ls_cache() == should_use + + +@pytest.mark.skipif(not AIRFLOW_IO_AVAILABLE, reason="Airflow did not have Object Storage until the 2.8 release") +@patch("airflow.io.path.ObjectStoragePath") +@patch("cosmos.config.ProjectConfig") +@patch("cosmos.dbt.graph._configure_remote_cache_dir") +def test_save_dbt_ls_cache_remote_cache_dir( + mock_configure_remote_cache_dir, mock_project_config, mock_object_storage_path +): + mock_remote_cache_dir_path = mock_object_storage_path.return_value + mock_remote_cache_dir_path.exists.return_value = True + + mock_configure_remote_cache_dir.return_value = mock_remote_cache_dir_path + + dbt_ls_output = "sample dbt ls output" + mock_project_config.dbt_vars = {"var1": "value1"} + mock_project_config.env_vars = {"var1": "value1"} + mock_project_config._calculate_dbt_ls_cache_current_version.return_value = "mock_version" + dbt_graph = DbtGraph(project=mock_project_config) + + dbt_graph.save_dbt_ls_cache(dbt_ls_output) + + mock_remote_cache_key_path = mock_remote_cache_dir_path / dbt_graph.dbt_ls_cache_key / "dbt_ls_cache.json" + mock_remote_cache_key_path.open.assert_called_once_with("w") + + +@pytest.mark.skipif(not AIRFLOW_IO_AVAILABLE, reason="Airflow did not have Object Storage until the 2.8 release") +@patch("airflow.io.path.ObjectStoragePath") +@patch("cosmos.config.ProjectConfig") +@patch("cosmos.dbt.graph._configure_remote_cache_dir") +def test_get_dbt_ls_cache_remote_cache_dir( + mock_configure_remote_cache_dir, mock_project_config, mock_object_storage_path +): + mock_remote_cache_dir_path = mock_object_storage_path.return_value + mock_remote_cache_dir_path.exists.return_value = True + mock_configure_remote_cache_dir.return_value = mock_remote_cache_dir_path + + dbt_ls_output = "sample dbt ls output" + compressed_data = zlib.compress(dbt_ls_output.encode("utf-8")) + encoded_data = base64.b64encode(compressed_data).decode("utf-8") + + cache_dict = { + "version": "cache-version", + "dbt_ls_compressed": encoded_data, + "last_modified": "2024-08-13T12:34:56Z", + } + + mock_remote_cache_key_path = mock_remote_cache_dir_path / "some_cache_key" / "dbt_ls_cache.json" + mock_remote_cache_key_path.exists.return_value = True + mock_remote_cache_key_path.open.return_value.__enter__.return_value.read.return_value = json.dumps(cache_dict) + + dbt_graph = DbtGraph(project=mock_project_config) + + result = dbt_graph.get_dbt_ls_cache() + + expected_result = { + "version": "cache-version", + "dbt_ls": dbt_ls_output, + "last_modified": 
"2024-08-13T12:34:56Z", + } + + assert result == expected_result diff --git a/tests/test_cache.py b/tests/test_cache.py index b9ab087a7..c4c7e45c9 100644 --- a/tests/test_cache.py +++ b/tests/test_cache.py @@ -4,7 +4,7 @@ import time from datetime import datetime, timedelta, timezone from pathlib import Path -from unittest.mock import call, patch +from unittest.mock import MagicMock, call, patch import pytest from airflow import DAG @@ -13,6 +13,7 @@ from airflow.utils.task_group import TaskGroup from cosmos.cache import ( + _configure_remote_cache_dir, _copy_partial_parse_to_project, _create_cache_identifier, _create_folder_version_hash, @@ -27,8 +28,14 @@ is_cache_package_lockfile_enabled, is_profile_cache_enabled, ) -from cosmos.constants import DBT_PARTIAL_PARSE_FILE_NAME, DBT_TARGET_DIR_NAME, DEFAULT_PROFILES_FILE_NAME -from cosmos.settings import dbt_profile_cache_dir_name +from cosmos.constants import ( + DBT_PARTIAL_PARSE_FILE_NAME, + DBT_TARGET_DIR_NAME, + DEFAULT_PROFILES_FILE_NAME, + _default_s3_conn, +) +from cosmos.exceptions import CosmosValueError +from cosmos.settings import AIRFLOW_IO_AVAILABLE, dbt_profile_cache_dir_name START_DATE = datetime(2024, 4, 16) example_dag = DAG("dag", start_date=START_DATE) @@ -406,3 +413,55 @@ def test_get_latest_cached_lockfile_with_no_cache(mock_get_sha): # Test case where there is a cached file result = _get_latest_cached_package_lockfile(project_dir) assert result.exists() + + +@patch("cosmos.cache.settings_remote_cache_dir", new=None) +def test_remote_cache_path_initialization_no_remote_cache_dir(): + configured_remote_cache_dir = _configure_remote_cache_dir() + assert configured_remote_cache_dir is None + + +@patch("cosmos.cache.settings_remote_cache_dir", new="s3://some-bucket/cache") +@patch("cosmos.cache.AIRFLOW_IO_AVAILABLE", new=False) +def test_remote_cache_path_initialization_object_storage_unavailable_on_earlier_airflow_versions(): + with pytest.raises(CosmosValueError, match="Object Storage feature is unavailable"): + _configure_remote_cache_dir() + + +@pytest.mark.skipif(not AIRFLOW_IO_AVAILABLE, reason="Airflow did not have Object Storage until the 2.8 release") +@patch("cosmos.cache.settings_remote_cache_dir", new="s3://some-bucket/cache") +@patch("airflow.io.path.ObjectStoragePath") +def test_remote_cache_path_initialization_path_available_default_connection(mock_object_storage_path): + mock_cache_dir_path = MagicMock() + mock_cache_dir_path.exists.return_value = True + mock_object_storage_path.return_value = mock_cache_dir_path + + configured_remote_cache_dir = _configure_remote_cache_dir() + mock_object_storage_path.assert_called_with("s3://some-bucket/cache", conn_id=_default_s3_conn) + assert configured_remote_cache_dir == mock_cache_dir_path + + +@pytest.mark.skipif(not AIRFLOW_IO_AVAILABLE, reason="Airflow did not have Object Storage until the 2.8 release") +@patch("cosmos.cache.settings_remote_cache_dir", new="s3://some-bucket/cache") +@patch("airflow.io.path.ObjectStoragePath") +def test_remote_cache_dir_initialization_path_not_exist_creates_path(mock_object_storage_path): + mock_cache_dir_path = MagicMock() + mock_cache_dir_path.exists.return_value = False + mock_object_storage_path.return_value = mock_cache_dir_path + + _ = _configure_remote_cache_dir() + mock_cache_dir_path.mkdir.assert_called_once_with(parents=True, exist_ok=True) + + +@pytest.mark.skipif(not AIRFLOW_IO_AVAILABLE, reason="Airflow did not have Object Storage until the 2.8 release") +@patch("cosmos.cache.settings_remote_cache_dir", 
new="s3://some-bucket/cache") +@patch("cosmos.cache.remote_cache_dir_conn_id", new="my_conn_id") +@patch("airflow.io.path.ObjectStoragePath") +def test_remote_cache_path_initialization_with_conn_id(mock_object_storage_path): + mock_cache_path = MagicMock() + mock_cache_path.exists.return_value = True + mock_object_storage_path.return_value = mock_cache_path + + configured_remote_cache_dir = _configure_remote_cache_dir() + mock_object_storage_path.assert_called_with("s3://some-bucket/cache", conn_id="my_conn_id") + assert configured_remote_cache_dir == mock_cache_path
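For anyone reviewing or debugging the remote cache, below is a minimal, hypothetical sketch (the bucket, the `cosmos_cache__my_dag` cache-key directory, and the connection ID are placeholders) that reads one `dbt_ls_cache.json` entry and decodes its compressed `dbt ls` output, mirroring the encoding exercised in the tests above. It assumes Airflow 2.8+ with the relevant object-storage provider installed:

```python
# Hypothetical sketch: read a dbt ls cache file written by Cosmos to the configured
# remote_cache_dir and decode the compressed `dbt ls` output for inspection.
# Bucket, cache-key directory and conn_id below are placeholders.
import base64
import json
import zlib

from airflow.io.path import ObjectStoragePath  # requires Airflow >= 2.8

cache_file = ObjectStoragePath(
    "s3://my-bucket/cosmos_dbt_ls_cache/cosmos_cache__my_dag/dbt_ls_cache.json",
    conn_id="aws_s3_default",
)

with cache_file.open("r") as fp:
    cache_dict = json.load(fp)

# "dbt_ls_compressed" holds the base64-encoded, zlib-compressed `dbt ls` output.
dbt_ls_output = zlib.decompress(base64.b64decode(cache_dict["dbt_ls_compressed"])).decode("utf-8")

print(cache_dict["version"], cache_dict["last_modified"])
print(dbt_ls_output)
```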