Skip to content

Commit

Permalink
Add more typing around scale-in decision state (#3119)
Browse files Browse the repository at this point in the history
Use a typed dataclass with human readable names, instead of a numerically
index list for the decision state itself, and add type anontations on the
method signature.

This is intended to make some of the decision code easier to read and
typecheck, but should not change behaviour.
  • Loading branch information
benclifford authored Mar 5, 2024
1 parent ab2a338 commit 23a8c70
Showing 1 changed file with 14 additions and 7 deletions.
21 changes: 14 additions & 7 deletions parsl/executors/high_throughput/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import queue
import datetime
import pickle
from dataclasses import dataclass
from multiprocessing import Process, Queue
from typing import Dict, Sequence
from typing import List, Optional, Tuple, Union, Callable
Expand Down Expand Up @@ -694,7 +695,7 @@ def create_monitoring_info(self, status):
def workers_per_node(self) -> Union[int, float]:
return self._workers_per_node

def scale_in(self, blocks, max_idletime=None):
def scale_in(self, blocks: int, max_idletime: Optional[float] = None) -> List[str]:
"""Scale in the number of active blocks by specified amount.
The scale in method here is very rude. It doesn't give the workers
Expand All @@ -721,25 +722,31 @@ def scale_in(self, blocks, max_idletime=None):
List of block IDs scaled in
"""
logger.debug(f"Scale in called, blocks={blocks}")

@dataclass
class BlockInfo:
tasks: int # sum of tasks in this block
idle: float # shortest idle time of any manager in this block

managers = self.connected_managers()
block_info = {} # block id -> list( tasks, idle duration )
block_info: Dict[str, BlockInfo] = {}
for manager in managers:
if not manager['active']:
continue
b_id = manager['block_id']
if b_id not in block_info:
block_info[b_id] = [0, float('inf')]
block_info[b_id][0] += manager['tasks']
block_info[b_id][1] = min(block_info[b_id][1], manager['idle_duration'])
block_info[b_id] = BlockInfo(tasks=0, idle=float('inf'))
block_info[b_id].tasks += manager['tasks']
block_info[b_id].idle = min(block_info[b_id].idle, manager['idle_duration'])

sorted_blocks = sorted(block_info.items(), key=lambda item: (item[1][1], item[1][0]))
sorted_blocks = sorted(block_info.items(), key=lambda item: (item[1].idle, item[1].tasks))
logger.debug(f"Scale in selecting from {len(sorted_blocks)} blocks")
if max_idletime is None:
block_ids_to_kill = [x[0] for x in sorted_blocks[:blocks]]
else:
block_ids_to_kill = []
for x in sorted_blocks:
if x[1][1] > max_idletime and x[1][0] == 0:
if x[1].idle > max_idletime and x[1].tasks == 0:
block_ids_to_kill.append(x[0])
if len(block_ids_to_kill) == blocks:
break
Expand Down

0 comments on commit 23a8c70

Please sign in to comment.