Skip to content

Commit

Permalink
[resotocore][fix] Define correct progress order (#1310)
Browse files Browse the repository at this point in the history
* allow custom ordering for serialization

* order progress by step name, progress and name
  • Loading branch information
aquamatthias authored Nov 25, 2022
1 parent fe6fcba commit 31be217
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 12 deletions.
18 changes: 17 additions & 1 deletion resotocore/resotocore/task/task_description.py
Original file line number Diff line number Diff line change
Expand Up @@ -645,6 +645,7 @@ def __init__(
end = EndState(self)
states: List[StepState] = [start, *steps, end]
self.states: Dict[str, StepState] = {state.step.name: state for state in states}
self.step_name_index = {step.name: i for i, step in enumerate(descriptor.steps)}
self.machine = Machine(self, states, start, auto_transitions=False, queued=True)

for current_state, next_state in interleave(states):
Expand Down Expand Up @@ -676,9 +677,24 @@ def current_step_progress(self) -> Progress:
return self.progresses.sub_progress(self.current_step.name) or ProgressDone(self.current_step.name, 0, 1)

@property
def progress(self) -> Progress:
def progress(self) -> ProgressTree:
return self.progresses

def progress_json(self) -> Json:
max_idx = len(self.step_name_index)

def order_progress(p: Progress) -> Tuple[int, int, str]:
# if the progress is nested, take the first path else the name of the progress
step_name = p.path[0] if len(p.path) > 0 else p.name
# lookup the index of the step or fallback to the max index
idx = self.step_name_index.get(step_name)
index = idx if idx is not None else max_idx
progress = p.overall_progress().percentage
# order by step, progress (done first and in progress later) and name
return index, -progress, p.name

return self.progresses.to_json(key=order_progress)

@property
def current_state(self) -> StepState:
return self.machine.get_state(self.state) # type: ignore # pylint: disable=no-member
Expand Down
4 changes: 2 additions & 2 deletions resotocore/resotocore/task/task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ async def handle_action_result(
await self.execute_task_commands(wi, commands, done)
# check if progress has changed in the meantime (by the running task itself)
if wi.progresses != progress:
msg = {"workflow": wi.descriptor.name, "task": wi.id, "message": wi.progress.to_json()}
msg = {"workflow": wi.descriptor.name, "task": wi.id, "message": wi.progress_json()}
await self.message_bus.emit_event(CoreMessage.ProgressMessage, msg)
else:
log.warning(
Expand Down Expand Up @@ -444,7 +444,7 @@ async def handle_action_progress(self, info: ActionProgress) -> None:
rt.handle_progress(info)
await self.message_bus.emit_event(
CoreMessage.ProgressMessage,
{"workflow": rt.descriptor.name, "task": rt.id, "message": rt.progress.to_json()},
{"workflow": rt.descriptor.name, "task": rt.id, "message": rt.progress_json()},
)

async def execute_task_commands(
Expand Down
39 changes: 38 additions & 1 deletion resotocore/tests/resotocore/task/task_description_test.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from datetime import timedelta
from typing import Any, List, Dict, Tuple
from typing import Any, List, Dict, Tuple, Callable

from deepdiff import DeepDiff
from frozendict import frozendict
Expand Down Expand Up @@ -236,3 +236,40 @@ def roundtrip(obj: Any) -> None:
js = to_js(obj)
again = from_js(js, type(obj))
assert DeepDiff(obj, again) == {}, f"Json: {js} serialized as {again}"


def test_task_progress() -> None:
actions = ["collect", "encode"]
wf = Workflow(TaskDescriptorId("test_workflow"), "name", [Step(a, PerformAction(a)) for a in actions], [])
sb = Subscriber(SubscriberId("test"), {s: Subscription(s) for s in actions})
rt, _ = RunningTask.empty(wf, lambda: {s: [sb] for s in actions})

def progress(step: str, fn: Callable[[int], ProgressDone]) -> None:
# use revers order to test that the correct order is used
for idx in range(3):
rt.handle_progress(ActionProgress(step, rt.id, step, sb.id, fn(idx), utc()))

# report progress start on the collect step
progress("collect", lambda idx: ProgressDone(str(idx), 0, 100, path=["foo", "bla"]))
assert [x["name"] for x in rt.progress_json()["parts"]] == ["0", "1", "2", "encode"]

# report index as progress on the collect step
progress("collect", lambda idx: ProgressDone(str(idx), idx, 100, path=["foo", "bla"]))
assert [x["name"] for x in rt.progress_json()["parts"]] == ["2", "1", "0", "encode"]

# report progress done on the collect step
rt.handle_done(ActionDone("collect", rt.id, "collect", sb.id))
assert [x["name"] for x in rt.progress_json()["parts"]] == ["collect", "encode"]
assert rt.progress.overall_progress().percentage == 50

# report progress done on the encode step
progress("encode", lambda idx: ProgressDone(str(idx), 0, 100, path=["foo", "bla"]))
assert [x["name"] for x in rt.progress_json()["parts"]] == ["collect", "0", "1", "2"]

# report index as progress on the collect step
progress("collect", lambda idx: ProgressDone(str(idx), idx, 100, path=["foo", "bla"]))
assert [x["name"] for x in rt.progress_json()["parts"]] == ["collect", "2", "1", "0"]

rt.handle_done(ActionDone("encode", rt.id, "encode", sb.id))
assert [x["name"] for x in rt.progress_json()["parts"]] == ["collect", "encode"]
assert rt.progress.overall_progress().percentage == 100
13 changes: 5 additions & 8 deletions resotolib/resotolib/core/progress.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from __future__ import annotations

from abc import abstractmethod, ABC
from typing import List, Optional, Any, Dict
from typing import List, Optional, Any, Dict, Callable

from attr import define, field, evolve
from treelib import Tree, Node
Expand Down Expand Up @@ -53,7 +53,7 @@ def from_progresses(name: str, progresses: List[Progress]) -> ProgressTree:
updated.add_progress(p)
return updated

def to_json(self) -> Json:
def to_json(self, key: Optional[Callable[[Progress], Any]] = None) -> Json:
p = {"path": self.path} if self.path else {}
if isinstance(self, ProgressDone):
return {
Expand All @@ -64,12 +64,9 @@ def to_json(self) -> Json:
"total": self.total,
}
elif isinstance(self, ProgressTree):
return {
"kind": "tree",
"name": self.name,
**p,
"parts": [part.data.to_json() for part in self.sub_tree.all_nodes() if part.data is not None],
}
node_iter = (part.data for part in self.sub_tree.all_nodes_itr() if part.data is not None)
nodes: List[Progress] = sorted(node_iter, key=key) if key else list(node_iter) # type: ignore
return {"kind": "tree", "name": self.name, **p, "parts": [part.to_json() for part in nodes]}
else:
raise AttributeError("No handler to marshal progress")

Expand Down
16 changes: 16 additions & 0 deletions resotolib/test/core/progress_test.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from typing import Callable, Any, List

from pytest import fixture, raises

from resotolib.core.progress import ProgressDone, Progress, ProgressInfo, ProgressTree
Expand Down Expand Up @@ -72,3 +74,17 @@ def test_equality(progress: ProgressTree) -> None:
assert pgc == progress
pgc.add_progress(ProgressDone("region1", 2, 2, path=["account1"]))
assert pgc != progress


def test_order() -> None:
def order(key: Callable[[Progress], Any]) -> List[str]:
pt = ProgressTree("test")
pt.add_progress(ProgressDone("a", 10, 10))
pt.add_progress(ProgressDone("b", 8, 10))
pt.add_progress(ProgressDone("c", 3, 10))
pt.add_progress(ProgressDone("d", 1, 10))
js = pt.to_json(key=key)
return [x["name"] for x in js["parts"]]

assert order(lambda x: x.name) == ["a", "b", "c", "d"]
assert order(lambda x: x.overall_progress().percentage) == ["d", "c", "b", "a"]

0 comments on commit 31be217

Please sign in to comment.