Add helper functions for uploading target directory artifacts to remote cloud storages (#1389)

This PR introduces helper functions that can be passed as callbacks to Cosmos tasks and executed after each task completes. These helpers upload artifacts from the project's target directory to various cloud storage providers, including AWS S3, Google Cloud Storage (GCS), Azure WASB, and generic remote object stores via Airflow's ObjectStoragePath.

## Key Changes
Adds a `cosmos/io.py` module with the following helper functions:

1. `upload_to_aws_s3`
   - Uploads artifact files from a task's local target directory to an AWS S3 bucket.
   - Dynamically appends DAG metadata (e.g., `dag_id`, `run_id`, `task_id`, and try number) to the uploaded object keys.
   - Uses `S3Hook` from the `airflow.providers.amazon.aws` provider package.

2. `upload_to_gcp_gs`
   - Uploads artifact files from a task's local target directory to a Google Cloud Storage (GCS) bucket.
   - Appends DAG-related context to the GCS object names for better traceability.
   - Uses `GCSHook` from the `airflow.providers.google.cloud` provider package.
    
3. `upload_to_azure_wasb`
   - Uploads artifact files from a task's local target directory to an Azure Blob Storage container.
   - Structures blob names with metadata, including `dag_id`, `task_id`, and execution details.
   - Uses `WasbHook` from the `airflow.providers.microsoft.azure` provider package.
  
4. `upload_to_cloud_storage`
   - A generic helper that uploads artifacts from a task's local target directory to remote object stores configured via Airflow's `ObjectStoragePath` (an Airflow 2.8+ feature).
   - Supports custom remote storage configuration via the Cosmos settings `remote_target_path` and `remote_target_path_conn_id` (see the configuration sketch after this list).
   - Dynamically constructs destination paths that include DAG metadata for clear organization.
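
For the generic `upload_to_cloud_storage` helper, the destination comes from Cosmos settings rather than `callback_args`. Below is a minimal configuration sketch; the bucket URI and connection ID are placeholders, and it assumes these settings are read from the `[cosmos]` Airflow configuration section (i.e. the `AIRFLOW__COSMOS__*` environment variables), as other Cosmos settings are.

```python
# Hypothetical example: configure the remote target path that upload_to_cloud_storage
# reads via cosmos.settings.remote_target_path / remote_target_path_conn_id.
# In practice these would usually be set in airflow.cfg or in the deployment's
# environment before the scheduler/workers start; the values below are placeholders.
import os

os.environ["AIRFLOW__COSMOS__REMOTE_TARGET_PATH"] = "s3://cosmos-artifacts-upload/target"
os.environ["AIRFLOW__COSMOS__REMOTE_TARGET_PATH_CONN_ID"] = "aws_s3_conn"
```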
     
These helper functions can be passed as the `callback` argument to `DbtDag` (via `operator_args`) or directly to an individual operator, as demonstrated in the example DAGs `dev/dags/cosmos_callback_dag.py` and `dev/dags/example_operators.py`, respectively. You can also pass `callback_args`, as shown in those DAGs. These helpers are simply examples of how callback functions can be written and attached to your operators/DAGs to run after task completion. Using them as a reference, you can write and pass your own callback functions, as sketched below.
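
Here is a minimal custom callback sketch modeled on the helpers in `cosmos/io.py`. It relies only on the calling convention visible in this PR: in `ExecutionMode.LOCAL`, Cosmos invokes `callback(tmp_project_dir, **callback_args)` after the dbt command finishes and injects the Airflow `context` into the keyword arguments. The function name and the logged artifact are illustrative only.

```python
from __future__ import annotations

from pathlib import Path
from typing import Any


def log_run_results_size(project_dir: str, **kwargs: Any) -> None:
    """Example callback: log the size of run_results.json from the task's target directory."""
    context = kwargs["context"]  # injected by the local operator before the callback is called
    run_results = Path(project_dir) / "target" / "run_results.json"
    if run_results.exists():
        print(
            f"{context['dag'].dag_id}/{context['task_instance'].task_id}: "
            f"run_results.json is {run_results.stat().st_size} bytes"
        )
```

Such a function can then be wired in exactly like the bundled helpers, e.g. `callback=log_run_results_size` on a local operator, optionally alongside `callback_args`.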


## Limitations

1. This PR has been tested and is currently supported only in
`ExecutionMode.LOCAL`. We encourage the community to contribute by
adding callback support for other execution modes as needed, using the
implementation for `ExecutionMode.LOCAL` as a reference.

closes: #1350
closes: #976
closes: #867
closes: #801
closes: #1292
closes: #851
closes: #1351 
related: #1293
related: #1349
pankajkoti authored Dec 17, 2024
1 parent cbd8622 commit 0000f80
Showing 9 changed files with 545 additions and 3 deletions.
1 change: 1 addition & 0 deletions cosmos/constants.py
@@ -10,6 +10,7 @@
DEFAULT_DBT_PROFILE_NAME = "cosmos_profile"
DEFAULT_DBT_TARGET_NAME = "cosmos_target"
DEFAULT_COSMOS_CACHE_DIR_NAME = "cosmos"
DEFAULT_TARGET_PATH = "target"
DBT_LOG_PATH_ENVVAR = "DBT_LOG_PATH"
DBT_LOG_DIR_NAME = "logs"
DBT_TARGET_PATH_ENVVAR = "DBT_TARGET_PATH"
217 changes: 217 additions & 0 deletions cosmos/io.py
@@ -0,0 +1,217 @@
from __future__ import annotations

import os
from pathlib import Path
from typing import Any
from urllib.parse import urlparse

from cosmos import settings
from cosmos.constants import DEFAULT_TARGET_PATH, FILE_SCHEME_AIRFLOW_DEFAULT_CONN_ID_MAP
from cosmos.exceptions import CosmosValueError
from cosmos.settings import remote_target_path, remote_target_path_conn_id


def upload_to_aws_s3(
project_dir: str,
bucket_name: str,
aws_conn_id: str | None = None,
source_subpath: str = DEFAULT_TARGET_PATH,
**kwargs: Any,
) -> None:
"""
Helper function demonstrating how to upload files to AWS S3 that can be used as a callback.
:param project_dir: Path of the cloned project directory which Cosmos tasks work from.
:param bucket_name: Name of the S3 bucket to upload to.
:param aws_conn_id: AWS connection ID to use when uploading files.
:param source_subpath: Path of the source directory sub-path to upload files from.
"""
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

target_dir = f"{project_dir}/{source_subpath}"
aws_conn_id = aws_conn_id if aws_conn_id else S3Hook.default_conn_name
hook = S3Hook(aws_conn_id=aws_conn_id)
context = kwargs["context"]

# Iterate over the files in the target dir and upload them to S3
for dirpath, _, filenames in os.walk(target_dir):
for filename in filenames:
s3_key = (
f"{context['dag'].dag_id}"
f"/{context['run_id']}"
f"/{context['task_instance'].task_id}"
f"/{context['task_instance']._try_number}"
f"{dirpath.split(project_dir)[-1]}/{filename}"
)
hook.load_file(
filename=f"{dirpath}/{filename}",
bucket_name=bucket_name,
key=s3_key,
replace=True,
)


def upload_to_gcp_gs(
project_dir: str,
bucket_name: str,
gcp_conn_id: str | None = None,
source_subpath: str = DEFAULT_TARGET_PATH,
**kwargs: Any,
) -> None:
"""
Helper function demonstrating how to upload files to GCP GS that can be used as a callback.
:param project_dir: Path of the cloned project directory which Cosmos tasks work from.
:param bucket_name: Name of the GCP GS bucket to upload to.
:param gcp_conn_id: GCP connection ID to use when uploading files.
:param source_subpath: Path of the source directory sub-path to upload files from.
"""
from airflow.providers.google.cloud.hooks.gcs import GCSHook

target_dir = f"{project_dir}/{source_subpath}"
gcp_conn_id = gcp_conn_id if gcp_conn_id else GCSHook.default_conn_name
# bucket_name = kwargs["bucket_name"]
hook = GCSHook(gcp_conn_id=gcp_conn_id)
context = kwargs["context"]

# Iterate over the files in the target dir and upload them to GCP GS
for dirpath, _, filenames in os.walk(target_dir):
for filename in filenames:
object_name = (
f"{context['dag'].dag_id}"
f"/{context['run_id']}"
f"/{context['task_instance'].task_id}"
f"/{context['task_instance']._try_number}"
f"{dirpath.split(project_dir)[-1]}/{filename}"
)
hook.upload(
filename=f"{dirpath}/{filename}",
bucket_name=bucket_name,
object_name=object_name,
)


def upload_to_azure_wasb(
project_dir: str,
container_name: str,
azure_conn_id: str | None = None,
source_subpath: str = DEFAULT_TARGET_PATH,
**kwargs: Any,
) -> None:
"""
Helper function demonstrating how to upload files to Azure WASB that can be used as a callback.
:param project_dir: Path of the cloned project directory which Cosmos tasks work from.
:param container_name: Name of the Azure WASB container to upload files to.
:param azure_conn_id: Azure connection ID to use when uploading files.
:param source_subpath: Path of the source directory sub-path to upload files from.
"""
from airflow.providers.microsoft.azure.hooks.wasb import WasbHook

target_dir = f"{project_dir}/{source_subpath}"
azure_conn_id = azure_conn_id if azure_conn_id else WasbHook.default_conn_name
# container_name = kwargs["container_name"]
hook = WasbHook(wasb_conn_id=azure_conn_id)
context = kwargs["context"]

# Iterate over the files in the target dir and upload them to WASB container
for dirpath, _, filenames in os.walk(target_dir):
for filename in filenames:
blob_name = (
f"{context['dag'].dag_id}"
f"/{context['run_id']}"
f"/{context['task_instance'].task_id}"
f"/{context['task_instance']._try_number}"
f"{dirpath.split(project_dir)[-1]}/{filename}"
)
hook.load_file(
file_path=f"{dirpath}/{filename}",
container_name=container_name,
blob_name=blob_name,
overwrite=True,
)


def _configure_remote_target_path() -> tuple[Path, str] | tuple[None, None]:
"""Configure the remote target path if it is provided."""
from airflow.version import version as airflow_version

if not remote_target_path:
return None, None

_configured_target_path = None

target_path_str = str(remote_target_path)

remote_conn_id = remote_target_path_conn_id
if not remote_conn_id:
target_path_schema = urlparse(target_path_str).scheme
remote_conn_id = FILE_SCHEME_AIRFLOW_DEFAULT_CONN_ID_MAP.get(target_path_schema, None) # type: ignore[assignment]
if remote_conn_id is None:
return None, None

if not settings.AIRFLOW_IO_AVAILABLE:
raise CosmosValueError(
f"You're trying to specify remote target path {target_path_str}, but the required "
f"Object Storage feature is unavailable in Airflow version {airflow_version}. Please upgrade to "
"Airflow 2.8 or later."
)

from airflow.io.path import ObjectStoragePath

_configured_target_path = ObjectStoragePath(target_path_str, conn_id=remote_conn_id)

if not _configured_target_path.exists(): # type: ignore[no-untyped-call]
_configured_target_path.mkdir(parents=True, exist_ok=True)

return _configured_target_path, remote_conn_id


def _construct_dest_file_path(
dest_target_dir: Path,
file_path: str,
source_target_dir: Path,
source_subpath: str,
**kwargs: Any,
) -> str:
"""
Construct the destination path for the artifact files to be uploaded to the remote store.
"""
dest_target_dir_str = str(dest_target_dir).rstrip("/")

context = kwargs["context"]
task_run_identifier = (
f"{context['dag'].dag_id}"
f"/{context['run_id']}"
f"/{context['task_instance'].task_id}"
f"/{context['task_instance']._try_number}"
)
rel_path = os.path.relpath(file_path, source_target_dir).lstrip("/")

return f"{dest_target_dir_str}/{task_run_identifier}/{source_subpath}/{rel_path}"


def upload_to_cloud_storage(project_dir: str, source_subpath: str = DEFAULT_TARGET_PATH, **kwargs: Any) -> None:
"""
Helper function demonstrating how to upload files to remote object stores that can be used as a callback. This is
an example of a helper function that can be used if on Airflow >= 2.8 and cosmos configurations like
``remote_target_path`` and ``remote_target_path_conn_id`` when set can be leveraged.
:param project_dir: Path of the cloned project directory which Cosmos tasks work from.
:param source_subpath: Path of the source directory sub-path to upload files from.
"""
dest_target_dir, dest_conn_id = _configure_remote_target_path()

if not dest_target_dir:
raise CosmosValueError("You're trying to upload artifact files, but the remote target path is not configured.")

from airflow.io.path import ObjectStoragePath

source_target_dir = Path(project_dir) / f"{source_subpath}"
files = [str(file) for file in source_target_dir.rglob("*") if file.is_file()]
for file_path in files:
dest_file_path = _construct_dest_file_path(
dest_target_dir, file_path, source_target_dir, source_subpath, **kwargs
)
dest_object_storage_path = ObjectStoragePath(dest_file_path, conn_id=dest_conn_id)
ObjectStoragePath(file_path).copy(dest_object_storage_path)
7 changes: 5 additions & 2 deletions cosmos/operators/local.py
@@ -141,6 +141,7 @@ def __init__(
invocation_mode: InvocationMode | None = None,
install_deps: bool = False,
callback: Callable[[str], None] | None = None,
callback_args: dict[str, Any] | None = None,
should_store_compiled_sql: bool = True,
should_upload_compiled_sql: bool = False,
append_env: bool = True,
@@ -149,6 +150,7 @@
self.task_id = task_id
self.profile_config = profile_config
self.callback = callback
self.callback_args = callback_args or {}
self.compiled_sql = ""
self.freshness = ""
self.should_store_compiled_sql = should_store_compiled_sql
@@ -500,9 +502,10 @@ def run_command(
self.store_freshness_json(tmp_project_dir, context)
self.store_compiled_sql(tmp_project_dir, context)
self.upload_compiled_sql(tmp_project_dir, context)
self.handle_exception(result)
if self.callback:
self.callback(tmp_project_dir)
self.callback_args.update({"context": context})
self.callback(tmp_project_dir, **self.callback_args)
self.handle_exception(result)

return result

60 changes: 60 additions & 0 deletions dev/dags/cosmos_callback_dag.py
@@ -0,0 +1,60 @@
"""
An example DAG that uses Cosmos to render a dbt project into an Airflow DAG.
"""

import os
from datetime import datetime
from pathlib import Path

from cosmos import DbtDag, ProfileConfig, ProjectConfig
from cosmos.io import upload_to_cloud_storage
from cosmos.profiles import PostgresUserPasswordProfileMapping

DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt"
DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH))

profile_config = ProfileConfig(
profile_name="default",
target_name="dev",
profile_mapping=PostgresUserPasswordProfileMapping(
conn_id="example_conn",
profile_args={"schema": "public"},
disable_event_tracking=True,
),
)

# [START cosmos_callback_example]
cosmos_callback_dag = DbtDag(
# dbt/cosmos-specific parameters
project_config=ProjectConfig(
DBT_ROOT_PATH / "jaffle_shop",
),
profile_config=profile_config,
operator_args={
"install_deps": True, # install any necessary dependencies before running any dbt command
"full_refresh": True, # used only in dbt commands that support this flag
# --------------------------------------------------------------
# Callback function to upload files using Airflow Object storage and Cosmos remote_target_path setting on Airflow 2.8 and above
"callback": upload_to_cloud_storage,
# --------------------------------------------------------------
# Callback function to upload files to AWS S3, works for Airflow < 2.8 too
# "callback": upload_to_aws_s3,
# "callback_args": {"aws_conn_id": "aws_s3_conn", "bucket_name": "cosmos-artifacts-upload"},
# --------------------------------------------------------------
# Callback function to upload files to GCP GS, works for Airflow < 2.8 too
# "callback": upload_to_gcp_gs,
# "callback_args": {"gcp_conn_id": "gcp_gs_conn", "bucket_name": "cosmos-artifacts-upload"},
# --------------------------------------------------------------
# Callback function to upload files to Azure WASB, works for Airflow < 2.8 too
# "callback": upload_to_azure_wasb,
# "callback_args": {"azure_conn_id": "azure_wasb_conn", "container_name": "cosmos-artifacts-upload"},
# --------------------------------------------------------------
},
# normal dag parameters
schedule_interval="@daily",
start_date=datetime(2023, 1, 1),
catchup=False,
dag_id="cosmos_callback_dag",
default_args={"retries": 2},
)
# [END cosmos_callback_example]
41 changes: 41 additions & 0 deletions dev/dags/example_operators.py
@@ -1,10 +1,13 @@
import os
from datetime import datetime
from pathlib import Path
from typing import Any

from airflow import DAG
from airflow.operators.python import PythonOperator

from cosmos import DbtCloneLocalOperator, DbtRunLocalOperator, DbtSeedLocalOperator, ProfileConfig
from cosmos.io import upload_to_aws_s3

DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt"
DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH))
@@ -18,15 +21,52 @@
profiles_yml_filepath=DBT_PROFILE_PATH,
)


def check_s3_file(bucket_name: str, file_key: str, aws_conn_id: str = "aws_default", **context: Any) -> bool:
"""Check if a file exists in the given S3 bucket."""
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

s3_key = f"{context['dag'].dag_id}/{context['run_id']}/seed/0/{file_key}"
print(f"Checking if file {s3_key} exists in S3 bucket...")
hook = S3Hook(aws_conn_id=aws_conn_id)
return hook.check_for_key(key=s3_key, bucket_name=bucket_name)


with DAG("example_operators", start_date=datetime(2024, 1, 1), catchup=False) as dag:
# [START single_operator_callback]
seed_operator = DbtSeedLocalOperator(
profile_config=profile_config,
project_dir=DBT_PROJ_DIR,
task_id="seed",
dbt_cmd_flags=["--select", "raw_customers"],
install_deps=True,
append_env=True,
# --------------------------------------------------------------
# Callback function to upload artifacts to AWS S3
callback=upload_to_aws_s3,
callback_args={"aws_conn_id": "aws_s3_conn", "bucket_name": "cosmos-artifacts-upload"},
# --------------------------------------------------------------
# Callback function to upload artifacts to GCP GS
# callback=upload_to_gcp_gs,
# callback_args={"gcp_conn_id": "gcp_gs_conn", "bucket_name": "cosmos-artifacts-upload"},
# --------------------------------------------------------------
# Callback function to upload artifacts to Azure WASB
# callback=upload_to_azure_wasb,
# callback_args={"azure_conn_id": "azure_wasb_conn", "container_name": "cosmos-artifacts-upload"},
# --------------------------------------------------------------
)
# [END single_operator_callback]

check_file_uploaded_task = PythonOperator(
task_id="check_file_uploaded_task",
python_callable=check_s3_file,
op_kwargs={
"aws_conn_id": "aws_s3_conn",
"bucket_name": "cosmos-artifacts-upload",
"file_key": "target/run_results.json",
},
)

run_operator = DbtRunLocalOperator(
profile_config=profile_config,
project_dir=DBT_PROJ_DIR,
@@ -48,3 +88,4 @@
# [END clone_example]

seed_operator >> run_operator >> clone_operator
seed_operator >> check_file_uploaded_task
