Skip to content

Commit

Permalink
refactor: gwas_catalog processing dag
Browse files Browse the repository at this point in the history
  • Loading branch information
Szymon Szyszkowski committed Aug 20, 2024
1 parent 23dc7bd commit e789311
Showing 1 changed file with 33 additions and 24 deletions.
57 changes: 33 additions & 24 deletions src/ot_orchestration/dags/gwas_catalog_preprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from airflow.providers.google.cloud.hooks.gcs import GCSHook
from airflow.providers.google.cloud.operators.gcs import GCSListObjectsOperator
from airflow.utils.task_group import TaskGroup
from airflow.models.baseoperator import chain

from ot_orchestration.utils.common import shared_dag_args, shared_dag_kwargs
from ot_orchestration.utils.dataproc import (
Expand All @@ -26,15 +27,13 @@
HARMONISED_SUMSTATS_PREFIX = "harmonised_summary_statistics"

# Manifest paths:
MANIFESTS_PATH = f"gs://{GWAS_CATALOG_BUCKET_NAME}/manifests/"
MANIFESTS_PATH = "gs://gwas_catalog_data/manifests/"
HARMONISED_SUMSTATS_LIST_FULL_NAME = "gs://gwas_catalog_data/manifests/gwas_catalog_harmonised_summary_statistics_list.txt"

# The name of the manifest files have to be consistent with the config file:
HARMONISED_SUMSTATS_LIST_OBJECT_NAME = (
"manifests/gwas_catalog_harmonised_summary_statistics_list.txt"
)
HARMONISED_SUMSTATS_LIST_FULL_NAME = (
f"gs://{GWAS_CATALOG_BUCKET_NAME}/{HARMONISED_SUMSTATS_LIST_OBJECT_NAME}"
)
CURATION_INCLUSION_NAME = f"{MANIFESTS_PATH}/gwas_catalog_curation_included_studies"
CURATION_EXCLUSION_NAME = f"{MANIFESTS_PATH}/gwas_catalog_curation_excluded_studies"
SUMMARY_STATISTICS_INCLUSION_NAME = (
Expand All @@ -44,17 +43,17 @@
f"{MANIFESTS_PATH}/gwas_catalog_summary_statistics_excluded_studies"
)

# Study index:
STUDY_INDEX = f"gs://{GWAS_CATALOG_BUCKET_NAME}/study_index"

# Study loci:
CURATED_STUDY_LOCI = f"gs://{GWAS_CATALOG_BUCKET_NAME}/study_locus_datasets/gwas_catalog_curated_associations"
CURATED_LD_CLUMPED = f"gs://{GWAS_CATALOG_BUCKET_NAME}/study_locus_datasets/gwas_catalog_curated_associations_ld_clumped"
WINDOW_BASED_CLUMPED = f"gs://{GWAS_CATALOG_BUCKET_NAME}/study_locus_datasets/gwas_catalog_summary_stats_window_clumped"
LD_BASED_CLUMPED = f"gs://{GWAS_CATALOG_BUCKET_NAME}/study_locus_datasets/gwas_catalog_summary_stats_ld_clumped"
# Credible sets:
CURATED_CREDIBLE_SETS = f"gs://{GWAS_CATALOG_BUCKET_NAME}/credible_set_datasets/gwas_catalog_PICSed_curated_associations"
SUMMARY_STATISTICS_CREDIBLE_SETS = f"gs://{GWAS_CATALOG_BUCKET_NAME}/credible_set_datasets/gwas_catalog_PICSed_summary_statistics"
STUDY_INDEX = "gs://gwas_catalog_data/study_index"
CURATED_STUDY_LOCI = (
"gs://gwas_catalog_data/study_locus_datasets/gwas_catalog_curated_associations"
)
CURATED_LD_CLUMPED = "gs://gwas_catalog_data/study_locus_datasets/gwas_catalog_curated_associations_ld_clumped"
WINDOW_BASED_CLUMPED = "gs://gwas_catalog_data/study_locus_datasets/gwas_catalog_summary_stats_window_clumped"
LD_BASED_CLUMPED = (
"gs://gwas_catalog_data/study_locus_datasets/gwas_catalog_summary_stats_ld_clumped"
)
CURATED_CREDIBLE_SETS = "gs://gwas_catalog_data/credible_set_datasets/gwas_catalog_PICSed_curated_associations"
SUMMARY_STATISTICS_CREDIBLE_SETS = "gs://gwas_catalog_data/credible_set_datasets/gwas_catalog_PICSed_summary_statistics"


def upload_harmonized_study_list(
Expand Down Expand Up @@ -113,6 +112,7 @@ def upload_harmonized_study_list(
f"step.inclusion_list_path={CURATION_INCLUSION_NAME}",
f"step.exclusion_list_path={CURATION_EXCLUSION_NAME}",
f"step.harmonised_study_file={HARMONISED_SUMSTATS_LIST_FULL_NAME}",
"step.session.write_mode=overwrite",
],
)

Expand All @@ -121,7 +121,10 @@ def upload_harmonized_study_list(
cluster_name=CLUSTER_NAME,
step_id="ot_gwas_catalog_ingestion",
task_id="ingest_curated_gwas_catalog_data",
other_args=[f"step.inclusion_list_path={CURATION_INCLUSION_NAME}"],
other_args=[
f"step.inclusion_list_path={CURATION_INCLUSION_NAME}",
"step.session.write_mode=overwrite",
],
)

# Run LD-annotation and clumping on curated data:
Expand All @@ -133,6 +136,7 @@ def upload_harmonized_study_list(
f"step.study_locus_input_path={CURATED_STUDY_LOCI}",
f"step.study_index_path={STUDY_INDEX}",
f"step.clumped_study_locus_output_path={CURATED_LD_CLUMPED}",
"step.session.write_mode=overwrite",
],
)

Expand All @@ -144,6 +148,7 @@ def upload_harmonized_study_list(
other_args=[
f"step.study_locus_ld_annotated_in={CURATED_LD_CLUMPED}",
f"step.picsed_study_locus_out={CURATED_CREDIBLE_SETS}",
"step.session.write_mode=overwrite",
],
)

Expand All @@ -169,6 +174,7 @@ def upload_harmonized_study_list(
f"step.inclusion_list_path={SUMMARY_STATISTICS_INCLUSION_NAME}",
f"step.exclusion_list_path={SUMMARY_STATISTICS_EXCLUSION_NAME}",
f"step.harmonised_study_file={HARMONISED_SUMSTATS_LIST_FULL_NAME}",
"step.session.write_mode=overwrite",
],
)

Expand All @@ -181,6 +187,7 @@ def upload_harmonized_study_list(
f"step.summary_statistics_input_path=gs://{GWAS_CATALOG_BUCKET_NAME}/{HARMONISED_SUMSTATS_PREFIX}",
f"step.inclusion_list_path={SUMMARY_STATISTICS_INCLUSION_NAME}",
f"step.study_locus_output_path={WINDOW_BASED_CLUMPED}",
"step.session.write_mode=overwrite",
],
)

Expand All @@ -193,6 +200,7 @@ def upload_harmonized_study_list(
f"step.study_locus_input_path={WINDOW_BASED_CLUMPED}",
f"step.study_index_path={STUDY_INDEX}",
f"step.clumped_study_locus_output_path={LD_BASED_CLUMPED}",
"step.session.write_mode=overwrite",
],
)

Expand All @@ -204,6 +212,7 @@ def upload_harmonized_study_list(
other_args=[
f"step.study_locus_ld_annotated_in={LD_BASED_CLUMPED}",
f"step.picsed_study_locus_out={SUMMARY_STATISTICS_CREDIBLE_SETS}",
"step.session.write_mode=overwrite",
],
)

Expand All @@ -216,12 +225,12 @@ def upload_harmonized_study_list(
)

# DAG description:
(
create_cluster(CLUSTER_NAME, autoscaling_policy=AUTOSCALING, num_workers=5)
>> install_dependencies(CLUSTER_NAME)
>> list_harmonised_sumstats
>> upload_task
>> curation_processing
>> summary_statistics_processing
>> delete_cluster(CLUSTER_NAME)
chain(
create_cluster(CLUSTER_NAME, autoscaling_policy=AUTOSCALING, num_workers=5),
install_dependencies(CLUSTER_NAME),
list_harmonised_sumstats,
upload_task,
curation_processing,
summary_statistics_processing,
delete_cluster(CLUSTER_NAME),
)

0 comments on commit e789311

Please sign in to comment.