[DRAFT ONLY] Github action testing #3642

Draft
wants to merge 3 commits into
base: globus_compute_executor.py
113 changes: 113 additions & 0 deletions .github/workflows/gce_test.yaml
@@ -0,0 +1,113 @@
name: GlobusComputeExecutor tests

on:
pull_request:
types:
- opened
- synchronize

workflow_dispatch:
inputs:
tags:
description: 'Test scenario tags'
required: false
type: boolean

jobs:
main-test-suite:
strategy:
matrix:
python-version: ["3.11"]
runs-on: ubuntu-20.04
timeout-minutes: 60

steps:
- uses: actions/checkout@v4

- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}

- name: Collect Job Information
id: job-info
run: |
echo "Python Version: ${{ matrix.python-version }}" >> ci_job_info.txt
echo "CI Triggering Event: ${{ github.event_name }}" >> ci_job_info.txt
echo "Triggering Git Ref: ${{ github.ref }}" >> ci_job_info.txt
echo "Triggering Git SHA: ${{ github.sha }}" >> ci_job_info.txt
echo "Workflow Run: ${{ github.run_number }}" >> ci_job_info.txt
echo "Workflow Attempt: ${{ github.run_attempt }}" >> ci_job_info.txt
as_ascii="$(echo "${{ github.ref_name }}" | perl -pe "s/[^A-Za-z0-9-]+/-/g; s/^-+|-+\$//g; s/--+/-/g;")"
echo "as-ascii=$as_ascii" >> $GITHUB_OUTPUT

- name: Non-requirements based install
run: |
# libpython3.5: make workqueue binary installer happy
# mpich: required by radical executor
sudo apt-get update -q
sudo apt-get install -qy libpython3.5 mpich

- name: setup virtual env
run: |
make virtualenv
source .venv/bin/activate

- name: make deps clean_coverage
run: |
source .venv/bin/activate
make deps
make clean_coverage
# Installing parsl into venv required for GCendpoint
# TODO: Test if the following install is necessary
# pip3 install .

# Temporary fix, until changes make it into compute releases
git clone -b configure_tasks_working_dir https://github.com/globus/globus-compute.git
pip3 install globus-compute/compute_sdk globus-compute/compute_endpoint

- name: start globus_compute_endpoint
env:
GLOBUS_COMPUTE_CLIENT_ID: ${{ secrets.GLOBUS_COMPUTE_CLIENT_ID }}
GLOBUS_COMPUTE_CLIENT_SECRET: ${{ secrets.GLOBUS_COMPUTE_SECRET_KEY }}
run: |
source .venv/bin/activate
globus-compute-endpoint configure default
which globus-compute-endpoint
python3 -c "import globus_compute_sdk; print(globus_compute_sdk.__version__)"
python3 -c "import globus_compute_endpoint; print(globus_compute_endpoint.__version__)"
cat << EOF > /home/runner/.globus_compute/default/config.yaml
engine:
type: ThreadPoolEngine
max_workers: 4
working_dir: /home/runner/.globus_compute/default/tasks_working_dir
EOF
cat /home/runner/.globus_compute/default/config.yaml
mkdir -p ~/.globus_compute/default/tasks_working_dir
globus-compute-endpoint start default
globus-compute-endpoint list
- name: make test
env:
GLOBUS_COMPUTE_CLIENT_ID: ${{ secrets.GLOBUS_COMPUTE_CLIENT_ID }}
GLOBUS_COMPUTE_CLIENT_SECRET: ${{ secrets.GLOBUS_COMPUTE_SECRET_KEY }}
run: |
source .venv/bin/activate
export GLOBUS_COMPUTE_ENDPOINT=$(globus-compute-endpoint list | grep default | cut -c 3-38)
echo "GLOBUS_COMPUTE_ENDPOINT = $GLOBUS_COMPUTE_ENDPOINT"

# temporary; until test-matrixification
export PARSL_TEST_PRESERVE_NUM_RUNS=7

make gce_test
ln -s .pytest/parsltest-current test_runinfo

- name: Archive runinfo logs
if: ${{ always() }}
uses: actions/upload-artifact@v4
with:
name: runinfo-${{ matrix.python-version }}-${{ steps.job-info.outputs.as-ascii }}-${{ github.sha }}
path: |
runinfo/
.pytest/
ci_job_info.txt
compression-level: 9
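
Review note: the workflow leans on two shell one-liners, the `perl` substitution that builds `as-ascii` and the `grep default | cut -c 3-38` that pulls the endpoint ID out of `globus-compute-endpoint list`. A rough Python equivalent of both is sketched below for sanity-checking. The function names and the sample table row in the usage note are made up for illustration, and the real CLI output may be laid out differently, which is the reason the sketch matches on UUID shape rather than on column position. The letter range is spelled `A-Za-z`, since a range written `A-z` would also admit the punctuation that sits between `Z` and `a` in ASCII.

```python
import re
from typing import Optional

# Matches the canonical 8-4-4-4-12 hex form of a UUID (36 characters).
UUID_RE = re.compile(
    r"[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}"
)


def sanitize_ref_name(ref_name: str) -> str:
    """Hypothetical helper: same three substitutions as the perl one-liner."""
    cleaned = re.sub(r"[^A-Za-z0-9-]+", "-", ref_name)  # replace runs of other characters
    cleaned = re.sub(r"^-+|-+$", "", cleaned)            # trim leading/trailing dashes
    return re.sub(r"--+", "-", cleaned)                  # collapse repeated dashes


def find_endpoint_id(listing: str, name: str = "default") -> Optional[str]:
    """Hypothetical helper: first UUID found on a line that mentions `name`."""
    for line in listing.splitlines():
        if name in line:
            match = UUID_RE.search(line)
            if match:
                return match.group(0)
    return None
```

For example, `sanitize_ref_name("3642/merge")` gives `"3642-merge"`, and `find_endpoint_id` applied to an assumed row such as `| 0f8a1b2c-3d4e-4f60-8a9b-0c1d2e3f4a5b | Running | default |` returns the UUID regardless of how wide the columns are.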
4 changes: 4 additions & 0 deletions Makefile
@@ -51,6 +51,10 @@ clean_coverage:
mypy: ## run mypy checks
MYPYPATH=$(CWD)/mypy-stubs mypy parsl/

.PHONY: gce_test
gce_test: ## Run tests with GlobusComputeExecutor
pytest -v -k "not shared_fs and not issue_3620 and not staging_required" --config parsl/tests/configs/globus_compute.py parsl/tests/ --random-order --durations 10

.PHONY: local_thread_test
local_thread_test: ## run all tests with local_thread config
pytest parsl/tests/ -k "not cleannet" --config parsl/tests/configs/local_threads.py --random-order --durations 10
1 change: 1 addition & 0 deletions docs/reference.rst
@@ -77,6 +77,7 @@ Executors
parsl.executors.taskvine.TaskVineExecutor
parsl.executors.FluxExecutor
parsl.executors.radical.RadicalPilotExecutor
parsl.executors.globus_compute.GlobusComputeExecutor

Manager Selectors
=================
4 changes: 3 additions & 1 deletion parsl/executors/__init__.py
@@ -1,4 +1,5 @@
from parsl.executors.flux.executor import FluxExecutor
from parsl.executors.globus_compute import GlobusComputeExecutor
from parsl.executors.high_throughput.executor import HighThroughputExecutor
from parsl.executors.high_throughput.mpi_executor import MPIExecutor
from parsl.executors.threads import ThreadPoolExecutor
@@ -8,4 +9,5 @@
'HighThroughputExecutor',
'MPIExecutor',
'WorkQueueExecutor',
'FluxExecutor']
'FluxExecutor',
'GlobusComputeExecutor']
139 changes: 139 additions & 0 deletions parsl/executors/globus_compute.py
@@ -0,0 +1,139 @@
import uuid
from concurrent.futures import Future
from typing import Any, Callable, Dict, Optional, Union

from parsl.errors import OptionalModuleMissing
from parsl.executors.base import ParslExecutor
from parsl.utils import RepresentationMixin

UUID_LIKE_T = Union[uuid.UUID, str]


class GlobusComputeExecutor(ParslExecutor, RepresentationMixin):
""" GlobusComputeExecutor enables remote execution on Globus Compute endpoints

GlobusComputeExecutor is a thin wrapper over globus_compute_sdk.Executor.
Refer to `globus-compute user documentation <https://globus-compute.readthedocs.io/en/latest/executor.html>`_
and `reference documentation <https://globus-compute.readthedocs.io/en/latest/reference/executor.html>`_
for more details.
"""

def __init__(
self,
endpoint_id: Optional[UUID_LIKE_T] = None,
task_group_id: Optional[UUID_LIKE_T] = None,
resource_specification: Optional[Dict[str, Any]] = None,
user_endpoint_config: Optional[Dict[str, Any]] = None,
label: str = "GlobusComputeExecutor",
batch_size: int = 128,
amqp_port: Optional[int] = None,
**kwargs,
):
"""
Parameters
----------

endpoint_id:
id of the endpoint to which to submit tasks

task_group_id:
The Task Group to which to associate tasks. If not set,
one will be instantiated.

resource_specification:
Specify resource requirements for individual task execution.

user_endpoint_config:
User endpoint configuration values as described
and allowed by endpoint administrators. Must be a JSON-serializable dict
or None.

label:
a label to name the executor; mainly utilized for
logging and advanced needs with multiple executors.

batch_size:
the maximum number of tasks to coalesce before
sending upstream [min: 1, default: 128]

amqp_port:
Port to use when connecting to results queue. Note that the
Compute web services only support 5671, 5672, and 443.

kwargs:
Other kwargs listed will be passed through to globus_compute_sdk.Executor
as is
"""
super().__init__()
self.endpoint_id = endpoint_id
self.task_group_id = task_group_id
self.resource_specification = resource_specification
self.user_endpoint_config = user_endpoint_config
self.label = label
self.batch_size = batch_size
self.amqp_port = amqp_port

try:
from globus_compute_sdk import Executor
except ImportError:
raise OptionalModuleMissing(
['globus-compute-sdk'],
"GlobusComputeExecutor requires globus-compute-sdk installed"
)
self._executor: Executor = Executor(
endpoint_id=endpoint_id,
task_group_id=task_group_id,
resource_specification=resource_specification,
user_endpoint_config=user_endpoint_config,
label=label,
batch_size=batch_size,
amqp_port=amqp_port,
**kwargs
)

def start(self) -> None:
"""No-op: the wrapped globus_compute_sdk.Executor is already created in __init__.
"""
pass

def submit(self, func: Callable, resource_specification: Dict[str, Any], *args: Any, **kwargs: Any) -> Future:
""" Submit func to Globus Compute


Parameters
----------

func: Callable
Python function to execute remotely
resource_specification: Dict[str, Any]
Resource specification used to run MPI applications on endpoints configured
to use Globus Compute's MPIEngine. If empty, the executor-level
resource_specification set at construction is used.
args:
Args to pass to the function
kwargs:
kwargs to pass to the function

Returns
-------

Future
"""
self._executor.resource_specification = resource_specification or self.resource_specification
return self._executor.submit(func, *args, **kwargs)

def shutdown(self, wait=True, *, cancel_futures=False):
"""Clean-up the resources associated with the Executor.

It is safe to call this method several times. Otherwise, no other methods
can be called after this one.

Parameters
----------

wait: If True, then this method will not return until all pending
futures have received results.
cancel_futures: If True, then this method will cancel all futures
that have not yet registered their tasks with the Compute web services.
Tasks cannot be cancelled once they are registered.
"""
return self._executor.shutdown(wait=wait, cancel_futures=cancel_futures)
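
Review note: two behaviours of this wrapper are easy to get wrong. The first is whether `shutdown()` hands `wait` and `cancel_futures` on to the wrapped executor. The second is whether an empty per-call `resource_specification` falls back to the constructor default. The sketch below illustrates the delegation pattern only. It uses `concurrent.futures.ThreadPoolExecutor` as a local stand-in, which is not `globus_compute_sdk.Executor` and has no notion of resource specifications, so the chosen spec is just recorded on a hypothetical `last_resource_specification` attribute. `ThinWrapper` is likewise a made-up name, and `cancel_futures` on the stand-in assumes Python 3.9 or later.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Callable, Dict, Optional


class ThinWrapper:
    """Hypothetical stand-in illustrating the delegation pattern only."""

    def __init__(self, resource_specification: Optional[Dict[str, Any]] = None) -> None:
        self.resource_specification = resource_specification
        # Assumption: a local thread pool plays the role of the remote executor.
        self._executor = ThreadPoolExecutor(max_workers=2)
        self.last_resource_specification: Optional[Dict[str, Any]] = None

    def submit(self, func: Callable, resource_specification: Optional[Dict[str, Any]],
               *args: Any, **kwargs: Any) -> Future:
        # An empty or missing per-call spec falls back to the constructor default.
        self.last_resource_specification = (
            resource_specification or self.resource_specification
        )
        return self._executor.submit(func, *args, **kwargs)

    def shutdown(self, wait: bool = True, *, cancel_futures: bool = False) -> None:
        # Forward both arguments rather than silently dropping them.
        self._executor.shutdown(wait=wait, cancel_futures=cancel_futures)
```

With this shape, `ThinWrapper({"num_nodes": 1}).submit(pow, {}, 2, 5)` runs `pow(2, 5)` and records the default spec, while passing a non-empty dict overrides it for that call.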
18 changes: 18 additions & 0 deletions parsl/tests/configs/globus_compute.py
@@ -0,0 +1,18 @@
import os

from parsl.config import Config
from parsl.executors import GlobusComputeExecutor


def fresh_config():

endpoint_id = os.environ["GLOBUS_COMPUTE_ENDPOINT"]

return Config(
executors=[
GlobusComputeExecutor(
label="globus_compute",
endpoint_id=endpoint_id
)
]
)
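
Review note: `fresh_config()` reads `GLOBUS_COMPUTE_ENDPOINT` straight from `os.environ`, so an unset variable surfaces as a bare `KeyError`, and a mangled value would only be noticed further downstream. A minimal sketch of stricter handling using only the standard library follows; `read_endpoint_id` is a hypothetical helper and not part of this PR.

```python
import os
import uuid
from typing import Mapping, Optional


def read_endpoint_id(env: Optional[Mapping[str, str]] = None,
                     var: str = "GLOBUS_COMPUTE_ENDPOINT") -> str:
    """Hypothetical helper: fetch and validate the endpoint UUID from the environment."""
    env = os.environ if env is None else env
    raw = env.get(var, "").strip()
    if not raw:
        raise RuntimeError(f"{var} is not set; start an endpoint and export its ID")
    try:
        # uuid.UUID raises ValueError for strings that are not valid UUIDs.
        return str(uuid.UUID(raw))
    except ValueError as exc:
        raise RuntimeError(f"{var}={raw!r} is not a valid UUID") from exc
```

Passing the mapping explicitly keeps the helper easy to exercise in a unit test without touching the real process environment.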
12 changes: 12 additions & 0 deletions parsl/tests/conftest.py
@@ -163,6 +163,18 @@ def pytest_configure(config):
'markers',
'executor_supports_std_stream_tuples: Marks tests that require tuple support for stdout/stderr'
)
config.addinivalue_line(
'markers',
'globus_compute: Marks tests that require a valid globus_compute target'
)
config.addinivalue_line(
'markers',
'shared_fs: Marks tests that require a shared_fs between the workers and the test client'
)
config.addinivalue_line(
'markers',
'issue_3620: Marks tests that do not work correctly on GlobusComputeExecutor (ref: issue 3620)'
)


@pytest.fixture(autouse=True, scope='session')
3 changes: 3 additions & 0 deletions parsl/tests/test_bash_apps/test_basic.py
@@ -24,6 +24,7 @@ def foo(x, y, z=10, stdout=None, label=None):
return f"echo {x} {y} {z}"


@pytest.mark.shared_fs
def test_command_format_1(tmpd_cwd):
"""Testing command format for BashApps"""

@@ -38,6 +39,7 @@ def test_command_format_1(tmpd_cwd):
assert so_content == "1 4 10"


@pytest.mark.shared_fs
def test_auto_log_filename_format(caplog):
"""Testing auto log filename format for BashApps
"""
@@ -66,6 +68,7 @@ def test_auto_log_filename_format(caplog):
assert record.levelno < logging.ERROR


@pytest.mark.shared_fs
def test_parallel_for(tmpd_cwd, n=3):
"""Testing a simple parallel for loop"""
outdir = tmpd_cwd / "outputs/test_parallel"
4 changes: 4 additions & 0 deletions parsl/tests/test_bash_apps/test_error_codes.py
@@ -58,6 +58,7 @@ def bad_format(stderr='std.err', stdout='std.out'):
whitelist = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'configs', '*threads*')


@pytest.mark.shared_fs
def test_div_0(test_fn=div_0):
err_code = test_matrix[test_fn]['exit_code']
f = test_fn()
@@ -73,6 +74,7 @@ def test_div_0(test_fn=div_0):
os.remove('std.out')


@pytest.mark.shared_fs
def test_bash_misuse(test_fn=bash_misuse):
err_code = test_matrix[test_fn]['exit_code']
f = test_fn()
@@ -87,6 +89,7 @@ def test_bash_misuse(test_fn=bash_misuse):
os.remove('std.out')


@pytest.mark.shared_fs
def test_command_not_found(test_fn=command_not_found):
err_code = test_matrix[test_fn]['exit_code']
f = test_fn()
@@ -103,6 +106,7 @@ def test_command_not_found(test_fn=command_not_found):
return True


@pytest.mark.shared_fs
def test_not_executable(test_fn=not_executable):
err_code = test_matrix[test_fn]['exit_code']
f = test_fn()
1 change: 1 addition & 0 deletions parsl/tests/test_bash_apps/test_kwarg_storage.py
@@ -8,6 +8,7 @@ def foo(z=2, stdout=None):
return f"echo {z}"


@pytest.mark.shared_fs
def test_command_format_1(tmpd_cwd):
"""Testing command format for BashApps
"""