Skip to content

Commit

Permalink
method for dependency status checker.
Browse files Browse the repository at this point in the history
  • Loading branch information
atroiano-glue authored Nov 19, 2024
1 parent 295308d commit e2cae2b
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 46 deletions.
95 changes: 51 additions & 44 deletions src/opentaskpy/taskhandlers/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,38 @@ def return_result(
"""
return super().return_result(status, message, exception) # type: ignore[no-any-return]

def check_dependencies(
self, order_id: int, batch_task: dict, check_only_failed: bool = False
) -> bool:
"""Check batch task dependencies.
Args:
order_id (int): Order id of task in batch
batch_task (dict): Batch task dictionary
check_only_failed (bool): Check for completed tasks if false (default), else failed if True
Returns:
bool: True if all batch task dependencies have completed (or failed if check_failed), False otherwise .
"""
for dependency in batch_task["batch_task_spec"]["dependencies"]:
if (
self.task_order_tree[dependency]["status"] not in ["COMPLETED"]
) and not check_only_failed:
self.logger.log(
12,
(
"Skipping task"
f" {order_id} ({batch_task['task_id']}) as"
f" its dependency has not completed"
),
)
return False
if (
self.task_order_tree[dependency]["status"] in ["FAILED", "TIMED_OUT"]
) and check_only_failed:
return False
return True

def run(self, kill_event: threading.Event | None = None) -> bool: # noqa: C901
"""Run the batch.
Expand All @@ -211,7 +243,6 @@ def run(self, kill_event: threading.Event | None = None) -> bool: # noqa: C901

ex = None
while True:
dependency_failed = False
# Loop through every task in the tree
for order_id, batch_task in self.task_order_tree.items():
task = batch_task["task"]
Expand All @@ -228,42 +259,11 @@ def run(self, kill_event: threading.Event | None = None) -> bool: # noqa: C901
# Check if there are dependencies for this task that have not yet completed, if so then we skip it
# Check if there are failed dependencies, if so set flag and break out of loop
if "dependencies" in batch_task["batch_task_spec"]:
all_dependencies_complete = True
some_dependencies_failed = False
for dependency in batch_task["batch_task_spec"]["dependencies"]:
if self.task_order_tree[dependency]["status"] not in [
"COMPLETED",
"FAILED",
]:
self.logger.log(
12,
(
"Skipping task"
f" {order_id} ({batch_task['task_id']}) as"
f" dependency {dependency} has not completed"
),
)
all_dependencies_complete = False
continue
if (
self.task_order_tree[dependency]["status"] in ["FAILED"]
and not self.task_order_tree[dependency]["continue_on_fail"]
):
some_dependencies_failed = True

if some_dependencies_failed:
self.logger.error(
f"Task {order_id} ({batch_task['task_id']}) will not run due to a failed dependency"
)
dependency_failed = True
break

all_dependencies_complete = self.check_dependencies(
order_id, batch_task
)
if not all_dependencies_complete:
continue
self.logger.info(
"All dependencies for task"
f" {order_id} ({batch_task['task_id']}) have completed"
)

# Check if the task has already been triggered
if batch_task["status"] == "NOT_STARTED":
Expand Down Expand Up @@ -365,20 +365,27 @@ def run(self, kill_event: threading.Event | None = None) -> bool: # noqa: C901
logged = True

batch_task["logged_status"] = logged

if dependency_failed:
self.logger.error("Breaking out of loop due to a failed dependency.")
break
batch_task["logged_status"] = logged
batch_task["logged_status"] = logged

# Check if there are any tasks that are still in RUNNING state, if not then we are done
remaining_tasks = [
running_tasks = [
task
for task in self.task_order_tree.values()
if task["status"] in ["RUNNING", "NOT_STARTED"]
if task["status"] in ["RUNNING"]
]
self.logger.debug(f"Remaining tasks: {remaining_tasks}")
if len(remaining_tasks) == 0:
self.logger.debug("No remaining tasks, breaking out of loop")
self.logger.debug(f"Running tasks: {running_tasks}")
if len(running_tasks) == 0:
failed_deps = [
task
for task in self.task_order_tree.values()
if task["status"] in ["NOT_STARTED"]
and not self.check_dependencies(0, task, True)
]
if len(failed_deps) > 0:
self.logger.info(
f"No remaining runnable tasks, tasks with failed dependencies: {failed_deps}"
)
break

# Sleep 5 seconds before checking again
Expand Down
8 changes: 6 additions & 2 deletions tests/test_taskhandler_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,10 +210,14 @@
"tasks": [
{
"order_id": 1,
"task_id": "sleep-5",
},
{
"order_id": 2,
"task_id": "filewatch-5-error-sftp",
},
{"order_id": 2, "task_id": "filewatch-5-error-sftp", "dependencies": [1]},
{"order_id": 3, "task_id": "filewatch-5-error-sftp", "dependencies": [2]},
{"order_id": 3, "task_id": "filewatch-5-error-sftp", "dependencies": [1, 2]},
{"order_id": 4, "task_id": "filewatch-5-error-sftp", "dependencies": [3]},
],
}

Expand Down

0 comments on commit e2cae2b

Please sign in to comment.