# Improve callback support #1349

**Label:** `execution:callback` (tasks related to callbacks when executing tasks)

This is a follow-up to #1121. The idea is to close all of the callback issue tasks: we created a subtask for each of the proposed ideas in #1121 (comment).
tatiana added the `execution:callback` label on Nov 29, 2024.
tatiana changed the title from "Add helper functions for artifact management" to "Improve callback support" on Nov 29, 2024.
tatiana pushed a commit that referenced this issue on Dec 17, 2024:
**…te cloud storages (#1389)**

This PR introduces helper functions that can be passed as callable callbacks for Cosmos tasks, to run after task execution. These helpers enable uploading artifacts (from the project's target directory) to various cloud storage providers, including AWS S3, Google Cloud Storage (GCS), Azure WASB, and general remote object stores using Airflow's `ObjectStoragePath`.

## Key Changes

Adds a `cosmos/io.py` module that includes the following helper functions:

1. `upload_artifacts_to_aws_s3`
   - Uploads artifact files from a task's local target directory to an AWS S3 bucket.
   - Supports dynamically appending DAG metadata (e.g., `dag_id`, `task_id`, `run_id`, and try number) to the uploaded file paths.
   - Utilizes `S3Hook` from the `airflow.providers.amazon.aws` module.
2. `upload_artifacts_to_gcp_gs`
   - Uploads artifact files from a task's local target directory to a Google Cloud Storage (GCS) bucket.
   - Appends DAG-related context to the GCS object names for better traceability.
   - Leverages `GCSHook` from `airflow.providers.google.cloud`.
3. `upload_artifacts_to_azure_wasb`
   - Uploads artifact files from a task's local target directory to an Azure Blob Storage container.
   - Automatically structures blob names with metadata, including `dag_id`, `task_id`, and execution details.
   - Utilizes `WasbHook` from the `airflow.providers.microsoft.azure` module.
4. `upload_artifacts_to_cloud_storage`
   - A generic helper that uploads artifacts from a task's local target directory to remote object stores configured via Airflow's `ObjectStoragePath` (an Airflow 2.8+ feature).
   - Supports custom remote storage configuration such as `remote_target_path` and `remote_target_path_conn_id`.
   - Dynamically constructs file paths that include DAG metadata for clear organization.

These helper functions can be passed as the `callback` argument to `DbtDag`, or to the operators in your DAG, as demonstrated in the example DAGs `dev/dags/cosmos_callback_dag.py` and `dev/dags/example_operators.py` respectively; you can also pass `callback_args`, as shown in those examples and in the sketch below. These helpers are simply examples of how callback functions can be written and passed to your operators/DAGs to run after task completion; using them as a reference, you can write and pass your own callback functions.

## Limitations

1. This PR has been tested and is currently supported only in `ExecutionMode.LOCAL`. We encourage the community to contribute callback support for other execution modes as needed, using the `ExecutionMode.LOCAL` implementation as a reference.

closes: #1350
closes: #976
closes: #867
closes: #801
closes: #1292
closes: #851
closes: #1351
related: #1293
related: #1349
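For illustration, below is a minimal sketch of wiring one of these helpers into a `DbtDag`, loosely following `dev/dags/cosmos_callback_dag.py`. The project and profile paths, the connection ID, and the `callback_args` keys are assumptions made for the example, not values taken from the PR:

```python
# A minimal sketch, assuming a Cosmos version that includes the cosmos/io.py
# helpers from #1389. Paths, connection IDs, and the callback_args keys below
# are illustrative assumptions.
from datetime import datetime

from cosmos import DbtDag, ProfileConfig, ProjectConfig
from cosmos.io import upload_artifacts_to_aws_s3

profile_config = ProfileConfig(
    profile_name="default",
    target_name="dev",
    profiles_yml_filepath="/usr/local/airflow/dbt/profiles.yml",  # assumed path
)

cosmos_callback_dag = DbtDag(
    project_config=ProjectConfig("/usr/local/airflow/dbt/jaffle_shop"),  # assumed path
    profile_config=profile_config,
    # After each task runs, upload files from its local target directory to S3;
    # per the PR description, the helper appends DAG metadata (dag_id, task_id,
    # run_id, try number) to the uploaded file paths.
    callback=upload_artifacts_to_aws_s3,
    callback_args={"aws_conn_id": "aws_s3_conn", "bucket_name": "cosmos-artifacts"},
    schedule="@daily",
    start_date=datetime(2024, 1, 1),
    dag_id="cosmos_callback_dag",
)
```

The same pattern should apply to the GCS, WASB, and generic object-store helpers by swapping the imported function and adjusting `callback_args` accordingly.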
I am removing #914 from this task's description, since it is being tackled separately by @pankajastro in PR #1400, which is tagged with the Cosmos 1.8.0 milestone. With that, I am closing this issue, as the rest of the issues are already covered.