
Commit

Address @tatiana's review feedback
pankajkoti committed Dec 17, 2024
1 parent 5522a8a commit 1203b73
Showing 6 changed files with 115 additions and 61 deletions.
1 change: 1 addition & 0 deletions cosmos/constants.py
@@ -10,6 +10,7 @@
DEFAULT_DBT_PROFILE_NAME = "cosmos_profile"
DEFAULT_DBT_TARGET_NAME = "cosmos_target"
DEFAULT_COSMOS_CACHE_DIR_NAME = "cosmos"
DEFAULT_TARGET_PATH = "target"
DBT_LOG_PATH_ENVVAR = "DBT_LOG_PATH"
DBT_LOG_DIR_NAME = "logs"
DBT_TARGET_PATH_ENVVAR = "DBT_TARGET_PATH"
108 changes: 71 additions & 37 deletions cosmos/io.py
@@ -6,32 +6,41 @@
from urllib.parse import urlparse

from cosmos import settings
from cosmos.constants import FILE_SCHEME_AIRFLOW_DEFAULT_CONN_ID_MAP
from cosmos.constants import DEFAULT_TARGET_PATH, FILE_SCHEME_AIRFLOW_DEFAULT_CONN_ID_MAP
from cosmos.exceptions import CosmosValueError
from cosmos.settings import remote_target_path, remote_target_path_conn_id


def upload_artifacts_to_aws_s3(project_dir: str, **kwargs: Any) -> None:
def upload_to_aws_s3(
project_dir: str,
bucket_name: str,
aws_conn_id: str | None = None,
source_subpath: str = DEFAULT_TARGET_PATH,
**kwargs: Any,
) -> None:
"""
Helper function demonstrating how to upload artifacts to AWS S3 that can be used as a callback.
Helper function demonstrating how to upload files to AWS S3 that can be used as a callback.
:param project_dir: Path of the cloned project directory which Cosmos tasks work from.
:param bucket_name: Name of the S3 bucket to upload to.
:param aws_conn_id: AWS connection ID to use when uploading files.
:param source_subpath: Path of the source directory sub-path to upload files from.
"""
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

target_dir = f"{project_dir}/target"
aws_conn_id = kwargs.get("aws_conn_id", S3Hook.default_conn_name)
bucket_name = kwargs["bucket_name"]
target_dir = f"{project_dir}/{source_subpath}"
aws_conn_id = aws_conn_id if aws_conn_id else S3Hook.default_conn_name
hook = S3Hook(aws_conn_id=aws_conn_id)
context = kwargs["context"]

# Iterate over the files in the target dir and upload them to S3
for dirpath, _, filenames in os.walk(target_dir):
for filename in filenames:
s3_key = (
f"{kwargs['dag'].dag_id}"
f"/{kwargs['run_id']}"
f"/{kwargs['task_instance'].task_id}"
f"/{kwargs['task_instance']._try_number}"
f"{context['dag'].dag_id}"
f"/{context['run_id']}"
f"/{context['task_instance'].task_id}"
f"/{context['task_instance']._try_number}"
f"{dirpath.split(project_dir)[-1]}/{filename}"
)
hook.load_file(
@@ -42,27 +51,37 @@ def upload_artifacts_to_aws_s3(project_dir: str, **kwargs: Any) -> None:
)


def upload_artifacts_to_gcp_gs(project_dir: str, **kwargs: Any) -> None:
def upload_to_gcp_gs(
project_dir: str,
bucket_name: str,
gcp_conn_id: str | None = None,
source_subpath: str = DEFAULT_TARGET_PATH,
**kwargs: Any,
) -> None:
"""
Helper function demonstrating how to upload artifacts to GCP GS that can be used as a callback.
Helper function demonstrating how to upload files to GCP GS that can be used as a callback.
:param project_dir: Path of the cloned project directory which Cosmos tasks work from.
:param bucket_name: Name of the GCP GS bucket to upload to.
:param gcp_conn_id: GCP connection ID to use when uploading files.
:param source_subpath: Path of the source directory sub-path to upload files from.
"""
from airflow.providers.google.cloud.hooks.gcs import GCSHook

target_dir = f"{project_dir}/target"
gcp_conn_id = kwargs.get("gcp_conn_id", GCSHook.default_conn_name)
bucket_name = kwargs["bucket_name"]
target_dir = f"{project_dir}/{source_subpath}"
gcp_conn_id = gcp_conn_id if gcp_conn_id else GCSHook.default_conn_name
# bucket_name = kwargs["bucket_name"]
hook = GCSHook(gcp_conn_id=gcp_conn_id)
context = kwargs["context"]

# Iterate over the files in the target dir and upload them to GCP GS
for dirpath, _, filenames in os.walk(target_dir):
for filename in filenames:
object_name = (
f"{kwargs['dag'].dag_id}"
f"/{kwargs['run_id']}"
f"/{kwargs['task_instance'].task_id}"
f"/{kwargs['task_instance']._try_number}"
f"{context['dag'].dag_id}"
f"/{context['run_id']}"
f"/{context['task_instance'].task_id}"
f"/{context['task_instance']._try_number}"
f"{dirpath.split(project_dir)[-1]}/{filename}"
)
hook.upload(
@@ -72,27 +91,37 @@ def upload_artifacts_to_gcp_gs(project_dir: str, **kwargs: Any) -> None:
)


def upload_artifacts_to_azure_wasb(project_dir: str, **kwargs: Any) -> None:
def upload_to_azure_wasb(
project_dir: str,
container_name: str,
azure_conn_id: str | None = None,
source_subpath: str = DEFAULT_TARGET_PATH,
**kwargs: Any,
) -> None:
"""
Helper function demonstrating how to upload artifacts to Azure WASB that can be used as a callback.
Helper function demonstrating how to upload files to Azure WASB that can be used as a callback.
:param project_dir: Path of the cloned project directory which Cosmos tasks work from.
:param container_name: Name of the Azure WASB container to upload files to.
:param azure_conn_id: Azure connection ID to use when uploading files.
:param source_subpath: Path of the source directory sub-path to upload files from.
"""
from airflow.providers.microsoft.azure.hooks.wasb import WasbHook

target_dir = f"{project_dir}/target"
azure_conn_id = kwargs.get("azure_conn_id", WasbHook.default_conn_name)
container_name = kwargs["container_name"]
target_dir = f"{project_dir}/{source_subpath}"
azure_conn_id = azure_conn_id if azure_conn_id else WasbHook.default_conn_name
# container_name = kwargs["container_name"]
hook = WasbHook(wasb_conn_id=azure_conn_id)
context = kwargs["context"]

# Iterate over the files in the target dir and upload them to WASB container
for dirpath, _, filenames in os.walk(target_dir):
for filename in filenames:
blob_name = (
f"{kwargs['dag'].dag_id}"
f"/{kwargs['run_id']}"
f"/{kwargs['task_instance'].task_id}"
f"/{kwargs['task_instance']._try_number}"
f"{context['dag'].dag_id}"
f"/{context['run_id']}"
f"/{context['task_instance'].task_id}"
f"/{context['task_instance']._try_number}"
f"{dirpath.split(project_dir)[-1]}/{filename}"
)
hook.load_file(
@@ -142,31 +171,34 @@ def _construct_dest_file_path(
dest_target_dir: Path,
file_path: str,
source_target_dir: Path,
source_subpath: str,
**kwargs: Any,
) -> str:
"""
Construct the destination path for the artifact files to be uploaded to the remote store.
"""
dest_target_dir_str = str(dest_target_dir).rstrip("/")

context = kwargs["context"]
task_run_identifier = (
f"{kwargs['dag'].dag_id}"
f"/{kwargs['run_id']}"
f"/{kwargs['task_instance'].task_id}"
f"/{kwargs['task_instance']._try_number}"
f"{context['dag'].dag_id}"
f"/{context['run_id']}"
f"/{context['task_instance'].task_id}"
f"/{context['task_instance']._try_number}"
)
rel_path = os.path.relpath(file_path, source_target_dir).lstrip("/")

return f"{dest_target_dir_str}/{task_run_identifier}/target/{rel_path}"
return f"{dest_target_dir_str}/{task_run_identifier}/{source_subpath}/{rel_path}"


def upload_artifacts_to_cloud_storage(project_dir: str, **kwargs: Any) -> None:
def upload_to_cloud_storage(project_dir: str, source_subpath: str = DEFAULT_TARGET_PATH, **kwargs: Any) -> None:
"""
Helper function demonstrating how to upload artifacts to remote blob stores that can be used as a callback. This is
Helper function demonstrating how to upload files to remote object stores that can be used as a callback. This is
an example of a helper function that can be used if on Airflow >= 2.8 and cosmos configurations like
``remote_target_path`` and ``remote_target_path_conn_id`` when set can be leveraged.
:param project_dir: Path of the cloned project directory which Cosmos tasks work from.
:param source_subpath: Path of the source directory sub-path to upload files from.
"""
dest_target_dir, dest_conn_id = _configure_remote_target_path()

@@ -175,9 +207,11 @@ def upload_artifacts_to_cloud_storage(project_dir: str, **kwargs: Any) -> None:

from airflow.io.path import ObjectStoragePath

source_target_dir = Path(project_dir) / "target"
source_target_dir = Path(project_dir) / f"{source_subpath}"
files = [str(file) for file in source_target_dir.rglob("*") if file.is_file()]
for file_path in files:
dest_file_path = _construct_dest_file_path(dest_target_dir, file_path, source_target_dir, **kwargs)
dest_file_path = _construct_dest_file_path(
dest_target_dir, file_path, source_target_dir, source_subpath, **kwargs
)
dest_object_storage_path = ObjectStoragePath(dest_file_path, conn_id=dest_conn_id)
ObjectStoragePath(file_path).copy(dest_object_storage_path)
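Not part of this diff, but for illustration: a minimal sketch of how the new source_subpath parameter could be passed through callback_args so that a directory other than the default "target" (for example dbt's "logs" directory) is uploaded. The connection ID, bucket name, project path, and profile_config below are placeholders, not values from this commit.

from cosmos import DbtSeedLocalOperator
from cosmos.io import upload_to_aws_s3

# Hypothetical wiring; profile_config is assumed to be defined elsewhere in the DAG file.
seed_and_upload_logs = DbtSeedLocalOperator(
    task_id="seed_and_upload_logs",
    project_dir="/path/to/jaffle_shop",  # placeholder project path
    profile_config=profile_config,
    callback=upload_to_aws_s3,
    callback_args={
        "aws_conn_id": "aws_s3_conn",  # placeholder connection ID
        "bucket_name": "cosmos-artifacts-upload",  # placeholder bucket
        "source_subpath": "logs",  # overrides DEFAULT_TARGET_PATH ("target")
    },
)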
2 changes: 1 addition & 1 deletion cosmos/operators/local.py
@@ -503,7 +503,7 @@ def run_command(
self.store_compiled_sql(tmp_project_dir, context)
self.upload_compiled_sql(tmp_project_dir, context)
if self.callback:
self.callback_args.update(context)
self.callback_args.update({"context": context})
self.callback(tmp_project_dir, **self.callback_args)
self.handle_exception(result)

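Because the operator now passes the Airflow context under an explicit "context" key rather than merging it into callback_args, a user-supplied callback reads run metadata from that single entry. A rough sketch of a callback compatible with this calling convention; the function name and the print statement are illustrative only.

from typing import Any


def my_callback(project_dir: str, **kwargs: Any) -> None:
    """Illustrative callback: report which task run produced the files in ``project_dir``."""
    context = kwargs["context"]  # injected by run_command via callback_args
    ti = context["task_instance"]
    print(
        f"{context['dag'].dag_id}/{context['run_id']}/{ti.task_id} "
        f"wrote artifacts under {project_dir}/target"
    )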
19 changes: 16 additions & 3 deletions dev/dags/cosmos_callback_dag.py
@@ -7,7 +7,7 @@
from pathlib import Path

from cosmos import DbtDag, ProfileConfig, ProjectConfig
from cosmos.io import upload_artifacts_to_cloud_storage
from cosmos.io import upload_to_cloud_storage
from cosmos.profiles import PostgresUserPasswordProfileMapping

DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt"
@@ -34,8 +34,21 @@
"install_deps": True, # install any necessary dependencies before running any dbt command
"full_refresh": True, # used only in dbt commands that support this flag
# --------------------------------------------------------------
# Callback function to upload artifacts using Airflow Object storage and Cosmos remote_target_path setting on Airflow 2.8 and above
"callback": upload_artifacts_to_cloud_storage,
# Callback function to upload files using Airflow Object storage and Cosmos remote_target_path setting on Airflow 2.8 and above
"callback": upload_to_cloud_storage,
# --------------------------------------------------------------
# Callback function to upload files to AWS S3, works for Airflow < 2.8 too
# "callback": upload_to_aws_s3,
# "callback_args": {"aws_conn_id": "aws_s3_conn", "bucket_name": "cosmos-artifacts-upload"},
# --------------------------------------------------------------
# Callback function to upload files to GCP GS, works for Airflow < 2.8 too
# "callback": upload_to_gcp_gs,
# "callback_args": {"gcp_conn_id": "gcp_gs_conn", "bucket_name": "cosmos-artifacts-upload"},
# --------------------------------------------------------------
# Callback function to upload files to Azure WASB, works for Airflow < 2.8 too
# "callback": upload_to_azure_wasb,
# "callback_args": {"azure_conn_id": "azure_wasb_conn", "container_name": "cosmos-artifacts-upload"},
# --------------------------------------------------------------
},
# normal dag parameters
schedule_interval="@daily",
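The upload_to_cloud_storage callback used above relies on the Cosmos remote_target_path and remote_target_path_conn_id settings. A sketch of one way to supply them, assuming they are read from the [cosmos] Airflow config section and can therefore be set as environment variables; the bucket URL and connection ID are placeholders.

import os

# Assumed env-var form of the [cosmos] Airflow config options (AIRFLOW__COSMOS__<OPTION>).
os.environ["AIRFLOW__COSMOS__REMOTE_TARGET_PATH"] = "s3://cosmos-artifacts-upload/target_dir/"  # placeholder
os.environ["AIRFLOW__COSMOS__REMOTE_TARGET_PATH_CONN_ID"] = "aws_s3_conn"  # placeholder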
13 changes: 7 additions & 6 deletions dev/dags/example_operators.py
@@ -7,7 +7,7 @@
from airflow.operators.python import PythonOperator

from cosmos import DbtCloneLocalOperator, DbtRunLocalOperator, DbtSeedLocalOperator, ProfileConfig
from cosmos.io import upload_artifacts_to_aws_s3
from cosmos.io import upload_to_aws_s3

DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt"
DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH))
@@ -41,17 +41,18 @@ def check_s3_file(bucket_name: str, file_key: str, aws_conn_id: str = "aws_defau
dbt_cmd_flags=["--select", "raw_customers"],
install_deps=True,
append_env=True,
# --------------------------------------------------------------
# Callback function to upload artifacts to AWS S3
callback=upload_artifacts_to_aws_s3,
callback=upload_to_aws_s3,
callback_args={"aws_conn_id": "aws_s3_conn", "bucket_name": "cosmos-artifacts-upload"},
# --------------------------------------------------------------
# Callback function to upload artifacts to GCP GS
# "callback": upload_artifacts_to_gcp_gs,
# "callback_args": {"gcp_conn_id": "gcp_gs_conn", "bucket_name": "cosmos-artifacts-upload"},
# callback=upload_to_gcp_gs,
# callback_args={"gcp_conn_id": "gcp_gs_conn", "bucket_name": "cosmos-artifacts-upload"},
# --------------------------------------------------------------
# Callback function to upload artifacts to Azure WASB
# "callback": upload_artifacts_to_azure_wasb,
# "callback_args": {"azure_conn_id": "azure_wasb_conn", "container_name": "cosmos-artifacts-upload"},
# callback=upload_to_azure_wasb,
# callback_args={"azure_conn_id": "azure_wasb_conn", "container_name": "cosmos-artifacts-upload"},
# --------------------------------------------------------------
)
# [END single_operator_callback]
33 changes: 19 additions & 14 deletions tests/test_io.py
@@ -3,15 +3,15 @@

import pytest

from cosmos.constants import _default_s3_conn
from cosmos.constants import DEFAULT_TARGET_PATH, _default_s3_conn
from cosmos.exceptions import CosmosValueError
from cosmos.io import (
_configure_remote_target_path,
_construct_dest_file_path,
upload_artifacts_to_aws_s3,
upload_artifacts_to_azure_wasb,
upload_artifacts_to_cloud_storage,
upload_artifacts_to_gcp_gs,
upload_to_aws_s3,
upload_to_azure_wasb,
upload_to_cloud_storage,
upload_to_gcp_gs,
)
from cosmos.settings import AIRFLOW_IO_AVAILABLE

@@ -20,9 +20,11 @@
def dummy_kwargs():
"""Fixture for reusable test kwargs."""
return {
"dag": MagicMock(dag_id="test_dag"),
"run_id": "test_run_id",
"task_instance": MagicMock(task_id="test_task", _try_number=1),
"context": {
"dag": MagicMock(dag_id="test_dag"),
"run_id": "test_run_id",
"task_instance": MagicMock(task_id="test_task", _try_number=1),
},
"bucket_name": "test_bucket",
"container_name": "test_container",
}
@@ -33,7 +35,7 @@ def test_upload_artifacts_to_aws_s3(dummy_kwargs):
with patch("airflow.providers.amazon.aws.hooks.s3.S3Hook") as mock_hook, patch("os.walk") as mock_walk:
mock_walk.return_value = [("/target", [], ["file1.txt", "file2.txt"])]

upload_artifacts_to_aws_s3("/project_dir", **dummy_kwargs)
upload_to_aws_s3("/project_dir", **dummy_kwargs)

mock_walk.assert_called_once_with("/project_dir/target")
hook_instance = mock_hook.return_value
@@ -45,7 +47,7 @@ def test_upload_artifacts_to_gcp_gs(dummy_kwargs):
with patch("airflow.providers.google.cloud.hooks.gcs.GCSHook") as mock_hook, patch("os.walk") as mock_walk:
mock_walk.return_value = [("/target", [], ["file1.txt", "file2.txt"])]

upload_artifacts_to_gcp_gs("/project_dir", **dummy_kwargs)
upload_to_gcp_gs("/project_dir", **dummy_kwargs)

mock_walk.assert_called_once_with("/project_dir/target")
hook_instance = mock_hook.return_value
@@ -57,7 +59,7 @@ def test_upload_artifacts_to_azure_wasb(dummy_kwargs):
with patch("airflow.providers.microsoft.azure.hooks.wasb.WasbHook") as mock_hook, patch("os.walk") as mock_walk:
mock_walk.return_value = [("/target", [], ["file1.txt", "file2.txt"])]

upload_artifacts_to_azure_wasb("/project_dir", **dummy_kwargs)
upload_to_azure_wasb("/project_dir", **dummy_kwargs)

mock_walk.assert_called_once_with("/project_dir/target")
hook_instance = mock_hook.return_value
@@ -79,14 +81,17 @@ def test_construct_dest_file_path(dummy_kwargs):
file_path = "/project_dir/target/subdir/file.txt"

expected_path = "/dest/test_dag/test_run_id/test_task/1/target/subdir/file.txt"
assert _construct_dest_file_path(dest_target_dir, file_path, source_target_dir, **dummy_kwargs) == expected_path
assert (
_construct_dest_file_path(dest_target_dir, file_path, source_target_dir, DEFAULT_TARGET_PATH, **dummy_kwargs)
== expected_path
)


def test_upload_artifacts_to_cloud_storage_no_remote_path():
"""Test upload_artifacts_to_cloud_storage with no remote path."""
with patch("cosmos.io._configure_remote_target_path", return_value=(None, None)):
with pytest.raises(CosmosValueError):
upload_artifacts_to_cloud_storage("/project_dir", **{})
upload_to_cloud_storage("/project_dir", **{})


@pytest.mark.skipif(not AIRFLOW_IO_AVAILABLE, reason="Airflow did not have Object Storage until the 2.8 release")
@@ -108,7 +113,7 @@ def test_upload_artifacts_to_cloud_storage_success(dummy_kwargs):

mock_rglob.return_value = [mock_file1, mock_file2]

upload_artifacts_to_cloud_storage("/project_dir", **dummy_kwargs)
upload_to_cloud_storage("/project_dir", **dummy_kwargs)

mock_configure.assert_called_once()
assert mock_copy.call_count == 2
