Skip to content

Commit

Permalink
Generate computational graph of the analysis (#341)
Browse files Browse the repository at this point in the history
* Generate computational graph of the analysis

* Generating full graph for histmaker

* Checked with mypy
  • Loading branch information
kjvbrt authored Jan 28, 2024
1 parent e932c9e commit bb9370d
Show file tree
Hide file tree
Showing 11 changed files with 165 additions and 29 deletions.
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -100,3 +100,7 @@ benchmark*json

# Local configuration
.fccana/*

# Graphviz graphs
*.dot
*.png
11 changes: 10 additions & 1 deletion man/man1/fccanalysis-run.1
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
[\fB\-\-test\fR]
[\fB\-\-bench\fR]
[\fB\-\-ncpus\fR \fINCPUS\fR]
[\fB\-g\fR]
[\fB\-\-graph\-path\fR \fIGRAPH_PATH\fR]
.I analysis-script
.SH DESCRIPTION
.B fccanalysis-run
Expand Down Expand Up @@ -47,8 +49,15 @@ Run over the test file\&.
.B \-\-bench
Output benchmark results to a JSON file\&.
.TP
\-\-ncpus \fINCPUS\fR
\fB\-\-ncpus\fR \fINCPUS\fR
Set number of threads\&.
.TP
.BR \-g ", " \-\-graph
Generate the computational graph of the analysis\&.
.TP
\fB\-\-graph\-path\fR \fIGRAPH_PATH\fR
Location where the computational graph of the analysis should be stored. Only
paths with \fI.dot\fR and \fI.png\fR extensions are accepted.
.SH ENVIRONMENT VARIABLES
.TP
.B FCCDICTSDIR
Expand Down
11 changes: 11 additions & 0 deletions man/man7/fccanalysis-script.7
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,17 @@ Location of the test file.
.br
Default value: empty string
.TP
\fBgraph\fR (optional)
If set to \fITrue\fR, the computational graph of the analysis will be generated.
.br
Default value: False
.TP
\fBgraphPath\fR (optional)
Location where the computational graph of the analysis should be stored. Only
paths with \fI.dot\fR and \fI.png\fR extensions are accepted.
.br
Default value: empty string
.TP
.B procDict
This variable controls which process dictionary will be used. It can be either
simple file name, absolute path or url. In the case of simple filename, the file
Expand Down
6 changes: 6 additions & 0 deletions python/anascript.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,12 @@ def get_element(rdf_module, element: str, is_final: bool = False):
'stage of the analysis.', element)
return ''

elif element == 'graph':
return False

elif element == 'graphPath':
return ''

return None


Expand Down
58 changes: 58 additions & 0 deletions python/frame.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
'''
RDataFrame helpers.
'''

import os
import pathlib
import shutil
import logging
import ROOT # type: ignore


ROOT.gROOT.SetBatch(True)

LOGGER: logging.Logger = logging.getLogger('FCCAnalyses.frame')


# _____________________________________________________________________________
def generate_graph(dframe, args, suffix: str | None = None) -> None:
    '''
    Generate computational graph of the analysis.

    The graph is always saved in Graphviz ``.dot`` format next to the chosen
    output path; if the Graphviz ``dot`` executable is found on PATH, a PNG
    rendering is produced as well.

    :param dframe: RDataFrame whose computation graph will be saved.
    :param args: Parsed command-line arguments; the ``graph_path`` and
                 ``anascript_path`` attributes are used.
    :param suffix: Optional suffix appended to the output file stem
                   (e.g. to distinguish per-process graphs).
    '''
    # Determine the output file path: explicit --graph-path if provided,
    # otherwise derive it from the analysis script location.
    if args.graph_path:
        graph_path = pathlib.PurePath(args.graph_path)
    else:
        graph_path = pathlib.PurePath(args.anascript_path).with_suffix('.dot')

    # Fall back to the analysis script name if the extension is unsupported
    if graph_path.suffix not in ('.dot', '.png'):
        LOGGER.warning('Graph output file extension not recognized!\n'
                       'Using analysis script name...')
        graph_path = pathlib.PurePath(args.anascript_path).with_suffix('.dot')

    # Add optional suffix to the output file path (before the extension)
    if suffix is not None:
        graph_path = graph_path.with_name(graph_path.stem +
                                          suffix +
                                          graph_path.suffix)

    # Check once whether Graphviz's `dot` tool is available
    dot_exec = shutil.which('dot')

    # Announce to which files the graph will be saved
    if dot_exec is None:
        LOGGER.info('Analysis computational graph will be saved into:\n - %s',
                    graph_path.with_suffix('.dot'))
    else:
        LOGGER.info('Analysis computational graph will be saved into:'
                    '\n - %s\n - %s',
                    graph_path.with_suffix('.dot'),
                    graph_path.with_suffix('.png'))

    # Generate graph in .dot format
    ROOT.RDF.SaveGraph(dframe, str(graph_path.with_suffix('.dot')))

    if dot_exec is None:
        LOGGER.warning('PNG version of the computational graph will not be '
                       'generated.\nGraphviz library not found!')
        return

    # Convert the .dot file into .png.
    # Use subprocess.run with an argument list (instead of os.system with an
    # interpolated shell string) so paths containing spaces or shell
    # metacharacters are handled safely.
    subprocess.run([dot_exec, '-Tpng',
                    str(graph_path.with_suffix('.dot')),
                    '-o', str(graph_path.with_suffix('.png'))],
                   check=False)
7 changes: 7 additions & 0 deletions python/frame.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# generated with `stubgen frame.py`
# Type stubs for python/frame.py; keep signatures in sync with that module.

import logging

# Module-level logger ('FCCAnalyses.frame') used by generate_graph.
LOGGER: logging.Logger

# Saves the RDataFrame computational graph as .dot (and .png when Graphviz
# is available); `suffix` is appended to the output file stem.
def generate_graph(dframe, args, suffix: str | None = None) -> None: ...
10 changes: 10 additions & 0 deletions python/parsers.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,11 @@ def setup_run_parser(parser):
help='output benchmark results to a JSON file')
parser.add_argument('--ncpus', type=int, default=-1,
help='set number of threads')
parser.add_argument('-g', '--graph', action='store_true', default=False,
help='generate computational graph of the analysis')
parser.add_argument('--graph-path', type=str, default='',
help='analysis graph save path, should end with '
'\'.dot\' or \'.png\'')

# Internal argument, not to be used by the users
parser.add_argument('--batch', action='store_true', default=False,
Expand All @@ -126,6 +131,11 @@ def setup_run_parser_final(parser):
'''
parser.add_argument('anascript_path',
help='path to analysis_final script')
parser.add_argument('-g', '--graph', action='store_true', default=False,
help='generate computational graph of the analysis')
parser.add_argument('--graph-path', type=str, default='',
help='analysis graph save path, should end with '
'\'.dot\' or \'.png\'')


def setup_run_parser_plots(parser):
Expand Down
13 changes: 7 additions & 6 deletions python/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,22 +84,23 @@ def get_process_info_files(process: str, input_dir: str) -> tuple[list[str],
return filelist, eventlist


def get_process_info_yaml(process: str, prod_tag: str) -> tuple[list[str],
list[int]]:
def get_process_info_yaml(process_name: str,
prod_tag: str) -> tuple[list[str],
list[int]]:
'''
Get list of files and events from the YAML file
'''
doc = None
proc_dict_dirs = get_process_dict_dirs()
yamlfilepath = None
for path in proc_dict_dirs:
yamlfilepath = os.path.join(path, 'yaml', prod_tag, process,
yamlfilepath = os.path.join(path, 'yaml', prod_tag, process_name,
'merge.yaml')
if not os.path.isfile(yamlfilepath):
continue
if not yamlfilepath:
LOGGER.error('Can\'t find the YAML file with process info!\n'
'Aborting...')
if not os.path.isfile(yamlfilepath):
LOGGER.error('Can\'t find the YAML file with process info for process '
'"%s"!\nAborting...', process_name)
sys.exit(3)

with open(yamlfilepath, 'r', encoding='utf-8') as ftmp:
Expand Down
2 changes: 1 addition & 1 deletion python/process.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@ LOGGER: logging.Logger
def get_entries(inpath: str) -> int: ...
def get_process_info(process: str, prod_tag: str, input_dir: str) -> tuple[list[str], list[int]]: ...
def get_process_info_files(process: str, input_dir: str) -> tuple[list[str], list[int]]: ...
def get_process_info_yaml(process: str, prod_tag: str) -> tuple[list[str], list[int]]: ...
def get_process_info_yaml(process_name: str, prod_tag: str) -> tuple[list[str], list[int]]: ...
def get_process_dict(proc_dict_location: str) -> dict: ...
def get_process_dict_dirs() -> list[str]: ...
45 changes: 33 additions & 12 deletions python/run_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
import os
import sys
import time
import json
import shutil
import json
import logging
import subprocess
import importlib.util
Expand All @@ -16,6 +16,7 @@
import ROOT # type: ignore
from anascript import get_element, get_element_dict
from process import get_process_info, get_process_dict
from frame import generate_graph

LOGGER = logging.getLogger('FCCAnalyses.run')

Expand Down Expand Up @@ -158,24 +159,26 @@ def create_subjob_script(local_dir: str,


# _____________________________________________________________________________
def get_subfile_list(in_file_list, event_list, fraction):
def get_subfile_list(in_file_list: list[str],
event_list: list[int],
fraction: float) -> list[str]:
'''
Obtain list of files roughly containing the requested fraction of events.
'''
nevts_total = sum(event_list)
nevts_target = int(nevts_total * fraction)
nevts_total: int = sum(event_list)
nevts_target: int = int(nevts_total * fraction)

if nevts_target <= 0:
LOGGER.error('The reduction fraction %f too stringent, no events '
'left!\nAborting...', fraction)
sys.exit(3)

nevts_real = 0
out_file_list = []
for i in enumerate(event_list):
nevts_real: int = 0
out_file_list: list[str] = []
for i, nevts in enumerate(event_list):
if nevts_real >= nevts_target:
break
nevts_real += event_list[i]
nevts_real += nevts
out_file_list.append(in_file_list[i])

info_msg = f'Reducing the input file list by fraction "{fraction}" of '
Expand Down Expand Up @@ -325,13 +328,19 @@ def run_rdf(rdf_module,
for bname in blist:
branch_list.push_back(bname)

evtcount = df2.Count()

# Generate computational graph of the analysis
if args.graph:
generate_graph(df2, args)

df2.Snapshot("events", out_file, branch_list)
except Exception as excp:
LOGGER.error('During the execution of the analysis file exception '
'occurred:\n%s', excp)
sys.exit(3)

return df2.Count()
return evtcount


# _____________________________________________________________________________
Expand Down Expand Up @@ -749,13 +758,18 @@ def run_histmaker(args, rdf_module, anapath):
info_msg += f'\n\toutput = {output}\n\tchunks = {chunks}'
LOGGER.info(info_msg)

df = ROOT.ROOT.RDataFrame("events", file_list_root)
evtcount = df.Count()
res, hweight = graph_function(df, process)
dframe = ROOT.ROOT.RDataFrame("events", file_list_root)
evtcount = dframe.Count()

res, hweight = graph_function(dframe, process)
results.append(res)
hweights.append(hweight)
evtcounts.append(evtcount)

# Generate computational graph of the analysis
if args.graph:
generate_graph(dframe, args)

LOGGER.info('Starting the event loop...')
start_time = time.time()
ROOT.ROOT.RDF.RunGraphs(evtcounts)
Expand Down Expand Up @@ -906,6 +920,13 @@ def run(parser):
rdf_module = importlib.util.module_from_spec(rdf_spec)
rdf_spec.loader.exec_module(rdf_module)

# Merge configuration from analysis script file with command line arguments
if get_element(rdf_module, 'graph'):
args.graph = True

if get_element(rdf_module, 'graphPath') != '':
args.graph_path = get_element(rdf_module, 'graphPath')

if hasattr(rdf_module, "build_graph") and \
hasattr(rdf_module, "RDFanalysis"):
LOGGER.error('Analysis file ambiguous!\nBoth "RDFanalysis" '
Expand Down
27 changes: 18 additions & 9 deletions python/run_final_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import ROOT # type: ignore
from anascript import get_element, get_element_dict
from process import get_process_dict
from frame import generate_graph

LOGGER = logging.getLogger('FCCAnalyses.run_final')

Expand Down Expand Up @@ -70,7 +71,7 @@ def testfile(f: str) -> bool:


# __________________________________________________________
def run(rdf_module):
def run(rdf_module, args):
'''
Main loop.
'''
Expand Down Expand Up @@ -117,7 +118,7 @@ def run(rdf_module):
if not os.path.exists(output_dir) and output_dir != '':
os.system(f'mkdir -p {output_dir}')

cut_list = get_element(rdf_module, "cutList", True)
cut_list: dict[str, str] = get_element(rdf_module, "cutList", True)
length_cuts_names = max(len(cut) for cut in cut_list)
cut_labels = get_element(rdf_module, "cutLabels", True)

Expand Down Expand Up @@ -206,12 +207,13 @@ def run(rdf_module):

# Define all histos, snapshots, etc...
LOGGER.info('Defining snapshots and histograms')
for cut in cut_list:
for cut_name, cut_definition in cut_list.items():
# output file for tree
fout = output_dir + process_name + '_' + cut + '.root'
fout = output_dir + process_name + '_' + cut_name + '.root'
fout_list.append(fout)

df_cut = df.Filter(cut_list[cut])
df_cut = df.Filter(cut_definition)

count_list.append(df_cut.Count())

histos = []
Expand Down Expand Up @@ -270,6 +272,9 @@ def run(rdf_module):
# the snapshot
tdf_list.append(snapshot_tdf)

if args.graph:
generate_graph(df, args)

# Now perform the loop and evaluate everything at once.
LOGGER.info('Evaluating...')
all_events = df.Count().GetValue()
Expand Down Expand Up @@ -407,11 +412,8 @@ def run(rdf_module):
' \\begin{tabular}{|l||')
outfile.write('c|' * (len(cuts_list)-1))
outfile.write('} \\hline\n')
print(eff_list)
for i in range(len(eff_list)):
outfile.write(' ')
print('i:', i)
print(efficiency_list)
v = [row[i] for row in efficiency_list]
outfile.write(' & '.join(str(v)))
outfile.write(' \\\\\n')
Expand Down Expand Up @@ -493,4 +495,11 @@ def run_final(parser):
rdf_module = importlib.util.module_from_spec(rdf_spec)
rdf_spec.loader.exec_module(rdf_module)

run(rdf_module)
# Merge configuration from analysis script file with command line arguments
if get_element(rdf_module, 'graph'):
args.graph = True

if get_element(rdf_module, 'graphPath') != '':
args.graph_path = get_element(rdf_module, 'graphPath')

run(rdf_module, args)

0 comments on commit bb9370d

Please sign in to comment.