diff --git a/pyproject.toml b/pyproject.toml index be5c88e..717a827 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -66,6 +66,9 @@ plugins = ["returns.contrib.mypy.returns_plugin"] module = ["google.cloud.storage", "yaml"] ignore_missing_imports = true +[tool.ruff] +line-length = 120 + [tool.ruff.lint] select = ["D", "I", "E"] ignore = [ diff --git a/src/ot_orchestration/dags/config/genetics_etl.yaml b/src/ot_orchestration/dags/config/genetics_etl.yaml index f396f94..7d4c35a 100644 --- a/src/ot_orchestration/dags/config/genetics_etl.yaml +++ b/src/ot_orchestration/dags/config/genetics_etl.yaml @@ -162,7 +162,7 @@ nodes: step.credible_set_path: gs://ot_orchestration/releases/24.11_freeze10/credible_set step.coloc_path: gs://ot_orchestration/releases/24.11_freeze10/colocalisation step.colocalisation_method: ECaviar - +step.session.extended_spark_conf: "{spark.sql.shuffle.partitions: '3200', spark.executor.memory: '16g', spark.sql.files.maxPartitionBytes: '25000000', spark.executor.cores: '2'}" + +step.session.extended_spark_conf: "{spark.sql.shuffle.partitions: '4000', spark.executor.memory: '25g', spark.sql.files.maxPartitionBytes: '25000000', spark.executor.cores: '4'}" - id: l2g_feature_matrix prerequisites: diff --git a/src/ot_orchestration/dags/config/gentropy.yaml b/src/ot_orchestration/dags/config/gentropy.yaml index 9cbf365..e50c798 100644 --- a/src/ot_orchestration/dags/config/gentropy.yaml +++ b/src/ot_orchestration/dags/config/gentropy.yaml @@ -9,17 +9,6 @@ dataproc_cluster_settings: allow_efm: false num_workers: 2 -batch_settings: &batch_settings - resource_specs: - cpu_milli: 2000 - memory_mib: 2000 - boot_disk_mib: 10000 - task_specs: - max_retry_count: 1 - max_run_duration: '2h' - policy_specs: - machine_type: n1-standard-4 - steps: biosample_index: params: @@ -138,8 +127,16 @@ steps: step.session.output_partitions: 25 variant_annotation: - google-batch: - <<: *batch_settings + google_batch: + resource_specs: + cpu_milli: 2000 + memory_mib: 2000 + boot_disk_mib: 10000 + task_specs: + max_retry_count: 1 + max_run_duration: '2h' + policy_specs: + machine_type: n1-standard-4 image: europe-west1-docker.pkg.dev/open-targets-genetics-dev/gentropy-app/custom_ensembl_vep:{vep_version} entrypoint: /bin/sh params: @@ -154,7 +151,7 @@ steps: step.gnomad_variant_annotations_path: gs://gnomad_data_2/v4.1/variant_index step.variant_index_path: '{gcs_url}/output/genetics/parquet/variant_index' step.session.write_mode: ignore - step.session.output_partitions: 1 + step.session.output_partitions: 25 l2g_feature_matrix: params: @@ -166,8 +163,7 @@ steps: step.gene_index_path: '{gcs_url}/output/genetics/parquet/gene_index' step.feature_matrix_path: '{gcs_url}/output/genetics/parquet/l2g_feature_matrix' +step.session.extended_spark_conf: "{spark.sql.autoBroadcastJoinThreshold:'-1'}" - step.session.write_mode: overwrite - # step.session.write_mode: ignore + step.session.write_mode: ignore l2g_predict: params: @@ -180,8 +176,7 @@ steps: step.l2g_threshold: 0.05 step.model_path: '{gcs_url}/input/benchmarks_l2g_fm0_v5.1_best_cv_locus_to_gene_model_classifier.skops' step.hf_hub_repo_id: opentargets/locus_to_gene - step.session.write_mode: overwrite - # step.session.write_mode: ignore + step.session.write_mode: ignore step.session.output_partitions: 1 l2g_evidence: @@ -192,4 +187,4 @@ steps: step.credible_set_path: '{gcs_url}/output/genetics/parquet/credible_set' step.study_index_path: '{gcs_url}/output/genetics/parquet/study_index' step.locus_to_gene_threshold: 0.05 - step.session.write_mode: overwrite + step.session.write_mode: ignore diff --git a/src/ot_orchestration/dags/config/unified_pipeline.py b/src/ot_orchestration/dags/config/unified_pipeline.py index 92dd98f..c989447 100644 --- a/src/ot_orchestration/dags/config/unified_pipeline.py +++ b/src/ot_orchestration/dags/config/unified_pipeline.py @@ -79,8 +79,7 @@ def __init__(self) -> None: self.etl_config = self.init_etl_config() self.etl_config_gcs_uri = f"{self.gcs_url}/output/etl-config.conf" # The base url for the ETL jar, the version will be replaced in from the config file. - etl_jar_base = "https://github.com/opentargets/platform-etl-backend/releases/download/v{version}/etl-backend-{version}.jar" - self.etl_jar_origin_url = f"{etl_jar_base.format(version=etl_version)}" + self.etl_jar_origin_url = f"https://github.com/opentargets/platform-etl-backend/releases/download/v{etl_version}/etl-backend-{etl_version}.jar" self.etl_jar_gcs_uri = f"{self.gcs_url}/output/etl-backend-{etl_version}.jar" # fmt: skip self.etl_step_list = [s for s in settings["steps"].keys() if s.startswith("etl_")] @@ -181,5 +180,5 @@ def gentropy_step(self, step_name: str) -> dict[str, Any]: real_step_name = step_name.replace("gentropy_", "") step = self.gentropy_config["steps"].get(real_step_name) if not step: - raise ValueError(f"Step {real_step_name} not in gentropy config.") + raise ValueError(f"Step {real_step_name} not in gentropy config ({self.gentropy_config_local_path}).") return step diff --git a/src/ot_orchestration/dags/unified_pipeline.py b/src/ot_orchestration/dags/unified_pipeline.py index fbf6cff..031724b 100644 --- a/src/ot_orchestration/dags/unified_pipeline.py +++ b/src/ot_orchestration/dags/unified_pipeline.py @@ -289,7 +289,7 @@ def gentropy_stage() -> None: task_id=f"run_{step_name}", project_id=GCP_PROJECT_PLATFORM, **step_config["params"], - google_batch=step_config["google-batch"], + google_batch=step_config["google_batch"], labels=labels, ) clusterless_steps.append(r)