from igf_airflow.utils.dag34_cellranger_multi_scRNA_utils import (
    prepare_and_run_scanpy_notebook)


## TASK
@task(
    task_id="run_single_sample_scanpy_for_arc",
    retry_delay=timedelta(minutes=5),
    retries=4,
    queue='hpc_8G')
def run_single_sample_scanpy(
        sample_group: str,
        cellranger_output_dir: str,
        design_dict: dict) -> dict:
    """Run the single-sample Scanpy notebook on a Cellranger ARC output.

    The notebook is generated by `prepare_and_run_scanpy_notebook` using the
    `scanpy_config` section of the analysis design, and its outputs (the
    notebook report and the h5ad file) are copied to
    `<cellranger_output_dir>/outs/scanpy`.

    :param sample_group: Sample group identifier; passed (as a string) to the
        notebook runner as `cellranger_group_id` and echoed in the result.
    :param cellranger_output_dir: Cellranger output directory; its `outs`
        sub-directory must already exist.
    :param design_dict: Dictionary holding the path of the analysis design
        yaml file under the key `analysis_design`.
    :returns: A dictionary with the keys `sample_group`,
        `cellranger_output_dir`, `notebook_report` and `scanpy_h5ad`.
    :raises ValueError: On any failure; the error is logged and sent to the
        configured notification channels before being re-raised.
    """
    try:
        ## set cellranger counts dir
        cellranger_counts_dir = \
            os.path.join(cellranger_output_dir, 'outs')
        ## validate the counts dir BEFORE creating the scanpy dir:
        ## os.makedirs creates intermediate directories, so creating
        ## outs/scanpy first would make this check always pass
        check_file_path(cellranger_counts_dir)
        ## set scanpy dir
        scanpy_output_dir = \
            os.path.join(cellranger_counts_dir, 'scanpy')
        os.makedirs(scanpy_output_dir, exist_ok=True)
        ## read analysis design
        design_file = design_dict.get('analysis_design')
        check_file_path(design_file)
        with open(design_file, 'r') as fp:
            input_design_yaml = fp.read()
        sample_metadata, analysis_metadata = \
            parse_analysis_design_and_get_metadata(
                input_design_yaml=input_design_yaml)
        if sample_metadata is None or \
           analysis_metadata is None:
            raise KeyError("Missing sample or analysis metadata")
        scanpy_config = \
            analysis_metadata.get("scanpy_config")
        if scanpy_config is None or \
           not isinstance(scanpy_config, dict):
            raise KeyError(
                f"Missing scanpy_config in the design file: {design_file}")
        ## get project id
        ## dag_run.conf should have analysis_id
        context = get_current_context()
        dag_run = context.get('dag_run')
        analysis_id = None
        if dag_run is not None and \
           dag_run.conf is not None:
            analysis_id = \
                dag_run.conf.get('analysis_id')
        if analysis_id is None:
            raise ValueError(
                'analysis_id not found in dag_run.conf')
        ## get analysis name and project name
        project_igf_id = \
            get_project_igf_id_for_analysis(
                analysis_id=analysis_id,
                dbconfig_file=DATABASE_CONFIG_FILE)
        analysis_name = \
            fetch_analysis_name_for_analysis_id(
                analysis_id=analysis_id,
                dbconfig_file=DATABASE_CONFIG_FILE)
        output_notebook_path, scanpy_h5ad = \
            prepare_and_run_scanpy_notebook(
                project_igf_id=project_igf_id,
                analysis_name=analysis_name,
                cellranger_group_id=str(sample_group),
                cellranger_counts_dir=cellranger_counts_dir,
                scanpy_config=scanpy_config)
        ## copy output files to scanpy output dir
        target_notebook_path = \
            os.path.join(
                scanpy_output_dir,
                os.path.basename(output_notebook_path))
        copy_local_file(
            output_notebook_path,
            target_notebook_path,
            force=True)
        target_scanpy_h5ad = \
            os.path.join(
                scanpy_output_dir,
                os.path.basename(scanpy_h5ad))
        copy_local_file(
            scanpy_h5ad,
            target_scanpy_h5ad,
            force=True)
        ## make sure both copies exist before reporting success
        check_file_path(target_notebook_path)
        check_file_path(target_scanpy_h5ad)
        output_dict = {
            "sample_group": sample_group,
            "cellranger_output_dir": cellranger_output_dir,
            "notebook_report": target_notebook_path,
            "scanpy_h5ad": target_scanpy_h5ad}
        return output_dict
    except Exception as e:
        context = get_current_context()
        log.error(e)
        log_file_path = [
            ## default to '' so that an unset variable can not raise a
            ## TypeError in os.path.join and hide the original error
            os.environ.get('AIRFLOW__LOGGING__BASE_LOG_FOLDER', ''),
            f"dag_id={context['ti'].dag_id}",
            f"run_id={context['ti'].run_id}",
            f"task_id={context['ti'].task_id}",
            f"attempt={context['ti'].try_number}.log"]
        message = \
            f"Error: {e}, Log: {os.path.join(*log_file_path)}"
        send_log_to_channels(
            slack_conf=SLACK_CONF,
            ms_teams_conf=MS_TEAMS_CONF,
            task_id=context['task'].task_id,
            dag_id=context['task'].dag_id,
            project_id=None,
            comment=message,
            reaction='fail')
        raise ValueError(e) from e