Skip to content

Commit

Permalink
Iron out locking flow
Browse files Browse the repository at this point in the history
  • Loading branch information
LennartKloppenburg committed Dec 20, 2023
1 parent 3a84aa7 commit 547b0af
Showing 1 changed file with 9 additions and 6 deletions.
15 changes: 9 additions & 6 deletions cosmos/operators/virtualenv.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,7 @@ def depends_on_virtualenv_dir(method: Callable[[Any], Any]) -> Callable[[Any], A
def wrapper(operator: DbtVirtualenvBaseOperator, *args: Any) -> None:
if operator.virtualenv_dir is None:
raise CosmosValueError(f"Method relies on value of parameter `virtualenv_dir` which is None.")

logger.info(f"Operator: {operator}")
logger.info(f"Args: {args}")

method(operator, *args)
return wrapper

Expand Down Expand Up @@ -153,14 +151,17 @@ def __lock_file(self) -> Path:
def _pid(self) -> int:
return os.getpid()

@depends_on_virtualenv_dir
#@depends_on_virtualenv_dir
def _is_lock_available(self) -> bool:
if self.__lock_file.is_file():
with open(self.__lock_file, "r") as lf:
pid = int(lf.read())

self.log.info(f"Checking for running process with PID {pid}")
_process_running = psutil.Process(pid).is_running()
try:
_process_running = psutil.Process(pid).is_running()
except psutil.NoSuchProcess:
_process_running = False

self.log.info(f"Process {pid} running: {_process_running}")
return not _process_running
Expand All @@ -179,7 +180,9 @@ def __acquire_venv_lock(self) -> None:
@depends_on_virtualenv_dir
def __release_venv_lock(self) -> None:
if not self.__lock_file.is_file():
raise FileNotFoundError(f"Lockfile {self.__lock_file} not found")
self.log.warn(f"Lockfile {self.__lock_file} not found, perhaps deleted by other concurrent operator?")

return

with open(self.__lock_file, "r") as lf:
lock_file_pid = int(lf.read())
Expand Down

0 comments on commit 547b0af

Please sign in to comment.