Skip to content

Commit

Permalink
Merge pull request #23 from anenriquez/fix/dispatcher-builder
Browse files Browse the repository at this point in the history
Fix/dispatcher builder
  • Loading branch information
argenos authored Sep 26, 2019
2 parents d87a38e + 2119369 commit 469caa0
Show file tree
Hide file tree
Showing 8 changed files with 25 additions and 327 deletions.
5 changes: 2 additions & 3 deletions config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,15 @@ resource_manager:

plugins:
mrta:
# Assuming we can have different types of tasks, specify from where to import the Task and TaskStatus classes
task_type:
class: 'mrs.structs.allocation'
allocation_method: mrta-srea
stp_solver: srea
robot_proxies: true
freeze_window: 3 # minutes
auctioneer:
round_time: 15 # seconds
alternative_timeslots: True
dispatcher:
re-allocate: True

robot_proxy:
bidder:
Expand Down
92 changes: 0 additions & 92 deletions mrs/db_interface.py

This file was deleted.

2 changes: 1 addition & 1 deletion mrs/task_allocation/auctioneer.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

class Auctioneer(object):

def __init__(self, ccu_store, api, stp_solver, task_type, allocation_method,
def __init__(self, ccu_store, api, stp_solver, allocation_method,
round_time=5, **kwargs):

self.logger = logging.getLogger("mrs.auctioneer")
Expand Down
162 changes: 18 additions & 144 deletions mrs/task_execution/dispatcher.py
Original file line number Diff line number Diff line change
@@ -1,163 +1,37 @@
import logging
import time
import uuid

from ropod.utils.timestamp import TimeStamp as ts
from stn.stp import STP
from importlib import import_module
import logging
from datetime import timedelta

from mrs.db_interface import DBInterface
from mrs.exceptions.task_allocation import NoSTPSolution
from mrs.exceptions.task_execution import InconsistentSchedule
from mrs.db.models.task import TaskStatus
from mrs.structs.timetable import Timetable
from mrs.task_execution.scheduler import Scheduler
from stn.stp import STP


class Dispatcher(object):

def __init__(self, robot_id, api, robot_store, task_type,
stp_solver, corrective_measure, freeze_window):
def __init__(self, ccu_store, api, stp_solver, freeze_window, **kwargs):
self.logger = logging.getLogger('mrs.dispatcher')

self.id = robot_id
self.api = api
self.db_interface = DBInterface(robot_store)

task_class_path = task_type.get('class', 'mrs.structs.task')
self.task_cls = getattr(import_module(task_class_path), 'Task')

self.stp = STP(stp_solver)
self.stp_solver = stp_solver

self.corrective_measure = corrective_measure
self.freeze_window = freeze_window

self.scheduler = Scheduler(robot_store, self.stp)

timetable = self.db_interface.get_timetable(self.id, self.stp)
if timetable is None:
timetable = Timetable(self.stp, robot_id)
self.timetable = timetable

def run(self):
self.timetable = self.db_interface.get_timetable(self.id, self.stp)
if self.timetable is not None:
task = self.get_earliest_task()
if task is not None:
self.check_earliest_task_status(task)

def check_earliest_task_status(self, task):
if task.status.status == TaskStatus.ALLOCATED:
self.schedule_task(task)

elif task.status.status == TaskStatus.COMPLETED:
if self.stp_solver == 'drea':
self.recompute_timetable(task)
self.scheduler.reset_schedule(self.timetable)

elif task.status.status == TaskStatus.ONGOING and task.status.delayed:
self.apply_corrective_measure(task)

elif task.status.status == TaskStatus.SCHEDULED and self.time_to_dispatch():
self.dispatch(task)

def apply_corrective_measure(self, task):
if self.corrective_measure == 're-schedule':
self.recompute_timetable(task)

elif self.corrective_measure == 're-allocate':
self.scheduler.reset_schedule(self.timetable)
self.request_reallocation(task)

else:
logging.debug("Not applying corrective measure")

def get_earliest_task(self):
task_id = self.timetable.get_earliest_task_id()
if task_id:
task_dict = self.db_interface.get_task(task_id)
task = self.task_cls.from_dict(task_dict)
return task

def schedule_task(self, task):
print("Scheduling task")

navigation_start = self.timetable.dispatchable_graph.get_task_navigation_start_time(task.id)
current_time = ts.get_time_stamp()

# Schedule the task freeze_window time before the navigation start and freeze it
# i.e., the task cannot longer change position in the dispatchable graph
if (navigation_start - current_time) <= self.freeze_window:
try:
self.scheduler.schedule_task(task, navigation_start, self.timetable)
except InconsistentSchedule as e:
logging.error("Task %s could not be scheduled.", e.task)
if self.corrective_measure == 're-allocate':
self.request_reallocation(task)

self.timetable = self.db_interface.get_timetable(self.id, self.stp)

def recompute_timetable(self, task):
try:
self.timetable.solve_stp()

logging.debug("Dispatchable graph %s: ", self.timetable.dispatchable_graph)
logging.debug("Robustness Metric %s: ", self.timetable.risk_metric)

except NoSTPSolution:
logging.exception("The stp solver could not solve the problem")
self.db_interface.update_task_status(task, TaskStatus.FAILED)
self.timetable.remove_task()

def time_to_dispatch(self):
current_time = ts.get_time_stamp()
if current_time < self.scheduler.navigation_start_time:
return False
return True

def dispatch(self, task):
current_time = ts.get_time_stamp()
print("Dispatching task at: ", current_time)

logging.info("Dispatching task to robot %s", self.id)

task_msg = dict()
task_msg['header'] = dict()
task_msg['payload'] = dict()
task_msg['header']['type'] = 'TASK'
task_msg['header']['metamodel'] = 'ropod-msg-schema.json'
task_msg['header']['msgId'] = str(uuid.uuid4())
task_msg['header']['timestamp'] = int(round(time.time()) * 1000)

task_msg['payload']['metamodel'] = 'ropod-bid_round-schema.json'
task_msg['payload']['task'] = task.to_dict()

self.db_interface.update_task_status(task, TaskStatus.SHIPPED)
self.timetable.remove_task()

self.api.publish(task_msg, groups=['ROPOD'])

def request_reallocation(self, task):
self.db_interface.update_task_status(task, TaskStatus.UNALLOCATED) # ABORTED
task_msg = dict()
task_msg['header'] = dict()
task_msg['payload'] = dict()
task_msg['header']['type'] = 'TASK'
task_msg['header']['metamodel'] = 'ropod-msg-schema.json'
task_msg['header']['msgId'] = str(uuid.uuid4())
task_msg['header']['timestamp'] = int(round(time.time()) * 1000)

task_msg['payload']['metamodel'] = 'ropod-bid_round-schema.json'
task_msg['payload']['task'] = task.to_dict()

self.api.publish(task_msg, groups=['TASK-ALLOCATION'])

self.freeze_window = timedelta(minutes=freeze_window)
self.re_allocate = kwargs.get('re_allocate', False)
self.robot_ids = list()
self.scheduler = Scheduler(self.stp)

self.logger.debug("Dispatcher started")


class DispatcherBuilder:
def __init__(self):
self._instance = None

def __call__(self, **kwargs):
if not self._instance:
self._instance = Dispatcher(**kwargs)
return self._instance


configure = DispatcherBuilder()



1 change: 0 additions & 1 deletion mrs/task_execution/schedule_monitor.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from mrs.db_interface import DBInterface
from mrs.robot_base import RobotBase


Expand Down
46 changes: 3 additions & 43 deletions mrs/task_execution/scheduler.py
Original file line number Diff line number Diff line change
@@ -1,49 +1,9 @@
import logging

from mrs.db_interface import DBInterface
from mrs.exceptions.task_execution import InconsistentSchedule
from mrs.db.models.task import TaskStatus


class Scheduler(object):

def __init__(self, robot_store, stp):
self.db_interface = DBInterface(robot_store)
def __init__(self, stp):
self.stp = stp
self.navigation_start_time = -float('inf') # of scheduled task

def schedule_task(self, task, navigation_start, timetable):
print("Dispatchable graph:", timetable.dispatchable_graph)

try:
self.assign_timepoint(task, navigation_start, timetable)
except InconsistentSchedule as e:
logging.exception("Task %s could not be scheduled.", e.task)
raise InconsistentSchedule(e.task)

def assign_timepoint(self, task, navigation_start, timetable):

timetable.dispatchable_graph.assign_timepoint(navigation_start)
minimal_network = self.stp.propagate_constraints(timetable.dispatchable_graph)

if minimal_network:
print("The assignment is consistent")
print("Dispatchable graph:", timetable.dispatchable_graph)

timetable.get_schedule(task.id)

print("Schedule: ", timetable.schedule)

self.db_interface.update_timetable(timetable)
self.db_interface.update_task_status(task, TaskStatus.SCHEDULED)
self.navigation_start_time = navigation_start

else:
raise InconsistentSchedule(task)

def reallocate(self):
pass
self.n_tasks_sub_graphs = 2
self.logger = logging.getLogger("mrs.scheduler")

def reset_schedule(self, timetable):
timetable.remove_task()
self.db_interface.update_timetable(timetable)
Loading

0 comments on commit 469caa0

Please sign in to comment.