diff --git a/.github/workflows/gce_test.yaml b/.github/workflows/gce_test.yaml new file mode 100644 index 0000000000..724ee84112 --- /dev/null +++ b/.github/workflows/gce_test.yaml @@ -0,0 +1,110 @@ +name: GlobusComputeExecutor tests + +on: + pull_request: + types: + - opened + - synchronize + + workflow_dispatch: + inputs: + tags: + description: 'Test scenario tags' + required: false + type: boolean + +jobs: + main-test-suite: + strategy: + matrix: + python-version: ["3.11"] + runs-on: ubuntu-20.04 + timeout-minutes: 60 + + steps: + - uses: actions/checkout@master + + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v4 + with: + python-version: ${{ matrix.python-version }} + + - name: Collect Job Information + id: job-info + run: | + echo "Python Version: ${{ matrix.python-version }}" >> ci_job_info.txt + echo "CI Triggering Event: ${{ github.event_name }}" >> ci_job_info.txt + echo "Triggering Git Ref: ${{ github.ref }}" >> ci_job_info.txt + echo "Triggering Git SHA: ${{ github.sha }}" >> ci_job_info.txt + echo "Workflow Run: ${{ github.run_number }}" >> ci_job_info.txt + echo "Workflow Attempt: ${{ github.run_attempt }}" >> ci_job_info.txt + as_ascii="$(echo "${{ github.ref_name }}" | perl -pe "s/[^A-z0-9-]+/-/g; s/^-+|-+\$//g; s/--+/-/g;")" + echo "as-ascii=$as_ascii" >> $GITHUB_OUTPUT + + - name: Non-requirements based install + run: | + # libpython3.5: make workqueue binary installer happy + # mpich: required by radical executor + sudo apt-get update -q + sudo apt-get install -qy libpython3.5 mpich + + - name: setup virtual env + run: | + make virtualenv + source .venv/bin/activate + + - name: make deps clean_coverage + run: | + source .venv/bin/activate + make deps + make clean_coverage + + # Temporary fix, until changes make it into compute releases + git clone -b configure_tasks_working_dir https://github.com/globus/globus-compute.git + pip3 install globus-compute/compute_sdk globus-compute/compute_endpoint + + - name: start globus_compute_endpoint + env: + GLOBUS_COMPUTE_CLIENT_ID: ${{ secrets.GLOBUS_COMPUTE_CLIENT_ID }} + GLOBUS_COMPUTE_CLIENT_SECRET: ${{ secrets.GLOBUS_COMPUTE_SECRET_KEY }} + run: | + source /home/runner/work/parsl/parsl/.venv/bin/activate + globus-compute-endpoint configure default + which globus-compute-endpoint + python3 -c "import globus_compute_sdk; print(globus_compute_sdk.__version__)" + python3 -c "import globus_compute_endpoint; print(globus_compute_endpoint.__version__)" + cat << EOF > /home/runner/.globus_compute/default/config.yaml + engine: + type: ThreadPoolEngine + max_workers: 4 + working_dir: /home/runner/.globus_compute/default/tasks_working_dir + EOF + cat /home/runner/.globus_compute/default/config.yaml + mkdir ~/.globus_compute/default/tasks_working_dir + globus-compute-endpoint start default + globus-compute-endpoint list + - name: make test + env: + GLOBUS_COMPUTE_CLIENT_ID: ${{ secrets.GLOBUS_COMPUTE_CLIENT_ID }} + GLOBUS_COMPUTE_CLIENT_SECRET: ${{ secrets.GLOBUS_COMPUTE_SECRET_KEY }} + run: | + source .venv/bin/activate + export GLOBUS_COMPUTE_ENDPOINT=$(globus-compute-endpoint list | grep default | cut -c 3-38) + echo "GLOBUS_COMPUTE_ENDPOINT = $GLOBUS_COMPUTE_ENDPOINT" + + # temporary; until test-matrixification + export PARSL_TEST_PRESERVE_NUM_RUNS=7 + + make gce_test + ln -s .pytest/parsltest-current test_runinfo + + - name: Archive runinfo logs + if: ${{ always() }} + uses: actions/upload-artifact@v4 + with: + name: runinfo-${{ matrix.python-version }}-${{ steps.job-info.outputs.as-ascii }}-${{ github.sha }} + path: | + runinfo/ + .pytest/ + ci_job_info.txt + compression-level: 9 diff --git a/Makefile b/Makefile index 90f20601e9..d452c81016 100644 --- a/Makefile +++ b/Makefile @@ -51,6 +51,10 @@ clean_coverage: mypy: ## run mypy checks MYPYPATH=$(CWD)/mypy-stubs mypy parsl/ +.PHONY: gce_test +gce_test: ## Run tests with GlobusComputeExecutor + pytest -v -k "not shared_fs and not issue_3620 and not staging_required" --config parsl/tests/configs/globus_compute.py parsl/tests/ --random-order --durations 10 + .PHONY: local_thread_test local_thread_test: ## run all tests with local_thread config pytest parsl/tests/ -k "not cleannet" --config parsl/tests/configs/local_threads.py --random-order --durations 10 diff --git a/docs/reference.rst b/docs/reference.rst index 3436635cad..5c36fcf297 100644 --- a/docs/reference.rst +++ b/docs/reference.rst @@ -77,6 +77,7 @@ Executors parsl.executors.taskvine.TaskVineExecutor parsl.executors.FluxExecutor parsl.executors.radical.RadicalPilotExecutor + parsl.executors.globus_compute.GlobusComputeExecutor Manager Selectors ================= diff --git a/parsl/executors/globus_compute.py b/parsl/executors/globus_compute.py index 0f76314deb..29a8d4be41 100644 --- a/parsl/executors/globus_compute.py +++ b/parsl/executors/globus_compute.py @@ -2,8 +2,6 @@ from concurrent.futures import Future from typing import Any, Callable, Dict, Optional, Union -import typeguard - from parsl.errors import OptionalModuleMissing from parsl.executors.base import ParslExecutor from parsl.utils import RepresentationMixin @@ -11,7 +9,6 @@ UUID_LIKE_T = Union[uuid.UUID, str] - class GlobusComputeExecutor(ParslExecutor, RepresentationMixin): """ GlobusComputeExecutor enables remote execution on Globus Compute endpoints @@ -25,15 +22,14 @@ def __init__( self, endpoint_id: Optional[UUID_LIKE_T] = None, task_group_id: Optional[UUID_LIKE_T] = None, - resource_specification: Optional[dict[str, Any]] = None, - user_endpoint_config: Optional[dict[str, Any]] = None, + resource_specification: Optional[Dict[str, Any]] = None, + user_endpoint_config: Optional[Dict[str, Any]] = None, label: str = "GlobusComputeExecutor", batch_size: int = 128, amqp_port: Optional[int] = None, **kwargs, - ): + ): """ - Parameters ---------- @@ -141,5 +137,3 @@ def shutdown(self, wait=True, *, cancel_futures=False): Tasks cannot be cancelled once they are registered. """ return self._executor.shutdown() - - diff --git a/parsl/tests/configs/globus_compute.py b/parsl/tests/configs/globus_compute.py new file mode 100644 index 0000000000..edb45801e0 --- /dev/null +++ b/parsl/tests/configs/globus_compute.py @@ -0,0 +1,18 @@ +import os + +from parsl.config import Config +from parsl.executors import GlobusComputeExecutor + + +def fresh_config(): + + endpoint_id = os.environ["GLOBUS_COMPUTE_ENDPOINT"] + + return Config( + executors=[ + GlobusComputeExecutor( + label="globus_compute", + endpoint_id=endpoint_id + ) + ] + ) diff --git a/parsl/tests/conftest.py b/parsl/tests/conftest.py index b8af73e4bf..228f78d271 100644 --- a/parsl/tests/conftest.py +++ b/parsl/tests/conftest.py @@ -163,6 +163,18 @@ def pytest_configure(config): 'markers', 'executor_supports_std_stream_tuples: Marks tests that require tuple support for stdout/stderr' ) + config.addinivalue_line( + 'markers', + 'globus_compute: Marks tests that require a valid globus_compute target' + ) + config.addinivalue_line( + 'markers', + 'shared_fs: Marks tests that require a shared_fs between the workers are the test client' + ) + config.addinivalue_line( + 'markers', + 'issue_3620: Marks tests that do not work correctly on GlobusComputeExecutor (ref: issue 3620)' + ) @pytest.fixture(autouse=True, scope='session') diff --git a/parsl/tests/test_bash_apps/test_basic.py b/parsl/tests/test_bash_apps/test_basic.py index 56d56d8ed1..0eea6d4d97 100644 --- a/parsl/tests/test_bash_apps/test_basic.py +++ b/parsl/tests/test_bash_apps/test_basic.py @@ -24,6 +24,7 @@ def foo(x, y, z=10, stdout=None, label=None): return f"echo {x} {y} {z}" +@pytest.mark.shared_fs def test_command_format_1(tmpd_cwd): """Testing command format for BashApps""" @@ -38,6 +39,7 @@ def test_command_format_1(tmpd_cwd): assert so_content == "1 4 10" +@pytest.mark.shared_fs def test_auto_log_filename_format(caplog): """Testing auto log filename format for BashApps """ @@ -66,6 +68,7 @@ def test_auto_log_filename_format(caplog): assert record.levelno < logging.ERROR +@pytest.mark.shared_fs def test_parallel_for(tmpd_cwd, n=3): """Testing a simple parallel for loop""" outdir = tmpd_cwd / "outputs/test_parallel" diff --git a/parsl/tests/test_bash_apps/test_error_codes.py b/parsl/tests/test_bash_apps/test_error_codes.py index 4a0b835728..bccded91a9 100644 --- a/parsl/tests/test_bash_apps/test_error_codes.py +++ b/parsl/tests/test_bash_apps/test_error_codes.py @@ -58,6 +58,7 @@ def bad_format(stderr='std.err', stdout='std.out'): whitelist = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'configs', '*threads*') +@pytest.mark.shared_fs def test_div_0(test_fn=div_0): err_code = test_matrix[test_fn]['exit_code'] f = test_fn() @@ -73,6 +74,7 @@ def test_div_0(test_fn=div_0): os.remove('std.out') +@pytest.mark.shared_fs def test_bash_misuse(test_fn=bash_misuse): err_code = test_matrix[test_fn]['exit_code'] f = test_fn() @@ -87,6 +89,7 @@ def test_bash_misuse(test_fn=bash_misuse): os.remove('std.out') +@pytest.mark.shared_fs def test_command_not_found(test_fn=command_not_found): err_code = test_matrix[test_fn]['exit_code'] f = test_fn() @@ -103,6 +106,7 @@ def test_command_not_found(test_fn=command_not_found): return True +@pytest.mark.shared_fs def test_not_executable(test_fn=not_executable): err_code = test_matrix[test_fn]['exit_code'] f = test_fn() diff --git a/parsl/tests/test_bash_apps/test_kwarg_storage.py b/parsl/tests/test_bash_apps/test_kwarg_storage.py index 8e0d48c661..e88a4c2967 100644 --- a/parsl/tests/test_bash_apps/test_kwarg_storage.py +++ b/parsl/tests/test_bash_apps/test_kwarg_storage.py @@ -8,6 +8,7 @@ def foo(z=2, stdout=None): return f"echo {z}" +@pytest.mark.shared_fs def test_command_format_1(tmpd_cwd): """Testing command format for BashApps """ diff --git a/parsl/tests/test_bash_apps/test_memoize.py b/parsl/tests/test_bash_apps/test_memoize.py index d53460b50b..387837f4d2 100644 --- a/parsl/tests/test_bash_apps/test_memoize.py +++ b/parsl/tests/test_bash_apps/test_memoize.py @@ -9,9 +9,7 @@ def fail_on_presence(outputs=()): return 'if [ -f {0} ] ; then exit 1 ; else touch {0}; fi'.format(outputs[0]) -# This test is an oddity that requires a shared-FS and simply -# won't work if there's a staging provider. -# @pytest.mark.sharedFS_required +@pytest.mark.shared_fs def test_bash_memoization(tmpd_cwd, n=2): """Testing bash memoization """ @@ -29,9 +27,7 @@ def fail_on_presence_kw(outputs=(), foo=None): return 'if [ -f {0} ] ; then exit 1 ; else touch {0}; fi'.format(outputs[0]) -# This test is an oddity that requires a shared-FS and simply -# won't work if there's a staging provider. -# @pytest.mark.sharedFS_required +@pytest.mark.shared_fs def test_bash_memoization_keywords(tmpd_cwd, n=2): """Testing bash memoization """ diff --git a/parsl/tests/test_bash_apps/test_memoize_ignore_args.py b/parsl/tests/test_bash_apps/test_memoize_ignore_args.py index ee3917e561..16cb919f1a 100644 --- a/parsl/tests/test_bash_apps/test_memoize_ignore_args.py +++ b/parsl/tests/test_bash_apps/test_memoize_ignore_args.py @@ -1,5 +1,7 @@ import os +import pytest + import parsl from parsl.app.app import bash_app @@ -21,6 +23,7 @@ def no_checkpoint_stdout_app_ignore_args(stdout=None): return "echo X" +@pytest.mark.shared_fs def test_memo_stdout(tmpd_cwd): path_x = tmpd_cwd / "test.memo.stdout.x" diff --git a/parsl/tests/test_bash_apps/test_memoize_ignore_args_regr.py b/parsl/tests/test_bash_apps/test_memoize_ignore_args_regr.py index 8f03c055a1..e1a9465fb3 100644 --- a/parsl/tests/test_bash_apps/test_memoize_ignore_args_regr.py +++ b/parsl/tests/test_bash_apps/test_memoize_ignore_args_regr.py @@ -29,6 +29,7 @@ def no_checkpoint_stdout_app(stdout=None): return "echo X" +@pytest.mark.shared_fs def test_memo_stdout(tmpd_cwd): assert const_list_x == const_list_x_arg diff --git a/parsl/tests/test_bash_apps/test_multiline.py b/parsl/tests/test_bash_apps/test_multiline.py index 59fb5ed7b2..cfdb674e9d 100644 --- a/parsl/tests/test_bash_apps/test_multiline.py +++ b/parsl/tests/test_bash_apps/test_multiline.py @@ -14,6 +14,7 @@ def multiline(inputs=(), outputs=(), stderr=None, stdout=None): """.format(inputs=inputs, outputs=outputs) +@pytest.mark.shared_fs def test_multiline(tmpd_cwd): so, se = tmpd_cwd / "std.out", tmpd_cwd / "std.err" f = multiline( diff --git a/parsl/tests/test_bash_apps/test_stdout.py b/parsl/tests/test_bash_apps/test_stdout.py index eba6a7b80d..c8404451f6 100644 --- a/parsl/tests/test_bash_apps/test_stdout.py +++ b/parsl/tests/test_bash_apps/test_stdout.py @@ -34,6 +34,7 @@ def echo_to_streams(msg, stderr=None, stdout=None): ] +@pytest.mark.shared_fs @pytest.mark.parametrize('spec', speclist, ids=testids) def test_bad_stdout_specs(spec): """Testing bad stdout spec cases""" @@ -91,6 +92,7 @@ def test_bad_stderr_file(): @pytest.mark.executor_supports_std_stream_tuples +@pytest.mark.shared_fs def test_stdout_truncate(tmpd_cwd, caplog): """Testing truncation of prior content of stdout""" @@ -110,6 +112,7 @@ def test_stdout_truncate(tmpd_cwd, caplog): assert record.levelno < logging.ERROR +@pytest.mark.shared_fs def test_stdout_append(tmpd_cwd, caplog): """Testing appending to prior content of stdout (default open() mode)""" diff --git a/parsl/tests/test_docs/test_from_slides.py b/parsl/tests/test_docs/test_from_slides.py index b07092b4ae..b3242e813e 100644 --- a/parsl/tests/test_docs/test_from_slides.py +++ b/parsl/tests/test_docs/test_from_slides.py @@ -1,5 +1,7 @@ import os +import pytest + from parsl.app.app import bash_app, python_app from parsl.data_provider.files import File @@ -15,6 +17,7 @@ def cat(inputs=[]): return f.readlines() +@pytest.mark.staging_required def test_slides(): """Testing code snippet from slides """ diff --git a/parsl/tests/test_docs/test_kwargs.py b/parsl/tests/test_docs/test_kwargs.py index 5524c0b819..80907ebe08 100644 --- a/parsl/tests/test_docs/test_kwargs.py +++ b/parsl/tests/test_docs/test_kwargs.py @@ -1,6 +1,8 @@ """Functions used to explain kwargs""" from pathlib import Path +import pytest + from parsl import File, python_app @@ -19,6 +21,7 @@ def reduce_app(inputs=()): assert reduce_future.result() == 6 +@pytest.mark.shared_fs def test_outputs(tmpd_cwd): @python_app() def write_app(message, outputs=()): diff --git a/parsl/tests/test_docs/test_workflow1.py b/parsl/tests/test_docs/test_workflow1.py index 271baab4d8..b0f92b6ab9 100644 --- a/parsl/tests/test_docs/test_workflow1.py +++ b/parsl/tests/test_docs/test_workflow1.py @@ -22,6 +22,7 @@ def save(message, outputs=[]): return 'echo {m} &> {o}'.format(m=message, o=outputs[0]) +@pytest.mark.shared_fs @pytest.mark.staging_required def test_procedural(N=2): """Procedural workflow example from docs on diff --git a/parsl/tests/test_error_handling/test_resource_spec.py b/parsl/tests/test_error_handling/test_resource_spec.py index 4616219be2..7def2b736c 100644 --- a/parsl/tests/test_error_handling/test_resource_spec.py +++ b/parsl/tests/test_error_handling/test_resource_spec.py @@ -1,3 +1,5 @@ +import pytest + import parsl from parsl.app.app import python_app from parsl.executors import WorkQueueExecutor @@ -11,6 +13,7 @@ def double(x, parsl_resource_specification={}): return x * 2 +@pytest.mark.issue_3620 def test_resource(n=2): executors = parsl.dfk().executors executor = None diff --git a/parsl/tests/test_python_apps/test_outputs.py b/parsl/tests/test_python_apps/test_outputs.py index c4b9dbabe2..b4273cf286 100644 --- a/parsl/tests/test_python_apps/test_outputs.py +++ b/parsl/tests/test_python_apps/test_outputs.py @@ -16,6 +16,7 @@ def double(x, outputs=[]): whitelist = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'configs', '*threads*') +@pytest.mark.shared_fs def test_launch_apps(tmpd_cwd, n=2): outdir = tmpd_cwd / "outputs" outdir.mkdir() diff --git a/parsl/tests/test_regression/test_226.py b/parsl/tests/test_regression/test_226.py index 2f560466dc..8babf4cbe0 100644 --- a/parsl/tests/test_regression/test_226.py +++ b/parsl/tests/test_regression/test_226.py @@ -53,6 +53,7 @@ def test_get_dataframe(): assert res.equals(data), 'Unexpected dataframe' +@pytest.mark.shared_fs def test_bash_default_arg(): if os.path.exists('std.out'): os.remove('std.out') diff --git a/parsl/tests/test_staging/test_docs_1.py b/parsl/tests/test_staging/test_docs_1.py index 8f549ae9b3..cc6236076a 100644 --- a/parsl/tests/test_staging/test_docs_1.py +++ b/parsl/tests/test_staging/test_docs_1.py @@ -12,6 +12,7 @@ def convert(inputs=[], outputs=[]): @pytest.mark.cleannet +@pytest.mark.shared_fs def test(): # create an remote Parsl file inp = File('ftp://ftp.iana.org/pub/mirror/rirstats/arin/ARIN-STATS-FORMAT-CHANGE.txt') diff --git a/parsl/tests/test_staging/test_output_chain_filenames.py b/parsl/tests/test_staging/test_output_chain_filenames.py index 016714497b..442bd6fd5a 100644 --- a/parsl/tests/test_staging/test_output_chain_filenames.py +++ b/parsl/tests/test_staging/test_output_chain_filenames.py @@ -1,5 +1,7 @@ from concurrent.futures import Future +import pytest + from parsl import File from parsl.app.app import bash_app @@ -14,6 +16,7 @@ def app2(inputs=(), outputs=(), stdout=None, stderr=None, mock=False): return f"echo '{inputs[0]}' > {outputs[0]}" +@pytest.mark.shared_fs def test_behavior(tmpd_cwd): expected_path = str(tmpd_cwd / "simple-out.txt") app1_future = app1( diff --git a/parsl/tests/test_staging/test_staging_ftp.py b/parsl/tests/test_staging/test_staging_ftp.py index 12becdf9c4..a004f5a575 100644 --- a/parsl/tests/test_staging/test_staging_ftp.py +++ b/parsl/tests/test_staging/test_staging_ftp.py @@ -15,6 +15,7 @@ def sort_strings(inputs=[], outputs=[]): @pytest.mark.cleannet +@pytest.mark.staging_required def test_staging_ftp(): """Test staging for an ftp file diff --git a/parsl/tests/test_staging/test_staging_https.py b/parsl/tests/test_staging/test_staging_https.py index 4a68e66a5c..c977472249 100644 --- a/parsl/tests/test_staging/test_staging_https.py +++ b/parsl/tests/test_staging/test_staging_https.py @@ -48,6 +48,7 @@ def sort_strings_additional_executor(inputs=(), outputs=()): @pytest.mark.cleannet +@pytest.mark.staging_required def test_staging_https_cleannet(tmpd_cwd): unsorted_file = File(_unsorted_url) sorted_file = File(tmpd_cwd / 'sorted.txt') @@ -68,6 +69,7 @@ def test_staging_https_local(tmpd_cwd): @pytest.mark.cleannet +@pytest.mark.staging_required def test_staging_https_kwargs(tmpd_cwd): unsorted_file = File(_unsorted_url) sorted_file = File(tmpd_cwd / 'sorted.txt') @@ -78,6 +80,7 @@ def test_staging_https_kwargs(tmpd_cwd): @pytest.mark.cleannet +@pytest.mark.staging_required def test_staging_https_args(tmpd_cwd): unsorted_file = File(_unsorted_url) sorted_file = File(tmpd_cwd / 'sorted.txt') diff --git a/parsl/tests/test_staging/test_staging_stdout.py b/parsl/tests/test_staging/test_staging_stdout.py index aaa45440a7..dc4044ec2e 100644 --- a/parsl/tests/test_staging/test_staging_stdout.py +++ b/parsl/tests/test_staging/test_staging_stdout.py @@ -15,6 +15,7 @@ def output_to_stds(*, stdout=parsl.AUTO_LOGNAME, stderr=parsl.AUTO_LOGNAME): return "echo hello ; echo goodbye >&2" +@pytest.mark.shared_fs def test_stdout_staging_file(tmpd_cwd, caplog): basename = str(tmpd_cwd) + "/stdout.txt" stdout_file = File("file://" + basename) @@ -30,6 +31,7 @@ def test_stdout_staging_file(tmpd_cwd, caplog): assert record.levelno < logging.ERROR +@pytest.mark.shared_fs def test_stdout_stderr_staging_zip(tmpd_cwd, caplog): zipfile_name = str(tmpd_cwd) + "/staging.zip" stdout_relative_path = "somewhere/test-out.txt" diff --git a/test-requirements.txt b/test-requirements.txt index 6abf727ccd..27b8da3dd5 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -12,6 +12,7 @@ types-python-dateutil types-requests types-paramiko mpi4py +globus-compute-sdk>=2.27.1 # sqlalchemy is needed for typechecking, so it's here # as well as at runtime for optional monitoring execution