From d6a6a9b349270f2008744643e6c5bec53968d414 Mon Sep 17 00:00:00 2001 From: Casey Jao Date: Tue, 6 Sep 2022 09:59:34 -0400 Subject: [PATCH] Reorg BaseExecutors (#1169) * Reorganize BaseExecutors * Move `log_stdout`, `log_stderr`, `cache_dir` to `_AbstractBaseExecutor` * Add `time_limit` and `retries` to `_AbstractBaseExecutor` * Remove conda env logic from `BaseExecutor` * Update changelog --- CHANGELOG.md | 4 + covalent/executor/base.py | 276 ++++---------------- tests/covalent_tests/executor/base_test.py | 93 +++---- tests/functional_tests/choose_conda_test.py | 53 ---- 4 files changed, 85 insertions(+), 341 deletions(-) delete mode 100644 tests/functional_tests/choose_conda_test.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 5af49269b..18622d816 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [UNRELEASED] +### Changed + +- Refactored executor base classes + ## [0.192.0] - 2022-09-02 ### Authors diff --git a/covalent/executor/base.py b/covalent/executor/base.py index 93711ad87..326543d6b 100644 --- a/covalent/executor/base.py +++ b/covalent/executor/base.py @@ -106,10 +106,33 @@ def wrapper_fn( class _AbstractBaseExecutor(ABC): """ - Private class that contains attributes and methods common to both - BaseExecutor and AsyncBaseExecutor + Private parent class for BaseExecutor and AsyncBaseExecutor + + Attributes: + log_stdout: The path to the file to be used for redirecting stdout. + log_stderr: The path to the file to be used for redirecting stderr. + cache_dir: The location used for cached files in the executor. + time_limit: time limit for the task + retries: Number of times to retry execution upon failure + """ + def __init__( + self, + log_stdout: str = "", + log_stderr: str = "", + cache_dir: str = "", + time_limit: int = -1, + retries: int = 0, + *args, + **kwargs, + ): + self.log_stdout = log_stdout + self.log_stderr = log_stderr + self.cache_dir = cache_dir + self.time_limit = time_limit + self.retries = retries + def get_dispatch_context(self, dispatch_info: DispatchInfo) -> ContextManager[DispatchInfo]: """ Start a context manager that will be used to @@ -158,36 +181,22 @@ class BaseExecutor(_AbstractBaseExecutor): plugin. Subclassing this class will allow you to define your own executor plugin which can be used in covalent. - Note: When using a conda environment, it is assumed that - covalent with all its dependencies are also installed in - that environment. - Attributes: log_stdout: The path to the file to be used for redirecting stdout. log_stderr: The path to the file to be used for redirecting stderr. - conda_env: The name of the Conda environment to be used. cache_dir: The location used for cached files in the executor. - current_env_on_conda_fail: If True, the current environment will be used - if conda fails to activate specified env. + time_limit: time limit for the task + retries: Number of times to retry execution upon failure + """ def __init__( self, - log_stdout: str = "", - log_stderr: str = "", - conda_env: str = "", - cache_dir: str = "", - current_env_on_conda_fail: bool = False, *args, **kwargs, ) -> None: - self.log_stdout = log_stdout - self.log_stderr = log_stderr - self.conda_env = conda_env - self.cache_dir = cache_dir - self.current_env_on_conda_fail = current_env_on_conda_fail - self.current_env = "" + super().__init__(*args, **kwargs) def write_streams_to_file( self, @@ -261,23 +270,9 @@ def execute( io.StringIO() ) as stdout, redirect_stderr(io.StringIO()) as stderr: - if self.conda_env != "": - result = None - - result = self.execute_in_conda_env( - function, - fn_version, - args, - kwargs, - self.conda_env, - self.cache_dir, - node_id, - ) - - else: - self.setup(task_metadata=task_metadata) - result = self.run(function, args, kwargs, task_metadata) - self.teardown(task_metadata=task_metadata) + self.setup(task_metadata=task_metadata) + result = self.run(function, args, kwargs, task_metadata) + self.teardown(task_metadata=task_metadata) self.write_streams_to_file( (stdout.getvalue(), stderr.getvalue()), @@ -313,209 +308,32 @@ def teardown(self, task_metadata: Dict) -> Any: """Placeholder to run nay executor specific cleanup/teardown actions""" pass - def execute_in_conda_env( - self, - fn: Callable, - fn_version: str, - args: List, - kwargs: Dict, - conda_env: str, - cache_dir: str, - node_id: int, - ) -> Tuple[bool, Any]: - """ - Execute the function with the given arguments, in a Conda environment. - - Args: - fn: The input python function which will be executed and whose result - is ultimately returned by this function. - fn_version: The python version the function was created with. - args: List of positional arguments to be used by the function. - kwargs: Dictionary of keyword arguments to be used by the function. - conda_env: Name of a Conda environment in which to execute the task. - cache_dir: The directory where temporary files and logs (if any) are stored. - node_id: The integer identifier for the current node. - - Returns: - output: The result of the function execution. - """ - - if not self.get_conda_path(): - return self._on_conda_env_fail(fn, args, kwargs, node_id) - - # Pickle the function - temp_filename = "" - with tempfile.NamedTemporaryFile(dir=cache_dir, delete=False) as f: - pickle.dump(fn, f) - temp_filename = f.name - - result_filename = os.path.join(cache_dir, f'result_{temp_filename.split("/")[-1]}') - - # Write a bash script to activate the environment - shell_commands = "#!/bin/bash\n" - - # Add commands to initialize the Conda shell and activate the environment: - conda_sh = os.path.join( - os.path.dirname(self.conda_path), "..", "etc", "profile.d", "conda.sh" - ) - conda_sh = os.environ.get("CONDA_SHELL", conda_sh) - if os.path.exists(conda_sh): - shell_commands += f"source {conda_sh}\n" - else: - message = "No Conda installation found on this compute node." - app_log.warning(message) - return self._on_conda_env_fail(fn, args, kwargs, node_id) - - shell_commands += f"conda activate {conda_env}\n" - shell_commands += "retval=$?\n" - shell_commands += "if [ $retval -ne 0 ]; then\n" - shell_commands += ( - f' echo "Conda environment {conda_env} is not present on this compute node."\n' - ) - shell_commands += ' echo "Please create that environment (or use an existing environment) and try again."\n' - shell_commands += " exit 99\n" - shell_commands += "fi\n\n" - - # Check Python version and give a warning if there is a mismatch: - shell_commands += "py_version=`python -V | awk '{{print $2}}'`\n" - shell_commands += f'if [[ "{fn_version}" != "$py_version" ]]; then\n' - shell_commands += ' echo "Warning: Python version mismatch:"\n' - shell_commands += f' echo "Workflow version is {fn_version}. Conda environment version is $py_version."\n' - shell_commands += "fi\n\n" - - shell_commands += "python - < None: - """ - Print a list of Conda environments detected on the system. - - Args: - None - - Returns: - None - """ - - self.conda_envs = [] - - env_output = subprocess.run( - ["conda", "env", "list"], capture_output=True, encoding="utf-8" - ) - - if len(env_output.stderr) > 0: - message = f"Problem in listing Conda environments:\n{env_output.stderr}" - app_log.warning(message) - return - - for line in env_output.stdout.split("\n"): - if not line.startswith("#"): - row = line.split() - if len(row) > 1: - if "*" in row: - self.current_env = row[0] - self.conda_envs.append(row[0]) - - app_log.debug(f"Conda environments:\n{self.conda_envs}") - - def get_conda_path(self) -> bool: - """ - Query the path where the conda executable can be found. +class AsyncBaseExecutor(_AbstractBaseExecutor): + """Async base executor class to be used for defining any executor + plugin. Subclassing this class will allow you to define + your own executor plugin which can be used in covalent. - Args: - None + This is analogous to `BaseExecutor` except the `run()` method, + together with the optional `setup()` and `teardown()` methods, are + coroutines. - Returns: - found: True if Conda is found on the system. - """ - - self.conda_path = "" - which_conda = subprocess.run( - ["which", "conda"], capture_output=True, encoding="utf-8" - ).stdout - if which_conda == "": - message = "No Conda installation found on this compute node." - app_log.warning(message) - return False - self.conda_path = which_conda - return True + Attributes: + log_stdout: The path to the file to be used for redirecting stdout. + log_stderr: The path to the file to be used for redirecting stderr. + cache_dir: The location used for cached files in the executor. + time_limit: time limit for the task + retries: Number of times to retry execution upon failure + """ -class AsyncBaseExecutor(_AbstractBaseExecutor): def __init__( self, - log_stdout: str = "", - log_stderr: str = "", *args, **kwargs, ) -> None: - self.log_stdout = log_stdout - self.log_stderr = log_stderr + super().__init__(*args, **kwargs) async def write_streams_to_file( self, diff --git a/tests/covalent_tests/executor/base_test.py b/tests/covalent_tests/executor/base_test.py index c3df1a821..896b8b076 100644 --- a/tests/covalent_tests/executor/base_test.py +++ b/tests/covalent_tests/executor/base_test.py @@ -27,32 +27,22 @@ from functools import partial from unittest.mock import AsyncMock, MagicMock +import pytest + from covalent import DepsCall, TransportableObject from covalent.executor import BaseExecutor, wrapper_fn from covalent.executor.base import AsyncBaseExecutor, _AbstractBaseExecutor class MockExecutor(BaseExecutor): - def setup(self, task_metadata): - pass - def run(self, function, args, kwargs, task_metadata): return function(*args, **kwargs) - def teardown(self, task_metadata): - pass - class MockAsyncExecutor(AsyncBaseExecutor): - async def setup(self, task_metadata): - pass - async def run(self, function, args, kwargs, task_metadata): return function(*args, **kwargs) - async def teardown(self, task_metadata): - pass - def test_write_streams_to_file(mocker): """Test write log streams to file method in BaseExecutor via LocalExecutor.""" @@ -87,25 +77,6 @@ def test_write_streams_to_file(mocker): assert lines[0] == "absolute" -def test_execute_in_conda_env(mocker): - """Test execute in conda enve method in Base Executor object.""" - - me = MockExecutor() - - mocker.patch("covalent.executor.BaseExecutor.get_conda_path", return_value=False) - conda_env_fail_mock = mocker.patch("covalent.executor.BaseExecutor._on_conda_env_fail") - me.execute_in_conda_env( - "function", - "function_version", - "args", - "kwargs", - "conda_env", - "cache_dir", - "node_id", - ) - conda_env_fail_mock.assert_called_once_with("function", "args", "kwargs", "node_id") - - def test_wrapper_fn(): import tempfile from pathlib import Path @@ -212,6 +183,25 @@ def __init__(self): assert True +def test_base_executor_run(mocker): + """Cover BaseExecutor.run() abstract method""" + + def f(x): + return x + + function = TransportableObject(f) + args = [TransportableObject(2)] + kwargs = {} + + mocker.patch("covalent.executor.BaseExecutor.__abstractmethods__", set()) + be = BaseExecutor() + try: + be.run(function, args, kwargs, {}) + assert False + except NotImplementedError: + assert True + + def test_base_executor_execute(mocker): """Test the execute method""" @@ -243,39 +233,24 @@ def f(x, y): assert result.get_deserialized() == 5 -def test_base_executor_execute_conda(mocker): - """Test the execute method with a condaenv""" +@pytest.mark.asyncio +async def test_async_base_executor_run(mocker): + """Cover AsyncBaseExecutor.run() abstract method""" - def f(x, y): - return x + y + def f(x): + return x - me = MockExecutor(conda_env="testenv") function = TransportableObject(f) args = [TransportableObject(2)] - kwargs = {"y": TransportableObject(3)} - call_before = [] - call_after = [] - dispatch_id = "asdf" - results_dir = "/tmp" - node_id = -1 - - mock_conda_exec = mocker.patch( - "covalent.executor.BaseExecutor.execute_in_conda_env", return_value=TransportableObject(5) - ) - - assembled_callable = partial(wrapper_fn, function, call_before, call_after) - - result, stdout, stderr = me.execute( - function=assembled_callable, - args=args, - kwargs=kwargs, - dispatch_id=dispatch_id, - results_dir=results_dir, - node_id=node_id, - ) + kwargs = {} - assert result.get_deserialized() == 5 - mock_conda_exec.assert_called_once() + mocker.patch("covalent.executor.base.AsyncBaseExecutor.__abstractmethods__", set()) + be = AsyncBaseExecutor() + try: + await be.run(function, args, kwargs, {}) + assert False + except NotImplementedError: + assert True def test_base_executor_passes_task_metadata(mocker): diff --git a/tests/functional_tests/choose_conda_test.py b/tests/functional_tests/choose_conda_test.py deleted file mode 100644 index 28191cb63..000000000 --- a/tests/functional_tests/choose_conda_test.py +++ /dev/null @@ -1,53 +0,0 @@ -# Copyright 2021 Agnostiq Inc. -# -# This file is part of Covalent. -# -# Licensed under the GNU Affero General Public License 3.0 (the "License"). -# A copy of the License may be obtained with this software package or at -# -# https://www.gnu.org/licenses/agpl-3.0.en.html -# -# Use of this file is prohibited except in compliance with the License. Any -# modifications or derivative works of this file must retain this copyright -# notice, and modified files must contain a notice indicating that they have -# been altered from the originals. -# -# Covalent is distributed in the hope that it will be useful, but WITHOUT -# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or -# FITNESS FOR A PARTICULAR PURPOSE. See the License for more details. -# -# Relief from the License may be granted by purchasing a commercial license. - -""" -Integration test for choosing Conda environments within an executor. -""" - -import covalent as ct - - -def test_using_current_env() -> None: - """Test that the Conda environment can be specified in the executor - initialization and used in a simple electron.""" - - tmp_executor = ct.executor.LocalExecutor() - has_conda = tmp_executor.get_conda_path() - if not has_conda: - return - - tmp_executor.get_conda_envs() - conda_env = tmp_executor.current_env - - executor = ct.executor.LocalExecutor(conda_env=conda_env, current_env_on_conda_fail=True) - - @ct.electron(executor=executor) - def passthrough(x): - return x - - @ct.lattice() - def workflow(y): - return passthrough(x=y) - - dispatch_id = ct.dispatch(workflow)(y="input") - result = ct.get_result(dispatch_id, wait=True) - - assert result.result == "input"