diff --git a/src/raythena/actors/esworker.py b/src/raythena/actors/esworker.py
index c889f46..7bc129f 100644
--- a/src/raythena/actors/esworker.py
+++ b/src/raythena/actors/esworker.py
@@ -469,8 +469,15 @@ def stageout_event_service_files(
                     self.output_dir,
                     os.path.basename(cfile) if os.path.isabs(cfile) else cfile)
                 if os.path.isfile(cfile):
-                    os.replace(cfile, dst)
+                    try:
+                        os.replace(cfile, dst)
+                    except OSError as e:
+                        self._logger.error(f"Failed to move file {cfile} to {dst}: errno {e.errno}: {e.strerror}")
+                        raise StageOutFailed(self.id)
                     range_update[cfile_key] = dst
+                else:
+                    self._logger.warning(f"Couldn't stage out file {cfile} as it doesn't exist")
+                    raise StageOutFailed(self.id)
         return ranges
 
     def get_payload_message(self) -> Optional[WorkerResponse]:
diff --git a/src/raythena/drivers/esdriver.py b/src/raythena/drivers/esdriver.py
index 5acb873..1cfc230 100644
--- a/src/raythena/drivers/esdriver.py
+++ b/src/raythena/drivers/esdriver.py
@@ -76,6 +76,8 @@ def __init__(self, config: Config, session_dir: str) -> None:
         workdir = os.getcwd()
         self.config.ray['workdir'] = workdir
         self.workdir = workdir
+        self.output_dir = str()
+        self.merged_files_dir = str()
         logfile = self.config.logging.get("driverlogfile", None)
         if logfile:
             log_to_file(self.config.logging.get("level", None), logfile)
@@ -493,6 +495,9 @@ def setup_dirs(self):
         self.tar_merge_es_output_dir = self.output_dir
         self.tar_merge_es_files_dir = self.output_dir
         self.job_reports_dir = os.path.join(self.output_dir, "job-reports")
+        self.merged_files_dir = os.path.join(self.output_dir, "final")
+        self.bookKeeper.output_dir = self.output_dir
+        self.bookKeeper.merged_files_dir = self.merged_files_dir
         self.config_remote = ray.put(self.config)
         # create the output directories if needed
         try:
@@ -500,6 +505,8 @@ def setup_dirs(self):
                 os.mkdir(self.output_dir)
             if not os.path.isdir(self.job_reports_dir):
                 os.mkdir(self.job_reports_dir)
+            if not os.path.isdir(self.merged_files_dir):
+                os.mkdir(self.merged_files_dir)
             if not os.path.isdir(self.tar_merge_es_output_dir):
                 os.mkdir(self.tar_merge_es_output_dir)
             if not os.path.isdir(self.tar_merge_es_files_dir):
@@ -549,12 +556,11 @@ def run(self) -> None:
             return
         job_id = self.bookKeeper.jobs.next_job_id_to_process()
         total_events = self.bookKeeper.n_ready(job_id)
+        os.makedirs(os.path.join(self.config.ray['workdir'], job_id))
         if total_events:
             self.available_events_per_actor = max(1, ceil(total_events / self.n_actors))
         for pandaID in self.bookKeeper.jobs:
             cjob = self.bookKeeper.jobs[pandaID]
-            os.makedirs(
-                os.path.join(self.config.ray['workdir'], cjob['PandaID']))
             self.remote_jobdef_byid[pandaID] = ray.put(cjob)
 
         self.create_actors()
@@ -592,13 +598,28 @@ def run(self) -> None:
         self.bookKeeper.save_status()
         task_status = self.bookKeeper.taskstatus.get(self.panda_taskid, None)
         if task_status and task_status.get_nmerged() + task_status.get_nfailed() == task_status.total_events():
-            self.produce_final_report()
+            assert job_id
+            output_map = self.bookKeeper.remap_output_files(job_id)
+            self.rename_output_files(output_map)
+            self.produce_final_report(output_map)
         self.communicator.stop()
         # self.cpu_monitor.stop()
         self.bookKeeper.print_status()
         self._logger.debug("All driver threads stopped. Quitting...")
 
-    def produce_final_report(self):
+    def rename_output_files(self, output_map: Dict[str, str]):
+        """
+        Rename the final merged output files to names matching the current job definition.
+        """
+        for file in os.listdir(self.merged_files_dir):
+            try:
+                new_filename = output_map[file]
+            except KeyError:
+                # read the commit log to recover the correct name. If we get another KeyError, we can't recover
+                new_filename = output_map[self.bookKeeper.recover_outputfile_name(file)]
+            os.rename(os.path.join(self.merged_files_dir, file), os.path.join(self.merged_files_dir, new_filename))
+
+    def produce_final_report(self, output_map: Dict[str, str]):
         """
         Merge job reports from individual merge transforms to produce the final jobReport for Panda.
         """
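Note: the three new calls in run() chain together: remap_output_files builds the old-to-new name mapping and journals it to the commit log, rename_output_files applies it to the files on disk, and produce_final_report applies it to the job reports. A minimal standalone sketch of the disk-rename step, with hypothetical file names (not part of this patch):

    import os
    import tempfile

    # output_map as produced by BookKeeper.remap_output_files (hypothetical names)
    output_map = {"HITS.old._000001.pool.root": "HITS.new._000001.pool.root"}

    merged_files_dir = tempfile.mkdtemp()
    open(os.path.join(merged_files_dir, "HITS.old._000001.pool.root"), "w").close()

    for file in os.listdir(merged_files_dir):
        # a KeyError here is what triggers the commit-log fallback in the driver
        new_filename = output_map[file]
        os.rename(os.path.join(merged_files_dir, file),
                  os.path.join(merged_files_dir, new_filename))

    print(os.listdir(merged_files_dir))  # ['HITS.new._000001.pool.root']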
@@ -610,11 +631,35 @@ def produce_final_report(self):
         with open(os.path.join(self.job_reports_dir, files[0]), 'r') as f:
             final_report = json.load(f)
         final_report_files = final_report["files"]
+
+        # rename the first output file entry in the report, then rewrite the report on disk
+        output_file_entry = final_report_files["output"][0]["subFiles"][0]
+        old_filename = output_file_entry["name"]
+        try:
+            new_filename = output_map[old_filename]
+        except KeyError:
+            # read the commit log to recover the correct name. If we get another KeyError, we can't recover
+            new_filename = output_map[self.bookKeeper.recover_outputfile_name(old_filename)]
+        output_file_entry["name"] = new_filename
+        with open(os.path.join(self.job_reports_dir, files[0]), 'w') as f:
+            json.dump(final_report, f)
+
         for file in files[1:]:
-            with open(os.path.join(self.job_reports_dir, file), 'r') as f:
+            current_file = os.path.join(self.job_reports_dir, file)
+            with open(current_file, 'r') as f:
                 current_report = json.load(f)
             final_report_files["input"].append(current_report["files"]["input"][0])
-            final_report_files["output"][0]["subFiles"].append(current_report["files"]["output"][0]["subFiles"][0])
+            output_file_entry = current_report["files"]["output"][0]["subFiles"][0]
+            old_filename = output_file_entry["name"]
+            try:
+                new_filename = output_map[old_filename]
+            except KeyError:
+                # read the commit log to recover the correct name. If we get another KeyError, we can't recover
+                new_filename = output_map[self.bookKeeper.recover_outputfile_name(old_filename)]
+            output_file_entry["name"] = new_filename
+            final_report_files["output"][0]["subFiles"].append(output_file_entry)
+            with open(current_file, 'w') as f:
+                json.dump(current_report, f)
 
         tmp = os.path.join(self.workdir, self.jobreport_name + ".tmp")
         with open(tmp, 'w') as f:
@@ -740,7 +785,7 @@ def hits_merge_transform(self, input_files: Iterable[str], output_file: str) ->
         tmp_dir = tempfile.mkdtemp()
         file_list = ",".join(input_files)
         job_report_name = os.path.join(self.job_reports_dir, output_file) + ".json"
-        output_file = os.path.join(self.output_dir, output_file)
+        output_file = os.path.join(self.merged_files_dir, output_file)
 
         transform_params = re.sub(r"@inputFor_\$\{OUTPUT0\}", file_list, self.merge_transform_params)
         transform_params = re.sub(r"\$\{OUTPUT0\}", output_file, transform_params, count=1)
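Note: produce_final_report relies on the Athena-style jobReport layout sketched below; the key names are taken from the code above, the values are hypothetical. Each per-merge report contributes its first input entry and its first (renamed) output subFile to the final report:

    final_report = {
        "files": {
            "input": [{"subFiles": [{"name": "EVNT.input._000001.pool.root"}]}],
            "output": [{"subFiles": [{"name": "HITS.new._000001.pool.root"}]}],
        }
    }
    current_report = {
        "files": {
            "input": [{"subFiles": [{"name": "EVNT.input._000002.pool.root"}]}],
            "output": [{"subFiles": [{"name": "HITS.new._000002.pool.root"}]}],
        }
    }
    final_report_files = final_report["files"]
    final_report_files["input"].append(current_report["files"]["input"][0])
    final_report_files["output"][0]["subFiles"].append(
        current_report["files"]["output"][0]["subFiles"][0])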
diff --git a/src/raythena/utils/bookkeeper.py b/src/raythena/utils/bookkeeper.py
index 86137f5..ce06ad4 100644
--- a/src/raythena/utils/bookkeeper.py
+++ b/src/raythena/utils/bookkeeper.py
@@ -38,11 +38,12 @@ class TaskStatus:
     MERGING = "merging"
     FAILED = "failed"
 
-    def __init__(self, job: PandaJob, config: Config) -> None:
+    def __init__(self, job: PandaJob, merged_files_dir: str, config: Config) -> None:
         self.config = config
         self.job = job
         self._logger = make_logger(self.config, "TaskStatus")
         self.output_dir = config.ray.get("outputdir")
+        self.merged_files_dir = merged_files_dir
         self.filepath = os.path.join(self.output_dir, "state.json")
         self.tmpfilepath = f"{self.filepath}.tmp"
         self._events_per_file = int(job['nEventsPerInputFile'])
@@ -95,7 +96,7 @@ def _restore_status(self):
             self._logger.error(ee.strerror)
             self._default_init_status()
 
-    def save_status(self, write_to_tmp=True):
+    def save_status(self, write_to_tmp=True, force_update=False):
         """
         Save the current status to a json file. Before saving to file, the update queue will be drained, actually
         carrying out the operations to the dictionary that will be written to file.
@@ -105,7 +106,7 @@ def save_status(self, write_to_tmp=True):
         """
 
         # deque is empty, nothing new to save
-        if not self._update_queue:
+        if not force_update and not self._update_queue:
             return
 
         # Drain the update deque, actually applying updates to the status dictionary
@@ -130,6 +131,12 @@ def save_status(self, write_to_tmp=True):
         except OSError as e:
             self._logger.error(f"Failed to save task status: {e.strerror}")
 
+    def is_stale(self) -> bool:
+        """
+        Checks whether updates still need to be written to disk
+        """
+        return len(self._update_queue) > 0
+
     @staticmethod
     def build_eventrange_dict(eventrange: EventRange, output_file: str = None) -> Dict[str, Any]:
         """
@@ -205,7 +212,7 @@ def _set_file_merged(self, input_files: List[str], outputfile: str, event_ranges
             merged_dict = dict()
             self._status[TaskStatus.MERGED][inputfile] = merged_dict
             for merged_outputfile in self._status[TaskStatus.MERGING][inputfile].keys():
-                merged_dict[merged_outputfile] = {"path": os.path.join(self.output_dir, merged_outputfile), "guid": guid if guid else ""}
+                merged_dict[merged_outputfile] = {"path": os.path.join(self.merged_files_dir, merged_outputfile), "guid": guid if guid else ""}
             del self._status[TaskStatus.MERGING][inputfile]
             del self._status[TaskStatus.SIMULATED][inputfile]
         else:
@@ -304,7 +311,9 @@ class BookKeeper(object):
     def __init__(self, config: Config) -> None:
         self.jobs: PandaJobQueue = PandaJobQueue()
         self.config: Config = config
-        self.output_dir = config.ray.get("outputdir")
+        self.output_dir = str()
+        self.merged_files_dir = str()
+        self.commitlog = str()
         self._logger = make_logger(self.config, "BookKeeper")
         self.actors: Dict[str, Optional[str]] = dict()
         self.rangesID_by_actor: Dict[str, Set[str]] = dict()
@@ -320,6 +329,8 @@ def __init__(self, config: Config) -> None:
         self.files_guids: Dict[str, str] = dict()
         self.last_status_print = time.time()
         self.taskstatus: Dict[str, TaskStatus] = dict()
+        self._input_output_mapping: Dict[str, List[str]] = dict()
+        self._output_input_mapping: Dict[str, List[str]] = dict()
         self.stop_saver = threading.Event()
         self.stop_cleaner = threading.Event()
         self.save_state_thread = ExThread(target=self._saver_thead_run, name="status-saver-thread")
@@ -436,10 +447,11 @@ def add_jobs(self, jobs: Mapping[str, JobDef], start_threads=True) -> None:
         for pandaID in self.jobs:
             job = self.jobs[pandaID]
             if job["taskID"] not in self.taskstatus:
-                ts = TaskStatus(job, self.config)
+                assert self.output_dir
+                assert self.merged_files_dir
+                ts = TaskStatus(job, self.merged_files_dir, self.config)
                 self.taskstatus[job['taskID']] = ts
-                # TODO: have esdriver provide outputdir to make sure both are consistent
-                self.output_dir = os.path.join(os.path.expandvars(self.config.ray.get("taskprogressbasedir")), str(job['taskID']))
+                self.commitlog = os.path.join(self.output_dir, "commit_log")
                 self._generate_input_output_mapping(job)
                 self._generate_event_ranges(job, ts)
         if start_threads:
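Note: the force_update flag matters because remap_output_files (next hunk) edits the _status dictionary directly instead of going through the update queue, so a plain save_status() would see an empty queue and skip the write. A stripped-down stand-in for TaskStatus, for illustration only:

    from collections import deque

    class TaskStatusSketch:
        """Illustrative sketch of the save_status contract, not the real class."""

        def __init__(self):
            self._update_queue = deque()
            self._status = {"merged": {}}

        def save_status(self, write_to_tmp=True, force_update=False):
            if not force_update and not self._update_queue:
                return  # queue empty: nothing to persist
            print("writing state.json")

    ts = TaskStatusSketch()
    ts._status["merged"]["f.root"] = {}  # direct mutation, queue stays empty
    ts.save_status()                     # skipped
    ts.save_status(force_update=True)    # written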
+ """ + job = self.jobs[panda_id] + task_status = self.taskstatus[job["taskID"]] + if task_status.is_stale(): + task_status.save_status() + merged_files = task_status._status[TaskStatus.MERGED] + previous_to_current_output_lookup: Dict[str, str] = dict() + + with open(self.commitlog, 'a') as f: + for input_file, output_files in self._input_output_mapping.items(): + merged_output_files = merged_files[input_file] + assert isinstance(merged_output_files, dict) + assert len(merged_output_files) == len(output_files) + for merged_file, new_file in zip(merged_output_files, output_files): + if merged_file in previous_to_current_output_lookup: + assert new_file == previous_to_current_output_lookup[merged_file] + continue + previous_to_current_output_lookup[merged_file] = new_file + f.write(f"rename_output {merged_file} {new_file}\n") + + # Rename old merged files to output file names matching the current job in state.json + for output_files in merged_files.values(): + assert isinstance(output_files, dict) + for old_file in list(output_files.keys()): + new_file = previous_to_current_output_lookup[old_file] + entry = output_files.pop(old_file) + entry["path"] = entry["path"].replace(old_file, new_file) + output_files[new_file] = entry + task_status.save_status(force_update=True) + + return previous_to_current_output_lookup + + def recover_outputfile_name(self, filename: str) -> str: + """ + Read the commitlog change history of filename and return the current filename + """ + with open(self.commitlog, 'r') as f: + for line in f: + op, *args = line.rstrip().split(" ") + if op != "rename_output": + continue + old, new = args[0], args[1] + if old == filename: + filename = new + return filename + def get_files_to_merge_with(self, file: str): """ Find all the input file names that should be merged with the given file. 
diff --git a/tests/test_bookkeeper.py b/tests/test_bookkeeper.py
index 44a4f6b..705089e 100644
--- a/tests/test_bookkeeper.py
+++ b/tests/test_bookkeeper.py
@@ -8,6 +8,8 @@ class TestBookKeeper:
 
     def test_add_jobs(self, is_eventservice, config, sample_multijobs, njobs):
         bookKeeper = BookKeeper(config)
+        bookKeeper.output_dir = "dummy"
+        bookKeeper.merged_files_dir = "dummy"
         bookKeeper.add_jobs(sample_multijobs, False)
         assert len(bookKeeper.jobs) == njobs
         for pandaID in bookKeeper.jobs:
@@ -16,6 +18,8 @@ def test_add_jobs(self, is_eventservice, config, sample_multijobs, njobs):
 
     def test_assign_job_to_actor(self, is_eventservice, config, sample_multijobs, njobs, sample_ranges, nevents):
         bookKeeper = BookKeeper(config)
+        bookKeeper.output_dir = "dummy"
+        bookKeeper.merged_files_dir = "dummy"
         bookKeeper.add_jobs(sample_multijobs, False)
         actor_id = "a1"
         if not is_eventservice:
@@ -45,6 +49,8 @@ def test_add_event_ranges(self, is_eventservice, config, sample_multijobs,
             pytest.skip()
 
         bookKeeper = BookKeeper(config)
+        bookKeeper.output_dir = "dummy"
+        bookKeeper.merged_files_dir = "dummy"
         bookKeeper.add_jobs(sample_multijobs, False)
         assert bookKeeper.has_jobs_ready()
 
@@ -60,6 +66,8 @@ def test_fetch_event_ranges(self, is_eventservice, config, sample_multijobs,
         worker_ids = [f"w_{i}" for i in range(10)]
 
         bookKeeper = BookKeeper(config)
+        bookKeeper.output_dir = "dummy"
+        bookKeeper.merged_files_dir = "dummy"
         bookKeeper.add_jobs(sample_multijobs, False)
         bookKeeper.add_event_ranges(sample_ranges)
 
@@ -86,6 +94,8 @@ def test_process_event_ranges_update(self, is_eventservice, config,
 
         def __inner__(range_update, failed=False):
             bookKeeper = BookKeeper(config)
+            bookKeeper.output_dir = "dummy"
+            bookKeeper.merged_files_dir = "dummy"
             bookKeeper.add_jobs(sample_multijobs, False)
 
             for i in range(njobs):
@@ -105,6 +115,8 @@ def __inner__(range_update, failed=False):
         __inner__(sample_failed_rangeupdate, True)
 
         bookKeeper = BookKeeper(config)
+        bookKeeper.output_dir = "dummy"
+        bookKeeper.merged_files_dir = "dummy"
         bookKeeper.add_jobs(sample_multijobs, False)
         for _ in range(njobs):
             job = bookKeeper.assign_job_to_actor(actor_id)
@@ -142,6 +154,8 @@ def test_process_actor_end(self, is_eventservice, config, njobs,
         actor_id_2 = "a2"
 
         bookKeeper = BookKeeper(config)
+        bookKeeper.output_dir = "dummy"
+        bookKeeper.merged_files_dir = "dummy"
         bookKeeper.add_jobs(sample_multijobs, False)
 
         job = bookKeeper.assign_job_to_actor(actor_id_1)
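Note: the output_dir/merged_files_dir assignments repeated in every test could be factored into a fixture; a possible sketch, assuming the existing config fixture (not part of this patch):

    import pytest
    from raythena.utils.bookkeeper import BookKeeper

    @pytest.fixture
    def bookKeeper(config):
        # mirrors the dummy setup repeated in each test above
        bk = BookKeeper(config)
        bk.output_dir = "dummy"
        bk.merged_files_dir = "dummy"
        return bk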
diff --git a/tests/test_taskstatus.py b/tests/test_taskstatus.py
index 7792594..0d1bd53 100644
--- a/tests/test_taskstatus.py
+++ b/tests/test_taskstatus.py
@@ -7,7 +7,7 @@ class TestTaskStatus:
     def test_save_restore_status(self, nfiles, tmp_path, config, sample_job, sample_ranges):
         config.ray["outputdir"] = tmp_path
         job = PandaJob(list(sample_job.values())[0])
-        ts = TaskStatus(job, config)
+        ts = TaskStatus(job, tmp_path, config)
         ranges = list(sample_ranges.values())[0]
         hits_per_file = int(job['esmergeSpec']['nEventsPerOutputFile'])
         events_per_file = int(job['nEventsPerInputFile'])
@@ -30,14 +30,14 @@ def test_save_restore_status(self, nfiles, tmp_path, config, sample_job, sample_
         ts.set_file_merged([fname], outputfile, ranges_map, "guid")
 
         ts.save_status()
-        ts2 = TaskStatus(job, config)
+        ts2 = TaskStatus(job, tmp_path, config)
         print(ts._status)
         assert ts._status == ts2._status
 
     def test_set_processed(self, nfiles, nevents, tmp_path, config, sample_job, sample_ranges):
         config.ray["outputdir"] = tmp_path
         job = PandaJob(list(sample_job.values())[0])
-        ts = TaskStatus(job, config)
+        ts = TaskStatus(job, tmp_path, config)
 
         ranges_list = list(sample_ranges.values())[0]
         for r in ranges_list:
@@ -52,7 +52,7 @@ def test_set_processed(self, nfiles, nevents, tmp_path, config, sample_job, samp
     def test_set_failed(self, nfiles, nevents, tmp_path, config, sample_job, sample_ranges):
         config.ray["outputdir"] = tmp_path
         job = PandaJob(list(sample_job.values())[0])
-        ts = TaskStatus(job, config)
+        ts = TaskStatus(job, tmp_path, config)
 
         ranges_list = list(sample_ranges.values())[0]
         for r in ranges_list:
@@ -67,7 +67,7 @@ def test_set_failed(self, nfiles, nevents, tmp_path, config, sample_job, sample_
     def test_set_merged(self, nfiles, nevents, tmp_path, config, sample_job, sample_ranges):
         config.ray["outputdir"] = tmp_path
         job = PandaJob(list(sample_job.values())[0])
-        ts = TaskStatus(job, config)
+        ts = TaskStatus(job, tmp_path, config)
 
         ranges = list(sample_ranges.values())[0]
         for e in ranges:
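Note: with the _set_file_merged change above, entries in the "merged" section of state.json now point into merged_files_dir (the "final" subdirectory created by esdriver.setup_dirs) rather than output_dir. A hypothetical fragment of the resulting structure, with key names taken from the code:

    status_fragment = {
        "merged": {
            "EVNT.input._000001.pool.root": {
                "HITS.merged._000001.pool.root": {
                    # merged_files_dir = <output_dir>/final (see esdriver.setup_dirs)
                    "path": "/outputdir/final/HITS.merged._000001.pool.root",
                    "guid": "guid",
                }
            }
        }
    }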