From 25712cb95fe8cebba393007338cc990edf5cdf18 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Fri, 13 Dec 2024 13:58:39 +0530 Subject: [PATCH] Add helper functions for uploading target directory artifacts to remote cloud storages --- cosmos/operators/local.py | 7 +++++-- dev/dags/basic_cosmos_dag.py | 19 +++++++++++++++++++ dev/dags/example_operators.py | 3 +++ 3 files changed, 27 insertions(+), 2 deletions(-) diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py index bf47ab4aa..1cc7d0cb7 100644 --- a/cosmos/operators/local.py +++ b/cosmos/operators/local.py @@ -141,6 +141,7 @@ def __init__( invocation_mode: InvocationMode | None = None, install_deps: bool = False, callback: Callable[[str], None] | None = None, + callback_args: dict[str, Any] | None = None, should_store_compiled_sql: bool = True, should_upload_compiled_sql: bool = False, append_env: bool = True, @@ -149,6 +150,7 @@ def __init__( self.task_id = task_id self.profile_config = profile_config self.callback = callback + self.callback_args = callback_args or {} self.compiled_sql = "" self.freshness = "" self.should_store_compiled_sql = should_store_compiled_sql @@ -500,9 +502,10 @@ def run_command( self.store_freshness_json(tmp_project_dir, context) self.store_compiled_sql(tmp_project_dir, context) self.upload_compiled_sql(tmp_project_dir, context) - self.handle_exception(result) if self.callback: - self.callback(tmp_project_dir) + self.callback_args.update(context) + self.callback(tmp_project_dir, **self.callback_args) + self.handle_exception(result) return result diff --git a/dev/dags/basic_cosmos_dag.py b/dev/dags/basic_cosmos_dag.py index f71f351f0..08e3489a6 100644 --- a/dev/dags/basic_cosmos_dag.py +++ b/dev/dags/basic_cosmos_dag.py @@ -7,6 +7,9 @@ from pathlib import Path from cosmos import DbtDag, ProfileConfig, ProjectConfig +from cosmos.helpers import ( + upload_artifacts_to_cloud_storage, +) from cosmos.profiles import PostgresUserPasswordProfileMapping DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt" @@ -32,6 +35,22 @@ operator_args={ "install_deps": True, # install any necessary dependencies before running any dbt command "full_refresh": True, # used only in dbt commands that support this flag + # -------------------------------------------------------------- + # Callback function to upload artifacts using Airflow Object storage and Cosmos remote_target_path setting on Airflow 2.8 and above + "callback": upload_artifacts_to_cloud_storage, + # -------------------------------------------------------------- + # Callback function to upload artifacts to AWS S3 for Airflow < 2.8 + # "callback": upload_artifacts_to_aws_s3, + # "callback_args": {"aws_conn_id": "aws_s3_conn", "bucket_name": "cosmos-artifacts-upload"}, + # -------------------------------------------------------------- + # Callback function to upload artifacts to GCP GS for Airflow < 2.8 + # "callback": upload_artifacts_to_gcp_gs, + # "callback_args": {"gcp_conn_id": "gcp_gs_conn", "bucket_name": "cosmos-artifacts-upload"}, + # -------------------------------------------------------------- + # Callback function to upload artifacts to Azure WASB for Airflow < 2.8 + # "callback": upload_artifacts_to_azure_wasb, + # "callback_args": {"azure_conn_id": "azure_wasb_conn", "container_name": "cosmos-artifacts-upload"}, + # -------------------------------------------------------------- }, # normal dag parameters schedule_interval="@daily", diff --git a/dev/dags/example_operators.py b/dev/dags/example_operators.py index 1c8624a34..fa66543e2 100644 --- a/dev/dags/example_operators.py +++ b/dev/dags/example_operators.py @@ -5,6 +5,7 @@ from airflow import DAG from cosmos import DbtCloneLocalOperator, DbtRunLocalOperator, DbtSeedLocalOperator, ProfileConfig +from cosmos.helpers import upload_artifacts_to_aws_s3 DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt" DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH)) @@ -26,6 +27,8 @@ dbt_cmd_flags=["--select", "raw_customers"], install_deps=True, append_env=True, + callback=upload_artifacts_to_aws_s3, + callback_args={"aws_conn_id": "aws_s3_conn", "bucket_name": "cosmos-artifacts-upload"}, ) run_operator = DbtRunLocalOperator( profile_config=profile_config,