From 0bc0cdfe0aef36f393d5171e27400384e10dffcb Mon Sep 17 00:00:00 2001 From: Adam Souzis Date: Sat, 3 Aug 2024 19:28:42 -0700 Subject: [PATCH] feat(job): add global locking using git lfs (fix #94) --- README.md | 1 - docs/jobs.rst | 29 ++++++++++++++ unfurl/manifest-schema.json | 26 ++++++++++++ unfurl/repo.py | 46 ++++++++++++++++++++- unfurl/spec.py | 4 +- unfurl/yamlmanifest.py | 79 ++++++++++++++++++++++++++++++++++--- 6 files changed, 175 insertions(+), 10 deletions(-) diff --git a/README.md b/README.md index 183c94aa..c4ce21a7 100644 --- a/README.md +++ b/README.md @@ -178,7 +178,6 @@ Arguments after `--` are passed to the test runner, e.g. to run an individual te Be mindful of these limitations: - Only clone and deploy trusted repositories and projects. The docker runtime is not configured to provide isolation so you should assume any project may contain executable code that can gain full access to your system. -- Locking to prevent multiple instances of Unfurl from modifying the same resources at the same time currently only works with instances accessing the same local copy of an ensemble. - Incremental updates are only partially implemented. You can incrementally update an ensemble by explicitly limiting jobs with the `--force` and `--instance` [command line options](https://docs.unfurl.run/cli.html#unfurl-deploy). ## Unfurl Cloud diff --git a/docs/jobs.rst b/docs/jobs.rst index 5af46c37..f866ecea 100644 --- a/docs/jobs.rst +++ b/docs/jobs.rst @@ -93,3 +93,32 @@ can be found in its `readyState` section, for example: state: started # node state lastConfigChange: A0AP4P9C0001 # change id of the last ConfigChange that was applied lastStateChange: A0DEVF0003 # change id of the last detected change to the instance + +Locking +~~~~~~~ + +When a job is running Unfurl will lock the ensemble to prevent other instances of Unfurl from modifying the same ensemble. +There are two kinds of locks: a local lock that prevents access to the same local copy of an ensemble and a global lock which takes a `git lfs`_ lock to prevent access to the ensemble across remote servers. +Note that locks don't cause the job to block, instead the job will just abort when it starts. It is up to the user to re-run the job if aborts due to locking. + +The local locking is always enabled but global remote lock need to be enabled via the ``lfs_lock`` section in your project's `environment` configuration. The following settings are available: + + +:lock: Whether to use git lfs when locking an ensemble, it can be set to one of: + + "no", don't try to lock (the default) + + "try", take a lock if th e git lfs server is available + + "require" abort job if unable to take a git lfs lock + +:url: The URL of the Git LFS server to use. If missing, the ensemble's git repository origin remote will be used. + +:name: Name of the lock file to use. Note that with git lfs, the file doesn't need to exist in in the git repository. If omitted, the local lock's file path will be used. + By setting this you can set the scope to be coarser (or narrower) than each Individual ensemble as any ensemble using the same name will be locked. + + The following string substitutions are available to dynamically generate the name: ``$ensemble_uri``, ``$environment``, and ``$local_lock_path``. For example, setting a name like "group1/$environment" would prevent jobs from simultaneously running that share the lock name "group1" and were in same environment. + +As these settings are `environment`` settings, they will be merged with the current project, home project, and the ensemble's environment sections. Unlike most other environment settings, the ensemble's settings takes priority and overrides the project's settings. + +.. _git lfs: https://git-lfs.com/ \ No newline at end of file diff --git a/unfurl/manifest-schema.json b/unfurl/manifest-schema.json index 346c3b62..beb046dc 100644 --- a/unfurl/manifest-schema.json +++ b/unfurl/manifest-schema.json @@ -147,6 +147,32 @@ } } ] + }, + "lfs_lock": { + "type": "object", + "$$target": "#/definitions/environment/lfs_lock", + "description": "Configure the global git lfs lock for jobs. See Job `locking`", + "properties": { + "lock": { + "type": "string", + "enum": [ + "no", + "try", + "require" + ], + "default": "no", + "description": "Whether to use git lfs when locking an ensemble. 'try' use locking if lfs is available; 'require' means error if unable to lock" + }, + "name": { + "type": "string", + "description": "Name of the lock file" + }, + "url": { + "type": "string", + "format": "uri", + "description": "Git LFS server url; if missing, use the ensemble's git repository" + } + } } } }, diff --git a/unfurl/repo.py b/unfurl/repo.py index 68ad1a8c..b9cba5ad 100644 --- a/unfurl/repo.py +++ b/unfurl/repo.py @@ -813,7 +813,7 @@ def reset(self, args: str = "--hard HEAD~1") -> bool: return not self.run_cmd(("reset " + args).split())[0] def run_cmd( - self, args, with_exceptions: bool=False, **kw + self, args, with_exceptions: bool = False, **kw ) -> Tuple[int, str, str]: """ :return: @@ -978,6 +978,50 @@ def delete_dir(self, path, commit=None): if commit: self.repo.index.commit(commit) + def is_lfs_enabled(self, url=None) -> bool: + if self.repo.remotes: + status, out, err = self.run_cmd(["lfs", "locks"], remote=url) + if status: + logger.warning( + "git lfs on %s not available, `git lfs locks` says: %s", + self.safe_url, + err, + ) + else: + logger.debug( + "git lfs on %s available, `git lfs locks` says: %s", + self.safe_url, + out, + ) + return True + return False + + def lock_lfs(self, lockfilepath: str, url=None) -> bool: + try: + # note: file doesn't have to exist or added to the repo or have its .gitattributes set + self.run_cmd( + ["lfs", "lock", lockfilepath], remote=url, with_exceptions=True + ) + except git.exc.GitCommandError as e: + if "already locked" in e.stderr: + return False + else: + raise + return True + + def unlock_lfs(self, lockfilepath: str, url=None) -> bool: + try: + # note: file doesn't have to exist or added to the repo or have its .gitattributes set + self.run_cmd( + ["lfs", "unlock", lockfilepath], remote=url, with_exceptions=True + ) + except git.exc.GitCommandError as e: + if "no matching locks found" in e.stderr: + return False + else: + raise + return True + # XXX: def getDependentRepos() # XXX: def canManage() diff --git a/unfurl/spec.py b/unfurl/spec.py index acb3d258..6ae40dc0 100644 --- a/unfurl/spec.py +++ b/unfurl/spec.py @@ -91,11 +91,11 @@ def validate_unfurl_identifier(name): return re.match(r"^[A-Za-z._][A-Za-z0-9._:\-]*$", name) is not None -def encode_unfurl_identifier(name): +def encode_unfurl_identifier(name, escape=r"[^A-Za-z0-9._:-]"): def encode(match): return f"-{ord(match.group(0))}-" - return re.sub(r"[^A-Za-z0-9._:\-]", encode, name) + return re.sub(escape, encode, name) def decode_unfurl_identifier(name): diff --git a/unfurl/yamlmanifest.py b/unfurl/yamlmanifest.py index c4f73f8e..7c63a6bc 100644 --- a/unfurl/yamlmanifest.py +++ b/unfurl/yamlmanifest.py @@ -4,7 +4,8 @@ import io import json -from typing import Any, Dict, List, Optional, Tuple, TYPE_CHECKING, cast +from typing import Any, Dict, List, Optional, Tuple, TYPE_CHECKING, TypedDict, cast +from typing_extensions import NotRequired import sys from collections.abc import MutableSequence, Mapping import numbers @@ -19,7 +20,7 @@ from functools import lru_cache as cache from . import DefaultNames -from .util import UnfurlError, get_base_dir, to_yaml_text, filter_env +from .util import UnfurlError, get_base_dir, substitute_env, to_yaml_text, filter_env from .merge import patch_dict, intersect_dict from .yamlloader import YamlConfig, make_yaml from .result import serialize_value @@ -27,7 +28,7 @@ from .localenv import LocalEnv from .lock import Lock from .manifest import Manifest, relabel_dict, ChangeRecordRecord -from .spec import ArtifactSpec, NodeSpec, find_env_vars +from .spec import ArtifactSpec, NodeSpec, encode_unfurl_identifier, find_env_vars from .runtime import EntityInstance, NodeInstance, TopologyInstance from .eval import map_value from .planrequests import create_instance_from_spec @@ -205,6 +206,12 @@ def get_manifest_schema(format: str) -> dict: return schema +class LfsSettings(TypedDict): + lock: NotRequired[str] # require, no, try + name: NotRequired[str] # name of the lock $ensemble or $environment + url: NotRequired[str] # otherwise use the ensemble's git repository + + class ReadOnlyManifest(Manifest): """Loads an ensemble from a manifest but doesn't instantiate the instance model.""" @@ -225,7 +232,7 @@ def __init__( readonly = bool(localEnv and localEnv.readonly) self.safe_mode = bool(safe_mode) schema = get_manifest_schema( - localEnv and localEnv.overrides.get("format") or "" + localEnv and localEnv.overrides.get("format") or "" ) self.manifest = YamlConfig( manifest, @@ -333,6 +340,8 @@ class YamlManifest(ReadOnlyManifest): _operationIndex: Optional[Dict[Tuple[str, str], str]] = None lockfilepath = None lockfile = None + lfs_locked: Optional[str] = None + lfs_url: Optional[str] = None def __init__( self, @@ -681,8 +690,58 @@ def load_changes(self, changes: Optional[List[dict]], changeLogPath: str) -> boo } return self.changeSets is not None - def lock(self): + def lfs_settings(self) -> Tuple[bool, bool, str, Optional[str]]: + local = self.manifest.expanded.get("environment", {}).get("lfs_lock") + env = self.context.get("lfs_lock", {}).copy() + # give ensemble priority: + if local: + env.update(local) + lock = cast(LfsSettings, env) + enable = lock.get("lock", "no") + assert enable in ("require", "no", "try") + if not lock or enable == "no": + return False, False, "", None + else: + lfs_try = True + lfs_required = enable == "require" + lfs_url = lock.get("url", "") + lfs_lock_path = lock.get("name") + if lfs_lock_path and self.localEnv: + lock_vars = dict( + environment=self.localEnv.manifest_context_name, ensemble_uri=self.uri + ) + if self.repo: + lock_vars["local_lock_path"] = os.path.relpath( + self.lockfilepath or "", self.repo.working_dir + ) + lfs_lock_path = substitute_env(lfs_lock_path, lock_vars) + elif self.lockfilepath and self.repo: + lfs_lock_path = os.path.relpath(self.lockfilepath, self.repo.working_dir) + else: + lfs_lock_path = self.uri + escaped_lfs_lock_path = encode_unfurl_identifier(lfs_lock_path, r"[^\w/-]") + return lfs_try, lfs_required, escaped_lfs_lock_path, lfs_url + + def _lock_lfs(self) -> bool: + lfs_try, lfs_required, lfs_lock_path, lfs_url = self.lfs_settings() + if lfs_try: + if self.repo and self.repo.is_lfs_enabled(lfs_url): + if self.repo.lock_lfs(lfs_lock_path, lfs_url): + self.lfs_locked = lfs_lock_path + self.lfs_url = lfs_url + logger.debug(f"git lfs locked {self.lockfilepath}") + return True + else: + msg = f"Ensemble {self.path} is remotely locked at {lfs_lock_path}" + raise UnfurlError(msg) + elif lfs_required: + msg = "git lfs is not available but is required to use this ensemble" + raise UnfurlError(msg) + return False + + def lock(self) -> bool: # implement simple local file locking -- no waiting on the lock + # lock() should never be called when already holding a lock, raise error if it does msg = f"Ensemble {self.path} was already locked -- is there a circular reference between external ensembles?" if self.lockfile: raise UnfurlError(msg) @@ -698,12 +757,20 @@ def lock(self): f"Lockfile '{self.lockfilepath}' already created by another process {pid} " ) else: - # ok if we race here, we'll just raise an error + # open exclusively, ok if we race here, we'll just raise an error self.lockfile = open(self.lockfilepath, "xb", buffering=0) self.lockfile.write(bytes(str(os.getpid()), "ascii")) # type: ignore + try: + self._lock_lfs() + except Exception: + self.unlock() + raise return True def unlock(self): + if self.repo and self.lfs_locked: + self.repo.unlock_lfs(self.lfs_locked, self.lfs_url) + self.lfs_locked = None if self.lockfile and self.lockfilepath: # unlink first to avoid race (this will fail on Windows) os.unlink(self.lockfilepath)