Skip to content

Commit

Permalink
Merge pull request #1 from DarkEnergySurvey/addingMigrationScript
Browse files Browse the repository at this point in the history
Adding migration script
  • Loading branch information
astro-friedel authored Jul 18, 2022
2 parents c2f1b38 + 96038a8 commit 00120a8
Show file tree
Hide file tree
Showing 16 changed files with 2,549 additions and 1,243 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -122,3 +122,5 @@ dmypy.json

# Pyre type checker
.pyre/

.idea/
169 changes: 169 additions & 0 deletions bin/compact_logs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
#!/usr/bin/env python3

import sys
import argparse
import signal
import math
import copy
import curses
import queue
import time
import multiprocessing as mp
import datetime

from filemgmt import fmutils
from filemgmt import compact_utils as cu


def parse_cmd_line(argv):
""" Parse command line arguments
Parameters
----------
args : command line arguments
Returns
-------
Dictionary continaing the command line arguments
"""
epilog = """\
The files to be migrated can be specified in multiple ways:
- pfwid will select all files from the given pfw_attempot_id(s), note that multiple
pfw_attempt_ids can be given as a comma separated list
- tag will select all files from all pfw_attempt_ids linked to the given tag
All of the above are mutually exclusive selection criteria.
The use of the date_range argument can be used to narrow the selection criteria.
The following are all valid ways to select the files:
--pfwid 123456
--pfwid 123456,789012,345678
--tag Y1A2_JUNK
"""

parser = argparse.ArgumentParser(description='Migrate files from one filesystem to another, performing integrity checks of all files',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=epilog)
parser.add_argument('--des_services', action='store', help='Services file.')
parser.add_argument('--section', '-s', action='store', help='Must be specified if DES_DB_SECTION is not set in environment')
parser.add_argument('--archive', action='store', default='desar2home', help='Archive_name from file_archive_info table. Only use if compressing logs in the archive.')
parser.add_argument('--verbose', action='store_true', help='print differences between db and disk')
parser.add_argument('--debug', action='store_true', help='print all files, recommend >= 300 char wide terminal')
parser.add_argument('--script', action='store_true', help='Print only if there are errors, usefule for running in loops in scripts')
parser.add_argument('--pfwid', action='store', help='pfw attempt id to search for')
parser.add_argument('--silent', action='store_true', help='Run with minimal printing, only print ERROR or OK')
parser.add_argument('--tag', action='store', help='Compare all data from a specific tag (this can take a long time)')
parser.add_argument('--dbh', action='store', help=argparse.SUPPRESS) # used internally
parser.add_argument('--log', action='store', help='Log file to write to, default is to write to sdtout')
parser.add_argument('--parallel', action='store', help='Specify the parallelization of the work, e.g. 3 would spread the work across 3 subprocesses.', type=int, default=1)
parser.add_argument('--live', action='store_true', help='Used to specify running on a live system')
parser.add_argument('--tarfile', action='store', help='Name of the tar file to create, if not specified one based on the reqnum, unitname, and attnum will be used.', default=None)
cargs = parser.parse_args(argv)
if cargs.script:
cargs.verbose = False
return cargs


def main():
""" Main program module
"""
start = datetime.datetime.now()
args = parse_cmd_line(sys.argv[1:])
if args.log is not None:
stdp = fmutils.Print(args.log)
sys.stdout = stdp
(args, pfwids) = fmutils.determine_ids(args)
if args.live:
if not args.pfwid:
raise Exception("pfwid must be specified")
cul = cu.CompactLogs(0, args, [args.pfwid], None)
cul.run()
return 0
manager = mp.Manager()
event = manager.Event()

def interrupt(x, y):
event.set()

signal.signal(signal.SIGINT, interrupt)

if args.parallel <= 1:
cul = cu.CompactLogs(0, args, pfwids, event)
cul.run()
else:
if args.parallel > 8:
args.parallel = 8
args.dbh.close()
args.dbh = None
npids = len(pfwids)
jobs = []
for _ in range(args.parallel):
jobs.append([])
pos = 0
while pfwids:
jobs[pos].append(pfwids.pop())
pos += 1
if pos >= args.parallel:
pos = 0
manager = mp.Manager()
queu = manager.Queue()
done = [False] * len(jobs)
wins = []
errors = {}
try:
stdscr = curses.initscr()
curses.cbreak()
num_rows, num_cols = stdscr.getmaxyx()
step = math.floor(num_rows/len(jobs))
for i in range(len(jobs)):
wins.append(curses.newwin(step, num_cols, i*step, 0))

with mp.Pool(processes=len(jobs), maxtasksperchild=1) as pool:
_ = [pool.apply_async(fmutils.run, args=((cu.CompactLogs, i, copy.deepcopy(args), jobs[i], event, queu,),), error_callback=fmutils.results_error) for i in range(len(jobs))]
pool.close()
#pool.join()
while not all(done):
while True:
try:
ms = queu.get_nowait()
if ms.err:
if ms.pfwid not in errors:
errors[ms.pfwid] = []
errors[ms.pfwid].append(ms.msg)
continue
if ms.msg == fmutils.COMPLETE:
done[ms.win] = True
wins[ms.win].clear()
wins[ms.win].addstr("Complete\n")
elif ms.msg is not None:
wins[ms.win].clear()
wins[ms.win].addstr(ms.msg + '\n')
else:
fmutils.printProgressBar(wins[ms.win], ms.iteration, ms.count)
wins[ms.win].refresh()
except queue.Empty:
break
time.sleep(0.2)
finally:
curses.endwin()
if errors:
print(f"Issues were encountered in {len(errors)}/{npids} jobs.")
for pid, msgs in errors.items():
print(f"pfwid: {pid}")
for m in msgs:
m = m.strip()
print(f" {m}")
else:
print("All tasks accomplished")

end = datetime.datetime.now()
duration = end-start
print(f"\nJob took {duration.total_seconds():.1f} seconds")

if args.log is not None:
sys.stdout.flush()
sys.stdout = stdp.close()
sys.exit(0)

if __name__ == "__main__":
main()
45 changes: 6 additions & 39 deletions bin/compare_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,50 +7,15 @@
import sys
import argparse
import filemgmt.compare_utils as compare

class Print:
""" Class to capture printed output and write it to a log file
Parameters
----------
logfile : str
The log file to write to
"""
def __init__(self, logfile):
self.old_stdout = sys.stdout
self.logfile = open(logfile, 'w')

def write(self, text):
""" Method to capture, reformat, and write out the requested text
Parameters
----------
test : str
The text to reformat
"""
self.logfile.write(text)

def close(self):
""" Method to return stdout to its original handle
"""
return self.old_stdout

def flush(self):
""" Method to force the buffer to flush
"""
self.old_stdout.flush()
from filemgmt import fmutils


def parse_cmd_line(argv):
""" Parse command line arguments
Parameters
----------
args : command line arguments
argv : command line arguments
Returns
-------
Expand Down Expand Up @@ -112,9 +77,11 @@ def main():
"""
args = parse_cmd_line(sys.argv[1:])
if args.log is not None:
stdp = Print(args.log)
stdp = compare.Print(args.log)
sys.stdout = stdp
ret = compare.run_compare(args)
(args, pfwids) = fmutils.determine_ids(args)
comp = compare.FileCompare(args, pfwids)
ret = compare.run()
if args.log is not None:
sys.stdout.flush()
sys.stdout = stdp.close()
Expand Down
Loading

0 comments on commit 00120a8

Please sign in to comment.