From 2ad0367d7e278d6737f53e81faca1a93b52323be Mon Sep 17 00:00:00 2001
From: Juraj Smiesko
Date: Tue, 16 Jan 2024 11:31:56 +0100
Subject: [PATCH] Improved batch submission

---
 python/run_analysis.py | 227 +++++++++++++++++++++++------------------
 1 file changed, 128 insertions(+), 99 deletions(-)

diff --git a/python/run_analysis.py b/python/run_analysis.py
index 2312b31df9..03d0b13755 100644
--- a/python/run_analysis.py
+++ b/python/run_analysis.py
@@ -23,7 +23,7 @@
 
 
 # _____________________________________________________________________________
-def determine_os(local_dir):
+def determine_os(local_dir: str) -> str | None:
     '''
     Determines platform on which FCCAnalyses was compiled
     '''
@@ -44,8 +44,11 @@ def determine_os(local_dir):
 
 
 # _____________________________________________________________________________
-def create_condor_config(log_dir, process_name, build_os, rdf_module,
-                         subjob_scripts):
+def create_condor_config(log_dir: str,
+                         process_name: str,
+                         build_os: str,
+                         rdf_module,
+                         subjob_scripts: list[str]) -> str:
     '''
     Creates contents of condor configuration file.
     '''
@@ -92,6 +95,68 @@ def create_condor_config(log_dir, process_name, build_os, rdf_module,
     return cfg
 
 
+# _____________________________________________________________________________
+def create_subjob_script(local_dir: str,
+                         rdf_module,
+                         process_name: str,
+                         chunk_num: int,
+                         chunk_list: list[list[str]],
+                         anascript_path: str) -> str:
+    '''
+    Creates sub-job script to be run.
+    '''
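+
+    # For orientation, the assembled script ends up looking roughly like the
+    # sketch below (angle-bracketed names are placeholders, not literal
+    # values):
+    #
+    #   #!/bin/bash
+    #   source <local_dir>/setup.sh
+    #   mkdir job_<process_name>_chunk_<N>
+    #   cd job_<process_name>_chunk_<N>
+    #   <local_dir>/bin/fccanalysis run <anascript_path> --batch --output <output_path> --files-list <file_1> <file_2> ...
+    #   cp (or xrdcp) <output_path> <final_destination>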
+
+    output_dir = getElement(rdf_module, "outputDir")
+    output_dir_eos = getElement(rdf_module, "outputDirEos")
+    eos_type = getElement(rdf_module, "eosType")
+    user_batch_config = getElement(rdf_module, "userBatchConfig")
+
+    scr = '#!/bin/bash\n\n'
+    scr += 'source ' + local_dir + '/setup.sh\n\n'
+
+    # add userBatchConfig if any
+    if user_batch_config != '':
+        if not os.path.isfile(user_batch_config):
+            LOGGER.warning('userBatchConfig file can\'t be found! Will not '
+                           'add it to the default config.')
+        else:
+            with open(user_batch_config, 'r', encoding='utf-8') as cfgfile:
+                for line in cfgfile:
+                    scr += line + '\n'
+            scr += '\n\n'
+
+    scr += f'mkdir job_{process_name}_chunk_{chunk_num}\n'
+    scr += f'cd job_{process_name}_chunk_{chunk_num}\n\n'
+
+    if not os.path.isabs(output_dir):
+        output_path = os.path.join(output_dir, f'chunk_{chunk_num}.root')
+    else:
+        output_path = os.path.join(output_dir, process_name,
+                                   f'chunk_{chunk_num}.root')
+
+    scr += local_dir
+    scr += f'/bin/fccanalysis run {anascript_path} --batch '
+    scr += f'--output {output_path} '
+    scr += '--files-list'
+    for file_path in chunk_list[chunk_num]:
+        scr += ' ' + file_path
+    scr += '\n\n'
+
+    if not os.path.isabs(output_dir) and output_dir_eos == '':
+        final_dest = os.path.join(local_dir, output_dir, process_name,
+                                  f'chunk_{chunk_num}.root')
+        scr += f'cp {output_path} {final_dest}\n'
+
+    if output_dir_eos != '':
+        final_dest = os.path.join(output_dir_eos,
+                                  process_name,
+                                  f'chunk_{chunk_num}.root')
+        final_dest = f'root://{eos_type}.cern.ch/' + final_dest
+        scr += f'xrdcp {output_path} {final_dest}\n'
+
+    return scr
+
+
 # _____________________________________________________________________________
 def getsubfileList(in_file_list, event_list, fraction):
     nevts_total = sum(event_list)
@@ -148,48 +213,32 @@ def saveBenchmark(outfile, benchmark):
 
 
 # _____________________________________________________________________________
-def getCommandOutput(command):
-    p = subprocess.Popen(command, shell=True,
-                         stdout=subprocess.PIPE, stderr=subprocess.PIPE,
-                         universal_newlines=True)
-    (stdout, stderr) = p.communicate()
-    return {"stdout": stdout, "stderr": stderr, "returncode": p.returncode}
-
-
-# _____________________________________________________________________________
-def submit_job(cmd, max_trials):
-    submissionStatus = 0
+def submit_job(cmd: str, max_trials: int) -> bool:
+    '''
+    Submit job to condor, retry `max_trials` times.
+    '''
     for i in range(max_trials):
         with subprocess.Popen(cmd, shell=True,
                               stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                               universal_newlines=True) as proc:
             (stdout, stderr) = proc.communicate()
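+            # condor_submit is assumed to exit with a non-zero return code
+            # and to print to stderr when submission fails; a zero return
+            # code together with an empty stderr is therefore treated as a
+            # successful submission.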
-            print('stdout:')
-            print(stdout)
-            print('stderr:')
-            print(stderr)
-
-        outputCMD = getCommandOutput(cmd)
-        stderr = outputCMD["stderr"].split('\n')
-
-        if len(stderr) == 1 and stderr[0] == '':
-            LOGGER.info('GOOD SUBMISSION')
-            submissionStatus = 1
-        else:
-            LOGGER.warning('Error while submitting, retrying...\n\t'
-                           'Trial: %i / %i\n\tError: %s', i, max_trials, stderr)
-            time.sleep(10)
+            if proc.returncode == 0 and len(stderr) == 0:
+                LOGGER.info(stdout)
+                LOGGER.info('GOOD SUBMISSION')
+                return True
+            else:
+                LOGGER.warning('Error while submitting, retrying...\n '
+                               'Trial: %i / %i\n Error: %s',
+                               i + 1, max_trials, stderr)
+                time.sleep(10)
 
-    if submissionStatus == 1:
-        return 1
+    LOGGER.error('Failed to submit after %i trials!\nAborting...',
+                 max_trials)
+    return False
 
-    if i == max_trials - 1:
-        LOGGER.error('Failed submitting after: %i trials, stop trying to '
-                     'submit', max_trials)
-        return 0
 
-# __________________________________________________________
+# _____________________________________________________________________________
 def initialize(args, rdf_module, anascript_path):
     '''
     Common initialization steps.
@@ -274,17 +323,17 @@ def runRDF(rdf_module, inputlist, outFile, nevt, args):
 
 # _____________________________________________________________________________
 def sendToBatch(rdf_module, chunkList, process, analysisFile):
-    localDir = os.environ["LOCAL_DIR"]
+    local_dir = os.environ["LOCAL_DIR"]
     current_date = datetime.datetime.fromtimestamp(
         datetime.datetime.now().timestamp()).strftime('%Y-%m-%d_%H-%M-%S')
-    logDir = localDir + "/BatchOutputs/{}/{}".format(current_date, process)
-    if not os.path.exists(logDir):
-        os.system("mkdir -p {}".format(logDir))
+    log_dir = local_dir + "/BatchOutputs/{}/{}".format(current_date, process)
+    if not os.path.exists(log_dir):
+        os.system("mkdir -p {}".format(log_dir))
 
     # Making sure the FCCAnalyses libraries are compiled and installed
     try:
         subprocess.check_output(['make', 'install'],
-                                cwd=localDir+'/build',
+                                cwd=local_dir+'/build',
                                 stderr=subprocess.DEVNULL
                                 )
     except subprocess.CalledProcessError:
@@ -297,84 +346,64 @@ def sendToBatch(rdf_module, chunkList, process, analysisFile):
     eosType = getElement(rdf_module, "eosType")
     userBatchConfig = getElement(rdf_module, "userBatchConfig")
 
-    if outputDir!="" and outputDir[-1]!="/": outputDir+="/"
-
     subjob_scripts = []
     for ch in range(len(chunkList)):
-        frunname = '{}/job{}_chunk{}.sh'.format(logDir, process, ch)
-        LOGGER.info('Script to run %s: ', frunname)
-        subjob_scripts.append(frunname)
-
-        frun = None
-        try:
-            frun = open(frunname, 'w')
-        except IOError as e:
-            LOGGER.warning('I/O error(%i): %s', e.errno, e.strerror)
-            time.sleep(10)
-            frun = open(frunname, 'w')
-
-        subprocess.getstatusoutput('chmod 777 %s'%(frunname))
-        frun.write('#!/bin/bash\n')
-        frun.write('source ' + localDir + '/setup.sh\n')
-
-        # add userBatchConfig if any
-        if userBatchConfig != "":
-            if not os.path.isfile(userBatchConfig):
-                LOGGER.warning('userBatchConfig file does not exist, will not '
-                               'add it to default config, please check')
-            else:
-                configFile=open(userBatchConfig)
-                for line in configFile:
-                    frun.write(line+'\n')
-
-        frun.write('mkdir job{}_chunk{}\n'.format(process,ch))
-        frun.write('cd job{}_chunk{}\n'.format(process,ch))
-
-        if not os.path.isabs(outputDir):
-            frun.write(localDir + '/bin/fccanalysis run {} --batch --output {}chunk{}.root --files-list '.format(analysisFile, outputDir, ch))
-        else:
-            frun.write(localDir + '/bin/fccanalysis run {} --batch --output {}{}/chunk{}.root --files-list '.format(analysisFile, outputDir, process,ch))
-
-        for ff in range(len(chunkList[ch])):
-            frun.write(' %s'%(chunkList[ch][ff]))
-        frun.write('\n')
-        if not os.path.isabs(outputDir):
-            if outputDirEos == "":
-                frun.write('cp {}chunk{}.root {}/{}/{}/chunk{}.root\n'.format(outputDir,ch,localDir,outputDir,process,ch))
+        subjob_script_path = '{}/job_{}_chunk{}.sh'.format(log_dir, process, ch)
+        subjob_scripts.append(subjob_script_path)
+
+        for i in range(3):
+            try:
+                with open(subjob_script_path, 'w', encoding='utf-8') as ofile:
+                    subjob_script = create_subjob_script(local_dir,
+                                                         rdf_module,
+                                                         process,
+                                                         ch,
+                                                         chunkList,
+                                                         analysisFile)
+                    ofile.write(subjob_script)
+            except IOError as e:
+                if i < 2:
+                    LOGGER.warning('I/O error(%i): %s', e.errno, e.strerror)
+                else:
+                    LOGGER.error('I/O error(%i): %s', e.errno, e.strerror)
+                    sys.exit(3)
             else:
-                frun.write('xrdcp {}chunk{}.root root://{}.cern.ch/{}/{}/chunk{}.root\n'.format(outputDir,ch,eosType,outputDirEos,process,ch))
-        else:
-            if outputDirEos != "":
-                frun.write('xrdcp {}chunk{}.root root://{}.cern.ch/{}/{}/chunk{}.root\n'.format(outputDir,ch,eosType,outputDirEos,process,ch))
+                break
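+            # Back off before the next write attempt; the 10 s delay is
+            # carried over from the previous implementation and assumed to
+            # outlast transient filesystem problems.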
+            time.sleep(10)
+        subprocess.getstatusoutput(f'chmod 777 {subjob_script_path}')
 
-        frun.close()
+    LOGGER.debug('Sub-job scripts to be run:\n - %s',
+                 '\n - '.join(subjob_scripts))
 
-    condor_config_path = f'{logDir}/job_desc_{process}.cfg'
+    condor_config_path = f'{log_dir}/job_desc_{process}.cfg'
 
-    for _ in range(3):
+    for i in range(3):
         try:
             with open(condor_config_path, 'w', encoding='utf-8') as cfgfile:
-                condor_config = create_condor_config(logDir,
+                condor_config = create_condor_config(log_dir,
                                                      process,
-                                                     determine_os(localDir),
+                                                     determine_os(local_dir),
                                                      rdf_module,
                                                      subjob_scripts)
                 cfgfile.write(condor_config)
         except IOError as e:
             LOGGER.warning('I/O error(%i): %s', e.errno, e.strerror)
+            if i == 2:
+                sys.exit(3)
         else:
             break
         time.sleep(10)
     subprocess.getstatusoutput(f'chmod 777 {condor_config_path}')
 
     batch_cmd = f'condor_submit {condor_config_path}'
-    LOGGER.info('Batch command: %s', batch_cmd)
-    ntry = SubmitToCondor(batch_cmd, 10)
-    LOGGER.debug('Batch command submitted on %i try.', ntry)
+    LOGGER.info('Batch command:\n %s', batch_cmd)
+    success = submit_job(batch_cmd, 10)
+    if not success:
+        sys.exit(3)
 
 
 # _____________________________________________________________________________
-def apply_filepath_rewrites(filepath):
+def apply_filepath_rewrites(filepath: str) -> str:
     '''
     Apply path rewrites if applicable.
     '''
@@ -550,14 +579,14 @@ def runStages(args, rdf_module, preprocess, analysisFile):
         runLocal(rdf_module, args.files_list, args)
         sys.exit(0)
 
-    # Check if batch mode and set start and end file from original list
-    run_batch = getElement(rdf_module, "runBatch")
+    # Check if batch mode is available
+    run_batch = getElement(rdf_module, 'runBatch')
     if run_batch and shutil.which('condor_q') is None:
         LOGGER.error('HTCondor tools can\'t be found!\nAborting...')
         sys.exit(3)
 
     # Check if the process list is specified
-    process_list = getElement(rdf_module, "processList")
+    process_list = getElement(rdf_module, 'processList')
 
     for process_name in process_list:
         file_list, event_list = get_process_info(