From cb28eebea2ce7dacf5b0159c35e1542b2970d588 Mon Sep 17 00:00:00 2001 From: franklinselva Date: Thu, 28 Sep 2023 12:00:40 +0200 Subject: [PATCH] initial commit for parallel execution --- up_esb/execution/__init__.py | 5 +- up_esb/execution/executor.py | 51 +++------- up_esb/execution/parallel_executor.py | 68 ------------- up_esb/execution/task_manager.py | 140 ++++++++++++++++++++++++++ 4 files changed, 157 insertions(+), 107 deletions(-) delete mode 100644 up_esb/execution/parallel_executor.py create mode 100644 up_esb/execution/task_manager.py diff --git a/up_esb/execution/__init__.py b/up_esb/execution/__init__.py index c7a9cef..f01d540 100644 --- a/up_esb/execution/__init__.py +++ b/up_esb/execution/__init__.py @@ -1,5 +1,6 @@ """Execution module for UP-ESB.""" -from .executor import ActionExecutor, ActionResult, TaskExecutor +from .executor import ActionExecutor, ActionResult +from .task_manager import TaskManager -__all__ = ["TaskExecutor", "ActionResult", "ActionExecutor"] +__all__ = ["ActionResult", "ActionExecutor", "TaskManager"] diff --git a/up_esb/execution/executor.py b/up_esb/execution/executor.py index 346d530..90fe49a 100644 --- a/up_esb/execution/executor.py +++ b/up_esb/execution/executor.py @@ -1,5 +1,18 @@ +# Copyright 2023 Selvakumar H S, LAAS-CNRS +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. """Executor for executing tasks.""" -from threading import Condition, Lock, Thread +from threading import Thread from typing import NamedTuple import networkx as nx @@ -20,42 +33,6 @@ ) -class TaskTracker: - """Track the amount of tasks that is being executed.""" - - def __init__(self): - self._lock = Lock() - self._condition = Condition(self._lock) - self._tasks = 0 - - def __enter__(self): - with self._lock: - self._tasks += 1 - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - with self._lock: - self._tasks -= 1 - self._condition.notify() - - def wait(self): - """Wait until all tasks are finished.""" - with self._lock: - while self._tasks > 0: - self._condition.wait() - - -class TaskExecutor: - """Base class for task executors.""" - - def __init__(self): - self._lock = Lock() - self._condition = Condition(self._lock) - self._task = None - self._thread = None - self._task_tracker = None - - class ActionExecutor: """Base class for action executors.""" diff --git a/up_esb/execution/parallel_executor.py b/up_esb/execution/parallel_executor.py deleted file mode 100644 index 01341a3..0000000 --- a/up_esb/execution/parallel_executor.py +++ /dev/null @@ -1,68 +0,0 @@ -# Copyright 2022 Selvakumar H S, LAAS-CNRS -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Simple executor for UP Bridge. Currently no await is supported.""" - -import asyncio - -import networkx as nx - - -# TODO: Move to dispatcher -class Executor: - """Add simple executor for UP Bridge. Currently no await is supported. - - - Handles sequential, time-triggered and parallel actions - """ - - def __init__(self): - pass - - def execute(self, graph: nx.DiGraph): - """Execute the graph.""" - result = None - for node in graph.nodes(data=True): - if node[1]["node_name"] in ["start", "end"]: - continue - successors = list(graph.successors(node[0])) - if len(successors) > 1: - # Assume all successors are parallel actions - - # Fetch complete node data - for s in successors: - successors[successors.index(s)] = (s, graph.nodes[s]) - self._execute_coroutines(successors) - else: - parameters = node[1]["parameters"] - executor = node[1]["context"][node[1]["action"]] - result = executor(**parameters) - - return result - - async def _execute_concurrent_action(self, action): - # TODO: Better implementation - # FIXME: Duplicate execution of actions - parameters = action[1]["parameters"] - executor = action[1]["context"][action[1]["action"]] - result = executor(**parameters) - - return result - - async def _run_concurrent_tasks(self, tasks): - # TODO: add await on parallel actions - await asyncio.gather(*tasks) - - def _execute_coroutines(self, actions): - tasks = [self._execute_concurrent_action(a) for a in actions] - asyncio.run(self._run_concurrent_tasks(tasks)) diff --git a/up_esb/execution/task_manager.py b/up_esb/execution/task_manager.py new file mode 100644 index 0000000..c26b139 --- /dev/null +++ b/up_esb/execution/task_manager.py @@ -0,0 +1,140 @@ +# Copyright 2023 Selvakumar H S, LAAS-CNRS +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Task Manager for the execution of tasks.""" +from threading import Condition, Lock +from typing import List, Union + + +class TaskTracker: + """Track the amount of tasks that is being executed.""" + + def __init__(self): + self._lock = Lock() + self._condition = Condition(self._lock) + self._tasks = 0 + + def __enter__(self): + with self._lock: + self._tasks += 1 + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + with self._lock: + self._tasks -= 1 + self._condition.notify() + + def wait(self): + """Wait until all tasks are finished.""" + with self._lock: + while self._tasks > 0: + self._condition.wait() + + +class TaskContainer: + """Task container for enveloping one or more actions.""" + + def __init__(self): + self._container_lock = Lock() + self._condition = Condition(self._container_lock) + self._task: Union[int, List[int]] = 0 + self._task_tracker = None + + def __enter__(self): + with self._container_lock: + self._task_tracker = TaskTracker() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + with self._container_lock: + self._task_tracker.wait() + self._task_tracker = None + self._condition.notify() + + def add_task(self, task: Union[int, List[int]]) -> bool: + """Add a single task to the execution queue.""" + if self._task is None: + self._task = task + return True + return False + + def execute_task(self): + """Execute the task.""" + if isinstance(self._task, int): + self._execute_task(self._task) + elif isinstance(self._task, list): + self._execute_tasks(self._task) + + def _execute_task(self, task_id: int): + """Execute a single task.""" + raise NotImplementedError + + def _execute_tasks(self, task_ids: List[int]): + """Execute multiple tasks.""" + # TODO: Implement parallel execution of tasks + raise NotImplementedError + + +class TaskManager: + """Task Manager for the execution of tasks.""" + + def __init__(self): + self._lock = Lock() + self._condition = Condition(self._lock) + self._task = None + self._thread = None + self._execution_queue = [] + + def add_task(self, task_id: int) -> bool: + """Add a single task to the execution queue.""" + return self._add_task(task_id) + + def _add_task(self, task_id: int) -> bool: + """Add a single task to the execution queue.""" + _container = TaskContainer() + _container.add_task(task_id) + if _container: + self._execution_queue.append(_container) + self._condition.notify() + return True + return False + + def add_tasks(self, task_ids: List[int]) -> bool: + """Add multiple tasks on the same level to the execution queue.""" + return self._add_tasks(task_ids) + + def _add_tasks(self, task_ids: List[int]) -> bool: + """Add multiple tasks to the execution queue.""" + _container = TaskContainer() + _container.add_task(task_ids) + if _container: + self._execution_queue.append(_container) + self._condition.notify() + return True + return False + + def remove_task(self, task_id: int) -> bool: + """Remove a task from the execution queue.""" + return self._remove_task(task_id) + + def _remove_task(self, task_id: int) -> bool: + """Remove a task from the execution queue.""" + raise NotImplementedError + + def remove_tasks(self, task_ids: List[int]) -> bool: + """Remove multiple tasks on the same level from the execution queue.""" + return self._remove_tasks(task_ids) + + def _remove_tasks(self, task_ids: List[int]) -> bool: + """Remove multiple tasks from the execution queue.""" + raise NotImplementedError