Skip to content

Commit

Permalink
fix: small fixes for the unified pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
javfg committed Nov 29, 2024
1 parent d56e150 commit f881fcd
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 24 deletions.
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ plugins = ["returns.contrib.mypy.returns_plugin"]
module = ["google.cloud.storage", "yaml"]
ignore_missing_imports = true

[tool.ruff]
line-length = 120

[tool.ruff.lint]
select = ["D", "I", "E"]
ignore = [
Expand Down
2 changes: 1 addition & 1 deletion src/ot_orchestration/dags/config/genetics_etl.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ nodes:
step.credible_set_path: gs://ot_orchestration/releases/24.11_freeze10/credible_set
step.coloc_path: gs://ot_orchestration/releases/24.11_freeze10/colocalisation
step.colocalisation_method: ECaviar
+step.session.extended_spark_conf: "{spark.sql.shuffle.partitions: '3200', spark.executor.memory: '16g', spark.sql.files.maxPartitionBytes: '25000000', spark.executor.cores: '2'}"
+step.session.extended_spark_conf: "{spark.sql.shuffle.partitions: '4000', spark.executor.memory: '25g', spark.sql.files.maxPartitionBytes: '25000000', spark.executor.cores: '4'}"

- id: l2g_feature_matrix
prerequisites:
Expand Down
33 changes: 14 additions & 19 deletions src/ot_orchestration/dags/config/gentropy.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,6 @@ dataproc_cluster_settings:
allow_efm: false
num_workers: 2

batch_settings: &batch_settings
resource_specs:
cpu_milli: 2000
memory_mib: 2000
boot_disk_mib: 10000
task_specs:
max_retry_count: 1
max_run_duration: '2h'
policy_specs:
machine_type: n1-standard-4

steps:
biosample_index:
params:
Expand Down Expand Up @@ -138,8 +127,16 @@ steps:
step.session.output_partitions: 25

variant_annotation:
google-batch:
<<: *batch_settings
google_batch:
resource_specs:
cpu_milli: 2000
memory_mib: 2000
boot_disk_mib: 10000
task_specs:
max_retry_count: 1
max_run_duration: '2h'
policy_specs:
machine_type: n1-standard-4
image: europe-west1-docker.pkg.dev/open-targets-genetics-dev/gentropy-app/custom_ensembl_vep:{vep_version}
entrypoint: /bin/sh
params:
Expand All @@ -154,7 +151,7 @@ steps:
step.gnomad_variant_annotations_path: gs://gnomad_data_2/v4.1/variant_index
step.variant_index_path: '{gcs_url}/output/genetics/parquet/variant_index'
step.session.write_mode: ignore
step.session.output_partitions: 1
step.session.output_partitions: 25

l2g_feature_matrix:
params:
Expand All @@ -166,8 +163,7 @@ steps:
step.gene_index_path: '{gcs_url}/output/genetics/parquet/gene_index'
step.feature_matrix_path: '{gcs_url}/output/genetics/parquet/l2g_feature_matrix'
+step.session.extended_spark_conf: "{spark.sql.autoBroadcastJoinThreshold:'-1'}"
step.session.write_mode: overwrite
# step.session.write_mode: ignore
step.session.write_mode: ignore

l2g_predict:
params:
Expand All @@ -180,8 +176,7 @@ steps:
step.l2g_threshold: 0.05
step.model_path: '{gcs_url}/input/benchmarks_l2g_fm0_v5.1_best_cv_locus_to_gene_model_classifier.skops'
step.hf_hub_repo_id: opentargets/locus_to_gene
step.session.write_mode: overwrite
# step.session.write_mode: ignore
step.session.write_mode: ignore
step.session.output_partitions: 1

l2g_evidence:
Expand All @@ -192,4 +187,4 @@ steps:
step.credible_set_path: '{gcs_url}/output/genetics/parquet/credible_set'
step.study_index_path: '{gcs_url}/output/genetics/parquet/study_index'
step.locus_to_gene_threshold: 0.05
step.session.write_mode: overwrite
step.session.write_mode: ignore
5 changes: 2 additions & 3 deletions src/ot_orchestration/dags/config/unified_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,7 @@ def __init__(self) -> None:
self.etl_config = self.init_etl_config()
self.etl_config_gcs_uri = f"{self.gcs_url}/output/etl-config.conf"
# The base url for the ETL jar, the version will be replaced in from the config file.
etl_jar_base = "https://github.com/opentargets/platform-etl-backend/releases/download/v{version}/etl-backend-{version}.jar"
self.etl_jar_origin_url = f"{etl_jar_base.format(version=etl_version)}"
self.etl_jar_origin_url = f"https://github.com/opentargets/platform-etl-backend/releases/download/v{etl_version}/etl-backend-{etl_version}.jar"
self.etl_jar_gcs_uri = f"{self.gcs_url}/output/etl-backend-{etl_version}.jar" # fmt: skip
self.etl_step_list = [s for s in settings["steps"].keys() if s.startswith("etl_")]

Expand Down Expand Up @@ -181,5 +180,5 @@ def gentropy_step(self, step_name: str) -> dict[str, Any]:
real_step_name = step_name.replace("gentropy_", "")
step = self.gentropy_config["steps"].get(real_step_name)
if not step:
raise ValueError(f"Step {real_step_name} not in gentropy config.")
raise ValueError(f"Step {real_step_name} not in gentropy config ({self.gentropy_config_local_path}).")
return step
2 changes: 1 addition & 1 deletion src/ot_orchestration/dags/unified_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ def gentropy_stage() -> None:
task_id=f"run_{step_name}",
project_id=GCP_PROJECT_PLATFORM,
**step_config["params"],
google_batch=step_config["google-batch"],
google_batch=step_config["google_batch"],
labels=labels,
)
clusterless_steps.append(r)
Expand Down

0 comments on commit f881fcd

Please sign in to comment.