diff --git a/docs/reference.rst b/docs/reference.rst
index f2d89afaf8..3436635cad 100644
--- a/docs/reference.rst
+++ b/docs/reference.rst
@@ -78,6 +78,16 @@ Executors
     parsl.executors.FluxExecutor
     parsl.executors.radical.RadicalPilotExecutor
 
+Manager Selectors
+=================
+
+.. autosummary::
+    :toctree: stubs
+    :nosignatures:
+
+    parsl.executors.high_throughput.manager_selector.RandomManagerSelector
+    parsl.executors.high_throughput.manager_selector.BlockIdManagerSelector
+
 Launchers
 =========
 
diff --git a/parsl/executors/high_throughput/executor.py b/parsl/executors/high_throughput/executor.py
index 7f8ea42d7e..fb38c0121e 100644
--- a/parsl/executors/high_throughput/executor.py
+++ b/parsl/executors/high_throughput/executor.py
@@ -146,6 +146,11 @@
 
     encrypted : bool
        Flag to enable/disable encryption (CurveZMQ). Default is False.
+
+    manager_selector: ManagerSelector
+        Determines which strategy the interchange uses to select managers during task distribution.
+        See the API reference under "Manager Selectors" for the available manager selectors.
+        Default: 'RandomManagerSelector'
    """
 
    # Documentation for params used by both HTEx and MPIEx
diff --git a/parsl/executors/high_throughput/manager_selector.py b/parsl/executors/high_throughput/manager_selector.py
index 0ede28ee7d..25a9c49ebc 100644
--- a/parsl/executors/high_throughput/manager_selector.py
+++ b/parsl/executors/high_throughput/manager_selector.py
@@ -19,7 +19,37 @@ def sort_managers(self, ready_managers: Dict[bytes, ManagerRecord], manager_list
 
 
 class RandomManagerSelector(ManagerSelector):
+    """Returns a shuffled list of interesting_managers.
+
+    By default this strategy is used by the interchange. It works well
+    at distributing workloads evenly across all available compute
+    resources. The random strategy is not effective in conjunction
+    with elastic scaling, however, because the even task distribution
+    prevents blocks from scaling down, leading to wasted resource
+    consumption.
+    """
+
     def sort_managers(self, ready_managers: Dict[bytes, ManagerRecord], manager_list: Set[bytes]) -> List[bytes]:
         c_manager_list = list(manager_list)
         random.shuffle(c_manager_list)
         return c_manager_list
+
+
+class BlockIdManagerSelector(ManagerSelector):
+
+    """Returns an interesting_managers list sorted by block ID.
+
+    Observations:
+    1. The BlockID manager selector helps with workloads that see a
+       varying number of tasks over time. New blocks are prioritized,
+       so when this selector is used with 'htex_auto_scaling' it
+       results in compute cost savings.
+
+    2. It offers little benefit for bag-of-tasks workloads. When all
+       tasks are put into the queue upfront, all blocks operate at near
+       full utilization for the majority of the workload, so which task
+       goes where matters little.
+    """
+
+    def sort_managers(self, ready_managers: Dict[bytes, ManagerRecord], manager_list: Set[bytes]) -> List[bytes]:
+        return sorted(manager_list, key=lambda x: (ready_managers[x]['block_id'] is not None, ready_managers[x]['block_id']))
diff --git a/parsl/tests/test_htex/test_block_manager_selector_unit.py b/parsl/tests/test_htex/test_block_manager_selector_unit.py
new file mode 100644
index 0000000000..46b2bb1e5d
--- /dev/null
+++ b/parsl/tests/test_htex/test_block_manager_selector_unit.py
@@ -0,0 +1,20 @@
+import pytest
+
+from parsl.executors.high_throughput.manager_record import ManagerRecord
+from parsl.executors.high_throughput.manager_selector import BlockIdManagerSelector
+
+
+@pytest.mark.local
+def test_sort_managers():
+    ready_managers = {
+        b'manager1': {'block_id': 1},
+        b'manager2': {'block_id': None},
+        b'manager3': {'block_id': 3},
+        b'manager4': {'block_id': 2}
+    }
+
+    manager_list = {b'manager1', b'manager2', b'manager3', b'manager4'}
+    expected_sorted_list = [b'manager2', b'manager1', b'manager4', b'manager3']
+    manager_selector = BlockIdManagerSelector()
+    sorted_managers = manager_selector.sort_managers(ready_managers, manager_list)
+    assert sorted_managers == expected_sorted_list
diff --git a/parsl/tests/test_htex/test_manager_selector_by_block.py b/parsl/tests/test_htex/test_manager_selector_by_block.py
new file mode 100644
index 0000000000..0933b581ff
--- /dev/null
+++ b/parsl/tests/test_htex/test_manager_selector_by_block.py
@@ -0,0 +1,53 @@
+import time
+
+import pytest
+
+import parsl
+from parsl.app.app import bash_app, python_app
+from parsl.channels import LocalChannel
+from parsl.config import Config
+from parsl.executors import HighThroughputExecutor
+from parsl.executors.high_throughput.manager_selector import (
+    BlockIdManagerSelector,
+    ManagerSelector,
+)
+from parsl.launchers import WrappedLauncher
+from parsl.providers import LocalProvider
+from parsl.usage_tracking.levels import LEVEL_1
+
+BLOCK_COUNT = 2
+
+
+@parsl.python_app
+def get_worker_pid():
+    import os
+    return os.environ.get('PARSL_WORKER_BLOCK_ID')
+
+
+@pytest.mark.local
+def test_block_id_selection(try_assert):
+    htex = HighThroughputExecutor(
+        label="htex_local",
+        max_workers_per_node=1,
+        manager_selector=BlockIdManagerSelector(),
+        provider=LocalProvider(
+            channel=LocalChannel(),
+            init_blocks=BLOCK_COUNT,
+            max_blocks=BLOCK_COUNT,
+            min_blocks=BLOCK_COUNT,
+        ),
+    )
+
+    config = Config(
+        executors=[htex],
+        usage_tracking=LEVEL_1,
+    )
+
+    with parsl.load(config):
+        blockids = []
+        try_assert(lambda: len(htex.connected_managers()) == BLOCK_COUNT, timeout_ms=20000)
+        for i in range(10):
+            future = get_worker_pid()
+            blockids.append(future.result())
+
+        assert all(blockid == "1" for blockid in blockids)