Add optional transfer step to dataset DAG #259

Draft · wants to merge 3 commits into base: dev
16 changes: 13 additions & 3 deletions dags/veda_data_pipeline/groups/transfer_group.py
@@ -5,6 +5,7 @@
import json
from airflow.utils.task_group import TaskGroup
from airflow.utils.trigger_rule import TriggerRule
from airflow.decorators import task

group_kwgs = {"group_id": "Transfer", "tooltip": "Transfer"}

@@ -27,13 +28,22 @@ def cogify_copy_task(ti):
external_role_arn = airflow_vars_json.get("ASSUME_ROLE_WRITE_ARN")
return cogify_transfer_handler(event_src=config, external_role_arn=external_role_arn)


def transfer_data(ti):
@task
def transfer_data(ti, payload):
"""Transfer data from one S3 bucket to another; s3 copy, no need for docker"""
from veda_data_pipeline.utils.transfer import (
data_transfer_handler,
)
config = ti.dag_run.conf
# prefer the task-provided payload, falling back to dag_run.conf values
# the payload generally uses the key names expected by discovery, so a few renames are needed when combining the dicts
config = {
**payload,
**ti.dag_run.conf,
"origin_bucket": payload.get("bucket", ti.dag_run.conf.get("origin_bucket", "veda-data-store")),
"origin_prefix": payload.get("prefix", ti.dag_run.conf.get("origin_prefix", "s3-prefix/")),
"target_bucket": payload.get("target_bucket", ti.dag_run.conf.get("target_bucket", "veda-data-store")),
"dry_run": payload.get("dry_run", ti.dag_run.conf.get("dry_run", True)),# TODO default false before merge
}
airflow_vars = Variable.get("aws_dags_variables")
airflow_vars_json = json.loads(airflow_vars)
external_role_arn = airflow_vars_json.get("ASSUME_ROLE_WRITE_ARN")
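For reference, a minimal standalone sketch of the merge precedence used by transfer_data above: dag_run.conf overrides the payload for keys not listed explicitly, while the renamed keys (origin_bucket, origin_prefix, target_bucket, dry_run) are resolved from the payload first, then dag_run.conf, then a hard-coded default. The values below are placeholders, not from this PR.

# standalone sketch with placeholder values; mirrors the dict merge in transfer_data
payload = {"bucket": "veda-staging", "prefix": "incoming/my-collection/", "collection": "my-collection"}
dag_run_conf = {"dry_run": False}

config = {
    **payload,
    **dag_run_conf,
    "origin_bucket": payload.get("bucket", dag_run_conf.get("origin_bucket", "veda-data-store")),
    "origin_prefix": payload.get("prefix", dag_run_conf.get("origin_prefix", "s3-prefix/")),
    "target_bucket": payload.get("target_bucket", dag_run_conf.get("target_bucket", "veda-data-store")),
    "dry_run": payload.get("dry_run", dag_run_conf.get("dry_run", True)),
}

print(config["origin_bucket"])  # veda-staging: taken from the payload's "bucket" key
print(config["target_bucket"])  # veda-data-store: neither dict sets it, so the default applies
print(config["dry_run"])        # False: dag_run.conf wins because the payload has no "dry_run"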
1 change: 1 addition & 0 deletions dags/veda_data_pipeline/utils/collection_generation.py
@@ -27,6 +27,7 @@ class GenerateCollection:
"is_periodic",
"time_density",
"type",
"transfer"
]

def get_template(self, dataset: Dict[str, Any]) -> dict:
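The one-line change above adds "transfer" to a key list inside GenerateCollection. Assuming that list enumerates dataset keys which are stripped before the collection template is rendered (the surrounding class is not shown in this diff, so this is an assumption), the intended effect is sketched below; strip_pipeline_keys is a hypothetical helper, not code from this repo.

# hypothetical sketch: "transfer" is a pipeline-only flag and should not leak into the generated STAC collection
PIPELINE_ONLY_KEYS = ["is_periodic", "time_density", "type", "transfer"]

def strip_pipeline_keys(dataset: dict) -> dict:
    """Return a copy of the dataset config without pipeline-only keys."""
    return {key: value for key, value in dataset.items() if key not in PIPELINE_ONLY_KEYS}

print(strip_pipeline_keys({"id": "my-collection", "license": "CC0-1.0", "transfer": True}))
# {'id': 'my-collection', 'license': 'CC0-1.0'}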
28 changes: 25 additions & 3 deletions dags/veda_data_pipeline/veda_dataset_pipeline.py
@@ -61,6 +61,15 @@ def build_stac_task(payload):
event_bucket = airflow_vars_json.get("EVENT_BUCKET")
return stac_handler(payload_src=payload, bucket_output=event_bucket)

@task(max_active_tis_per_dag=3)
def transfer_assets_to_production_bucket(payload):
# TODO: perform the actual transfer here
transfer_data(payload)
# once the transfer completes, update the discovery payload to point at the production bucket
payload.update({"bucket": "veda-data-store"})
payload.update({"prefix": payload.get("collection") + "/"})
return payload


template_dag_run_conf = {
"collection": "<collection-id>",
@@ -79,7 +88,8 @@ def build_stac_task(payload):
"is_periodic": "<true|false>",
"license": "<collection-LICENSE>",
"time_density": "<time-density>",
"title": "<collection-title>"
"title": "<collection-title>",
"transfer": "<true|false> # transfer assets to production bucket if true (false by default)",
}

with DAG("veda_dataset_pipeline", params=template_dag_run_conf, **dag_args) as dag:
@@ -89,13 +99,25 @@ def build_stac_task(payload):
end = EmptyOperator(task_id="end", dag=dag)

collection_grp = collection_task_group()
discover = discover_from_s3_task.expand(event=extract_discovery_items())
extract_from_payload = extract_discovery_items()

# asset transfer to production bucket
transfer_flag = dag.params.get("transfer", False)
if transfer_flag:
transfer_task = transfer_assets_to_production_bucket.expand(payload=extract_from_payload)
discover = discover_from_s3_task.expand(event=transfer_task)
else:
discover = discover_from_s3_task.expand(event=extract_from_payload)
discover.set_upstream(collection_grp) # do not discover until collection exists
get_files = get_dataset_files_to_process(payload=discover)

get_files = get_dataset_files_to_process(payload=discover) # untangle mapped data format to get iterable payloads from discover step


build_stac = build_stac_task.expand(payload=get_files)
build_stac.set_upstream(get_files)
# .output is needed coming from a non-taskflow operator
submit_stac = submit_to_stac_ingestor_task.expand(built_stac=build_stac)

collection_grp.set_upstream(start)
extract_from_payload.set_upstream(start)
submit_stac.set_downstream(end)
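For reference, a minimal standalone sketch (placeholder values, not code from this PR) of how transfer_assets_to_production_bucket rewrites a discovery payload once the assets have been copied, so that the downstream discovery step reads from the production bucket under the collection prefix. Runs that leave the transfer param unset skip this branch and behave exactly as before.

# placeholder payload as it might arrive from extract_discovery_items
payload = {
    "collection": "my-collection",
    "bucket": "veda-staging",
    "prefix": "incoming/my-collection/",
}

# the same two updates applied in transfer_assets_to_production_bucket above
payload.update({"bucket": "veda-data-store"})
payload.update({"prefix": payload.get("collection") + "/"})

print(payload)
# {'collection': 'my-collection', 'bucket': 'veda-data-store', 'prefix': 'my-collection/'}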
10 changes: 6 additions & 4 deletions sm2a/Makefile
@@ -10,11 +10,11 @@ info_message = \

count_down = \
@echo "Spinning up the system please wait..."; \
secs=40 ;\
while [ $$secs -gt 0 ]; do \
printf "%d\033[0K\r" $$secs; \
secs=40; \
while [ $$secs -gt 0 ]; do \
printf "%d\033[0K\r" "$$secs"; \
sleep 1; \
: $$((secs--)); \
secs=$$((secs - 1)); \
done;

.PHONY:
@@ -24,6 +24,8 @@

all: sm2a-local-init sm2a-local-run

refresh: sm2a-local-build sm2a-local-run

sm2a-local-run: sm2a-local-stop
@echo "Running SM2A"
docker compose up -d