Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix merging lpc #96

Merged
merged 4 commits into from
Jan 28, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
186 changes: 126 additions & 60 deletions Configuration/python/mergeUtilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,15 @@
import shutil
import math
import socket
import tempfile
from threading import Thread, Lock, Semaphore
from multiprocessing import cpu_count
from OSUT3Analysis.Configuration.configurationOptions import *
from OSUT3Analysis.Configuration.processingUtilities import *
from OSUT3Analysis.Configuration.formattingUtilities import *
from OSUT3Analysis.DBTools.condorSubArgumentsSet import *
import FWCore.ParameterSet.Config as cms
from ROOT import TFile
from ROOT import TFile, TNetXNGFile

NTHREADS_FOR_BATCH_MERGING = 1

Expand Down Expand Up @@ -86,6 +87,7 @@ def MessageDecoder(Message, Good):
###############################################################################
def GetGoodRootFiles(Index):
return " ".join (glob.glob ("*_" + str (Index) + ".root"))

def MakeWeightsString(Weight,FilesSet):
Str = ''
for i in range(0,len(FilesSet)):
Expand All @@ -94,16 +96,17 @@ def MakeWeightsString(Weight,FilesSet):
else:
Str = Str + ',' + str(Weight)
return Str

###############################################################################
# Get the total number of events from cutFlows to calculate the weights #
###############################################################################
def GetNumberOfEvents(FilesSet):
NumberOfEvents = {'SkimNumber' : {}, 'TotalNumber' : 0}
for File in FilesSet:
ScoutFile = TFile(File)
for histFile in FilesSet:
ScoutFile = TFile(histFile)
if ScoutFile.IsZombie():
print File + " is a bad root file."
FilesSet.remove(File)
print histFile + " is a bad root file."
FilesSet.remove(histFile)
continue
randomChannelDirectory = ""
TotalNumberTmp = 0
Expand All @@ -118,10 +121,10 @@ def GetNumberOfEvents(FilesSet):
SkimCounterObj = ScoutFile.Get(randomChannelDirectory + "/cutFlow")
TotalNumberTmp = 0
if not OriginalCounterObj:
print "Could not find eventCounter histogram in " + str(File) + " !"
print "Could not find eventCounter histogram in " + str(histFile) + " !"
continue
elif not SkimCounterObj:
print "Could not find cutFlow histogram in " + str(File) + " !"
print "Could not find cutFlow histogram in " + str(histFile) + " !"
else:
OriginalCounter = OriginalCounterObj.Clone()
OriginalCounter.SetDirectory(0)
Expand All @@ -131,54 +134,98 @@ def GetNumberOfEvents(FilesSet):
NumberOfEvents['SkimNumber'][channelName] = NumberOfEvents['SkimNumber'][channelName] + SkimCounter.GetBinContent(SkimCounter.GetXaxis().GetNbins())
NumberOfEvents['TotalNumber'] = NumberOfEvents['TotalNumber'] + TotalNumberTmp
return NumberOfEvents

###############################################################################
# Produce important files for the skim directory. #
###############################################################################
def MakeFilesForSkimDirectory(Directory, DirectoryOut, TotalNumber, SkimNumber, BadIndices, FilesToRemove):
print "in MakeFilesForSkimDirectory"
def MakeFilesForSkimDirectoryEOS(Member, Directory, DirectoryOut, TotalNumber, SkimNumber, BadIndices):
xrootdDestination = 'root://cmseos.fnal.gov/' + os.path.realpath(DirectoryOut + '/' + Member + '/')[len('/eos/uscms'):]
tmpDir = tempfile.mkdtemp()
outfile = os.path.join(tmpDir, 'OriginalNumberOfEvents.txt')
fout = open (outfile, 'w')
fout.write(str(TotalNumber) + '\n')
fout.close()
try:
subprocess.check_output(['xrdcp', '-f', outfile, xrootdDestination])
except subprocess.CalledProcessError as e:
print 'Failed to copy OriginalNumberOfEvents.txt:', e
outfile = os.path.join(tmpDir, 'SkimNumberOfEvents.txt')
fout = open(outfile, 'w')
fout.write(str(SkimNumber) + '\n')
fout.close()
try:
subprocess.check_output(['xrdcp', '-f', outfile, xrootdDestination])
except subprocess.CalledProcessError as e:
print 'Failed to copy SkimNumber.txt:', e
shutil.rmtree(tmpDir)

listOfSkimFiles = subprocess.check_output(['xrdfs', 'root://cmseos.fnal.gov', 'ls', os.path.realpath(DirectoryOut + '/' + Member)])
listOfSkimFiles = [x for x in listOfSkimFiles.split('\n') if x.endswith('.root')]
for skimFile in listOfSkimFiles:
index = os.path.basename(skimFile).split('.')[0].split('_')[1]
if index in BadIndices:
continue
GetSkimInputTags(skimFile, xrootdDestination)
break

def MakeFilesForSkimDirectory(Directory, DirectoryOut, TotalNumber, SkimNumber, BadIndices, FilesToRemove, lpcCAF = False):
for Member in os.listdir(Directory):
if os.path.isfile(os.path.join(Directory, Member)):
continue;
try:
os.makedirs (DirectoryOut + '/' + Member)
except OSError:
pass
outfile = os.path.join(DirectoryOut, Member, 'OriginalNumberOfEvents.txt')
try:
os.unlink (outfile)
except OSError:
pass
fout = open (outfile, "w")
fout.write (str (TotalNumber) + "\n")
fout.close ()
outfile = os.path.join(DirectoryOut, Member, 'SkimNumberOfEvents.txt')
try:
os.unlink (outfile)
except OSError:
pass
fout = open (outfile, "w")
fout.write (str (SkimNumber[Member]) + "\n")
fout.close ()
os.chdir(Directory + '/' + Member)
listOfSkimFiles = glob.glob('*.root')
sys.path.append(Directory + '/' + Member)
createdSkimInputTags = False
for file in listOfSkimFiles:
index = file.split('.')[0].split('_')[1]
if index in BadIndices:
continue
if not createdSkimInputTags:
GetSkimInputTags(file.rstrip('\n'))
createdSkimInputTags = True
os.chdir(Directory)
for file in FilesToRemove:
os.unlink (file)

if lpcCAF and os.path.realpath(DirectoryOut + '/' + Member).startswith('/eos/uscms'):
MakeFilesForSkimDirectoryEOS(Member, Directory, DirectoryOut, TotalNumber, SkimNumber, BadIndices)
else:
outfile = os.path.join(DirectoryOut, Member, 'OriginalNumberOfEvents.txt')
try:
os.unlink (outfile)
except OSError:
pass
fout = open (outfile, "w")
fout.write (str (TotalNumber) + "\n")
fout.close ()
outfile = os.path.join(DirectoryOut, Member, 'SkimNumberOfEvents.txt')
try:
os.unlink (outfile)
except OSError:
pass
fout = open (outfile, "w")
fout.write (str (SkimNumber[Member]) + "\n")
fout.close ()

os.chdir(Directory + '/' + Member)
listOfSkimFiles = glob.glob('*.root')
sys.path.append(Directory + '/' + Member)
for skimFile in listOfSkimFiles:
index = skimFile.split('.')[0].split('_')[1]
if index in BadIndices:
continue
GetSkimInputTags(skimFile.rstrip('\n'))
break
os.chdir(Directory)

for skimFile in FilesToRemove:
if lpcCAF and skimFile.startswith('root://cmseos.fnal.gov/'):
try:
subprocess.check_output(['xrdfs', 'root://cmseos.fnal.gov', 'rm', skimFile[len('root://cmseos.fnal.gov/'):]])
except subprocess.CalledProcessError as e:
print 'Failed to remove file', skimFile, ':', e
else:
os.unlink(skimFile)

###############################################################################
# Produce a pickle file containing the skim input tags. #
###############################################################################
def GetSkimInputTags(File):
def GetSkimInputTags(File, xrootdDestination = ""):
print "Getting skim input tags..."
eventContent = subprocess.check_output (["edmDumpEventContent", "--all", os.getcwd () + "/" + File])
if xrootdDestination != "":
eventContent = subprocess.check_output (["edmDumpEventContent", "--all", "root://cmseos.fnal.gov/" + File[len('/eos/uscms'):]])
else:
eventContent = subprocess.check_output (["edmDumpEventContent", "--all", os.getcwd () + "/" + File])
parsing = False
cppTypes = []
inputTags = {}
Expand Down Expand Up @@ -208,11 +255,23 @@ def GetSkimInputTags(File):
if "OSUAnalysis" in thisTag.getProcessName ():
inputTags[collectionType] = thisTag

if os.path.exists("SkimInputTags.pkl"):
os.remove("SkimInputTags.pkl")
fout = open ("SkimInputTags.pkl", "w")
pickle.dump (inputTags, fout)
fout.close ()
if xrootdDestination != "":
tmpDir = tempfile.mkdtemp()
outfile = os.path.join(tmpDir, 'SkimInputTags.pkl')
fout = open (outfile, 'w')
pickle.dump (inputTags, fout)
fout.close()
try:
subprocess.check_output(['xrdcp', '-f', outfile, xrootdDestination])
except subprocess.CalledProcessError as e:
print 'Failed to copy SkimInputTags.pkl:', e
shutil.rmtree(tmpDir)
else:
if os.path.exists("SkimInputTags.pkl"):
os.remove("SkimInputTags.pkl")
fout = open ("SkimInputTags.pkl", "w")
pickle.dump (inputTags, fout)
fout.close ()

###############################################################################
# Make submission script for the failed jobs. #
Expand Down Expand Up @@ -250,9 +309,11 @@ def MakeResubmissionScript(badIndices, originalSubmissionScript):
###############################################################################
# Determine whether a skim file is valid. #
###############################################################################
def SkimFileValidator(File):
print "testing ", File
FileToTest = TFile(File)
def SkimFileValidator(skimFile):
if skimFile.startswith('root://'):
FileToTest = TNetXNGFile(skimFile)
else:
FileToTest = TFile(skimFile)
Valid = True
for TreeToTest in ['MetaData', 'ParameterSets', 'Parentage', 'Events', 'LuminosityBlocks', 'Runs']:
Valid = Valid and (FileToTest.Get(TreeToTest) != None)
Expand Down Expand Up @@ -341,21 +402,27 @@ def mergeOneDataset(dataSet, IntLumi, CondorDir, OutputDir="", nThreadsActive =
# check for files that weren't found and were skipped
StdErrFiles = sorted(glob.glob('condor_*.err'))

for file in StdErrFiles:
index = file.split("_")[-1].split(".")[0]
if 'was not found or could not be opened, and will be skipped.' in open(file).read():
for errFile in StdErrFiles:
index = errFile.split("_")[-1].split(".")[0]
if 'was not found or could not be opened, and will be skipped.' in open(errFile).read():
BadIndices.append(index)
if verbose:
print " job" + ' ' * (4-len(str(index))) + index + " had bad/skipped input file"


# check for any corrupted skim output files
skimDirs = [member for member in os.listdir(os.getcwd()) if os.path.isdir(member)]
skimDirs = [member for member in os.listdir(os.getcwd()) if not os.path.isfile(member)]
FilesToRemove = []
for channel in skimDirs:
for skimFile in glob.glob(channel+'/*.root'):
if lpcCAF and os.path.realpath(channel).startswith('/eos/uscms'):
print 'Testing skim files in: root://cmseos.fnal.gov/' + os.path.realpath(channel)
listOfSkimFiles = subprocess.check_output(['xrdfs', 'root://cmseos.fnal.gov', 'ls', os.path.realpath(channel)])
listOfSkimFiles = ['root://cmseos.fnal.gov/' + x for x in listOfSkimFiles.split('\n') if x.endswith('.root')]
else:
print 'Testing skim files in:', os.path.realpath(channel)
listOfSkimFiles = glob.glob(channel + '/*.root')
for skimFile in listOfSkimFiles:
# don't check for good skims of jobs we already know are bad
index = skimFile.split('.')[0].split('_')[1]
index = os.path.basename(skimFile).split('.')[0].split('_')[1]
if index in BadIndices:
continue
Valid, InvalidOrEmpty = SkimFileValidator(skimFile.rstrip('\n'))
Expand All @@ -366,7 +433,6 @@ def mergeOneDataset(dataSet, IntLumi, CondorDir, OutputDir="", nThreadsActive =
if InvalidOrEmpty:
FilesToRemove.append (skimFile)


# check for abnormal condor return values
sys.path.append(directory)
for returnValue in ReturnValues:
Expand Down Expand Up @@ -404,7 +470,7 @@ def mergeOneDataset(dataSet, IntLumi, CondorDir, OutputDir="", nThreadsActive =
if verbose:
print "TotalNumber =", TotalNumber, ", SkimNumber =", SkimNumber
if not TotalNumber:
MakeFilesForSkimDirectory(directory, directoryOut, TotalNumber, SkimNumber, BadIndices, FilesToRemove)
MakeFilesForSkimDirectory(directory, directoryOut, TotalNumber, SkimNumber, BadIndices, FilesToRemove, lpcCAF)
return
Weight = 1.0
crossSection = float(datasetInfo.crossSection)
Expand All @@ -422,9 +488,9 @@ def mergeOneDataset(dataSet, IntLumi, CondorDir, OutputDir="", nThreadsActive =
Weight = IntLumi*crossSection/float(TotalNumber)
InputWeightString = MakeWeightsString(Weight, GoodRootFiles)
if runOverSkim:
MakeFilesForSkimDirectory(directory, directoryOut, datasetInfo.originalNumberOfEvents, SkimNumber, BadIndices, FilesToRemove)
MakeFilesForSkimDirectory(directory, directoryOut, datasetInfo.originalNumberOfEvents, SkimNumber, BadIndices, FilesToRemove, lpcCAF)
else:
MakeFilesForSkimDirectory(directory, directoryOut, TotalNumber, SkimNumber, BadIndices, FilesToRemove)
MakeFilesForSkimDirectory(directory, directoryOut, TotalNumber, SkimNumber, BadIndices, FilesToRemove, lpcCAF)

if not skipMerging:
threadLog = ""
Expand Down