Support for testing GlobusComputeExecutor in a github action
yadudoc committed Oct 1, 2024
1 parent 25374b1 commit 361474d
Showing 26 changed files with 321 additions and 7 deletions.
109 changes: 109 additions & 0 deletions .github/workflows/gce_test.yaml
@@ -0,0 +1,109 @@
name: GlobusComputeExecutor tests

on:
  pull_request:
    types:
      - opened
      - synchronize

  workflow_dispatch:
    inputs:
      tags:
        description: 'Test scenario tags'
        required: false
        type: boolean

jobs:
  main-test-suite:
    strategy:
      matrix:
        python-version: ["3.11"]
    runs-on: ubuntu-20.04
    timeout-minutes: 60

    steps:
      - uses: actions/checkout@master

      - name: Set up Python ${{ matrix.python-version }}
        uses: actions/setup-python@v4
        with:
          python-version: ${{ matrix.python-version }}

      - name: Collect Job Information
        id: job-info
        run: |
          echo "Python Version: ${{ matrix.python-version }}" >> ci_job_info.txt
          echo "CI Triggering Event: ${{ github.event_name }}" >> ci_job_info.txt
          echo "Triggering Git Ref: ${{ github.ref }}" >> ci_job_info.txt
          echo "Triggering Git SHA: ${{ github.sha }}" >> ci_job_info.txt
          echo "Workflow Run: ${{ github.run_number }}" >> ci_job_info.txt
          echo "Workflow Attempt: ${{ github.run_attempt }}" >> ci_job_info.txt
          as_ascii="$(echo "${{ github.ref_name }}" | perl -pe "s/[^A-z0-9-]+/-/g; s/^-+|-+\$//g; s/--+/-/g;")"
          echo "as-ascii=$as_ascii" >> $GITHUB_OUTPUT

      - name: Non-requirements based install
        run: |
          # libpython3.5: make workqueue binary installer happy
          # mpich: required by radical executor
          sudo apt-get update -q
          sudo apt-get install -qy libpython3.5 mpich

      - name: setup virtual env
        run: |
          make virtualenv
          source .venv/bin/activate

      - name: make deps clean_coverage
        run: |
          source .venv/bin/activate
          make deps
          make clean_coverage
          # Installing parsl into the venv is required for the Globus Compute endpoint
          pip3 install .

      - name: start globus_compute_endpoint
        env:
          GLOBUS_COMPUTE_CLIENT_ID: ${{ secrets.GLOBUS_COMPUTE_CLIENT_ID }}
          GLOBUS_COMPUTE_CLIENT_SECRET: ${{ secrets.GLOBUS_COMPUTE_SECRET_KEY }}
        run: |
          source .venv/bin/activate
          globus-compute-endpoint configure default
          cat << EOF > /home/runner/.globus_compute/default/config.yaml
          display_name: null
          engine:
            provider:
              type: LocalProvider
              init_blocks: 1
              max_blocks: 1
              min_blocks: 0
              worker_init: source /home/runner/work/parsl/parsl/.venv/bin/activate
            type: GlobusComputeEngine
          EOF
          globus-compute-endpoint start default
          globus-compute-endpoint list

      - name: make test
        env:
          GLOBUS_COMPUTE_CLIENT_ID: ${{ secrets.GLOBUS_COMPUTE_CLIENT_ID }}
          GLOBUS_COMPUTE_CLIENT_SECRET: ${{ secrets.GLOBUS_COMPUTE_SECRET_KEY }}
        run: |
          source .venv/bin/activate
          export GLOBUS_COMPUTE_ENDPOINT=$(globus-compute-endpoint list | grep default | cut -c 3-38)
          echo "GLOBUS_COMPUTE_ENDPOINT = $GLOBUS_COMPUTE_ENDPOINT"
          # temporary; until test-matrixification
          export PARSL_TEST_PRESERVE_NUM_RUNS=7
          make gce_test
          ln -s .pytest/parsltest-current test_runinfo

      - name: Archive runinfo logs
        if: ${{ always() }}
        uses: actions/upload-artifact@v4
        with:
          name: runinfo-${{ matrix.python-version }}-${{ steps.job-info.outputs.as-ascii }}-${{ github.sha }}
          path: |
            runinfo/
            .pytest/
            ~/.globus_compute/default/
            ci_job_info.txt
          compression-level: 9
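
The "make test" step derives the endpoint UUID by slicing the table printed by globus-compute-endpoint list (columns 3-38 hold the UUID). Outside CI, the same endpoint can be exercised directly through the Globus Compute SDK; the following is a minimal sketch, not part of this commit, and assumes GLOBUS_COMPUTE_ENDPOINT is exported as in the workflow above:

# Connectivity check (sketch only; not part of this commit). Assumes
# GLOBUS_COMPUTE_ENDPOINT holds the UUID of a running endpoint.
import os

from globus_compute_sdk import Executor


def double(x):
    return 2 * x


with Executor(endpoint_id=os.environ["GLOBUS_COMPUTE_ENDPOINT"]) as gce:
    # Round-trip one task through the Compute web services.
    print(gce.submit(double, 21).result())  # expect: 42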
4 changes: 4 additions & 0 deletions Makefile
@@ -51,6 +51,10 @@ clean_coverage:
mypy: ## run mypy checks
	MYPYPATH=$(CWD)/mypy-stubs mypy parsl/

.PHONY: gce_test
gce_test: ## Run tests with GlobusComputeExecutor
	pytest -v -k "not shared_fs and not issue_3620 and not staging_required" --config parsl/tests/configs/globus_compute.py parsl/tests/ --random-order --durations 10

.PHONY: local_thread_test
local_thread_test: ## run all tests with local_thread config
	pytest parsl/tests/ -k "not cleannet" --config parsl/tests/configs/local_threads.py --random-order --durations 10
4 changes: 3 additions & 1 deletion parsl/executors/__init__.py
@@ -1,4 +1,5 @@
from parsl.executors.flux.executor import FluxExecutor
from parsl.executors.globus_compute import GlobusComputeExecutor
from parsl.executors.high_throughput.executor import HighThroughputExecutor
from parsl.executors.high_throughput.mpi_executor import MPIExecutor
from parsl.executors.threads import ThreadPoolExecutor
@@ -8,4 +9,5 @@
           'HighThroughputExecutor',
           'MPIExecutor',
           'WorkQueueExecutor',
-           'FluxExecutor']
+           'FluxExecutor',
+           'GlobusComputeExecutor']
139 changes: 139 additions & 0 deletions parsl/executors/globus_compute.py
@@ -0,0 +1,139 @@
import uuid
from concurrent.futures import Future
from typing import Any, Callable, Dict, Optional, Union

from parsl.errors import OptionalModuleMissing
from parsl.executors.base import ParslExecutor
from parsl.utils import RepresentationMixin

UUID_LIKE_T = Union[uuid.UUID, str]


class GlobusComputeExecutor(ParslExecutor, RepresentationMixin):
    """GlobusComputeExecutor enables remote execution on Globus Compute endpoints.

    GlobusComputeExecutor is a thin wrapper over globus_compute_sdk.Executor.
    Refer to the `globus-compute user documentation <https://globus-compute.readthedocs.io/en/latest/executor.html>`_
    and `reference documentation <https://globus-compute.readthedocs.io/en/latest/reference/executor.html>`_
    for more details.
    """

    def __init__(
        self,
        endpoint_id: Optional[UUID_LIKE_T] = None,
        task_group_id: Optional[UUID_LIKE_T] = None,
        resource_specification: Optional[Dict[str, Any]] = None,
        user_endpoint_config: Optional[Dict[str, Any]] = None,
        label: str = "GlobusComputeExecutor",
        batch_size: int = 128,
        amqp_port: Optional[int] = None,
        **kwargs,
    ):
        """
        Parameters
        ----------
        endpoint_id:
            Id of the endpoint to which to submit tasks.
        task_group_id:
            The Task Group with which to associate tasks. If not set,
            one will be instantiated.
        resource_specification:
            Resource requirements for individual task execution.
        user_endpoint_config:
            User endpoint configuration values, as described and allowed
            by endpoint administrators. Must be a JSON-serializable dict
            or None.
        label:
            A label to name the executor; mainly utilized for logging
            and advanced needs with multiple executors.
        batch_size:
            The maximum number of tasks to coalesce before sending
            upstream [min: 1, default: 128].
        amqp_port:
            Port to use when connecting to the results queue. Note that
            the Compute web services only support 5671, 5672, and 443.
        kwargs:
            Other kwargs are passed through to globus_compute_sdk.Executor
            as-is.
        """
        super().__init__()
        self.endpoint_id = endpoint_id
        self.task_group_id = task_group_id
        self.resource_specification = resource_specification
        self.user_endpoint_config = user_endpoint_config
        self.label = label
        self.batch_size = batch_size
        self.amqp_port = amqp_port

        try:
            from globus_compute_sdk import Executor
        except ImportError:
            raise OptionalModuleMissing(
                ['globus-compute-sdk'],
                "GlobusComputeExecutor requires globus-compute-sdk to be installed"
            )
        self._executor: Executor = Executor(
            endpoint_id=endpoint_id,
            task_group_id=task_group_id,
            resource_specification=resource_specification,
            user_endpoint_config=user_endpoint_config,
            label=label,
            batch_size=batch_size,
            amqp_port=amqp_port,
            **kwargs
        )

    def start(self) -> None:
        """No-op: the wrapped Globus Compute SDK executor needs no local resources to start.
        """
        pass

    def submit(self, func: Callable, resource_specification: Dict[str, Any], *args: Any, **kwargs: Any) -> Future:
        """Submit func to globus-compute.

        Parameters
        ----------
        func: Callable
            Python function to execute remotely.
        resource_specification: Dict[str, Any]
            Resource specification used to run MPI applications on endpoints
            configured to use Globus Compute's MPIEngine.
        args:
            Positional arguments to pass to the function.
        kwargs:
            Keyword arguments to pass to the function.

        Returns
        -------
        Future
        """
        self._executor.resource_specification = resource_specification or self.resource_specification
        return self._executor.submit(func, *args, **kwargs)

    def shutdown(self, wait=True, *, cancel_futures=False):
        """Clean up the resources associated with the Executor.

        It is safe to call this method several times; no other methods
        may be called after this one.

        Parameters
        ----------
        wait: If True, this method will not return until all pending
            futures have received results.
        cancel_futures: If True, this method will cancel all futures
            that have not yet registered their tasks with the Compute web
            services. Tasks cannot be cancelled once they are registered.
        """
        return self._executor.shutdown(wait=wait, cancel_futures=cancel_futures)
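
For orientation, here is a minimal usage sketch (not part of this commit) wiring the new executor into a Parsl config; the endpoint UUID below is a placeholder to be replaced with the id of an endpoint you administer:

import parsl
from parsl.config import Config
from parsl.executors import GlobusComputeExecutor


@parsl.python_app
def remote_hostname():
    import platform
    return platform.node()


# Placeholder UUID: substitute the id of a running Globus Compute endpoint.
config = Config(
    executors=[
        GlobusComputeExecutor(
            label="globus_compute",
            endpoint_id="00000000-0000-0000-0000-000000000000",
        )
    ]
)

parsl.load(config)
print(remote_hostname().result())  # hostname of the endpoint's worker
parsl.dfk().cleanup()

Each task is shipped through the Globus Compute web services to the endpoint, so the app's dependencies must be importable in the endpoint's worker environment; that is why the workflow above activates the test venv in worker_init.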
18 changes: 18 additions & 0 deletions parsl/tests/configs/globus_compute.py
@@ -0,0 +1,18 @@
import os

from parsl.config import Config
from parsl.executors import GlobusComputeExecutor


def fresh_config():

    endpoint_id = os.environ.get("GLOBUS_COMPUTE_ENDPOINT",
                                 "4b116d3c-1703-4f8f-9f6f-39921e5864df")
    return Config(
        executors=[
            GlobusComputeExecutor(
                label="globus_compute",
                endpoint_id=endpoint_id
            )
        ]
    )
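
The pytest harness imports this module via the --config option (see the gce_test Makefile target above) and, following the test suite's convention, calls fresh_config() to build the Config for a run. A direct smoke test of the module, as a sketch that assumes GLOBUS_COMPUTE_ENDPOINT points at a running endpoint:

# Smoke test sketch (not part of this commit); assumes the
# GLOBUS_COMPUTE_ENDPOINT environment variable names a live endpoint.
import parsl

from parsl.tests.configs.globus_compute import fresh_config


@parsl.python_app
def add(a, b):
    return a + b


parsl.load(fresh_config())
assert add(2, 3).result() == 5
parsl.dfk().cleanup()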
8 changes: 8 additions & 0 deletions parsl/tests/conftest.py
@@ -163,6 +163,14 @@ def pytest_configure(config):
        'markers',
        'executor_supports_std_stream_tuples: Marks tests that require tuple support for stdout/stderr'
    )
    config.addinivalue_line(
        'markers',
        'globus_compute: Marks tests that require a valid globus_compute target'
    )
    config.addinivalue_line(
        'markers',
        'shared_fs: Marks tests that require a shared filesystem between the workers and the test client'
    )


@pytest.fixture(autouse=True, scope='session')
3 changes: 3 additions & 0 deletions parsl/tests/test_bash_apps/test_basic.py
@@ -24,6 +24,7 @@ def foo(x, y, z=10, stdout=None, label=None):
    return f"echo {x} {y} {z}"


@pytest.mark.shared_fs
def test_command_format_1(tmpd_cwd):
    """Testing command format for BashApps"""

@@ -38,6 +39,7 @@ def test_command_format_1(tmpd_cwd):
    assert so_content == "1 4 10"


@pytest.mark.shared_fs
def test_auto_log_filename_format(caplog):
    """Testing auto log filename format for BashApps
    """
@@ -66,6 +68,7 @@ def test_auto_log_filename_format(caplog):
        assert record.levelno < logging.ERROR


@pytest.mark.shared_fs
def test_parallel_for(tmpd_cwd, n=3):
    """Testing a simple parallel for loop"""
    outdir = tmpd_cwd / "outputs/test_parallel"
4 changes: 4 additions & 0 deletions parsl/tests/test_bash_apps/test_error_codes.py
@@ -58,6 +58,7 @@ def bad_format(stderr='std.err', stdout='std.out'):
whitelist = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'configs', '*threads*')


@pytest.mark.shared_fs
def test_div_0(test_fn=div_0):
    err_code = test_matrix[test_fn]['exit_code']
    f = test_fn()
@@ -73,6 +74,7 @@ def test_div_0(test_fn=div_0):
    os.remove('std.out')


@pytest.mark.shared_fs
def test_bash_misuse(test_fn=bash_misuse):
    err_code = test_matrix[test_fn]['exit_code']
    f = test_fn()
@@ -87,6 +89,7 @@ def test_bash_misuse(test_fn=bash_misuse):
    os.remove('std.out')


@pytest.mark.shared_fs
def test_command_not_found(test_fn=command_not_found):
    err_code = test_matrix[test_fn]['exit_code']
    f = test_fn()
@@ -103,6 +106,7 @@ def test_command_not_found(test_fn=command_not_found):
    return True


@pytest.mark.shared_fs
def test_not_executable(test_fn=not_executable):
    err_code = test_matrix[test_fn]['exit_code']
    f = test_fn()
1 change: 1 addition & 0 deletions parsl/tests/test_bash_apps/test_kwarg_storage.py
@@ -8,6 +8,7 @@ def foo(z=2, stdout=None):
    return f"echo {z}"


@pytest.mark.shared_fs
def test_command_format_1(tmpd_cwd):
    """Testing command format for BashApps
    """
8 changes: 2 additions & 6 deletions parsl/tests/test_bash_apps/test_memoize.py
@@ -9,9 +9,7 @@ def fail_on_presence(outputs=()):
    return 'if [ -f {0} ] ; then exit 1 ; else touch {0}; fi'.format(outputs[0])


-# This test is an oddity that requires a shared-FS and simply
-# won't work if there's a staging provider.
-# @pytest.mark.sharedFS_required
+@pytest.mark.shared_fs
def test_bash_memoization(tmpd_cwd, n=2):
    """Testing bash memoization
    """
@@ -29,9 +27,7 @@ def fail_on_presence_kw(outputs=(), foo=None):
    return 'if [ -f {0} ] ; then exit 1 ; else touch {0}; fi'.format(outputs[0])


-# This test is an oddity that requires a shared-FS and simply
-# won't work if there's a staging provider.
-# @pytest.mark.sharedFS_required
+@pytest.mark.shared_fs
def test_bash_memoization_keywords(tmpd_cwd, n=2):
    """Testing bash memoization
    """