Merge pull request #3117 from radical-cybertools/feature/pilot_resources
application level scheduling
andre-merzky authored Aug 28, 2024
2 parents 61a3bbb + dd04a6e commit 95d3d7e
Showing 94 changed files with 2,699 additions and 1,007 deletions.
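This PR introduces application-level scheduling: a pilot's node resources are exposed to the application as a `NodeList`, which can be queried for slots that satisfy per-rank `RankRequirements`, and which takes those slots back once a task is done. A minimal sketch of the new API, with names taken from the diffs below (values are illustrative, not part of the commit):

import radical.pilot as rp

# two nodes with four free cores and one free GPU each
nodes = [rp.NodeResources({'index': i,
                           'name' : 'node_%05d' % i,
                           'cores': [rp.RO(index=x, occupation=rp.FREE)
                                     for x in range(4)],
                           'gpus' : [rp.RO(index=0, occupation=rp.FREE)],
                           'lfs'  : 0,
                           'mem'  : 0}) for i in range(2)]
nl = rp.NodeList(nodes=nodes)

# find two slots with two cores each, then hand them back
slots = nl.find_slots(rp.RankRequirements(n_cores=2), n_slots=2)
if slots:
    nl.release_slots(slots)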
2 changes: 2 additions & 0 deletions examples/00_getting_started.py
@@ -86,6 +86,8 @@
        td.executable     = '/bin/date'
        td.ranks          = 1
        td.cores_per_rank = 1
        td.gpus_per_rank  = 1
        td.gpu_type       = rp.CUDA

        tds.append(td)
        report.progress()
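Setting `gpu_type = rp.CUDA` makes the executor export `CUDA_VISIBLE_DEVICES` for each rank (see the `_extend_pre_exec` change below). Conceptually, each rank of such a task maps onto one slot request against the pilot's node list - a sketch of that mapping, which is an assumption about the internals rather than verbatim code from this PR:

rr    = rp.RankRequirements(n_cores=td.cores_per_rank,
                            n_gpus =td.gpus_per_rank)
slots = pilot.nodelist.find_slots(rr, n_slots=td.ranks)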
140 changes: 140 additions & 0 deletions examples/misc/pilot_resources.py
@@ -0,0 +1,140 @@
#!/usr/bin/env python3

__copyright__ = 'Copyright 2013-2014, http://radical.rutgers.edu'
__license__ = 'MIT'

import sys
import time
import random

import radical.pilot as rp
import radical.utils as ru


# stand-alone benchmark for the new slot search: this block exercises
# `NodeList.find_slots` / `release_slots` at scale and then exits, so the
# pilot example in `__main__` below does not run.  Set to `False` to run it.
if True:

    n_nodes        = 9472
    gpus_per_node  = 8
    cores_per_node = 64
    mem_per_node   = 512
    lfs_per_node   = 1920

    n_tasks        = 10000
    ranks_per_task = 2
    cores_per_task = 16
    gpus_per_task  = 2
    mem_per_task   = 0
    lfs_per_task   = 0

    nodes = [{'index': i,
              'name' : 'node_%05d' % i,
              'cores': [rp.RO(index=x, occupation=rp.FREE)
                        for x in range(cores_per_node)],
              'gpus' : [rp.RO(index=x, occupation=rp.FREE)
                        for x in range(gpus_per_node)],
              'lfs'  : lfs_per_node,
              'mem'  : mem_per_node
             } for i in range(n_nodes)]

    nl = rp.NodeList(nodes=[rp.NodeResources(ni) for ni in nodes])
    rr = rp.RankRequirements(n_cores=cores_per_task,
                             n_gpus=gpus_per_task,
                             mem=mem_per_task,
                             lfs=lfs_per_task)

    allocs = list()
    start  = time.time()
    for i in range(n_tasks):

        slots = nl.find_slots(rr, n_slots=ranks_per_task)
        if slots:
            allocs.append(slots)

        # randomly release some allocations to emulate task completion
        if allocs and random.random() < 0.5:
            to_release = random.choice(allocs)
            allocs.remove(to_release)
            nl.release_slots(to_release)

    stop = time.time()
    print('find_slots: %.2f' % (stop - start))

    for slots in allocs:
        nl.release_slots(slots)

    # fractional occupation: request half a core per slot
    for _ in range(5):

        slots = nl.find_slots(rp.RankRequirements(n_cores=1,
                                                  core_occupation=0.5))
        print(slots)

    sys.exit()


# ------------------------------------------------------------------------------
#
if __name__ == '__main__':

    report = ru.Reporter(name='radical.pilot')
    report.title('pilot resource example')

    session = rp.Session()

    try:
        pmgr = rp.PilotManager(session=session)
        tmgr = rp.TaskManager(session=session)

        report.header('submit pilots')

        pd_init = {'resource': 'local.localhost',
                   'runtime' : 15,
                   'nodes'   : 1}
        pdesc = rp.PilotDescription(pd_init)
        pilot = pmgr.submit_pilots(pdesc)

        tmgr.add_pilots(pilot)
        pilot.wait([rp.PMGR_ACTIVE, rp.FAILED])

        n = 1
        report.header('submit %d tasks' % n)

        tds = list()
        for i in range(n):

            # slots = pilot.nodelist.find_slots(rp.RankRequirements(n_cores=1,
            #                                                       lfs=512),
            #                                   n_slots=2)
            # print()
            # if slots:
            #     for slot in slots:
            #         print('=== %s' % slot)

            td = rp.TaskDescription()
            td.executable     = '/bin/date'
            td.ranks          = 2
            td.cores_per_rank = 2
            # td.slots          = slots

            tds.append(td)

        tasks = tmgr.submit_tasks(tds)
        tmgr.wait_tasks()

        for task in tasks:
            print(' * %s: %s [%s], %s' % (task.uid, task.state, task.exit_code,
                                          task.stdout.strip()))

            print()
            for slot in task.slots:
                print(slot)

    finally:
        report.header('finalize')
        session.close(download=True)

    report.header()


# ------------------------------------------------------------------------------
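The trailing `core_occupation=0.5` queries exercise fractional core sharing. Assuming occupations on a core accumulate until they reach `1.0` (which such queries suggest, but which this commit does not spell out), two half-occupation ranks can share one core - a sketch with a single one-core node:

node = rp.NodeResources({'index': 0, 'name': 'node_00000',
                         'cores': [rp.RO(index=0, occupation=rp.FREE)],
                         'gpus' : [], 'lfs': 0, 'mem': 0})
nl = rp.NodeList(nodes=[node])
rr = rp.RankRequirements(n_cores=1, core_occupation=0.5)

s1 = nl.find_slots(rr)    # takes half of core 0
s2 = nl.find_slots(rr)    # fits: core 0 is now fully occupied
s3 = nl.find_slots(rr)    # presumably falsy: no capacity left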

3 changes: 3 additions & 0 deletions src/radical/pilot/__init__.py
@@ -36,6 +36,9 @@
from .task_description import RAPTOR_MASTER, RAPTOR_WORKER
from .task_description import AGENT_SERVICE
from .resource_config import ResourceConfig
from .resource_config import ResourceOccupation, RO, Slot
from .resource_config import RankRequirements
from .resource_config import NodeResources, NodeList

from .pilot_manager import PilotManager
from .pilot import Pilot
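With these exports, applications can use the scheduling primitives without importing agent internals. A quick sketch (`RO` appears to be a short alias for `ResourceOccupation`, as the shared import line suggests):

import radical.pilot as rp

ro = rp.RO(index=3, occupation=rp.FREE)         # one core or GPU and its occupation
rr = rp.RankRequirements(n_cores=2, n_gpus=1)   # what one rank needs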
25 changes: 17 additions & 8 deletions src/radical/pilot/agent/agent_0.py
@@ -16,6 +16,8 @@
from .. import Session
from .. import TaskDescription, AGENT_SERVICE

from ..resource_config import RO


# ------------------------------------------------------------------------------
#
@@ -499,8 +501,15 @@ def _start_sub_agents(self):
        launch_script = '%s/%s.launch.sh' % (self._pwd, sa)
        exec_script   = '%s/%s.exec.sh'   % (self._pwd, sa)

        node_cores = [cid for cid, cstate in enumerate(node['cores'])
                      if cstate == rpc.FREE]
        node_cores = [RO(index=cid, occupation=rpc.BUSY)
                      for cid, cstate in enumerate(node['cores'])
                      if cstate == rpc.FREE]

        node_gpus  = [RO(index=gid, occupation=rpc.BUSY)
                      for gid, gstate in enumerate(node['gpus'])
                      if gstate == rpc.FREE]

        agent_task = {
            'uid'        : sa,
@@ -512,12 +521,12 @@ def _start_sub_agents(self):
                'executable' : self._shell,
                'arguments'  : [bs_name % self._pwd] + bs_args
                }).as_dict(),
            'slots': {'ranks' : [{'node_name': node['node_name'],
                                  'node_id'  : node['node_id'],
                                  'core_map' : [node_cores],
                                  'gpu_map'  : [],
                                  'lfs'      : 0,
                                  'mem'      : 0}]}
            'slots' : [{'node_name' : node['name'],
                        'node_index': node['index'],
                        'cores'     : node_cores,
                        'gpus'      : node_gpus,
                        'lfs'       : 0,
                        'mem'       : 0}]
        }

# find a launcher to use
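The rewritten sub-agent placement illustrates the new slot layout: `task['slots']` is now a flat list of per-rank dicts rather than the former `{'ranks': [...]}` nesting, and `cores`/`gpus` hold `ResourceOccupation` entries instead of index maps. One slot under the new scheme looks like this (values illustrative):

slot = {'node_name' : 'node_00042',
        'node_index': 42,
        'cores'     : [RO(index=0, occupation=rpc.BUSY),
                       RO(index=1, occupation=rpc.BUSY)],
        'gpus'      : [RO(index=0, occupation=rpc.BUSY)],
        'lfs'       : 0,
        'mem'       : 0}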
21 changes: 9 additions & 12 deletions src/radical/pilot/agent/executing/base.py
@@ -262,7 +262,7 @@ def _create_exec_script(self, launcher, task):
        n_ranks = td['ranks']
        slots   = task.setdefault('slots', {})

        self._extend_pre_exec(td, slots.get('ranks'))
        self._extend_pre_exec(td, slots)

        with ru.ru_open(exec_fullpath, 'w') as fout:

@@ -312,7 +312,7 @@ def _create_exec_script(self, launcher, task):

        # need to set `DEBUG_5` or higher to get slot debug logs
        if self._log._debug_level >= 5:
            ru.write_json('%s/%s.sl' % (sbox, tid), slots)
            ru.write_json(slots, '%s/%s.sl' % (sbox, tid))

        return exec_path, exec_fullpath

@@ -366,7 +366,7 @@ def _get_rank_ids(self, n_ranks, launcher):

    # --------------------------------------------------------------------------
    #
    def _extend_pre_exec(self, td, ranks=None):
    def _extend_pre_exec(self, td, slots=None):

        # FIXME: this assumes that the rank has a `gpu_maps` and `core_maps`
        #        with exactly one entry, corresponding to the rank process to be
@@ -375,22 +375,19 @@ def _extend_pre_exec(self, td, ranks=None):
        # FIXME: need to distinguish between logical and physical IDs

        if td['threading_type'] == rpc.OpenMP:
            # for future updates: if task ranks are heterogeneous in terms of
            # for future updates: if task slots are heterogeneous in terms of
            #                     number of threads, then the following string
            #                     should be converted into dictionary (per rank)
            num_threads = td.get('cores_per_rank', 1)
            td['pre_exec'].append('export OMP_NUM_THREADS=%d' % num_threads)

        if td['gpus_per_rank'] and td['gpu_type'] == rpc.CUDA and ranks:
        if td['gpus_per_rank'] and td['gpu_type'] == rpc.CUDA and slots:
            # equivalent to the 'physical' value for original `cvd_id_mode`
            rank_id  = 0
            rank_env = {}
            for slot_ranks in ranks:
                for gpu_map in slot_ranks['gpu_map']:
                    rank_env[str(rank_id)] = \
                        'export CUDA_VISIBLE_DEVICES=%s' % \
                        ','.join([str(g) for g in gpu_map])
                    rank_id += 1
            for rank_id, slot in enumerate(slots):
                rank_env[str(rank_id)] = \
                    'export CUDA_VISIBLE_DEVICES=%s' % \
                    ','.join([str(g['index']) for g in slot['gpus']])
            td['pre_exec'].append(rank_env)

# pre-defined `pre_exec` per platform configuration
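With the flat slot list, GPU pinning reduces to one entry per rank. For a two-rank task whose slots carry GPU 0 and GPU 1 respectively, the dict appended to `pre_exec` would be roughly (a sketch based on the loop above):

rank_env = {'0': 'export CUDA_VISIBLE_DEVICES=0',
            '1': 'export CUDA_VISIBLE_DEVICES=1'}
td['pre_exec'].append(rank_env)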
14 changes: 2 additions & 12 deletions src/radical/pilot/agent/launch_method/aprun.py
@@ -90,17 +90,7 @@ def get_launch_cmds(self, task, exec_path):
        #   - Environment variables : -e <env_var>
        #   - Core specialization   : -r <n_threads>

        if 'SAGA_PPN' in os.environ:
            max_ranks_per_node = int(os.environ['SAGA_PPN'])
            if total_cores <= max_ranks_per_node:
                ranks_per_node = ranks
            else:
                ranks_per_node = max_ranks_per_node // cores_per_rank
        else:
            ranks_per_node = 1

        cmd_options = '-N %s ' % ranks_per_node + \
                      '-n %s ' % ranks + \
        cmd_options = '-n %s ' % ranks + \
                      '-d %s' % cores_per_rank

# CPU affinity binding
@@ -120,7 +110,7 @@ def get_launch_cmds(self, task, exec_path):
        # cores and memory on other NUMA nodes on that compute node.
        #
        # slots = task['slots']
        # nodes = set([rank['node_name'] for rank in slots['ranks']])
        # nodes = set([slot['node_name'] for slot in slots])
        # if len(nodes) < 2:
        #     cmd_options += ' -F share'  # default is `exclusive`
        #     cmd_options += ' -L %s ' % ','.join(nodes)
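The removed branch derived `-N` (ranks per node) from `SAGA_PPN`; node placement is now left to ALPS. For a task with `ranks=4` and `cores_per_rank=2`, the generated options reduce to (a sketch of the resulting string):

cmd_options = '-n 4 -d 2'   # 4 ranks, depth 2 cores per rank; no explicit -N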
1 change: 1 addition & 0 deletions src/radical/pilot/agent/launch_method/base.py
@@ -54,6 +54,7 @@ def __init__(self, name, lm_cfg, rm_info, log, prof):
        self._prof      = prof
        self._pwd       = os.getcwd()
        self._env_orig  = ru.env_eval('env/bs0_orig.env')
        self._in_pytest = False

        reg     = ru.zmq.RegistryClient(url=self._lm_cfg.reg_addr)
        lm_info = reg.get('lm.%s' % self.name.lower())
4 changes: 2 additions & 2 deletions src/radical/pilot/agent/launch_method/fork.py
@@ -50,10 +50,10 @@ def finalize(self):
    #
    def can_launch(self, task):

        if len(task['slots']['ranks']) > 1:
        if len(task['slots']) > 1:
            return False, 'more than one rank'

        node = task['slots']['ranks'][0]['node_name']
        node = task['slots'][0]['node_name']
        if node not in ['localhost', self.node_name]:
            return False, 'not on localhost'

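`can_launch` now inspects the flat slot list directly. A sketch of the accepted shape, for a single rank placed on the local node (other slot fields omitted; `lm` stands for an instantiated Fork launch method):

task = {'slots': [{'node_name': 'localhost'}]}
ok, reason = lm.can_launch(task)   # passes both checks shown above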
18 changes: 9 additions & 9 deletions src/radical/pilot/agent/launch_method/ibrun.py
@@ -74,7 +74,7 @@ def get_launcher_env(self):
    def get_launch_cmds(self, task, exec_path):

        slots = task['slots']
        assert slots.get('ranks'), 'task.slots.ranks is not set or empty'
        assert slots, 'task.slots is not set or empty'

        td      = task['description']
        n_ranks = td['ranks']
@@ -94,21 +94,21 @@
        # NOTE: in case of performance issue: reconsider this parameter
        launch_env = 'IBRUN_TASKS_PER_NODE=%d' % tasks_per_node

        rank_node_ids  = set([n['node_id'] for n in slots['ranks']])
        tasks_offset   = 0
        ibrun_offset   = 0
        rank_node_idxs = set([slot['node_index'] for slot in slots])
        tasks_offset   = 0
        ibrun_offset   = 0

        for node in self._rm_info.node_list:

            if node['node_id'] not in rank_node_ids:
            if node['index'] not in rank_node_idxs:
                tasks_offset += tasks_per_node
                continue

            # core_map contains core ids for each thread,
            # cores contains core ids for each thread,
            # but threads are ignored for offset
            core_id_min = min([rank['core_map'][0][0]
                               for rank in slots['ranks']
                               if rank['node_id'] == node['node_id']])
            core_id_min = min([slot['cores'][0]['index']
                               for slot in slots
                               if slot['node_index'] == node['index']])
            # offset into processor (cpus) hostlist
            ibrun_offset = tasks_offset + (core_id_min // n_threads_per_rank)
            break
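A worked example of the offset arithmetic under the new slot layout (plain dicts stand in for `ResourceOccupation` entries; all values illustrative): a task whose two-thread ranks sit on node index 7, starting at core 6, with 4 tasks per node:

tasks_per_node     = 4
n_threads_per_rank = 2
slots = [{'node_index': 7, 'cores': [{'index': 6}, {'index': 7}]}]

tasks_offset = 7 * tasks_per_node            # nodes 0..6 are skipped: 28
core_id_min  = min(s['cores'][0]['index']    # first core of the first rank: 6
                   for s in slots
                   if s['node_index'] == 7)
ibrun_offset = tasks_offset + core_id_min // n_threads_per_rank   # 28 + 3 = 31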
(diff truncated: 94 files changed in total, remaining files not shown)
