feat(job): add global locking using git lfs (fix #94)
aszs committed Aug 5, 2024
1 parent 8f2d8fe commit ab0be9a
Showing 6 changed files with 182 additions and 15 deletions.
1 change: 0 additions & 1 deletion README.md
@@ -178,7 +178,6 @@ Arguments after `--` are passed to the test runner, e.g. to run an individual te
Be mindful of these limitations:

- Only clone and deploy trusted repositories and projects. The docker runtime is not configured to provide isolation so you should assume any project may contain executable code that can gain full access to your system.
-- Locking to prevent multiple instances of Unfurl from modifying the same resources at the same time currently only works with instances accessing the same local copy of an ensemble.
- Incremental updates are only partially implemented. You can incrementally update an ensemble by explicitly limiting jobs with the `--force` and `--instance` [command line options](https://docs.unfurl.run/cli.html#unfurl-deploy).

## Unfurl Cloud
41 changes: 36 additions & 5 deletions docs/jobs.rst
@@ -16,15 +16,17 @@ Job Lifecycle
When a command that invokes a workflow is executed (`deploy`, `undeploy`, :ref:`check`, :std:ref:`discover` and :ref:`run`)
a job is created and run. Running a job entails these steps:

-1. YAML parsed and :ref:`merge directives<yaml_merge_directives>` are processed
+1. YAML parsed and :ref:`merge directives<yaml_merge_directives>` are processed, including converting Python `DSL` code to TOSCA YAML.
 2. Schema is validated and model instantiated. The command will exit if there are errors.
 3. A plan is constructed based on the selected workflow and job options (use :cli:`unfurl plan<unfurl-plan>` command to preview) and the job begins.
 4. For each operation a task is generated and the operation's :std:ref:`inputs` are lazily evaluated
 if referenced, including Unfurl expressions, TOSCA functions, and template strings.
-5. After the job completes, `ensemble.yaml` is updated with any changes to its instances status.
-``jobs.tsv`` will also be updated with line for each task run and a new `job.yaml` file is created in the ``jobs`` folder.
-6. Depending on the commit options of the job, the ensemble's git repository will see a new commit,
-along with any other repository that had changes to it (e.g. files in the ``spec`` directory).
+5. Render the plan. The command will exit if there are unrecoverable errors. However, render operations that depend on live attributes will be deferred until those attributes are set when the job runs.
+6. Execute the plan. You will be prompted with a plan summary to approve unless the `--approve` flag was used. As the job runs, it tracks dependencies and changes to resource attributes and status.
+7. Re-execute steps 5 and 6 to render and run any deferred operations if needed. Repeat until all operations complete.
+8. After the job completes, `ensemble.yaml` is updated with any changes to its instances' status. Its ``jobs`` folder will have a new `job.yaml` file and associated log files.
+9. If the ``--commit`` flag was set, the ensemble's git repository will see a new commit,
+along with any other project repository that had changes to it.

Operational status and state
=============================
@@ -93,3 +95,32 @@ can be found in its `readyState` section, for example:
state: started # node state
lastConfigChange: A0AP4P9C0001 # change id of the last ConfigChange that was applied
lastStateChange: A0DEVF0003 # change id of the last detected change to the instance
+Locking
+==========
+
+When a job is running, Unfurl will lock the ensemble to prevent other instances of Unfurl from modifying the same ensemble.
+There are two kinds of locks: a local lock that prevents access to the same local copy of an ensemble and a global lock that takes a `git lfs`_ lock to prevent access to the ensemble across remote servers.
+Note that locks don't cause the job to block; instead, the job will just abort when it starts. It is up to the user to re-run the job if it aborts due to locking.
+
+Local locking is always enabled, but the global remote lock needs to be enabled via the ``lfs_lock`` section in your project's `environment` configuration. The following settings are available:
+
+
+:lock: Whether to use git lfs when locking an ensemble. It can be set to one of:
+
+"no", don't try to lock (the default if missing)
+
+"try", take a lock if the git lfs server is available
+
+"require", abort job if unable to take a git lfs lock
+
+:url: The URL of the Git LFS server to use. If missing, the ensemble's git repository's "origin" remote will be used.
+
+:name: Name of the lock file to use. Note that with git lfs, the file doesn't need to exist in the git repository. If omitted, the local lock's file path will be used.
+By setting this you can make the scope coarser (or narrower) than each individual ensemble, as any ensemble using the same name will be locked.
+
+The following string substitutions are available to dynamically generate the name: ``$ensemble_uri``, ``$environment``, and ``$local_lock_path``. For example, setting a name like "group1/$environment" would prevent jobs from running simultaneously for ensembles that use this lock name and are in the same environment.
+
+As these settings are `environment` settings, they will be merged with the current project, home project, and the ensemble's environment sections. Unlike most other environment settings, the ensemble's settings take priority and override the project's settings.
+
+.. _git lfs: https://git-lfs.com/
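
Reviewer note: the merge priority and name substitution described in the new Locking section can be sketched as follows. This is only an illustration under stated assumptions: `resolve_lock_name` is a hypothetical helper, `string.Template` stands in for Unfurl's own substitution function, and the sample URI and lock path are made up.

```python
from string import Template


def resolve_lock_name(project_settings, ensemble_settings, variables):
    """Merge lfs_lock settings (the ensemble wins) and expand the lock name."""
    merged = dict(project_settings)
    merged.update(ensemble_settings)  # ensemble settings override the project's
    name = merged.get("name")
    if name is None:
        # documented fallback: use the local lock's file path
        return merged, variables["local_lock_path"]
    # string.Template is an assumption standing in for Unfurl's substitution helper
    return merged, Template(name).safe_substitute(variables)


project = {"lock": "try", "name": "group1/$environment"}
ensemble = {"lock": "require"}
variables = {
    "environment": "production",
    "ensemble_uri": "git-local://example/ensemble.yaml",  # hypothetical value
    "local_lock_path": "ensemble/ensemble.yaml.lock",  # hypothetical value
}

merged, lock_name = resolve_lock_name(project, ensemble, variables)
print(merged["lock"], lock_name)  # require group1/production
```

Every ensemble that resolves to the same name competes for the same lock, which is how a shared name widens the locking scope.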
26 changes: 26 additions & 0 deletions unfurl/manifest-schema.json
@@ -147,6 +147,32 @@
           }
         }
       ]
+    },
+    "lfs_lock": {
+      "type": "object",
+      "$$target": "#/definitions/environment/lfs_lock",
+      "description": "Configure the global git lfs lock for jobs. See Job `locking`",
+      "properties": {
+        "lock": {
+          "type": "string",
+          "enum": [
+            "no",
+            "try",
+            "require"
+          ],
+          "default": "no",
+          "description": "Whether to use git lfs when locking an ensemble. 'try' use locking if lfs is available; 'require' means error if unable to lock"
+        },
+        "name": {
+          "type": "string",
+          "description": "Name of the lock file"
+        },
+        "url": {
+          "type": "string",
+          "format": "uri",
+          "description": "Git LFS server url; if missing, use the ensemble's git repository"
+        }
+      }
     }
   }
 },
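
Reviewer note: a minimal, standard-library-only sketch of what the `lfs_lock` schema above accepts. `check_lfs_lock` is a hypothetical helper written for illustration; it does not use Unfurl's actual schema validation and it ignores the `uri` format check.

```python
ALLOWED_LOCK_MODES = ("no", "try", "require")  # the enum from the schema


def check_lfs_lock(settings):
    """Return a list of problems; an empty list means the mapping looks valid."""
    problems = []
    mode = settings.get("lock", "no")  # "no" is the schema default
    if mode not in ALLOWED_LOCK_MODES:
        problems.append(f"lock must be one of {ALLOWED_LOCK_MODES}, got {mode!r}")
    for key in ("name", "url"):
        if key in settings and not isinstance(settings[key], str):
            problems.append(f"{key} must be a string")
    return problems


print(check_lfs_lock({"lock": "try", "name": "group1/$environment"}))
print(check_lfs_lock({"lock": "always"}))
```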
46 changes: 45 additions & 1 deletion unfurl/repo.py
@@ -813,7 +813,7 @@ def reset(self, args: str = "--hard HEAD~1") -> bool:
         return not self.run_cmd(("reset " + args).split())[0]

     def run_cmd(
-        self, args, with_exceptions: bool=False, **kw
+        self, args, with_exceptions: bool = False, **kw
     ) -> Tuple[int, str, str]:
         """
         :return:
@@ -978,6 +978,50 @@ def delete_dir(self, path, commit=None):
         if commit:
             self.repo.index.commit(commit)

+    def is_lfs_enabled(self, url=None) -> bool:
+        if self.repo.remotes:
+            status, out, err = self.run_cmd(["lfs", "locks"], remote=url)
+            if status:
+                logger.warning(
+                    "git lfs on %s not available, `git lfs locks` says: %s",
+                    self.safe_url,
+                    err,
+                )
+            else:
+                logger.debug(
+                    "git lfs on %s available, `git lfs locks` says: %s",
+                    self.safe_url,
+                    out,
+                )
+                return True
+        return False
+
+    def lock_lfs(self, lockfilepath: str, url=None) -> bool:
+        try:
+            # note: file doesn't have to exist or added to the repo or have its .gitattributes set
+            self.run_cmd(
+                ["lfs", "lock", lockfilepath], remote=url, with_exceptions=True
+            )
+        except git.exc.GitCommandError as e:
+            if "already locked" in e.stderr:
+                return False
+            else:
+                raise
+        return True
+
+    def unlock_lfs(self, lockfilepath: str, url=None) -> bool:
+        try:
+            # note: file doesn't have to exist or added to the repo or have its .gitattributes set
+            self.run_cmd(
+                ["lfs", "unlock", lockfilepath], remote=url, with_exceptions=True
+            )
+        except git.exc.GitCommandError as e:
+            if "no matching locks found" in e.stderr:
+                return False
+            else:
+                raise
+        return True
+
     # XXX: def getDependentRepos()
     # XXX: def canManage()
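
Reviewer note: `lock_lfs` separates expected contention (someone else holds the lock, so it returns `False`) from real failures (which it re-raises). The sketch below shows that pattern in isolation. `CommandError` and `fake_git` are hypothetical stand-ins for `git.exc.GitCommandError` and a real git process, and the full error text is made up; only the "already locked" substring comes from the diff.

```python
class CommandError(Exception):
    """Stand-in for git.exc.GitCommandError; only `stderr` is modelled."""

    def __init__(self, stderr):
        super().__init__(stderr)
        self.stderr = stderr


def try_lock(run_cmd, path):
    """Return True if the lock was taken, False if it is already held."""
    try:
        run_cmd(["lfs", "lock", path])
    except CommandError as e:
        if "already locked" in e.stderr:
            return False  # expected contention, not an error
        raise  # anything else is a real failure
    return True


def fake_git(held):
    """Build a hypothetical command runner that remembers locked paths."""

    def run(args):
        path = args[-1]
        if path in held:
            raise CommandError(f"{path} already locked")  # made-up message
        held.add(path)

    return run


run = fake_git(set())
print(try_lock(run, "ensemble.lock"))  # True
print(try_lock(run, "ensemble.lock"))  # False
```

Matching on stderr text ties the result to the wording of the git lfs client, so a change in that wording would turn contention into a raised exception rather than a `False` return.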

4 changes: 2 additions & 2 deletions unfurl/spec.py
@@ -91,11 +91,11 @@ def validate_unfurl_identifier(name):
     return re.match(r"^[A-Za-z._][A-Za-z0-9._:\-]*$", name) is not None


-def encode_unfurl_identifier(name):
+def encode_unfurl_identifier(name, escape=r"[^A-Za-z0-9._:-]"):
     def encode(match):
         return f"-{ord(match.group(0))}-"

-    return re.sub(r"[^A-Za-z0-9._:\-]", encode, name)
+    return re.sub(escape, encode, name)


 def decode_unfurl_identifier(name):
79 changes: 73 additions & 6 deletions unfurl/yamlmanifest.py
@@ -4,7 +4,8 @@

 import io
 import json
-from typing import Any, Dict, List, Optional, Tuple, TYPE_CHECKING, cast
+from typing import Any, Dict, List, Optional, Tuple, TYPE_CHECKING, TypedDict, cast
+from typing_extensions import NotRequired
 import sys
 from collections.abc import MutableSequence, Mapping
 import numbers
@@ -19,15 +20,15 @@
 from functools import lru_cache as cache

 from . import DefaultNames
-from .util import UnfurlError, get_base_dir, to_yaml_text, filter_env
+from .util import UnfurlError, get_base_dir, substitute_env, to_yaml_text, filter_env
 from .merge import patch_dict, intersect_dict
 from .yamlloader import YamlConfig, make_yaml
 from .result import serialize_value
 from .support import ResourceChanges, Defaults, Status
 from .localenv import LocalEnv
 from .lock import Lock
 from .manifest import Manifest, relabel_dict, ChangeRecordRecord
-from .spec import ArtifactSpec, NodeSpec, find_env_vars
+from .spec import ArtifactSpec, NodeSpec, encode_unfurl_identifier, find_env_vars
 from .runtime import EntityInstance, NodeInstance, TopologyInstance
 from .eval import map_value
 from .planrequests import create_instance_from_spec
@@ -205,6 +206,12 @@ def get_manifest_schema(format: str) -> dict:
     return schema


+class LfsSettings(TypedDict):
+    lock: NotRequired[str]  # require, no, try
+    name: NotRequired[str]  # name of the lock $ensemble or $environment
+    url: NotRequired[str]  # otherwise use the ensemble's git repository
+
+
 class ReadOnlyManifest(Manifest):
     """Loads an ensemble from a manifest but doesn't instantiate the instance model."""

@@ -225,7 +232,7 @@ def __init__(
         readonly = bool(localEnv and localEnv.readonly)
         self.safe_mode = bool(safe_mode)
         schema = get_manifest_schema(
-            localEnv and localEnv.overrides.get("format") or ""
+            localEnv and localEnv.overrides.get("format") or ""
         )
         self.manifest = YamlConfig(
             manifest,
@@ -333,6 +340,8 @@ class YamlManifest(ReadOnlyManifest):
     _operationIndex: Optional[Dict[Tuple[str, str], str]] = None
     lockfilepath = None
     lockfile = None
+    lfs_locked: Optional[str] = None
+    lfs_url: Optional[str] = None

     def __init__(
         self,
@@ -681,8 +690,58 @@ def load_changes(self, changes: Optional[List[dict]], changeLogPath: str) -> boo
}
return self.changeSets is not None

-    def lock(self):
+    def lfs_settings(self) -> Tuple[bool, bool, str, Optional[str]]:
+        local = self.manifest.expanded.get("environment", {}).get("lfs_lock")
+        env = self.context.get("lfs_lock", {}).copy()
+        # give ensemble priority:
+        if local:
+            env.update(local)
+        lock = cast(LfsSettings, env)
+        enable = lock.get("lock", "no")
+        assert enable in ("require", "no", "try")
+        if not lock or enable == "no":
+            return False, False, "", None
+        else:
+            lfs_try = True
+            lfs_required = enable == "require"
+        lfs_url = lock.get("url", "")
+        lfs_lock_path = lock.get("name")
+        if lfs_lock_path and self.localEnv:
+            lock_vars = dict(
+                environment=self.localEnv.manifest_context_name, ensemble_uri=self.uri
+            )
+            if self.repo:
+                lock_vars["local_lock_path"] = os.path.relpath(
+                    self.lockfilepath or "", self.repo.working_dir
+                )
+            lfs_lock_path = substitute_env(lfs_lock_path, lock_vars)
+        elif self.lockfilepath and self.repo:
+            lfs_lock_path = os.path.relpath(self.lockfilepath, self.repo.working_dir)
+        else:
+            lfs_lock_path = self.uri
+        escaped_lfs_lock_path = encode_unfurl_identifier(lfs_lock_path, r"[^\w/-]")
+        return lfs_try, lfs_required, escaped_lfs_lock_path, lfs_url
+
+    def _lock_lfs(self) -> bool:
+        lfs_try, lfs_required, lfs_lock_path, lfs_url = self.lfs_settings()
+        if lfs_try:
+            if self.repo and self.repo.is_lfs_enabled(lfs_url):
+                if self.repo.lock_lfs(lfs_lock_path, lfs_url):
+                    self.lfs_locked = lfs_lock_path
+                    self.lfs_url = lfs_url
+                    logger.debug(f"git lfs locked {self.lockfilepath}")
+                    return True
+                else:
+                    msg = f"Ensemble {self.path} is remotely locked at {lfs_lock_path}"
+                    raise UnfurlError(msg)
+            elif lfs_required:
+                msg = "git lfs is not available but is required to use this ensemble"
+                raise UnfurlError(msg)
+        return False
+
+    def lock(self) -> bool:
         # implement simple local file locking -- no waiting on the lock
+        # lock() should never be called when already holding a lock, raise error if it does
         msg = f"Ensemble {self.path} was already locked -- is there a circular reference between external ensembles?"
         if self.lockfile:
             raise UnfurlError(msg)
@@ -698,12 +757,20 @@ def lock(self):
f"Lockfile '{self.lockfilepath}' already created by another process {pid} "
)
         else:
-            # ok if we race here, we'll just raise an error
+            # open exclusively, ok if we race here, we'll just raise an error
             self.lockfile = open(self.lockfilepath, "xb", buffering=0)
             self.lockfile.write(bytes(str(os.getpid()), "ascii"))  # type: ignore
+            try:
+                self._lock_lfs()
+            except Exception:
+                self.unlock()
+                raise
             return True

     def unlock(self):
+        if self.repo and self.lfs_locked:
+            self.repo.unlock_lfs(self.lfs_locked, self.lfs_url)
+            self.lfs_locked = None
         if self.lockfile and self.lockfilepath:
             # unlink first to avoid race (this will fail on Windows)
             os.unlink(self.lockfilepath)
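
Reviewer note: the overall flow in `lock()` is "take the local lock with an exclusive create, then attempt the remote lock, and roll back the local lock if the remote step raises". Below is a self-contained sketch of that flow. `acquire`, `release`, `LockError`, and the `take_remote_lock` callable are hypothetical names standing in for the manifest methods and the git lfs call. Unlike the diff, this sketch closes the file before unlinking it, which is an assumption made for portability.

```python
import os
import tempfile


class LockError(Exception):
    pass


def release(lockfile, lock_path):
    """Drop the local lock (closing first, unlike the code in the diff)."""
    lockfile.close()
    os.unlink(lock_path)


def acquire(lock_path, take_remote_lock):
    """Take the local lock, then the remote one; undo the local lock on failure."""
    try:
        # mode "x" creates the file and fails if it already exists
        lockfile = open(lock_path, "xb", buffering=0)
    except FileExistsError:
        raise LockError(f"{lock_path} already exists") from None
    lockfile.write(str(os.getpid()).encode("ascii"))
    try:
        take_remote_lock()  # stand-in for the git lfs locking step
    except Exception:
        release(lockfile, lock_path)  # never leave a stale local lock behind
        raise
    return lockfile


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "ensemble.lock")
    first = acquire(path, lambda: None)
    try:
        acquire(path, lambda: None)
    except LockError:
        print("second attempt refused while the first lock is held")
    release(first, path)
```

Neither step waits: a job that cannot get both locks aborts, which matches the "locks don't cause the job to block" behaviour described in the docs change.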