
Rename output files (#55)
* rename existing output file to match current job definition

* Fix issue renaming files in state.json

* update path to output file

* rename first file

* report files that didn't stage out

* fix key error

* update first job report

* log renaming of output files to recover if preempted

* read commitlog to recover from error when generating jobReport

* write final output files into a separate directory

* Use commit log when renaming output files

* fix unit tests

* Create jobdir regardless of how many events we need to process

* revert pilot src
esseivaju authored Oct 13, 2023
1 parent 92622c4 commit 0790431
Showing 5 changed files with 148 additions and 21 deletions.
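The thread running through these changes is a small write-ahead log: every planned rename is appended to a commit_log file before state.json is rewritten, so a driver preempted mid-rename can replay the log and recover the current file names. A minimal sketch of that pattern, using hypothetical names rather than the actual raythena API:

import os

def rename_with_log(commitlog: str, directory: str, old: str, new: str) -> None:
    # Append a durable record of the rename *before* performing it, so a
    # crash between the two steps leaves enough information to recover.
    with open(commitlog, 'a') as f:
        f.write(f"rename_output {old} {new}\n")
    os.rename(os.path.join(directory, old), os.path.join(directory, new))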
9 changes: 8 additions & 1 deletion src/raythena/actors/esworker.py
@@ -469,8 +469,15 @@ def stageout_event_service_files(
                 self.output_dir,
                 os.path.basename(cfile) if os.path.isabs(cfile) else cfile)
             if os.path.isfile(cfile):
-                os.replace(cfile, dst)
+                try:
+                    os.replace(cfile, dst)
+                except OSError as e:
+                    self._logger.error(f"Failed to move file {cfile} to {dst}: errno {e.errno}: {e.strerror}")
+                    raise StageOutFailed(self.id)
                 range_update[cfile_key] = dst
+            else:
+                self._logger.warning(f"Couldn't stageout file {cfile} as it doesn't exist")
+                raise StageOutFailed(self.id)
         return ranges

def get_payload_message(self) -> Optional[WorkerResponse]:
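For context on the new error handling above: os.replace is atomic but only works within a single filesystem; moving a file across filesystems raises OSError with errno EXDEV, which is one way the except branch can fire. A hypothetical fallback, not part of this commit, would copy across devices instead:

import errno
import os
import shutil

def replace_with_fallback(src: str, dst: str) -> None:
    try:
        os.replace(src, dst)  # atomic, same-filesystem only
    except OSError as e:
        if e.errno != errno.EXDEV:
            raise
        shutil.move(src, dst)  # copy-then-delete across filesystems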
59 changes: 52 additions & 7 deletions src/raythena/drivers/esdriver.py
@@ -76,6 +76,8 @@ def __init__(self, config: Config, session_dir: str) -> None:
         workdir = os.getcwd()
         self.config.ray['workdir'] = workdir
         self.workdir = workdir
+        self.output_dir = str()
+        self.merged_files_dir = str()
         logfile = self.config.logging.get("driverlogfile", None)
         if logfile:
             log_to_file(self.config.logging.get("level", None), logfile)
@@ -493,13 +495,18 @@ def setup_dirs(self):
         self.tar_merge_es_output_dir = self.output_dir
         self.tar_merge_es_files_dir = self.output_dir
         self.job_reports_dir = os.path.join(self.output_dir, "job-reports")
+        self.merged_files_dir = os.path.join(self.output_dir, "final")
+        self.bookKeeper.output_dir = self.output_dir
+        self.bookKeeper.merged_files_dir = self.merged_files_dir
         self.config_remote = ray.put(self.config)
         # create the output directories if needed
         try:
             if not os.path.isdir(self.output_dir):
                 os.mkdir(self.output_dir)
             if not os.path.isdir(self.job_reports_dir):
                 os.mkdir(self.job_reports_dir)
+            if not os.path.isdir(self.merged_files_dir):
+                os.mkdir(self.merged_files_dir)
             if not os.path.isdir(self.tar_merge_es_output_dir):
                 os.mkdir(self.tar_merge_es_output_dir)
             if not os.path.isdir(self.tar_merge_es_files_dir):
@@ -549,12 +556,11 @@ def run(self) -> None:
             return
         job_id = self.bookKeeper.jobs.next_job_id_to_process()
         total_events = self.bookKeeper.n_ready(job_id)
+        os.makedirs(os.path.join(self.config.ray['workdir'], job_id))
         if total_events:
             self.available_events_per_actor = max(1, ceil(total_events / self.n_actors))
             for pandaID in self.bookKeeper.jobs:
                 cjob = self.bookKeeper.jobs[pandaID]
-                os.makedirs(
-                    os.path.join(self.config.ray['workdir'], cjob['PandaID']))
                 self.remote_jobdef_byid[pandaID] = ray.put(cjob)

         self.create_actors()
@@ -592,13 +598,28 @@ def run(self) -> None:
         self.bookKeeper.save_status()
         task_status = self.bookKeeper.taskstatus.get(self.panda_taskid, None)
         if task_status and task_status.get_nmerged() + task_status.get_nfailed() == task_status.total_events():
-            self.produce_final_report()
+            assert job_id
+            output_map = self.bookKeeper.remap_output_files(job_id)
+            self.rename_output_files(output_map)
+            self.produce_final_report(output_map)
         self.communicator.stop()
         # self.cpu_monitor.stop()
         self.bookKeeper.print_status()
         self._logger.debug("All driver threads stopped. Quitting...")

-    def produce_final_report(self):
+    def rename_output_files(self, output_map: Dict[str, str]):
+        """
+        Rename final output files
+        """
+        for file in os.listdir(self.merged_files_dir):
+            try:
+                new_filename = output_map[file]
+            except KeyError:
+                # read the commit log to recover the correct name. If we get another KeyError, we can't recover
+                new_filename = output_map[self.bookKeeper.recover_outputfile_name(file)]
+            os.rename(os.path.join(self.merged_files_dir, file), os.path.join(self.merged_files_dir, new_filename))
+
+    def produce_final_report(self, output_map: Dict[str, str]):
         """
         Merge job reports from individual merge transforms to produce the final jobReport for Panda.
         """
@@ -610,11 +631,35 @@ def produce_final_report(self):
         with open(os.path.join(self.job_reports_dir, files[0]), 'r') as f:
             final_report = json.load(f)
         final_report_files = final_report["files"]
+
+        # rename first file on disk and in report
+        output_file_entry = final_report_files["output"][0]["subFiles"][0]
+        old_filename = output_file_entry["name"]
+        try:
+            new_filename = output_map[old_filename]
+        except KeyError:
+            # read the commit log to recover the correct name. If we get another KeyError, we can't recover
+            new_filename = output_map[self.bookKeeper.recover_outputfile_name(old_filename)]
+        output_file_entry["name"] = new_filename
+        with open(os.path.join(self.job_reports_dir, files[0]), 'w') as f:
+            json.dump(final_report, f)

         for file in files[1:]:
-            with open(os.path.join(self.job_reports_dir, file), 'r') as f:
+            current_file = os.path.join(self.job_reports_dir, file)
+            with open(current_file, 'r') as f:
                 current_report = json.load(f)
             final_report_files["input"].append(current_report["files"]["input"][0])
-            final_report_files["output"][0]["subFiles"].append(current_report["files"]["output"][0]["subFiles"][0])
+            output_file_entry = current_report["files"]["output"][0]["subFiles"][0]
+            old_filename = output_file_entry["name"]
+            try:
+                new_filename = output_map[old_filename]
+            except KeyError:
+                # read the commit log to recover the correct name. If we get another KeyError, we can't recover
+                new_filename = output_map[self.bookKeeper.recover_outputfile_name(old_filename)]
+            output_file_entry["name"] = new_filename
+            final_report_files["output"][0]["subFiles"].append(output_file_entry)
+            with open(current_file, 'w') as f:
+                json.dump(current_report, f)

         tmp = os.path.join(self.workdir, self.jobreport_name + ".tmp")
         with open(tmp, 'w') as f:
@@ -740,7 +785,7 @@ def hits_merge_transform(self, input_files: Iterable[str], output_file: str) ->
         tmp_dir = tempfile.mkdtemp()
         file_list = ",".join(input_files)
         job_report_name = os.path.join(self.job_reports_dir, output_file) + ".json"
-        output_file = os.path.join(self.output_dir, output_file)
+        output_file = os.path.join(self.merged_files_dir, output_file)

         transform_params = re.sub(r"@inputFor_\$\{OUTPUT0\}", file_list, self.merge_transform_params)
         transform_params = re.sub(r"\$\{OUTPUT0\}", output_file, transform_params, count=1)
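The two re.sub calls in hits_merge_transform substitute paths into the transform's parameter template: the first expands the @inputFor_${OUTPUT0} placeholder into the comma-separated input list, the second fills the first remaining ${OUTPUT0} with the output path, which after this commit points into the final directory. A small standalone illustration with a made-up template (the real merge_transform_params comes from the PanDA job definition):

import re

merge_transform_params = "--inputHITSFile=@inputFor_${OUTPUT0} --outputHITS_MRGFile=${OUTPUT0}"  # hypothetical
file_list = "simu1.HITS.pool.root,simu2.HITS.pool.root"
output_file = "/outputdir/final/merged.HITS.pool.root"

params = re.sub(r"@inputFor_\$\{OUTPUT0\}", file_list, merge_transform_params)
params = re.sub(r"\$\{OUTPUT0\}", output_file, params, count=1)
print(params)
# --inputHITSFile=simu1.HITS.pool.root,simu2.HITS.pool.root --outputHITS_MRGFile=/outputdir/final/merged.HITS.pool.root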
77 changes: 69 additions & 8 deletions src/raythena/utils/bookkeeper.py
@@ -38,11 +38,12 @@ class TaskStatus:
     MERGING = "merging"
     FAILED = "failed"

-    def __init__(self, job: PandaJob, config: Config) -> None:
+    def __init__(self, job: PandaJob, merged_files_dir: str, config: Config) -> None:
         self.config = config
         self.job = job
         self._logger = make_logger(self.config, "TaskStatus")
         self.output_dir = config.ray.get("outputdir")
+        self.merged_files_dir = merged_files_dir
         self.filepath = os.path.join(self.output_dir, "state.json")
         self.tmpfilepath = f"{self.filepath}.tmp"
         self._events_per_file = int(job['nEventsPerInputFile'])
@@ -95,7 +96,7 @@ def _restore_status(self):
             self._logger.error(ee.strerror)
             self._default_init_status()

-    def save_status(self, write_to_tmp=True):
+    def save_status(self, write_to_tmp=True, force_update=False):
        """
        Save the current status to a json file. Before saving, the update queue is drained, applying the queued operations to the dictionary
        that will be written to file.
@@ -105,7 +106,7 @@ def save_status(self, write_to_tmp=True, force_update=False):
        """

        # deque is empty, nothing new to save
-        if not self._update_queue:
+        if not force_update and not self._update_queue:
            return

        # Drain the update deque, applying each update to the status dictionary
@@ -130,6 +131,12 @@
         except OSError as e:
             self._logger.error(f"Failed to save task status: {e.strerror}")

+    def is_stale(self) -> bool:
+        """
+        Checks if updates still need to be written to disk
+        """
+        return len(self._update_queue) > 0
+
     @staticmethod
     def build_eventrange_dict(eventrange: EventRange, output_file: str = None) -> Dict[str, Any]:
         """
@@ -205,7 +212,7 @@ def _set_file_merged(self, input_files: List[str], outputfile: str, event_ranges
                 merged_dict = dict()
                 self._status[TaskStatus.MERGED][inputfile] = merged_dict
                 for merged_outputfile in self._status[TaskStatus.MERGING][inputfile].keys():
-                    merged_dict[merged_outputfile] = {"path": os.path.join(self.output_dir, merged_outputfile), "guid": guid if guid else ""}
+                    merged_dict[merged_outputfile] = {"path": os.path.join(self.merged_files_dir, merged_outputfile), "guid": guid if guid else ""}
                 del self._status[TaskStatus.MERGING][inputfile]
                 del self._status[TaskStatus.SIMULATED][inputfile]
             else:
@@ -304,7 +311,9 @@ class BookKeeper(object):
     def __init__(self, config: Config) -> None:
         self.jobs: PandaJobQueue = PandaJobQueue()
         self.config: Config = config
-        self.output_dir = config.ray.get("outputdir")
+        self.output_dir = str()
+        self.merged_files_dir = str()
+        self.commitlog = str()
         self._logger = make_logger(self.config, "BookKeeper")
         self.actors: Dict[str, Optional[str]] = dict()
         self.rangesID_by_actor: Dict[str, Set[str]] = dict()
@@ -320,6 +329,8 @@ def __init__(self, config: Config) -> None:
         self.files_guids: Dict[str, str] = dict()
         self.last_status_print = time.time()
         self.taskstatus: Dict[str, TaskStatus] = dict()
+        self._input_output_mapping: Dict[str, List[str]] = dict()
+        self._output_input_mapping: Dict[str, List[str]] = dict()
         self.stop_saver = threading.Event()
         self.stop_cleaner = threading.Event()
         self.save_state_thread = ExThread(target=self._saver_thead_run, name="status-saver-thread")
@@ -436,10 +447,11 @@ def add_jobs(self, jobs: Mapping[str, JobDef], start_threads=True) -> None:
         for pandaID in self.jobs:
             job = self.jobs[pandaID]
             if job["taskID"] not in self.taskstatus:
-                ts = TaskStatus(job, self.config)
+                assert self.output_dir
+                assert self.merged_files_dir
+                ts = TaskStatus(job, self.merged_files_dir, self.config)
                 self.taskstatus[job['taskID']] = ts
-                # TODO: have esdriver provide outputdir to make sure both are consistent
-                self.output_dir = os.path.join(os.path.expandvars(self.config.ray.get("taskprogressbasedir")), str(job['taskID']))
+                self.commitlog = os.path.join(self.output_dir, "commit_log")
                 self._generate_input_output_mapping(job)
                 self._generate_event_ranges(job, ts)
         if start_threads:
@@ -484,6 +496,55 @@ def _generate_input_output_mapping(self, job: PandaJob):
     def generate_event_range_id(file: str, n: str):
         return f"{file}-{n}"

+    def remap_output_files(self, panda_id: str) -> Dict[str, str]:
+        """
+        Translate an existing output file to an output filename matching the current job definition.
+        """
+        job = self.jobs[panda_id]
+        task_status = self.taskstatus[job["taskID"]]
+        if task_status.is_stale():
+            task_status.save_status()
+        merged_files = task_status._status[TaskStatus.MERGED]
+        previous_to_current_output_lookup: Dict[str, str] = dict()
+
+        with open(self.commitlog, 'a') as f:
+            for input_file, output_files in self._input_output_mapping.items():
+                merged_output_files = merged_files[input_file]
+                assert isinstance(merged_output_files, dict)
+                assert len(merged_output_files) == len(output_files)
+                for merged_file, new_file in zip(merged_output_files, output_files):
+                    if merged_file in previous_to_current_output_lookup:
+                        assert new_file == previous_to_current_output_lookup[merged_file]
+                        continue
+                    previous_to_current_output_lookup[merged_file] = new_file
+                    f.write(f"rename_output {merged_file} {new_file}\n")
+
+        # Rename old merged files to output file names matching the current job in state.json
+        for output_files in merged_files.values():
+            assert isinstance(output_files, dict)
+            for old_file in list(output_files.keys()):
+                new_file = previous_to_current_output_lookup[old_file]
+                entry = output_files.pop(old_file)
+                entry["path"] = entry["path"].replace(old_file, new_file)
+                output_files[new_file] = entry
+        task_status.save_status(force_update=True)
+
+        return previous_to_current_output_lookup
+
+    def recover_outputfile_name(self, filename: str) -> str:
+        """
+        Read the commitlog change history of filename and return the current filename.
+        """
+        with open(self.commitlog, 'r') as f:
+            for line in f:
+                op, *args = line.rstrip().split(" ")
+                if op != "rename_output":
+                    continue
+                old, new = args[0], args[1]
+                if old == filename:
+                    filename = new
+        return filename
+
     def get_files_to_merge_with(self, file: str):
         """
         Find all the input file names that should be merged with the given file.
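recover_outputfile_name replays the commit log in order, following a file through any chain of renames accumulated over successive job re-definitions. A toy replay under the same log format, with hypothetical file names:

# Commit log contents after two job re-definitions:
#   rename_output HITS_old.root HITS_mid.root
#   rename_output HITS_mid.root HITS_new.root
filename = "HITS_old.root"
log_lines = [
    "rename_output HITS_old.root HITS_mid.root",
    "rename_output HITS_mid.root HITS_new.root",
]
for line in log_lines:
    op, old, new = line.rstrip().split(" ")
    if op == "rename_output" and old == filename:
        filename = new  # follow the chain one hop at a time
print(filename)  # HITS_new.root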
14 changes: 14 additions & 0 deletions tests/test_bookkeeper.py
@@ -8,6 +8,8 @@ class TestBookKeeper:

def test_add_jobs(self, is_eventservice, config, sample_multijobs, njobs):
bookKeeper = BookKeeper(config)
+        bookKeeper.output_dir = "dummy"
+        bookKeeper.merged_files_dir = "dummy"
bookKeeper.add_jobs(sample_multijobs, False)
assert len(bookKeeper.jobs) == njobs
for pandaID in bookKeeper.jobs:
@@ -16,6 +18,8 @@ def test_assign_job_to_actor(elf, is_eventservice, config, sample_multijobs,
def test_assign_job_to_actor(elf, is_eventservice, config, sample_multijobs,
njobs, sample_ranges, nevents):
bookKeeper = BookKeeper(config)
+        bookKeeper.output_dir = "dummy"
+        bookKeeper.merged_files_dir = "dummy"
bookKeeper.add_jobs(sample_multijobs, False)
actor_id = "a1"
if not is_eventservice:
Expand Down Expand Up @@ -45,6 +49,8 @@ def test_add_event_ranges(self, is_eventservice, config, sample_multijobs,
pytest.skip()

bookKeeper = BookKeeper(config)
+        bookKeeper.output_dir = "dummy"
+        bookKeeper.merged_files_dir = "dummy"
bookKeeper.add_jobs(sample_multijobs, False)

assert bookKeeper.has_jobs_ready()
Expand All @@ -60,6 +66,8 @@ def test_fetch_event_ranges(self, is_eventservice, config, sample_multijobs,
worker_ids = [f"w_{i}" for i in range(10)]

bookKeeper = BookKeeper(config)
+        bookKeeper.output_dir = "dummy"
+        bookKeeper.merged_files_dir = "dummy"
bookKeeper.add_jobs(sample_multijobs, False)
bookKeeper.add_event_ranges(sample_ranges)

@@ -86,6 +94,8 @@ def test_process_event_ranges_update(self, is_eventservice, config,

def __inner__(range_update, failed=False):
bookKeeper = BookKeeper(config)
+        bookKeeper.output_dir = "dummy"
+        bookKeeper.merged_files_dir = "dummy"
bookKeeper.add_jobs(sample_multijobs, False)

for i in range(njobs):
@@ -105,6 +115,8 @@ def __inner__(range_update, failed=False):
__inner__(sample_failed_rangeupdate, True)

bookKeeper = BookKeeper(config)
+        bookKeeper.output_dir = "dummy"
+        bookKeeper.merged_files_dir = "dummy"
bookKeeper.add_jobs(sample_multijobs, False)
for _ in range(njobs):
job = bookKeeper.assign_job_to_actor(actor_id)
@@ -142,6 +154,8 @@ def test_process_actor_end(self, is_eventservice, config, njobs,
actor_id_2 = "a2"

bookKeeper = BookKeeper(config)
+        bookKeeper.output_dir = "dummy"
+        bookKeeper.merged_files_dir = "dummy"
bookKeeper.add_jobs(sample_multijobs, False)

job = bookKeeper.assign_job_to_actor(actor_id_1)
10 changes: 5 additions & 5 deletions tests/test_taskstatus.py
@@ -7,7 +7,7 @@ class TestTaskStatus:
def test_save_restore_status(self, nfiles, tmp_path, config, sample_job, sample_ranges):
config.ray["outputdir"] = tmp_path
job = PandaJob(list(sample_job.values())[0])
-        ts = TaskStatus(job, config)
+        ts = TaskStatus(job, tmp_path, config)
ranges = list(sample_ranges.values())[0]
hits_per_file = int(job['esmergeSpec']['nEventsPerOutputFile'])
events_per_file = int(job['nEventsPerInputFile'])
@@ -30,14 +30,14 @@ def test_save_restore_status(self, nfiles, tmp_path, config, sample_job, sample_

ts.set_file_merged([fname], outputfile, ranges_map, "guid")
ts.save_status()
-        ts2 = TaskStatus(job, config)
+        ts2 = TaskStatus(job, tmp_path, config)
print(ts._status)
assert ts._status == ts2._status

def test_set_processed(self, nfiles, nevents, tmp_path, config, sample_job, sample_ranges):
config.ray["outputdir"] = tmp_path
job = PandaJob(list(sample_job.values())[0])
-        ts = TaskStatus(job, config)
+        ts = TaskStatus(job, tmp_path, config)

ranges_list = list(sample_ranges.values())[0]
for r in ranges_list:
@@ -52,7 +52,7 @@ def test_set_failed(self, nfiles, nevents, tmp_path, config, sample_job, sample_
def test_set_failed(self, nfiles, nevents, tmp_path, config, sample_job, sample_ranges):
config.ray["outputdir"] = tmp_path
job = PandaJob(list(sample_job.values())[0])
-        ts = TaskStatus(job, config)
+        ts = TaskStatus(job, tmp_path, config)

ranges_list = list(sample_ranges.values())[0]
for r in ranges_list:
@@ -67,7 +67,7 @@ def test_set_merged(self, nfiles, nevents, tmp_path, config, sample_job, sample_
def test_set_merged(self, nfiles, nevents, tmp_path, config, sample_job, sample_ranges):
config.ray["outputdir"] = tmp_path
job = PandaJob(list(sample_job.values())[0])
-        ts = TaskStatus(job, config)
+        ts = TaskStatus(job, tmp_path, config)

ranges = list(sample_ranges.values())[0]
for e in ranges:
