Skip to content

Commit

Permalink
Merge pull request #65 from isi-vista/fix-nuke-noop-confirm
Browse files Browse the repository at this point in the history
Fix nuke noop confirm
  • Loading branch information
joecummings authored Nov 2, 2020
2 parents cfd7b6b + 7c89395 commit 334c7c7
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 7 deletions.
21 changes: 14 additions & 7 deletions pegasus_wrapper/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,17 @@ def _conf_limits(self) -> str:
for category, max_jobs in self._category_to_max_jobs.items()
)

def _nuke_checkpoints_and_clear_rc(self, output_xml_dir: Path) -> None:
    """Delete all checkpoint markers under *output_xml_dir* and empty the replica catalog.

    Args:
        output_xml_dir: Root directory whose ``___ckpt`` files should be removed
            by the ``nuke_checkpoints`` helper script.

    Raises:
        subprocess.CalledProcessError: If the checkpoint-nuking script exits
            with a non-zero status.
    """
    import sys  # local import: only needed by this method

    completed = subprocess.run(
        # Use sys.executable so we run the same interpreter that is executing
        # this workflow, rather than whatever `python` happens to be on PATH.
        [sys.executable, nuke_checkpoints.__file__, str(output_xml_dir)],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        encoding="utf-8",
    )
    # A silently-failed nuke would leave stale checkpoints behind and the DAX
    # would still plan as a NOOP workflow, so fail loudly instead.
    completed.check_returncode()
    # Opening with `w` followed by close truncates the replica catalog to empty.
    self._replica_catalog.open("w").close()

def write_dax_to_dir(self, output_xml_dir: Optional[Path] = None) -> Path:
if not output_xml_dir:
output_xml_dir = self._workflow_directory
Expand All @@ -253,15 +264,11 @@ def write_dax_to_dir(self, output_xml_dir: Optional[Path] = None) -> Path:
num_ckpts = len([ckpt_file for ckpt_file in output_xml_dir.rglob("___ckpt")])
if num_jobs == num_ckpts:
nuke = input(
"DAX *may* create a NOOP workflow. Do you want to nuke the checkpoints? [y/n]"
"DAX *may* create a NOOP workflow. Do you want to nuke the checkpoints and regenerate? [y/n]"
)
if nuke == "y":
subprocess.run(
["python", nuke_checkpoints.__file__, output_xml_dir],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
encoding="utf-8",
)
self._nuke_checkpoints_and_clear_rc(output_xml_dir)
logging.info("Checkpoints cleared!")

dax_file_name = f"{self.name}.dax"
dax_file = output_xml_dir / dax_file_name
Expand Down
74 changes: 74 additions & 0 deletions tests/workflow_builder_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,80 @@ def test_dax_with_checkpointed_jobs_on_saga(tmp_path):
assert replica_catalog.stat().st_size > 0


def test_clearing_ckpts(monkeypatch, tmp_path):
    """Answering 'y' at the NOOP prompt should delete the existing checkpoint file."""
    params = Parameters.from_mapping(
        {
            "workflow_name": "Test",
            "workflow_created": "Testing",
            "workflow_log_dir": str(tmp_path / "log"),
            "workflow_directory": str(tmp_path / "working"),
            "site": "saga",
            "namespace": "test",
            "partition": "scavenge",
        }
    )
    builder = WorkflowBuilder.from_parameters(params)

    job_locator = Locator(_parse_parts("jobs/multiply"))
    output_file = tmp_path / "multiplied_nums.txt"
    input_file = tmp_path / "raw_nums.txt"
    job_params = Parameters.from_mapping(
        {"input_file": input_file, "output_file": output_file, "x": 4}
    )

    # Pre-create a checkpoint plus the job output so the workflow looks fully done.
    job_dir = builder.directory_for(job_locator)
    ckpt_file = job_dir / "___ckpt"
    ckpt_file.touch()
    output_file.touch()

    builder.run_python_on_parameters(
        job_locator, multiply_by_x_main, job_params, depends_on=[]
    )

    # Simulate the user confirming the nuke prompt.
    monkeypatch.setattr("builtins.input", lambda _: "y")
    builder.write_dax_to_dir()

    assert not ckpt_file.exists()


def test_not_clearing_ckpts(monkeypatch, tmp_path):
    """Answering 'n' at the NOOP prompt should leave the checkpoint file untouched."""
    params = Parameters.from_mapping(
        {
            "workflow_name": "Test",
            "workflow_created": "Testing",
            "workflow_log_dir": str(tmp_path / "log"),
            "workflow_directory": str(tmp_path / "working"),
            "site": "saga",
            "namespace": "test",
            "partition": "scavenge",
        }
    )
    builder = WorkflowBuilder.from_parameters(params)

    job_locator = Locator(_parse_parts("jobs/multiply"))
    output_file = tmp_path / "multiplied_nums.txt"
    input_file = tmp_path / "raw_nums.txt"
    job_params = Parameters.from_mapping(
        {"input_file": input_file, "output_file": output_file, "x": 4}
    )

    # Pre-create a checkpoint plus the job output so the workflow looks fully done.
    job_dir = builder.directory_for(job_locator)
    ckpt_file = job_dir / "___ckpt"
    ckpt_file.touch()
    output_file.touch()

    builder.run_python_on_parameters(
        job_locator, multiply_by_x_main, job_params, depends_on=[]
    )

    # Simulate the user declining the nuke prompt.
    monkeypatch.setattr("builtins.input", lambda _: "n")
    builder.write_dax_to_dir()

    assert ckpt_file.exists()


class _JobWithNameHasCategoryHandler(saxhandler.ContentHandler):
"""
SAX handler which checks whether a DAX file
Expand Down

0 comments on commit 334c7c7

Please sign in to comment.