Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve callback support #1349

Closed
tatiana opened this issue Nov 29, 2024 · 1 comment
Closed

Improve callback support #1349

tatiana opened this issue Nov 29, 2024 · 1 comment
Assignees
Labels
execution:callback Tasks related to callback when executing tasks
Milestone

Comments

@tatiana
Copy link
Collaborator

tatiana commented Nov 29, 2024

This is a follow up to #1121

The idea is to close all callback issue tasks:

We created a subtask for each of the proposed ideas:
#1121 (comment)

@tatiana tatiana added the execution:callback Tasks related to callback when executing tasks label Nov 29, 2024
@tatiana tatiana added this to the Cosmos 1.8.0 milestone Nov 29, 2024
@tatiana tatiana changed the title Add helper functions for artifact management Improve callback support Nov 29, 2024
tatiana pushed a commit that referenced this issue Dec 17, 2024
…te cloud storages (#1389)

This PR introduces helper functions that can be passed as callable
callbacks for Cosmos tasks to execute post-task execution. These helper
functions enable the uploading of artifacts (from the project's target
directory) to various cloud storage providers, including AWS S3, Google
Cloud Storage (GCS), Azure WASB, and general remote object stores using
Airflow’s ObjectStoragePath.

## Key Changes
Adds a `cosmos/io.py` module that includes the following helper
functions

1. `upload_artifacts_to_aws_s3`
- Uploads artifact files from a task’s local target directory to an AWS
S3 bucket.
- Supports dynamically appending DAG metadata (e.g., dag_id, task_id,
run_id, and try number) to the uploaded file paths.
      - Utilizes S3Hook from the airflow.providers.amazon.aws module.

2. `upload_artifacts_to_gcp_gs`

- Uploads artifact files from a task’s local target directory to a
Google Cloud Storage (GCS) bucket.
- Appends DAG-related context to the GCS object names for better
traceability.
      - Leverages GCSHook from airflow.providers.google.cloud.
    
3. `upload_artifacts_to_azure_wasb`
- Uploads artifact files from a task’s local target directory to an
Azure Blob Storage container.
- Automatically structures blob names with metadata, including dag_id,
task_id, and execution details.
- Utilizes WasbHook from the airflow.providers.microsoft.azure module.
  
4. `upload_artifacts_to_cloud_storage`
- A generic helper function that uploads artifacts from a task’s local
target directory to remote object stores configured via Airflow’s
ObjectStoragePath (Airflow 2.8+ feature).
- Supports custom remote storage configurations such as
`remote_target_path` and `remote_target_path_conn_id`.
- Dynamically constructs file paths that include DAG metadata for clear
organization.
     
These helpers functions can be passed as the `callback` argument to
`DbtDAG` or to your `Dag` instance as demonstrated in the example DAGs
`dev/dags/cosmos_callback_dag.py` and `dev/dags/example_operators.py`
correspondingly. You can also pass `callback_args` as shown in the
example DAGs. These helper functions are mere examples of how callback
functions can be written and passed to your operators/DAGs to be
executed after task completions. Taking reference of these helper
functions, you can write your own callback function and pass those.


## Limitations

1. This PR has been tested and is currently supported only in
`ExecutionMode.LOCAL`. We encourage the community to contribute by
adding callback support for other execution modes as needed, using the
implementation for `ExecutionMode.LOCAL` as a reference.

closes: #1350
closes: #976
closes: #867
closes: #801
closes: #1292
closes: #851
closes: #1351 
related: #1293
related: #1349
@pankajkoti
Copy link
Contributor

I am removing #914 from this task's description since it is been tackled separately by @pankajastro in PR #1400 & it has the Cosmos 1.8.0 milestone tagged to it.

With that I am closing this issue as the rest of the issues are already covered

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
execution:callback Tasks related to callback when executing tasks
Projects
None yet
Development

No branches or pull requests

2 participants