Skip to content

Commit

Permalink
Merge pull request #96 from bpf6qc/fixMergingLPC
Browse files Browse the repository at this point in the history
Fix merging lpc
  • Loading branch information
jalimena authored Jan 28, 2019
2 parents 076f09d + 0befa4d commit d5f9f57
Showing 1 changed file with 126 additions and 60 deletions.
186 changes: 126 additions & 60 deletions Configuration/python/mergeUtilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,15 @@
import shutil
import math
import socket
import tempfile
from threading import Thread, Lock, Semaphore
from multiprocessing import cpu_count
from OSUT3Analysis.Configuration.configurationOptions import *
from OSUT3Analysis.Configuration.processingUtilities import *
from OSUT3Analysis.Configuration.formattingUtilities import *
from OSUT3Analysis.DBTools.condorSubArgumentsSet import *
import FWCore.ParameterSet.Config as cms
from ROOT import TFile
from ROOT import TFile, TNetXNGFile

NTHREADS_FOR_BATCH_MERGING = 1

Expand Down Expand Up @@ -86,6 +87,7 @@ def MessageDecoder(Message, Good):
###############################################################################
def GetGoodRootFiles(Index):
return " ".join (glob.glob ("*_" + str (Index) + ".root"))

def MakeWeightsString(Weight,FilesSet):
Str = ''
for i in range(0,len(FilesSet)):
Expand All @@ -94,16 +96,17 @@ def MakeWeightsString(Weight,FilesSet):
else:
Str = Str + ',' + str(Weight)
return Str

###############################################################################
# Get the total number of events from cutFlows to calculate the weights #
###############################################################################
def GetNumberOfEvents(FilesSet):
NumberOfEvents = {'SkimNumber' : {}, 'TotalNumber' : 0}
for File in FilesSet:
ScoutFile = TFile(File)
for histFile in FilesSet:
ScoutFile = TFile(histFile)
if ScoutFile.IsZombie():
print File + " is a bad root file."
FilesSet.remove(File)
print histFile + " is a bad root file."
FilesSet.remove(histFile)
continue
randomChannelDirectory = ""
TotalNumberTmp = 0
Expand All @@ -118,10 +121,10 @@ def GetNumberOfEvents(FilesSet):
SkimCounterObj = ScoutFile.Get(randomChannelDirectory + "/cutFlow")
TotalNumberTmp = 0
if not OriginalCounterObj:
print "Could not find eventCounter histogram in " + str(File) + " !"
print "Could not find eventCounter histogram in " + str(histFile) + " !"
continue
elif not SkimCounterObj:
print "Could not find cutFlow histogram in " + str(File) + " !"
print "Could not find cutFlow histogram in " + str(histFile) + " !"
else:
OriginalCounter = OriginalCounterObj.Clone()
OriginalCounter.SetDirectory(0)
Expand All @@ -131,54 +134,98 @@ def GetNumberOfEvents(FilesSet):
NumberOfEvents['SkimNumber'][channelName] = NumberOfEvents['SkimNumber'][channelName] + SkimCounter.GetBinContent(SkimCounter.GetXaxis().GetNbins())
NumberOfEvents['TotalNumber'] = NumberOfEvents['TotalNumber'] + TotalNumberTmp
return NumberOfEvents

###############################################################################
# Produce important files for the skim directory. #
###############################################################################
def MakeFilesForSkimDirectory(Directory, DirectoryOut, TotalNumber, SkimNumber, BadIndices, FilesToRemove):
print "in MakeFilesForSkimDirectory"
def MakeFilesForSkimDirectoryEOS(Member, Directory, DirectoryOut, TotalNumber, SkimNumber, BadIndices):
xrootdDestination = 'root://cmseos.fnal.gov/' + os.path.realpath(DirectoryOut + '/' + Member + '/')[len('/eos/uscms'):]
tmpDir = tempfile.mkdtemp()
outfile = os.path.join(tmpDir, 'OriginalNumberOfEvents.txt')
fout = open (outfile, 'w')
fout.write(str(TotalNumber) + '\n')
fout.close()
try:
subprocess.check_output(['xrdcp', '-f', outfile, xrootdDestination])
except subprocess.CalledProcessError as e:
print 'Failed to copy OriginalNumberOfEvents.txt:', e
outfile = os.path.join(tmpDir, 'SkimNumberOfEvents.txt')
fout = open(outfile, 'w')
fout.write(str(SkimNumber) + '\n')
fout.close()
try:
subprocess.check_output(['xrdcp', '-f', outfile, xrootdDestination])
except subprocess.CalledProcessError as e:
print 'Failed to copy SkimNumber.txt:', e
shutil.rmtree(tmpDir)

listOfSkimFiles = subprocess.check_output(['xrdfs', 'root://cmseos.fnal.gov', 'ls', os.path.realpath(DirectoryOut + '/' + Member)])
listOfSkimFiles = [x for x in listOfSkimFiles.split('\n') if x.endswith('.root')]
for skimFile in listOfSkimFiles:
index = os.path.basename(skimFile).split('.')[0].split('_')[1]
if index in BadIndices:
continue
GetSkimInputTags(skimFile, xrootdDestination)
break

def MakeFilesForSkimDirectory(Directory, DirectoryOut, TotalNumber, SkimNumber, BadIndices, FilesToRemove, lpcCAF = False):
for Member in os.listdir(Directory):
if os.path.isfile(os.path.join(Directory, Member)):
continue;
try:
os.makedirs (DirectoryOut + '/' + Member)
except OSError:
pass
outfile = os.path.join(DirectoryOut, Member, 'OriginalNumberOfEvents.txt')
try:
os.unlink (outfile)
except OSError:
pass
fout = open (outfile, "w")
fout.write (str (TotalNumber) + "\n")
fout.close ()
outfile = os.path.join(DirectoryOut, Member, 'SkimNumberOfEvents.txt')
try:
os.unlink (outfile)
except OSError:
pass
fout = open (outfile, "w")
fout.write (str (SkimNumber[Member]) + "\n")
fout.close ()
os.chdir(Directory + '/' + Member)
listOfSkimFiles = glob.glob('*.root')
sys.path.append(Directory + '/' + Member)
createdSkimInputTags = False
for file in listOfSkimFiles:
index = file.split('.')[0].split('_')[1]
if index in BadIndices:
continue
if not createdSkimInputTags:
GetSkimInputTags(file.rstrip('\n'))
createdSkimInputTags = True
os.chdir(Directory)
for file in FilesToRemove:
os.unlink (file)

if lpcCAF and os.path.realpath(DirectoryOut + '/' + Member).startswith('/eos/uscms'):
MakeFilesForSkimDirectoryEOS(Member, Directory, DirectoryOut, TotalNumber, SkimNumber, BadIndices)
else:
outfile = os.path.join(DirectoryOut, Member, 'OriginalNumberOfEvents.txt')
try:
os.unlink (outfile)
except OSError:
pass
fout = open (outfile, "w")
fout.write (str (TotalNumber) + "\n")
fout.close ()
outfile = os.path.join(DirectoryOut, Member, 'SkimNumberOfEvents.txt')
try:
os.unlink (outfile)
except OSError:
pass
fout = open (outfile, "w")
fout.write (str (SkimNumber[Member]) + "\n")
fout.close ()

os.chdir(Directory + '/' + Member)
listOfSkimFiles = glob.glob('*.root')
sys.path.append(Directory + '/' + Member)
for skimFile in listOfSkimFiles:
index = skimFile.split('.')[0].split('_')[1]
if index in BadIndices:
continue
GetSkimInputTags(skimFile.rstrip('\n'))
break
os.chdir(Directory)

for skimFile in FilesToRemove:
if lpcCAF and skimFile.startswith('root://cmseos.fnal.gov/'):
try:
subprocess.check_output(['xrdfs', 'root://cmseos.fnal.gov', 'rm', skimFile[len('root://cmseos.fnal.gov/'):]])
except subprocess.CalledProcessError as e:
print 'Failed to remove file', skimFile, ':', e
else:
os.unlink(skimFile)

###############################################################################
# Produce a pickle file containing the skim input tags. #
###############################################################################
def GetSkimInputTags(File):
def GetSkimInputTags(File, xrootdDestination = ""):
print "Getting skim input tags..."
eventContent = subprocess.check_output (["edmDumpEventContent", "--all", os.getcwd () + "/" + File])
if xrootdDestination != "":
eventContent = subprocess.check_output (["edmDumpEventContent", "--all", "root://cmseos.fnal.gov/" + File[len('/eos/uscms'):]])
else:
eventContent = subprocess.check_output (["edmDumpEventContent", "--all", os.getcwd () + "/" + File])
parsing = False
cppTypes = []
inputTags = {}
Expand Down Expand Up @@ -208,11 +255,23 @@ def GetSkimInputTags(File):
if "OSUAnalysis" in thisTag.getProcessName ():
inputTags[collectionType] = thisTag

if os.path.exists("SkimInputTags.pkl"):
os.remove("SkimInputTags.pkl")
fout = open ("SkimInputTags.pkl", "w")
pickle.dump (inputTags, fout)
fout.close ()
if xrootdDestination != "":
tmpDir = tempfile.mkdtemp()
outfile = os.path.join(tmpDir, 'SkimInputTags.pkl')
fout = open (outfile, 'w')
pickle.dump (inputTags, fout)
fout.close()
try:
subprocess.check_output(['xrdcp', '-f', outfile, xrootdDestination])
except subprocess.CalledProcessError as e:
print 'Failed to copy SkimInputTags.pkl:', e
shutil.rmtree(tmpDir)
else:
if os.path.exists("SkimInputTags.pkl"):
os.remove("SkimInputTags.pkl")
fout = open ("SkimInputTags.pkl", "w")
pickle.dump (inputTags, fout)
fout.close ()

###############################################################################
# Make submission script for the failed jobs. #
Expand Down Expand Up @@ -250,9 +309,11 @@ def MakeResubmissionScript(badIndices, originalSubmissionScript):
###############################################################################
# Determine whether a skim file is valid. #
###############################################################################
def SkimFileValidator(File):
print "testing ", File
FileToTest = TFile(File)
def SkimFileValidator(skimFile):
if skimFile.startswith('root://'):
FileToTest = TNetXNGFile(skimFile)
else:
FileToTest = TFile(skimFile)
Valid = True
for TreeToTest in ['MetaData', 'ParameterSets', 'Parentage', 'Events', 'LuminosityBlocks', 'Runs']:
Valid = Valid and (FileToTest.Get(TreeToTest) != None)
Expand Down Expand Up @@ -341,21 +402,27 @@ def mergeOneDataset(dataSet, IntLumi, CondorDir, OutputDir="", nThreadsActive =
# check for files that weren't found and were skipped
StdErrFiles = sorted(glob.glob('condor_*.err'))

for file in StdErrFiles:
index = file.split("_")[-1].split(".")[0]
if 'was not found or could not be opened, and will be skipped.' in open(file).read():
for errFile in StdErrFiles:
index = errFile.split("_")[-1].split(".")[0]
if 'was not found or could not be opened, and will be skipped.' in open(errFile).read():
BadIndices.append(index)
if verbose:
print " job" + ' ' * (4-len(str(index))) + index + " had bad/skipped input file"


# check for any corrupted skim output files
skimDirs = [member for member in os.listdir(os.getcwd()) if os.path.isdir(member)]
skimDirs = [member for member in os.listdir(os.getcwd()) if not os.path.isfile(member)]
FilesToRemove = []
for channel in skimDirs:
for skimFile in glob.glob(channel+'/*.root'):
if lpcCAF and os.path.realpath(channel).startswith('/eos/uscms'):
print 'Testing skim files in: root://cmseos.fnal.gov/' + os.path.realpath(channel)
listOfSkimFiles = subprocess.check_output(['xrdfs', 'root://cmseos.fnal.gov', 'ls', os.path.realpath(channel)])
listOfSkimFiles = ['root://cmseos.fnal.gov/' + x for x in listOfSkimFiles.split('\n') if x.endswith('.root')]
else:
print 'Testing skim files in:', os.path.realpath(channel)
listOfSkimFiles = glob.glob(channel + '/*.root')
for skimFile in listOfSkimFiles:
# don't check for good skims of jobs we already know are bad
index = skimFile.split('.')[0].split('_')[1]
index = os.path.basename(skimFile).split('.')[0].split('_')[1]
if index in BadIndices:
continue
Valid, InvalidOrEmpty = SkimFileValidator(skimFile.rstrip('\n'))
Expand All @@ -366,7 +433,6 @@ def mergeOneDataset(dataSet, IntLumi, CondorDir, OutputDir="", nThreadsActive =
if InvalidOrEmpty:
FilesToRemove.append (skimFile)


# check for abnormal condor return values
sys.path.append(directory)
for returnValue in ReturnValues:
Expand Down Expand Up @@ -404,7 +470,7 @@ def mergeOneDataset(dataSet, IntLumi, CondorDir, OutputDir="", nThreadsActive =
if verbose:
print "TotalNumber =", TotalNumber, ", SkimNumber =", SkimNumber
if not TotalNumber:
MakeFilesForSkimDirectory(directory, directoryOut, TotalNumber, SkimNumber, BadIndices, FilesToRemove)
MakeFilesForSkimDirectory(directory, directoryOut, TotalNumber, SkimNumber, BadIndices, FilesToRemove, lpcCAF)
return
Weight = 1.0
crossSection = float(datasetInfo.crossSection)
Expand All @@ -422,9 +488,9 @@ def mergeOneDataset(dataSet, IntLumi, CondorDir, OutputDir="", nThreadsActive =
Weight = IntLumi*crossSection/float(TotalNumber)
InputWeightString = MakeWeightsString(Weight, GoodRootFiles)
if runOverSkim:
MakeFilesForSkimDirectory(directory, directoryOut, datasetInfo.originalNumberOfEvents, SkimNumber, BadIndices, FilesToRemove)
MakeFilesForSkimDirectory(directory, directoryOut, datasetInfo.originalNumberOfEvents, SkimNumber, BadIndices, FilesToRemove, lpcCAF)
else:
MakeFilesForSkimDirectory(directory, directoryOut, TotalNumber, SkimNumber, BadIndices, FilesToRemove)
MakeFilesForSkimDirectory(directory, directoryOut, TotalNumber, SkimNumber, BadIndices, FilesToRemove, lpcCAF)

if not skipMerging:
threadLog = ""
Expand Down

0 comments on commit d5f9f57

Please sign in to comment.