diff --git a/cosmos/operators/virtualenv.py b/cosmos/operators/virtualenv.py index 5d31f4ef8..34e02a745 100644 --- a/cosmos/operators/virtualenv.py +++ b/cosmos/operators/virtualenv.py @@ -36,9 +36,7 @@ def depends_on_virtualenv_dir(method: Callable[[Any], Any]) -> Callable[[Any], A def wrapper(operator: DbtVirtualenvBaseOperator, *args: Any) -> None: if operator.virtualenv_dir is None: raise CosmosValueError(f"Method relies on value of parameter `virtualenv_dir` which is None.") - - logger.info(f"Operator: {operator}") - logger.info(f"Args: {args}") + method(operator, *args) return wrapper @@ -153,14 +151,17 @@ def __lock_file(self) -> Path: def _pid(self) -> int: return os.getpid() - @depends_on_virtualenv_dir + #@depends_on_virtualenv_dir def _is_lock_available(self) -> bool: if self.__lock_file.is_file(): with open(self.__lock_file, "r") as lf: pid = int(lf.read()) self.log.info(f"Checking for running process with PID {pid}") - _process_running = psutil.Process(pid).is_running() + try: + _process_running = psutil.Process(pid).is_running() + except psutil.NoSuchProcess: + _process_running = False self.log.info(f"Process {pid} running: {_process_running}") return not _process_running @@ -179,7 +180,9 @@ def __acquire_venv_lock(self) -> None: @depends_on_virtualenv_dir def __release_venv_lock(self) -> None: if not self.__lock_file.is_file(): - raise FileNotFoundError(f"Lockfile {self.__lock_file} not found") + self.log.warn(f"Lockfile {self.__lock_file} not found, perhaps deleted by other concurrent operator?") + + return with open(self.__lock_file, "r") as lf: lock_file_pid = int(lf.read())