From 3a726665c4818bdb9d5564612154bfc0bf193c9a Mon Sep 17 00:00:00 2001 From: Tom Close Date: Sat, 24 Feb 2024 22:03:47 +1100 Subject: [PATCH 01/29] Implemented hash-change guards in TaskBase._run() --- pydra/engine/core.py | 22 ++++++++++++++++------ pydra/engine/specs.py | 30 +++++++++++++++++------------- pydra/engine/tests/test_task.py | 15 +++++++++++++++ pydra/utils/hash.py | 10 ++++++---- 4 files changed, 54 insertions(+), 23 deletions(-) diff --git a/pydra/engine/core.py b/pydra/engine/core.py index 82400a1beb..f80cb8c88e 100644 --- a/pydra/engine/core.py +++ b/pydra/engine/core.py @@ -487,8 +487,8 @@ def _modify_inputs(self): modified_inputs = template_update( self.inputs, self.output_dir, map_copyfiles=map_copyfiles ) - if modified_inputs: - self.inputs = attr.evolve(self.inputs, **modified_inputs) + for name, value in modified_inputs.items(): + setattr(self.inputs, name, value) return orig_inputs def _populate_filesystem(self, checksum, output_dir): @@ -549,12 +549,22 @@ def _run(self, rerun=False, environment=None, **kwargs): # removing the additional file with the checksum (self.cache_dir / f"{self.uid}_info.json").unlink() # # function etc. shouldn't change anyway, so removing - orig_inputs = { - k: v for k, v in orig_inputs.items() if not k.startswith("_") - } - self.inputs = attr.evolve(self.inputs, **orig_inputs) + # Restore original values to inputs + for field_name, field_value in orig_inputs.items(): + if not field_name.startswith("_"): + setattr(self.inputs, field_name, field_value) os.chdir(cwd) + # Check for any changes to the input hashes that have occurred during the execution + # of the task + hash_changes = self.inputs.hash_changes() + if hash_changes: + raise RuntimeError( + f"Hashes have changed for {hash_changes} input fields during the " + f"execution of {self}. Please check all output files/directories are " + "typed with `pathlib.Path` instead of `fileformats` classes" + ) self.hooks.post_run(self, result) + return result def _collect_outputs(self, output_dir): diff --git a/pydra/engine/specs.py b/pydra/engine/specs.py index 289eb8d3cd..b2e0f1ceb2 100644 --- a/pydra/engine/specs.py +++ b/pydra/engine/specs.py @@ -15,7 +15,7 @@ ) import pydra from .helpers_file import template_update_single -from ..utils.hash import hash_function +from ..utils.hash import hash_function, Cache # from ..utils.misc import add_exc_note @@ -73,21 +73,22 @@ class SpecInfo: class BaseSpec: """The base dataclass specs for all inputs and outputs.""" - # def __attrs_post_init__(self): - # self.files_hash = { - # field.name: {} - # for field in attr_fields( - # self, exclude_names=("_graph_checksums", "bindings", "files_hash") - # ) - # if field.metadata.get("output_file_template") is None - # } - def collect_additional_outputs(self, inputs, output_dir, outputs): """Get additional outputs.""" return {} @property def hash(self): + hsh, self._hashes = self._compute_hashes() + return hsh + + def hash_changes(self): + """Detects any changes in the hashed values between the current inputs and the + previously calculated values""" + _, new_hashes = self._compute_hashes() + return [k for k, v in new_hashes.items() if v != self._hashes[k]] + + def _compute_hashes(self) -> ty.Tuple[bytes, ty.Dict[str, bytes]]: """Compute a basic hash for any given set of fields.""" inp_dict = {} for field in attr_fields( @@ -101,10 +102,13 @@ def hash(self): if "container_path" in field.metadata: continue inp_dict[field.name] = getattr(self, field.name) - inp_hash = hash_function(inp_dict) + hash_cache = Cache({}) + field_hashes = { + k: hash_function(v, cache=hash_cache) for k, v in inp_dict.items() + } if hasattr(self, "_graph_checksums"): - inp_hash = hash_function((inp_hash, self._graph_checksums)) - return inp_hash + field_hashes["_graph_checksums"] = self._graph_checksums + return hash_function(sorted(field_hashes.items())), field_hashes def retrieve_values(self, wf, state_index: ty.Optional[int] = None): """Get values contained by this spec.""" diff --git a/pydra/engine/tests/test_task.py b/pydra/engine/tests/test_task.py index 8ff3ce6d47..b1ddc6898d 100644 --- a/pydra/engine/tests/test_task.py +++ b/pydra/engine/tests/test_task.py @@ -5,6 +5,8 @@ import cloudpickle as cp from pathlib import Path import json +import glob as glob +from fileformats.generic import Directory from ... import mark from ..core import Workflow from ..task import AuditFlag, ShellCommandTask @@ -1581,3 +1583,16 @@ def testfunc(a: A): result = testfunc(a=A(x=7))() assert result.output.out == 7 + + +@mark.task +def output_dir_as_input(out_dir: Directory) -> Directory: + (out_dir.fspath / "new-file.txt").touch() + return out_dir + + +# @pytest.mark.xfail(reason="Not sure") +def test_hash_changes(tmp_path): + task = output_dir_as_input(out_dir=tmp_path) + with pytest.raises(RuntimeError, match="Hashes have changed"): + task() diff --git a/pydra/utils/hash.py b/pydra/utils/hash.py index 0c5f5f8705..0c5afac9e9 100644 --- a/pydra/utils/hash.py +++ b/pydra/utils/hash.py @@ -59,12 +59,12 @@ class UnhashableError(ValueError): """Error for objects that cannot be hashed""" -def hash_function(obj): +def hash_function(obj, cache=None): """Generate hash of object.""" - return hash_object(obj).hex() + return hash_object(obj, cache=cache).hex() -def hash_object(obj: object) -> Hash: +def hash_object(obj: object, cache=None) -> Hash: """Hash an object Constructs a byte string that uniquely identifies the object, @@ -73,8 +73,10 @@ def hash_object(obj: object) -> Hash: Base Python types are implemented, including recursive lists and dicts. Custom types can be registered with :func:`register_serializer`. """ + if cache is None: + cache = Cache({}) try: - return hash_single(obj, Cache({})) + return hash_single(obj, cache) except Exception as e: raise UnhashableError(f"Cannot hash object {obj!r}") from e From b378f83105aedbc45fd1cd47c3dd081d4b1dce49 Mon Sep 17 00:00:00 2001 From: Tom Close Date: Sat, 24 Feb 2024 22:03:47 +1100 Subject: [PATCH 02/29] added hash guards to workflow execution (i.e. on workflow inputs) --- pydra/engine/core.py | 9 ++++++++ pydra/engine/tests/test_submitter.py | 33 ++++++++++++++++++++++++++-- pydra/engine/tests/test_task.py | 14 ------------ 3 files changed, 40 insertions(+), 16 deletions(-) diff --git a/pydra/engine/core.py b/pydra/engine/core.py index f80cb8c88e..13f9dfe112 100644 --- a/pydra/engine/core.py +++ b/pydra/engine/core.py @@ -1266,6 +1266,15 @@ async def _run(self, submitter=None, rerun=False, **kwargs): (self.cache_dir / f"{self.uid}_info.json").unlink() os.chdir(cwd) self.hooks.post_run(self, result) + # Check for any changes to the input hashes that have occurred during the execution + # of the task + hash_changes = self.inputs.hash_changes() + if hash_changes: + raise RuntimeError( + f"Hashes have changed for {hash_changes} input fields during the " + f"execution of {self} workflow. Please check all output files/directories are " + "typed with `pathlib.Path` instead of `fileformats` classes" + ) if result is None: raise Exception("This should never happen, please open new issue") return result diff --git a/pydra/engine/tests/test_submitter.py b/pydra/engine/tests/test_submitter.py index d65247e96a..4ad655fced 100644 --- a/pydra/engine/tests/test_submitter.py +++ b/pydra/engine/tests/test_submitter.py @@ -2,9 +2,8 @@ import re import subprocess as sp import time - import pytest - +from fileformats.generic import Directory from .utils import ( need_sge, need_slurm, @@ -612,3 +611,33 @@ def alter_input(x): @mark.task def to_tuple(x, y): return (x, y) + + +# @pytest.mark.xfail(reason="Not sure") +def test_hash_changes(tmp_path): + task = output_dir_as_input(out_dir=tmp_path) + with pytest.raises(RuntimeError, match="Hashes have changed"): + task() + + +# @pytest.mark.xfail(reason="Not sure") +def test_hash_changes_workflow(tmp_path): + wf = Workflow( + name="test_hash_change", input_spec={"in_dir": Directory}, in_dir=tmp_path + ) + wf.add(output_dir_as_output(out_dir=wf.lzin.in_dir, name="task")) + wf.set_output(("out_dir", wf.task.lzout.out)) + with pytest.raises(RuntimeError, match="Hashes have changed.*workflow\."): + wf() + + +@mark.task +def output_dir_as_output(out_dir: Path) -> Directory: + (out_dir / "new-file.txt").touch() + return out_dir + + +@mark.task +def output_dir_as_input(out_dir: Directory) -> Directory: + (out_dir.fspath / "new-file.txt").touch() + return out_dir diff --git a/pydra/engine/tests/test_task.py b/pydra/engine/tests/test_task.py index b1ddc6898d..0d666574e3 100644 --- a/pydra/engine/tests/test_task.py +++ b/pydra/engine/tests/test_task.py @@ -6,7 +6,6 @@ from pathlib import Path import json import glob as glob -from fileformats.generic import Directory from ... import mark from ..core import Workflow from ..task import AuditFlag, ShellCommandTask @@ -1583,16 +1582,3 @@ def testfunc(a: A): result = testfunc(a=A(x=7))() assert result.output.out == 7 - - -@mark.task -def output_dir_as_input(out_dir: Directory) -> Directory: - (out_dir.fspath / "new-file.txt").touch() - return out_dir - - -# @pytest.mark.xfail(reason="Not sure") -def test_hash_changes(tmp_path): - task = output_dir_as_input(out_dir=tmp_path) - with pytest.raises(RuntimeError, match="Hashes have changed"): - task() From 5aaf62f766f8e0d44aa7c4eb094a22c6230a4d28 Mon Sep 17 00:00:00 2001 From: Tom Close Date: Sat, 24 Feb 2024 22:03:47 +1100 Subject: [PATCH 03/29] reworked error message for blocked upstream tasks to specifiy which tasks are blocked by referencing Workflow.inputs._graph_checksums. Reworked blocked tasks unittest so that it now raises an error again --- pydra/engine/core.py | 8 +- pydra/engine/submitter.py | 51 ++++++++----- pydra/engine/tests/test_submitter.py | 106 ++++++++++++++------------- 3 files changed, 93 insertions(+), 72 deletions(-) diff --git a/pydra/engine/core.py b/pydra/engine/core.py index 13f9dfe112..d4c8143ae6 100644 --- a/pydra/engine/core.py +++ b/pydra/engine/core.py @@ -281,7 +281,9 @@ def checksum_states(self, state_index=None): """ if is_workflow(self) and self.inputs._graph_checksums is attr.NOTHING: - self.inputs._graph_checksums = [nd.checksum for nd in self.graph_sorted] + self.inputs._graph_checksums = [ + (nd.name, nd.checksum) for nd in self.graph_sorted + ] if state_index is not None: inputs_copy = deepcopy(self.inputs) @@ -1086,7 +1088,9 @@ def checksum(self): """ # if checksum is called before run the _graph_checksums is not ready if is_workflow(self) and self.inputs._graph_checksums is attr.NOTHING: - self.inputs._graph_checksums = [nd.checksum for nd in self.graph_sorted] + self.inputs._graph_checksums = [ + (nd.name, nd.checksum) for nd in self.graph_sorted + ] input_hash = self.inputs.hash if not self.state: diff --git a/pydra/engine/submitter.py b/pydra/engine/submitter.py index 3906955b2c..af574a3f50 100644 --- a/pydra/engine/submitter.py +++ b/pydra/engine/submitter.py @@ -171,25 +171,40 @@ async def expand_workflow(self, wf, rerun=False): ii += 1 # don't block the event loop! await asyncio.sleep(1) - if ii > 60: - blocked = _list_blocked_tasks(graph_copy) - # get_runnable_tasks(graph_copy) # Uncomment to debug `get_runnable_tasks` - raise Exception( - "graph is not empty, but not able to get more tasks " - "- something may have gone wrong when retrieving the results " - "of predecessor tasks. This could be caused by a file-system " - "error or a bug in the internal workflow logic, but is likely " - "to be caused by the hash of an upstream node being unstable." - " \n\nHash instability can be caused by an input of the node being " - "modified in place, or by psuedo-random ordering of `set` or " - "`frozenset` inputs (or nested attributes of inputs) in the hash " - "calculation. To ensure that sets are hashed consistently you can " - "you can try set the environment variable PYTHONHASHSEED=0 for " - "all processes, but it is best to try to identify where the set " - "objects are occurring and manually hash their sorted elements. " - "(or use list objects instead)" - "\n\nBlocked tasks\n-------------\n" + "\n".join(blocked) + if ii > 60: # QUESTION: why is this 60? + msg = ( + f"Graph of '{wf}' workflow is not empty, but not able to get " + "more tasks - something has gone wrong when retrieving the " + "results predecessors:\n\n" ) + # Get blocked tasks and the predecessors they are waiting on + outstanding = [ + ( + t, + [ + p + for p in graph_copy.predecessors[t.name] + if not p.done + ], + ) + for t in graph_copy.sorted_nodes + ] + graph_checksums = dict(wf.inputs._graph_checksums) + + for task, waiting_on in outstanding: + if not waiting_on: + continue + msg += f"- '{task.name}' node blocked due to\n" + for pred in waiting_on: + if pred.checksum != graph_checksums[pred.name]: + msg += f" - hash changes in '{pred.name}' node inputs\n" + else: + msg += ( + f" - undiagnosed issues in '{pred.name}' node " + "(potentially related to the file-system)" + ) + msg += "\n" + raise RuntimeError(msg) for task in tasks: # grab inputs if needed logger.debug(f"Retrieving inputs for {task}") diff --git a/pydra/engine/tests/test_submitter.py b/pydra/engine/tests/test_submitter.py index 4ad655fced..416745269a 100644 --- a/pydra/engine/tests/test_submitter.py +++ b/pydra/engine/tests/test_submitter.py @@ -2,6 +2,7 @@ import re import subprocess as sp import time +import typing as ty import pytest from fileformats.generic import Directory from .utils import ( @@ -572,56 +573,23 @@ def test_sge_no_limit_maxthreads(tmpdir): assert job_1_endtime > job_2_starttime -# @pytest.mark.xfail(reason="Not sure") -def test_wf_with_blocked_tasks(tmpdir): - wf = Workflow(name="wf_with_blocked_tasks", input_spec=["x"]) - wf.add(identity(name="taska", x=wf.lzin.x)) - wf.add(alter_input(name="taskb", x=wf.taska.lzout.out)) - wf.add(to_tuple(name="taskc", x=wf.taska.lzout.out, y=wf.taskb.lzout.out)) - wf.set_output([("out", wf.taskb.lzout.out)]) - - wf.inputs.x = A(1) - - wf.cache_dir = tmpdir - - # with pytest.raises(Exception, match="graph is not empty,"): - with Submitter("serial") as sub: - sub(wf) - - -class A: - def __init__(self, a): - self.a = a - - def __bytes_repr__(self, cache): - yield bytes(self.a) - - -@mark.task -def identity(x): - return x - - -@mark.task -def alter_input(x): - x.a = 2 - return x +def test_hash_changes_in_task_inputs(tmp_path): + @mark.task + def output_dir_as_input(out_dir: Directory) -> Directory: + (out_dir.fspath / "new-file.txt").touch() + return out_dir - -@mark.task -def to_tuple(x, y): - return (x, y) - - -# @pytest.mark.xfail(reason="Not sure") -def test_hash_changes(tmp_path): task = output_dir_as_input(out_dir=tmp_path) with pytest.raises(RuntimeError, match="Hashes have changed"): task() -# @pytest.mark.xfail(reason="Not sure") -def test_hash_changes_workflow(tmp_path): +def test_hash_changes_in_workflow_inputs(tmp_path): + @mark.task + def output_dir_as_output(out_dir: Path) -> Directory: + (out_dir / "new-file.txt").touch() + return out_dir + wf = Workflow( name="test_hash_change", input_spec={"in_dir": Directory}, in_dir=tmp_path ) @@ -631,13 +599,47 @@ def test_hash_changes_workflow(tmp_path): wf() -@mark.task -def output_dir_as_output(out_dir: Path) -> Directory: - (out_dir / "new-file.txt").touch() - return out_dir +def test_hash_changes_in_workflow_graph(tmpdir): + class X: + """Dummy class with unstable hash (i.e. which isn't altered in a node in which + it is an input)""" + # class attribute to change + x = 1 -@mark.task -def output_dir_as_input(out_dir: Directory) -> Directory: - (out_dir.fspath / "new-file.txt").touch() - return out_dir + def __bytes_repr__(self, cache): + """Bytes representation from class attribute, which will be changed be 'alter_x" node. + + NB: this is a very silly bytes_repr implementation just to trigger the exception, + hopefully cases like this will be very rare.""" + yield bytes(self.x) + + @mark.task + @mark.annotate({"return": {"x": X, "y": int}}) + def identity(x) -> ty.Tuple[X, int]: + return x, 3 + + @mark.task + def alter_x(y): + X.x = 2 + return y + + @mark.task + def to_tuple(x, y): + return (x, y) + + wf = Workflow(name="wf_with_blocked_tasks", input_spec=["x", "y"]) + wf.add(identity(name="taska", x=wf.lzin.x)) + wf.add(alter_x(name="taskb", x=wf.taska.lzout.y)) + wf.add(to_tuple(name="taskc", x=wf.taska.lzout.x, y=wf.taskb.lzout.out)) + wf.set_output([("out", wf.taskc.lzout.out)]) + + wf.inputs.x = X() + + wf.cache_dir = tmpdir + + with pytest.raises( + Exception, match="Graph of 'wf_with_blocked_tasks' workflow is not empty" + ): + with Submitter("serial") as sub: + result = sub(wf) From 0397a180329e79b7f618b2cf22d7d1760c81dc23 Mon Sep 17 00:00:00 2001 From: Tom Close Date: Sat, 24 Feb 2024 22:03:47 +1100 Subject: [PATCH 04/29] touched up messages for hash change errors --- pydra/engine/core.py | 45 ++++++++++++++++++---------- pydra/engine/tests/test_submitter.py | 4 +-- 2 files changed, 32 insertions(+), 17 deletions(-) diff --git a/pydra/engine/core.py b/pydra/engine/core.py index d4c8143ae6..cedd234191 100644 --- a/pydra/engine/core.py +++ b/pydra/engine/core.py @@ -558,15 +558,9 @@ def _run(self, rerun=False, environment=None, **kwargs): os.chdir(cwd) # Check for any changes to the input hashes that have occurred during the execution # of the task - hash_changes = self.inputs.hash_changes() - if hash_changes: - raise RuntimeError( - f"Hashes have changed for {hash_changes} input fields during the " - f"execution of {self}. Please check all output files/directories are " - "typed with `pathlib.Path` instead of `fileformats` classes" - ) - self.hooks.post_run(self, result) + self.hooks.post_run(self, result) + self._check_for_hash_changes() return result def _collect_outputs(self, output_dir): @@ -896,6 +890,33 @@ def _reset(self): for task in self.graph.nodes: task._reset() + def _check_for_hash_changes(self): + hash_changes = self.inputs.hash_changes() + details = "" + for changed in hash_changes: + field = getattr(attr.fields(type(self.inputs)), changed) + if issubclass(field.type, FileSet): + details += ( + f"- '{changed}' field is of type {field.type}. If it is intended to " + "contain output data then the type of the field it should be changed to " + "`pathlib.Path`. Otherwise, if it is an input field that gets " + "altered by the task, the 'copyfile' flag should be set to 'copy' " + "in the task interface metadata to ensure a copy of " + "the files/directories are created before the task is run\n" + ) + else: + details += ( + f"- the {field.type} object passed to '{changed}' field appears to " + f"have an unstable hash. The {field.type}.__bytes_repr__() method " + "can be implemented to provide stable hashes for this type. " + "See pydra/utils.hash.py for examples.\n" + ) + if hash_changes: + raise RuntimeError( + f"Input field hashes have changed during the execution of the " + f"'{self.name}' {type(self).__name__}.\n{details}" + ) + SUPPORTED_COPY_MODES = FileSet.CopyMode.any DEFAULT_COPY_COLLATION = FileSet.CopyCollation.any @@ -1272,13 +1293,7 @@ async def _run(self, submitter=None, rerun=False, **kwargs): self.hooks.post_run(self, result) # Check for any changes to the input hashes that have occurred during the execution # of the task - hash_changes = self.inputs.hash_changes() - if hash_changes: - raise RuntimeError( - f"Hashes have changed for {hash_changes} input fields during the " - f"execution of {self} workflow. Please check all output files/directories are " - "typed with `pathlib.Path` instead of `fileformats` classes" - ) + self._check_for_hash_changes() if result is None: raise Exception("This should never happen, please open new issue") return result diff --git a/pydra/engine/tests/test_submitter.py b/pydra/engine/tests/test_submitter.py index 416745269a..bb7c9a2b62 100644 --- a/pydra/engine/tests/test_submitter.py +++ b/pydra/engine/tests/test_submitter.py @@ -580,7 +580,7 @@ def output_dir_as_input(out_dir: Directory) -> Directory: return out_dir task = output_dir_as_input(out_dir=tmp_path) - with pytest.raises(RuntimeError, match="Hashes have changed"): + with pytest.raises(RuntimeError, match="Input field hashes have changed"): task() @@ -595,7 +595,7 @@ def output_dir_as_output(out_dir: Path) -> Directory: ) wf.add(output_dir_as_output(out_dir=wf.lzin.in_dir, name="task")) wf.set_output(("out_dir", wf.task.lzout.out)) - with pytest.raises(RuntimeError, match="Hashes have changed.*workflow\."): + with pytest.raises(RuntimeError, match="Input field hashes have changed.*Workflow"): wf() From 1d77f4ba30c2a70b4b2306e9d1984f7dd31e2467 Mon Sep 17 00:00:00 2001 From: Tom Close Date: Sat, 24 Feb 2024 22:03:47 +1100 Subject: [PATCH 05/29] replaced attrs.evolve with manual setattr of inputs within Task._run() in order to retain hidden _hashes member used to provided detailed information on any inadvertant hash changes pre/post run --- pydra/engine/core.py | 3 +-- pydra/engine/task.py | 4 ++-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/pydra/engine/core.py b/pydra/engine/core.py index cedd234191..67d3c6f566 100644 --- a/pydra/engine/core.py +++ b/pydra/engine/core.py @@ -556,10 +556,9 @@ def _run(self, rerun=False, environment=None, **kwargs): if not field_name.startswith("_"): setattr(self.inputs, field_name, field_value) os.chdir(cwd) + self.hooks.post_run(self, result) # Check for any changes to the input hashes that have occurred during the execution # of the task - - self.hooks.post_run(self, result) self._check_for_hash_changes() return result diff --git a/pydra/engine/task.py b/pydra/engine/task.py index cfe12af3bb..cb55d9e390 100644 --- a/pydra/engine/task.py +++ b/pydra/engine/task.py @@ -337,8 +337,8 @@ def command_args(self, root=None): raise NotImplementedError modified_inputs = template_update(self.inputs, output_dir=self.output_dir) - if modified_inputs is not None: - self.inputs = attr.evolve(self.inputs, **modified_inputs) + for field_name, field_value in modified_inputs.items(): + setattr(self.inputs, field_name, field_value) pos_args = [] # list for (position, command arg) self._positions_provided = [] From e2d009b444a24924f9a4b57768550c12b4684863 Mon Sep 17 00:00:00 2001 From: Tom Close Date: Sat, 24 Feb 2024 22:03:47 +1100 Subject: [PATCH 06/29] updated explicit hashes in unittests to match new values produced by altered spec.hash() function --- pydra/engine/tests/test_specs.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/pydra/engine/tests/test_specs.py b/pydra/engine/tests/test_specs.py index 0370d92a5b..f7edd0f57d 100644 --- a/pydra/engine/tests/test_specs.py +++ b/pydra/engine/tests/test_specs.py @@ -25,7 +25,7 @@ def test_basespec(): spec = BaseSpec() - assert spec.hash == "06fe829a5dca34cc5f0710b454c24808" + assert spec.hash == "0b1d98df22ecd1733562711c205abca2" def test_runtime(): @@ -132,14 +132,14 @@ def test_input_file_hash_1(tmp_path): fields = [("in_file", ty.Any)] input_spec = SpecInfo(name="Inputs", fields=fields, bases=(BaseSpec,)) inputs = make_klass(input_spec) - assert inputs(in_file=outfile).hash == "02e248cb7ca3628af6b97aa27723b623" + assert inputs(in_file=outfile).hash == "9a106eb2830850834d9b5bf098d5fa85" with open(outfile, "w") as fp: fp.write("test") fields = [("in_file", File)] input_spec = SpecInfo(name="Inputs", fields=fields, bases=(BaseSpec,)) inputs = make_klass(input_spec) - assert inputs(in_file=outfile).hash == "c1156e9576b0266f23c30771bf59482a" + assert inputs(in_file=outfile).hash == "0e9306e5cae1de1b4dff1f27cca03bce" def test_input_file_hash_2(tmp_path): @@ -153,7 +153,7 @@ def test_input_file_hash_2(tmp_path): # checking specific hash value hash1 = inputs(in_file=file).hash - assert hash1 == "73745b60b45052d6020918fce5801581" + assert hash1 == "17e4e2b4d8ce8f36bf3fd65804958dbb" # checking if different name doesn't affect the hash file_diffname = tmp_path / "in_file_2.txt" @@ -183,7 +183,7 @@ def test_input_file_hash_2a(tmp_path): # checking specific hash value hash1 = inputs(in_file=file).hash - assert hash1 == "73745b60b45052d6020918fce5801581" + assert hash1 == "17e4e2b4d8ce8f36bf3fd65804958dbb" # checking if different name doesn't affect the hash file_diffname = tmp_path / "in_file_2.txt" @@ -201,7 +201,7 @@ def test_input_file_hash_2a(tmp_path): # checking if string is also accepted hash4 = inputs(in_file=str(file)).hash - assert hash4 == "aaee75d79f1bc492619fabfa68cb3c69" + assert hash4 == "aee7c7ae25509fb4c92a081d58d17a67" def test_input_file_hash_3(tmp_path): @@ -274,7 +274,7 @@ def test_input_file_hash_4(tmp_path): # checking specific hash value hash1 = inputs(in_file=[[file, 3]]).hash - assert hash1 == "b8d8255b923b7bb8817da16e6ec57fae" + assert hash1 == "11b7e9c90bc8d9dc5ccfc8d4526ba091" # the same file, but int field changes hash1a = inputs(in_file=[[file, 5]]).hash @@ -310,7 +310,7 @@ def test_input_file_hash_5(tmp_path): # checking specific hash value hash1 = inputs(in_file=[{"file": file, "int": 3}]).hash - assert hash1 == "dedaf3899cce99d19238c2efb1b19a89" + assert hash1 == "5fd53b79e55bbf62a4bb3027eb753a2c" # the same file, but int field changes hash1a = inputs(in_file=[{"file": file, "int": 5}]).hash From 48d68f527b5d9f75622c09f737e46099c7570390 Mon Sep 17 00:00:00 2001 From: Tom Close Date: Sat, 24 Feb 2024 22:03:47 +1100 Subject: [PATCH 07/29] added a sleep to alter_x to make sure it runs after identity --- pydra/engine/tests/test_submitter.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pydra/engine/tests/test_submitter.py b/pydra/engine/tests/test_submitter.py index bb7c9a2b62..efc80bc3e3 100644 --- a/pydra/engine/tests/test_submitter.py +++ b/pydra/engine/tests/test_submitter.py @@ -599,6 +599,7 @@ def output_dir_as_output(out_dir: Path) -> Directory: wf() +@pytest.mark.flaky(reruns=3) # need for travis def test_hash_changes_in_workflow_graph(tmpdir): class X: """Dummy class with unstable hash (i.e. which isn't altered in a node in which @@ -621,6 +622,7 @@ def identity(x) -> ty.Tuple[X, int]: @mark.task def alter_x(y): + time.sleep(5) X.x = 2 return y From 1a9eed0f5c94de944f6278c2a89311a5f4658a12 Mon Sep 17 00:00:00 2001 From: Tom Close Date: Sat, 24 Feb 2024 22:03:47 +1100 Subject: [PATCH 08/29] fix up --- pydra/engine/tests/test_submitter.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pydra/engine/tests/test_submitter.py b/pydra/engine/tests/test_submitter.py index efc80bc3e3..a2457a09d1 100644 --- a/pydra/engine/tests/test_submitter.py +++ b/pydra/engine/tests/test_submitter.py @@ -622,7 +622,7 @@ def identity(x) -> ty.Tuple[X, int]: @mark.task def alter_x(y): - time.sleep(5) + time.sleep(2) X.x = 2 return y From d76bb3da1a7834adf19e6b98c6c97f57d3d68783 Mon Sep 17 00:00:00 2001 From: Tom Close Date: Sat, 24 Feb 2024 22:03:47 +1100 Subject: [PATCH 09/29] [skip ci] updates comment --- pydra/engine/tests/test_submitter.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pydra/engine/tests/test_submitter.py b/pydra/engine/tests/test_submitter.py index a2457a09d1..cadeec8fe4 100644 --- a/pydra/engine/tests/test_submitter.py +++ b/pydra/engine/tests/test_submitter.py @@ -599,7 +599,9 @@ def output_dir_as_output(out_dir: Path) -> Directory: wf() -@pytest.mark.flaky(reruns=3) # need for travis +@pytest.mark.flaky( + reruns=2 +) # chance of race-condition where alter_x completes before identity def test_hash_changes_in_workflow_graph(tmpdir): class X: """Dummy class with unstable hash (i.e. which isn't altered in a node in which From 6e1f03007bf5f0269c5fabb92027631fcc98b7bc Mon Sep 17 00:00:00 2001 From: Tom Close Date: Sat, 24 Feb 2024 22:03:47 +1100 Subject: [PATCH 10/29] upped sleep --- pydra/engine/tests/test_submitter.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pydra/engine/tests/test_submitter.py b/pydra/engine/tests/test_submitter.py index cadeec8fe4..e309c429f6 100644 --- a/pydra/engine/tests/test_submitter.py +++ b/pydra/engine/tests/test_submitter.py @@ -624,7 +624,7 @@ def identity(x) -> ty.Tuple[X, int]: @mark.task def alter_x(y): - time.sleep(2) + time.sleep(10) X.x = 2 return y From 6717d8523331e59956f9b66d05c8fc858ecd41ec Mon Sep 17 00:00:00 2001 From: Tom Close Date: Sat, 24 Feb 2024 22:03:47 +1100 Subject: [PATCH 11/29] added random in to ensure that alter_x finishes after identity --- pydra/engine/tests/test_submitter.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/pydra/engine/tests/test_submitter.py b/pydra/engine/tests/test_submitter.py index e309c429f6..8b58fcfb97 100644 --- a/pydra/engine/tests/test_submitter.py +++ b/pydra/engine/tests/test_submitter.py @@ -3,6 +3,7 @@ import subprocess as sp import time import typing as ty +from random import randint import pytest from fileformats.generic import Directory from .utils import ( @@ -599,9 +600,9 @@ def output_dir_as_output(out_dir: Path) -> Directory: wf() -@pytest.mark.flaky( - reruns=2 -) # chance of race-condition where alter_x completes before identity +# @pytest.mark.flaky( +# reruns=2 +# ) # chance of race-condition where alter_x completes before identity def test_hash_changes_in_workflow_graph(tmpdir): class X: """Dummy class with unstable hash (i.e. which isn't altered in a node in which @@ -620,11 +621,11 @@ def __bytes_repr__(self, cache): @mark.task @mark.annotate({"return": {"x": X, "y": int}}) def identity(x) -> ty.Tuple[X, int]: - return x, 3 + return x, randint(0, 10) @mark.task def alter_x(y): - time.sleep(10) + # time.sleep(10) X.x = 2 return y From 1cd94912afe089d2bc11a40411c4285e0eea1255 Mon Sep 17 00:00:00 2001 From: Tom Close Date: Sat, 24 Feb 2024 22:03:47 +1100 Subject: [PATCH 12/29] added more reruns to test_hash_changes_in_workflow_graph --- pydra/engine/tests/test_submitter.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pydra/engine/tests/test_submitter.py b/pydra/engine/tests/test_submitter.py index 8b58fcfb97..3bf47756f1 100644 --- a/pydra/engine/tests/test_submitter.py +++ b/pydra/engine/tests/test_submitter.py @@ -600,9 +600,9 @@ def output_dir_as_output(out_dir: Path) -> Directory: wf() -# @pytest.mark.flaky( -# reruns=2 -# ) # chance of race-condition where alter_x completes before identity +@pytest.mark.flaky( + reruns=5 +) # chance of race-condition where alter_x completes before identity def test_hash_changes_in_workflow_graph(tmpdir): class X: """Dummy class with unstable hash (i.e. which isn't altered in a node in which From 7d6849b2d51ac017de27d334226bafd25042a1ec Mon Sep 17 00:00:00 2001 From: Tom Close Date: Sat, 24 Feb 2024 22:03:47 +1100 Subject: [PATCH 13/29] fixed bugs in graph hash error test --- pydra/engine/submitter.py | 21 ++++++++----------- pydra/engine/tests/test_submitter.py | 30 +++++++++++++--------------- 2 files changed, 22 insertions(+), 29 deletions(-) diff --git a/pydra/engine/submitter.py b/pydra/engine/submitter.py index af574a3f50..94533fad8b 100644 --- a/pydra/engine/submitter.py +++ b/pydra/engine/submitter.py @@ -171,34 +171,29 @@ async def expand_workflow(self, wf, rerun=False): ii += 1 # don't block the event loop! await asyncio.sleep(1) - if ii > 60: # QUESTION: why is this 60? + if ii > 60: msg = ( f"Graph of '{wf}' workflow is not empty, but not able to get " "more tasks - something has gone wrong when retrieving the " "results predecessors:\n\n" ) # Get blocked tasks and the predecessors they are waiting on - outstanding = [ - ( - t, - [ - p - for p in graph_copy.predecessors[t.name] - if not p.done - ], - ) + outstanding = { + t: [ + p for p in graph_copy.predecessors[t.name] if not p.done + ] for t in graph_copy.sorted_nodes - ] + } graph_checksums = dict(wf.inputs._graph_checksums) - for task, waiting_on in outstanding: + for task, waiting_on in outstanding.items(): if not waiting_on: continue msg += f"- '{task.name}' node blocked due to\n" for pred in waiting_on: if pred.checksum != graph_checksums[pred.name]: msg += f" - hash changes in '{pred.name}' node inputs\n" - else: + elif pred not in outstanding: msg += ( f" - undiagnosed issues in '{pred.name}' node " "(potentially related to the file-system)" diff --git a/pydra/engine/tests/test_submitter.py b/pydra/engine/tests/test_submitter.py index 3bf47756f1..9002e13ba0 100644 --- a/pydra/engine/tests/test_submitter.py +++ b/pydra/engine/tests/test_submitter.py @@ -2,6 +2,7 @@ import re import subprocess as sp import time +import attrs import typing as ty from random import randint import pytest @@ -600,33 +601,30 @@ def output_dir_as_output(out_dir: Path) -> Directory: wf() -@pytest.mark.flaky( - reruns=5 -) # chance of race-condition where alter_x completes before identity def test_hash_changes_in_workflow_graph(tmpdir): class X: """Dummy class with unstable hash (i.e. which isn't altered in a node in which it is an input)""" - # class attribute to change - x = 1 + value = 1 def __bytes_repr__(self, cache): - """Bytes representation from class attribute, which will be changed be 'alter_x" node. + """Bytes representation from class attribute, which will be changed be + 'alter_x" node. - NB: this is a very silly bytes_repr implementation just to trigger the exception, - hopefully cases like this will be very rare.""" - yield bytes(self.x) + NB: this is a contrived example where the bytes_repr implementation returns + a bytes representation of a class attribute in order to trigger the exception, + hopefully cases like this will be very rare""" + yield bytes(self.value) @mark.task @mark.annotate({"return": {"x": X, "y": int}}) - def identity(x) -> ty.Tuple[X, int]: - return x, randint(0, 10) + def identity(x: X) -> ty.Tuple[X, int]: + return x, 99 @mark.task def alter_x(y): - # time.sleep(10) - X.x = 2 + X.value = 2 return y @mark.task @@ -635,7 +633,7 @@ def to_tuple(x, y): wf = Workflow(name="wf_with_blocked_tasks", input_spec=["x", "y"]) wf.add(identity(name="taska", x=wf.lzin.x)) - wf.add(alter_x(name="taskb", x=wf.taska.lzout.y)) + wf.add(alter_x(name="taskb", y=wf.taska.lzout.y)) wf.add(to_tuple(name="taskc", x=wf.taska.lzout.x, y=wf.taskb.lzout.out)) wf.set_output([("out", wf.taskc.lzout.out)]) @@ -644,7 +642,7 @@ def to_tuple(x, y): wf.cache_dir = tmpdir with pytest.raises( - Exception, match="Graph of 'wf_with_blocked_tasks' workflow is not empty" + RuntimeError, match="Graph of 'wf_with_blocked_tasks' workflow is not empty" ): - with Submitter("serial") as sub: + with Submitter("cf") as sub: result = sub(wf) From df9ecc6222e7cd82bbc95c2692a0c7eb0c424e06 Mon Sep 17 00:00:00 2001 From: Tom Close Date: Sat, 24 Feb 2024 22:03:47 +1100 Subject: [PATCH 14/29] added logging to assist in tracking down hashing issues --- pydra/engine/core.py | 7 +++++++ pydra/engine/submitter.py | 1 + 2 files changed, 8 insertions(+) diff --git a/pydra/engine/core.py b/pydra/engine/core.py index 67d3c6f566..245923ee52 100644 --- a/pydra/engine/core.py +++ b/pydra/engine/core.py @@ -915,6 +915,13 @@ def _check_for_hash_changes(self): f"Input field hashes have changed during the execution of the " f"'{self.name}' {type(self).__name__}.\n{details}" ) + logger.debug( + "Input values and hashes for '%s' %s node:\n%s\n%s", + self.name, + type(self).__name__, + self.inputs, + self.inputs._hashes, + ) SUPPORTED_COPY_MODES = FileSet.CopyMode.any DEFAULT_COPY_COLLATION = FileSet.CopyCollation.any diff --git a/pydra/engine/submitter.py b/pydra/engine/submitter.py index 94533fad8b..82121c859c 100644 --- a/pydra/engine/submitter.py +++ b/pydra/engine/submitter.py @@ -199,6 +199,7 @@ async def expand_workflow(self, wf, rerun=False): "(potentially related to the file-system)" ) msg += "\n" + msg += "Set loglevel to 'debug' in order to track hash changes" raise RuntimeError(msg) for task in tasks: # grab inputs if needed From 930dfc1d019999668e68e7468a024ce59e4acec2 Mon Sep 17 00:00:00 2001 From: Tom Close Date: Sat, 24 Feb 2024 22:03:48 +1100 Subject: [PATCH 15/29] [skip ci] touched up error message --- pydra/engine/core.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/pydra/engine/core.py b/pydra/engine/core.py index 245923ee52..181b794aa9 100644 --- a/pydra/engine/core.py +++ b/pydra/engine/core.py @@ -896,12 +896,13 @@ def _check_for_hash_changes(self): field = getattr(attr.fields(type(self.inputs)), changed) if issubclass(field.type, FileSet): details += ( - f"- '{changed}' field is of type {field.type}. If it is intended to " - "contain output data then the type of the field it should be changed to " - "`pathlib.Path`. Otherwise, if it is an input field that gets " - "altered by the task, the 'copyfile' flag should be set to 'copy' " - "in the task interface metadata to ensure a copy of " - "the files/directories are created before the task is run\n" + f"- '{changed}' field is of file-type {field.type}. If it " + "is intended to contain output data then the type of the field in " + "the interface class should be changed to `pathlib.Path`. Otherwise, " + "if the field is intended to be an input field but it gets altered by " + "the task in some way, then the 'copyfile' flag should be set to " + "'copy' in the field metadata of the task interface class so copies of " + "the files/directories in it are passed to the task instead\n" ) else: details += ( From 6128951f4335a6310794819740866ec8adeeb836 Mon Sep 17 00:00:00 2001 From: Tom Close Date: Sat, 24 Feb 2024 22:03:48 +1100 Subject: [PATCH 16/29] added docstring to bytes_repr to retrigger checks --- pydra/utils/hash.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/pydra/utils/hash.py b/pydra/utils/hash.py index 0c5afac9e9..71c0f731fe 100644 --- a/pydra/utils/hash.py +++ b/pydra/utils/hash.py @@ -107,6 +107,17 @@ def __bytes_repr__(self, cache: Cache) -> Iterator[bytes]: @singledispatch def bytes_repr(obj: object, cache: Cache) -> Iterator[bytes]: + """Default implementation of hashing for generic objects. Single dispatch is used + to provide hooks for class-specific implementations + + Parameters + ---------- + obj: object + the object to hash + cache : Cache + a dictionary object used to store a cache of previously cached objects to + handle circular object references + """ cls = obj.__class__ yield f"{cls.__module__}.{cls.__name__}:{{".encode() dct: Dict[str, ty.Any] From 4ba8da08ed9bb59931f5442972b9d77102033e7a Mon Sep 17 00:00:00 2001 From: Tom Close Date: Sat, 24 Feb 2024 22:03:48 +1100 Subject: [PATCH 17/29] updated bytes_repr doc string --- pydra/utils/hash.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/pydra/utils/hash.py b/pydra/utils/hash.py index 71c0f731fe..5d1bd4b273 100644 --- a/pydra/utils/hash.py +++ b/pydra/utils/hash.py @@ -117,6 +117,11 @@ def bytes_repr(obj: object, cache: Cache) -> Iterator[bytes]: cache : Cache a dictionary object used to store a cache of previously cached objects to handle circular object references + + Yields + ------- + bytes + unique representation of the object in a series of bytes """ cls = obj.__class__ yield f"{cls.__module__}.{cls.__name__}:{{".encode() From 15a775d69683db71b4f04c409088b63fb41c317d Mon Sep 17 00:00:00 2001 From: Tom Close Date: Sat, 24 Feb 2024 22:03:48 +1100 Subject: [PATCH 18/29] removed check for result None in TaskBase._run(). Shouldn't occur now that hash changes are being detected --- pydra/engine/core.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/pydra/engine/core.py b/pydra/engine/core.py index 181b794aa9..ac487e83ae 100644 --- a/pydra/engine/core.py +++ b/pydra/engine/core.py @@ -1301,8 +1301,6 @@ async def _run(self, submitter=None, rerun=False, **kwargs): # Check for any changes to the input hashes that have occurred during the execution # of the task self._check_for_hash_changes() - if result is None: - raise Exception("This should never happen, please open new issue") return result async def _run_task(self, submitter, rerun=False): From 1263f5a88faba219a89692efca963d50d4a8c5b9 Mon Sep 17 00:00:00 2001 From: Tom Close Date: Sat, 24 Feb 2024 22:03:48 +1100 Subject: [PATCH 19/29] added test to hit unstable hash check branch --- pydra/engine/tests/test_submitter.py | 22 +++++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/pydra/engine/tests/test_submitter.py b/pydra/engine/tests/test_submitter.py index 9002e13ba0..cba1970bc3 100644 --- a/pydra/engine/tests/test_submitter.py +++ b/pydra/engine/tests/test_submitter.py @@ -1,6 +1,7 @@ from dateutil import parser import re import subprocess as sp +import struct import time import attrs import typing as ty @@ -575,7 +576,7 @@ def test_sge_no_limit_maxthreads(tmpdir): assert job_1_endtime > job_2_starttime -def test_hash_changes_in_task_inputs(tmp_path): +def test_hash_changes_in_task_inputs_file(tmp_path): @mark.task def output_dir_as_input(out_dir: Directory) -> Directory: (out_dir.fspath / "new-file.txt").touch() @@ -586,6 +587,25 @@ def output_dir_as_input(out_dir: Directory) -> Directory: task() +def test_hash_changes_in_task_inputs_unstable(tmp_path): + @attrs.define + class Unstable: + value: int # type: ignore + + def __bytes_repr__(self, cache) -> ty.Iterator[bytes]: + """Bytes repr based on time-stamp -> inherently unstable""" + yield struct.pack("!I", int(time.time())) + + @mark.task + def unstable_input(unstable: Unstable) -> int: + time.sleep(1) # Ensure the timestamp changes during the task run + return unstable.value + + task = unstable_input(unstable=Unstable(1)) + with pytest.raises(RuntimeError, match="Input field hashes have changed"): + task() + + def test_hash_changes_in_workflow_inputs(tmp_path): @mark.task def output_dir_as_output(out_dir: Path) -> Directory: From b54166e756f0990057eb8de69ad427d5b70bb790 Mon Sep 17 00:00:00 2001 From: Tom Close Date: Sat, 24 Feb 2024 22:03:48 +1100 Subject: [PATCH 20/29] expanded hash graph error message to include final values/hashes of problematic nodes --- pydra/engine/submitter.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/pydra/engine/submitter.py b/pydra/engine/submitter.py index 82121c859c..13a578abc3 100644 --- a/pydra/engine/submitter.py +++ b/pydra/engine/submitter.py @@ -192,7 +192,11 @@ async def expand_workflow(self, wf, rerun=False): msg += f"- '{task.name}' node blocked due to\n" for pred in waiting_on: if pred.checksum != graph_checksums[pred.name]: - msg += f" - hash changes in '{pred.name}' node inputs\n" + msg += ( + f" - hash changes in '{pred.name}' node inputs. " + f"Current values and hashes: {pred.inputs}, " + f"{pred.inputs._hashes}\n" + ) elif pred not in outstanding: msg += ( f" - undiagnosed issues in '{pred.name}' node " From 048c0a15e816ea3e4f81221f6f4ca3d57e9747ec Mon Sep 17 00:00:00 2001 From: Tom Close Date: Sat, 24 Feb 2024 22:03:48 +1100 Subject: [PATCH 21/29] More touch ups to error messages --- pydra/engine/core.py | 2 +- pydra/engine/submitter.py | 19 ++++++++++++++++--- 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/pydra/engine/core.py b/pydra/engine/core.py index ac487e83ae..45d838b7c0 100644 --- a/pydra/engine/core.py +++ b/pydra/engine/core.py @@ -909,7 +909,7 @@ def _check_for_hash_changes(self): f"- the {field.type} object passed to '{changed}' field appears to " f"have an unstable hash. The {field.type}.__bytes_repr__() method " "can be implemented to provide stable hashes for this type. " - "See pydra/utils.hash.py for examples.\n" + "See pydra/utils/hash.py for examples.\n" ) if hash_changes: raise RuntimeError( diff --git a/pydra/engine/submitter.py b/pydra/engine/submitter.py index 13a578abc3..46b5fbaaba 100644 --- a/pydra/engine/submitter.py +++ b/pydra/engine/submitter.py @@ -186,6 +186,7 @@ async def expand_workflow(self, wf, rerun=False): } graph_checksums = dict(wf.inputs._graph_checksums) + hashes_have_changed = False for task, waiting_on in outstanding.items(): if not waiting_on: continue @@ -197,13 +198,25 @@ async def expand_workflow(self, wf, rerun=False): f"Current values and hashes: {pred.inputs}, " f"{pred.inputs._hashes}\n" ) + hashes_have_changed = True elif pred not in outstanding: msg += ( - f" - undiagnosed issues in '{pred.name}' node " - "(potentially related to the file-system)" + f" - undiagnosed issues in '{pred.name}' node, " + "potentially related to file-system access issues " ) msg += "\n" - msg += "Set loglevel to 'debug' in order to track hash changes" + if hashes_have_changed: + msg += ( + "Set loglevel to 'debug' in order to track hash changes " + "throughout the execution of the workflow.\n\n " + "These issues may have been caused by `bytes_repr()` methods " + "that don't return stable hash values for specific object " + "types across multiple processes (see bytes_repr() " + '"singledispatch "function in pydra/utils/hash.py).' + "You may need to implement a specific `bytes_repr()` " + '"singledispatch overload"s or `__bytes_repr__()` ' + "dunder methods to handle one or more types in " + ) raise RuntimeError(msg) for task in tasks: # grab inputs if needed From c282d3e445d9839c872d690be87565220be1deb5 Mon Sep 17 00:00:00 2001 From: Tom Close Date: Sat, 24 Feb 2024 22:03:48 +1100 Subject: [PATCH 22/29] Update pydra/engine/tests/test_submitter.py Co-authored-by: Chris Markiewicz --- pydra/engine/tests/test_submitter.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pydra/engine/tests/test_submitter.py b/pydra/engine/tests/test_submitter.py index cba1970bc3..16118d2bbe 100644 --- a/pydra/engine/tests/test_submitter.py +++ b/pydra/engine/tests/test_submitter.py @@ -593,8 +593,8 @@ class Unstable: value: int # type: ignore def __bytes_repr__(self, cache) -> ty.Iterator[bytes]: - """Bytes repr based on time-stamp -> inherently unstable""" - yield struct.pack("!I", int(time.time())) + """Random 128-bit bytestring""" + yield random.randbytes(16) @mark.task def unstable_input(unstable: Unstable) -> int: From 509afc7e41dd92c8935e70e79b5b033111b1866d Mon Sep 17 00:00:00 2001 From: Tom Close Date: Sat, 24 Feb 2024 22:03:48 +1100 Subject: [PATCH 23/29] Update pydra/engine/tests/test_submitter.py Co-authored-by: Chris Markiewicz --- pydra/engine/tests/test_submitter.py | 1 + 1 file changed, 1 insertion(+) diff --git a/pydra/engine/tests/test_submitter.py b/pydra/engine/tests/test_submitter.py index 16118d2bbe..1d2a9e5afc 100644 --- a/pydra/engine/tests/test_submitter.py +++ b/pydra/engine/tests/test_submitter.py @@ -1,4 +1,5 @@ from dateutil import parser +import random import re import subprocess as sp import struct From 6e9106520bb9fa27ff14725fd1625d09736d6868 Mon Sep 17 00:00:00 2001 From: Tom Close Date: Sat, 24 Feb 2024 22:03:48 +1100 Subject: [PATCH 24/29] reworked hash change detection error messages --- pydra/engine/core.py | 34 ++++++++++++++++++++-------------- pydra/engine/submitter.py | 1 + pydra/utils/hash.py | 2 +- 3 files changed, 22 insertions(+), 15 deletions(-) diff --git a/pydra/engine/core.py b/pydra/engine/core.py index 45d838b7c0..384a0ffd1a 100644 --- a/pydra/engine/core.py +++ b/pydra/engine/core.py @@ -821,8 +821,8 @@ def result(self, state_index=None, return_inputs=False): Returns ------- - result : - + result : Result + the result of the task """ # TODO: check if result is available in load_result and # return a future if not @@ -894,27 +894,33 @@ def _check_for_hash_changes(self): details = "" for changed in hash_changes: field = getattr(attr.fields(type(self.inputs)), changed) + val = getattr(self.inputs, changed) + field_type = type(val) if issubclass(field.type, FileSet): details += ( - f"- '{changed}' field is of file-type {field.type}. If it " - "is intended to contain output data then the type of the field in " - "the interface class should be changed to `pathlib.Path`. Otherwise, " - "if the field is intended to be an input field but it gets altered by " - "the task in some way, then the 'copyfile' flag should be set to " - "'copy' in the field metadata of the task interface class so copies of " - "the files/directories in it are passed to the task instead\n" + f"- {changed}: value passed to the {field.type} field is of type " + f"{field_type} ('{val}'). If it is intended to contain output data " + "then the type of the field in the interface class should be changed " + "to `pathlib.Path`. Otherwise, if the field is intended to be an " + "input field but it gets altered by the task in some way, then the " + "'copyfile' flag should be set to 'copy' in the field metadata of " + "the task interface class so copies of the files/directories in it " + "are passed to the task instead.\n" ) else: details += ( - f"- the {field.type} object passed to '{changed}' field appears to " - f"have an unstable hash. The {field.type}.__bytes_repr__() method " - "can be implemented to provide stable hashes for this type. " - "See pydra/utils/hash.py for examples.\n" + f"- {changed}: the {field_type} object passed to the {field.type}" + f"field appears to have an unstable hash. This could be due to " + "a stochastic/non-thread-safe attribute(s) of the object\n\n" + f"The {field.type}.__bytes_repr__() method can be implemented to " + "bespoke hashing methods based only on the stable attributes for " + f"the `{field_type.__module__}.{field_type.__name__}` type. " + f"See pydra/utils/hash.py for examples. Value: {val}\n" ) if hash_changes: raise RuntimeError( f"Input field hashes have changed during the execution of the " - f"'{self.name}' {type(self).__name__}.\n{details}" + f"'{self.name}' {type(self).__name__}.\n\n{details}" ) logger.debug( "Input values and hashes for '%s' %s node:\n%s\n%s", diff --git a/pydra/engine/submitter.py b/pydra/engine/submitter.py index 46b5fbaaba..9eea2a2c75 100644 --- a/pydra/engine/submitter.py +++ b/pydra/engine/submitter.py @@ -216,6 +216,7 @@ async def expand_workflow(self, wf, rerun=False): "You may need to implement a specific `bytes_repr()` " '"singledispatch overload"s or `__bytes_repr__()` ' "dunder methods to handle one or more types in " + "your interface inputs." ) raise RuntimeError(msg) for task in tasks: diff --git a/pydra/utils/hash.py b/pydra/utils/hash.py index 5d1bd4b273..583f680931 100644 --- a/pydra/utils/hash.py +++ b/pydra/utils/hash.py @@ -64,7 +64,7 @@ def hash_function(obj, cache=None): return hash_object(obj, cache=cache).hex() -def hash_object(obj: object, cache=None) -> Hash: +def hash_object(obj: object, cache: Cache | None = None) -> Hash: """Hash an object Constructs a byte string that uniquely identifies the object, From 989673525c1ee850c510358e4ec779741d761ecc Mon Sep 17 00:00:00 2001 From: Tom Close Date: Sat, 24 Feb 2024 22:03:48 +1100 Subject: [PATCH 25/29] modified modified inputs so it returns the actual original inputs not a deepcopy as for some poorly behaved objects the hash of the deepcopy can be different --- pydra/engine/core.py | 38 +++++++++++++++++++++++++++++--------- pydra/engine/specs.py | 15 ++++++++++++++- pydra/utils/hash.py | 2 +- 3 files changed, 44 insertions(+), 11 deletions(-) diff --git a/pydra/engine/core.py b/pydra/engine/core.py index 384a0ffd1a..f289782afc 100644 --- a/pydra/engine/core.py +++ b/pydra/engine/core.py @@ -464,13 +464,25 @@ def __call__( return res def _modify_inputs(self): - """Update and preserve a Task's original inputs""" + """This method modifies the inputs of the task ahead of its execution: + - links/copies upstream files and directories into the destination tasks + working directory as required select state array values corresponding to + state index (it will try to leave them where they are unless specified or + they are on different file systems) + - resolve template values (e.g. output_file_template) + - deepcopy all inputs to guard against in-place changes during the task's + execution (they will be replaced after the task's execution with the + original inputs to ensure the tasks checksums are consistent) + """ orig_inputs = { - k: deepcopy(v) for k, v in attr.asdict(self.inputs, recurse=False).items() + k: v + for k, v in attr.asdict(self.inputs, recurse=False).items() + if not k.startswith("_") } map_copyfiles = {} - for fld in attr_fields(self.inputs): - value = getattr(self.inputs, fld.name) + input_fields = attr.fields(type(self.inputs)) + for name, value in orig_inputs.items(): + fld = getattr(input_fields, name) copy_mode, copy_collation = parse_copyfile( fld, default_collation=self.DEFAULT_COPY_COLLATION ) @@ -485,11 +497,21 @@ def _modify_inputs(self): supported_modes=self.SUPPORTED_COPY_MODES, ) if value is not copied_value: - map_copyfiles[fld.name] = copied_value + map_copyfiles[name] = copied_value modified_inputs = template_update( self.inputs, self.output_dir, map_copyfiles=map_copyfiles ) - for name, value in modified_inputs.items(): + assert all(m in orig_inputs for m in modified_inputs), ( + "Modified inputs contain fields not present in original inputs. " + "This is likely a bug." + ) + for name, orig_value in orig_inputs.items(): + try: + value = modified_inputs[name] + except KeyError: + # Ensure we pass a copy not the original just in case inner + # attributes are modified during execution + value = deepcopy(orig_value) setattr(self.inputs, name, value) return orig_inputs @@ -550,11 +572,9 @@ def _run(self, rerun=False, environment=None, **kwargs): save(output_dir, result=result, task=self) # removing the additional file with the checksum (self.cache_dir / f"{self.uid}_info.json").unlink() - # # function etc. shouldn't change anyway, so removing # Restore original values to inputs for field_name, field_value in orig_inputs.items(): - if not field_name.startswith("_"): - setattr(self.inputs, field_name, field_value) + setattr(self.inputs, field_name, field_value) os.chdir(cwd) self.hooks.post_run(self, result) # Check for any changes to the input hashes that have occurred during the execution diff --git a/pydra/engine/specs.py b/pydra/engine/specs.py index b2e0f1ceb2..02f14a78d1 100644 --- a/pydra/engine/specs.py +++ b/pydra/engine/specs.py @@ -985,8 +985,21 @@ def get_value( if result is None: raise RuntimeError( f"Could not find results of '{node.name}' node in a sub-directory " - f"named '{node.checksum}' in any of the cache locations:\n" + f"named '{node.checksum}' in any of the cache locations.\n" + "\n".join(str(p) for p in set(node.cache_locations)) + + f"\n\nThis is likely due to hash changes in '{self.name}' node inputs. " + f"Current values and hashes: {self.inputs}, " + f"{self.inputs._hashes}\n\n" + "Set loglevel to 'debug' in order to track hash changes " + "throughout the execution of the workflow.\n\n " + "These issues may have been caused by `bytes_repr()` methods " + "that don't return stable hash values for specific object " + "types across multiple processes (see bytes_repr() " + '"singledispatch "function in pydra/utils/hash.py).' + "You may need to implement a specific `bytes_repr()` " + '"singledispatch overload"s or `__bytes_repr__()` ' + "dunder methods to handle one or more types in " + "your interface inputs." ) _, split_depth = TypeParser.strip_splits(self.type) diff --git a/pydra/utils/hash.py b/pydra/utils/hash.py index 583f680931..74d3b3a44e 100644 --- a/pydra/utils/hash.py +++ b/pydra/utils/hash.py @@ -64,7 +64,7 @@ def hash_function(obj, cache=None): return hash_object(obj, cache=cache).hex() -def hash_object(obj: object, cache: Cache | None = None) -> Hash: +def hash_object(obj: object, cache: ty.Optional[Cache] = None) -> Hash: """Hash an object Constructs a byte string that uniquely identifies the object, From e660351a0538a520692232e37757004d25e914ec Mon Sep 17 00:00:00 2001 From: Tom Close Date: Sat, 24 Feb 2024 22:03:48 +1100 Subject: [PATCH 26/29] changed deepcopy to copy in checksum_states (not sure whether this is ok) --- pydra/engine/core.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pydra/engine/core.py b/pydra/engine/core.py index f289782afc..4143b10175 100644 --- a/pydra/engine/core.py +++ b/pydra/engine/core.py @@ -9,7 +9,7 @@ import sys from pathlib import Path import typing as ty -from copy import deepcopy +from copy import deepcopy, copy from uuid import uuid4 from filelock import SoftFileLock import shutil @@ -286,10 +286,10 @@ def checksum_states(self, state_index=None): ] if state_index is not None: - inputs_copy = deepcopy(self.inputs) + inputs_copy = copy(self.inputs) for key, ind in self.state.inputs_ind[state_index].items(): val = self._extract_input_el( - inputs=inputs_copy, inp_nm=key.split(".")[1], ind=ind + inputs=self.inputs, inp_nm=key.split(".")[1], ind=ind ) setattr(inputs_copy, key.split(".")[1], val) # setting files_hash again in case it was cleaned by setting specific element From 8a5541c83c210d781e25f7e4d45658d4c1791ee5 Mon Sep 17 00:00:00 2001 From: Tom Close Date: Sat, 24 Feb 2024 22:03:48 +1100 Subject: [PATCH 27/29] changed test_hash_changes unittest so it works with python 3.8 --- pydra/engine/tests/test_submitter.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/pydra/engine/tests/test_submitter.py b/pydra/engine/tests/test_submitter.py index 1d2a9e5afc..761c11d88b 100644 --- a/pydra/engine/tests/test_submitter.py +++ b/pydra/engine/tests/test_submitter.py @@ -1,8 +1,7 @@ from dateutil import parser -import random +import secrets import re import subprocess as sp -import struct import time import attrs import typing as ty @@ -595,11 +594,10 @@ class Unstable: def __bytes_repr__(self, cache) -> ty.Iterator[bytes]: """Random 128-bit bytestring""" - yield random.randbytes(16) + yield secrets.token_bytes(16) @mark.task def unstable_input(unstable: Unstable) -> int: - time.sleep(1) # Ensure the timestamp changes during the task run return unstable.value task = unstable_input(unstable=Unstable(1)) From 4e1d4a89f40857453ac8cb86f1061426a8ebcb3e Mon Sep 17 00:00:00 2001 From: Tom Close Date: Sat, 24 Feb 2024 22:52:10 +1100 Subject: [PATCH 28/29] changed _graph_checksums to a dict instead of a list --- pydra/engine/core.py | 12 ++++++------ pydra/engine/submitter.py | 6 ++++-- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/pydra/engine/core.py b/pydra/engine/core.py index 4143b10175..a523c24525 100644 --- a/pydra/engine/core.py +++ b/pydra/engine/core.py @@ -281,9 +281,9 @@ def checksum_states(self, state_index=None): """ if is_workflow(self) and self.inputs._graph_checksums is attr.NOTHING: - self.inputs._graph_checksums = [ - (nd.name, nd.checksum) for nd in self.graph_sorted - ] + self.inputs._graph_checksums = { + nd.name: nd.checksum for nd in self.graph_sorted + } if state_index is not None: inputs_copy = copy(self.inputs) @@ -1142,9 +1142,9 @@ def checksum(self): """ # if checksum is called before run the _graph_checksums is not ready if is_workflow(self) and self.inputs._graph_checksums is attr.NOTHING: - self.inputs._graph_checksums = [ - (nd.name, nd.checksum) for nd in self.graph_sorted - ] + self.inputs._graph_checksums = { + nd.name: nd.checksum for nd in self.graph_sorted + } input_hash = self.inputs.hash if not self.state: diff --git a/pydra/engine/submitter.py b/pydra/engine/submitter.py index 9eea2a2c75..6effed2538 100644 --- a/pydra/engine/submitter.py +++ b/pydra/engine/submitter.py @@ -184,7 +184,6 @@ async def expand_workflow(self, wf, rerun=False): ] for t in graph_copy.sorted_nodes } - graph_checksums = dict(wf.inputs._graph_checksums) hashes_have_changed = False for task, waiting_on in outstanding.items(): @@ -192,7 +191,10 @@ async def expand_workflow(self, wf, rerun=False): continue msg += f"- '{task.name}' node blocked due to\n" for pred in waiting_on: - if pred.checksum != graph_checksums[pred.name]: + if ( + pred.checksum + != wf.inputs._graph_checksums[pred.name] + ): msg += ( f" - hash changes in '{pred.name}' node inputs. " f"Current values and hashes: {pred.inputs}, " From ff281aa74a8bce1f73649f0962c040401d74da9f Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sat, 2 Mar 2024 03:01:41 +0000 Subject: [PATCH 29/29] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- pydra/engine/tests/test_submitter.py | 1 + 1 file changed, 1 insertion(+) diff --git a/pydra/engine/tests/test_submitter.py b/pydra/engine/tests/test_submitter.py index 7098f66880..298e7e74b4 100644 --- a/pydra/engine/tests/test_submitter.py +++ b/pydra/engine/tests/test_submitter.py @@ -669,6 +669,7 @@ def to_tuple(x, y): with Submitter("cf") as sub: result = sub(wf) + @mark.task def to_tuple(x, y): return (x, y)