Merge pull request #115 from 4dn-dcic/ajs_upd_wfr_clean_fxns
Remove hard coding of wf details from cleanup functions
aschroed authored Aug 9, 2024
2 parents ae41763 + 364d4ca commit 9cbd0e2
Showing 5 changed files with 51 additions and 524 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.rst
@@ -7,6 +7,14 @@ Change Log
------


3.2.0
=====

* refactor of cleanup.py to get info about accepted workflow versions and run times from the db, removing the hard-coded array
* updated find_and_release notebook to utilize this - not backward compatible
* corrected small bug in the delete_wfrs function that failed to clean up errored or duplicated workflow runs on user-submitted files, as they are not outputs of wfrs


3.1.1
=====

103 changes: 33 additions & 70 deletions functions/cleanup.py
@@ -1,63 +1,33 @@
from dcicutils import ff_utils
from datetime import datetime

# accepted workflows
# workflow name, accepted revision numbers (0 if none), acceptable run time (hours)
workflow_details = [
    # TODO: take this info from foursight
    # common ones
    ['md5', ['0.0.4', '0.2.6'], 12],
    ['fastqc-0-11-4-1', ['0.2.0'], 50],
    ['fastqc', ['v1', 'v2'], 50],
    # 4dn ones
    ['bwa-mem', ['0.2.6'], 50],
    ['pairsqc-single', ['0.2.5', '0.2.6'], 100],
    ['hi-c-processing-bam', ['0.2.6'], 50],
    ['hi-c-processing-pairs', ['0.2.6', '0.2.7'], 200],
    ['hi-c-processing-pairs-nore', ['0.2.6'], 200],
    ['hi-c-processing-pairs-nonorm', ['0.2.6'], 200],
    ['hi-c-processing-pairs-nore-nonorm', ['0.2.6'], 200],
    ['imargi-processing-fastq', ['1.1.1_dcic_4'], 200],
    ['imargi-processing-bam', ['1.1.1_dcic_4'], 200],
    ['imargi-processing-pairs', ['1.1.1_dcic_4'], 200],
    ['repliseq-parta', ['v13.1', 'v14', 'v16', 'v16.1'], 200],
    ['bedGraphToBigWig', ['v4'], 24],
    ['bedtobeddb', ['v2', 'v3'], 24],
    ['encode-chipseq-aln-chip', ['1.1.1', '2.1.6'], 200],
    ['encode-chipseq-aln-ctl', ['1.1.1', '2.1.6'], 200],
    ['encode-chipseq-postaln', ['1.1.1', '2.1.6'], 200],
    ['encode-atacseq-aln', ['1.1.1'], 200],
    ['encode-atacseq-postaln', ['1.1.1'], 200],
    ['mergebed', ['v1'], 200],
    ['merge-fastq', ['v1'], 200],
    ['bamqc', ['v2', 'v3'], 200],
    ['encode-rnaseq-stranded', ['1.1'], 200],
    ['encode-rnaseq-unstranded', ['1.1'], 200],
    ['rna-strandedness', ['v2'], 200],
    ['fastq-first-line', ['v2'], 200],
    ['re_checker_workflow', ['v1.1', 'v1.2'], 200],
    ['mad_qc_workflow', ['1.1_dcic_2'], 200],
    ['insulation-scores-and-boundaries-caller', ['v1'], 200],
    ['compartments-caller', ['v1.2'], 200],
    ['mcoolQC', ['v1'], 200],
    # cgap ones
    ['workflow_bwa-mem_no_unzip-check', ['v9', 'v10', 'v11', 'v12', 'v13'], 48],
    ['workflow_add-readgroups-check', ['v9', 'v10', 'v11', 'v12', 'v13'], 12],
    ['workflow_merge-bam-check', ['v9', 'v10', 'v11', 'v12', 'v13'], 12],
    ['workflow_picard-MarkDuplicates-check', ['v9', 'v10', 'v11', 'v12', 'v13'], 12],
    ['workflow_sort-bam-check', ['v9', 'v10', 'v11', 'v12', 'v13'], 12],
    ['workflow_gatk-BaseRecalibrator', ['v9', 'v10', 'v11', 'v12', 'v13'], 12],
    ['workflow_gatk-ApplyBQSR-check', ['v9', 'v10', 'v11', 'v12', 'v13'], 12],
    ['workflow_index-sorted-bam', ['v9'], 12],
    ['workflow_gatk-HaplotypeCaller', ['v10', 'v11', 'v12', 'v13'], 12],
    ['workflow_gatk-CombineGVCFs', ['v10', 'v11', 'v12', 'v13'], 12],
    ['workflow_gatk-GenotypeGVCFs-check', ['v10', 'v11', 'v12', 'v13'], 12],
    ['workflow_gatk-VQSR-check', ['v10', 'v11', 'v12', 'v13'], 12],
    ['workflow_qcboard-bam', ['v9'], 12],
    ['workflow_cram2fastq', ['v12', 'v13'], 12],
]

workflow_names = [i[0] for i in workflow_details]
# function to get workflow_details info from the db
# the initial data structure (the same one used to get info for foursight) is transformed
# into the format used by the cleanup functions:
# workflow name, accepted revision numbers (0 if none), acceptable run time (hours)
def get_workflow_details(my_auth):
    wf_details = {}
    wf_query = "search/?type=Workflow&tags=current&tags=accepted&field=max_runtime" \
               "&app_name!=No value&app_version!=No value&field=app_name&field=app_version"
    workflows = ff_utils.search_metadata(wf_query, my_auth)
    for wf in workflows:
        app_name = wf.get('app_name')
        app_version = wf.get('app_version')
        run_time = wf.get('max_runtime', 0)
        wf_details.setdefault(app_name, {})
        wf_details[app_name].setdefault('accepted_versions', []).append(app_version)
        wf_details[app_name].setdefault('run_time', run_time)
        # for the unexpected case of different wf items with the same app_name
        # having different run times - use the max value
        if run_time > wf_details[app_name].get('run_time'):
            wf_details[app_name]['run_time'] = run_time
    # here is the transformation
    # workflow_details = []
    # for wfname, wf_info in wf_details.items():
    #     workflow_details.append((wfname, wf_info.get('accepted_versions'), [], wf_info.get('run_time')))
    return [(wfname, wf_details[wfname].get('accepted_versions', []), wf_details[wfname].get('run_time'))
            for wfname in wf_details.keys()]


def fetch_pf_associated(pf_id_or_dict, my_key):
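A minimal usage sketch of the new helper (the auth keydict shape follows the usual dcicutils/ff_utils conventions; the import path, server URL, and example values are illustrative assumptions, not part of this diff):

from functions.cleanup import get_workflow_details, delete_wfrs

my_auth = {'key': 'MYKEY', 'secret': 'MYSECRET', 'server': 'https://data.4dnucleome.org'}  # placeholder keydict
workflow_details = get_workflow_details(my_auth)
# each entry is (app_name, accepted_versions, run_time_in_hours), e.g. ('md5', ['0.0.4', '0.2.6'], 12)
for wf_name, accepted_versions, run_time in workflow_details:
    print(wf_name, accepted_versions, run_time)
# callers now pass the fetched list into the cleanup entry point explicitly, e.g.:
# delete_wfrs(file_resp, my_auth, workflow_details, delete=False)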
@@ -116,19 +86,14 @@ def get_wfr_report(wfrs):
        # skip all style awsem runs
        try:
            wfr_type_base, wfr_version = wfr_type.strip().split(' ')
        except:
        except Exception:
            continue
        time_info = time_info.strip('on').strip()
        try:
            wfr_time = datetime.strptime(time_info, '%Y-%m-%d %H:%M:%S.%f')
        except ValueError:
            wfr_time = datetime.strptime(time_info, '%Y-%m-%d %H:%M:%S')
        run_hours = (datetime.utcnow() - wfr_time).total_seconds() / 3600
        # try:
        #     wfr_time = datetime.strptime(wfr_data['date_created'], '%Y-%m-%dT%H:%M:%S.%f+00:00')
        # except ValueError:  # if it was exact second, no fraction is in value
        #     print("wfr time bingo", wfr_uuid)
        #     wfr_time = datetime.strptime(wfr_data['date_created'], '%Y-%m-%dT%H:%M:%S+00:00')
        output_files = wfr_data.get('output_files', None)
        output_uuids = []
        qc_uuids = []
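The paired strptime calls above handle run timestamps that may or may not carry fractional seconds. A self-contained sketch of the same fallback pattern (parse_run_hours is a hypothetical helper, not part of the module):

from datetime import datetime

def parse_run_hours(time_info):
    # try the fractional-seconds format first, then fall back to whole seconds
    try:
        wfr_time = datetime.strptime(time_info, '%Y-%m-%d %H:%M:%S.%f')
    except ValueError:
        wfr_time = datetime.strptime(time_info, '%Y-%m-%d %H:%M:%S')
    return (datetime.utcnow() - wfr_time).total_seconds() / 3600

print(parse_run_hours('2024-08-09 01:23:45.678901'))
print(parse_run_hours('2024-08-09 01:23:45'))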
@@ -158,7 +123,7 @@ def get_wfr_report(wfrs):
    return wfr_report


def delete_wfrs(file_resp, my_key, delete=False, stash=None):
def delete_wfrs(file_resp, my_key, workflow_details, delete=False, stash=None):
    # file_resp in embedded frame
    # stash: all related wfrs for file_resp
    deleted_wfrs = []  # reports WorkflowRun items deleted by this function
@@ -170,16 +135,13 @@
    # do not delete output wfrs of control files
    output_wfrs = file_resp.get('workflow_run_outputs')
    if not output_wfrs:
        if file_type == 'files-processed':
            # user submitted processed files
            return
        else:
            # raw files:
            pass
        # user submitted and raw files generally lack wfr_outputs but they can still have
        # duplicate and errored runs, so changed return (for file_processed) to pass for all
        pass
    else:
        output_wfr = output_wfrs[0]
        wfr_type, _ = output_wfr['display_title'].split(' run ')
        if wfr_type in ['encode-chipseq-aln-ctl 1.1.1', 'encode-chipseq-aln-ctl 2.1.6'] :
        if wfr_type in ['encode-chipseq-aln-ctl 1.1.1', 'encode-chipseq-aln-ctl 2.1.6']:
            print('skipping control file for wfr check', file_resp['accession'])
            return

@@ -221,6 +183,7 @@ def _delete_action(wfr_to_del):
        return

    # CLEAN UP IF FILE IS DELETED
    workflow_names = [wfinfo[0] for wfinfo in workflow_details]
    if file_resp['status'] == 'deleted':
        if file_resp.get('quality_metric'):
            if delete:
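For context, this is roughly how the fetched tuples can stand in for the old hard-coded array when vetting a run. A hedged sketch under assumed report keys (wfr_type_base, wfr_version, and run_hours mirror values computed in get_wfr_report, but the exact report layout is not shown in this excerpt):

def run_is_acceptable(report_entry, workflow_details):
    # illustrative only: a run passes if its workflow is known, its version is
    # accepted, and it finished within the allowed number of hours
    for wf_name, accepted_versions, max_hours in workflow_details:
        if report_entry['wfr_type_base'] == wf_name:
            return (report_entry['wfr_version'] in accepted_versions
                    and report_entry['run_hours'] <= max_hours)
    return False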