Skip to content

Commit

Permalink
Merge branch 'master' into benc-drain-typo
Browse files Browse the repository at this point in the history
  • Loading branch information
benclifford authored Oct 15, 2024
2 parents 6f97421 + ac62e04 commit 6e0ec1c
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 0 deletions.
10 changes: 10 additions & 0 deletions docs/reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,16 @@ Executors
parsl.executors.FluxExecutor
parsl.executors.radical.RadicalPilotExecutor

Manager Selectors
=================

.. autosummary::
:toctree: stubs
:nosignatures:

parsl.executors.high_throughput.manager_selector.RandomManagerSelector
parsl.executors.high_throughput.manager_selector.BlockIdManagerSelector

Launchers
=========

Expand Down
5 changes: 5 additions & 0 deletions parsl/executors/high_throughput/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,11 @@
encrypted : bool
Flag to enable/disable encryption (CurveZMQ). Default is False.
manager_selector: ManagerSelector
Determines what strategy the interchange uses to select managers during task distribution.
See API reference under "Manager Selectors" regarding the various manager selectors.
Default: 'RandomManagerSelector'
""" # Documentation for params used by both HTEx and MPIEx


Expand Down
30 changes: 30 additions & 0 deletions parsl/executors/high_throughput/manager_selector.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,37 @@ def sort_managers(self, ready_managers: Dict[bytes, ManagerRecord], manager_list

class RandomManagerSelector(ManagerSelector):

"""Returns a shuffled list of interesting_managers
By default this strategy is used by the interchange. Works well
in distributing workloads equally across all availble compute
resources. The random workload strategy is not effective in
conjunction with elastic scaling behavior as the even task
distribution does not allow the scaling down of blocks, leading
to wasted resource consumption.
"""

def sort_managers(self, ready_managers: Dict[bytes, ManagerRecord], manager_list: Set[bytes]) -> List[bytes]:
c_manager_list = list(manager_list)
random.shuffle(c_manager_list)
return c_manager_list


class BlockIdManagerSelector(ManagerSelector):

"""Returns an interesting_managers list sorted by block ID
Observations:
1. BlockID manager selector helps with workloads that see a varying
amount of tasks over time. New blocks are prioritized with the
blockID manager selector, when used with 'htex_auto_scaling', results
in compute cost savings.
2. Doesn't really work with bag-of-tasks workloads. When all the tasks
are put into the queue upfront, all blocks operate at near full
utilization for the majority of the workload, which task goes where
doesn't really matter.
"""

def sort_managers(self, ready_managers: Dict[bytes, ManagerRecord], manager_list: Set[bytes]) -> List[bytes]:
return sorted(manager_list, key=lambda x: (ready_managers[x]['block_id'] is not None, ready_managers[x]['block_id']))
20 changes: 20 additions & 0 deletions parsl/tests/test_htex/test_block_manager_selector_unit.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import pytest

from parsl.executors.high_throughput.manager_record import ManagerRecord
from parsl.executors.high_throughput.manager_selector import BlockIdManagerSelector


@pytest.mark.local
def test_sort_managers():
ready_managers = {
b'manager1': {'block_id': 1},
b'manager2': {'block_id': None},
b'manager3': {'block_id': 3},
b'manager4': {'block_id': 2}
}

manager_list = {b'manager1', b'manager2', b'manager3', b'manager4'}
expected_sorted_list = [b'manager2', b'manager1', b'manager4', b'manager3']
manager_selector = BlockIdManagerSelector()
sorted_managers = manager_selector.sort_managers(ready_managers, manager_list)
assert sorted_managers == expected_sorted_list
53 changes: 53 additions & 0 deletions parsl/tests/test_htex/test_manager_selector_by_block.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import time

import pytest

import parsl
from parsl.app.app import bash_app, python_app
from parsl.channels import LocalChannel
from parsl.config import Config
from parsl.executors import HighThroughputExecutor
from parsl.executors.high_throughput.manager_selector import (
BlockIdManagerSelector,
ManagerSelector,
)
from parsl.launchers import WrappedLauncher
from parsl.providers import LocalProvider
from parsl.usage_tracking.levels import LEVEL_1

BLOCK_COUNT = 2


@parsl.python_app
def get_worker_pid():
import os
return os.environ.get('PARSL_WORKER_BLOCK_ID')


@pytest.mark.local
def test_block_id_selection(try_assert):
htex = HighThroughputExecutor(
label="htex_local",
max_workers_per_node=1,
manager_selector=BlockIdManagerSelector(),
provider=LocalProvider(
channel=LocalChannel(),
init_blocks=BLOCK_COUNT,
max_blocks=BLOCK_COUNT,
min_blocks=BLOCK_COUNT,
),
)

config = Config(
executors=[htex],
usage_tracking=LEVEL_1,
)

with parsl.load(config):
blockids = []
try_assert(lambda: len(htex.connected_managers()) == BLOCK_COUNT, timeout_ms=20000)
for i in range(10):
future = get_worker_pid()
blockids.append(future.result())

assert all(blockid == "1" for blockid in blockids)

0 comments on commit 6e0ec1c

Please sign in to comment.