Skip to content

Commit

Permalink
initial commit for parallel execution
Browse files Browse the repository at this point in the history
  • Loading branch information
franklinselva committed Sep 28, 2023
1 parent 8667ab2 commit cb28eeb
Show file tree
Hide file tree
Showing 4 changed files with 157 additions and 107 deletions.
5 changes: 3 additions & 2 deletions up_esb/execution/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Execution module for UP-ESB."""

from .executor import ActionExecutor, ActionResult, TaskExecutor
from .executor import ActionExecutor, ActionResult
from .task_manager import TaskManager

__all__ = ["TaskExecutor", "ActionResult", "ActionExecutor"]
__all__ = ["ActionResult", "ActionExecutor", "TaskManager"]
51 changes: 14 additions & 37 deletions up_esb/execution/executor.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,18 @@
# Copyright 2023 Selvakumar H S, LAAS-CNRS
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Executor for executing tasks."""
from threading import Condition, Lock, Thread
from threading import Thread
from typing import NamedTuple

import networkx as nx
Expand All @@ -20,42 +33,6 @@
)


class TaskTracker:
"""Track the amount of tasks that is being executed."""

def __init__(self):
self._lock = Lock()
self._condition = Condition(self._lock)
self._tasks = 0

def __enter__(self):
with self._lock:
self._tasks += 1
return self

def __exit__(self, exc_type, exc_val, exc_tb):
with self._lock:
self._tasks -= 1
self._condition.notify()

def wait(self):
"""Wait until all tasks are finished."""
with self._lock:
while self._tasks > 0:
self._condition.wait()


class TaskExecutor:
"""Base class for task executors."""

def __init__(self):
self._lock = Lock()
self._condition = Condition(self._lock)
self._task = None
self._thread = None
self._task_tracker = None


class ActionExecutor:
"""Base class for action executors."""

Expand Down
68 changes: 0 additions & 68 deletions up_esb/execution/parallel_executor.py

This file was deleted.

140 changes: 140 additions & 0 deletions up_esb/execution/task_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
# Copyright 2023 Selvakumar H S, LAAS-CNRS
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Task Manager for the execution of tasks."""
from threading import Condition, Lock
from typing import List, Union


class TaskTracker:
"""Track the amount of tasks that is being executed."""

def __init__(self):
self._lock = Lock()
self._condition = Condition(self._lock)
self._tasks = 0

def __enter__(self):
with self._lock:
self._tasks += 1
return self

def __exit__(self, exc_type, exc_val, exc_tb):
with self._lock:
self._tasks -= 1
self._condition.notify()

def wait(self):
"""Wait until all tasks are finished."""
with self._lock:
while self._tasks > 0:
self._condition.wait()


class TaskContainer:
"""Task container for enveloping one or more actions."""

def __init__(self):
self._container_lock = Lock()
self._condition = Condition(self._container_lock)
self._task: Union[int, List[int]] = 0
self._task_tracker = None

def __enter__(self):
with self._container_lock:
self._task_tracker = TaskTracker()
return self

def __exit__(self, exc_type, exc_val, exc_tb):
with self._container_lock:
self._task_tracker.wait()
self._task_tracker = None
self._condition.notify()

def add_task(self, task: Union[int, List[int]]) -> bool:
"""Add a single task to the execution queue."""
if self._task is None:
self._task = task
return True
return False

def execute_task(self):
"""Execute the task."""
if isinstance(self._task, int):
self._execute_task(self._task)
elif isinstance(self._task, list):
self._execute_tasks(self._task)

def _execute_task(self, task_id: int):
"""Execute a single task."""
raise NotImplementedError

def _execute_tasks(self, task_ids: List[int]):
"""Execute multiple tasks."""
# TODO: Implement parallel execution of tasks
raise NotImplementedError


class TaskManager:
"""Task Manager for the execution of tasks."""

def __init__(self):
self._lock = Lock()
self._condition = Condition(self._lock)
self._task = None
self._thread = None
self._execution_queue = []

def add_task(self, task_id: int) -> bool:
"""Add a single task to the execution queue."""
return self._add_task(task_id)

def _add_task(self, task_id: int) -> bool:
"""Add a single task to the execution queue."""
_container = TaskContainer()
_container.add_task(task_id)
if _container:
self._execution_queue.append(_container)
self._condition.notify()
return True
return False

def add_tasks(self, task_ids: List[int]) -> bool:
"""Add multiple tasks on the same level to the execution queue."""
return self._add_tasks(task_ids)

def _add_tasks(self, task_ids: List[int]) -> bool:
"""Add multiple tasks to the execution queue."""
_container = TaskContainer()
_container.add_task(task_ids)
if _container:
self._execution_queue.append(_container)
self._condition.notify()
return True
return False

def remove_task(self, task_id: int) -> bool:
"""Remove a task from the execution queue."""
return self._remove_task(task_id)

def _remove_task(self, task_id: int) -> bool:
"""Remove a task from the execution queue."""
raise NotImplementedError

def remove_tasks(self, task_ids: List[int]) -> bool:
"""Remove multiple tasks on the same level from the execution queue."""
return self._remove_tasks(task_ids)

def _remove_tasks(self, task_ids: List[int]) -> bool:
"""Remove multiple tasks from the execution queue."""
raise NotImplementedError

0 comments on commit cb28eeb

Please sign in to comment.