-
Notifications
You must be signed in to change notification settings - Fork 198
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add priority queue functionality to HTEX #3575
base: master
Are you sure you want to change the base?
Conversation
…hewc2003/parsl into prototype_sorted_task_queue
if self.enable_mpi_mode: | ||
validate_resource_spec(resource_specification, self.enable_mpi_mode) | ||
else: | ||
if resource_specification and isinstance(resource_specification, dict): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this all belongs in validate_resource_spec()
try implementing #3519 as a separate PR first to get the structure in place in that function for "mpi vs non-mpi" resource validation (with, I guess, no resource spec allowed for non-mpi mode?) and you can get a parsl issue fixed as a nice side effect
@@ -131,7 +133,8 @@ def __init__(self, | |||
self.hub_address = hub_address | |||
self.hub_zmq_port = hub_zmq_port | |||
|
|||
self.pending_task_queue: queue.Queue[Any] = queue.Queue(maxsize=10 ** 6) | |||
self.priority_pending_task_queue: SortedList[Any] = SortedList(key=lambda msg: -msg['resource_spec']['priority']) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
my gut says to avoid two queues and have a single queue. tasks without any priority effectively receive an infinitely-weak priority in your implementation, I think, and you can implement that same behaviour with a single priority queue?
resource_spec = msg.get('resource_spec', {}) | ||
if 'priority' in resource_spec: | ||
priority = resource_spec['priority'] | ||
if priority < self.queue_threshold: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i'm unclear what semantics you're trying to express with queue_threshold
-- I'm going to give you a low priority but under a particular priority, parsl should round it to (effectively) +infinity?
@@ -253,7 +276,7 @@ def _command_server(self) -> NoReturn: | |||
command_req = self.command_channel.recv_pyobj() | |||
logger.debug("Received command request: {}".format(command_req)) | |||
if command_req == "OUTSTANDING_C": | |||
outstanding = self.pending_task_queue.qsize() | |||
outstanding = self.priority_pending_task_queue.__len__() + self.general_pending_task_queue.qsize() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use len(priority_pending_task_queue)
rather than invoking __len__
Description
Adds second queue for pending task queue in the interchange. Used to prioritize certain user-labeled tasks when distributing tasks to managers.
Changed Behaviour
Users can now use the parsl_resource_specification keyword in python apps to specify a 'priority'. Lower value is higher priority, tasks with higher priority will be sent out first.
Users can also specify a 'queue_threshold' in the HTEX config to decide the cutoff value for tasks to go to the priority queue or the general FIFO queue.
Fixes
Part of #3323 work
Type of change
Choose which options apply, and delete the ones which do not apply.