diff --git a/cosmos/constants.py b/cosmos/constants.py
index b45170445..8378e8d10 100644
--- a/cosmos/constants.py
+++ b/cosmos/constants.py
@@ -10,6 +10,7 @@
 DEFAULT_DBT_PROFILE_NAME = "cosmos_profile"
 DEFAULT_DBT_TARGET_NAME = "cosmos_target"
 DEFAULT_COSMOS_CACHE_DIR_NAME = "cosmos"
+DEFAULT_TARGET_PATH = "target"
 DBT_LOG_PATH_ENVVAR = "DBT_LOG_PATH"
 DBT_LOG_DIR_NAME = "logs"
 DBT_TARGET_PATH_ENVVAR = "DBT_TARGET_PATH"
diff --git a/cosmos/io.py b/cosmos/io.py
index 7d2567371..0cce873e5 100644
--- a/cosmos/io.py
+++ b/cosmos/io.py
@@ -6,32 +6,41 @@
 from urllib.parse import urlparse
 
 from cosmos import settings
-from cosmos.constants import FILE_SCHEME_AIRFLOW_DEFAULT_CONN_ID_MAP
+from cosmos.constants import DEFAULT_TARGET_PATH, FILE_SCHEME_AIRFLOW_DEFAULT_CONN_ID_MAP
 from cosmos.exceptions import CosmosValueError
 from cosmos.settings import remote_target_path, remote_target_path_conn_id
 
 
-def upload_artifacts_to_aws_s3(project_dir: str, **kwargs: Any) -> None:
+def upload_to_aws_s3(
+    project_dir: str,
+    bucket_name: str,
+    aws_conn_id: str | None = None,
+    source_subpath: str = DEFAULT_TARGET_PATH,
+    **kwargs: Any,
+) -> None:
     """
-    Helper function demonstrating how to upload artifacts to AWS S3 that can be used as a callback.
+    Helper function demonstrating how to upload files to AWS S3 that can be used as a callback.
 
     :param project_dir: Path of the cloned project directory which Cosmos tasks work from.
+    :param bucket_name: Name of the S3 bucket to upload to.
+    :param aws_conn_id: AWS connection ID to use when uploading files.
+    :param source_subpath: Path of the source directory sub-path to upload files from.
     """
    from airflow.providers.amazon.aws.hooks.s3 import S3Hook
 
-    target_dir = f"{project_dir}/target"
-    aws_conn_id = kwargs.get("aws_conn_id", S3Hook.default_conn_name)
-    bucket_name = kwargs["bucket_name"]
+    target_dir = f"{project_dir}/{source_subpath}"
+    aws_conn_id = aws_conn_id if aws_conn_id else S3Hook.default_conn_name
     hook = S3Hook(aws_conn_id=aws_conn_id)
+    context = kwargs["context"]
 
     # Iterate over the files in the target dir and upload them to S3
     for dirpath, _, filenames in os.walk(target_dir):
         for filename in filenames:
             s3_key = (
-                f"{kwargs['dag'].dag_id}"
-                f"/{kwargs['run_id']}"
-                f"/{kwargs['task_instance'].task_id}"
-                f"/{kwargs['task_instance']._try_number}"
+                f"{context['dag'].dag_id}"
+                f"/{context['run_id']}"
+                f"/{context['task_instance'].task_id}"
+                f"/{context['task_instance']._try_number}"
                 f"{dirpath.split(project_dir)[-1]}/{filename}"
             )
             hook.load_file(
@@ -42,27 +51,37 @@ def upload_artifacts_to_aws_s3(project_dir: str, **kwargs: Any) -> None:
             )
 
 
-def upload_artifacts_to_gcp_gs(project_dir: str, **kwargs: Any) -> None:
+def upload_to_gcp_gs(
+    project_dir: str,
+    bucket_name: str,
+    gcp_conn_id: str | None = None,
+    source_subpath: str = DEFAULT_TARGET_PATH,
+    **kwargs: Any,
+) -> None:
     """
-    Helper function demonstrating how to upload artifacts to GCP GS that can be used as a callback.
+    Helper function demonstrating how to upload files to GCP GS that can be used as a callback.
 
     :param project_dir: Path of the cloned project directory which Cosmos tasks work from.
+    :param bucket_name: Name of the GCP GS bucket to upload to.
+    :param gcp_conn_id: GCP connection ID to use when uploading files.
+    :param source_subpath: Path of the source directory sub-path to upload files from.
     """
     from airflow.providers.google.cloud.hooks.gcs import GCSHook
 
-    target_dir = f"{project_dir}/target"
-    gcp_conn_id = kwargs.get("gcp_conn_id", GCSHook.default_conn_name)
-    bucket_name = kwargs["bucket_name"]
+    target_dir = f"{project_dir}/{source_subpath}"
+    gcp_conn_id = gcp_conn_id if gcp_conn_id else GCSHook.default_conn_name
+    # bucket_name = kwargs["bucket_name"]
     hook = GCSHook(gcp_conn_id=gcp_conn_id)
+    context = kwargs["context"]
 
     # Iterate over the files in the target dir and upload them to GCP GS
     for dirpath, _, filenames in os.walk(target_dir):
         for filename in filenames:
             object_name = (
-                f"{kwargs['dag'].dag_id}"
-                f"/{kwargs['run_id']}"
-                f"/{kwargs['task_instance'].task_id}"
-                f"/{kwargs['task_instance']._try_number}"
+                f"{context['dag'].dag_id}"
+                f"/{context['run_id']}"
+                f"/{context['task_instance'].task_id}"
+                f"/{context['task_instance']._try_number}"
                 f"{dirpath.split(project_dir)[-1]}/{filename}"
             )
             hook.upload(
@@ -72,27 +91,37 @@ def upload_artifacts_to_gcp_gs(project_dir: str, **kwargs: Any) -> None:
             )
 
 
-def upload_artifacts_to_azure_wasb(project_dir: str, **kwargs: Any) -> None:
+def upload_to_azure_wasb(
+    project_dir: str,
+    container_name: str,
+    azure_conn_id: str | None = None,
+    source_subpath: str = DEFAULT_TARGET_PATH,
+    **kwargs: Any,
+) -> None:
     """
-    Helper function demonstrating how to upload artifacts to Azure WASB that can be used as a callback.
+    Helper function demonstrating how to upload files to Azure WASB that can be used as a callback.
 
     :param project_dir: Path of the cloned project directory which Cosmos tasks work from.
+    :param container_name: Name of the Azure WASB container to upload files to.
+    :param azure_conn_id: Azure connection ID to use when uploading files.
+    :param source_subpath: Path of the source directory sub-path to upload files from.
     """
     from airflow.providers.microsoft.azure.hooks.wasb import WasbHook
 
-    target_dir = f"{project_dir}/target"
-    azure_conn_id = kwargs.get("azure_conn_id", WasbHook.default_conn_name)
-    container_name = kwargs["container_name"]
+    target_dir = f"{project_dir}/{source_subpath}"
+    azure_conn_id = azure_conn_id if azure_conn_id else WasbHook.default_conn_name
+    # container_name = kwargs["container_name"]
     hook = WasbHook(wasb_conn_id=azure_conn_id)
+    context = kwargs["context"]
 
     # Iterate over the files in the target dir and upload them to WASB container
     for dirpath, _, filenames in os.walk(target_dir):
         for filename in filenames:
             blob_name = (
-                f"{kwargs['dag'].dag_id}"
-                f"/{kwargs['run_id']}"
-                f"/{kwargs['task_instance'].task_id}"
-                f"/{kwargs['task_instance']._try_number}"
+                f"{context['dag'].dag_id}"
+                f"/{context['run_id']}"
+                f"/{context['task_instance'].task_id}"
+                f"/{context['task_instance']._try_number}"
                 f"{dirpath.split(project_dir)[-1]}/{filename}"
             )
             hook.load_file(
@@ -142,6 +171,7 @@ def _construct_dest_file_path(
     dest_target_dir: Path,
     file_path: str,
     source_target_dir: Path,
+    source_subpath: str,
     **kwargs: Any,
 ) -> str:
     """
@@ -149,24 +179,26 @@
     """
     dest_target_dir_str = str(dest_target_dir).rstrip("/")
+    context = kwargs["context"]
     task_run_identifier = (
-        f"{kwargs['dag'].dag_id}"
-        f"/{kwargs['run_id']}"
-        f"/{kwargs['task_instance'].task_id}"
-        f"/{kwargs['task_instance']._try_number}"
+        f"{context['dag'].dag_id}"
+        f"/{context['run_id']}"
+        f"/{context['task_instance'].task_id}"
+        f"/{context['task_instance']._try_number}"
     )
     rel_path = os.path.relpath(file_path, source_target_dir).lstrip("/")
 
-    return f"{dest_target_dir_str}/{task_run_identifier}/target/{rel_path}"
+    return f"{dest_target_dir_str}/{task_run_identifier}/{source_subpath}/{rel_path}"
 
 
-def upload_artifacts_to_cloud_storage(project_dir: str, **kwargs: Any) -> None:
+def upload_to_cloud_storage(project_dir: str, source_subpath: str = DEFAULT_TARGET_PATH, **kwargs: Any) -> None:
     """
-    Helper function demonstrating how to upload artifacts to remote blob stores that can be used as a callback. This is
+    Helper function demonstrating how to upload files to remote object stores that can be used as a callback. This is
     an example of a helper function that can be used if on Airflow >= 2.8 and cosmos configurations like
     ``remote_target_path`` and ``remote_target_path_conn_id`` when set can be leveraged.
 
     :param project_dir: Path of the cloned project directory which Cosmos tasks work from.
+    :param source_subpath: Path of the source directory sub-path to upload files from.
     """
     dest_target_dir, dest_conn_id = _configure_remote_target_path()
@@ -175,9 +207,11 @@ def upload_artifacts_to_cloud_storage(project_dir: str, **kwargs: Any) -> None:
     from airflow.io.path import ObjectStoragePath
 
-    source_target_dir = Path(project_dir) / "target"
+    source_target_dir = Path(project_dir) / f"{source_subpath}"
     files = [str(file) for file in source_target_dir.rglob("*") if file.is_file()]
 
     for file_path in files:
-        dest_file_path = _construct_dest_file_path(dest_target_dir, file_path, source_target_dir, **kwargs)
+        dest_file_path = _construct_dest_file_path(
+            dest_target_dir, file_path, source_target_dir, source_subpath, **kwargs
+        )
         dest_object_storage_path = ObjectStoragePath(dest_file_path, conn_id=dest_conn_id)
         ObjectStoragePath(file_path).copy(dest_object_storage_path)
diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py
index 1cc7d0cb7..2a56c33e3 100644
--- a/cosmos/operators/local.py
+++ b/cosmos/operators/local.py
@@ -503,7 +503,7 @@ def run_command(
             self.store_compiled_sql(tmp_project_dir, context)
             self.upload_compiled_sql(tmp_project_dir, context)
             if self.callback:
-                self.callback_args.update(context)
+                self.callback_args.update({"context": context})
                 self.callback(tmp_project_dir, **self.callback_args)
 
             self.handle_exception(result)
diff --git a/dev/dags/cosmos_callback_dag.py b/dev/dags/cosmos_callback_dag.py
index e8c12078d..b1f1b3701 100644
--- a/dev/dags/cosmos_callback_dag.py
+++ b/dev/dags/cosmos_callback_dag.py
@@ -7,7 +7,7 @@
 from pathlib import Path
 
 from cosmos import DbtDag, ProfileConfig, ProjectConfig
-from cosmos.io import upload_artifacts_to_cloud_storage
+from cosmos.io import upload_to_cloud_storage
 from cosmos.profiles import PostgresUserPasswordProfileMapping
 
 DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt"
@@ -34,8 +34,21 @@
         "install_deps": True,  # install any necessary dependencies before running any dbt command
         "full_refresh": True,  # used only in dbt commands that support this flag
         # --------------------------------------------------------------
-        # Callback function to upload artifacts using Airflow Object storage and Cosmos remote_target_path setting on Airflow 2.8 and above
-        "callback": upload_artifacts_to_cloud_storage,
+        # Callback function to upload files using Airflow Object storage and Cosmos remote_target_path setting on Airflow 2.8 and above
+        "callback": upload_to_cloud_storage,
+        # --------------------------------------------------------------
+        # Callback function to upload files to AWS S3, works for Airflow < 2.8 too
+        # "callback": upload_to_aws_s3,
+        # "callback_args": {"aws_conn_id": "aws_s3_conn", "bucket_name": "cosmos-artifacts-upload"},
+        # --------------------------------------------------------------
+        # Callback function to upload files to GCP GS, works for Airflow < 2.8 too
+        # "callback": upload_to_gcp_gs,
+        # "callback_args": {"gcp_conn_id": "gcp_gs_conn", "bucket_name": "cosmos-artifacts-upload"},
+        # --------------------------------------------------------------
+        # Callback function to upload files to Azure WASB, works for Airflow < 2.8 too
+        # "callback": upload_to_azure_wasb,
+        # "callback_args": {"azure_conn_id": "azure_wasb_conn", "container_name": "cosmos-artifacts-upload"},
+        # --------------------------------------------------------------
     },
     # normal dag parameters
     schedule_interval="@daily",
diff --git a/dev/dags/example_operators.py b/dev/dags/example_operators.py
index e84ad78c4..1e583b12e 100644
--- a/dev/dags/example_operators.py
+++ b/dev/dags/example_operators.py
@@ -7,7 +7,7 @@
 from airflow.operators.python import PythonOperator
 
 from cosmos import DbtCloneLocalOperator, DbtRunLocalOperator, DbtSeedLocalOperator, ProfileConfig
-from cosmos.io import upload_artifacts_to_aws_s3
+from cosmos.io import upload_to_aws_s3
 
 DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt"
 DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH))
@@ -41,17 +41,18 @@ def check_s3_file(bucket_name: str, file_key: str, aws_conn_id: str = "aws_defau
         dbt_cmd_flags=["--select", "raw_customers"],
         install_deps=True,
         append_env=True,
+        # --------------------------------------------------------------
         # Callback function to upload artifacts to AWS S3
-        callback=upload_artifacts_to_aws_s3,
+        callback=upload_to_aws_s3,
         callback_args={"aws_conn_id": "aws_s3_conn", "bucket_name": "cosmos-artifacts-upload"},
         # --------------------------------------------------------------
         # Callback function to upload artifacts to GCP GS
-        # "callback": upload_artifacts_to_gcp_gs,
-        # "callback_args": {"gcp_conn_id": "gcp_gs_conn", "bucket_name": "cosmos-artifacts-upload"},
+        # callback=upload_to_gcp_gs,
+        # callback_args={"gcp_conn_id": "gcp_gs_conn", "bucket_name": "cosmos-artifacts-upload"},
         # --------------------------------------------------------------
         # Callback function to upload artifacts to Azure WASB
-        # "callback": upload_artifacts_to_azure_wasb,
-        # "callback_args": {"azure_conn_id": "azure_wasb_conn", "container_name": "cosmos-artifacts-upload"},
+        # callback=upload_to_azure_wasb,
+        # callback_args={"azure_conn_id": "azure_wasb_conn", "container_name": "cosmos-artifacts-upload"},
         # --------------------------------------------------------------
     )
     # [END single_operator_callback]
diff --git a/tests/test_io.py b/tests/test_io.py
index a814efe16..7410f0588 100644
--- a/tests/test_io.py
+++ b/tests/test_io.py
@@ -3,15 +3,15 @@
 
 import pytest
 
-from cosmos.constants import _default_s3_conn
+from cosmos.constants import DEFAULT_TARGET_PATH, _default_s3_conn
 from cosmos.exceptions import CosmosValueError
 from cosmos.io import (
     _configure_remote_target_path,
     _construct_dest_file_path,
-    upload_artifacts_to_aws_s3,
-    upload_artifacts_to_azure_wasb,
-    upload_artifacts_to_cloud_storage,
-    upload_artifacts_to_gcp_gs,
+    upload_to_aws_s3,
+    upload_to_azure_wasb,
+    upload_to_cloud_storage,
+    upload_to_gcp_gs,
 )
 from cosmos.settings import AIRFLOW_IO_AVAILABLE
@@ -20,9 +20,11 @@
 def dummy_kwargs():
     """Fixture for reusable test kwargs."""
     return {
-        "dag": MagicMock(dag_id="test_dag"),
-        "run_id": "test_run_id",
-        "task_instance": MagicMock(task_id="test_task", _try_number=1),
+        "context": {
+            "dag": MagicMock(dag_id="test_dag"),
+            "run_id": "test_run_id",
+            "task_instance": MagicMock(task_id="test_task", _try_number=1),
+        },
         "bucket_name": "test_bucket",
         "container_name": "test_container",
     }
@@ -33,7 +35,7 @@ def test_upload_artifacts_to_aws_s3(dummy_kwargs):
     with patch("airflow.providers.amazon.aws.hooks.s3.S3Hook") as mock_hook, patch("os.walk") as mock_walk:
         mock_walk.return_value = [("/target", [], ["file1.txt", "file2.txt"])]
 
-        upload_artifacts_to_aws_s3("/project_dir", **dummy_kwargs)
+        upload_to_aws_s3("/project_dir", **dummy_kwargs)
 
         mock_walk.assert_called_once_with("/project_dir/target")
         hook_instance = mock_hook.return_value
@@ -45,7 +47,7 @@ def test_upload_artifacts_to_gcp_gs(dummy_kwargs):
     with patch("airflow.providers.google.cloud.hooks.gcs.GCSHook") as mock_hook, patch("os.walk") as mock_walk:
         mock_walk.return_value = [("/target", [], ["file1.txt", "file2.txt"])]
 
-        upload_artifacts_to_gcp_gs("/project_dir", **dummy_kwargs)
+        upload_to_gcp_gs("/project_dir", **dummy_kwargs)
 
         mock_walk.assert_called_once_with("/project_dir/target")
         hook_instance = mock_hook.return_value
@@ -57,7 +59,7 @@ def test_upload_artifacts_to_azure_wasb(dummy_kwargs):
     with patch("airflow.providers.microsoft.azure.hooks.wasb.WasbHook") as mock_hook, patch("os.walk") as mock_walk:
         mock_walk.return_value = [("/target", [], ["file1.txt", "file2.txt"])]
 
-        upload_artifacts_to_azure_wasb("/project_dir", **dummy_kwargs)
+        upload_to_azure_wasb("/project_dir", **dummy_kwargs)
 
         mock_walk.assert_called_once_with("/project_dir/target")
         hook_instance = mock_hook.return_value
@@ -79,14 +81,17 @@ def test_construct_dest_file_path(dummy_kwargs):
     file_path = "/project_dir/target/subdir/file.txt"
     expected_path = "/dest/test_dag/test_run_id/test_task/1/target/subdir/file.txt"
 
-    assert _construct_dest_file_path(dest_target_dir, file_path, source_target_dir, **dummy_kwargs) == expected_path
+    assert (
+        _construct_dest_file_path(dest_target_dir, file_path, source_target_dir, DEFAULT_TARGET_PATH, **dummy_kwargs)
+        == expected_path
+    )
 
 
 def test_upload_artifacts_to_cloud_storage_no_remote_path():
     """Test upload_artifacts_to_cloud_storage with no remote path."""
     with patch("cosmos.io._configure_remote_target_path", return_value=(None, None)):
         with pytest.raises(CosmosValueError):
-            upload_artifacts_to_cloud_storage("/project_dir", **{})
+            upload_to_cloud_storage("/project_dir", **{})
 
 
 @pytest.mark.skipif(not AIRFLOW_IO_AVAILABLE, reason="Airflow did not have Object Storage until the 2.8 release")
@@ -108,7 +113,7 @@ def test_upload_artifacts_to_cloud_storage_success(dummy_kwargs):
 
         mock_rglob.return_value = [mock_file1, mock_file2]
 
-        upload_artifacts_to_cloud_storage("/project_dir", **dummy_kwargs)
+        upload_to_cloud_storage("/project_dir", **dummy_kwargs)
 
         mock_configure.assert_called_once()
         assert mock_copy.call_count == 2
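A minimal usage sketch of the renamed helpers as an operator callback, assuming the same placeholder connection ID, bucket name, and jaffle_shop project paths used in dev/dags/example_operators.py (none of this is part of the patch itself). After this change the helper receives bucket_name, aws_conn_id, and source_subpath as explicit keyword arguments supplied through callback_args, while run_command() passes the Airflow context to the callback under the "context" key instead of merging it into the keyword arguments.

import os
from datetime import datetime
from pathlib import Path

from airflow import DAG

from cosmos import DbtSeedLocalOperator, ProfileConfig
from cosmos.io import upload_to_aws_s3
from cosmos.profiles import PostgresUserPasswordProfileMapping

# Illustrative placeholder paths and connections, mirroring the dev DAGs.
DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", Path(__file__).parent / "dbt"))

profile_config = ProfileConfig(
    profile_name="default",
    target_name="dev",
    profile_mapping=PostgresUserPasswordProfileMapping(conn_id="example_conn", profile_args={"schema": "public"}),
)

with DAG("example_callback_sketch", start_date=datetime(2024, 1, 1), schedule_interval=None) as dag:
    seed_operator = DbtSeedLocalOperator(
        task_id="seed",
        project_dir=DBT_ROOT_PATH / "jaffle_shop",
        profile_config=profile_config,
        dbt_cmd_flags=["--select", "raw_customers"],
        install_deps=True,
        append_env=True,
        # bucket_name/aws_conn_id/source_subpath are forwarded to upload_to_aws_s3 as
        # keyword arguments; run_command() injects the Airflow context under "context".
        callback=upload_to_aws_s3,
        callback_args={"aws_conn_id": "aws_s3_conn", "bucket_name": "cosmos-artifacts-upload"},
    )

The same wiring applies to upload_to_gcp_gs (gcp_conn_id/bucket_name) and upload_to_azure_wasb (azure_conn_id/container_name), matching the commented-out callback_args examples in the DAGs above.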