Skip to content

Commit

Permalink
Address review feedback and write a separate DAG for AF 2.8 callback …
Browse files Browse the repository at this point in the history
…function
  • Loading branch information
pankajkoti committed Dec 16, 2024
1 parent edf3ca2 commit 7a166c9
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 19 deletions.
22 changes: 22 additions & 0 deletions cosmos/helpers.py → cosmos/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@


def upload_artifacts_to_aws_s3(project_dir: str, **kwargs: Any) -> None:
"""
Helper function demonstrating how to upload artifacts to AWS S3 that can be used as a callback.
:param project_dir: Path of the cloned project directory which Cosmos tasks work from.
"""
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

target_dir = f"{project_dir}/target"
Expand All @@ -38,6 +43,11 @@ def upload_artifacts_to_aws_s3(project_dir: str, **kwargs: Any) -> None:


def upload_artifacts_to_gcp_gs(project_dir: str, **kwargs: Any) -> None:
"""
Helper function demonstrating how to upload artifacts to GCP GS that can be used as a callback.
:param project_dir: Path of the cloned project directory which Cosmos tasks work from.
"""
from airflow.providers.google.cloud.hooks.gcs import GCSHook

target_dir = f"{project_dir}/target"
Expand All @@ -63,6 +73,11 @@ def upload_artifacts_to_gcp_gs(project_dir: str, **kwargs: Any) -> None:


def upload_artifacts_to_azure_wasb(project_dir: str, **kwargs: Any) -> None:
"""
Helper function demonstrating how to upload artifacts to Azure WASB that can be used as a callback.
:param project_dir: Path of the cloned project directory which Cosmos tasks work from.
"""
from airflow.providers.microsoft.azure.hooks.wasb import WasbHook

target_dir = f"{project_dir}/target"
Expand Down Expand Up @@ -146,6 +161,13 @@ def _construct_dest_file_path(


def upload_artifacts_to_cloud_storage(project_dir: str, **kwargs: Any) -> None:
"""
Helper function demonstrating how to upload artifacts to remote blob stores that can be used as a callback. This is
an example of a helper function that can be used if on Airflow >= 2.8 and cosmos configurations like
``remote_target_path`` and ``remote_target_path_conn_id`` when set can be leveraged.
:param project_dir: Path of the cloned project directory which Cosmos tasks work from.
"""
dest_target_dir, dest_conn_id = _configure_remote_target_path()

if not dest_target_dir:
Expand Down
17 changes: 0 additions & 17 deletions dev/dags/basic_cosmos_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from pathlib import Path

from cosmos import DbtDag, ProfileConfig, ProjectConfig
from cosmos.helpers import upload_artifacts_to_cloud_storage
from cosmos.profiles import PostgresUserPasswordProfileMapping

DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt"
Expand All @@ -33,22 +32,6 @@
operator_args={
"install_deps": True, # install any necessary dependencies before running any dbt command
"full_refresh": True, # used only in dbt commands that support this flag
# --------------------------------------------------------------
# Callback function to upload artifacts using Airflow Object storage and Cosmos remote_target_path setting on Airflow 2.8 and above
"callback": upload_artifacts_to_cloud_storage,
# --------------------------------------------------------------
# Callback function to upload artifacts to AWS S3 for Airflow < 2.8
# "callback": upload_artifacts_to_aws_s3,
# "callback_args": {"aws_conn_id": "aws_s3_conn", "bucket_name": "cosmos-artifacts-upload"},
# --------------------------------------------------------------
# Callback function to upload artifacts to GCP GS for Airflow < 2.8
# "callback": upload_artifacts_to_gcp_gs,
# "callback_args": {"gcp_conn_id": "gcp_gs_conn", "bucket_name": "cosmos-artifacts-upload"},
# --------------------------------------------------------------
# Callback function to upload artifacts to Azure WASB for Airflow < 2.8
# "callback": upload_artifacts_to_azure_wasb,
# "callback_args": {"azure_conn_id": "azure_wasb_conn", "container_name": "cosmos-artifacts-upload"},
# --------------------------------------------------------------
},
# normal dag parameters
schedule_interval="@daily",
Expand Down
15 changes: 14 additions & 1 deletion dev/dags/example_operators.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from airflow import DAG

from cosmos import DbtCloneLocalOperator, DbtRunLocalOperator, DbtSeedLocalOperator, ProfileConfig
from cosmos.helpers import upload_artifacts_to_aws_s3
from cosmos.io import upload_artifacts_to_aws_s3

DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt"
DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH))
Expand All @@ -20,16 +20,29 @@
)

with DAG("example_operators", start_date=datetime(2024, 1, 1), catchup=False) as dag:
# [START single_operator_callback]
seed_operator = DbtSeedLocalOperator(
profile_config=profile_config,
project_dir=DBT_PROJ_DIR,
task_id="seed",
dbt_cmd_flags=["--select", "raw_customers"],
install_deps=True,
append_env=True,
# Callback function to upload artifacts to AWS S3
callback=upload_artifacts_to_aws_s3,
callback_args={"aws_conn_id": "aws_s3_conn", "bucket_name": "cosmos-artifacts-upload"},
# --------------------------------------------------------------
# Callback function to upload artifacts to GCP GS
# "callback": upload_artifacts_to_gcp_gs,
# "callback_args": {"gcp_conn_id": "gcp_gs_conn", "bucket_name": "cosmos-artifacts-upload"},
# --------------------------------------------------------------
# Callback function to upload artifacts to Azure WASB
# "callback": upload_artifacts_to_azure_wasb,
# "callback_args": {"azure_conn_id": "azure_wasb_conn", "container_name": "cosmos-artifacts-upload"},
# --------------------------------------------------------------
)
# [END single_operator_callback]

run_operator = DbtRunLocalOperator(
profile_config=profile_config,
project_dir=DBT_PROJ_DIR,
Expand Down
2 changes: 1 addition & 1 deletion tests/test_example_dags.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@

MIN_VER_DAG_FILE: dict[str, list[str]] = {
"2.4": ["cosmos_seed_dag.py"],
"2.8": ["cosmos_manifest_example.py", "simple_dag_async.py"],
"2.8": ["cosmos_manifest_example.py", "simple_dag_async.py", "cosmos_callback_dag.py"],
}

IGNORED_DAG_FILES = ["performance_dag.py", "jaffle_shop_kubernetes.py"]
Expand Down

0 comments on commit 7a166c9

Please sign in to comment.