Skip to content

Commit

Permalink
Merge pull request #202 from caracal-pipeline/slurmify
Browse files Browse the repository at this point in the history
Adds slurm support
  • Loading branch information
SpheMakh authored Jan 30, 2024
2 parents 9a626ba + e61296d commit b8cbeb9
Show file tree
Hide file tree
Showing 17 changed files with 498 additions and 264 deletions.
15 changes: 11 additions & 4 deletions stimela/backends/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from .singularity import SingularityBackendOptions
from .kube import KubeBackendOptions
from .native import NativeBackendOptions
from .slurm import SlurmOptions

import stimela

Expand All @@ -20,11 +21,15 @@
SUPPORTED_BACKENDS = set(Backend.__members__)


def get_backend(name: str):
def get_backend(name: str, backend_opts: Optional[Dict] = None):
"""
Gets backend, given a name and an optional set of options for that backend.
Returns backend module, or None if it is not available.
"""
if name not in SUPPORTED_BACKENDS:
return None
backend = __import__(f"stimela.backends.{name}", fromlist=[name])
if backend.is_available():
if backend.is_available(backend_opts):
return backend
return None

Expand All @@ -48,7 +53,7 @@ class StimelaBackendOptions(object):
kube: Optional[KubeBackendOptions] = None
native: Optional[NativeBackendOptions] = None
docker: Optional[Dict] = None # placeholder for future impl
slurm: Optional[Dict] = None # placeholder for future impl
slurm: Optional[SlurmOptions] = None

## Resource limits applied during run -- see resource module
rlimits: Dict[str, Any] = EmptyDictDefault()
Expand All @@ -71,6 +76,8 @@ def __post_init__(self):
self.native = NativeBackendOptions()
if self.kube is None and get_backend("kube"):
self.kube = KubeBackendOptions()
if self.slurm is None:
self.slurm = SlurmOptions()

StimelaBackendSchema = OmegaConf.structured(StimelaBackendOptions)

Expand Down Expand Up @@ -106,7 +113,7 @@ def _call_backends(backend_opts: StimelaBackendOptions, log: logging.Logger, met
# check that backend has not been disabled
opts = getattr(backend_opts, engine, None)
if not opts or opts.enable:
backend = get_backend(engine)
backend = get_backend(engine, opts)
func = backend and getattr(backend, method, None)
if func:
try:
Expand Down
2 changes: 1 addition & 1 deletion stimela/backends/docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

STATUS = VERSION = BINARY = None

def is_available():
def is_available(opts = None):
global STATUS, VERSION, BINARY
if STATUS is None:
BINARY = which("docker")
Expand Down
101 changes: 52 additions & 49 deletions stimela/backends/kube/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Dict, List, Optional, Any
from typing import Dict, List, Optional, Any, Callable
from enum import Enum
from omegaconf import OmegaConf
from dataclasses import dataclass
Expand Down Expand Up @@ -36,52 +36,6 @@
def run(*args, **kw):
raise RuntimeError(f"kubernetes backend {STATUS}")

def is_available():
return AVAILABLE

def get_status():
return STATUS

def is_remote():
return True

def init(backend: 'stimela.backend.StimelaBackendOptions', log: logging.Logger):
from . import infrastructure
global AVAILABLE, STATUS
if not infrastructure.init(backend, log):
AVAILABLE = False
STATUS = "initialization error"

def close(backend: 'stimela.backend.StimelaBackendOptions', log: logging.Logger):
from . import infrastructure
if AVAILABLE:
infrastructure.close(backend, log)

def cleanup(backend: 'stimela.backend.StimelaBackendOptions', log: logging.Logger):
from . import infrastructure
infrastructure.cleanup(backend, log)

def run(cab: 'stimela.kitchen.cab.Cab', params: Dict[str, Any], fqname: str,
backend: 'stimela.backend.StimelaBackendOptions',
log: logging.Logger, subst: Optional[Dict[str, Any]] = None):
from . import run_kube
return run_kube.run(cab=cab, params=params, fqname=fqname, backend=backend, log=log, subst=subst)

_kube_client = _kube_config = _kube_context = None

def get_kube_api(context: Optional[str]=None):
global _kube_client
global _kube_config
global _kube_context

if _kube_config is None:
_kube_config = True
_kube_context = context
kubernetes.config.load_kube_config(context=context)
elif context != _kube_context:
raise BackendError(f"k8s context has changed (was {_kube_context}, now {context}), this is not permitted")

return core_v1_api.CoreV1Api(), CustomObjectsApi()


# dict of methods for converting an object to text format
Expand Down Expand Up @@ -110,8 +64,6 @@ class PodLimits(object):
# arbitrary additional structure copied into the pod spec
custom_pod_spec: Dict[str, Any] = EmptyDictDefault()



@dataclass
class KubeBackendOptions(object):
"""
Expand Down Expand Up @@ -268,6 +220,57 @@ class UserInfo(object):


KubeBackendSchema = OmegaConf.structured(KubeBackendOptions)

def is_available(opts: Optional[KubeBackendOptions]= None):
return AVAILABLE

def get_status():
return STATUS

def is_remote():
return True

def init(backend: 'stimela.backend.StimelaBackendOptions', log: logging.Logger):
from . import infrastructure
global AVAILABLE, STATUS
if not infrastructure.init(backend, log):
AVAILABLE = False
STATUS = "initialization error"

def close(backend: 'stimela.backend.StimelaBackendOptions', log: logging.Logger):
from . import infrastructure
if AVAILABLE:
infrastructure.close(backend, log)

def cleanup(backend: 'stimela.backend.StimelaBackendOptions', log: logging.Logger):
from . import infrastructure
infrastructure.cleanup(backend, log)

def run(cab: 'stimela.kitchen.cab.Cab', params: Dict[str, Any], fqname: str,
backend: 'stimela.backend.StimelaBackendOptions',
log: logging.Logger, subst: Optional[Dict[str, Any]] = None,
command_wrapper: Optional[Callable] = None):
# normally runner.py won't allow this, but check just in case
if command_wrapper:
raise BackendError(f"kube backend cannot be used with a command wrapper")
from . import run_kube
return run_kube.run(cab=cab, params=params, fqname=fqname, backend=backend, log=log, subst=subst)

_kube_client = _kube_config = _kube_context = None

def get_kube_api(context: Optional[str]=None):
global _kube_client
global _kube_config
global _kube_context

if _kube_config is None:
_kube_config = True
_kube_context = context
kubernetes.config.load_kube_config(context=context)
elif context != _kube_context:
raise BackendError(f"k8s context has changed (was {_kube_context}, now {context}), this is not permitted")

return core_v1_api.CoreV1Api(), CustomObjectsApi()

_uid = os.getuid()
_gid = os.getgid()
Expand Down
6 changes: 3 additions & 3 deletions stimela/backends/kube/run_kube.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import logging, time, json, datetime, os.path, pathlib, secrets, shlex
from typing import Dict, Optional, Any, List
from typing import Dict, Optional, Any, List, Callable
from dataclasses import fields
from datetime import timedelta
from requests import ConnectionError
Expand Down Expand Up @@ -44,13 +44,13 @@ def run(cab: Cab, params: Dict[str, Any], fqname: str,
"""

if not cab.image:
raise StimelaCabRuntimeError(f"kube runner requires cab.image to be set")
raise BackendError(f"kube backend requires cab.image to be set")

kube = backend.kube

namespace = kube.namespace
if not namespace:
raise StimelaCabRuntimeError(f"runtime.kube.namespace must be set")
raise BackendError(f"runtime.kube.namespace must be set")

args = cab.flavour.get_arguments(cab, params, subst, check_executable=False)
log.debug(f"command line is {args}")
Expand Down
2 changes: 1 addition & 1 deletion stimela/backends/native/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import logging
import stimela

def is_available():
def is_available(opts = None):
return True

def get_status():
Expand Down
17 changes: 12 additions & 5 deletions stimela/backends/native/run_native.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import logging, datetime, resource, os.path

from typing import Dict, Optional, Any, List
from typing import Dict, Optional, Any, List, Callable

import stimela
import stimela.kitchen
Expand Down Expand Up @@ -37,14 +37,18 @@ def build_command_line(cab: 'stimela.kitchen.cab.Cab', params: Dict[str, Any], s

def run(cab: 'stimela.kitchen.cab.Cab', params: Dict[str, Any], fqname: str,
backend: 'stimela.backend.StimelaBackendOptions',
log: logging.Logger, subst: Optional[Dict[str, Any]] = None):
"""Runs cab contents
log: logging.Logger, subst: Optional[Dict[str, Any]] = None,
command_wrapper: Optional[Callable] = None):
"""
Runs cab contents
Args:
cab (Cab): cab object
cab: cab object
params: cab parameters
backend: backed settings object
log (logger): logger to use
subst (Optional[Dict[str, Any]]): Substitution dict for commands etc., if any.
command_wrapper (Callable): takes a list of args and returns modified list of args
Returns:
Any: return value (e.g. exit code) of content
"""
Expand Down Expand Up @@ -79,6 +83,9 @@ def elapsed(since=None):
return str(datetime.datetime.now() - (since or start_time)).split('.', 1)[0]

# log.info(f"argument lengths are {[len(a) for a in args]}")

if command_wrapper:
args = command_wrapper(args)

retcode = xrun(args[0], args[1:], shell=False, log=log,
output_wrangler=cabstat.apply_wranglers,
Expand Down
2 changes: 1 addition & 1 deletion stimela/backends/podman.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import datetime
import tempfile

def is_available():
def is_available(opts = None):
return False

def get_status():
Expand Down
67 changes: 51 additions & 16 deletions stimela/backends/runner.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,38 @@
from typing import Dict, Optional, Any
import logging
from typing import Dict, Optional, Any, Callable
from dataclasses import dataclass
from omegaconf import OmegaConf
from omegaconf.errors import OmegaConfBaseException
import stimela
from stimela.backends import StimelaBackendOptions, StimelaBackendSchema
from stimela.exceptions import BackendError

from . import get_backend, get_backend_status
from . import get_backend, get_backend_status, slurm

def validate_backend_settings(backend_opts: Dict[str, Any]):

@dataclass
class BackendWrapper(object):
opts: StimelaBackendOptions
is_remote: bool
is_remote_fs: bool
backend: Any
backend_name: str
run_command_wrapper: Optional[Callable]
build_command_wrapper: Optional[Callable]

def run(self, cab: 'stimela.kitchen.cab.Cab', params: Dict[str, Any], fqname: str,
log: logging.Logger, subst: Optional[Dict[str, Any]] = None):
return self.backend.run(cab, params, fqname=fqname, backend=self.opts, log=log, subst=subst,
command_wrapper=self.run_command_wrapper)

def build(self, cab: 'stimela.kitchen.cab.Cab', log: logging.Logger, rebuild=False):
if not hasattr(self.backend, 'build'):
raise BackendError(f"{self.backend_name} backend does not support the build command")
return self.backend.build(cab, backend=self.opts, log=log, rebuild=rebuild,
command_wrapper=self.build_command_wrapper)


def validate_backend_settings(backend_opts: Dict[str, Any]) -> BackendWrapper:
"""Checks that backend settings refer to a valid backend
Returs tuple of options, main, wrapper, where 'main' the the main backend, and 'wrapper' is an optional wrapper backend
Expand All @@ -15,26 +41,35 @@ def validate_backend_settings(backend_opts: Dict[str, Any]):
if not isinstance(backend_opts, StimelaBackendOptions):
backend_opts = OmegaConf.to_object(backend_opts)

main = main_backend = None
backend_name = backend = None
selected = backend_opts.select or ['native']
# select containerization engine, if any
for engine in selected:
for name in selected:
# check that backend has not been disabled
opts = getattr(backend_opts, engine, None)
opts = getattr(backend_opts, name, None)
if not opts or opts.enable:
main_backend = get_backend(engine)
if main_backend is not None:
main = engine
backend = get_backend(name, opts)
if backend is not None:
backend_name = name
break
else:
raise BackendError(f"selected backends ({', '.join(selected)}) not available")


is_remote = is_remote_fs = backend.is_remote()

# check if slurm wrapper is to be applied
wrapper = None
if False: # placeholder -- should be: if backend.slurm and backed.slurm.enable
wrapper = get_backend('slurm')
if wrapper is None:
raise BackendError(f"backend 'slurm' not available ({get_backend_status('slurm')})")
if backend_opts.slurm.enable:
if is_remote:
raise BackendError(f"can't combine slurm with {backend_name} backend")
is_remote = True
is_remote_fs = False
run_command_wrapper = backend_opts.slurm.run_command_wrapper
build_command_wrapper = backend_opts.slurm.build_command_wrapper
else:
run_command_wrapper = build_command_wrapper = None

return backend_opts, main_backend, wrapper
return BackendWrapper(opts=backend_opts, is_remote=is_remote, is_remote_fs=is_remote_fs,
backend=backend, backend_name=backend_name,
run_command_wrapper=run_command_wrapper,
build_command_wrapper=build_command_wrapper)

Loading

0 comments on commit b8cbeb9

Please sign in to comment.