From 8e3d7cf20444f2da3032204389dd3f2d4be540e9 Mon Sep 17 00:00:00 2001 From: Shishi Chen Date: Fri, 7 Jun 2024 01:55:55 +0000 Subject: [PATCH 01/48] Added options to set annotations and a service account in the Kubernetes worker pod configuration --- parsl/providers/kubernetes/kube.py | 25 +++++++++++++++++++------ 1 file changed, 19 insertions(+), 6 deletions(-) diff --git a/parsl/providers/kubernetes/kube.py b/parsl/providers/kubernetes/kube.py index 0b53881702..c5256a47f3 100644 --- a/parsl/providers/kubernetes/kube.py +++ b/parsl/providers/kubernetes/kube.py @@ -83,6 +83,10 @@ class KubernetesProvider(ExecutionProvider, RepresentationMixin): persistent_volumes: list[(str, str)] List of tuples describing persistent volumes to be mounted in the pod. The tuples consist of (PVC Name, Mount Directory). + service_account_name: str + Name of the service account to run the pod as. + annotations: Dict[str, str] + Annotations to set on the pod. """ @typeguard.typechecked def __init__(self, @@ -103,7 +107,9 @@ def __init__(self, group_id: Optional[str] = None, run_as_non_root: bool = False, secret: Optional[str] = None, - persistent_volumes: List[Tuple[str, str]] = []) -> None: + persistent_volumes: List[Tuple[str, str]] = [], + service_account_name: Optional[str] = None, + annotations: Optional[Dict[str, str]] = None) -> None: if not _kubernetes_enabled: raise OptionalModuleMissing(['kubernetes'], "Kubernetes provider requires kubernetes module and config.") @@ -146,6 +152,8 @@ def __init__(self, self.group_id = group_id self.run_as_non_root = run_as_non_root self.persistent_volumes = persistent_volumes + self.service_account_name = service_account_name + self.annotations = annotations self.kube_client = client.CoreV1Api() @@ -184,7 +192,9 @@ def submit(self, cmd_string, tasks_per_node, job_name="parsl"): pod_name=pod_name, job_name=job_name, cmd_string=formatted_cmd, - volumes=self.persistent_volumes) + volumes=self.persistent_volumes, + service_account_name=self.service_account_name, + annotations=self.annotations) self.resources[pod_name] = {'status': JobStatus(JobState.RUNNING)} return pod_name @@ -253,7 +263,9 @@ def _create_pod(self, job_name, port=80, cmd_string=None, - volumes=[]): + volumes=[], + service_account_name=None, + annotations=None): """ Create a kubernetes pod for the job. Args: - image (string) : Docker image to launch @@ -311,11 +323,12 @@ def _create_pod(self, claim_name=volume[0]))) metadata = client.V1ObjectMeta(name=pod_name, - labels={"app": job_name}) + labels={"app": job_name}, + annotations=annotations) spec = client.V1PodSpec(containers=[container], image_pull_secrets=[secret], - volumes=volume_defs - ) + volumes=volume_defs, + service_account_name=service_account_name) pod = client.V1Pod(spec=spec, metadata=metadata) api_response = self.kube_client.create_namespaced_pod(namespace=self.namespace, From 45269edc10cf3dc11dd1e02a5aac101a99c98b89 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Fri, 7 Jun 2024 16:24:40 +0000 Subject: [PATCH 02/48] Correct punctuation in debug message. hack out tests that won't fail-as-expected as root --- parsl/providers/kubernetes/kube.py | 2 +- parsl/tests/test_bash_apps/test_stdout.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/parsl/providers/kubernetes/kube.py b/parsl/providers/kubernetes/kube.py index 0b53881702..b5b67f239f 100644 --- a/parsl/providers/kubernetes/kube.py +++ b/parsl/providers/kubernetes/kube.py @@ -179,7 +179,7 @@ def submit(self, cmd_string, tasks_per_node, job_name="parsl"): formatted_cmd = template_string.format(command=cmd_string, worker_init=self.worker_init) - logger.debug("Pod name :{}".format(pod_name)) + logger.debug("Pod name: {}".format(pod_name)) self._create_pod(image=self.image, pod_name=pod_name, job_name=job_name, diff --git a/parsl/tests/test_bash_apps/test_stdout.py b/parsl/tests/test_bash_apps/test_stdout.py index b1efadd445..3f4beb372a 100644 --- a/parsl/tests/test_bash_apps/test_stdout.py +++ b/parsl/tests/test_bash_apps/test_stdout.py @@ -16,7 +16,7 @@ def echo_to_streams(msg, stderr=None, stdout=None): whitelist = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'configs', '*threads*') speclist = ( - '/bad/dir/t.out', + # '/bad/dir/t.out', - isn't bad if we're root - should be tagged issue3328 too... ['t3.out', 'w'], ('t4.out', None), (42, 'w'), @@ -26,7 +26,7 @@ def echo_to_streams(msg, stderr=None, stdout=None): ) testids = [ - 'nonexistent_dir', + # 'nonexistent_dir', - goes with above /bad/dir/t.out 'list_not_tuple', 'null_mode', 'not_a_string', From 7ceec7a557814b4021a65c870d32e8d672fe6716 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Fri, 7 Jun 2024 16:40:15 +0000 Subject: [PATCH 03/48] Fix a couple of docstrings --- parsl/providers/kubernetes/kube.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/parsl/providers/kubernetes/kube.py b/parsl/providers/kubernetes/kube.py index b5b67f239f..36f4edfd6e 100644 --- a/parsl/providers/kubernetes/kube.py +++ b/parsl/providers/kubernetes/kube.py @@ -160,10 +160,9 @@ def submit(self, cmd_string, tasks_per_node, job_name="parsl"): - tasks_per_node (int) : command invocations to be launched per node Kwargs: - - job_name (String): Name for job, must be unique + - job_name (String): Name for job Returns: - - None: At capacity, cannot provision more - job_id: (string) Identifier for the job """ From 0cdece2f89a57db1002e9ee13a5f9e9565f69ba5 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Fri, 7 Jun 2024 18:01:43 +0000 Subject: [PATCH 04/48] a bit of name sanitization for default pod names --- parsl/providers/kubernetes/kube.py | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/parsl/providers/kubernetes/kube.py b/parsl/providers/kubernetes/kube.py index 36f4edfd6e..6fad4d286c 100644 --- a/parsl/providers/kubernetes/kube.py +++ b/parsl/providers/kubernetes/kube.py @@ -1,4 +1,5 @@ import logging +import re import time from parsl.providers.kubernetes.template import template_string @@ -175,6 +176,8 @@ def submit(self, cmd_string, tasks_per_node, job_name="parsl"): pod_name = '{}-{}'.format(self.pod_name, cur_timestamp) + pod_name = _sanitizeDNS1123(pod_name) + formatted_cmd = template_string.format(command=cmd_string, worker_init=self.worker_init) @@ -336,3 +339,25 @@ def label(self): @property def status_polling_interval(self): return 60 + + +# this is based on: +# https://github.com/kubernetes/apimachinery/blob/703232ea6da48aed7ac22260dabc6eac01aab896/pkg/util/validation/validation.go#L177C32-L177C62 +DNS_LABEL_REGEXP = "^[a-z0-9]([-a-z0-9]*[a-z0-9])?$" + + +def _sanitizeDNS1123(raw: str) -> str: + """Rewrite input string to be a valid RFC1123 DNS label. + This is required for Kubernetes pod names. + """ + + # label must be lowercase + raw = raw.lower() + + # label can only contain [-a-z0-9] characters - replace everything + # else with - + raw = re.sub("[^-a-z0-9]", "-", raw) + + # TODO: sanitize against first and last symbols (no - at start or end?) + assert re.match(DNS_LABEL_REGEXP, raw), "sanitized DNS1123 label has not been properly sanitized: " + raw + return raw From 87d34547e046f60c2d61fc719d2db684779313d4 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Sat, 8 Jun 2024 09:44:59 +0000 Subject: [PATCH 05/48] fiddle with markings to deal with no shared fs and no staging many of these newly marked "no shared fs" tests do not need to be shared fs, but instead could be staging-based? it would be good to test the tests that aren't marked as staging required with no staging providers configured, even in normal CI, to validate that test mark is applied properly. should also rewrite the mark name as shared_fs_required to match the X_required format? --- parsl/tests/test_bash_apps/test_basic.py | 3 +++ parsl/tests/test_bash_apps/test_error_codes.py | 4 ++++ parsl/tests/test_bash_apps/test_kwarg_storage.py | 1 + parsl/tests/test_bash_apps/test_memoize.py | 8 ++------ parsl/tests/test_bash_apps/test_memoize_ignore_args.py | 1 + .../tests/test_bash_apps/test_memoize_ignore_args_regr.py | 1 + parsl/tests/test_bash_apps/test_multiline.py | 1 + parsl/tests/test_bash_apps/test_stdout.py | 2 ++ parsl/tests/test_docs/test_kwargs.py | 3 +++ parsl/tests/test_python_apps/test_outputs.py | 1 + parsl/tests/test_regression/test_226.py | 1 + parsl/tests/test_staging/test_output_chain_filenames.py | 3 +++ parsl/tests/test_staging/test_staging_https.py | 3 +++ parsl/tests/test_staging/test_staging_stdout.py | 2 ++ 14 files changed, 28 insertions(+), 6 deletions(-) diff --git a/parsl/tests/test_bash_apps/test_basic.py b/parsl/tests/test_bash_apps/test_basic.py index 56d56d8ed1..0eea6d4d97 100644 --- a/parsl/tests/test_bash_apps/test_basic.py +++ b/parsl/tests/test_bash_apps/test_basic.py @@ -24,6 +24,7 @@ def foo(x, y, z=10, stdout=None, label=None): return f"echo {x} {y} {z}" +@pytest.mark.shared_fs def test_command_format_1(tmpd_cwd): """Testing command format for BashApps""" @@ -38,6 +39,7 @@ def test_command_format_1(tmpd_cwd): assert so_content == "1 4 10" +@pytest.mark.shared_fs def test_auto_log_filename_format(caplog): """Testing auto log filename format for BashApps """ @@ -66,6 +68,7 @@ def test_auto_log_filename_format(caplog): assert record.levelno < logging.ERROR +@pytest.mark.shared_fs def test_parallel_for(tmpd_cwd, n=3): """Testing a simple parallel for loop""" outdir = tmpd_cwd / "outputs/test_parallel" diff --git a/parsl/tests/test_bash_apps/test_error_codes.py b/parsl/tests/test_bash_apps/test_error_codes.py index 4a0b835728..bccded91a9 100644 --- a/parsl/tests/test_bash_apps/test_error_codes.py +++ b/parsl/tests/test_bash_apps/test_error_codes.py @@ -58,6 +58,7 @@ def bad_format(stderr='std.err', stdout='std.out'): whitelist = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'configs', '*threads*') +@pytest.mark.shared_fs def test_div_0(test_fn=div_0): err_code = test_matrix[test_fn]['exit_code'] f = test_fn() @@ -73,6 +74,7 @@ def test_div_0(test_fn=div_0): os.remove('std.out') +@pytest.mark.shared_fs def test_bash_misuse(test_fn=bash_misuse): err_code = test_matrix[test_fn]['exit_code'] f = test_fn() @@ -87,6 +89,7 @@ def test_bash_misuse(test_fn=bash_misuse): os.remove('std.out') +@pytest.mark.shared_fs def test_command_not_found(test_fn=command_not_found): err_code = test_matrix[test_fn]['exit_code'] f = test_fn() @@ -103,6 +106,7 @@ def test_command_not_found(test_fn=command_not_found): return True +@pytest.mark.shared_fs def test_not_executable(test_fn=not_executable): err_code = test_matrix[test_fn]['exit_code'] f = test_fn() diff --git a/parsl/tests/test_bash_apps/test_kwarg_storage.py b/parsl/tests/test_bash_apps/test_kwarg_storage.py index 8e0d48c661..e88a4c2967 100644 --- a/parsl/tests/test_bash_apps/test_kwarg_storage.py +++ b/parsl/tests/test_bash_apps/test_kwarg_storage.py @@ -8,6 +8,7 @@ def foo(z=2, stdout=None): return f"echo {z}" +@pytest.mark.shared_fs def test_command_format_1(tmpd_cwd): """Testing command format for BashApps """ diff --git a/parsl/tests/test_bash_apps/test_memoize.py b/parsl/tests/test_bash_apps/test_memoize.py index d53460b50b..387837f4d2 100644 --- a/parsl/tests/test_bash_apps/test_memoize.py +++ b/parsl/tests/test_bash_apps/test_memoize.py @@ -9,9 +9,7 @@ def fail_on_presence(outputs=()): return 'if [ -f {0} ] ; then exit 1 ; else touch {0}; fi'.format(outputs[0]) -# This test is an oddity that requires a shared-FS and simply -# won't work if there's a staging provider. -# @pytest.mark.sharedFS_required +@pytest.mark.shared_fs def test_bash_memoization(tmpd_cwd, n=2): """Testing bash memoization """ @@ -29,9 +27,7 @@ def fail_on_presence_kw(outputs=(), foo=None): return 'if [ -f {0} ] ; then exit 1 ; else touch {0}; fi'.format(outputs[0]) -# This test is an oddity that requires a shared-FS and simply -# won't work if there's a staging provider. -# @pytest.mark.sharedFS_required +@pytest.mark.shared_fs def test_bash_memoization_keywords(tmpd_cwd, n=2): """Testing bash memoization """ diff --git a/parsl/tests/test_bash_apps/test_memoize_ignore_args.py b/parsl/tests/test_bash_apps/test_memoize_ignore_args.py index 0439bfb163..3e8db1b012 100644 --- a/parsl/tests/test_bash_apps/test_memoize_ignore_args.py +++ b/parsl/tests/test_bash_apps/test_memoize_ignore_args.py @@ -23,6 +23,7 @@ def no_checkpoint_stdout_app_ignore_args(stdout=None): return "echo X" +@pytest.mark.shared_fs def test_memo_stdout(): # this should run and create a file named after path_x diff --git a/parsl/tests/test_bash_apps/test_memoize_ignore_args_regr.py b/parsl/tests/test_bash_apps/test_memoize_ignore_args_regr.py index 3c9b51e980..3081878ba0 100644 --- a/parsl/tests/test_bash_apps/test_memoize_ignore_args_regr.py +++ b/parsl/tests/test_bash_apps/test_memoize_ignore_args_regr.py @@ -30,6 +30,7 @@ def no_checkpoint_stdout_app(stdout=None): return "echo X" +@pytest.mark.shared_fs def test_memo_stdout(): assert const_list_x == const_list_x_arg diff --git a/parsl/tests/test_bash_apps/test_multiline.py b/parsl/tests/test_bash_apps/test_multiline.py index 59fb5ed7b2..cfdb674e9d 100644 --- a/parsl/tests/test_bash_apps/test_multiline.py +++ b/parsl/tests/test_bash_apps/test_multiline.py @@ -14,6 +14,7 @@ def multiline(inputs=(), outputs=(), stderr=None, stdout=None): """.format(inputs=inputs, outputs=outputs) +@pytest.mark.shared_fs def test_multiline(tmpd_cwd): so, se = tmpd_cwd / "std.out", tmpd_cwd / "std.err" f = multiline( diff --git a/parsl/tests/test_bash_apps/test_stdout.py b/parsl/tests/test_bash_apps/test_stdout.py index 3f4beb372a..c8b1942979 100644 --- a/parsl/tests/test_bash_apps/test_stdout.py +++ b/parsl/tests/test_bash_apps/test_stdout.py @@ -73,6 +73,7 @@ def test_bad_stderr_file(): @pytest.mark.executor_supports_std_stream_tuples +@pytest.mark.shared_fs def test_stdout_truncate(tmpd_cwd, caplog): """Testing truncation of prior content of stdout""" @@ -92,6 +93,7 @@ def test_stdout_truncate(tmpd_cwd, caplog): assert record.levelno < logging.ERROR +@pytest.mark.shared_fs def test_stdout_append(tmpd_cwd, caplog): """Testing appending to prior content of stdout (default open() mode)""" diff --git a/parsl/tests/test_docs/test_kwargs.py b/parsl/tests/test_docs/test_kwargs.py index 5524c0b819..80907ebe08 100644 --- a/parsl/tests/test_docs/test_kwargs.py +++ b/parsl/tests/test_docs/test_kwargs.py @@ -1,6 +1,8 @@ """Functions used to explain kwargs""" from pathlib import Path +import pytest + from parsl import File, python_app @@ -19,6 +21,7 @@ def reduce_app(inputs=()): assert reduce_future.result() == 6 +@pytest.mark.shared_fs def test_outputs(tmpd_cwd): @python_app() def write_app(message, outputs=()): diff --git a/parsl/tests/test_python_apps/test_outputs.py b/parsl/tests/test_python_apps/test_outputs.py index c4b9dbabe2..b4273cf286 100644 --- a/parsl/tests/test_python_apps/test_outputs.py +++ b/parsl/tests/test_python_apps/test_outputs.py @@ -16,6 +16,7 @@ def double(x, outputs=[]): whitelist = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'configs', '*threads*') +@pytest.mark.shared_fs def test_launch_apps(tmpd_cwd, n=2): outdir = tmpd_cwd / "outputs" outdir.mkdir() diff --git a/parsl/tests/test_regression/test_226.py b/parsl/tests/test_regression/test_226.py index 2f560466dc..8babf4cbe0 100644 --- a/parsl/tests/test_regression/test_226.py +++ b/parsl/tests/test_regression/test_226.py @@ -53,6 +53,7 @@ def test_get_dataframe(): assert res.equals(data), 'Unexpected dataframe' +@pytest.mark.shared_fs def test_bash_default_arg(): if os.path.exists('std.out'): os.remove('std.out') diff --git a/parsl/tests/test_staging/test_output_chain_filenames.py b/parsl/tests/test_staging/test_output_chain_filenames.py index 016714497b..442bd6fd5a 100644 --- a/parsl/tests/test_staging/test_output_chain_filenames.py +++ b/parsl/tests/test_staging/test_output_chain_filenames.py @@ -1,5 +1,7 @@ from concurrent.futures import Future +import pytest + from parsl import File from parsl.app.app import bash_app @@ -14,6 +16,7 @@ def app2(inputs=(), outputs=(), stdout=None, stderr=None, mock=False): return f"echo '{inputs[0]}' > {outputs[0]}" +@pytest.mark.shared_fs def test_behavior(tmpd_cwd): expected_path = str(tmpd_cwd / "simple-out.txt") app1_future = app1( diff --git a/parsl/tests/test_staging/test_staging_https.py b/parsl/tests/test_staging/test_staging_https.py index 4a68e66a5c..c977472249 100644 --- a/parsl/tests/test_staging/test_staging_https.py +++ b/parsl/tests/test_staging/test_staging_https.py @@ -48,6 +48,7 @@ def sort_strings_additional_executor(inputs=(), outputs=()): @pytest.mark.cleannet +@pytest.mark.staging_required def test_staging_https_cleannet(tmpd_cwd): unsorted_file = File(_unsorted_url) sorted_file = File(tmpd_cwd / 'sorted.txt') @@ -68,6 +69,7 @@ def test_staging_https_local(tmpd_cwd): @pytest.mark.cleannet +@pytest.mark.staging_required def test_staging_https_kwargs(tmpd_cwd): unsorted_file = File(_unsorted_url) sorted_file = File(tmpd_cwd / 'sorted.txt') @@ -78,6 +80,7 @@ def test_staging_https_kwargs(tmpd_cwd): @pytest.mark.cleannet +@pytest.mark.staging_required def test_staging_https_args(tmpd_cwd): unsorted_file = File(_unsorted_url) sorted_file = File(tmpd_cwd / 'sorted.txt') diff --git a/parsl/tests/test_staging/test_staging_stdout.py b/parsl/tests/test_staging/test_staging_stdout.py index aaa45440a7..0eae6df70f 100644 --- a/parsl/tests/test_staging/test_staging_stdout.py +++ b/parsl/tests/test_staging/test_staging_stdout.py @@ -15,6 +15,7 @@ def output_to_stds(*, stdout=parsl.AUTO_LOGNAME, stderr=parsl.AUTO_LOGNAME): return "echo hello ; echo goodbye >&2" +@pytest.mark.staging_required def test_stdout_staging_file(tmpd_cwd, caplog): basename = str(tmpd_cwd) + "/stdout.txt" stdout_file = File("file://" + basename) @@ -30,6 +31,7 @@ def test_stdout_staging_file(tmpd_cwd, caplog): assert record.levelno < logging.ERROR +@pytest.mark.staging_required def test_stdout_stderr_staging_zip(tmpd_cwd, caplog): zipfile_name = str(tmpd_cwd) + "/staging.zip" stdout_relative_path = "somewhere/test-out.txt" From 954bad76fe087d84b01d440e8c1db071d8eccb88 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Mon, 10 Jun 2024 10:28:12 +0000 Subject: [PATCH 06/48] add config file i've been using --- htex_k8s_kind.py | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) create mode 100644 htex_k8s_kind.py diff --git a/htex_k8s_kind.py b/htex_k8s_kind.py new file mode 100644 index 0000000000..5b26c9b104 --- /dev/null +++ b/htex_k8s_kind.py @@ -0,0 +1,24 @@ +from parsl.channels import LocalChannel +from parsl.config import Config +from parsl.executors import HighThroughputExecutor +from parsl.launchers import SimpleLauncher +from parsl.providers import KubernetesProvider + + +def fresh_config(): + return Config( + executors=[ + HighThroughputExecutor( + label="executorname", + worker_debug=True, + cores_per_worker=1, + encrypted=False, # needs certificate fs to be mounted in same place... + provider=KubernetesProvider( + worker_init=". /venv/bin/activate", + # pod_name="override-pod-name", # can't use default name because of dots, without own bugfix + image="parslimg:a" + ), + ) + ], + strategy='none', + ) From 17a00dda8c46669f10c2a59cd6b64476bcedf5b4 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Mon, 10 Jun 2024 10:29:56 +0000 Subject: [PATCH 07/48] add the dockerfile i've been using --- Dockerfile | 35 +++++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) create mode 100644 Dockerfile diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000000..d308baead6 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,35 @@ +FROM debian:trixie + +RUN apt-get update && apt-get upgrade -y + +RUN apt-get update && apt-get install -y sudo openssh-server + +RUN apt-get update && apt-get install -y curl less vim + +# git is needed for parsl to figure out it's own repo-specific +# version string +RUN apt-get update && apt-get install -y git + +# useful stuff to have around +RUN apt-get update && apt-get install -y procps + +# for building documentation +RUN apt-get update && apt-get install -y pandoc + +# for monitoring visualization +RUN apt-get update && apt-get install -y graphviz wget + +# for commandline access to monitoring database +RUN apt-get update && apt-get install -y sqlite3 + +RUN apt-get update && apt-get install -y python3.12 python3.12-dev +RUN apt-get update && apt-get install -y python3.12-venv + +RUN apt-get update && apt-get install -y gcc build-essential make pkg-config mpich + +RUN python3.12 -m venv /venv + +ADD . /src +WORKDIR /src + +RUN . /venv/bin/activate && pip3 install . -r test-requirements.txt From 62e0e36ecbbde6862a7512d863f161fb7bed30c4 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Mon, 10 Jun 2024 11:03:04 +0000 Subject: [PATCH 08/48] beginning of kubernetes-in-CI --- .github/workflows/ci-k8s.yaml | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) create mode 100644 .github/workflows/ci-k8s.yaml diff --git a/.github/workflows/ci-k8s.yaml b/.github/workflows/ci-k8s.yaml new file mode 100644 index 0000000000..1e0112bde9 --- /dev/null +++ b/.github/workflows/ci-k8s.yaml @@ -0,0 +1,26 @@ +name: Parsl + +on: + pull_request: + types: + - opened + - synchronize + +jobs: + k8s-kind-suite: + runs-on: ubuntu-20.04 + timeout-minutes: 60 + + steps: + - uses: actions/checkout@master + + - name: Create k8s Kind Cluster + uses: helm/kind-action@v1 + with: + cluster_name: parsl-k8s + + - name: Build and push + uses: docker/build-push-action@v5 + with: + context: . + tags: parsl:ci From 9086e19f81ce0b0403d0664caf39839e7f031c54 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Mon, 10 Jun 2024 11:09:50 +0000 Subject: [PATCH 09/48] push docker image? upgrade ubuntu --- .github/workflows/ci-k8s.yaml | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci-k8s.yaml b/.github/workflows/ci-k8s.yaml index 1e0112bde9..61ca909353 100644 --- a/.github/workflows/ci-k8s.yaml +++ b/.github/workflows/ci-k8s.yaml @@ -8,7 +8,7 @@ on: jobs: k8s-kind-suite: - runs-on: ubuntu-20.04 + runs-on: ubuntu-24.04 timeout-minutes: 60 steps: @@ -19,8 +19,13 @@ jobs: with: cluster_name: parsl-k8s - - name: Build and push + - name: Build docker image uses: docker/build-push-action@v5 with: context: . tags: parsl:ci + + - name: Push docker image into kubernetes cluster + run: | + kind load docker-image parsl:ci + From 26869e6398789f21d7fca87ec2d64b1b02270365 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Mon, 10 Jun 2024 11:18:33 +0000 Subject: [PATCH 10/48] fiddle with default name --- .github/workflows/ci-k8s.yaml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/.github/workflows/ci-k8s.yaml b/.github/workflows/ci-k8s.yaml index 61ca909353..c8bb89f83c 100644 --- a/.github/workflows/ci-k8s.yaml +++ b/.github/workflows/ci-k8s.yaml @@ -17,7 +17,9 @@ jobs: - name: Create k8s Kind Cluster uses: helm/kind-action@v1 with: - cluster_name: parsl-k8s + # kind tooling uses this name by default, but kind-action uses + # a different default name + cluster_name: kind - name: Build docker image uses: docker/build-push-action@v5 From 16c0a49eb547b159f4de4c1ffab2e3ccbb010380 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Mon, 10 Jun 2024 12:11:55 +0000 Subject: [PATCH 11/48] Add kubernetes, needed for submitting from inside a cluster --- Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Dockerfile b/Dockerfile index d308baead6..cbcf97a7fb 100644 --- a/Dockerfile +++ b/Dockerfile @@ -32,4 +32,4 @@ RUN python3.12 -m venv /venv ADD . /src WORKDIR /src -RUN . /venv/bin/activate && pip3 install . -r test-requirements.txt +RUN . /venv/bin/activate && pip3 install '.[kubernetes]' -r test-requirements.txt From b03615a3e542d776d645ea890fe7c54af570661d Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Mon, 10 Jun 2024 13:01:52 +0000 Subject: [PATCH 12/48] Add more bits for running everything in a kubernetes cluster --- .github/workflows/ci-k8s.yaml | 9 +++++++++ pytest-task.yaml | 14 ++++++++++++++ runme.sh | 7 +++++++ 3 files changed, 30 insertions(+) create mode 100644 pytest-task.yaml create mode 100644 runme.sh diff --git a/.github/workflows/ci-k8s.yaml b/.github/workflows/ci-k8s.yaml index c8bb89f83c..32f2537529 100644 --- a/.github/workflows/ci-k8s.yaml +++ b/.github/workflows/ci-k8s.yaml @@ -31,3 +31,12 @@ jobs: run: | kind load docker-image parsl:ci + - name: set liberal permissions + run: | + kubectl create clusterrolebinding serviceaccounts-cluster-admin --clusterrole=cluster-admin --group=system:serviceaccounts + + - name: launch pytest Job + run: | + kubectl create -f ./pytest-task.yaml + - name: wait for pytest Job + kubectl wait --timeout=600s --for=condition=Complete Job pytest diff --git a/pytest-task.yaml b/pytest-task.yaml new file mode 100644 index 0000000000..44e93dfd6a --- /dev/null +++ b/pytest-task.yaml @@ -0,0 +1,14 @@ +apiVersion: batch/v1 +kind: Job +metadata: + name: pytest +spec: + activeDeadlineSeconds: 600 + template: + spec: + restartPolicy: Never + containers: + - name: pytest + image: parslimg:a + command: ["bash", "runme.sh"] + diff --git a/runme.sh b/runme.sh new file mode 100644 index 0000000000..3d3ae08390 --- /dev/null +++ b/runme.sh @@ -0,0 +1,7 @@ +#!/bin/bash + +source /venv/bin/activate + +pytest parsl/tests/ --config ./htex_k8s_kind.py -k 'not issue3328 and not staging_required and not shared_fs' -x + + From e122d19ab4387648c598b0ca1ed7c95cbd3f0a87 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Mon, 10 Jun 2024 13:05:40 +0000 Subject: [PATCH 13/48] fix syntax error in github workflow definition --- .github/workflows/ci-k8s.yaml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/ci-k8s.yaml b/.github/workflows/ci-k8s.yaml index 32f2537529..d6adf6c11c 100644 --- a/.github/workflows/ci-k8s.yaml +++ b/.github/workflows/ci-k8s.yaml @@ -38,5 +38,7 @@ jobs: - name: launch pytest Job run: | kubectl create -f ./pytest-task.yaml + - name: wait for pytest Job + run: | kubectl wait --timeout=600s --for=condition=Complete Job pytest From ee14f6e1dc1db0a685ce66b6228b30a91717963c Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Mon, 10 Jun 2024 13:16:03 +0000 Subject: [PATCH 14/48] Tighten timeout, add some debugging info at the end --- .github/workflows/ci-k8s.yaml | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/.github/workflows/ci-k8s.yaml b/.github/workflows/ci-k8s.yaml index d6adf6c11c..46aee91bab 100644 --- a/.github/workflows/ci-k8s.yaml +++ b/.github/workflows/ci-k8s.yaml @@ -41,4 +41,13 @@ jobs: - name: wait for pytest Job run: | - kubectl wait --timeout=600s --for=condition=Complete Job pytest + # this pytest should take around 30 seconds to run, so 180 seconds + # should be plenty... + kubectl wait --timeout=180s --for=condition=Complete Job pytest + + - name: report some info + if: ${{ always() }} + run: | + kubectl describe pods + kubectl describe jobs + kubectl logs Job/pytest From 120cf78c7158d1d49ac43979ddfb9cc54721fc8c Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Mon, 10 Jun 2024 13:30:29 +0000 Subject: [PATCH 15/48] Correct pod name from my test --- htex_k8s_kind.py | 2 +- pytest-task.yaml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/htex_k8s_kind.py b/htex_k8s_kind.py index 5b26c9b104..0439afbda4 100644 --- a/htex_k8s_kind.py +++ b/htex_k8s_kind.py @@ -16,7 +16,7 @@ def fresh_config(): provider=KubernetesProvider( worker_init=". /venv/bin/activate", # pod_name="override-pod-name", # can't use default name because of dots, without own bugfix - image="parslimg:a" + image="parsl:ci" ), ) ], diff --git a/pytest-task.yaml b/pytest-task.yaml index 44e93dfd6a..4741411b63 100644 --- a/pytest-task.yaml +++ b/pytest-task.yaml @@ -9,6 +9,6 @@ spec: restartPolicy: Never containers: - name: pytest - image: parslimg:a + image: parsl:ci command: ["bash", "runme.sh"] From 5c55fe6ff1c55cc1f3589e3de0af99116290655d Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Mon, 10 Jun 2024 13:57:54 +0000 Subject: [PATCH 16/48] try to stop Job from recreating pod on failure, but instead abort fast --- pytest-task.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/pytest-task.yaml b/pytest-task.yaml index 4741411b63..902823a15b 100644 --- a/pytest-task.yaml +++ b/pytest-task.yaml @@ -4,6 +4,7 @@ metadata: name: pytest spec: activeDeadlineSeconds: 600 + backoffLimit: 1 template: spec: restartPolicy: Never From b56dfd9ce3671e3390d6df15484efd9cd349f36f Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Mon, 10 Jun 2024 14:09:21 +0000 Subject: [PATCH 17/48] Randomise test order to see if a test failure is specific to a particular test, or something else --- runme.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runme.sh b/runme.sh index 3d3ae08390..cc016ccbf4 100644 --- a/runme.sh +++ b/runme.sh @@ -2,6 +2,6 @@ source /venv/bin/activate -pytest parsl/tests/ --config ./htex_k8s_kind.py -k 'not issue3328 and not staging_required and not shared_fs' -x +pytest parsl/tests/ --config ./htex_k8s_kind.py -k 'not issue3328 and not staging_required and not shared_fs' -x --random-order From f8f5a279bd11454742cdaf7f08e71ae1f9221b37 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Mon, 10 Jun 2024 14:22:25 +0000 Subject: [PATCH 18/48] Add some memory logging --- .github/workflows/ci-k8s.yaml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/ci-k8s.yaml b/.github/workflows/ci-k8s.yaml index 46aee91bab..935ec8132d 100644 --- a/.github/workflows/ci-k8s.yaml +++ b/.github/workflows/ci-k8s.yaml @@ -37,6 +37,7 @@ jobs: - name: launch pytest Job run: | + free -h kubectl create -f ./pytest-task.yaml - name: wait for pytest Job @@ -48,6 +49,7 @@ jobs: - name: report some info if: ${{ always() }} run: | + free -h kubectl describe pods kubectl describe jobs kubectl logs Job/pytest From f4a7300791d265a4b9354f6f52b10f1e6894e63f Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Mon, 10 Jun 2024 14:24:30 +0000 Subject: [PATCH 19/48] Allocate more memory to workers --- htex_k8s_kind.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/htex_k8s_kind.py b/htex_k8s_kind.py index 0439afbda4..d85ce633e1 100644 --- a/htex_k8s_kind.py +++ b/htex_k8s_kind.py @@ -16,7 +16,8 @@ def fresh_config(): provider=KubernetesProvider( worker_init=". /venv/bin/activate", # pod_name="override-pod-name", # can't use default name because of dots, without own bugfix - image="parsl:ci" + image="parsl:ci", + max_mem="2048Gi" # was getting OOM-killing of workers with default... maybe this will help. ), ) ], From ffdb02183d2ce2b38d1df666d0955e337bf1a87a Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Mon, 10 Jun 2024 14:36:50 +0000 Subject: [PATCH 20/48] Add a staging_required marker that apparently wasn't breaking things always in k8s --- parsl/tests/test_docs/test_from_slides.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/parsl/tests/test_docs/test_from_slides.py b/parsl/tests/test_docs/test_from_slides.py index b07092b4ae..b3242e813e 100644 --- a/parsl/tests/test_docs/test_from_slides.py +++ b/parsl/tests/test_docs/test_from_slides.py @@ -1,5 +1,7 @@ import os +import pytest + from parsl.app.app import bash_app, python_app from parsl.data_provider.files import File @@ -15,6 +17,7 @@ def cat(inputs=[]): return f.readlines() +@pytest.mark.staging_required def test_slides(): """Testing code snippet from slides """ From 21711e9aa62bdd9cc0f47512ffc605f31d4d71da Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Mon, 10 Jun 2024 14:38:36 +0000 Subject: [PATCH 21/48] messing with backoff limits and restart policy --- pytest-task.yaml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pytest-task.yaml b/pytest-task.yaml index 902823a15b..c23f055bcd 100644 --- a/pytest-task.yaml +++ b/pytest-task.yaml @@ -4,7 +4,8 @@ metadata: name: pytest spec: activeDeadlineSeconds: 600 - backoffLimit: 1 + backoffLimit: 0 + restartPolicy: Never template: spec: restartPolicy: Never From fb1733eabf3ac15d013d91e2726c17dd7a310769 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Mon, 10 Jun 2024 14:45:48 +0000 Subject: [PATCH 22/48] remove apparently invalid restart policy --- pytest-task.yaml | 1 - 1 file changed, 1 deletion(-) diff --git a/pytest-task.yaml b/pytest-task.yaml index c23f055bcd..03758ce2cf 100644 --- a/pytest-task.yaml +++ b/pytest-task.yaml @@ -5,7 +5,6 @@ metadata: spec: activeDeadlineSeconds: 600 backoffLimit: 0 - restartPolicy: Never template: spec: restartPolicy: Never From 73b3e1d39d210fb6480f888a8e3a1cfafab7e208 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Mon, 10 Jun 2024 15:04:36 +0000 Subject: [PATCH 23/48] Flush out some more staging_required tests (by setting storage_access=[] in local htex test...) --- htex_k8s_kind.py | 1 + parsl/tests/test_staging/test_docs_1.py | 1 + parsl/tests/test_staging/test_staging_ftp.py | 1 + 3 files changed, 3 insertions(+) diff --git a/htex_k8s_kind.py b/htex_k8s_kind.py index d85ce633e1..c09998bce8 100644 --- a/htex_k8s_kind.py +++ b/htex_k8s_kind.py @@ -10,6 +10,7 @@ def fresh_config(): executors=[ HighThroughputExecutor( label="executorname", + storage_access=[], worker_debug=True, cores_per_worker=1, encrypted=False, # needs certificate fs to be mounted in same place... diff --git a/parsl/tests/test_staging/test_docs_1.py b/parsl/tests/test_staging/test_docs_1.py index 8f549ae9b3..c4f0e3b007 100644 --- a/parsl/tests/test_staging/test_docs_1.py +++ b/parsl/tests/test_staging/test_docs_1.py @@ -12,6 +12,7 @@ def convert(inputs=[], outputs=[]): @pytest.mark.cleannet +@pytest.mark.staging_required def test(): # create an remote Parsl file inp = File('ftp://ftp.iana.org/pub/mirror/rirstats/arin/ARIN-STATS-FORMAT-CHANGE.txt') diff --git a/parsl/tests/test_staging/test_staging_ftp.py b/parsl/tests/test_staging/test_staging_ftp.py index 12becdf9c4..a004f5a575 100644 --- a/parsl/tests/test_staging/test_staging_ftp.py +++ b/parsl/tests/test_staging/test_staging_ftp.py @@ -15,6 +15,7 @@ def sort_strings(inputs=[], outputs=[]): @pytest.mark.cleannet +@pytest.mark.staging_required def test_staging_ftp(): """Test staging for an ftp file From 31bc95844fbc263875da880679487fa9191fb0c7 Mon Sep 17 00:00:00 2001 From: Shishi Chen Date: Wed, 12 Jun 2024 23:46:55 +0000 Subject: [PATCH 24/48] Switch the Kubernetes client call to read_namespaced_pod_status() to read_namespaced_pod(), which is functionally the same but requires fewer permissions. --- parsl/providers/kubernetes/kube.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/parsl/providers/kubernetes/kube.py b/parsl/providers/kubernetes/kube.py index c5256a47f3..17181c2b74 100644 --- a/parsl/providers/kubernetes/kube.py +++ b/parsl/providers/kubernetes/kube.py @@ -243,13 +243,13 @@ def _status(self): for jid in to_poll_job_ids: phase = None try: - pod_status = self.kube_client.read_namespaced_pod_status(name=jid, namespace=self.namespace) + pod = self.kube_client.read_namespaced_pod(name=jid, namespace=self.namespace) except Exception: logger.exception("Failed to poll pod {} status, most likely because pod was terminated".format(jid)) if self.resources[jid]['status'] is JobStatus(JobState.RUNNING): phase = 'Unknown' else: - phase = pod_status.status.phase + phase = pod.status.phase if phase: status = translate_table.get(phase, JobState.UNKNOWN) logger.debug("Updating pod {} with status {} to parsl status {}".format(jid, From 8b390246c0047daf2ac2ae567cee16fa07fe2666 Mon Sep 17 00:00:00 2001 From: Shishi Chen Date: Thu, 13 Jun 2024 14:16:58 +0000 Subject: [PATCH 25/48] Fixed Kubernetes worker container launch command to remove trailing semicolon. --- parsl/providers/kubernetes/kube.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parsl/providers/kubernetes/kube.py b/parsl/providers/kubernetes/kube.py index c5256a47f3..9bc1b8c5cf 100644 --- a/parsl/providers/kubernetes/kube.py +++ b/parsl/providers/kubernetes/kube.py @@ -286,7 +286,7 @@ def _create_pod(self, # Create the environment variables and command to initiate IPP environment_vars = client.V1EnvVar(name="TEST", value="SOME DATA") - launch_args = ["-c", "{0};".format(cmd_string)] + launch_args = ["-c", "{0}".format(cmd_string)] volume_mounts = [] # Create mount paths for the volumes From b75a3ae7acfd417f35fc07319ffcccafc642716d Mon Sep 17 00:00:00 2001 From: Colin Thomas Date: Mon, 19 Aug 2024 02:36:03 -0400 Subject: [PATCH 26/48] function data in temp --- parsl/executors/taskvine/executor.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/parsl/executors/taskvine/executor.py b/parsl/executors/taskvine/executor.py index 2e1efb211f..891899f6fc 100644 --- a/parsl/executors/taskvine/executor.py +++ b/parsl/executors/taskvine/executor.py @@ -18,6 +18,7 @@ import threading import uuid from concurrent.futures import Future +from datetime import datetime from typing import List, Literal, Optional, Union # Import other libraries @@ -215,7 +216,8 @@ def __create_data_and_logging_dirs(self): # Create directories for data and results log_dir = os.path.join(run_dir, self.label) - self._function_data_dir = os.path.join(run_dir, self.label, "function_data") + tmp_dir = os.path.join('/tmp/', f'{self.label}-{os.getlogin()}') + self._function_data_dir = os.path.join(tmp_dir, datetime.now().strftime('%Y%m%d%H%M%S%f'), "function_data") os.makedirs(log_dir) os.makedirs(self._function_data_dir) From 2c18d6cadc433e7fd959321cab6578120a9f7642 Mon Sep 17 00:00:00 2001 From: Colin Thomas Date: Mon, 19 Aug 2024 10:42:48 -0400 Subject: [PATCH 27/48] use getpass for username --- parsl/executors/taskvine/executor.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/parsl/executors/taskvine/executor.py b/parsl/executors/taskvine/executor.py index 891899f6fc..b242c51dac 100644 --- a/parsl/executors/taskvine/executor.py +++ b/parsl/executors/taskvine/executor.py @@ -3,6 +3,7 @@ high-throughput system for delegating Parsl tasks to thousands of remote machines """ +import getpass import hashlib import inspect import itertools @@ -216,7 +217,7 @@ def __create_data_and_logging_dirs(self): # Create directories for data and results log_dir = os.path.join(run_dir, self.label) - tmp_dir = os.path.join('/tmp/', f'{self.label}-{os.getlogin()}') + tmp_dir = os.path.join('/tmp/', f'{self.label}-{getpass.getuser()}') self._function_data_dir = os.path.join(tmp_dir, datetime.now().strftime('%Y%m%d%H%M%S%f'), "function_data") os.makedirs(log_dir) os.makedirs(self._function_data_dir) From c201ec1a68fcba827a001070bf5831ea7f354db1 Mon Sep 17 00:00:00 2001 From: Colin Thomas Date: Tue, 20 Aug 2024 10:28:46 -0400 Subject: [PATCH 28/48] use tempfile module --- parsl/executors/taskvine/executor.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/parsl/executors/taskvine/executor.py b/parsl/executors/taskvine/executor.py index b242c51dac..73e80086ab 100644 --- a/parsl/executors/taskvine/executor.py +++ b/parsl/executors/taskvine/executor.py @@ -217,10 +217,10 @@ def __create_data_and_logging_dirs(self): # Create directories for data and results log_dir = os.path.join(run_dir, self.label) - tmp_dir = os.path.join('/tmp/', f'{self.label}-{getpass.getuser()}') - self._function_data_dir = os.path.join(tmp_dir, datetime.now().strftime('%Y%m%d%H%M%S%f'), "function_data") os.makedirs(log_dir) - os.makedirs(self._function_data_dir) + + tmp_prefix = f'{self.label}-{getpass.getuser()}-{datetime.now().strftime("%Y%m%d%H%M%S%f")}-' + self._function_data_dir = tempfile.TemporaryDirectory(prefix=tmp_prefix) # put TaskVine logs outside of a Parsl run as TaskVine caches between runs while # Parsl does not. @@ -230,7 +230,7 @@ def __create_data_and_logging_dirs(self): # factory logs go with manager logs regardless self.factory_config.scratch_dir = self.manager_config.vine_log_dir - logger.debug(f"Function data directory: {self._function_data_dir}, log directory: {log_dir}") + logger.debug(f"Function data directory: {self._function_data_dir.name}, log directory: {log_dir}") logger.debug( f"TaskVine manager log directory: {self.manager_config.vine_log_dir}, " f"factory log directory: {self.factory_config.scratch_dir}") @@ -296,7 +296,7 @@ def _path_in_task(self, executor_task_id, *path_components): 'map': Pickled file with a dict between local parsl names, and remote taskvine names. """ task_dir = "{:04d}".format(executor_task_id) - return os.path.join(self._function_data_dir, task_dir, *path_components) + return os.path.join(self._function_data_dir.name, task_dir, *path_components) def submit(self, func, resource_specification, *args, **kwargs): """Processes the Parsl app by its arguments and submits the function From 9f6b0377d17218e28e7f848cf4e9cc7018fb85c8 Mon Sep 17 00:00:00 2001 From: Colin Thomas Date: Tue, 20 Aug 2024 10:34:14 -0400 Subject: [PATCH 29/48] flake etc --- parsl/executors/taskvine/executor.py | 1 - 1 file changed, 1 deletion(-) diff --git a/parsl/executors/taskvine/executor.py b/parsl/executors/taskvine/executor.py index 73e80086ab..a15a444d2c 100644 --- a/parsl/executors/taskvine/executor.py +++ b/parsl/executors/taskvine/executor.py @@ -218,7 +218,6 @@ def __create_data_and_logging_dirs(self): # Create directories for data and results log_dir = os.path.join(run_dir, self.label) os.makedirs(log_dir) - tmp_prefix = f'{self.label}-{getpass.getuser()}-{datetime.now().strftime("%Y%m%d%H%M%S%f")}-' self._function_data_dir = tempfile.TemporaryDirectory(prefix=tmp_prefix) From 4938dbf857ca37d2d8deec5b7fd47262b6339f3a Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Thu, 5 Sep 2024 12:42:01 +0000 Subject: [PATCH 30/48] Build cctools and run a probably-broken taskvine vs kubernetes test config --- Dockerfile | 9 +++++++-- runme.sh | 3 ++- taskvine_k8s_kind.py | 18 ++++++++++++++++++ 3 files changed, 27 insertions(+), 3 deletions(-) create mode 100644 taskvine_k8s_kind.py diff --git a/Dockerfile b/Dockerfile index cbcf97a7fb..bb6d5f52d9 100644 --- a/Dockerfile +++ b/Dockerfile @@ -29,7 +29,12 @@ RUN apt-get update && apt-get install -y gcc build-essential make pkg-config mpi RUN python3.12 -m venv /venv -ADD . /src -WORKDIR /src +ADD . /parsl +WORKDIR / +RUN git clone https://github.com/cooperative-computing-lab/cctools +WORKDIR /cctools +RUN . /venv/bin/activate && apt install swig && ./configure --prefix=/ && make && make install +WORKDIR /parsl RUN . /venv/bin/activate && pip3 install '.[kubernetes]' -r test-requirements.txt + diff --git a/runme.sh b/runme.sh index cc016ccbf4..2d5b857747 100644 --- a/runme.sh +++ b/runme.sh @@ -1,7 +1,8 @@ -#!/bin/bash +#!/bin/bash -e source /venv/bin/activate pytest parsl/tests/ --config ./htex_k8s_kind.py -k 'not issue3328 and not staging_required and not shared_fs' -x --random-order +PYTHONPATH=/usr/lib/python3.12/site-packages/ pytest parsl/tests/ --config ./taskvine_k8s_kind.py -k 'not issue3328 and not staging_required and not shared_fs' -x --random-order diff --git a/taskvine_k8s_kind.py b/taskvine_k8s_kind.py new file mode 100644 index 0000000000..787e9fb396 --- /dev/null +++ b/taskvine_k8s_kind.py @@ -0,0 +1,18 @@ +from parsl.channels import LocalChannel +from parsl.config import Config +from parsl.launchers import SimpleLauncher +from parsl.providers import KubernetesProvider + +from parsl.executors.taskvine import TaskVineExecutor, TaskVineManagerConfig + +def fresh_config(): + return Config(executors=[TaskVineExecutor(manager_config=TaskVineManagerConfig(port=9000), + worker_launch_method='provider', + provider=KubernetesProvider( + worker_init=". /venv/bin/activate", + # pod_name="override-pod-name", # can't use default name because of dots, without own bugfix + image="parsl:ci", + max_mem="2048Gi" # was getting OOM-killing of workers with default... maybe this will help. + ), + + )]) From 98d76936d2520085c001a8d3e8c41ec2bea55fac Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Thu, 5 Sep 2024 12:55:48 +0000 Subject: [PATCH 31/48] fix repr in taskvine --- parsl/executors/taskvine/executor.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/parsl/executors/taskvine/executor.py b/parsl/executors/taskvine/executor.py index 2e1efb211f..c31c46d615 100644 --- a/parsl/executors/taskvine/executor.py +++ b/parsl/executors/taskvine/executor.py @@ -108,6 +108,8 @@ def __init__(self, storage_access: Optional[List[Staging]] = None): # Set worker launch option for this executor + # This is to make repr work - otherwise it raises an attribute error + self.worker_launch_method = worker_launch_method if worker_launch_method == 'factory' or worker_launch_method == 'manual': provider = None From dfc94a800e9c3dde9b70348d2d5cdb0848015759 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Thu, 5 Sep 2024 13:08:48 +0000 Subject: [PATCH 32/48] install cloudpickle explicitly for taskvine --- Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Dockerfile b/Dockerfile index bb6d5f52d9..bb2a66d15e 100644 --- a/Dockerfile +++ b/Dockerfile @@ -36,5 +36,5 @@ WORKDIR /cctools RUN . /venv/bin/activate && apt install swig && ./configure --prefix=/ && make && make install WORKDIR /parsl -RUN . /venv/bin/activate && pip3 install '.[kubernetes]' -r test-requirements.txt +RUN . /venv/bin/activate && pip3 install '.[kubernetes]' cloudpickle -r test-requirements.txt From 47378f33beaae0ff9542b7006650044884646c7e Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Thu, 5 Sep 2024 13:19:28 +0000 Subject: [PATCH 33/48] Add more time onto job timeout, because more is happening in job with taskvine now --- .github/workflows/ci-k8s.yaml | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/.github/workflows/ci-k8s.yaml b/.github/workflows/ci-k8s.yaml index 935ec8132d..96ffc68c80 100644 --- a/.github/workflows/ci-k8s.yaml +++ b/.github/workflows/ci-k8s.yaml @@ -42,9 +42,7 @@ jobs: - name: wait for pytest Job run: | - # this pytest should take around 30 seconds to run, so 180 seconds - # should be plenty... - kubectl wait --timeout=180s --for=condition=Complete Job pytest + kubectl wait --timeout=600s --for=condition=Complete Job pytest - name: report some info if: ${{ always() }} From 43af8efdb42032d6c026a7f043b81b8df5f6d51e Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Thu, 5 Sep 2024 13:43:32 +0000 Subject: [PATCH 34/48] revert to 180s test time --- .github/workflows/ci-k8s.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci-k8s.yaml b/.github/workflows/ci-k8s.yaml index 96ffc68c80..7835ed113c 100644 --- a/.github/workflows/ci-k8s.yaml +++ b/.github/workflows/ci-k8s.yaml @@ -42,7 +42,7 @@ jobs: - name: wait for pytest Job run: | - kubectl wait --timeout=600s --for=condition=Complete Job pytest + kubectl wait --timeout=180s --for=condition=Complete Job pytest - name: report some info if: ${{ always() }} From 6a32f0ff3dff61255699fa7a3adfc806f379ff26 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Thu, 5 Sep 2024 13:44:28 +0000 Subject: [PATCH 35/48] Log more to the console, kubernetes style --- runme.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runme.sh b/runme.sh index 2d5b857747..7a9b03eb89 100644 --- a/runme.sh +++ b/runme.sh @@ -3,6 +3,6 @@ source /venv/bin/activate pytest parsl/tests/ --config ./htex_k8s_kind.py -k 'not issue3328 and not staging_required and not shared_fs' -x --random-order -PYTHONPATH=/usr/lib/python3.12/site-packages/ pytest parsl/tests/ --config ./taskvine_k8s_kind.py -k 'not issue3328 and not staging_required and not shared_fs' -x --random-order +PYTHONPATH=/usr/lib/python3.12/site-packages/ pytest parsl/tests/ --config ./taskvine_k8s_kind.py -k 'not issue3328 and not staging_required and not shared_fs' -x --random-order --log-cli-level=DEBUG From d4fab6a8d2ba58ba2c2c77904c6d2369e7697e62 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Thu, 5 Sep 2024 14:02:22 +0000 Subject: [PATCH 36/48] Note a (documentation?) bug in taskvine address selection --- parsl/executors/taskvine/manager_config.py | 1 + 1 file changed, 1 insertion(+) diff --git a/parsl/executors/taskvine/manager_config.py b/parsl/executors/taskvine/manager_config.py index 18e58a0b90..b29d31d303 100644 --- a/parsl/executors/taskvine/manager_config.py +++ b/parsl/executors/taskvine/manager_config.py @@ -25,6 +25,7 @@ class TaskVineManagerConfig: address: Optional[str] Address of the local machine. If None, socket.gethostname() will be used to determine the address. + XXXX ^ if None, looks like get_any_address is being used and in my kubernetes setup, choosing 127.0.0.1 project_name: Optional[str] If given, TaskVine will periodically report its status and performance From 21dcae67030f120540ab4fcba5a021b5d64ed318 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Thu, 5 Sep 2024 14:03:36 +0000 Subject: [PATCH 37/48] force hostname based address config, in line with comment in previous commit --- taskvine_k8s_kind.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/taskvine_k8s_kind.py b/taskvine_k8s_kind.py index 787e9fb396..ed3e94d8e8 100644 --- a/taskvine_k8s_kind.py +++ b/taskvine_k8s_kind.py @@ -2,11 +2,12 @@ from parsl.config import Config from parsl.launchers import SimpleLauncher from parsl.providers import KubernetesProvider +from parsl.addresses import address_by_hostname from parsl.executors.taskvine import TaskVineExecutor, TaskVineManagerConfig def fresh_config(): - return Config(executors=[TaskVineExecutor(manager_config=TaskVineManagerConfig(port=9000), + return Config(executors=[TaskVineExecutor(manager_config=TaskVineManagerConfig(address=address_by_hostname(), port=9000), worker_launch_method='provider', provider=KubernetesProvider( worker_init=". /venv/bin/activate", From e1cce0349c373f2506cf78dc956f15177d99b144 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Thu, 5 Sep 2024 14:15:07 +0000 Subject: [PATCH 38/48] now we're starting taskvine test successfully, give it time to complete --- .github/workflows/ci-k8s.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci-k8s.yaml b/.github/workflows/ci-k8s.yaml index 7835ed113c..96ffc68c80 100644 --- a/.github/workflows/ci-k8s.yaml +++ b/.github/workflows/ci-k8s.yaml @@ -42,7 +42,7 @@ jobs: - name: wait for pytest Job run: | - kubectl wait --timeout=180s --for=condition=Complete Job pytest + kubectl wait --timeout=600s --for=condition=Complete Job pytest - name: report some info if: ${{ always() }} From 4d4b4baddb48ff90403db693fdc83690733733e0 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Thu, 5 Sep 2024 15:09:38 +0000 Subject: [PATCH 39/48] Make taskvine shutdown scale-in more like htex shutdown scale-in --- parsl/executors/taskvine/executor.py | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/parsl/executors/taskvine/executor.py b/parsl/executors/taskvine/executor.py index c31c46d615..ba7ac0b1b1 100644 --- a/parsl/executors/taskvine/executor.py +++ b/parsl/executors/taskvine/executor.py @@ -582,11 +582,18 @@ def shutdown(self, *args, **kwargs): logger.debug("TaskVine shutdown started") self._should_stop.set() - # Remove the workers that are still going - kill_ids = [self.blocks_to_job_id[block] for block in self.blocks_to_job_id.keys()] - if self.provider: - logger.debug("Cancelling blocks") - self.provider.cancel(kill_ids) + # BENC: removed this bit because the scaling code does this, + # and the kubernetes provider fails trying to scale in blocks + # that have already been deleted by the scaling code shutdown. + # This code assumes it can enumerate all blocks by using the + # blocks_to_job_id structure, but there's a not-very-strong + # principle that: you should not enumerate blocks_to_job_id + # (or the job to block map) and you should not call cancel on + # already cancelled blocks. + # kill_ids = [self.blocks_to_job_id[block] for block in self.blocks_to_job_id.keys()] + # if self.provider: + # logger.debug("Cancelling blocks") + # self.provider.cancel(kill_ids) # Join all processes before exiting logger.debug("Joining on submit process") From 2e42e5c0b52551e7e27e9f2bf57da7cb24035053 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Thu, 5 Sep 2024 15:30:26 +0000 Subject: [PATCH 40/48] enable staging_required tests in taskvine, because taskvine might be able to deal with staging those files properly (or it might be good to flush out more test problems) --- runme.sh | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/runme.sh b/runme.sh index 7a9b03eb89..e23d656a21 100644 --- a/runme.sh +++ b/runme.sh @@ -3,6 +3,4 @@ source /venv/bin/activate pytest parsl/tests/ --config ./htex_k8s_kind.py -k 'not issue3328 and not staging_required and not shared_fs' -x --random-order -PYTHONPATH=/usr/lib/python3.12/site-packages/ pytest parsl/tests/ --config ./taskvine_k8s_kind.py -k 'not issue3328 and not staging_required and not shared_fs' -x --random-order --log-cli-level=DEBUG - - +PYTHONPATH=/usr/lib/python3.12/site-packages/ pytest parsl/tests/ --config ./taskvine_k8s_kind.py -k 'not issue3328 and not shared_fs' -x --random-order --log-cli-level=DEBUG From 3ba7e12ab8fa393cd013cc7e4ca77b59184d4d7b Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Thu, 5 Sep 2024 15:50:25 +0000 Subject: [PATCH 41/48] Output timestamps in kubernetes log to help diagnose hangs --- .github/workflows/ci-k8s.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci-k8s.yaml b/.github/workflows/ci-k8s.yaml index 96ffc68c80..286674e336 100644 --- a/.github/workflows/ci-k8s.yaml +++ b/.github/workflows/ci-k8s.yaml @@ -50,4 +50,4 @@ jobs: free -h kubectl describe pods kubectl describe jobs - kubectl logs Job/pytest + kubectl logs --timestamps Job/pytest From 084d79706a55b20a34f813cd646e639246ebec63 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Thu, 5 Sep 2024 16:14:00 +0000 Subject: [PATCH 42/48] failed to get non-staging tests working, made a note in comments --- runme.sh | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/runme.sh b/runme.sh index e23d656a21..c5ed61b311 100644 --- a/runme.sh +++ b/runme.sh @@ -3,4 +3,13 @@ source /venv/bin/activate pytest parsl/tests/ --config ./htex_k8s_kind.py -k 'not issue3328 and not staging_required and not shared_fs' -x --random-order -PYTHONPATH=/usr/lib/python3.12/site-packages/ pytest parsl/tests/ --config ./taskvine_k8s_kind.py -k 'not issue3328 and not shared_fs' -x --random-order --log-cli-level=DEBUG + + +# I tried letting staging_required tests run here but they do not -- a bit confused about this comment in taskvine: +# +# # Absolute paths are assumed to be in shared filesystem, and thus +# # not staged by taskvine. +# which I guess is saying something is making assumptions about the presence of a shared filesystem even when defaulting to shared_fs=False in the taskvine config? + + +PYTHONPATH=/usr/lib/python3.12/site-packages/ pytest parsl/tests/ --config ./taskvine_k8s_kind.py -k 'not issue3328 and and not staging_required and not shared_fs' -x --random-order --log-cli-level=DEBUG From f34f2b8dbdc2b7d3b11b026ee3b2eeb3563f917b Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Thu, 5 Sep 2024 16:40:19 +0000 Subject: [PATCH 43/48] correct duplicated 'and' in pytest -k option --- runme.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runme.sh b/runme.sh index c5ed61b311..309cfae18e 100644 --- a/runme.sh +++ b/runme.sh @@ -12,4 +12,4 @@ pytest parsl/tests/ --config ./htex_k8s_kind.py -k 'not issue3328 and not stagin # which I guess is saying something is making assumptions about the presence of a shared filesystem even when defaulting to shared_fs=False in the taskvine config? -PYTHONPATH=/usr/lib/python3.12/site-packages/ pytest parsl/tests/ --config ./taskvine_k8s_kind.py -k 'not issue3328 and and not staging_required and not shared_fs' -x --random-order --log-cli-level=DEBUG +PYTHONPATH=/usr/lib/python3.12/site-packages/ pytest parsl/tests/ --config ./taskvine_k8s_kind.py -k 'not issue3328 and not staging_required and not shared_fs' -x --random-order --log-cli-level=DEBUG From 1f09e5caec15dc05dc42812532519ac273d00c2f Mon Sep 17 00:00:00 2001 From: Reid Mello <30907815+rjmello@users.noreply.github.com> Date: Mon, 14 Oct 2024 18:34:49 -0400 Subject: [PATCH 44/48] Add utils to sanitize strings for DNS compliance These convert any string to a valid RFC 1123 DNS subdomain or label. --- parsl/tests/test_utils/test_sanitize_dns.py | 76 ++++++++++++++++++++ parsl/utils.py | 78 +++++++++++++++++++++ 2 files changed, 154 insertions(+) create mode 100644 parsl/tests/test_utils/test_sanitize_dns.py diff --git a/parsl/tests/test_utils/test_sanitize_dns.py b/parsl/tests/test_utils/test_sanitize_dns.py new file mode 100644 index 0000000000..17b801339c --- /dev/null +++ b/parsl/tests/test_utils/test_sanitize_dns.py @@ -0,0 +1,76 @@ +import random +import re + +import pytest + +from parsl.utils import sanitize_dns_label_rfc1123, sanitize_dns_subdomain_rfc1123 + +# Ref: https://datatracker.ietf.org/doc/html/rfc1123 +DNS_LABEL_REGEX = r'^[a-z0-9]([-a-z0-9]{0,61}[a-z0-9])?$' +DNS_SUBDOMAIN_REGEX = r'^[a-z0-9]([-a-z0-9]{0,61}[a-z0-9])?(\.[a-z0-9]([-a-z0-9]{0,61}[a-z0-9])?)*$' + +test_labels = [ + "example-label-123", # Valid label + "EXAMPLE", # Case sensitivity + "!@#example*", # Remove invalid characters + "--leading-and-trailing--", # Leading and trailing hyphens + "..leading.and.trailing..", # Leading and tailing dots + "multiple..dots", # Consecutive dots + "valid--label", # Consecutive hyphens + "a" * random.randint(64, 70), # Longer than 63 characters + f"{'a' * 62}-a", # Trailing hyphen at max length +] + + +def _generate_test_subdomains(num_subdomains: int): + subdomains = [] + for _ in range(num_subdomains): + num_labels = random.randint(1, 5) + labels = [test_labels[random.randint(0, num_labels - 1)] for _ in range(num_labels)] + subdomain = ".".join(labels) + subdomains.append(subdomain) + return subdomains + + +@pytest.mark.local +@pytest.mark.parametrize("raw_string", test_labels) +def test_sanitize_dns_label_rfc1123(raw_string: str): + print(sanitize_dns_label_rfc1123(raw_string)) + assert re.match(DNS_LABEL_REGEX, sanitize_dns_label_rfc1123(raw_string)) + + +@pytest.mark.local +@pytest.mark.parametrize("raw_string", ("", "-", "@", "$$$")) +def test_sanitize_dns_label_rfc1123_empty(raw_string: str): + with pytest.raises(ValueError) as e_info: + sanitize_dns_label_rfc1123(raw_string) + assert str(e_info.value) == f"Sanitized DNS label is empty for input '{raw_string}'" + + +@pytest.mark.local +@pytest.mark.parametrize("raw_string", _generate_test_subdomains(10)) +def test_sanitize_dns_subdomain_rfc1123(raw_string: str): + assert re.match(DNS_SUBDOMAIN_REGEX, sanitize_dns_subdomain_rfc1123(raw_string)) + + +@pytest.mark.local +@pytest.mark.parametrize("char", ("-", ".")) +def test_sanitize_dns_subdomain_rfc1123_trailing_non_alphanumeric_at_max_length(char: str): + raw_string = (f"{'a' * 61}." * 4) + f".aaaa{char}a" + assert re.match(DNS_SUBDOMAIN_REGEX, sanitize_dns_subdomain_rfc1123(raw_string)) + + +@pytest.mark.local +@pytest.mark.parametrize("raw_string", ("", ".", "...")) +def test_sanitize_dns_subdomain_rfc1123_empty(raw_string: str): + with pytest.raises(ValueError) as e_info: + sanitize_dns_subdomain_rfc1123(raw_string) + assert str(e_info.value) == f"Sanitized DNS subdomain is empty for input '{raw_string}'" + + +@pytest.mark.local +@pytest.mark.parametrize( + "raw_string", ("a" * 253, "a" * random.randint(254, 300)), ids=("254 chars", ">253 chars") +) +def test_sanitize_dns_subdomain_rfc1123_max_length(raw_string: str): + assert len(sanitize_dns_subdomain_rfc1123(raw_string)) <= 253 diff --git a/parsl/utils.py b/parsl/utils.py index 6f36d4506a..0ea5d7d9eb 100644 --- a/parsl/utils.py +++ b/parsl/utils.py @@ -1,6 +1,7 @@ import inspect import logging import os +import re import shlex import subprocess import threading @@ -380,3 +381,80 @@ def __exit__( exc_tb: Optional[TracebackType] ) -> None: self.cancel() + + +def sanitize_dns_label_rfc1123(raw_string: str) -> str: + """Convert input string to a valid RFC 1123 DNS label. + + Parameters + ---------- + raw_string : str + String to sanitize. + + Returns + ------- + str + Sanitized string. + + Raises + ------ + ValueError + If the string is empty after sanitization. + """ + # Convert to lowercase and replace non-alphanumeric characters with hyphen + sanitized = re.sub(r'[^a-z0-9]', '-', raw_string.lower()) + + # Remove consecutive hyphens + sanitized = re.sub(r'-+', '-', sanitized) + + # DNS label cannot exceed 63 characters + sanitized = sanitized[:63] + + # Strip after trimming to avoid trailing hyphens + sanitized = sanitized.strip("-") + + if not sanitized: + raise ValueError(f"Sanitized DNS label is empty for input '{raw_string}'") + + return sanitized + + +def sanitize_dns_subdomain_rfc1123(raw_string: str) -> str: + """Convert input string to a valid RFC 1123 DNS subdomain. + + Parameters + ---------- + raw_string : str + String to sanitize. + + Returns + ------- + str + Sanitized string. + + Raises + ------ + ValueError + If the string is empty after sanitization. + """ + segments = raw_string.split('.') + + sanitized_segments = [] + for segment in segments: + if not segment: + continue + sanitized_segment = sanitize_dns_label_rfc1123(segment) + sanitized_segments.append(sanitized_segment) + + sanitized = '.'.join(sanitized_segments) + + # DNS subdomain cannot exceed 253 characters + sanitized = sanitized[:253] + + # Strip after trimming to avoid trailing dots or hyphens + sanitized = sanitized.strip(".-") + + if not sanitized: + raise ValueError(f"Sanitized DNS subdomain is empty for input '{raw_string}'") + + return sanitized From 83278c22fab5adca917024fc81a89d289f3dbc5d Mon Sep 17 00:00:00 2001 From: Reid Mello <30907815+rjmello@users.noreply.github.com> Date: Mon, 14 Oct 2024 20:22:52 -0400 Subject: [PATCH 45/48] Ensure k8s pod names/labels are RFC 1123 compliant - Modified Kubernetes pod names and labels to conform to RFC 1123 for DNS subdomain names and labels, ensuring compliance with Kubernetes naming conventions. - Replaced the trailing timestamp in the job name with an eight-character hex string (job ID) to improve collision avoidance. - Replaced `app` pod label with `parsl-job-id`. - Updated container name to use job ID. --- parsl/providers/kubernetes/kube.py | 50 ++++++++++++++++-------------- 1 file changed, 26 insertions(+), 24 deletions(-) diff --git a/parsl/providers/kubernetes/kube.py b/parsl/providers/kubernetes/kube.py index 40b5b430a5..a7d5035cdd 100644 --- a/parsl/providers/kubernetes/kube.py +++ b/parsl/providers/kubernetes/kube.py @@ -1,10 +1,5 @@ import logging -import time - -from parsl.providers.kubernetes.template import template_string - -logger = logging.getLogger(__name__) - +import uuid from typing import Any, Dict, List, Optional, Tuple import typeguard @@ -12,7 +7,8 @@ from parsl.errors import OptionalModuleMissing from parsl.jobs.states import JobState, JobStatus from parsl.providers.base import ExecutionProvider -from parsl.utils import RepresentationMixin +from parsl.providers.kubernetes.template import template_string +from parsl.utils import RepresentationMixin, sanitize_dns_subdomain_rfc1123 try: from kubernetes import client, config @@ -20,6 +16,8 @@ except (ImportError, NameError, FileNotFoundError): _kubernetes_enabled = False +logger = logging.getLogger(__name__) + translate_table = { 'Running': JobState.RUNNING, 'Pending': JobState.PENDING, @@ -161,7 +159,7 @@ def __init__(self, self.resources: Dict[object, Dict[str, Any]] self.resources = {} - def submit(self, cmd_string, tasks_per_node, job_name="parsl"): + def submit(self, cmd_string: str, tasks_per_node: int, job_name: str = "parsl.kube"): """ Submit a job Args: - cmd_string :(String) - Name of the container to initiate @@ -173,15 +171,19 @@ def submit(self, cmd_string, tasks_per_node, job_name="parsl"): Returns: - job_id: (string) Identifier for the job """ + job_id = uuid.uuid4().hex[:8] - cur_timestamp = str(time.time() * 1000).split(".")[0] - job_name = "{0}-{1}".format(job_name, cur_timestamp) - - if not self.pod_name: - pod_name = '{}'.format(job_name) - else: - pod_name = '{}-{}'.format(self.pod_name, - cur_timestamp) + pod_name = self.pod_name or job_name + try: + pod_name = sanitize_dns_subdomain_rfc1123(pod_name) + except ValueError: + logger.warning( + f"Invalid pod name '{pod_name}' for job '{job_id}', falling back to 'parsl.kube'" + ) + pod_name = "parsl.kube" + pod_name = pod_name[:253 - 1 - len(job_id)] # Leave room for the job ID + pod_name = pod_name.rstrip(".-") # Remove trailing dot or hyphen after trim + pod_name = f"{pod_name}.{job_id}" formatted_cmd = template_string.format(command=cmd_string, worker_init=self.worker_init) @@ -189,7 +191,7 @@ def submit(self, cmd_string, tasks_per_node, job_name="parsl"): logger.debug("Pod name: %s", pod_name) self._create_pod(image=self.image, pod_name=pod_name, - job_name=job_name, + job_id=job_id, cmd_string=formatted_cmd, volumes=self.persistent_volumes, service_account_name=self.service_account_name, @@ -257,10 +259,10 @@ def _status(self): self.resources[jid]['status'] = JobStatus(status) def _create_pod(self, - image, - pod_name, - job_name, - port=80, + image: str, + pod_name: str, + job_id: str, + port: int = 80, cmd_string=None, volumes=[], service_account_name=None, @@ -269,7 +271,7 @@ def _create_pod(self, Args: - image (string) : Docker image to launch - pod_name (string) : Name of the pod - - job_name (string) : App label + - job_id (string) : Job ID KWargs: - port (integer) : Container port Returns: @@ -299,7 +301,7 @@ def _create_pod(self, ) # Configure Pod template container container = client.V1Container( - name=pod_name, + name=job_id, image=image, resources=resources, ports=[client.V1ContainerPort(container_port=port)], @@ -322,7 +324,7 @@ def _create_pod(self, claim_name=volume[0]))) metadata = client.V1ObjectMeta(name=pod_name, - labels={"app": job_name}, + labels={"parsl-job-id": job_id}, annotations=annotations) spec = client.V1PodSpec(containers=[container], image_pull_secrets=[secret], From 86ade32fe3f49269e51b668d54546bb510279689 Mon Sep 17 00:00:00 2001 From: Reid Mello <30907815+rjmello@users.noreply.github.com> Date: Mon, 14 Oct 2024 20:34:54 -0400 Subject: [PATCH 46/48] Use hex value for k8s job ID instead of pod name --- parsl/providers/kubernetes/kube.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/parsl/providers/kubernetes/kube.py b/parsl/providers/kubernetes/kube.py index a7d5035cdd..8c3b081ad0 100644 --- a/parsl/providers/kubernetes/kube.py +++ b/parsl/providers/kubernetes/kube.py @@ -196,9 +196,9 @@ def submit(self, cmd_string: str, tasks_per_node: int, job_name: str = "parsl.ku volumes=self.persistent_volumes, service_account_name=self.service_account_name, annotations=self.annotations) - self.resources[pod_name] = {'status': JobStatus(JobState.RUNNING)} + self.resources[job_id] = {'status': JobStatus(JobState.RUNNING), 'pod_name': pod_name} - return pod_name + return job_id def status(self, job_ids): """ Get the status of a list of jobs identified by the job identifiers @@ -214,6 +214,9 @@ def status(self, job_ids): self._status() return [self.resources[jid]['status'] for jid in job_ids] + def _get_pod_name(self, job_id: str) -> str: + return self.resources[job_id]['pod_name'] + def cancel(self, job_ids): """ Cancels the jobs specified by a list of job ids Args: @@ -223,7 +226,8 @@ def cancel(self, job_ids): """ for job in job_ids: logger.debug("Terminating job/pod: {0}".format(job)) - self._delete_pod(job) + pod_name = self._get_pod_name(job) + self._delete_pod(pod_name) self.resources[job]['status'] = JobStatus(JobState.CANCELLED) rets = [True for i in job_ids] @@ -244,7 +248,8 @@ def _status(self): for jid in to_poll_job_ids: phase = None try: - pod = self.kube_client.read_namespaced_pod(name=jid, namespace=self.namespace) + pod_name = self._get_pod_name(jid) + pod = self.kube_client.read_namespaced_pod(name=pod_name, namespace=self.namespace) except Exception: logger.exception("Failed to poll pod {} status, most likely because pod was terminated".format(jid)) if self.resources[jid]['status'] is JobStatus(JobState.RUNNING): From 0c4d54194f20a86924bfae76b0cd638861cff289 Mon Sep 17 00:00:00 2001 From: Reid Mello <30907815+rjmello@users.noreply.github.com> Date: Thu, 17 Oct 2024 14:00:11 -0400 Subject: [PATCH 47/48] Add tests for KubernetesProvider submit --- Makefile | 2 +- .../test_kubernetes_provider.py | 102 ++++++++++++++++++ 2 files changed, 103 insertions(+), 1 deletion(-) create mode 100644 parsl/tests/test_providers/test_kubernetes_provider.py diff --git a/Makefile b/Makefile index 90f20601e9..4d2f37f715 100644 --- a/Makefile +++ b/Makefile @@ -84,7 +84,7 @@ radical_local_test: .PHONY: config_local_test config_local_test: $(CCTOOLS_INSTALL) - pip3 install ".[monitoring,visualization,proxystore]" + pip3 install ".[monitoring,visualization,proxystore,kubernetes]" PYTHONPATH=/tmp/cctools/lib/python3.8/site-packages pytest parsl/tests/ -k "not cleannet" --config local --random-order --durations 10 .PHONY: site_test diff --git a/parsl/tests/test_providers/test_kubernetes_provider.py b/parsl/tests/test_providers/test_kubernetes_provider.py new file mode 100644 index 0000000000..453dc57422 --- /dev/null +++ b/parsl/tests/test_providers/test_kubernetes_provider.py @@ -0,0 +1,102 @@ +import re +from unittest import mock + +import pytest + +from parsl.providers.kubernetes.kube import KubernetesProvider +from parsl.tests.test_utils.test_sanitize_dns import DNS_SUBDOMAIN_REGEX + +_MOCK_BASE = "parsl.providers.kubernetes.kube" + + +@pytest.fixture(autouse=True) +def mock_kube_config(): + with mock.patch(f"{_MOCK_BASE}.config") as mock_config: + mock_config.load_kube_config.return_value = None + yield mock_config + + +@pytest.fixture +def mock_kube_client(): + mock_client = mock.MagicMock() + with mock.patch(f"{_MOCK_BASE}.client.CoreV1Api") as mock_api: + mock_api.return_value = mock_client + yield mock_client + + +@pytest.mark.local +def test_submit_happy_path(mock_kube_client: mock.MagicMock): + image = "test-image" + namespace = "test-namespace" + cmd_string = "test-command" + volumes = [("test-volume", "test-mount-path")] + service_account_name = "test-service-account" + annotations = {"test-annotation": "test-value"} + max_cpu = 2 + max_mem = "2Gi" + init_cpu = 1 + init_mem = "1Gi" + provider = KubernetesProvider( + image=image, + persistent_volumes=volumes, + namespace=namespace, + service_account_name=service_account_name, + annotations=annotations, + max_cpu=max_cpu, + max_mem=max_mem, + init_cpu=init_cpu, + init_mem=init_mem, + ) + + job_name = "test.job.name" + job_id = provider.submit(cmd_string=cmd_string, tasks_per_node=1, job_name=job_name) + + assert job_id in provider.resources + assert mock_kube_client.create_namespaced_pod.call_count == 1 + + call_args = mock_kube_client.create_namespaced_pod.call_args[1] + pod = call_args["body"] + container = pod.spec.containers[0] + volume = container.volume_mounts[0] + + assert image == container.image + assert namespace == call_args["namespace"] + assert any(cmd_string in arg for arg in container.args) + assert volumes[0] == (volume.name, volume.mount_path) + assert service_account_name == pod.spec.service_account_name + assert annotations == pod.metadata.annotations + assert str(max_cpu) == container.resources.limits["cpu"] + assert max_mem == container.resources.limits["memory"] + assert str(init_cpu) == container.resources.requests["cpu"] + assert init_mem == container.resources.requests["memory"] + assert job_id == pod.metadata.labels["parsl-job-id"] + assert job_id == container.name + assert f"{job_name}.{job_id}" == pod.metadata.name + + +@pytest.mark.local +@mock.patch(f"{_MOCK_BASE}.KubernetesProvider._create_pod") +@pytest.mark.parametrize("char", (".", "-")) +def test_submit_pod_name_includes_job_id(mock_create_pod: mock.MagicMock, char: str): + provider = KubernetesProvider(image="test-image") + + job_name = "a." * 121 + f"a{char}" + "a" * 9 + assert len(job_name) == 253 # Max length for pod name + job_id = provider.submit(cmd_string="test-command", tasks_per_node=1, job_name=job_name) + + expected_pod_name = job_name[:253 - len(job_id) - 2] + f".{job_id}" + actual_pod_name = mock_create_pod.call_args[1]["pod_name"] + assert re.match(DNS_SUBDOMAIN_REGEX, actual_pod_name) + assert expected_pod_name == actual_pod_name + + +@pytest.mark.local +@mock.patch(f"{_MOCK_BASE}.KubernetesProvider._create_pod") +@mock.patch(f"{_MOCK_BASE}.logger") +@pytest.mark.parametrize("job_name", ("", ".", "-", "a.-.a", "$$$")) +def test_submit_invalid_job_name(mock_logger: mock.MagicMock, mock_create_pod: mock.MagicMock, job_name: str): + provider = KubernetesProvider(image="test-image") + job_id = provider.submit(cmd_string="test-command", tasks_per_node=1, job_name=job_name) + assert mock_logger.warning.call_count == 1 + assert f"Invalid pod name '{job_name}' for job '{job_id}'" in mock_logger.warning.call_args[0][0] + assert f"parsl.kube.{job_id}" == mock_create_pod.call_args[1]["pod_name"] From c78defab7afefe10889c5b24f19983e4d8ac2e6b Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Mon, 21 Oct 2024 12:35:35 +0000 Subject: [PATCH 48/48] Fix some bad merge --- parsl/providers/kubernetes/kube.py | 24 ------------------------ 1 file changed, 24 deletions(-) diff --git a/parsl/providers/kubernetes/kube.py b/parsl/providers/kubernetes/kube.py index fd9fe64f00..689af7d952 100644 --- a/parsl/providers/kubernetes/kube.py +++ b/parsl/providers/kubernetes/kube.py @@ -185,8 +185,6 @@ def submit(self, cmd_string: str, tasks_per_node: int, job_name: str = "parsl.ku pod_name = pod_name.rstrip(".-") # Remove trailing dot or hyphen after trim pod_name = f"{pod_name}.{job_id}" - pod_name = _sanitizeDNS1123(pod_name) - formatted_cmd = template_string.format(command=cmd_string, worker_init=self.worker_init) @@ -359,25 +357,3 @@ def label(self): @property def status_polling_interval(self): return 60 - - -# this is based on: -# https://github.com/kubernetes/apimachinery/blob/703232ea6da48aed7ac22260dabc6eac01aab896/pkg/util/validation/validation.go#L177C32-L177C62 -DNS_LABEL_REGEXP = "^[a-z0-9]([-a-z0-9]*[a-z0-9])?$" - - -def _sanitizeDNS1123(raw: str) -> str: - """Rewrite input string to be a valid RFC1123 DNS label. - This is required for Kubernetes pod names. - """ - - # label must be lowercase - raw = raw.lower() - - # label can only contain [-a-z0-9] characters - replace everything - # else with - - raw = re.sub("[^-a-z0-9]", "-", raw) - - # TODO: sanitize against first and last symbols (no - at start or end?) - assert re.match(DNS_LABEL_REGEXP, raw), "sanitized DNS1123 label has not been properly sanitized: " + raw - return raw