diff --git a/config/config.yaml b/config/config.yaml index cdf980836..1b58a6685 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -16,9 +16,6 @@ resource_manager: plugins: mrta: - # Assuming we can have different types of tasks, specify from where to import the Task and TaskStatus classes - task_type: - class: 'mrs.structs.allocation' allocation_method: mrta-srea stp_solver: srea robot_proxies: true @@ -26,6 +23,8 @@ plugins: auctioneer: round_time: 15 # seconds alternative_timeslots: True + dispatcher: + re-allocate: True robot_proxy: bidder: diff --git a/mrs/db_interface.py b/mrs/db_interface.py deleted file mode 100644 index 45f293974..000000000 --- a/mrs/db_interface.py +++ /dev/null @@ -1,92 +0,0 @@ -from mrs.structs.timetable import Timetable - - -class DBInterface(object): - def __init__(self, store): - self.store = store - - def add_task(self, task): - """Saves the given task to a database as a new document under the "tasks" collection. - """ - collection = self.store.db['tasks'] - dict_task = task.to_dict() - self.store.unique_insert(collection, dict_task, 'id', task.id) - - def update_task(self, task): - """ Updates the given task under the "tasks" collection - """ - collection = self.store.db['tasks'] - task_dict = task.to_dict() - - found_dict = collection.find_one({'id': task_dict['id']}) - - if found_dict is None: - collection.insert(task_dict) - else: - collection.replace_one({'id': task.id}, task_dict) - - def get_tasks(self): - """ Returns a dictionary with the tasks in the "tasks" collection - - """ - collection = self.store.db['tasks'] - tasks_dict = dict() - for task in collection.find(): - tasks_dict[task['id']] = task - return tasks_dict - - def remove_task(self, task_id): - """ Removes task with task_id from the collection "tasks" - """ - collection = self.store.db['tasks'] - collection.delete_one({'id': task_id}) - - def update_task_status(self, task, status): - task.status.status = status - self.update_task(task) - - def get_task(self, task_id): - """Returns a task dictionary representing the task with the given id. - """ - collection = self.store.db['tasks'] - task_dict = collection.find_one({'id': task_id}) - return task_dict - - def add_timetable(self, timetable): - """ - Saves the given timetable under the "timetables" collection - """ - collection = self.store.db['timetables'] - robot_id = timetable.robot_id - timetable_dict = timetable.to_dict() - - self.store.unique_insert(collection, timetable_dict, 'robot_id', robot_id) - - def update_timetable(self, timetable): - """ Updates the given timetable under the "timetables" collection - """ - collection = self.store.db['timetables'] - timetable_dict = timetable.to_dict() - robot_id = timetable.robot_id - - found_dict = collection.find_one({'robot_id': robot_id}) - - if found_dict is None: - collection.insert(timetable_dict) - else: - collection.replace_one({'robot_id': robot_id}, timetable_dict) - - def get_timetable(self, robot_id, stp): - collection = self.store.db['timetables'] - - timetable_dict = collection.find_one({'robot_id': robot_id}) - - if timetable_dict is None: - timetable = Timetable(robot_id, stp) - else: - timetable = Timetable.from_dict(timetable_dict, stp) - return timetable - - def clean(self): - self.store.client.drop_database(self.store.db_name) - diff --git a/mrs/task_allocation/auctioneer.py b/mrs/task_allocation/auctioneer.py index 0ad324441..a1a50c21d 100644 --- a/mrs/task_allocation/auctioneer.py +++ b/mrs/task_allocation/auctioneer.py @@ -19,7 +19,7 @@ class Auctioneer(object): - def __init__(self, ccu_store, api, stp_solver, task_type, allocation_method, + def __init__(self, ccu_store, api, stp_solver, allocation_method, round_time=5, **kwargs): self.logger = logging.getLogger("mrs.auctioneer") diff --git a/mrs/task_execution/dispatcher.py b/mrs/task_execution/dispatcher.py index 3a5c3ebc7..10fc7e658 100644 --- a/mrs/task_execution/dispatcher.py +++ b/mrs/task_execution/dispatcher.py @@ -1,163 +1,37 @@ -import logging -import time -import uuid -from ropod.utils.timestamp import TimeStamp as ts -from stn.stp import STP -from importlib import import_module +import logging +from datetime import timedelta -from mrs.db_interface import DBInterface -from mrs.exceptions.task_allocation import NoSTPSolution -from mrs.exceptions.task_execution import InconsistentSchedule -from mrs.db.models.task import TaskStatus -from mrs.structs.timetable import Timetable from mrs.task_execution.scheduler import Scheduler +from stn.stp import STP class Dispatcher(object): - def __init__(self, robot_id, api, robot_store, task_type, - stp_solver, corrective_measure, freeze_window): + def __init__(self, ccu_store, api, stp_solver, freeze_window, **kwargs): + self.logger = logging.getLogger('mrs.dispatcher') - self.id = robot_id self.api = api - self.db_interface = DBInterface(robot_store) - - task_class_path = task_type.get('class', 'mrs.structs.task') - self.task_cls = getattr(import_module(task_class_path), 'Task') - self.stp = STP(stp_solver) - self.stp_solver = stp_solver - - self.corrective_measure = corrective_measure - self.freeze_window = freeze_window - - self.scheduler = Scheduler(robot_store, self.stp) - - timetable = self.db_interface.get_timetable(self.id, self.stp) - if timetable is None: - timetable = Timetable(self.stp, robot_id) - self.timetable = timetable - - def run(self): - self.timetable = self.db_interface.get_timetable(self.id, self.stp) - if self.timetable is not None: - task = self.get_earliest_task() - if task is not None: - self.check_earliest_task_status(task) - - def check_earliest_task_status(self, task): - if task.status.status == TaskStatus.ALLOCATED: - self.schedule_task(task) - - elif task.status.status == TaskStatus.COMPLETED: - if self.stp_solver == 'drea': - self.recompute_timetable(task) - self.scheduler.reset_schedule(self.timetable) - - elif task.status.status == TaskStatus.ONGOING and task.status.delayed: - self.apply_corrective_measure(task) - - elif task.status.status == TaskStatus.SCHEDULED and self.time_to_dispatch(): - self.dispatch(task) - - def apply_corrective_measure(self, task): - if self.corrective_measure == 're-schedule': - self.recompute_timetable(task) - - elif self.corrective_measure == 're-allocate': - self.scheduler.reset_schedule(self.timetable) - self.request_reallocation(task) - - else: - logging.debug("Not applying corrective measure") - - def get_earliest_task(self): - task_id = self.timetable.get_earliest_task_id() - if task_id: - task_dict = self.db_interface.get_task(task_id) - task = self.task_cls.from_dict(task_dict) - return task - - def schedule_task(self, task): - print("Scheduling task") - - navigation_start = self.timetable.dispatchable_graph.get_task_navigation_start_time(task.id) - current_time = ts.get_time_stamp() - - # Schedule the task freeze_window time before the navigation start and freeze it - # i.e., the task cannot longer change position in the dispatchable graph - if (navigation_start - current_time) <= self.freeze_window: - try: - self.scheduler.schedule_task(task, navigation_start, self.timetable) - except InconsistentSchedule as e: - logging.error("Task %s could not be scheduled.", e.task) - if self.corrective_measure == 're-allocate': - self.request_reallocation(task) - - self.timetable = self.db_interface.get_timetable(self.id, self.stp) - - def recompute_timetable(self, task): - try: - self.timetable.solve_stp() - - logging.debug("Dispatchable graph %s: ", self.timetable.dispatchable_graph) - logging.debug("Robustness Metric %s: ", self.timetable.risk_metric) - - except NoSTPSolution: - logging.exception("The stp solver could not solve the problem") - self.db_interface.update_task_status(task, TaskStatus.FAILED) - self.timetable.remove_task() - - def time_to_dispatch(self): - current_time = ts.get_time_stamp() - if current_time < self.scheduler.navigation_start_time: - return False - return True - - def dispatch(self, task): - current_time = ts.get_time_stamp() - print("Dispatching task at: ", current_time) - - logging.info("Dispatching task to robot %s", self.id) - - task_msg = dict() - task_msg['header'] = dict() - task_msg['payload'] = dict() - task_msg['header']['type'] = 'TASK' - task_msg['header']['metamodel'] = 'ropod-msg-schema.json' - task_msg['header']['msgId'] = str(uuid.uuid4()) - task_msg['header']['timestamp'] = int(round(time.time()) * 1000) - - task_msg['payload']['metamodel'] = 'ropod-bid_round-schema.json' - task_msg['payload']['task'] = task.to_dict() - - self.db_interface.update_task_status(task, TaskStatus.SHIPPED) - self.timetable.remove_task() - - self.api.publish(task_msg, groups=['ROPOD']) - - def request_reallocation(self, task): - self.db_interface.update_task_status(task, TaskStatus.UNALLOCATED) # ABORTED - task_msg = dict() - task_msg['header'] = dict() - task_msg['payload'] = dict() - task_msg['header']['type'] = 'TASK' - task_msg['header']['metamodel'] = 'ropod-msg-schema.json' - task_msg['header']['msgId'] = str(uuid.uuid4()) - task_msg['header']['timestamp'] = int(round(time.time()) * 1000) - - task_msg['payload']['metamodel'] = 'ropod-bid_round-schema.json' - task_msg['payload']['task'] = task.to_dict() - - self.api.publish(task_msg, groups=['TASK-ALLOCATION']) - + self.freeze_window = timedelta(minutes=freeze_window) + self.re_allocate = kwargs.get('re_allocate', False) + self.robot_ids = list() + self.scheduler = Scheduler(self.stp) + self.logger.debug("Dispatcher started") +class DispatcherBuilder: + def __init__(self): + self._instance = None + def __call__(self, **kwargs): + if not self._instance: + self._instance = Dispatcher(**kwargs) + return self._instance +configure = DispatcherBuilder() diff --git a/mrs/task_execution/schedule_monitor.py b/mrs/task_execution/schedule_monitor.py index f63c5c1c5..9a20f0557 100644 --- a/mrs/task_execution/schedule_monitor.py +++ b/mrs/task_execution/schedule_monitor.py @@ -1,4 +1,3 @@ -from mrs.db_interface import DBInterface from mrs.robot_base import RobotBase diff --git a/mrs/task_execution/scheduler.py b/mrs/task_execution/scheduler.py index 75d8fe0b2..29f427062 100644 --- a/mrs/task_execution/scheduler.py +++ b/mrs/task_execution/scheduler.py @@ -1,49 +1,9 @@ import logging -from mrs.db_interface import DBInterface -from mrs.exceptions.task_execution import InconsistentSchedule -from mrs.db.models.task import TaskStatus - class Scheduler(object): - - def __init__(self, robot_store, stp): - self.db_interface = DBInterface(robot_store) + def __init__(self, stp): self.stp = stp - self.navigation_start_time = -float('inf') # of scheduled task - - def schedule_task(self, task, navigation_start, timetable): - print("Dispatchable graph:", timetable.dispatchable_graph) - - try: - self.assign_timepoint(task, navigation_start, timetable) - except InconsistentSchedule as e: - logging.exception("Task %s could not be scheduled.", e.task) - raise InconsistentSchedule(e.task) - - def assign_timepoint(self, task, navigation_start, timetable): - - timetable.dispatchable_graph.assign_timepoint(navigation_start) - minimal_network = self.stp.propagate_constraints(timetable.dispatchable_graph) - - if minimal_network: - print("The assignment is consistent") - print("Dispatchable graph:", timetable.dispatchable_graph) - - timetable.get_schedule(task.id) - - print("Schedule: ", timetable.schedule) - - self.db_interface.update_timetable(timetable) - self.db_interface.update_task_status(task, TaskStatus.SCHEDULED) - self.navigation_start_time = navigation_start - - else: - raise InconsistentSchedule(task) - - def reallocate(self): - pass + self.n_tasks_sub_graphs = 2 + self.logger = logging.getLogger("mrs.scheduler") - def reset_schedule(self, timetable): - timetable.remove_task() - self.db_interface.update_timetable(timetable) diff --git a/mrs/task_execution/task_monitor.py b/mrs/task_execution/task_monitor.py deleted file mode 100644 index 0057f623f..000000000 --- a/mrs/task_execution/task_monitor.py +++ /dev/null @@ -1,42 +0,0 @@ -import logging - -from mrs.db_interface import DBInterface -from mrs.db.models.task import TaskStatus - - -class TaskMonitor(object): - def __init__(self, ccu_store, task_cls, api): - self.db_interface = DBInterface(ccu_store) - self.task_cls = task_cls - self.api = api - - def run(self): - pass - - def task_progress_cb(self, msg): - task_id = msg["payload"]["taskId"] - robot_id = msg["payload"]["robotId"] - task_status = msg["payload"]["status"]["taskStatus"] - - logging.debug("Robot %s received TASK-PROGRESS msg of task %s from %s ", task_id, robot_id) - - task_dict = self.db_interface.get_task(task_id) - task = self.task_cls.from_dict(task_dict) - - if task_status == TaskStatus.COMPLETED or \ - task_status == TaskStatus.CANCELED or \ - task_status == TaskStatus.FAILED or \ - task_status == TaskStatus.PREEMPTED: - self.archieve_task(task) - - elif task_status == TaskStatus.ONGOING: - self.check_execution_progress(task) - - def archieve_task(self, task): - # TODO: Update timetable - pass - - def check_execution_progress(self, task): - # TODO: check schedule consistency - pass - diff --git a/setup.py b/setup.py index 348d1e8a0..866d06ac8 100644 --- a/setup.py +++ b/setup.py @@ -5,7 +5,7 @@ setup(name='mrs', packages=['mrs', 'mrs.config', 'mrs.db.models', 'mrs.db.models.performance', 'mrs.db.queries', 'mrs.structs', 'mrs.utils', 'mrs.exceptions', 'mrs.task_allocation', 'mrs.task_execution'], - version='0.1.0', + version='0.2.0', install_requires=[ 'numpy' ],