Skip to content

Commit

Permalink
Merge pull request #92 from imperial-genomics-facility/fix_cellranger…
Browse files Browse the repository at this point in the history
…_july24

Fix cellranger july24
  • Loading branch information
avikdatta authored Jul 16, 2024
2 parents e9da68a + 620e520 commit d1134d6
Show file tree
Hide file tree
Showing 5 changed files with 335 additions and 105 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/python-app.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ name: Python application

on:
push:
branches: [ "master", "test_july24" ]
branches: [ master, fix_cellranger_july24]
pull_request:
branches: [ "master" ]

Expand Down
232 changes: 161 additions & 71 deletions igf_airflow/utils/dag34_cellranger_multi_scRNA_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
get_project_igf_id_for_analysis,
fetch_analysis_name_for_analysis_id,
send_airflow_failed_logs_to_channels,
send_airflow_pipeline_logs_to_channels,
collect_analysis_dir,
parse_analysis_design_and_get_metadata
)
Expand Down Expand Up @@ -63,7 +64,10 @@
retries=4,
queue='hpc_4G',
multiple_outputs=False)
def get_analysis_group_list(design_dict: dict) -> dict:
def get_analysis_group_list(
design_dict: dict,
required_tag_name: Optional[str] = None,
required_tag_value: Optional[str] = None) -> dict:
try:
design_file = design_dict.get('analysis_design')
check_file_path(design_file)
Expand All @@ -76,14 +80,35 @@ def get_analysis_group_list(design_dict: dict) -> dict:
analysis_metadata is None:
raise KeyError("Missing sample or analysis metadata")
unique_sample_groups = set()
required_tag_list = list()
for _, group in sample_metadata.items():
grp_name = group.get('cellranger_group')
if grp_name is None:
raise KeyError("Missing cellranger_group in sample_metadata")
unique_sample_groups.add(grp_name)
if required_tag_name is not None and \
required_tag_value is not None:
required_tag_list.append({
"name": grp_name,
required_tag_name: group.get(required_tag_name)})
if len(unique_sample_groups) == 0:
raise ValueError("No sample group found")
return list(unique_sample_groups)
unique_sample_groups = \
list(unique_sample_groups)
## check for required tags
if required_tag_name is not None and \
required_tag_value is not None:
required_tag_df = \
pd.DataFrame(required_tag_list)
for g in unique_sample_groups:
g_tag_values = \
required_tag_df[required_tag_df['name']==g][required_tag_name].\
values.\
tolist()
if required_tag_value not in g_tag_values:
raise KeyError(
f"No {required_tag_value} found for group {g}")
return unique_sample_groups
except Exception as e:
log.error(e)
send_airflow_failed_logs_to_channels(
Expand Down Expand Up @@ -120,7 +145,11 @@ def prepare_cellranger_script(sample_group: str, design_dict: dict) -> dict:
design_file=design_file,
db_config_file=DATABASE_CONFIG_FILE,
run_script_template=CELLRANGER_MULTI_SCRIPT_TEMPLATE)
return {"sample_group": sample_group, "run_script": run_script_file, "output_dir": os.path.join(work_dir, sample_group)}
analysis_script_info = {
"sample_group": sample_group,
"run_script": run_script_file,
"output_dir": os.path.join(work_dir, sample_group)}
return analysis_script_info
except Exception as e:
log.error(e)
send_airflow_failed_logs_to_channels(
Expand Down Expand Up @@ -269,6 +298,11 @@ def run_cellranger_script(
f"""Lock file exists in cellranger run path: {output_dir}. \
Remove it to continue!""")
try:
send_airflow_pipeline_logs_to_channels(
slack_conf=SLACK_CONF,
ms_teams_conf=MS_TEAMS_CONF,
message_prefix=\
f"Started Cellranger for sample: {sample_group}, script: {run_script}")
_, _ = \
bash_script_wrapper(
script_path=run_script,
Expand All @@ -278,6 +312,11 @@ def run_cellranger_script(
f"Failed to run script, Script: {run_script} for group: {sample_group}")
## check output dir exists
check_file_path(output_dir)
send_airflow_pipeline_logs_to_channels(
slack_conf=SLACK_CONF,
ms_teams_conf=MS_TEAMS_CONF,
message_prefix=\
f"Finished Cellranger for sample: {sample_group}")
return output_dir
except Exception as e:
log.error(e)
Expand Down Expand Up @@ -516,79 +555,69 @@ def move_single_sample_result_to_main_work_dir(
raise ValueError(e)


## TASK
@task.branch(
    task_id="collect_and_branch",
    retry_delay=timedelta(minutes=5),
    retries=4,
    queue='hpc_4G')
def collect_and_branch(
        merge_step='configure_cellranger_aggr_run',
        skip_step='calculate_md5sum_for_main_work_dir') \
        -> list:
    """Pick the next task based on how many upstream cellranger outputs exist.

    Pulls the XCom values of all direct upstream tasks and collects every
    entry that carries both a ``sample_group`` and a ``cellranger_output_dir``.

    :param merge_step: Task id returned when more than one output is found.
    :param skip_step: Task id returned when exactly one output is found.
    :returns: A single-element list holding the task id to follow.
    :raises ValueError: If no usable output is found, or if any other error
        occurs (the error is logged and sent to the failure channels first).
    """
    try:
        context = get_current_context()
        ti = context.get('ti')
        upstream_task_ids = \
            context['task'].\
            get_direct_relative_ids(upstream=True)
        lazy_xcom = ti.xcom_pull(task_ids=upstream_task_ids)
        if lazy_xcom is None:
            # nothing was pushed upstream; fall through to the
            # "No cellranger output found" error below
            lazy_xcom = []
        cellranger_output_dict = dict()
        for entry in lazy_xcom:
            if entry is None:
                ## skipping failed runs which pushed no value
                continue
            sample_group = entry.get("sample_group")
            cellranger_output_dir = entry.get("cellranger_output_dir")
            if sample_group is not None and \
               cellranger_output_dir is not None:
                ## skipping failed runs
                cellranger_output_dict[sample_group] = cellranger_output_dir
        if len(cellranger_output_dict) == 0:
            raise ValueError("No cellranger output found")
        if len(cellranger_output_dict) == 1:
            # a single output can't be aggregated
            return [skip_step]
        # share the collected outputs with the merge step via XCom
        ti.xcom_push(
            key='cellranger_output_dict',
            value=cellranger_output_dict)
        return [merge_step]
    except Exception as e:
        log.error(e)
        send_airflow_failed_logs_to_channels(
            slack_conf=SLACK_CONF,
            ms_teams_conf=MS_TEAMS_CONF,
            message_prefix=e)
        raise ValueError(e) from e

# ## TASK
# @task.branch(
# task_id="collect_and_branch",
# retry_delay=timedelta(minutes=5),
# retries=4,
# queue='hpc_4G')
# def collect_and_branch(
# merge_step='configure_cellranger_aggr_run',
# skip_step='calculate_md5sum_for_main_work_dir') \
# -> list:
# try:
# cellranger_output_dict = dict()
# context = get_current_context()
# ti = context.get('ti')
# all_lazy_task_ids = \
# context['task'].\
# get_direct_relative_ids(upstream=True)
# lazy_xcom = ti.xcom_pull(task_ids=all_lazy_task_ids)
# for entry in lazy_xcom:
# sample_group = entry.get("sample_group")
# cellranger_output_dir = entry.get("cellranger_output_dir")
# if sample_group is not None and \
# cellranger_output_dir is not None:
# ## skipping failed runs
# cellranger_output_dict.update(
# {sample_group: cellranger_output_dir})
# if len(cellranger_output_dict) == 0:
# raise ValueError(f"No cellranger output found")
# elif len(cellranger_output_dict) == 1:
# return [skip_step]
# else:
# ti.xcom_push(
# key='cellranger_output_dict',
# value=cellranger_output_dict)
# return [merge_step]
# except Exception as e:
# log.error(e)
# send_airflow_failed_logs_to_channels(
# slack_conf=SLACK_CONF,
# ms_teams_conf=MS_TEAMS_CONF,
# message_prefix=e)
# raise ValueError(e)

## TASK
@task(
task_id="configure_cellranger_aggr_run",
retry_delay=timedelta(minutes=5),
retries=4,
queue='hpc_4G')
def configure_cellranger_aggr_run(
xcom_pull_task_ids: str = 'collect_and_branch',
xcom_pull_task_key: str = 'cellranger_output_dict') \
-> dict:
analysis_output_list: list) -> dict:
try:
cellranger_output_dict = dict()
context = get_current_context()
ti = context.get('ti')
cellranger_output_dict = \
ti.xcom_pull(
task_ids=xcom_pull_task_ids,
key=xcom_pull_task_key)
if cellranger_output_dict is None or \
(isinstance(cellranger_output_dict, dict) and len(cellranger_output_dict)) == 0:
raise ValueError(f"No cellranger output found")
elif len(cellranger_output_dict) == 1:
raise ValueError(f"Single cellranger output found. Can't merge it!")
else:
output_dict = \
configure_cellranger_aggr(
run_script_template=CELLRANGER_AGGR_SCRIPT_TEMPLATE,
cellranger_output_dict=cellranger_output_dict)
return output_dict
cellranger_multi_dict = dict()
for entry in analysis_output_list:
if entry is not None:
sample_id = entry.get("sample_id")
output_dir = entry.get("output")
cellranger_multi_dict.update({
sample_id: output_dir})
output_dict = \
configure_cellranger_aggr(
run_script_template=CELLRANGER_AGGR_SCRIPT_TEMPLATE,
cellranger_output_dict=cellranger_multi_dict)
return output_dict
except Exception as e:
log.error(e)
send_airflow_failed_logs_to_channels(
Expand All @@ -597,6 +626,43 @@ def configure_cellranger_aggr_run(
message_prefix=e)
raise ValueError(e)

# ## TASK
# @task(
# task_id="configure_cellranger_aggr_run",
# retry_delay=timedelta(minutes=5),
# retries=4,
# queue='hpc_4G')
# def configure_cellranger_aggr_run(
# xcom_pull_task_ids: str = 'collect_and_branch',
# xcom_pull_task_key: str = 'cellranger_output_dict') \
# -> dict:
# try:
# cellranger_output_dict = dict()
# context = get_current_context()
# ti = context.get('ti')
# cellranger_output_dict = \
# ti.xcom_pull(
# task_ids=xcom_pull_task_ids,
# key=xcom_pull_task_key)
# if cellranger_output_dict is None or \
# (isinstance(cellranger_output_dict, dict) and len(cellranger_output_dict)) == 0:
# raise ValueError(f"No cellranger output found")
# elif len(cellranger_output_dict) == 1:
# raise ValueError(f"Single cellranger output found. Can't merge it!")
# else:
# output_dict = \
# configure_cellranger_aggr(
# run_script_template=CELLRANGER_AGGR_SCRIPT_TEMPLATE,
# cellranger_output_dict=cellranger_output_dict)
# return output_dict
# except Exception as e:
# log.error(e)
# send_airflow_failed_logs_to_channels(
# slack_conf=SLACK_CONF,
# ms_teams_conf=MS_TEAMS_CONF,
# message_prefix=e)
# raise ValueError(e)



def configure_cellranger_aggr(
Expand Down Expand Up @@ -663,12 +729,13 @@ def run_cellranger_aggr_script(
run_script = script_dict.get('run_script')
output_dir = script_dict.get('output_dir')
try:
stdout_file, stderr_file = \
_ = \
bash_script_wrapper(
script_path=run_script)
script_path=run_script,
capture_stderr=False)
except Exception as e:
raise ValueError(
f"Failed to run script, Script: {run_script} for group: ALL, error file: {stderr_file}")
f"Failed to run script, Script: {run_script} for group: ALL")
## check output dir exists
check_file_path(output_dir)
return output_dir
Expand Down Expand Up @@ -877,6 +944,29 @@ def load_cellranger_results_to_db(
db_config_file=DATABASE_CONFIG_FILE,
hpc_base_path=HPC_BASE_RAW_DATA_PATH)
return {'target_dir_path': target_dir_path, 'date_tag': date_tag}
except Exception as e:
log.error(e)
send_airflow_failed_logs_to_channels(
slack_conf=SLACK_CONF,
ms_teams_conf=MS_TEAMS_CONF,
message_prefix=e)
raise ValueError(e)

## TASK: switch to aggr if there is more than one sample
@task(
task_id="decide_aggr",
retry_delay=timedelta(minutes=5),
retries=4,
queue='hpc_4G')
def decide_aggr(
analysis_output_list: list,
aggr_task: str = "configure_cellranger_aggr_run",
non_aggr_task: str = "calculate_md5_for_work_dir") -> list:
try:
if len(analysis_output_list) > 1:
return [aggr_task]
elif len(analysis_output_list) == 1:
return [non_aggr_task]
except Exception as e:
log.error(e)
send_airflow_failed_logs_to_channels(
Expand Down
Loading

0 comments on commit d1134d6

Please sign in to comment.