Skip to content

Commit

Permalink
feat: execution modes
Browse files Browse the repository at this point in the history
  • Loading branch information
Szymon Szyszkowski committed Jul 25, 2024
1 parent 431fe66 commit f8a9987
Show file tree
Hide file tree
Showing 5 changed files with 333 additions and 100 deletions.
84 changes: 80 additions & 4 deletions src/ot_orchestration/cli/process_in_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,28 +26,104 @@ def harmonise(manifest: Manifest_Object) -> Manifest_Object:
"+step.session.extended_spark_conf={spark.kryoserializer.buffer.max:'500m'}",
"+step.session.extended_spark_conf={spark.driver.maxResultSize:'5g'}",
]
if GCSIOManager().exists(harmonised_path):
logging.info("Harmonisation result exists for %s. Skipping", study_id)
manifest["passHarmonisation"] = True
return manifest

result = subprocess.run(args=command, capture_output=True)
if result.returncode != 0:
logging.error("Harmonisation for study %s failed!", study_id)
logging.error(result.stderr.decode())
error_msg = result.stderr.decode()
logging.error(error_msg)
manifest["passHarmonisation"] = False
logging.info("Dumping manifest to %s", manifest["manifestPath"])
GCSIOManager().dump(manifest["manifestPath"], manifest)
exit(1)
else:
logging.info("Harmonisation for study %s completed successfully!", study_id)
manifest["passHarmonisation"] = True

logging.info("Harmonisation for study %s succeded!", study_id)
manifest["passHarmonisation"] = True
return manifest


def qc(manifest: Manifest_Object) -> Manifest_Object:
    """Run the summary-statistics QC gentropy step for a single study.

    The step is skipped when a QC result already exists at the manifest's
    ``qcPath``. On failure, ``passQC`` is set to ``False``, the manifest is
    dumped to ``manifestPath`` and the process exits with status 1.

    :param manifest: Manifest object describing the study to QC.
    :return: The manifest with ``passQC`` updated.
    """
    harmonised_path = manifest["harmonisedPath"]
    qc_path = manifest["qcPath"]
    study_id = manifest["studyId"]
    command = [
        "poetry",
        "run",
        "gentropy",
        "step=summary_statistics_qc",
        f'step.gwas_path="{harmonised_path}"',
        f'step.output_path="{qc_path}"',
        f'step.study_id="{study_id}"',
        "+step.session.extended_spark_conf={spark.jars:'https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-latest.jar'}",
        "+step.session.extended_spark_conf={spark.dynamicAllocation.enabled:'false'}",
        "+step.session.extended_spark_conf={spark.driver.memory:'30g'}",
        "+step.session.extended_spark_conf={spark.kryoserializer.buffer.max:'500m'}",
        "+step.session.extended_spark_conf={spark.driver.maxResultSize:'5g'}",
    ]
    result_exists = GCSIOManager().exists(qc_path)
    logging.info("Result exists: %s", result_exists)
    # Reuse the cached existence check instead of a second GCS round-trip.
    if result_exists:
        logging.info("QC result exists for %s. Skipping", study_id)
        manifest["passQC"] = True
        return manifest

    result = subprocess.run(args=command, capture_output=True)
    if result.returncode != 0:
        logging.error("QC for study %s failed!", study_id)
        error_msg = result.stderr.decode()
        logging.error(error_msg)
        manifest["passQC"] = False
        logging.info("Dumping manifest to %s", manifest["manifestPath"])
        GCSIOManager().dump(manifest["manifestPath"], manifest)
        # SystemExit is what the `exit` builtin raises; raising it directly
        # does not depend on the site module being loaded.
        raise SystemExit(1)

    logging.info("QC for study %s succeeded!", study_id)
    manifest["passQC"] = True
    return manifest


def qc_consolidation(manifest: Manifest_Object) -> Manifest_Object:
    """Consolidate QC results for a study.

    Not implemented yet: this is currently a no-op that returns ``None``
    despite the declared ``Manifest_Object`` return type.
    TODO: implement consolidation and return the updated manifest.
    """
    pass


def clumping(manifest: Manifest_Object) -> Manifest_Object:
    """Run the clumping gentropy step for a single study.

    The step is skipped when a clumping result already exists at the
    manifest's ``clumpingPath``. On failure, ``passClumping`` is set to
    ``False``, the manifest is dumped to ``manifestPath`` and the process
    exits with status 1.

    :param manifest: Manifest object describing the study to clump.
    :return: The manifest with ``passClumping`` updated.
    """
    harmonised_path = manifest["harmonisedPath"]
    clumping_path = manifest["clumpingPath"]
    study_id = manifest["studyId"]
    command = [
        "poetry",
        "run",
        "gentropy",
        "step=clumping",
        f'step.gwas_path="{harmonised_path}"',
        f'step.output_path="{clumping_path}"',
        f'step.study_id="{study_id}"',
        "+step.session.extended_spark_conf={spark.jars:'https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-latest.jar'}",
        "+step.session.extended_spark_conf={spark.dynamicAllocation.enabled:'false'}",
        "+step.session.extended_spark_conf={spark.driver.memory:'30g'}",
        "+step.session.extended_spark_conf={spark.kryoserializer.buffer.max:'500m'}",
        "+step.session.extended_spark_conf={spark.driver.maxResultSize:'5g'}",
    ]
    if GCSIOManager().exists(clumping_path):
        logging.info("Clumping result exists for %s. Skipping", study_id)
        manifest["passClumping"] = True
        return manifest

    result = subprocess.run(args=command, capture_output=True)
    if result.returncode != 0:
        logging.error("Clumping for study %s failed!", study_id)
        error_msg = result.stderr.decode()
        logging.error(error_msg)
        manifest["passClumping"] = False
        logging.info("Dumping manifest to %s", manifest["manifestPath"])
        GCSIOManager().dump(manifest["manifestPath"], manifest)
        # SystemExit is what the `exit` builtin raises; raising it directly
        # does not depend on the site module being loaded.
        raise SystemExit(1)

    # Bug fix: the success path previously returned without recording the
    # outcome, unlike the sibling qc/harmonise steps.
    logging.info("Clumping for study %s succeeded!", study_id)
    manifest["passClumping"] = True
    return manifest


Expand Down
80 changes: 80 additions & 0 deletions src/ot_orchestration/dags/branching.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""Example DAG demonstrating the usage of the BranchPythonOperator."""

import random
from datetime import datetime

from airflow import DAG
from airflow.decorators import task
from airflow.operators.empty import EmptyOperator
from airflow.utils.edgemodifier import Label
from airflow.utils.trigger_rule import TriggerRule

with DAG(
    dag_id="example_branch_python_operator_decorator",
    start_date=datetime(2021, 1, 1),
    catchup=False,
    schedule_interval="@daily",
    tags=["example", "example2"],
) as dag:
    # Entry task executed before the branching decision is made.
    entry_task = EmptyOperator(
        task_id="run_this_first",
    )

    options = ["branch_a", "branch_b", "branch_c", "branch_d"]

    @task.branch(task_id="branching")
    def random_choice():
        """Pick one of the configured branch task ids at random."""
        return random.choice(options)

    branch_decision = random_choice()

    entry_task >> branch_decision

    # Re-joins whichever branch actually ran; the trigger rule tolerates
    # the sibling branches that were skipped.
    join = EmptyOperator(
        task_id="join",
        trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
    )

    for option in options:
        if option == "branch_c":
            # branch_c links straight from the decision to the join task.
            branch_decision >> Label(option) >> join
        elif option == "branch_d":
            # branch_d runs a two-task chain that deliberately never joins.
            head_task = EmptyOperator(
                task_id=option,
            )
            follow_task = EmptyOperator(
                task_id="follow_" + option,
            )
            # Label is optional here, but it can help identify more complex branches
            branch_decision >> Label(option) >> head_task >> follow_task
        else:
            # branch_a and branch_b each run one follow-up task, then join.
            follow_task = EmptyOperator(
                task_id="follow_" + option,
            )
            branch_decision >> Label(option) >> follow_task >> join
7 changes: 3 additions & 4 deletions src/ot_orchestration/dags/gwas_catalog_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,19 @@


from airflow.utils.helpers import chain
from ot_orchestration.utils.common import shared_dag_kwargs


RUN_DATE = datetime.today()

config_path = "/opt/airflow/config/config.yaml"
config = QRCP.from_file(config_path).serialize()


# A stale duplicate @dag decorator line (using start_date/schedule directly)
# was stacked above the current one; only the shared_dag_kwargs form is kept.
@dag(dag_id="GWAS_Catalog_dag", params=config, **shared_dag_kwargs)
def gwas_catalog_dag() -> None:
    """GWAS catalog DAG.

    Currently chains only the manifest-preparation stage; the batch
    processing stage is disabled below — presumably pending the execution
    modes work. TODO confirm before re-enabling.
    """
    chain(
        gwas_catalog_manifest_preparation(),
        # gwas_catalog_batch_processing(),
    )


Expand Down
Loading

0 comments on commit f8a9987

Please sign in to comment.