From 920319a5fdd5311b74695de46c898751a4f68ef2 Mon Sep 17 00:00:00 2001 From: Yadu Babuji Date: Mon, 26 Aug 2024 13:57:05 -0500 Subject: [PATCH 1/3] Support for GlobusComputeExecutor to submit functions to globus_compute_endpoints --- parsl/executors/__init__.py | 4 +- parsl/executors/globus_compute.py | 145 ++++++++++++++++++++++++++++++ setup.py | 1 + 3 files changed, 149 insertions(+), 1 deletion(-) create mode 100644 parsl/executors/globus_compute.py diff --git a/parsl/executors/__init__.py b/parsl/executors/__init__.py index bc29204502..81955aab76 100644 --- a/parsl/executors/__init__.py +++ b/parsl/executors/__init__.py @@ -1,4 +1,5 @@ from parsl.executors.flux.executor import FluxExecutor +from parsl.executors.globus_compute import GlobusComputeExecutor from parsl.executors.high_throughput.executor import HighThroughputExecutor from parsl.executors.high_throughput.mpi_executor import MPIExecutor from parsl.executors.threads import ThreadPoolExecutor @@ -8,4 +9,5 @@ 'HighThroughputExecutor', 'MPIExecutor', 'WorkQueueExecutor', - 'FluxExecutor'] + 'FluxExecutor', + 'GlobusComputeExecutor'] diff --git a/parsl/executors/globus_compute.py b/parsl/executors/globus_compute.py new file mode 100644 index 0000000000..0f76314deb --- /dev/null +++ b/parsl/executors/globus_compute.py @@ -0,0 +1,145 @@ +import uuid +from concurrent.futures import Future +from typing import Any, Callable, Dict, Optional, Union + +import typeguard + +from parsl.errors import OptionalModuleMissing +from parsl.executors.base import ParslExecutor +from parsl.utils import RepresentationMixin + +UUID_LIKE_T = Union[uuid.UUID, str] + + + +class GlobusComputeExecutor(ParslExecutor, RepresentationMixin): + """ GlobusComputeExecutor enables remote execution on Globus Compute endpoints + + GlobusComputeExecutor is a thin wrapper over globus_compute_sdk.Executor + Refer to `globus-compute user documentation `_ + and `reference documentation `_ + for more details. + """ + + def __init__( + self, + endpoint_id: Optional[UUID_LIKE_T] = None, + task_group_id: Optional[UUID_LIKE_T] = None, + resource_specification: Optional[dict[str, Any]] = None, + user_endpoint_config: Optional[dict[str, Any]] = None, + label: str = "GlobusComputeExecutor", + batch_size: int = 128, + amqp_port: Optional[int] = None, + **kwargs, + ): + """ + + Parameters + ---------- + + endpoint_id: + id of the endpoint to which to submit tasks + + task_group_id: + The Task Group to which to associate tasks. If not set, + one will be instantiated. + + resource_specification: + Specify resource requirements for individual task execution. + + user_endpoint_config: + User endpoint configuration values as described + and allowed by endpoint administrators. Must be a JSON-serializable dict + or None. + + label: + a label to name the executor; mainly utilized for + logging and advanced needs with multiple executors. + + batch_size: + the maximum number of tasks to coalesce before + sending upstream [min: 1, default: 128] + + amqp_port: + Port to use when connecting to results queue. Note that the + Compute web services only support 5671, 5672, and 443. + + kwargs: + Other kwargs listed will be passed through to globus_compute_sdk.Executor + as is + """ + super().__init__() + self.endpoint_id = endpoint_id + self.task_group_id = task_group_id + self.resource_specification = resource_specification + self.user_endpoint_config = user_endpoint_config + self.label = label + self.batch_size = batch_size + self.amqp_port = amqp_port + + try: + from globus_compute_sdk import Executor + except ImportError: + raise OptionalModuleMissing( + ['globus-compute-sdk'], + "GlobusComputeExecutor requires globus-compute-sdk installed" + ) + self._executor: Executor = Executor( + endpoint_id=endpoint_id, + task_group_id=task_group_id, + resource_specification=resource_specification, + user_endpoint_config=user_endpoint_config, + label=label, + batch_size=batch_size, + amqp_port=amqp_port, + **kwargs + ) + + def start(self) -> None: + """Empty function + """ + pass + + def submit(self, func: Callable, resource_specification: Dict[str, Any], *args: Any, **kwargs: Any) -> Future: + """ Submit fn to globus-compute + + + Parameters + ---------- + + func: Callable + Python function to execute remotely + resource_specification: Dict[str, Any] + Resource specification used to run MPI applications on Endpoints configured + to use globus compute's MPIEngine + args: + Args to pass to the function + kwargs: + kwargs to pass to the function + + Returns + ------- + + Future + """ + self._executor.resource_specification = resource_specification or self.resource_specification + return self._executor.submit(func, *args, **kwargs) + + def shutdown(self, wait=True, *, cancel_futures=False): + """Clean-up the resources associated with the Executor. + + It is safe to call this method several times. Otherwise, no other methods + can be called after this one. + + Parameters + ---------- + + wait: If True, then this method will not return until all pending + futures have received results. + cancel_futures: If True, then this method will cancel all futures + that have not yet registered their tasks with the Compute web services. + Tasks cannot be cancelled once they are registered. + """ + return self._executor.shutdown() + + diff --git a/setup.py b/setup.py index b381ecfc2b..752da534f1 100755 --- a/setup.py +++ b/setup.py @@ -36,6 +36,7 @@ 'proxystore': ['proxystore'], 'radical-pilot': ['radical.pilot==1.60', 'radical.utils==1.60'], 'ssh': ['paramiko'], + 'globus_compute': ['globus_compute_sdk>=2.27.1'], # Disabling psi-j since github direct links are not allowed by pypi # 'psij': ['psi-j-parsl@git+https://github.com/ExaWorks/psi-j-parsl'] } From a3cad96a74b6f97cdda0c92a3a367452d70d78ba Mon Sep 17 00:00:00 2001 From: Yadu Babuji Date: Mon, 26 Aug 2024 15:25:27 -0500 Subject: [PATCH 2/3] Github Action for GlobusComputeExecutor (#3619) * Support for testing GlobusComputeExecutor in a github action * Adding shared_fs and staging_required tags to tests * Adding GlobusComputeExecutor test config --- .github/workflows/gce_test.yaml | 112 ++++++++++++++++++ Makefile | 4 + docs/reference.rst | 1 + parsl/executors/globus_compute.py | 12 +- parsl/tests/configs/globus_compute.py | 18 +++ parsl/tests/conftest.py | 12 ++ parsl/tests/test_bash_apps/test_basic.py | 3 + .../tests/test_bash_apps/test_error_codes.py | 4 + .../test_bash_apps/test_kwarg_storage.py | 1 + parsl/tests/test_bash_apps/test_memoize.py | 8 +- .../test_memoize_ignore_args.py | 3 + .../test_memoize_ignore_args_regr.py | 1 + parsl/tests/test_bash_apps/test_multiline.py | 1 + parsl/tests/test_bash_apps/test_stdout.py | 3 + parsl/tests/test_docs/test_from_slides.py | 3 + parsl/tests/test_docs/test_kwargs.py | 3 + parsl/tests/test_docs/test_workflow1.py | 1 + .../test_error_handling/test_resource_spec.py | 3 + parsl/tests/test_python_apps/test_outputs.py | 1 + parsl/tests/test_regression/test_226.py | 1 + parsl/tests/test_staging/test_docs_1.py | 1 + .../test_output_chain_filenames.py | 3 + parsl/tests/test_staging/test_staging_ftp.py | 1 + .../tests/test_staging/test_staging_https.py | 3 + .../tests/test_staging/test_staging_stdout.py | 2 + test-requirements.txt | 1 + 26 files changed, 191 insertions(+), 15 deletions(-) create mode 100644 .github/workflows/gce_test.yaml create mode 100644 parsl/tests/configs/globus_compute.py diff --git a/.github/workflows/gce_test.yaml b/.github/workflows/gce_test.yaml new file mode 100644 index 0000000000..59d739efd5 --- /dev/null +++ b/.github/workflows/gce_test.yaml @@ -0,0 +1,112 @@ +name: GlobusComputeExecutor tests + +on: + pull_request: + types: + - opened + - synchronize + + workflow_dispatch: + inputs: + tags: + description: 'Test scenario tags' + required: false + type: boolean + +jobs: + main-test-suite: + strategy: + matrix: + python-version: ["3.11"] + runs-on: ubuntu-20.04 + timeout-minutes: 60 + + steps: + - uses: actions/checkout@master + + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v4 + with: + python-version: ${{ matrix.python-version }} + + - name: Collect Job Information + id: job-info + run: | + echo "Python Version: ${{ matrix.python-version }}" >> ci_job_info.txt + echo "CI Triggering Event: ${{ github.event_name }}" >> ci_job_info.txt + echo "Triggering Git Ref: ${{ github.ref }}" >> ci_job_info.txt + echo "Triggering Git SHA: ${{ github.sha }}" >> ci_job_info.txt + echo "Workflow Run: ${{ github.run_number }}" >> ci_job_info.txt + echo "Workflow Attempt: ${{ github.run_attempt }}" >> ci_job_info.txt + as_ascii="$(echo "${{ github.ref_name }}" | perl -pe "s/[^A-z0-9-]+/-/g; s/^-+|-+\$//g; s/--+/-/g;")" + echo "as-ascii=$as_ascii" >> $GITHUB_OUTPUT + + - name: Non-requirements based install + run: | + # libpython3.5: make workqueue binary installer happy + # mpich: required by radical executor + sudo apt-get update -q + sudo apt-get install -qy libpython3.5 mpich + + - name: setup virtual env + run: | + make virtualenv + source .venv/bin/activate + + - name: make deps clean_coverage + run: | + source .venv/bin/activate + make deps + make clean_coverage + # Installing parsl into venv required for GCendpoint + pip3 install . + + # Temporary fix, until changes make it into compute releases + git clone -b configure_tasks_working_dir https://github.com/globus/globus-compute.git + pip3 install globus-compute/compute_sdk globus-compute/compute_endpoint + + - name: start globus_compute_endpoint + env: + GLOBUS_COMPUTE_CLIENT_ID: ${{ secrets.GLOBUS_COMPUTE_CLIENT_ID }} + GLOBUS_COMPUTE_CLIENT_SECRET: ${{ secrets.GLOBUS_COMPUTE_SECRET_KEY }} + run: | + source /home/runner/work/parsl/parsl/.venv/bin/activate + globus-compute-endpoint configure default + which globus-compute-endpoint + python3 -c "import globus_compute_sdk; print(globus_compute_sdk.__version__)" + python3 -c "import globus_compute_endpoint; print(globus_compute_endpoint.__version__)" + cat << EOF > /home/runner/.globus_compute/default/config.yaml + engine: + type: ThreadPoolEngine + max_workers: 4 + working_dir: /home/runner/.globus_compute/default/tasks_working_dir + EOF + cat /home/runner/.globus_compute/default/config.yaml + mkdir ~/.globus_compute/default/tasks_working_dir + globus-compute-endpoint start default + globus-compute-endpoint list + - name: make test + env: + GLOBUS_COMPUTE_CLIENT_ID: ${{ secrets.GLOBUS_COMPUTE_CLIENT_ID }} + GLOBUS_COMPUTE_CLIENT_SECRET: ${{ secrets.GLOBUS_COMPUTE_SECRET_KEY }} + run: | + source .venv/bin/activate + export GLOBUS_COMPUTE_ENDPOINT=$(globus-compute-endpoint list | grep default | cut -c 3-38) + echo "GLOBUS_COMPUTE_ENDPOINT = $GLOBUS_COMPUTE_ENDPOINT" + + # temporary; until test-matrixification + export PARSL_TEST_PRESERVE_NUM_RUNS=7 + + make gce_test + ln -s .pytest/parsltest-current test_runinfo + + - name: Archive runinfo logs + if: ${{ always() }} + uses: actions/upload-artifact@v4 + with: + name: runinfo-${{ matrix.python-version }}-${{ steps.job-info.outputs.as-ascii }}-${{ github.sha }} + path: | + runinfo/ + .pytest/ + ci_job_info.txt + compression-level: 9 diff --git a/Makefile b/Makefile index 90f20601e9..d452c81016 100644 --- a/Makefile +++ b/Makefile @@ -51,6 +51,10 @@ clean_coverage: mypy: ## run mypy checks MYPYPATH=$(CWD)/mypy-stubs mypy parsl/ +.PHONY: gce_test +gce_test: ## Run tests with GlobusComputeExecutor + pytest -v -k "not shared_fs and not issue_3620 and not staging_required" --config parsl/tests/configs/globus_compute.py parsl/tests/ --random-order --durations 10 + .PHONY: local_thread_test local_thread_test: ## run all tests with local_thread config pytest parsl/tests/ -k "not cleannet" --config parsl/tests/configs/local_threads.py --random-order --durations 10 diff --git a/docs/reference.rst b/docs/reference.rst index 3436635cad..5c36fcf297 100644 --- a/docs/reference.rst +++ b/docs/reference.rst @@ -77,6 +77,7 @@ Executors parsl.executors.taskvine.TaskVineExecutor parsl.executors.FluxExecutor parsl.executors.radical.RadicalPilotExecutor + parsl.executors.globus_compute.GlobusComputeExecutor Manager Selectors ================= diff --git a/parsl/executors/globus_compute.py b/parsl/executors/globus_compute.py index 0f76314deb..29a8d4be41 100644 --- a/parsl/executors/globus_compute.py +++ b/parsl/executors/globus_compute.py @@ -2,8 +2,6 @@ from concurrent.futures import Future from typing import Any, Callable, Dict, Optional, Union -import typeguard - from parsl.errors import OptionalModuleMissing from parsl.executors.base import ParslExecutor from parsl.utils import RepresentationMixin @@ -11,7 +9,6 @@ UUID_LIKE_T = Union[uuid.UUID, str] - class GlobusComputeExecutor(ParslExecutor, RepresentationMixin): """ GlobusComputeExecutor enables remote execution on Globus Compute endpoints @@ -25,15 +22,14 @@ def __init__( self, endpoint_id: Optional[UUID_LIKE_T] = None, task_group_id: Optional[UUID_LIKE_T] = None, - resource_specification: Optional[dict[str, Any]] = None, - user_endpoint_config: Optional[dict[str, Any]] = None, + resource_specification: Optional[Dict[str, Any]] = None, + user_endpoint_config: Optional[Dict[str, Any]] = None, label: str = "GlobusComputeExecutor", batch_size: int = 128, amqp_port: Optional[int] = None, **kwargs, - ): + ): """ - Parameters ---------- @@ -141,5 +137,3 @@ def shutdown(self, wait=True, *, cancel_futures=False): Tasks cannot be cancelled once they are registered. """ return self._executor.shutdown() - - diff --git a/parsl/tests/configs/globus_compute.py b/parsl/tests/configs/globus_compute.py new file mode 100644 index 0000000000..edb45801e0 --- /dev/null +++ b/parsl/tests/configs/globus_compute.py @@ -0,0 +1,18 @@ +import os + +from parsl.config import Config +from parsl.executors import GlobusComputeExecutor + + +def fresh_config(): + + endpoint_id = os.environ["GLOBUS_COMPUTE_ENDPOINT"] + + return Config( + executors=[ + GlobusComputeExecutor( + label="globus_compute", + endpoint_id=endpoint_id + ) + ] + ) diff --git a/parsl/tests/conftest.py b/parsl/tests/conftest.py index b8af73e4bf..228f78d271 100644 --- a/parsl/tests/conftest.py +++ b/parsl/tests/conftest.py @@ -163,6 +163,18 @@ def pytest_configure(config): 'markers', 'executor_supports_std_stream_tuples: Marks tests that require tuple support for stdout/stderr' ) + config.addinivalue_line( + 'markers', + 'globus_compute: Marks tests that require a valid globus_compute target' + ) + config.addinivalue_line( + 'markers', + 'shared_fs: Marks tests that require a shared_fs between the workers are the test client' + ) + config.addinivalue_line( + 'markers', + 'issue_3620: Marks tests that do not work correctly on GlobusComputeExecutor (ref: issue 3620)' + ) @pytest.fixture(autouse=True, scope='session') diff --git a/parsl/tests/test_bash_apps/test_basic.py b/parsl/tests/test_bash_apps/test_basic.py index 56d56d8ed1..0eea6d4d97 100644 --- a/parsl/tests/test_bash_apps/test_basic.py +++ b/parsl/tests/test_bash_apps/test_basic.py @@ -24,6 +24,7 @@ def foo(x, y, z=10, stdout=None, label=None): return f"echo {x} {y} {z}" +@pytest.mark.shared_fs def test_command_format_1(tmpd_cwd): """Testing command format for BashApps""" @@ -38,6 +39,7 @@ def test_command_format_1(tmpd_cwd): assert so_content == "1 4 10" +@pytest.mark.shared_fs def test_auto_log_filename_format(caplog): """Testing auto log filename format for BashApps """ @@ -66,6 +68,7 @@ def test_auto_log_filename_format(caplog): assert record.levelno < logging.ERROR +@pytest.mark.shared_fs def test_parallel_for(tmpd_cwd, n=3): """Testing a simple parallel for loop""" outdir = tmpd_cwd / "outputs/test_parallel" diff --git a/parsl/tests/test_bash_apps/test_error_codes.py b/parsl/tests/test_bash_apps/test_error_codes.py index 4a0b835728..bccded91a9 100644 --- a/parsl/tests/test_bash_apps/test_error_codes.py +++ b/parsl/tests/test_bash_apps/test_error_codes.py @@ -58,6 +58,7 @@ def bad_format(stderr='std.err', stdout='std.out'): whitelist = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'configs', '*threads*') +@pytest.mark.shared_fs def test_div_0(test_fn=div_0): err_code = test_matrix[test_fn]['exit_code'] f = test_fn() @@ -73,6 +74,7 @@ def test_div_0(test_fn=div_0): os.remove('std.out') +@pytest.mark.shared_fs def test_bash_misuse(test_fn=bash_misuse): err_code = test_matrix[test_fn]['exit_code'] f = test_fn() @@ -87,6 +89,7 @@ def test_bash_misuse(test_fn=bash_misuse): os.remove('std.out') +@pytest.mark.shared_fs def test_command_not_found(test_fn=command_not_found): err_code = test_matrix[test_fn]['exit_code'] f = test_fn() @@ -103,6 +106,7 @@ def test_command_not_found(test_fn=command_not_found): return True +@pytest.mark.shared_fs def test_not_executable(test_fn=not_executable): err_code = test_matrix[test_fn]['exit_code'] f = test_fn() diff --git a/parsl/tests/test_bash_apps/test_kwarg_storage.py b/parsl/tests/test_bash_apps/test_kwarg_storage.py index 8e0d48c661..e88a4c2967 100644 --- a/parsl/tests/test_bash_apps/test_kwarg_storage.py +++ b/parsl/tests/test_bash_apps/test_kwarg_storage.py @@ -8,6 +8,7 @@ def foo(z=2, stdout=None): return f"echo {z}" +@pytest.mark.shared_fs def test_command_format_1(tmpd_cwd): """Testing command format for BashApps """ diff --git a/parsl/tests/test_bash_apps/test_memoize.py b/parsl/tests/test_bash_apps/test_memoize.py index d53460b50b..387837f4d2 100644 --- a/parsl/tests/test_bash_apps/test_memoize.py +++ b/parsl/tests/test_bash_apps/test_memoize.py @@ -9,9 +9,7 @@ def fail_on_presence(outputs=()): return 'if [ -f {0} ] ; then exit 1 ; else touch {0}; fi'.format(outputs[0]) -# This test is an oddity that requires a shared-FS and simply -# won't work if there's a staging provider. -# @pytest.mark.sharedFS_required +@pytest.mark.shared_fs def test_bash_memoization(tmpd_cwd, n=2): """Testing bash memoization """ @@ -29,9 +27,7 @@ def fail_on_presence_kw(outputs=(), foo=None): return 'if [ -f {0} ] ; then exit 1 ; else touch {0}; fi'.format(outputs[0]) -# This test is an oddity that requires a shared-FS and simply -# won't work if there's a staging provider. -# @pytest.mark.sharedFS_required +@pytest.mark.shared_fs def test_bash_memoization_keywords(tmpd_cwd, n=2): """Testing bash memoization """ diff --git a/parsl/tests/test_bash_apps/test_memoize_ignore_args.py b/parsl/tests/test_bash_apps/test_memoize_ignore_args.py index ee3917e561..16cb919f1a 100644 --- a/parsl/tests/test_bash_apps/test_memoize_ignore_args.py +++ b/parsl/tests/test_bash_apps/test_memoize_ignore_args.py @@ -1,5 +1,7 @@ import os +import pytest + import parsl from parsl.app.app import bash_app @@ -21,6 +23,7 @@ def no_checkpoint_stdout_app_ignore_args(stdout=None): return "echo X" +@pytest.mark.shared_fs def test_memo_stdout(tmpd_cwd): path_x = tmpd_cwd / "test.memo.stdout.x" diff --git a/parsl/tests/test_bash_apps/test_memoize_ignore_args_regr.py b/parsl/tests/test_bash_apps/test_memoize_ignore_args_regr.py index 8f03c055a1..e1a9465fb3 100644 --- a/parsl/tests/test_bash_apps/test_memoize_ignore_args_regr.py +++ b/parsl/tests/test_bash_apps/test_memoize_ignore_args_regr.py @@ -29,6 +29,7 @@ def no_checkpoint_stdout_app(stdout=None): return "echo X" +@pytest.mark.shared_fs def test_memo_stdout(tmpd_cwd): assert const_list_x == const_list_x_arg diff --git a/parsl/tests/test_bash_apps/test_multiline.py b/parsl/tests/test_bash_apps/test_multiline.py index 59fb5ed7b2..cfdb674e9d 100644 --- a/parsl/tests/test_bash_apps/test_multiline.py +++ b/parsl/tests/test_bash_apps/test_multiline.py @@ -14,6 +14,7 @@ def multiline(inputs=(), outputs=(), stderr=None, stdout=None): """.format(inputs=inputs, outputs=outputs) +@pytest.mark.shared_fs def test_multiline(tmpd_cwd): so, se = tmpd_cwd / "std.out", tmpd_cwd / "std.err" f = multiline( diff --git a/parsl/tests/test_bash_apps/test_stdout.py b/parsl/tests/test_bash_apps/test_stdout.py index eba6a7b80d..c8404451f6 100644 --- a/parsl/tests/test_bash_apps/test_stdout.py +++ b/parsl/tests/test_bash_apps/test_stdout.py @@ -34,6 +34,7 @@ def echo_to_streams(msg, stderr=None, stdout=None): ] +@pytest.mark.shared_fs @pytest.mark.parametrize('spec', speclist, ids=testids) def test_bad_stdout_specs(spec): """Testing bad stdout spec cases""" @@ -91,6 +92,7 @@ def test_bad_stderr_file(): @pytest.mark.executor_supports_std_stream_tuples +@pytest.mark.shared_fs def test_stdout_truncate(tmpd_cwd, caplog): """Testing truncation of prior content of stdout""" @@ -110,6 +112,7 @@ def test_stdout_truncate(tmpd_cwd, caplog): assert record.levelno < logging.ERROR +@pytest.mark.shared_fs def test_stdout_append(tmpd_cwd, caplog): """Testing appending to prior content of stdout (default open() mode)""" diff --git a/parsl/tests/test_docs/test_from_slides.py b/parsl/tests/test_docs/test_from_slides.py index b07092b4ae..b3242e813e 100644 --- a/parsl/tests/test_docs/test_from_slides.py +++ b/parsl/tests/test_docs/test_from_slides.py @@ -1,5 +1,7 @@ import os +import pytest + from parsl.app.app import bash_app, python_app from parsl.data_provider.files import File @@ -15,6 +17,7 @@ def cat(inputs=[]): return f.readlines() +@pytest.mark.staging_required def test_slides(): """Testing code snippet from slides """ diff --git a/parsl/tests/test_docs/test_kwargs.py b/parsl/tests/test_docs/test_kwargs.py index 5524c0b819..80907ebe08 100644 --- a/parsl/tests/test_docs/test_kwargs.py +++ b/parsl/tests/test_docs/test_kwargs.py @@ -1,6 +1,8 @@ """Functions used to explain kwargs""" from pathlib import Path +import pytest + from parsl import File, python_app @@ -19,6 +21,7 @@ def reduce_app(inputs=()): assert reduce_future.result() == 6 +@pytest.mark.shared_fs def test_outputs(tmpd_cwd): @python_app() def write_app(message, outputs=()): diff --git a/parsl/tests/test_docs/test_workflow1.py b/parsl/tests/test_docs/test_workflow1.py index 271baab4d8..b0f92b6ab9 100644 --- a/parsl/tests/test_docs/test_workflow1.py +++ b/parsl/tests/test_docs/test_workflow1.py @@ -22,6 +22,7 @@ def save(message, outputs=[]): return 'echo {m} &> {o}'.format(m=message, o=outputs[0]) +@pytest.mark.shared_fs @pytest.mark.staging_required def test_procedural(N=2): """Procedural workflow example from docs on diff --git a/parsl/tests/test_error_handling/test_resource_spec.py b/parsl/tests/test_error_handling/test_resource_spec.py index 4616219be2..7def2b736c 100644 --- a/parsl/tests/test_error_handling/test_resource_spec.py +++ b/parsl/tests/test_error_handling/test_resource_spec.py @@ -1,3 +1,5 @@ +import pytest + import parsl from parsl.app.app import python_app from parsl.executors import WorkQueueExecutor @@ -11,6 +13,7 @@ def double(x, parsl_resource_specification={}): return x * 2 +@pytest.mark.issue_3620 def test_resource(n=2): executors = parsl.dfk().executors executor = None diff --git a/parsl/tests/test_python_apps/test_outputs.py b/parsl/tests/test_python_apps/test_outputs.py index c4b9dbabe2..b4273cf286 100644 --- a/parsl/tests/test_python_apps/test_outputs.py +++ b/parsl/tests/test_python_apps/test_outputs.py @@ -16,6 +16,7 @@ def double(x, outputs=[]): whitelist = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'configs', '*threads*') +@pytest.mark.shared_fs def test_launch_apps(tmpd_cwd, n=2): outdir = tmpd_cwd / "outputs" outdir.mkdir() diff --git a/parsl/tests/test_regression/test_226.py b/parsl/tests/test_regression/test_226.py index 2f560466dc..8babf4cbe0 100644 --- a/parsl/tests/test_regression/test_226.py +++ b/parsl/tests/test_regression/test_226.py @@ -53,6 +53,7 @@ def test_get_dataframe(): assert res.equals(data), 'Unexpected dataframe' +@pytest.mark.shared_fs def test_bash_default_arg(): if os.path.exists('std.out'): os.remove('std.out') diff --git a/parsl/tests/test_staging/test_docs_1.py b/parsl/tests/test_staging/test_docs_1.py index 8f549ae9b3..cc6236076a 100644 --- a/parsl/tests/test_staging/test_docs_1.py +++ b/parsl/tests/test_staging/test_docs_1.py @@ -12,6 +12,7 @@ def convert(inputs=[], outputs=[]): @pytest.mark.cleannet +@pytest.mark.shared_fs def test(): # create an remote Parsl file inp = File('ftp://ftp.iana.org/pub/mirror/rirstats/arin/ARIN-STATS-FORMAT-CHANGE.txt') diff --git a/parsl/tests/test_staging/test_output_chain_filenames.py b/parsl/tests/test_staging/test_output_chain_filenames.py index 016714497b..442bd6fd5a 100644 --- a/parsl/tests/test_staging/test_output_chain_filenames.py +++ b/parsl/tests/test_staging/test_output_chain_filenames.py @@ -1,5 +1,7 @@ from concurrent.futures import Future +import pytest + from parsl import File from parsl.app.app import bash_app @@ -14,6 +16,7 @@ def app2(inputs=(), outputs=(), stdout=None, stderr=None, mock=False): return f"echo '{inputs[0]}' > {outputs[0]}" +@pytest.mark.shared_fs def test_behavior(tmpd_cwd): expected_path = str(tmpd_cwd / "simple-out.txt") app1_future = app1( diff --git a/parsl/tests/test_staging/test_staging_ftp.py b/parsl/tests/test_staging/test_staging_ftp.py index 12becdf9c4..a004f5a575 100644 --- a/parsl/tests/test_staging/test_staging_ftp.py +++ b/parsl/tests/test_staging/test_staging_ftp.py @@ -15,6 +15,7 @@ def sort_strings(inputs=[], outputs=[]): @pytest.mark.cleannet +@pytest.mark.staging_required def test_staging_ftp(): """Test staging for an ftp file diff --git a/parsl/tests/test_staging/test_staging_https.py b/parsl/tests/test_staging/test_staging_https.py index 4a68e66a5c..c977472249 100644 --- a/parsl/tests/test_staging/test_staging_https.py +++ b/parsl/tests/test_staging/test_staging_https.py @@ -48,6 +48,7 @@ def sort_strings_additional_executor(inputs=(), outputs=()): @pytest.mark.cleannet +@pytest.mark.staging_required def test_staging_https_cleannet(tmpd_cwd): unsorted_file = File(_unsorted_url) sorted_file = File(tmpd_cwd / 'sorted.txt') @@ -68,6 +69,7 @@ def test_staging_https_local(tmpd_cwd): @pytest.mark.cleannet +@pytest.mark.staging_required def test_staging_https_kwargs(tmpd_cwd): unsorted_file = File(_unsorted_url) sorted_file = File(tmpd_cwd / 'sorted.txt') @@ -78,6 +80,7 @@ def test_staging_https_kwargs(tmpd_cwd): @pytest.mark.cleannet +@pytest.mark.staging_required def test_staging_https_args(tmpd_cwd): unsorted_file = File(_unsorted_url) sorted_file = File(tmpd_cwd / 'sorted.txt') diff --git a/parsl/tests/test_staging/test_staging_stdout.py b/parsl/tests/test_staging/test_staging_stdout.py index aaa45440a7..dc4044ec2e 100644 --- a/parsl/tests/test_staging/test_staging_stdout.py +++ b/parsl/tests/test_staging/test_staging_stdout.py @@ -15,6 +15,7 @@ def output_to_stds(*, stdout=parsl.AUTO_LOGNAME, stderr=parsl.AUTO_LOGNAME): return "echo hello ; echo goodbye >&2" +@pytest.mark.shared_fs def test_stdout_staging_file(tmpd_cwd, caplog): basename = str(tmpd_cwd) + "/stdout.txt" stdout_file = File("file://" + basename) @@ -30,6 +31,7 @@ def test_stdout_staging_file(tmpd_cwd, caplog): assert record.levelno < logging.ERROR +@pytest.mark.shared_fs def test_stdout_stderr_staging_zip(tmpd_cwd, caplog): zipfile_name = str(tmpd_cwd) + "/staging.zip" stdout_relative_path = "somewhere/test-out.txt" diff --git a/test-requirements.txt b/test-requirements.txt index 6abf727ccd..27b8da3dd5 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -12,6 +12,7 @@ types-python-dateutil types-requests types-paramiko mpi4py +globus-compute-sdk>=2.27.1 # sqlalchemy is needed for typechecking, so it's here # as well as at runtime for optional monitoring execution From 4154d6797bc2ffc5d51d85b2a11b9a21b2a28933 Mon Sep 17 00:00:00 2001 From: Yadu Babuji Date: Mon, 21 Oct 2024 10:54:13 -0500 Subject: [PATCH 3/3] Remove pip install in GCE action --- .github/workflows/gce_test.yaml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/workflows/gce_test.yaml b/.github/workflows/gce_test.yaml index 59d739efd5..2c486a70ce 100644 --- a/.github/workflows/gce_test.yaml +++ b/.github/workflows/gce_test.yaml @@ -59,7 +59,8 @@ jobs: make deps make clean_coverage # Installing parsl into venv required for GCendpoint - pip3 install . + # TODO: Test if the following install is necessary + # pip3 install . # Temporary fix, until changes make it into compute releases git clone -b configure_tasks_working_dir https://github.com/globus/globus-compute.git