diff --git a/.github/workflows/python-app.yml b/.github/workflows/python-app.yml index d96e4346..52b97db7 100644 --- a/.github/workflows/python-app.yml +++ b/.github/workflows/python-app.yml @@ -5,7 +5,7 @@ name: Python application on: push: - branches: [ "master", "test_july24" ] + branches: [ master, fix_cellranger_july24] pull_request: branches: [ "master" ] diff --git a/igf_airflow/utils/dag34_cellranger_multi_scRNA_utils.py b/igf_airflow/utils/dag34_cellranger_multi_scRNA_utils.py index 0f8d3af7..e45709f6 100644 --- a/igf_airflow/utils/dag34_cellranger_multi_scRNA_utils.py +++ b/igf_airflow/utils/dag34_cellranger_multi_scRNA_utils.py @@ -22,6 +22,7 @@ get_project_igf_id_for_analysis, fetch_analysis_name_for_analysis_id, send_airflow_failed_logs_to_channels, + send_airflow_pipeline_logs_to_channels, collect_analysis_dir, parse_analysis_design_and_get_metadata ) @@ -63,7 +64,10 @@ retries=4, queue='hpc_4G', multiple_outputs=False) -def get_analysis_group_list(design_dict: dict) -> dict: +def get_analysis_group_list( + design_dict: dict, + required_tag_name: Optional[str] = None, + required_tag_value: Optional[str] = None) -> dict: try: design_file = design_dict.get('analysis_design') check_file_path(design_file) @@ -76,14 +80,35 @@ def get_analysis_group_list(design_dict: dict) -> dict: analysis_metadata is None: raise KeyError("Missing sample or analysis metadata") unique_sample_groups = set() + required_tag_list = list() for _, group in sample_metadata.items(): grp_name = group.get('cellranger_group') if grp_name is None: raise KeyError("Missing cellranger_group in sample_metadata") unique_sample_groups.add(grp_name) + if required_tag_name is not None and \ + required_tag_value is not None: + required_tag_list.append({ + "name": grp_name, + required_tag_name: group.get(required_tag_name)}) if len(unique_sample_groups) == 0: raise ValueError("No sample group found") - return list(unique_sample_groups) + unique_sample_groups = \ + list(unique_sample_groups) + ## check for required tags + if required_tag_name is not None and \ + required_tag_value is not None: + required_tag_df = \ + pd.DataFrame(required_tag_list) + for g in unique_sample_groups: + g_tag_values = \ + required_tag_df[required_tag_df['name']==g][required_tag_name].\ + values.\ + tolist() + if required_tag_value not in g_tag_values: + raise KeyError( + f"No {required_tag_value} found for group {g}") + return unique_sample_groups except Exception as e: log.error(e) send_airflow_failed_logs_to_channels( @@ -120,7 +145,11 @@ def prepare_cellranger_script(sample_group: str, design_dict: dict) -> dict: design_file=design_file, db_config_file=DATABASE_CONFIG_FILE, run_script_template=CELLRANGER_MULTI_SCRIPT_TEMPLATE) - return {"sample_group": sample_group, "run_script": run_script_file, "output_dir": os.path.join(work_dir, sample_group)} + analysis_script_info = { + "sample_group": sample_group, + "run_script": run_script_file, + "output_dir": os.path.join(work_dir, sample_group)} + return analysis_script_info except Exception as e: log.error(e) send_airflow_failed_logs_to_channels( @@ -269,6 +298,11 @@ def run_cellranger_script( f"""Lock file exists in cellranger run path: {output_dir}. \ Remove it to continue!""") try: + send_airflow_pipeline_logs_to_channels( + slack_conf=SLACK_CONF, + ms_teams_conf=MS_TEAMS_CONF, + message_prefix=\ + f"Started Cellranger for sample: {sample_group}, script: {run_script}") _, _ = \ bash_script_wrapper( script_path=run_script, @@ -278,6 +312,11 @@ def run_cellranger_script( f"Failed to run script, Script: {run_script} for group: {sample_group}") ## check output dir exists check_file_path(output_dir) + send_airflow_pipeline_logs_to_channels( + slack_conf=SLACK_CONF, + ms_teams_conf=MS_TEAMS_CONF, + message_prefix=\ + f"Finished Cellranger for sample: {sample_group}") return output_dir except Exception as e: log.error(e) @@ -516,79 +555,69 @@ def move_single_sample_result_to_main_work_dir( raise ValueError(e) -## TASK -@task.branch( - task_id="collect_and_branch", - retry_delay=timedelta(minutes=5), - retries=4, - queue='hpc_4G') -def collect_and_branch( - merge_step='configure_cellranger_aggr_run', - skip_step='calculate_md5sum_for_main_work_dir') \ - -> list: - try: - cellranger_output_dict = dict() - context = get_current_context() - ti = context.get('ti') - all_lazy_task_ids = \ - context['task'].\ - get_direct_relative_ids(upstream=True) - lazy_xcom = ti.xcom_pull(task_ids=all_lazy_task_ids) - for entry in lazy_xcom: - sample_group = entry.get("sample_group") - cellranger_output_dir = entry.get("cellranger_output_dir") - if sample_group is not None and \ - cellranger_output_dir is not None: - ## skipping failed runs - cellranger_output_dict.update( - {sample_group: cellranger_output_dir}) - if len(cellranger_output_dict) == 0: - raise ValueError(f"No cellranger output found") - elif len(cellranger_output_dict) == 1: - return [skip_step] - else: - ti.xcom_push( - key='cellranger_output_dict', - value=cellranger_output_dict) - return [merge_step] - except Exception as e: - log.error(e) - send_airflow_failed_logs_to_channels( - slack_conf=SLACK_CONF, - ms_teams_conf=MS_TEAMS_CONF, - message_prefix=e) - raise ValueError(e) - +# ## TASK +# @task.branch( +# task_id="collect_and_branch", +# retry_delay=timedelta(minutes=5), +# retries=4, +# queue='hpc_4G') +# def collect_and_branch( +# merge_step='configure_cellranger_aggr_run', +# skip_step='calculate_md5sum_for_main_work_dir') \ +# -> list: +# try: +# cellranger_output_dict = dict() +# context = get_current_context() +# ti = context.get('ti') +# all_lazy_task_ids = \ +# context['task'].\ +# get_direct_relative_ids(upstream=True) +# lazy_xcom = ti.xcom_pull(task_ids=all_lazy_task_ids) +# for entry in lazy_xcom: +# sample_group = entry.get("sample_group") +# cellranger_output_dir = entry.get("cellranger_output_dir") +# if sample_group is not None and \ +# cellranger_output_dir is not None: +# ## skipping failed runs +# cellranger_output_dict.update( +# {sample_group: cellranger_output_dir}) +# if len(cellranger_output_dict) == 0: +# raise ValueError(f"No cellranger output found") +# elif len(cellranger_output_dict) == 1: +# return [skip_step] +# else: +# ti.xcom_push( +# key='cellranger_output_dict', +# value=cellranger_output_dict) +# return [merge_step] +# except Exception as e: +# log.error(e) +# send_airflow_failed_logs_to_channels( +# slack_conf=SLACK_CONF, +# ms_teams_conf=MS_TEAMS_CONF, +# message_prefix=e) +# raise ValueError(e) -## TASK @task( task_id="configure_cellranger_aggr_run", retry_delay=timedelta(minutes=5), retries=4, queue='hpc_4G') def configure_cellranger_aggr_run( - xcom_pull_task_ids: str = 'collect_and_branch', - xcom_pull_task_key: str = 'cellranger_output_dict') \ - -> dict: + analysis_output_list: list) -> dict: try: - cellranger_output_dict = dict() - context = get_current_context() - ti = context.get('ti') - cellranger_output_dict = \ - ti.xcom_pull( - task_ids=xcom_pull_task_ids, - key=xcom_pull_task_key) - if cellranger_output_dict is None or \ - (isinstance(cellranger_output_dict, dict) and len(cellranger_output_dict)) == 0: - raise ValueError(f"No cellranger output found") - elif len(cellranger_output_dict) == 1: - raise ValueError(f"Single cellranger output found. Can't merge it!") - else: - output_dict = \ - configure_cellranger_aggr( - run_script_template=CELLRANGER_AGGR_SCRIPT_TEMPLATE, - cellranger_output_dict=cellranger_output_dict) - return output_dict + cellranger_multi_dict = dict() + for entry in analysis_output_list: + if entry is not None: + sample_id = entry.get("sample_id") + output_dir = entry.get("output") + cellranger_multi_dict.update({ + sample_id: output_dir}) + output_dict = \ + configure_cellranger_aggr( + run_script_template=CELLRANGER_AGGR_SCRIPT_TEMPLATE, + cellranger_output_dict=cellranger_multi_dict) + return output_dict except Exception as e: log.error(e) send_airflow_failed_logs_to_channels( @@ -597,6 +626,43 @@ def configure_cellranger_aggr_run( message_prefix=e) raise ValueError(e) +# ## TASK +# @task( +# task_id="configure_cellranger_aggr_run", +# retry_delay=timedelta(minutes=5), +# retries=4, +# queue='hpc_4G') +# def configure_cellranger_aggr_run( +# xcom_pull_task_ids: str = 'collect_and_branch', +# xcom_pull_task_key: str = 'cellranger_output_dict') \ +# -> dict: +# try: +# cellranger_output_dict = dict() +# context = get_current_context() +# ti = context.get('ti') +# cellranger_output_dict = \ +# ti.xcom_pull( +# task_ids=xcom_pull_task_ids, +# key=xcom_pull_task_key) +# if cellranger_output_dict is None or \ +# (isinstance(cellranger_output_dict, dict) and len(cellranger_output_dict)) == 0: +# raise ValueError(f"No cellranger output found") +# elif len(cellranger_output_dict) == 1: +# raise ValueError(f"Single cellranger output found. Can't merge it!") +# else: +# output_dict = \ +# configure_cellranger_aggr( +# run_script_template=CELLRANGER_AGGR_SCRIPT_TEMPLATE, +# cellranger_output_dict=cellranger_output_dict) +# return output_dict +# except Exception as e: +# log.error(e) +# send_airflow_failed_logs_to_channels( +# slack_conf=SLACK_CONF, +# ms_teams_conf=MS_TEAMS_CONF, +# message_prefix=e) +# raise ValueError(e) + def configure_cellranger_aggr( @@ -663,12 +729,13 @@ def run_cellranger_aggr_script( run_script = script_dict.get('run_script') output_dir = script_dict.get('output_dir') try: - stdout_file, stderr_file = \ + _ = \ bash_script_wrapper( - script_path=run_script) + script_path=run_script, + capture_stderr=False) except Exception as e: raise ValueError( - f"Failed to run script, Script: {run_script} for group: ALL, error file: {stderr_file}") + f"Failed to run script, Script: {run_script} for group: ALL") ## check output dir exists check_file_path(output_dir) return output_dir @@ -877,6 +944,29 @@ def load_cellranger_results_to_db( db_config_file=DATABASE_CONFIG_FILE, hpc_base_path=HPC_BASE_RAW_DATA_PATH) return {'target_dir_path': target_dir_path, 'date_tag': date_tag} + except Exception as e: + log.error(e) + send_airflow_failed_logs_to_channels( + slack_conf=SLACK_CONF, + ms_teams_conf=MS_TEAMS_CONF, + message_prefix=e) + raise ValueError(e) + +## TASK: switch to aggr if morethan one samples are +@task( + task_id="decide_aggr", + retry_delay=timedelta(minutes=5), + retries=4, + queue='hpc_4G') +def decide_aggr( + analysis_output_list: list, + aggr_task: str = "configure_cellranger_aggr_run", + non_aggr_task: str = "calculate_md5_for_work_dir") -> list: + try: + if len(analysis_output_list) > 1: + return [aggr_task] + elif len(analysis_output_list) == 1: + return [non_aggr_task] except Exception as e: log.error(e) send_airflow_failed_logs_to_channels( diff --git a/igf_airflow/utils/dag36_cellranger_arc_scRNA_multiome_utils.py b/igf_airflow/utils/dag36_cellranger_arc_scRNA_multiome_utils.py index 1e3fe378..ab10daed 100644 --- a/igf_airflow/utils/dag36_cellranger_arc_scRNA_multiome_utils.py +++ b/igf_airflow/utils/dag36_cellranger_arc_scRNA_multiome_utils.py @@ -215,17 +215,14 @@ def create_library_information_for_multiome_sample_group( raise ValueError( f"Failed to prepare cellranger script, error: {e}") - -## TASK @task( task_id="configure_cellranger_arc_aggr_run", retry_delay=timedelta(minutes=5), retries=4, queue='hpc_4G') def configure_cellranger_arc_aggr_run( - design_dict: dict, - xcom_pull_task_ids: str = 'collect_and_branch', - xcom_pull_task_key: str = 'cellranger_output_dict') \ + analysis_output_list: list, + design_dict: dict) \ -> dict: try: design_file = design_dict.get('analysis_design') @@ -246,7 +243,8 @@ def configure_cellranger_arc_aggr_run( cellranger_arc_aggr_config_ref = \ cellranger_arc_aggr_config.get("reference") if cellranger_arc_aggr_config_ref is None: - raise KeyError("Missing cellranger_arc_aggr_config reference in analysis design") + raise KeyError( + "Missing cellranger_arc_aggr_config reference in analysis design") cellranger_arc_aggr_config_params = \ cellranger_arc_aggr_config.get("parameters") if cellranger_arc_aggr_config_params is None: @@ -254,29 +252,22 @@ def configure_cellranger_arc_aggr_run( if cellranger_arc_aggr_config_params is not None and \ not isinstance(cellranger_arc_aggr_config_params, list): raise TypeError( - f"cellranger_arc_aggr_config_params are not list: {type(cellranger_arc_aggr_config_params)}") - ## configure arc aggr - cellranger_output_dict = dict() - context = get_current_context() - ti = context.get('ti') - cellranger_output_dict = \ - ti.xcom_pull( - task_ids=xcom_pull_task_ids, - key=xcom_pull_task_key) - if cellranger_output_dict is None or \ - (isinstance(cellranger_output_dict, dict) and \ - len(cellranger_output_dict)) == 0: - raise ValueError(f"No cellranger output found") - elif len(cellranger_output_dict) == 1: - raise ValueError(f"Single cellranger output found. Can't merge it!") - else: - output_dict = \ - configure_cellranger_arc_aggr( - run_script_template=CELLRANGER_ARC_AGGR_SCRIPT_TEMPLATE, - cellranger_arc_aggr_config_ref=cellranger_arc_aggr_config_ref, - cellranger_arc_aggr_config_params=cellranger_arc_aggr_config_params, - cellranger_output_dict=cellranger_output_dict) - return output_dict + f"""cellranger_arc_aggr_config_params are not list: + {type(cellranger_arc_aggr_config_params)}""") + cellranger_arc_dict = dict() + for entry in analysis_output_list: + if entry is not None: + sample_id = entry.get("sample_id") + output_dir = entry.get("output") + cellranger_arc_dict.update({ + sample_id: output_dir}) + output_dict = \ + configure_cellranger_arc_aggr( + run_script_template=CELLRANGER_ARC_AGGR_SCRIPT_TEMPLATE, + cellranger_arc_aggr_config_ref=cellranger_arc_aggr_config_ref, + cellranger_arc_aggr_config_params=cellranger_arc_aggr_config_params, + cellranger_output_dict=cellranger_arc_dict) + return output_dict except Exception as e: log.error(e) send_airflow_failed_logs_to_channels( @@ -285,6 +276,75 @@ def configure_cellranger_arc_aggr_run( message_prefix=e) raise ValueError(e) +# ## TASK +# @task( +# task_id="configure_cellranger_arc_aggr_run", +# retry_delay=timedelta(minutes=5), +# retries=4, +# queue='hpc_4G') +# def configure_cellranger_arc_aggr_run( +# design_dict: dict, +# xcom_pull_task_ids: str = 'collect_and_branch', +# xcom_pull_task_key: str = 'cellranger_output_dict') \ +# -> dict: +# try: +# design_file = design_dict.get('analysis_design') +# check_file_path(design_file) +# with open(design_file, 'r') as fp: +# input_design_yaml = fp.read() +# sample_metadata, analysis_metadata = \ +# parse_analysis_design_and_get_metadata( +# input_design_yaml=input_design_yaml) +# if sample_metadata is None or \ +# analysis_metadata is None: +# raise KeyError("Missing sample or analysis metadata") +# ## get cellranger arc aggr conf +# cellranger_arc_aggr_config = \ +# analysis_metadata.get("cellranger_arc_aggr_config") +# if cellranger_arc_aggr_config is None: +# raise KeyError("Missing cellranger_arc_aggr_config in analysis design") +# cellranger_arc_aggr_config_ref = \ +# cellranger_arc_aggr_config.get("reference") +# if cellranger_arc_aggr_config_ref is None: +# raise KeyError("Missing cellranger_arc_aggr_config reference in analysis design") +# cellranger_arc_aggr_config_params = \ +# cellranger_arc_aggr_config.get("parameters") +# if cellranger_arc_aggr_config_params is None: +# cellranger_arc_aggr_config_params = [] +# if cellranger_arc_aggr_config_params is not None and \ +# not isinstance(cellranger_arc_aggr_config_params, list): +# raise TypeError( +# f"cellranger_arc_aggr_config_params are not list: {type(cellranger_arc_aggr_config_params)}") +# ## configure arc aggr +# cellranger_output_dict = dict() +# context = get_current_context() +# ti = context.get('ti') +# cellranger_output_dict = \ +# ti.xcom_pull( +# task_ids=xcom_pull_task_ids, +# key=xcom_pull_task_key) +# if cellranger_output_dict is None or \ +# (isinstance(cellranger_output_dict, dict) and \ +# len(cellranger_output_dict)) == 0: +# raise ValueError(f"No cellranger output found") +# elif len(cellranger_output_dict) == 1: +# raise ValueError(f"Single cellranger output found. Can't merge it!") +# else: +# output_dict = \ +# configure_cellranger_arc_aggr( +# run_script_template=CELLRANGER_ARC_AGGR_SCRIPT_TEMPLATE, +# cellranger_arc_aggr_config_ref=cellranger_arc_aggr_config_ref, +# cellranger_arc_aggr_config_params=cellranger_arc_aggr_config_params, +# cellranger_output_dict=cellranger_output_dict) +# return output_dict +# except Exception as e: +# log.error(e) +# send_airflow_failed_logs_to_channels( +# slack_conf=SLACK_CONF, +# ms_teams_conf=MS_TEAMS_CONF, +# message_prefix=e) +# raise ValueError(e) + def configure_cellranger_arc_aggr( run_script_template: str, diff --git a/test/igf_airflow/test_dag34_cellranger_multi_scRNA_utils.py b/test/igf_airflow/test_dag34_cellranger_multi_scRNA_utils.py index d2f23d6b..2766c883 100644 --- a/test/igf_airflow/test_dag34_cellranger_multi_scRNA_utils.py +++ b/test/igf_airflow/test_dag34_cellranger_multi_scRNA_utils.py @@ -36,12 +36,11 @@ run_single_sample_scanpy, prepare_and_run_scanpy_notebook, move_single_sample_result_to_main_work_dir, - collect_and_branch, run_cellranger_aggr_script, merged_scanpy_report, move_aggr_result_to_main_work_dir, - load_cellranger_results_to_db) - + load_cellranger_results_to_db, + decide_aggr) DESIGN_YAML = """sample_metadata: IGFsampleA: @@ -322,6 +321,12 @@ def test_get_analysis_group_list(self): design_dict=design_dict) self.assertEqual(len(unique_sample_groups), 2) self.assertIn("grp1", unique_sample_groups) + with self.assertRaises(Exception): + unique_sample_groups = \ + get_analysis_group_list.function( + design_dict=design_dict, + required_tag_name="feature_types", + required_tag_value="Gene Expression") def test_prepare_cellranger_script(self): design_dict = { @@ -344,11 +349,13 @@ def test_prepare_cellranger_script(self): self.assertIn(f'--id={sample_group}', data) self.assertEqual(sample_group, "grp1") + @patch("igf_airflow.utils.dag34_cellranger_multi_scRNA_utils.send_airflow_pipeline_logs_to_channels") @patch("igf_airflow.utils.dag34_cellranger_multi_scRNA_utils.bash_script_wrapper", return_value=["A", "B"]) def test_run_cellranger_script( self, - bash_script_wrapper): + bash_script_wrapper, + send_airflow_pipeline_logs_to_channels): script_dict = { "sample_group": "AAA", "run_script": "BBB", @@ -516,6 +523,60 @@ def test_merged_scanpy_report( fetch_analysis_name_for_analysis_id.\ assert_called_once() + @patch("igf_airflow.utils.dag34_cellranger_multi_scRNA_utils.bash_script_wrapper", + return_value=[None, None]) + def test_run_cellranger_aggr_script(self, bash_script_wrapper): + script_dict = { + "sample_name": "ALL", + "run_script": "", + "output_dir": self.temp_dir} + output_dir = \ + run_cellranger_aggr_script.\ + function( + script_dict=script_dict) + bash_script_wrapper.\ + assert_called_once() + + def test_move_aggr_result_to_main_work_dir(self): + # move_aggr_result_to_main_work_dir + main_work_dir = os.path.join(self.temp_dir, "work") + os.makedirs(main_work_dir) + source_dir = os.path.join(self.temp_dir, "source") + os.makedirs(source_dir) + with open(os.path.join(source_dir, "t"), "w") as fp: + fp.write("A") + m = \ + move_aggr_result_to_main_work_dir.\ + function( + main_work_dir=main_work_dir, + scanpy_aggr_output_dict={"cellranger_output_dir": source_dir}) + self.assertEqual(m, main_work_dir) + + @patch("igf_airflow.utils.dag34_cellranger_multi_scRNA_utils.get_current_context") + @patch("igf_airflow.utils.dag34_cellranger_multi_scRNA_utils.collect_analysis_dir", + return_value=["A", "B", "C"]) + def test_load_cellranger_results_to_db( + self, + get_current_context, + collect_analysis_dir): + out = \ + load_cellranger_results_to_db.\ + function( + main_work_dir=self.temp_dir, + md5_file=self.temp_dir) + collect_analysis_dir.\ + assert_called_once() + get_current_context.\ + assert_called_once() + + + def test_decide_aggr(self): + task_name = \ + decide_aggr.function(analysis_output_list=["A", "B"]) + self.assertEqual(task_name[0], "configure_cellranger_aggr_run") + task_name = \ + decide_aggr.function(analysis_output_list=["A",]) + self.assertEqual(task_name[0], "calculate_md5_for_work_dir") class TestDag34_cellranger_multi_scRNA_utilB(unittest.TestCase): diff --git a/test/igf_airflow/test_dag36_cellranger_arc_scRNA_multiome_utils.py b/test/igf_airflow/test_dag36_cellranger_arc_scRNA_multiome_utils.py index d32b40b3..44bc3b6f 100644 --- a/test/igf_airflow/test_dag36_cellranger_arc_scRNA_multiome_utils.py +++ b/test/igf_airflow/test_dag36_cellranger_arc_scRNA_multiome_utils.py @@ -33,6 +33,7 @@ prepare_cellranger_arc_script, run_single_sample_scanpy_for_arc, merged_scanpy_report_for_arc) +from igf_airflow.utils.dag34_cellranger_multi_scRNA_utils import get_analysis_group_list DESIGN_YAML = """sample_metadata: IGFsampleA: @@ -47,6 +48,9 @@ IGFsampleD: library_type: Chromatin Accessibility cellranger_group: GRP2 + IGFsampleE: + library_type: Chromatin Accessibility + cellranger_group: GRP3 analysis_metadata: scanpy_config: TEMPLATE_FILE: /path/scanpy_single_sample_analysis_v0.0.6.3.ipynb @@ -316,6 +320,21 @@ def tearDown(self): if os.path.exists(self.dbname): os.remove(self.dbname) + def test_get_analysis_group_list(self): + design_dict = { + "analysis_design": self.yaml_file} + unique_sample_groups = \ + get_analysis_group_list.function( + design_dict=design_dict) + self.assertEqual(len(unique_sample_groups), 3) + self.assertIn("GRP3", unique_sample_groups) + with self.assertRaises(Exception): + unique_sample_groups = \ + get_analysis_group_list.function( + design_dict=design_dict, + required_tag_name="library_types", + required_tag_value="Gene Expression") + def test_prepare_cellranger_arc_run_dir_and_script_file(self): temp_dir = get_temp_dir() library_csv_file, script_file = \