Skip to content

Commit

Permalink
Merge pull request #249 from caracal-pipeline/issue-234
Browse files Browse the repository at this point in the history
Fixes standalone cab running
  • Loading branch information
SpheMakh authored Mar 2, 2024
2 parents 7d02af6 + cba2ef1 commit 4344313
Show file tree
Hide file tree
Showing 6 changed files with 206 additions and 55 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "stimela"
version = "2.0rc13"
version = "2.0rc14"
description = "Framework for system agnostic pipelines for (not just) radio interferometry"
authors = ["Sphesihle Makhathini <[email protected]>", "Oleg Smirnov and RATT <[email protected]>"]
readme = "README.rst"
Expand Down
16 changes: 10 additions & 6 deletions stimela/backends/slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,12 @@ class SlurmOptions(object):
srun_opts: Dict[str, str] = EmptyDictDefault() # extra options passed to srun. "--" prepended, and "_" replaced by "-"
srun_opts_build: Optional[Dict[str, str]] = None # extra options passed to srun for build commands. If None, use srun_opts
build_local: bool = True # if True, images will be built locally (i.e. on the head node) even when slurm is enabled
# these will be checked for
required_mem_opts: Optional[List[str]] = ListDefault("mem", "mem-per-cpu", "mem-per-gpu")

# ## disabling this for now
# # these will be checked for
# required_mem_opts: Optional[List[str]] = ListDefault("mem", "mem-per-cpu", "mem-per-gpu")
# # this will be applied if the required above are missing
# default_mem_opt: str = "8GB"

def get_executable(self):
global _default_srun_path
Expand Down Expand Up @@ -66,10 +70,10 @@ def wrap_build_command(self, args: List[str], fqname: Optional[str]=None, log: O
return self._wrap(self.srun_opts_build if self.srun_opts_build is not None else self.srun_opts, args, fqname)

def validate(self, log: logging.Logger):
if self.required_mem_opts:
if not set(self.srun_opts.keys()).intersection(self.required_mem_opts):
raise BackendError(f"slurm.srun_opts must set one of the following: {', '.join(self.required_mem_opts)}")

pass
# if self.required_mem_opts:
# if not set(self.srun_opts.keys()).intersection(self.required_mem_opts):
# self.srun_opts['mem'] = self.default_mem_opt



Expand Down
12 changes: 10 additions & 2 deletions stimela/commands/build.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,20 @@
tab completion feature.""")
@click.option("-l", "--last-recipe", is_flag=True,
help="""if multiple recipes are defined, selects the last one for building.""")
@click.option("-S", "--singularity", "enable_singularity", is_flag=True,
help="""Selects the singularity backend (shortcut for -C opts.backend.select=singularity)""")
@click.option("--slurm", "enable_slurm", is_flag=True,
help="""Enables the slurm backend wrapper (shortcut for -C backend.slurm.enable=True)""")
@click.argument("what", metavar="filename.yml ... [recipe name] [PARAM=VALUE] ...", nargs=-1, required=True)
def build(what: str, last_recipe: bool = False, rebuild: bool = False, all_steps: bool=False,
config_equals: List[str] = [],
config_assign: List[Tuple[str, str]] = [],
step_ranges: List[str] = [], tags: List[str] = [], skip_tags: List[str] = [], enable_steps: List[str] = []):
step_ranges: List[str] = [], tags: List[str] = [], skip_tags: List[str] = [], enable_steps: List[str] = [],
enable_singularity=False,
enable_slurm=False):
return run.callback(what, last_recipe=last_recipe, step_ranges=step_ranges,
tags=tags, skip_tags=skip_tags, enable_steps=enable_steps,
config_equals=config_equals, config_assign=config_assign,
build=True, rebuild=rebuild, build_skips=all_steps)
build=True, rebuild=rebuild, build_skips=all_steps,
enable_singularity=enable_singularity,
enable_slurm=enable_slurm)
18 changes: 14 additions & 4 deletions stimela/commands/doc.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from stimela.exceptions import RecipeValidationError
from stimela.task_stats import destroy_progress_bar

from .run import load_recipe_files
from .run import load_recipe_files, resolve_recipe_file

@cli.command("doc",
help="""
Expand Down Expand Up @@ -66,10 +66,17 @@ def load_recipe(name: str, section: Dict):
# load all recipe/config files
files_to_load = []
names_to_document = []
errcode = None
for item in what:
if os.path.splitext(item)[1].lower() in (".yml", ".yaml"):
files_to_load.append(item)
log.info(f"will load recipe/config file '{item}'")
try:
filename = resolve_recipe_file(item)
except FileNotFoundError as exc:
log_exception(exc)
errcode = 2
continue
if filename:
files_to_load.append(filename)
log.info(f"will load recipe/config file {filename}")
else:
names_to_document.append(item)

Expand All @@ -78,6 +85,9 @@ def load_recipe(name: str, section: Dict):
load_recipe_files(files_to_load)

destroy_progress_bar()

if errcode:
sys.exit(errcode)

log.info(f"loaded {len(stimela.CONFIG.cabs)} cab definition(s) and {len(stimela.CONFIG.lib.recipes)} recipe(s)")

Expand Down
211 changes: 170 additions & 41 deletions stimela/commands/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,72 @@
from collections import OrderedDict
from omegaconf.omegaconf import OmegaConf, OmegaConfBaseException


import stimela
from scabha import configuratt
from scabha.basetypes import UNSET
from scabha.exceptions import ScabhaBaseException
from scabha.substitutions import SubstitutionNS
from stimela import stimelogging
import stimela.config
from stimela.config import ConfigExceptionTypes
from stimela import logger, log_exception
from stimela.exceptions import RecipeValidationError, StimelaRuntimeError, StepSelectionError
from stimela.exceptions import RecipeValidationError, StimelaRuntimeError, StepSelectionError, StepValidationError
from stimela.main import cli
from stimela.kitchen.recipe import Recipe, Step, RecipeSchema, join_quote
from stimela import task_stats
import stimela.backends

_yaml_extensions = {".yml", ".yaml", ".YML", ".YAML"}

def resolve_recipe_file(filename: str):
"""
Resolves a recipe file, which may be specified as (module)recipe.yml or module::recipe.yml, with
the suffix being optional.
Returns real path if file resolved, or None if filename should not be treated as a recipe file.
Raises FileNotFoundError if filename is a recipe file that doesn't exist.
"""
ext = os.path.splitext(filename)[1].lower()
# unrecognized extension -- treat as non-filename
if ext and ext not in _yaml_extensions:
return None

# check for (location)filename.yml or (location)/filename.yml style
match1 = re.fullmatch("^\\((.+)\\)/?(.+)$", filename)
match2 = re.fullmatch("^([\w.]+)::(.+)$", filename)
if match1 or match2:
modulename, fname = (match1 or match2).groups()
try:
mod = importlib.import_module(modulename)
except ImportError as exc:
raise FileNotFoundError(f"{filename} not found ({exc})")
# get filename
fname = os.path.join(os.path.dirname(mod.__file__), fname)
if ext:
if os.path.exists(fname):
return fname
else:
raise FileNotFoundError(f"{filename} resolves to {fname}, which doesn't exist")
# else check for implicit extension
else:
for ext in _yaml_extensions:
path = f"{fname}{ext}"
if os.path.exists(path):
return path
else:
raise FileNotFoundError(f"{filename} resolves to {fname}, which doesn't match any YaML files")
# no match and no extension, treat as non-filename
if not ext:
return None
if os.path.exists(filename):
return filename
raise FileNotFoundError(f"{filename} doesn't exist")


def load_recipe_files(filenames: List[str]):
"""Loads a set of config or recipe files. Returns list of recipes loaded."""

full_conf = OmegaConf.create()
full_deps = configuratt.ConfigDependencies()
Expand All @@ -45,6 +96,9 @@ def load_recipe_files(filenames: List[str]):
# try loading
try:
conf, deps = configuratt.load(filename, use_sources=[stimela.CONFIG, full_conf], no_toplevel_cache=True)
except FileNotFoundError as exc:
log_exception(exc)
sys.exit(2)
except ConfigExceptionTypes as exc:
log_exception(f"error loading {filename}", exc)
sys.exit(2)
Expand Down Expand Up @@ -127,18 +181,30 @@ def load_recipe_files(filenames: List[str]):
help="""Doesn't actually run anything, only prints the selected steps.""")
@click.option("-p", "--profile", metavar="DEPTH", type=int,
help="""Print per-step profiling stats to this depth. 0 disables.""")
@click.argument("parameters", nargs=-1, metavar="filename.yml ... [recipe name] [PARAM=VALUE] ...", required=True)
@click.option("-N", "--native", "enable_native", is_flag=True,
help="""Selects the native backend (shortcut for -C opts.backend.select=native)""")
@click.option("-S", "--singularity", "enable_singularity", is_flag=True,
help="""Selects the singularity backend (shortcut for -C opts.backend.select=singularity)""")
@click.option("-K", "--kube", "enable_kube", is_flag=True,
help="""Selects the kubernetes backend (shortcut for -C opts.backend.select=kube)""")
@click.option("--slurm", "enable_slurm", is_flag=True,
help="""Enables the slurm backend wrapper (shortcut for -C backend.slurm.enable=True)""")
@click.argument("parameters", nargs=-1, metavar="filename.yml ... [recipe or cab name] [PARAM=VALUE] ...", required=True)
def run(parameters: List[str] = [], dry_run: bool = False, last_recipe: bool = False, profile: Optional[int] = None,
assign: List[Tuple[str, str]] = [],
config_equals: List[str] = [],
config_assign: List[Tuple[str, str]] = [],
step_ranges: List[str] = [], tags: List[str] = [], skip_tags: List[str] = [], enable_steps: List[str] = [],
build=False, rebuild=False, build_skips=False):
build=False, rebuild=False, build_skips=False,
enable_native=False,
enable_singularity=False,
enable_kube=False,
enable_slurm=False):

log = logger()
params = OrderedDict()
errcode = 0
recipe_name = cab_name = None
recipe_or_cab = None
files_to_load = []

def convert_value(value):
Expand Down Expand Up @@ -166,16 +232,22 @@ def convert_value(value):
except Exception as exc:
log_exception(f"error parsing {pp}", exc)
errcode = 2
elif os.path.splitext(pp)[1].lower() in (".yml", ".yaml"):
files_to_load.append(pp)
log.info(f"will load recipe/config file '{pp}'")
else:
if recipe_name is not None:
log_exception(f"multiple recipe names given")
try:
filename = resolve_recipe_file(pp)
except FileNotFoundError as exc:
log_exception(exc)
errcode = 2
continue
if filename:
files_to_load.append(filename)
log.info(f"will load recipe/config file {filename}")
elif recipe_or_cab is not None:
log_exception(f"multiple recipe/cab names given")
errcode = 2
else:
recipe_name = pp
log.info(f"treating '{pp}' as a recipe name")
recipe_or_cab = pp
log.info(f"treating {pp} as a recipe or cab name")

if errcode:
sys.exit(errcode)
Expand All @@ -199,44 +271,98 @@ def convert_value(value):
log_exception(f"error loading -C/--config-assign assignments", exc)
sys.exit(2)

# run a cab
# this is currenty always None (see https://github.com/caracal-pipeline/stimela/issues/234), so effectively
# disabled. Leaving the code in place to re-enable another time.
if cab_name is not None:
log.info(f"setting up cab {cab_name}")
# enable backends
if enable_native:
log.info("selecting the native backend")
stimela.CONFIG.opts.backend.select = 'native'
elif enable_singularity:
log.info("selecting the singularity backend")
stimela.CONFIG.opts.backend.select = 'singularity'
elif enable_kube:
log.info("selecting the kube backend")
stimela.CONFIG.opts.backend.select = 'kube'
if enable_slurm:
log.info("enabling the slurm backend wrapper")
stimela.CONFIG.opts.backend.slurm.enable = True

def log_available_runnables():
"""Helper function to list available recipes or cabs"""
if available_recipes:
log.info(f"available recipes: {' '.join(available_recipes)}")
if stimela.CONFIG.cabs:
log.info(f"available cabs: {' '.join(stimela.CONFIG.cabs.keys())}")

# figure out what we're running, recipe or cab
recipe_name = cab_name = None
# do we need to make an implicit choice?
if recipe_or_cab is None:
# -l specified, pick the last recipe
if last_recipe:
if not available_recipes:
log.error(f"-l/--last-recipe specified, but no valid recipes were loaded")
sys.exit(2)
else:
recipe_name = available_recipes[-1]
log.info(f"-l/--last-recipe specified, selecting '{recipe_name}'")
# nothing specified, either we have exactly 1 recipe defined (pick that), or 0 recipes and 1 cab
elif len(available_recipes) == 1:
recipe_name = available_recipes[0]
log.info(f"found single recipe '{recipe_name}', selecting it implicitly")
elif len(stimela.CONFIG.cabs) == 1 and not available_recipes:
cab_name = next(iter(stimela.CONFIG.cabs))
log.info(f"found single cab '{cab_name}', selecting it implicitly")
else:
log.error("found multiple recipes or cabs, please specify one on the command line")
log_available_runnables()
sys.exit(2)
# else something was specified
elif recipe_or_cab in available_recipes:
recipe_name = recipe_or_cab
log.info(f"selected recipe is '{recipe_name}'")
elif recipe_or_cab in stimela.CONFIG.cabs:
cab_name = recipe_or_cab
log.info(f"selected cab is '{cab_name}'")
else:
if not available_recipes and not stimela.CONFIG.cabs:
log.error("no valid recipes or cabs were loaded")
else:
log.error(f"'{recipe_or_cab}' does not refer to a recipe or a cab")
log_available_runnables()
sys.exit(2)

# are we running a standalone cab?
if cab_name is not None:
# create step config by merging in settings (var=value pairs from the command line)
outer_step = Step(cab=cab_name, params=params)
outer_step.name = outer_step.fqname = cab_name
# provide basic substitutions for running the step below
subst = SubstitutionNS()
info = SubstitutionNS(fqname=cab_name, label=cab_name, label_parts=[], suffix='', taskname=cab_name)
subst._add_('info', info, nosubst=True)
subst._add_('config', stimela.CONFIG, nosubst=True)
subst._add_('current', SubstitutionNS(**params))

# prevalidate() is done by run() automatically if not already done, but it does set up the recipe's logger, so do it anyway
try:
outer_step.prevalidate(root=True)
outer_step.prevalidate(root=True, subst=subst)
except ScabhaBaseException as exc:
log_exception(exc)
sys.exit(1)
# run a recipe
else:
if not stimela.CONFIG.lib.recipes:
log_exception(f"no recipes were specified")
sys.exit(2)

if recipe_name:
if recipe_name not in stimela.CONFIG.lib.recipes:
log_exception(f"recipe '{recipe_name}' not found")
sys.exit(2)
else:
if len(available_recipes) == 0:
log_exception(f"no top-level recipes were found")
sys.exit(2)
elif last_recipe or len(available_recipes) == 1:
recipe_name = available_recipes[-1]
else:
logger().info(f"found multiple top-level recipes: {', '.join(available_recipes)}")
log_exception(f"please specify a recipe on the command line, or use -l/--last-recipe")
sys.exit(2)

log.info(f"selected recipe is '{recipe_name}'")
# check for missing parameters
if not build and (outer_step.missing_params or outer_step.unresolved_params):
missing = {}
for name in outer_step.missing_params:
missing[name] = outer_step.inputs_outputs[name].info
# don't report unresolved implicits, since that's just a consequence of a missing input
for name in outer_step.unresolved_params:
if not outer_step.inputs_outputs[name].implicit:
missing[name] = outer_step.inputs_outputs[name].info
#
if missing:
log_exception(StepValidationError(f"cab '{cab_name}' is missing required parameter(s)", missing))
sys.exit(1)

# else run a recipe
else:
# create recipe object from the config
kwargs = dict(**stimela.CONFIG.lib.recipes[recipe_name])
kwargs.setdefault('name', recipe_name)
Expand Down Expand Up @@ -302,6 +428,9 @@ def convert_value(value):
stimela.config.CONFIG_DEPS.save(filename)
log.info(f"saved recipe dependencies to {filename}")

# no substitutions provided, recipe initializes its own
subst = None

# in debug mode, pretty-print the recipe
if log.isEnabledFor(logging.DEBUG):
log.debug("---------- prevalidated step follows ----------")
Expand Down Expand Up @@ -337,7 +466,7 @@ def elapsed():
# else run the recipe
else:
try:
outputs = outer_step.run(is_outer_step=True)
outputs = outer_step.run(is_outer_step=True, subst=subst)
except Exception as exc:
stimela.backends.close_backends(log)

Expand Down
Loading

0 comments on commit 4344313

Please sign in to comment.