diff --git a/Configuration/python/mergeUtilities.py b/Configuration/python/mergeUtilities.py index 861aa6e4..d72771ff 100644 --- a/Configuration/python/mergeUtilities.py +++ b/Configuration/python/mergeUtilities.py @@ -10,6 +10,7 @@ import shutil import math import socket +import tempfile from threading import Thread, Lock, Semaphore from multiprocessing import cpu_count from OSUT3Analysis.Configuration.configurationOptions import * @@ -17,7 +18,7 @@ from OSUT3Analysis.Configuration.formattingUtilities import * from OSUT3Analysis.DBTools.condorSubArgumentsSet import * import FWCore.ParameterSet.Config as cms -from ROOT import TFile +from ROOT import TFile, TNetXNGFile NTHREADS_FOR_BATCH_MERGING = 1 @@ -86,6 +87,7 @@ def MessageDecoder(Message, Good): ############################################################################### def GetGoodRootFiles(Index): return " ".join (glob.glob ("*_" + str (Index) + ".root")) + def MakeWeightsString(Weight,FilesSet): Str = '' for i in range(0,len(FilesSet)): @@ -94,16 +96,17 @@ def MakeWeightsString(Weight,FilesSet): else: Str = Str + ',' + str(Weight) return Str + ############################################################################### # Get the total number of events from cutFlows to calculate the weights # ############################################################################### def GetNumberOfEvents(FilesSet): NumberOfEvents = {'SkimNumber' : {}, 'TotalNumber' : 0} - for File in FilesSet: - ScoutFile = TFile(File) + for histFile in FilesSet: + ScoutFile = TFile(histFile) if ScoutFile.IsZombie(): - print File + " is a bad root file." - FilesSet.remove(File) + print histFile + " is a bad root file." + FilesSet.remove(histFile) continue randomChannelDirectory = "" TotalNumberTmp = 0 @@ -118,10 +121,10 @@ def GetNumberOfEvents(FilesSet): SkimCounterObj = ScoutFile.Get(randomChannelDirectory + "/cutFlow") TotalNumberTmp = 0 if not OriginalCounterObj: - print "Could not find eventCounter histogram in " + str(File) + " !" + print "Could not find eventCounter histogram in " + str(histFile) + " !" continue elif not SkimCounterObj: - print "Could not find cutFlow histogram in " + str(File) + " !" + print "Could not find cutFlow histogram in " + str(histFile) + " !" else: OriginalCounter = OriginalCounterObj.Clone() OriginalCounter.SetDirectory(0) @@ -131,11 +134,41 @@ def GetNumberOfEvents(FilesSet): NumberOfEvents['SkimNumber'][channelName] = NumberOfEvents['SkimNumber'][channelName] + SkimCounter.GetBinContent(SkimCounter.GetXaxis().GetNbins()) NumberOfEvents['TotalNumber'] = NumberOfEvents['TotalNumber'] + TotalNumberTmp return NumberOfEvents + ############################################################################### # Produce important files for the skim directory. # ############################################################################### -def MakeFilesForSkimDirectory(Directory, DirectoryOut, TotalNumber, SkimNumber, BadIndices, FilesToRemove): - print "in MakeFilesForSkimDirectory" +def MakeFilesForSkimDirectoryEOS(Member, Directory, DirectoryOut, TotalNumber, SkimNumber, BadIndices): + xrootdDestination = 'root://cmseos.fnal.gov/' + os.path.realpath(DirectoryOut + '/' + Member + '/')[len('/eos/uscms'):] + tmpDir = tempfile.mkdtemp() + outfile = os.path.join(tmpDir, 'OriginalNumberOfEvents.txt') + fout = open (outfile, 'w') + fout.write(str(TotalNumber) + '\n') + fout.close() + try: + subprocess.check_output(['xrdcp', '-f', outfile, xrootdDestination]) + except subprocess.CalledProcessError as e: + print 'Failed to copy OriginalNumberOfEvents.txt:', e + outfile = os.path.join(tmpDir, 'SkimNumberOfEvents.txt') + fout = open(outfile, 'w') + fout.write(str(SkimNumber) + '\n') + fout.close() + try: + subprocess.check_output(['xrdcp', '-f', outfile, xrootdDestination]) + except subprocess.CalledProcessError as e: + print 'Failed to copy SkimNumber.txt:', e + shutil.rmtree(tmpDir) + + listOfSkimFiles = subprocess.check_output(['xrdfs', 'root://cmseos.fnal.gov', 'ls', os.path.realpath(DirectoryOut + '/' + Member)]) + listOfSkimFiles = [x for x in listOfSkimFiles.split('\n') if x.endswith('.root')] + for skimFile in listOfSkimFiles: + index = os.path.basename(skimFile).split('.')[0].split('_')[1] + if index in BadIndices: + continue + GetSkimInputTags(skimFile, xrootdDestination) + break + +def MakeFilesForSkimDirectory(Directory, DirectoryOut, TotalNumber, SkimNumber, BadIndices, FilesToRemove, lpcCAF = False): for Member in os.listdir(Directory): if os.path.isfile(os.path.join(Directory, Member)): continue; @@ -143,42 +176,56 @@ def MakeFilesForSkimDirectory(Directory, DirectoryOut, TotalNumber, SkimNumber, os.makedirs (DirectoryOut + '/' + Member) except OSError: pass - outfile = os.path.join(DirectoryOut, Member, 'OriginalNumberOfEvents.txt') - try: - os.unlink (outfile) - except OSError: - pass - fout = open (outfile, "w") - fout.write (str (TotalNumber) + "\n") - fout.close () - outfile = os.path.join(DirectoryOut, Member, 'SkimNumberOfEvents.txt') - try: - os.unlink (outfile) - except OSError: - pass - fout = open (outfile, "w") - fout.write (str (SkimNumber[Member]) + "\n") - fout.close () - os.chdir(Directory + '/' + Member) - listOfSkimFiles = glob.glob('*.root') - sys.path.append(Directory + '/' + Member) - createdSkimInputTags = False - for file in listOfSkimFiles: - index = file.split('.')[0].split('_')[1] - if index in BadIndices: - continue - if not createdSkimInputTags: - GetSkimInputTags(file.rstrip('\n')) - createdSkimInputTags = True - os.chdir(Directory) - for file in FilesToRemove: - os.unlink (file) + + if lpcCAF and os.path.realpath(DirectoryOut + '/' + Member).startswith('/eos/uscms'): + MakeFilesForSkimDirectoryEOS(Member, Directory, DirectoryOut, TotalNumber, SkimNumber, BadIndices) + else: + outfile = os.path.join(DirectoryOut, Member, 'OriginalNumberOfEvents.txt') + try: + os.unlink (outfile) + except OSError: + pass + fout = open (outfile, "w") + fout.write (str (TotalNumber) + "\n") + fout.close () + outfile = os.path.join(DirectoryOut, Member, 'SkimNumberOfEvents.txt') + try: + os.unlink (outfile) + except OSError: + pass + fout = open (outfile, "w") + fout.write (str (SkimNumber[Member]) + "\n") + fout.close () + + os.chdir(Directory + '/' + Member) + listOfSkimFiles = glob.glob('*.root') + sys.path.append(Directory + '/' + Member) + for skimFile in listOfSkimFiles: + index = skimFile.split('.')[0].split('_')[1] + if index in BadIndices: + continue + GetSkimInputTags(skimFile.rstrip('\n')) + break + os.chdir(Directory) + + for skimFile in FilesToRemove: + if lpcCAF and skimFile.startswith('root://cmseos.fnal.gov/'): + try: + subprocess.check_output(['xrdfs', 'root://cmseos.fnal.gov', 'rm', skimFile[len('root://cmseos.fnal.gov/'):]]) + except subprocess.CalledProcessError as e: + print 'Failed to remove file', skimFile, ':', e + else: + os.unlink(skimFile) + ############################################################################### # Produce a pickle file containing the skim input tags. # ############################################################################### -def GetSkimInputTags(File): +def GetSkimInputTags(File, xrootdDestination = ""): print "Getting skim input tags..." - eventContent = subprocess.check_output (["edmDumpEventContent", "--all", os.getcwd () + "/" + File]) + if xrootdDestination != "": + eventContent = subprocess.check_output (["edmDumpEventContent", "--all", "root://cmseos.fnal.gov/" + File[len('/eos/uscms'):]]) + else: + eventContent = subprocess.check_output (["edmDumpEventContent", "--all", os.getcwd () + "/" + File]) parsing = False cppTypes = [] inputTags = {} @@ -208,11 +255,23 @@ def GetSkimInputTags(File): if "OSUAnalysis" in thisTag.getProcessName (): inputTags[collectionType] = thisTag - if os.path.exists("SkimInputTags.pkl"): - os.remove("SkimInputTags.pkl") - fout = open ("SkimInputTags.pkl", "w") - pickle.dump (inputTags, fout) - fout.close () + if xrootdDestination != "": + tmpDir = tempfile.mkdtemp() + outfile = os.path.join(tmpDir, 'SkimInputTags.pkl') + fout = open (outfile, 'w') + pickle.dump (inputTags, fout) + fout.close() + try: + subprocess.check_output(['xrdcp', '-f', outfile, xrootdDestination]) + except subprocess.CalledProcessError as e: + print 'Failed to copy SkimInputTags.pkl:', e + shutil.rmtree(tmpDir) + else: + if os.path.exists("SkimInputTags.pkl"): + os.remove("SkimInputTags.pkl") + fout = open ("SkimInputTags.pkl", "w") + pickle.dump (inputTags, fout) + fout.close () ############################################################################### # Make submission script for the failed jobs. # @@ -250,9 +309,11 @@ def MakeResubmissionScript(badIndices, originalSubmissionScript): ############################################################################### # Determine whether a skim file is valid. # ############################################################################### -def SkimFileValidator(File): - print "testing ", File - FileToTest = TFile(File) +def SkimFileValidator(skimFile): + if skimFile.startswith('root://'): + FileToTest = TNetXNGFile(skimFile) + else: + FileToTest = TFile(skimFile) Valid = True for TreeToTest in ['MetaData', 'ParameterSets', 'Parentage', 'Events', 'LuminosityBlocks', 'Runs']: Valid = Valid and (FileToTest.Get(TreeToTest) != None) @@ -341,21 +402,27 @@ def mergeOneDataset(dataSet, IntLumi, CondorDir, OutputDir="", nThreadsActive = # check for files that weren't found and were skipped StdErrFiles = sorted(glob.glob('condor_*.err')) - for file in StdErrFiles: - index = file.split("_")[-1].split(".")[0] - if 'was not found or could not be opened, and will be skipped.' in open(file).read(): + for errFile in StdErrFiles: + index = errFile.split("_")[-1].split(".")[0] + if 'was not found or could not be opened, and will be skipped.' in open(errFile).read(): BadIndices.append(index) if verbose: print " job" + ' ' * (4-len(str(index))) + index + " had bad/skipped input file" - # check for any corrupted skim output files - skimDirs = [member for member in os.listdir(os.getcwd()) if os.path.isdir(member)] + skimDirs = [member for member in os.listdir(os.getcwd()) if not os.path.isfile(member)] FilesToRemove = [] for channel in skimDirs: - for skimFile in glob.glob(channel+'/*.root'): + if lpcCAF and os.path.realpath(channel).startswith('/eos/uscms'): + print 'Testing skim files in: root://cmseos.fnal.gov/' + os.path.realpath(channel) + listOfSkimFiles = subprocess.check_output(['xrdfs', 'root://cmseos.fnal.gov', 'ls', os.path.realpath(channel)]) + listOfSkimFiles = ['root://cmseos.fnal.gov/' + x for x in listOfSkimFiles.split('\n') if x.endswith('.root')] + else: + print 'Testing skim files in:', os.path.realpath(channel) + listOfSkimFiles = glob.glob(channel + '/*.root') + for skimFile in listOfSkimFiles: # don't check for good skims of jobs we already know are bad - index = skimFile.split('.')[0].split('_')[1] + index = os.path.basename(skimFile).split('.')[0].split('_')[1] if index in BadIndices: continue Valid, InvalidOrEmpty = SkimFileValidator(skimFile.rstrip('\n')) @@ -366,7 +433,6 @@ def mergeOneDataset(dataSet, IntLumi, CondorDir, OutputDir="", nThreadsActive = if InvalidOrEmpty: FilesToRemove.append (skimFile) - # check for abnormal condor return values sys.path.append(directory) for returnValue in ReturnValues: @@ -404,7 +470,7 @@ def mergeOneDataset(dataSet, IntLumi, CondorDir, OutputDir="", nThreadsActive = if verbose: print "TotalNumber =", TotalNumber, ", SkimNumber =", SkimNumber if not TotalNumber: - MakeFilesForSkimDirectory(directory, directoryOut, TotalNumber, SkimNumber, BadIndices, FilesToRemove) + MakeFilesForSkimDirectory(directory, directoryOut, TotalNumber, SkimNumber, BadIndices, FilesToRemove, lpcCAF) return Weight = 1.0 crossSection = float(datasetInfo.crossSection) @@ -422,9 +488,9 @@ def mergeOneDataset(dataSet, IntLumi, CondorDir, OutputDir="", nThreadsActive = Weight = IntLumi*crossSection/float(TotalNumber) InputWeightString = MakeWeightsString(Weight, GoodRootFiles) if runOverSkim: - MakeFilesForSkimDirectory(directory, directoryOut, datasetInfo.originalNumberOfEvents, SkimNumber, BadIndices, FilesToRemove) + MakeFilesForSkimDirectory(directory, directoryOut, datasetInfo.originalNumberOfEvents, SkimNumber, BadIndices, FilesToRemove, lpcCAF) else: - MakeFilesForSkimDirectory(directory, directoryOut, TotalNumber, SkimNumber, BadIndices, FilesToRemove) + MakeFilesForSkimDirectory(directory, directoryOut, TotalNumber, SkimNumber, BadIndices, FilesToRemove, lpcCAF) if not skipMerging: threadLog = ""