From 122110a6ae165a46265b014dc9efd44bf81c58bc Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Tue, 17 Dec 2024 13:18:14 +0530 Subject: [PATCH] Add operator to check that artifact was uploaded using callback --- dev/dags/example_operators.py | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/dev/dags/example_operators.py b/dev/dags/example_operators.py index be1dc1947..e84ad78c4 100644 --- a/dev/dags/example_operators.py +++ b/dev/dags/example_operators.py @@ -1,8 +1,10 @@ import os from datetime import datetime from pathlib import Path +from typing import Any from airflow import DAG +from airflow.operators.python import PythonOperator from cosmos import DbtCloneLocalOperator, DbtRunLocalOperator, DbtSeedLocalOperator, ProfileConfig from cosmos.io import upload_artifacts_to_aws_s3 @@ -19,6 +21,17 @@ profiles_yml_filepath=DBT_PROFILE_PATH, ) + +def check_s3_file(bucket_name: str, file_key: str, aws_conn_id: str = "aws_default", **context: Any) -> bool: + """Check if a file exists in the given S3 bucket.""" + from airflow.providers.amazon.aws.hooks.s3 import S3Hook + + s3_key = f"{context['dag'].dag_id}/{context['run_id']}/seed/0/{file_key}" + print(f"Checking if file {s3_key} exists in S3 bucket...") + hook = S3Hook(aws_conn_id=aws_conn_id) + return hook.check_for_key(key=s3_key, bucket_name=bucket_name) + + with DAG("example_operators", start_date=datetime(2024, 1, 1), catchup=False) as dag: # [START single_operator_callback] seed_operator = DbtSeedLocalOperator( @@ -43,6 +56,16 @@ ) # [END single_operator_callback] + check_file_uploaded_task = PythonOperator( + task_id="check_file_uploaded_task", + python_callable=check_s3_file, + op_kwargs={ + "aws_conn_id": "aws_s3_conn", + "bucket_name": "cosmos-artifacts-upload", + "file_key": "target/run_results.json", + }, + ) + run_operator = DbtRunLocalOperator( profile_config=profile_config, project_dir=DBT_PROJ_DIR, @@ -64,3 +87,4 @@ # [END clone_example] seed_operator >> run_operator >> clone_operator + seed_operator >> check_file_uploaded_task