Skip to content

Commit

Permalink
Merge pull request #679 from materialsproject/flow-metadata
Browse files Browse the repository at this point in the history
Add `Flow.metadata` attribute used in `Flow.update_metadata` method
  • Loading branch information
janosh authored Nov 22, 2024
2 parents c374d24 + b6786ca commit b4b7dc9
Show file tree
Hide file tree
Showing 5 changed files with 460 additions and 26 deletions.
49 changes: 46 additions & 3 deletions src/jobflow/core/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ class Flow(MSONable):
automatically when a flow is included in the jobs array of another flow.
The object identified by one UUID of the list should be contained in objects
identified by its subsequent elements.
metadata
A dictionary of information that will get stored in the Flow collection.
metadata_updates
A list of updates for the metadata that will be applied to any dynamically
generated sub Flow/Job.
Raises
------
Expand Down Expand Up @@ -128,6 +133,8 @@ def __init__(
order: JobOrder = JobOrder.AUTO,
uuid: str = None,
hosts: list[str] = None,
metadata: dict[str, Any] = None,
metadata_updates: list[dict[str, Any]] = None,
):
from jobflow.core.job import Job

Expand All @@ -141,6 +148,8 @@ def __init__(
self.order = order
self.uuid = uuid
self.hosts = hosts or []
self.metadata = metadata or {}
self.metadata_updates = metadata_updates or []

self._jobs: tuple[Flow | Job, ...] = ()
self.add_jobs(jobs)
Expand Down Expand Up @@ -608,9 +617,10 @@ def update_metadata(
function_filter: Callable = None,
dict_mod: bool = False,
dynamic: bool = True,
callback_filter: Callable[[Flow | Job], bool] = lambda _: True,
):
"""
Update the metadata of all Jobs in the Flow.
Update the metadata of the Flow and/or its Jobs.
Note that updates will also be applied to jobs in nested Flows.
Expand All @@ -630,6 +640,10 @@ def update_metadata(
dynamic
The updates will be propagated to Jobs/Flows dynamically generated at
runtime.
callback_filter
A function that takes a Flow or Job instance and returns True if updates
should be applied to that instance. Allows for custom filtering logic.
The filter is applied recursively to nested Flows and Jobs, so make it specific.
Examples
--------
Expand All @@ -646,16 +660,45 @@ def update_metadata(
The ``metadata`` of both jobs could be updated as follows:
>>> flow.update_metadata({"tag": "addition_job"})
Or using a callback filter to only update flows containing a specific maker:
>>> flow.update_metadata(
... {"material_id": 42},
... callback_filter=lambda flow: SomeMaker in map(type, flow)
... and flow.name == "flow name"
... )
"""
for job in self:
job.update_metadata(
from jobflow.utils.dict_mods import apply_mod

for job_or_flow in self:
job_or_flow.update_metadata(
update,
name_filter=name_filter,
function_filter=function_filter,
dict_mod=dict_mod,
dynamic=dynamic,
callback_filter=callback_filter,
)

if callback_filter(self) is False:
return

if dict_mod:
apply_mod(update, self.metadata)
else:
self.metadata.update(update)

if dynamic:
dict_input = {
"update": update,
"name_filter": name_filter,
"function_filter": function_filter,
"dict_mod": dict_mod,
"callback_filter": callback_filter,
}
self.metadata_updates.append(dict_input)

def update_config(
self,
config: jobflow.JobConfig | dict,
Expand Down
25 changes: 18 additions & 7 deletions src/jobflow/core/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,6 @@ def __init__(
function_args = () if function_args is None else function_args
function_kwargs = {} if function_kwargs is None else function_kwargs
uuid = suid() if uuid is None else uuid
metadata = {} if metadata is None else metadata
config = JobConfig() if config is None else config

# make a deep copy of the function (means makers do not share the same instance)
Expand All @@ -354,7 +353,7 @@ def __init__(
self.uuid = uuid
self.index = index
self.name = name
self.metadata = metadata
self.metadata = metadata or {}
self.config = config
self.hosts = hosts or []
self.metadata_updates = metadata_updates or []
Expand Down Expand Up @@ -927,6 +926,7 @@ def update_metadata(
function_filter: Callable = None,
dict_mod: bool = False,
dynamic: bool = True,
callback_filter: Callable[[jobflow.Flow | Job], bool] = lambda _: True,
):
"""
Update the metadata of the job.
Expand All @@ -950,6 +950,9 @@ def update_metadata(
dynamic
The updates will be propagated to Jobs/Flows dynamically generated at
runtime.
callback_filter
A function that takes a Flow or Job instance and returns True if updates
should be applied to that instance. Allows for custom filtering logic.
Examples
--------
Expand All @@ -968,11 +971,16 @@ def update_metadata(
will not only set the `example` metadata to the `test_job`, but also to all the
new Jobs that will be generated at runtime by the ExampleMaker.
`update_metadata` can be called multiple times with different `name_filter` or
`function_filter` to control which Jobs will be updated.
`update_metadata` can be called multiple times with different filters to control
which Jobs will be updated. For example, using a callback filter:
>>> test_job.update_metadata(
... {"material_id": 42},
... callback_filter=lambda job: isinstance(job.maker, SomeMaker)
... )
At variance, if `dynamic` is set to `False` the `example` metadata will only be
added to the `test_job` and not to the generated Jobs.
Conversely, if `dynamic` is set to `False`, the metadata will only be
added to the filtered Jobs and not to any generated Jobs.
"""
from jobflow.utils.dict_mods import apply_mod

Expand All @@ -982,14 +990,14 @@ def update_metadata(
"name_filter": name_filter,
"function_filter": function_filter,
"dict_mod": dict_mod,
"callback_filter": callback_filter,
}
self.metadata_updates.append(dict_input)

# unwrap the functions in case the job is a decorated one
function_filter = getattr(function_filter, "__wrapped__", function_filter)
function = getattr(self.function, "__wrapped__", self.function)

# if function_filter is not None and function_filter != self.function:
if function_filter is not None and function_filter != function:
return

Expand All @@ -998,6 +1006,9 @@ def update_metadata(
):
return

if callback_filter(self) is False:
return

# if we get to here then we pass all the filters
if dict_mod:
apply_mod(update, self.metadata)
Expand Down
2 changes: 1 addition & 1 deletion src/jobflow/managers/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def run_locally(
Raise an error if the flow was not executed successfully.
allow_external_references : bool
If False all the references to other outputs should be from other Jobs
of the Flow.
of the same Flow.
raise_immediately : bool
If True, raise an exception immediately if a job fails. If False, continue
running the flow and only raise an exception at the end if the flow did not
Expand Down
Loading

0 comments on commit b4b7dc9

Please sign in to comment.