Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds slurm support #202

Merged
merged 4 commits into from
Jan 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions stimela/backends/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from .singularity import SingularityBackendOptions
from .kube import KubeBackendOptions
from .native import NativeBackendOptions
from .slurm import SlurmOptions

import stimela

Expand All @@ -20,11 +21,15 @@
SUPPORTED_BACKENDS = set(Backend.__members__)


def get_backend(name: str):
def get_backend(name: str, backend_opts: Optional[Dict] = None):
"""
Gets backend, given a name and an optional set of options for that backend.
Returns backend module, or None if it is not available.
"""
if name not in SUPPORTED_BACKENDS:
return None
backend = __import__(f"stimela.backends.{name}", fromlist=[name])
if backend.is_available():
if backend.is_available(backend_opts):
return backend
return None

Expand All @@ -48,7 +53,7 @@ class StimelaBackendOptions(object):
kube: Optional[KubeBackendOptions] = None
native: Optional[NativeBackendOptions] = None
docker: Optional[Dict] = None # placeholder for future impl
slurm: Optional[Dict] = None # placeholder for future impl
slurm: Optional[SlurmOptions] = None

## Resource limits applied during run -- see resource module
rlimits: Dict[str, Any] = EmptyDictDefault()
Expand All @@ -71,6 +76,8 @@ def __post_init__(self):
self.native = NativeBackendOptions()
if self.kube is None and get_backend("kube"):
self.kube = KubeBackendOptions()
if self.slurm is None:
self.slurm = SlurmOptions()

StimelaBackendSchema = OmegaConf.structured(StimelaBackendOptions)

Expand Down Expand Up @@ -106,7 +113,7 @@ def _call_backends(backend_opts: StimelaBackendOptions, log: logging.Logger, met
# check that backend has not been disabled
opts = getattr(backend_opts, engine, None)
if not opts or opts.enable:
backend = get_backend(engine)
backend = get_backend(engine, opts)
func = backend and getattr(backend, method, None)
if func:
try:
Expand Down
2 changes: 1 addition & 1 deletion stimela/backends/docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

STATUS = VERSION = BINARY = None

def is_available():
def is_available(opts = None):
global STATUS, VERSION, BINARY
if STATUS is None:
BINARY = which("docker")
Expand Down
101 changes: 52 additions & 49 deletions stimela/backends/kube/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Dict, List, Optional, Any
from typing import Dict, List, Optional, Any, Callable
from enum import Enum
from omegaconf import OmegaConf
from dataclasses import dataclass
Expand Down Expand Up @@ -36,52 +36,6 @@
def run(*args, **kw):
raise RuntimeError(f"kubernetes backend {STATUS}")

def is_available():
return AVAILABLE

def get_status():
return STATUS

def is_remote():
return True

def init(backend: 'stimela.backend.StimelaBackendOptions', log: logging.Logger):
from . import infrastructure
global AVAILABLE, STATUS
if not infrastructure.init(backend, log):
AVAILABLE = False
STATUS = "initialization error"

def close(backend: 'stimela.backend.StimelaBackendOptions', log: logging.Logger):
from . import infrastructure
if AVAILABLE:
infrastructure.close(backend, log)

def cleanup(backend: 'stimela.backend.StimelaBackendOptions', log: logging.Logger):
from . import infrastructure
infrastructure.cleanup(backend, log)

def run(cab: 'stimela.kitchen.cab.Cab', params: Dict[str, Any], fqname: str,
backend: 'stimela.backend.StimelaBackendOptions',
log: logging.Logger, subst: Optional[Dict[str, Any]] = None):
from . import run_kube
return run_kube.run(cab=cab, params=params, fqname=fqname, backend=backend, log=log, subst=subst)

_kube_client = _kube_config = _kube_context = None

def get_kube_api(context: Optional[str]=None):
global _kube_client
global _kube_config
global _kube_context

if _kube_config is None:
_kube_config = True
_kube_context = context
kubernetes.config.load_kube_config(context=context)
elif context != _kube_context:
raise BackendError(f"k8s context has changed (was {_kube_context}, now {context}), this is not permitted")

return core_v1_api.CoreV1Api(), CustomObjectsApi()


# dict of methods for converting an object to text format
Expand Down Expand Up @@ -110,8 +64,6 @@ class PodLimits(object):
# arbitrary additional structure copied into the pod spec
custom_pod_spec: Dict[str, Any] = EmptyDictDefault()



@dataclass
class KubeBackendOptions(object):
"""
Expand Down Expand Up @@ -267,6 +219,57 @@ class UserInfo(object):


KubeBackendSchema = OmegaConf.structured(KubeBackendOptions)

def is_available(opts: Optional[KubeBackendOptions]= None):
return AVAILABLE

def get_status():
return STATUS

def is_remote():
return True

def init(backend: 'stimela.backend.StimelaBackendOptions', log: logging.Logger):
from . import infrastructure
global AVAILABLE, STATUS
if not infrastructure.init(backend, log):
AVAILABLE = False
STATUS = "initialization error"

def close(backend: 'stimela.backend.StimelaBackendOptions', log: logging.Logger):
from . import infrastructure
if AVAILABLE:
infrastructure.close(backend, log)

def cleanup(backend: 'stimela.backend.StimelaBackendOptions', log: logging.Logger):
from . import infrastructure
infrastructure.cleanup(backend, log)

def run(cab: 'stimela.kitchen.cab.Cab', params: Dict[str, Any], fqname: str,
backend: 'stimela.backend.StimelaBackendOptions',
log: logging.Logger, subst: Optional[Dict[str, Any]] = None,
command_wrapper: Optional[Callable] = None):
# normally runner.py won't allow this, but check just in case
if command_wrapper:
raise BackendError(f"kube backend cannot be used with a command wrapper")
from . import run_kube
return run_kube.run(cab=cab, params=params, fqname=fqname, backend=backend, log=log, subst=subst)

_kube_client = _kube_config = _kube_context = None

def get_kube_api(context: Optional[str]=None):
global _kube_client
global _kube_config
global _kube_context

if _kube_config is None:
_kube_config = True
_kube_context = context
kubernetes.config.load_kube_config(context=context)
elif context != _kube_context:
raise BackendError(f"k8s context has changed (was {_kube_context}, now {context}), this is not permitted")

return core_v1_api.CoreV1Api(), CustomObjectsApi()

_uid = os.getuid()
_gid = os.getgid()
Expand Down
6 changes: 3 additions & 3 deletions stimela/backends/kube/run_kube.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import logging, time, json, datetime, os.path, pathlib, secrets, shlex
from typing import Dict, Optional, Any, List
from typing import Dict, Optional, Any, List, Callable
from dataclasses import fields
from datetime import timedelta
from requests import ConnectionError
Expand Down Expand Up @@ -44,13 +44,13 @@ def run(cab: Cab, params: Dict[str, Any], fqname: str,
"""

if not cab.image:
raise StimelaCabRuntimeError(f"kube runner requires cab.image to be set")
raise BackendError(f"kube backend requires cab.image to be set")

kube = backend.kube

namespace = kube.namespace
if not namespace:
raise StimelaCabRuntimeError(f"runtime.kube.namespace must be set")
raise BackendError(f"runtime.kube.namespace must be set")

args = cab.flavour.get_arguments(cab, params, subst, check_executable=False)
log.debug(f"command line is {args}")
Expand Down
2 changes: 1 addition & 1 deletion stimela/backends/native/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import logging
import stimela

def is_available():
def is_available(opts = None):
return True

def get_status():
Expand Down
17 changes: 12 additions & 5 deletions stimela/backends/native/run_native.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import logging, datetime, resource, os.path

from typing import Dict, Optional, Any, List
from typing import Dict, Optional, Any, List, Callable

import stimela
import stimela.kitchen
Expand Down Expand Up @@ -37,14 +37,18 @@ def build_command_line(cab: 'stimela.kitchen.cab.Cab', params: Dict[str, Any], s

def run(cab: 'stimela.kitchen.cab.Cab', params: Dict[str, Any], fqname: str,
backend: 'stimela.backend.StimelaBackendOptions',
log: logging.Logger, subst: Optional[Dict[str, Any]] = None):
"""Runs cab contents
log: logging.Logger, subst: Optional[Dict[str, Any]] = None,
command_wrapper: Optional[Callable] = None):
"""
Runs cab contents

Args:
cab (Cab): cab object
cab: cab object
params: cab parameters
backend: backed settings object
log (logger): logger to use
subst (Optional[Dict[str, Any]]): Substitution dict for commands etc., if any.

command_wrapper (Callable): takes a list of args and returns modified list of args
Returns:
Any: return value (e.g. exit code) of content
"""
Expand Down Expand Up @@ -79,6 +83,9 @@ def elapsed(since=None):
return str(datetime.datetime.now() - (since or start_time)).split('.', 1)[0]

# log.info(f"argument lengths are {[len(a) for a in args]}")

if command_wrapper:
args = command_wrapper(args)

retcode = xrun(args[0], args[1:], shell=False, log=log,
output_wrangler=cabstat.apply_wranglers,
Expand Down
2 changes: 1 addition & 1 deletion stimela/backends/podman.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import datetime
import tempfile

def is_available():
def is_available(opts = None):
return False

def get_status():
Expand Down
67 changes: 51 additions & 16 deletions stimela/backends/runner.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,38 @@
from typing import Dict, Optional, Any
import logging
from typing import Dict, Optional, Any, Callable
from dataclasses import dataclass
from omegaconf import OmegaConf
from omegaconf.errors import OmegaConfBaseException
import stimela
from stimela.backends import StimelaBackendOptions, StimelaBackendSchema
from stimela.exceptions import BackendError

from . import get_backend, get_backend_status
from . import get_backend, get_backend_status, slurm

def validate_backend_settings(backend_opts: Dict[str, Any]):

@dataclass
class BackendWrapper(object):
opts: StimelaBackendOptions
is_remote: bool
is_remote_fs: bool
backend: Any
backend_name: str
run_command_wrapper: Optional[Callable]
build_command_wrapper: Optional[Callable]

def run(self, cab: 'stimela.kitchen.cab.Cab', params: Dict[str, Any], fqname: str,
log: logging.Logger, subst: Optional[Dict[str, Any]] = None):
return self.backend.run(cab, params, fqname=fqname, backend=self.opts, log=log, subst=subst,
command_wrapper=self.run_command_wrapper)

def build(self, cab: 'stimela.kitchen.cab.Cab', log: logging.Logger, rebuild=False):
if not hasattr(self.backend, 'build'):
raise BackendError(f"{self.backend_name} backend does not support the build command")
return self.backend.build(cab, backend=self.opts, log=log, rebuild=rebuild,
command_wrapper=self.build_command_wrapper)


def validate_backend_settings(backend_opts: Dict[str, Any]) -> BackendWrapper:
"""Checks that backend settings refer to a valid backend

Returs tuple of options, main, wrapper, where 'main' the the main backend, and 'wrapper' is an optional wrapper backend
Expand All @@ -15,26 +41,35 @@ def validate_backend_settings(backend_opts: Dict[str, Any]):
if not isinstance(backend_opts, StimelaBackendOptions):
backend_opts = OmegaConf.to_object(backend_opts)

main = main_backend = None
backend_name = backend = None
selected = backend_opts.select or ['native']
# select containerization engine, if any
for engine in selected:
for name in selected:
# check that backend has not been disabled
opts = getattr(backend_opts, engine, None)
opts = getattr(backend_opts, name, None)
if not opts or opts.enable:
main_backend = get_backend(engine)
if main_backend is not None:
main = engine
backend = get_backend(name, opts)
if backend is not None:
backend_name = name
break
else:
raise BackendError(f"selected backends ({', '.join(selected)}) not available")


is_remote = is_remote_fs = backend.is_remote()

# check if slurm wrapper is to be applied
wrapper = None
if False: # placeholder -- should be: if backend.slurm and backed.slurm.enable
wrapper = get_backend('slurm')
if wrapper is None:
raise BackendError(f"backend 'slurm' not available ({get_backend_status('slurm')})")
if backend_opts.slurm.enable:
if is_remote:
raise BackendError(f"can't combine slurm with {backend_name} backend")
is_remote = True
is_remote_fs = False
run_command_wrapper = backend_opts.slurm.run_command_wrapper
build_command_wrapper = backend_opts.slurm.build_command_wrapper
else:
run_command_wrapper = build_command_wrapper = None

return backend_opts, main_backend, wrapper
return BackendWrapper(opts=backend_opts, is_remote=is_remote, is_remote_fs=is_remote_fs,
backend=backend, backend_name=backend_name,
run_command_wrapper=run_command_wrapper,
build_command_wrapper=build_command_wrapper)

Loading
Loading