Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/benc-sander-wq-cpu-scaling' into benc-tmp-sander
Browse files Browse the repository at this point in the history
  • Loading branch information
benclifford committed Jun 5, 2024
2 parents 77fbb4c + dfaba58 commit 026b18d
Showing 1 changed file with 17 additions and 5 deletions.
22 changes: 17 additions & 5 deletions parsl/executors/workqueue/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,12 +244,14 @@ def __init__(self,
full_debug: bool = True,
worker_executable: str = 'work_queue_worker',
function_dir: Optional[str] = None,
coprocess: bool = False):
coprocess: bool = False,
scaling_assume_core_slots_per_worker: int = 1):
BlockProviderExecutor.__init__(self, provider=provider,
block_error_handler=True)
if not _work_queue_enabled:
raise OptionalModuleMissing(['work_queue'], "WorkQueueExecutor requires the work_queue module.")

self.scaling_assume_core_slots_per_worker = scaling_assume_core_slots_per_worker
self.label = label
self.task_queue = multiprocessing.Queue() # type: multiprocessing.Queue
self.collector_queue = multiprocessing.Queue() # type: multiprocessing.Queue
Expand Down Expand Up @@ -469,6 +471,8 @@ def submit(self, func, resource_specification, *args, **kwargs):
# Create a Future object and have it be mapped from the task ID in the tasks dictionary
fu = Future()
fu.parsl_executor_task_id = executor_task_id
assert isinstance(resource_specification, dict)
fu.resource_specification = resource_specification
logger.debug("Getting tasks_lock to set WQ-level task entry")
with self.tasks_lock:
logger.debug("Got tasks_lock to set WQ-level task entry")
Expand Down Expand Up @@ -654,20 +658,28 @@ def initialize_scaling(self):

@property
def outstanding(self) -> int:
    """Count the number of outstanding slots required. This is inefficiently
    implemented and probably could be replaced with a counter.

    Every future in ``self.tasks`` that is not yet done contributes the
    ``'cores'`` value of its ``resource_specification`` dict, or
    ``self.scaling_assume_core_slots_per_worker`` when that key is absent.

    Returns:
        The summed slot count over all unfinished tasks.
    """
    logger.debug("Calculating outstanding task slot load")
    outstanding = 0
    tasks = 0  # only for log message...
    # Hold the lock for the whole scan so the task table is read while
    # no other holder of tasks_lock is modifying it.
    with self.tasks_lock:
        for fut in self.tasks.values():
            if not fut.done():
                # if a task defines a resource spec with a core count, use the number
                # of cores as required task slot count, instead of 1 slot.
                assert isinstance(fut.resource_specification, dict)  # type: ignore[attr-defined]

                outstanding += fut.resource_specification.get('cores',  # type: ignore[attr-defined]
                                                              self.scaling_assume_core_slots_per_worker)
                tasks += 1
    logger.debug(f"Counted {tasks} outstanding tasks with {outstanding} outstanding slots")
    return outstanding

@property
def workers_per_node(self) -> Union[int, float]:
    """Return the number of core slots assumed to be available per worker.

    This is the ``scaling_assume_core_slots_per_worker`` value stored on the
    executor, rather than a hard-coded 1.
    NOTE(review): presumably read by the scaling code together with
    ``outstanding`` to size blocks - confirm against the caller.
    """
    return self.scaling_assume_core_slots_per_worker

def scale_in(self, count: int) -> List[str]:
"""Scale in method.
Expand Down

0 comments on commit 026b18d

Please sign in to comment.