Added an archiver performance summarizer. #1391
mfeit-internet2 committed Jul 19, 2024
1 parent eee073c commit 35e6d7c
Showing 2 changed files with 243 additions and 0 deletions.
1 change: 1 addition & 0 deletions pscheduler-core/pscheduler-core/Makefile
@@ -21,6 +21,7 @@ RAWPROGRAMS=\
	pscheduler

COMMANDS=\
	archiving-summary \
	batch \
	cancel \
	clock \
242 changes: 242 additions & 0 deletions pscheduler-core/pscheduler-core/archiving-summary
@@ -0,0 +1,242 @@
#!/usr/bin/env python3
#
# Show a summary of how archiving has been going
#

import copy
import datetime
import optparse
import os
import pscheduler
import sys

pscheduler.set_graceful_exit()


#
# Gargle the arguments
#

whoami = os.path.basename(sys.argv[0])
args = sys.argv[1:]


class VerbatimParser(optparse.OptionParser):
    """OptionParser that passes the epilog through without rewrapping it."""
    def format_epilog(self, formatter):
        return self.epilog


opt_parser = VerbatimParser(
    usage='Usage: %prog [ OPTIONS ] [ DELTA ]',
    epilog=
'''
Example:
  archiving-summary
      Check on archiving for runs in the last hour
  archiving-summary PT23M
      Check on archiving for runs in the last 23 minutes
'''
)
opt_parser.disable_interspersed_args()

opt_parser.add_option('--bind',
                      help='Make the request from the provided address',
                      default=None,
                      action='store', type='string',
                      dest='bind')

opt_parser.add_option('--host',
                      help='Check archiving on the named host',
                      default=pscheduler.api_local_host(),
                      action='store', type='string',
                      dest='host')

opt_parser.add_option('--verbose',
                      help='Add additional information where appropriate',
                      default=False,
                      action='store_true',
                      dest='verbose')


(options, remaining_args) = opt_parser.parse_args(args)


# The one remaining argument should be a delta.

if len(remaining_args) > 1:
    opt_parser.print_help()
    pscheduler.fail()

delta_text = remaining_args[0] if remaining_args else 'PT1H'
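# DELTA is an ISO 8601 duration; for example, 'PT23M' is 23 minutes,
# and the default 'PT1H' covers the past hour.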

try:
    delta = pscheduler.iso8601_as_timedelta(delta_text)
except ValueError as ex:
    pscheduler.fail(f'Invalid time delta "{delta_text}": {ex}')


if delta > datetime.timedelta(hours=6):
    print('Warning: Long time deltas may put a heavy load on the pScheduler server.', file=sys.stderr)
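# Each run found in the window costs an additional HTTP request
# below, so the amount of work grows linearly with the window size.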


now = pscheduler.time_now()

#
# Fetch the schedule
#

params = {
    'start': pscheduler.datetime_as_iso8601(now - delta),
    'end': pscheduler.datetime_as_iso8601(now)
}

status, schedule = pscheduler.url_get(
    pscheduler.api_url(host=options.host, path='schedule'),
    bind=options.bind,
    params=params,
    throw=False
)
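# pscheduler.url_get() returns the HTTP status and the decoded
# response body; with throw=False, a failure is reported through the
# status instead of raising an exception.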

if status != 200:
    pscheduler.fail('Server returned status %d: %s' % (status, schedule))



def make_hash(o):
    """
    Makes a hash from a dictionary, list, tuple or set to any depth,
    provided it contains only other hashable types (including any
    lists, tuples, sets, and dictionaries).
    Source: https://gist.github.com/charlax/b8731de51d2ea86c6eb9
    """

    if isinstance(o, (set, tuple, list)):
        return tuple([make_hash(e) for e in o])
    elif not isinstance(o, dict):
        return hash(o)

    new_o = copy.deepcopy(o)
    for k, v in new_o.items():
        new_o[k] = make_hash(v)

    return hash(tuple(frozenset(sorted(new_o.items()))))
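# Note that sets, tuples and lists all reduce to tuples, so, for
# example, make_hash([1, 2]) == make_hash((1, 2)).  That is harmless
# here, where the hash is only used to group identical archiving specs.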


total = 0
total_incomplete = 0
total_complete = 0
total_succeeded = 0
total_failed = 0

labels = {}
longest_label = 0

results = {}

for run_item in schedule:

    if run_item['state'] != 'finished' or 'archives' not in run_item['task']:
        continue

    href = run_item['href']

    (status, run) = pscheduler.url_get(href, bind=options.bind, throw=False)
    if status != 200:
        # The run may have vanished since the schedule was fetched.
        continue

    archivings = run.get('archivings')

    if not archivings:
        continue

    for archiving in archivings:

        total += 1

        # Skip those that might finish successfully later
        if not archiving['completed']:
            total_incomplete += 1
            continue

        total_complete += 1

        archived = archiving['archived']

        spec = archiving['spec']

        archiver = spec['archiver']
        data = spec['data']
        # Stuff this in here to make same-data, different-archiver instances unique
        data['__archiver'] = archiver
        data_hash = make_hash(data)

        # Generate a label, which is either what's specified or the
        # archiver used.  Then number those with the same label but
        # different data hashes so they're distinct.

        label = spec.get('label', archiver)

        if label not in labels:
            labels[label] = {}

        label_set = labels[label]
        if data_hash not in label_set:
            label_suffix = f' #{len(label_set)+1}' if len(label_set) else ''
            label = f'{label}{label_suffix}'
            label_set[data_hash] = label
            longest_label = max(longest_label, len(label))
        else:
            label = label_set[data_hash]
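        # For example, two archivings labeled 'syslog' with different
        # data come out as 'syslog' and 'syslog #2'.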


        # Add a result for this archiving instance and increment its counters

        if data_hash not in results:
            results[data_hash] = {
                'label': label,
                # TODO: Hold the data?
                'succeeded': 0,
                'failed': 0
            }

        if archived:
            results[data_hash]['succeeded'] += 1
            total_succeeded += 1
        else:
            results[data_hash]['failed'] += 1
            total_failed += 1


if not total:
    pscheduler.succeed(f'Nothing archived since {now - delta}')

def format_count(count, total):
    """
    Format a count and percentage as '       12 ( 57%)'
    """
    percent = round(count / total * 100) if total else 0
    return f'{count:9d} ({percent:3d}%)'
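# For example, format_count(12, 21) produces '       12 ( 57%)'.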


def header_line(label_len):
    print('---------------- ---------------- ', '-' * label_len)

print('\n       Succeeded           Failed  Label')
header_line(longest_label)

# Sort labels alphabetically
for result in [ v for k, v in sorted(results.items(), key=lambda item: item[1]['label'].lower()) ]:

    result_total = result['succeeded'] + result['failed']
    print(
        f'''{format_count(result['succeeded'], result_total)}'''
        f''' {format_count(result['failed'], result_total)}'''
        f''' {result['label']}'''
    )

header_line(longest_label)
print(f'{format_count(total_succeeded, total_complete)}'
      f' {format_count(total_failed, total_complete)}'
      f' Total ({total_complete})\n'
      )
