From c9888119ac05ce7b300efb74015e2d1880c90e29 Mon Sep 17 00:00:00 2001 From: Mustafa Ilyas Date: Wed, 13 Nov 2024 14:05:09 +0000 Subject: [PATCH 01/11] Setting available capacity for cordoned clusters to 0 in scheduler metrics (#279) (#4044) * Setting available capacity for cordoned clusters to 0 in scheduler metrics * Setting default values for keys under schedulableNodeCountByCluster in scheduler metrics * Removing dead code path Co-authored-by: Mustafa Ilyas --- internal/scheduler/metrics.go | 69 ++++++++++++++++++------------ internal/scheduler/metrics_test.go | 45 +++++++++++++------ 2 files changed, 74 insertions(+), 40 deletions(-) diff --git a/internal/scheduler/metrics.go b/internal/scheduler/metrics.go index 43d9bbe21b2..0c0e7e7752d 100644 --- a/internal/scheduler/metrics.go +++ b/internal/scheduler/metrics.go @@ -288,12 +288,32 @@ func (c *MetricsCollector) updateClusterMetrics(ctx *armadacontext.Context) ([]p } txn := c.jobDb.ReadTxn() + + for _, executorSetting := range executorSettings { + if executorSetting.Cordoned { + // We may have settings for executors that don't exist in the repository. + cordonedStatusByCluster[executorSetting.ExecutorId] = &clusterCordonedStatus{ + status: 1.0, + reason: executorSetting.CordonReason, + setByUser: executorSetting.SetByUser, + } + } else { + cordonedStatusByCluster[executorSetting.ExecutorId] = &clusterCordonedStatus{ + status: 0.0, + reason: executorSetting.CordonReason, + setByUser: executorSetting.SetByUser, + } + } + } + for _, executor := range executors { - // We may not have executorSettings for all known executors, but we still want a cordon status metric for them. - cordonedStatusByCluster[executor.Id] = &clusterCordonedStatus{ - status: 0.0, - reason: "", - setByUser: "", + if _, statusExists := cordonedStatusByCluster[executor.Id]; !statusExists { + // We may not have executorSettings for all known executors, but we still want a cordon status metric for them. + cordonedStatusByCluster[executor.Id] = &clusterCordonedStatus{ + status: 0.0, + reason: "", + setByUser: "", + } } for _, node := range executor.Nodes { nodePool := node.GetPool() @@ -305,6 +325,10 @@ func (c *MetricsCollector) updateClusterMetrics(ctx *armadacontext.Context) ([]p nodeType: node.ReportingNodeType, } + if _, ok := schedulableNodeCountByCluster[clusterKey]; !ok { + schedulableNodeCountByCluster[clusterKey] = 0 + } + awayClusterKeys := make([]clusterMetricKey, 0, len(awayPools)) for _, ap := range awayPools { awayClusterKeys = append(awayClusterKeys, clusterMetricKey{ @@ -314,14 +338,20 @@ func (c *MetricsCollector) updateClusterMetrics(ctx *armadacontext.Context) ([]p }) } - if !node.Unschedulable { - addToResourceListMap(availableResourceByCluster, clusterKey, node.AvailableArmadaResource()) + nodeResources := node.AvailableArmadaResource() + + if !node.Unschedulable && cordonedStatusByCluster[executor.Id].status != 1.0 { schedulableNodeCountByCluster[clusterKey]++ + } else { + // We still want to publish these metrics, just with zeroed values + nodeResources.Zero() + } - // Add available resources to the away cluster pool - for _, awayClusterKey := range awayClusterKeys { - addToResourceListMap(availableResourceByCluster, awayClusterKey, node.AvailableArmadaResource()) - } + addToResourceListMap(availableResourceByCluster, clusterKey, nodeResources) + + // Add available resources to the away cluster pool + for _, awayClusterKey := range awayClusterKeys { + addToResourceListMap(availableResourceByCluster, awayClusterKey, nodeResources) } totalNodeCountByCluster[clusterKey]++ @@ -384,23 +414,6 @@ func (c *MetricsCollector) updateClusterMetrics(ctx *armadacontext.Context) ([]p } } - for _, executorSetting := range executorSettings { - if executorSetting.Cordoned { - if cordonedValue, ok := cordonedStatusByCluster[executorSetting.ExecutorId]; ok { - cordonedValue.status = 1.0 - cordonedValue.reason = executorSetting.CordonReason - cordonedValue.setByUser = executorSetting.SetByUser - } else { - // We may have settings for executors that don't exist in the repository. - cordonedStatusByCluster[executorSetting.ExecutorId] = &clusterCordonedStatus{ - status: 1.0, - reason: executorSetting.CordonReason, - setByUser: executorSetting.SetByUser, - } - } - } - } - for _, pool := range c.floatingResourceTypes.AllPools() { totalFloatingResources := c.floatingResourceTypes.GetTotalAvailableForPool(pool) clusterKey := clusterMetricKey{ diff --git a/internal/scheduler/metrics_test.go b/internal/scheduler/metrics_test.go index d576713618f..772101890cc 100644 --- a/internal/scheduler/metrics_test.go +++ b/internal/scheduler/metrics_test.go @@ -308,9 +308,9 @@ func TestMetricsCollector_TestCollect_ClusterMetrics(t *testing.T) { jobDbJobs: []*jobdb.Job{}, executors: []*schedulerobjects.Executor{executor}, expected: []prometheus.Metric{ - commonmetrics.NewClusterAvailableCapacity(64, "cluster-1", testfixtures.TestPool, "cpu", "type-1"), - commonmetrics.NewClusterAvailableCapacity(512*1024*1024*1024, "cluster-1", testfixtures.TestPool, "memory", "type-1"), - commonmetrics.NewClusterAvailableCapacity(2, "cluster-1", testfixtures.TestPool, "nodes", "type-1"), + commonmetrics.NewClusterAvailableCapacity(0.0, "cluster-1", testfixtures.TestPool, "cpu", "type-1"), + commonmetrics.NewClusterAvailableCapacity(0.0, "cluster-1", testfixtures.TestPool, "memory", "type-1"), + commonmetrics.NewClusterAvailableCapacity(0.0, "cluster-1", testfixtures.TestPool, "nodes", "type-1"), commonmetrics.NewClusterTotalCapacity(64, "cluster-1", testfixtures.TestPool, "cpu", "type-1"), commonmetrics.NewClusterTotalCapacity(512*1024*1024*1024, "cluster-1", testfixtures.TestPool, "memory", "type-1"), commonmetrics.NewClusterTotalCapacity(2, "cluster-1", testfixtures.TestPool, "nodes", "type-1"), @@ -387,17 +387,19 @@ func TestMetricsCollector_TestCollect_ClusterMetricsAvailableCapacity(t *testing node.StateByJobRunId[job.LatestRun().Id()] = schedulerobjects.JobRunState_RUNNING tests := map[string]struct { - poolConfig []configuration.PoolConfig - runningJobs []*jobdb.Job - nodes []*schedulerobjects.Node - expected []prometheus.Metric + poolConfig []configuration.PoolConfig + runningJobs []*jobdb.Job + nodes []*schedulerobjects.Node + executorSettings []*schedulerobjects.ExecutorSettings + expected []prometheus.Metric }{ "No away pools": { poolConfig: []configuration.PoolConfig{ {Name: testfixtures.TestPool}, }, - runningJobs: []*jobdb.Job{job}, - nodes: []*schedulerobjects.Node{node}, + runningJobs: []*jobdb.Job{job}, + nodes: []*schedulerobjects.Node{node}, + executorSettings: []*schedulerobjects.ExecutorSettings{}, expected: []prometheus.Metric{ commonmetrics.NewClusterAvailableCapacity(32, "cluster-1", testfixtures.TestPool, "cpu", "type-1"), commonmetrics.NewClusterTotalCapacity(32, "cluster-1", testfixtures.TestPool, "cpu", "type-1"), @@ -413,8 +415,9 @@ func TestMetricsCollector_TestCollect_ClusterMetricsAvailableCapacity(t *testing AwayPools: []string{testfixtures.TestPool}, }, }, - runningJobs: []*jobdb.Job{job}, - nodes: []*schedulerobjects.Node{node}, + runningJobs: []*jobdb.Job{job}, + nodes: []*schedulerobjects.Node{node}, + executorSettings: []*schedulerobjects.ExecutorSettings{}, expected: []prometheus.Metric{ commonmetrics.NewClusterAvailableCapacity(32, "cluster-1", testfixtures.TestPool, "cpu", "type-1"), commonmetrics.NewClusterTotalCapacity(32, "cluster-1", testfixtures.TestPool, "cpu", "type-1"), @@ -422,6 +425,24 @@ func TestMetricsCollector_TestCollect_ClusterMetricsAvailableCapacity(t *testing commonmetrics.NewClusterTotalCapacity(31, "cluster-1", testfixtures.TestPool2, "cpu", "type-1"), }, }, + "Cordoned cluster": { + poolConfig: []configuration.PoolConfig{ + {Name: testfixtures.TestPool}, + }, + runningJobs: []*jobdb.Job{job}, + nodes: []*schedulerobjects.Node{node}, + executorSettings: []*schedulerobjects.ExecutorSettings{ + { + ExecutorId: "cluster-1", + Cordoned: true, + CordonReason: "bad executor", + }, + }, + expected: []prometheus.Metric{ + commonmetrics.NewClusterAvailableCapacity(0, "cluster-1", testfixtures.TestPool, "cpu", "type-1"), + commonmetrics.NewClusterTotalCapacity(32, "cluster-1", testfixtures.TestPool, "cpu", "type-1"), + }, + }, } for name, tc := range tests { t.Run(name, func(t *testing.T) { @@ -446,7 +467,7 @@ func TestMetricsCollector_TestCollect_ClusterMetricsAvailableCapacity(t *testing executorRepository := schedulermocks.NewMockExecutorRepository(ctrl) executorRepository.EXPECT().GetExecutors(ctx).Return(executors, nil) - executorRepository.EXPECT().GetExecutorSettings(ctx).Return([]*schedulerobjects.ExecutorSettings{}, nil) + executorRepository.EXPECT().GetExecutorSettings(ctx).Return(tc.executorSettings, nil) collector := NewMetricsCollector( jobDb, From bafbbda23f91fe2fb7c4428ebf6e02c15a5da051 Mon Sep 17 00:00:00 2001 From: Martynas Asipauskas Date: Thu, 14 Nov 2024 11:29:22 +0000 Subject: [PATCH 02/11] Auto-generate GRPC client bindings as part of pip install (#4041) --- .../python-client-release-to-pypi.yml | 4 +- .github/workflows/python-client.yml | 7 +- .github/workflows/python-tests/action.yml | 4 +- .gitignore | 1 + build/python-client/Dockerfile | 4 +- .../python/armada_client/gen/event_typings.py | 27 ++-- client/python/armada_client/proto/.gitkeep | 0 client/python/docs/source/conf.py | 2 - client/python/pyproject.toml | 12 +- client/python/setup.py | 124 ++++++++++++++++++ client/python/tox.ini | 7 +- scripts/build-python-client.sh | 35 +---- 12 files changed, 158 insertions(+), 69 deletions(-) create mode 100644 client/python/armada_client/proto/.gitkeep create mode 100644 client/python/setup.py diff --git a/.github/workflows/python-client-release-to-pypi.yml b/.github/workflows/python-client-release-to-pypi.yml index 33866ffd5c9..7c4877b2f6c 100644 --- a/.github/workflows/python-client-release-to-pypi.yml +++ b/.github/workflows/python-client-release-to-pypi.yml @@ -19,8 +19,8 @@ jobs: repo-token: ${{ secrets.GITHUB_TOKEN }} - uses: ./.github/workflows/python-tests with: - python-version: '3.8' - tox-env: 'py38' + python-version: '3.9' + tox-env: 'py39' path: 'client/python' github-token: ${{secrets.GITHUB_TOKEN}} - name: Publish package to PyPI diff --git a/.github/workflows/python-client.yml b/.github/workflows/python-client.yml index 8541394a811..a9f5cfd9155 100644 --- a/.github/workflows/python-client.yml +++ b/.github/workflows/python-client.yml @@ -34,13 +34,16 @@ jobs: runs-on: ubuntu-22.04 strategy: matrix: - python: [ '3.8', '3.9', '3.10' ] + python: [ '3.9', '3.10', '3.11', '3.12' ] include: - - tox-env: 'py38' - tox-env: 'py39' python: '3.9' - tox-env: 'py310' python: '3.10' + - tox-env: 'py311' + python: '3.11' + - tox-env: 'py312' + python: '3.12' steps: - uses: actions/checkout@v4 - name: Setup Go diff --git a/.github/workflows/python-tests/action.yml b/.github/workflows/python-tests/action.yml index 3f261dd5675..5d2b34a57a2 100644 --- a/.github/workflows/python-tests/action.yml +++ b/.github/workflows/python-tests/action.yml @@ -21,7 +21,7 @@ runs: with: python-version: ${{ inputs.python-version }} # Tox to run tests; build to build the wheel after tests pass - - run: pip install tox==3.27.1 build twine + - run: pip install tox==4.17.0 build twine setuptools shell: bash - name: Install Protoc uses: arduino/setup-protoc@v3 @@ -45,7 +45,7 @@ runs: working-directory: ${{ inputs.path }} - name: Build and verify wheel run: | - python -m build --wheel + python -m build --sdist twine check dist/* shell: bash working-directory: ${{ inputs.path }} diff --git a/.gitignore b/.gitignore index 73c5e2d5452..00c2b8adc71 100644 --- a/.gitignore +++ b/.gitignore @@ -77,6 +77,7 @@ client/python/dist *_pb2.py *_pb2.pyi *_pb2_grpc.py +client/python/armada_client/proto/ client/python/armada_client/armada/ .tox proto-airflow diff --git a/build/python-client/Dockerfile b/build/python-client/Dockerfile index 10aa957944b..719bf9abf41 100644 --- a/build/python-client/Dockerfile +++ b/build/python-client/Dockerfile @@ -1,5 +1,5 @@ ARG PLATFORM=x86_64 -ARG BASE_IMAGE=python:3.8.18-bookworm +ARG BASE_IMAGE=python:3.9.20-bookworm FROM --platform=$PLATFORM ${BASE_IMAGE} @@ -7,7 +7,7 @@ RUN mkdir /proto COPY client/python/pyproject.toml /code/pyproject.toml -RUN pip install "/code[test]" +RUN pip install setuptools "/code[test]" # Creating folders, and files for a project: COPY client/python /code diff --git a/client/python/armada_client/gen/event_typings.py b/client/python/armada_client/gen/event_typings.py index cf2692143f6..ef2ff45201c 100644 --- a/client/python/armada_client/gen/event_typings.py +++ b/client/python/armada_client/gen/event_typings.py @@ -1,3 +1,5 @@ +import argparse +from pathlib import Path import sys from armada_client.armada import event_pb2, submit_pb2 @@ -63,15 +65,7 @@ def gen_file(states, classes, jobstates): return import_text, states_text, union_text, jobstates_text -def write_file(import_text, states_text, union_text, jobstates_text, file): - with open(f"{file}", "w", encoding="utf-8") as f: - f.write(import_text) - f.write(states_text) - f.write(jobstates_text) - f.write(union_text) - - -def main(): +def main(typings_file: Path): states = get_event_states() print("Done creating EventStates") @@ -84,13 +78,16 @@ def main(): import_text, states_text, union_text, jobstates_text = gen_file( states, classes, jobstates ) - write_file(import_text, states_text, union_text, jobstates_text, typings_file) + typings_file.write_text(import_text + states_text + jobstates_text + union_text) if __name__ == "__main__": - # get path to this files location - root = f"{sys.path[0]}/../../" - typings_file = f"{root}/armada_client/typings.py" - - main() + parser = argparse.ArgumentParser() + parser.add_argument("typings_file", type=Path, help="Path to typings file") + + args = parser.parse_args() + print(f"{args}") + typings_file = args.typings_file or Path("armada_client") / "typings.py" + print(f"{typings_file}") + main(typings_file) sys.exit(0) diff --git a/client/python/armada_client/proto/.gitkeep b/client/python/armada_client/proto/.gitkeep new file mode 100644 index 00000000000..e69de29bb2d diff --git a/client/python/docs/source/conf.py b/client/python/docs/source/conf.py index cffef73a2e1..294e0926f11 100644 --- a/client/python/docs/source/conf.py +++ b/client/python/docs/source/conf.py @@ -13,8 +13,6 @@ import os import sys -sys.path.insert(0, os.path.abspath("../..")) - # -- Project information ----------------------------------------------------- diff --git a/client/python/pyproject.toml b/client/python/pyproject.toml index 635a4bea261..692728b36b6 100644 --- a/client/python/pyproject.toml +++ b/client/python/pyproject.toml @@ -1,10 +1,10 @@ [project] name = "armada_client" -version = "0.3.5" +version = "0.4.5" description = "Armada gRPC API python client" readme = "README.md" -requires-python = ">=3.7" -dependencies = ["grpcio==1.66.1", "grpcio-tools==1.66.1", "mypy-protobuf>=3.2.0", "protobuf>=5.26.1,<6.0dev" ] +requires-python = ">=3.9" +dependencies = ["grpcio-tools", "protobuf>3.20,<5.0"] license = { text = "Apache Software License" } authors = [{ name = "G-Research Open Source Software", email = "armada@armadaproject.io" }] @@ -12,10 +12,10 @@ authors = [{ name = "G-Research Open Source Software", email = "armada@armadapro format = ["black==23.7.0", "flake8==7.0.0", "pylint==2.17.5"] # note(JayF): sphinx-jekyll-builder was broken by sphinx-markdown-builder 0.6 -- so pin to 0.5.5 docs = ["sphinx==7.1.2", "sphinx-jekyll-builder==0.3.0", "sphinx-toolbox==3.2.0b1", "sphinx-markdown-builder==0.5.5"] -test = ["pytest==7.3.1", "coverage>=6.5.0", "pytest-asyncio==0.21.1"] +test = ["pytest==7.3.1", "pytest-cov", "pytest-asyncio==0.21.1"] [build-system] -requires = ["setuptools"] +requires = ["setuptools", "wheel", "grpcio-tools", "mypy-protobuf", "protobuf>3.20,<5.0"] build-backend = "setuptools.build_meta" [tool.mypy] @@ -39,4 +39,4 @@ omit = [ # py.typed is required for mypy to find type hints in the package # from: https://mypy.readthedocs.io/en/stable/installed_packages.html#making-pep-561-compatible-packages [tool.setuptools.package-data] -"*" = ["*.pyi", "py.typed"] +"*" = ["*.pyi", "py.typed", "proto/**/*.proto"] diff --git a/client/python/setup.py b/client/python/setup.py new file mode 100644 index 00000000000..9bdbb3e82bc --- /dev/null +++ b/client/python/setup.py @@ -0,0 +1,124 @@ +import os +from pathlib import Path +import shutil +import subprocess +import sys +from typing import Dict +from setuptools import setup +import importlib.resources +import re +from setuptools.command.build_py import build_py + + +def generate_grpc_bindings(build_lib: Path): + import grpc_tools.protoc + + proto_include = importlib.resources.path("grpc_tools", "_proto") + proto_files = [ + "google/api/annotations.proto", + "google/api/http.proto", + "github.com/gogo/protobuf/gogoproto/gogo.proto", + "k8s.io/api/core/v1/generated.proto", + "k8s.io/apimachinery/pkg/api/resource/generated.proto", + "k8s.io/apimachinery/pkg/apis/meta/v1/generated.proto", + "k8s.io/apimachinery/pkg/runtime/generated.proto", + "k8s.io/apimachinery/pkg/runtime/schema/generated.proto", + "k8s.io/apimachinery/pkg/util/intstr/generated.proto", + "k8s.io/api/networking/v1/generated.proto", + "armada/event.proto", + "armada/submit.proto", + "armada/health.proto", + "armada/job.proto", + "armada/binoculars.proto", + ] + target_root = build_lib.absolute() / "armada_client" + + for proto_file in proto_files: + command = [ + f"-I{proto_include}", + f"-I{target_root / 'proto'}", + f"--python_out={target_root}", + f"--grpc_python_out={target_root}", + f"--mypy_out={target_root}", + str(target_root / "proto" / proto_file), + ] + if grpc_tools.protoc.main(command) != 0: + raise Exception(f"grpc_tools.protoc.main: {command} failed") + + shutil.rmtree(target_root / "github.com") + shutil.rmtree(target_root / "k8s.io") + + adjust_import_paths(target_root) + + +def adjust_import_paths(output_dir: Path): + replacements = { + r"from armada": "from armada_client.armada", + r"from github.com": "from armada_client.github.com", + r"from google.api": "from armada_client.google.api", + } + + for file in output_dir.glob("armada/*.py"): + replace_in_file(file, replacements) + for file in output_dir.glob("google/api/*.py"): + replace_in_file(file, replacements) + + replacements = { + r"from k8s.io": "from armada_client.k8s.io", + } + for file in output_dir.glob("../**/*.py"): + replace_in_file(file, replacements) + + replacements = { + r" k8s": " armada_client.k8s", + r"\[k8s": "[armada_client.k8s", + r"import k8s.io": "import armada_client.k8s.io", + } + for file in output_dir.glob("k8s/**/*.pyi"): + replace_in_file(file, replacements) + + +def replace_in_file(file: Path, replacements: Dict[str, str]): + """Replace patterns in a file based on the replacements dictionary.""" + + content = file.read_text() + for pattern, replacement in replacements.items(): + content = re.sub(pattern, replacement, content) + file.write_text(content) + + +def generate_typings(build_dir: Path): + typings = build_dir.absolute() / "armada_client" / "typings.py" + result = subprocess.run( + args=[ + sys.executable, + str(build_dir.absolute() / "armada_client" / "gen" / "event_typings.py"), + str(typings), + ], + env={"PYTHONPATH": str(build_dir.absolute())}, + capture_output=True, + ) + if result.returncode != 0: + print(result.stdout) + print(result.stderr) + result.check_returncode() + + +class BuildPackageProtos(build_py): + """ + Generate GRPC code before building the package. + """ + + def run(self): + super().run() + output_dir = Path(".") if self.editable_mode else Path(self.build_lib) + generate_grpc_bindings(output_dir) + generate_typings(output_dir) + + +setup( + cmdclass={ + "build_py": BuildPackageProtos, + "develop": BuildPackageProtos, + }, +) diff --git a/client/python/tox.ini b/client/python/tox.ini index 5eaaa29ceec..4e31d820a24 100644 --- a/client/python/tox.ini +++ b/client/python/tox.ini @@ -1,16 +1,15 @@ [tox] -isolated_build = true envlist = format - py38 py39 py310 + py311 + py312 [testenv] extras = test commands = - coverage run -m pytest tests/unit/ - coverage xml + pytest --cov={envsitepackagesdir}/armada_client --cov-report=xml --cov-report=term tests/unit/ [testenv:docs] extras = docs diff --git a/scripts/build-python-client.sh b/scripts/build-python-client.sh index 5fd23818146..7004989e596 100755 --- a/scripts/build-python-client.sh +++ b/scripts/build-python-client.sh @@ -5,37 +5,4 @@ mkdir -p proto/armada cp pkg/api/event.proto pkg/api/submit.proto pkg/api/health.proto pkg/api/job.proto pkg/api/binoculars/binoculars.proto proto/armada sed -i 's/\([^\/]\)pkg\/api/\1armada/g' proto/armada/*.proto - -# generate python stubs -cd proto -python3 -m grpc_tools.protoc -I. --plugin=protoc-gen-mypy=$(which protoc-gen-mypy) --python_out=../client/python/armada_client --grpc_python_out=../client/python/armada_client --mypy_out=../client/python/armada_client \ - google/api/annotations.proto \ - google/api/http.proto \ - armada/event.proto armada/submit.proto armada/health.proto armada/job.proto armada/binoculars.proto \ - github.com/gogo/protobuf/gogoproto/gogo.proto \ - k8s.io/api/core/v1/generated.proto \ - k8s.io/apimachinery/pkg/api/resource/generated.proto \ - k8s.io/apimachinery/pkg/apis/meta/v1/generated.proto \ - k8s.io/apimachinery/pkg/runtime/generated.proto \ - k8s.io/apimachinery/pkg/runtime/schema/generated.proto \ - k8s.io/apimachinery/pkg/util/intstr/generated.proto \ - k8s.io/api/networking/v1/generated.proto - -cd .. -# This hideous code is because we can't use python package option in grpc. -# See https://github.com/protocolbuffers/protobuf/issues/7061 for an explanation. -# We need to import these packages as a module. -sed -i 's/from armada/from armada_client.armada/g' client/python/armada_client/armada/*.py -sed -i 's/from github.com/from armada_client.github.com/g' client/python/armada_client/armada/*.py -sed -i 's/from google.api/from armada_client.google.api/g' client/python/armada_client/armada/*.py -sed -i 's/from google.api/from armada_client.google.api/g' client/python/armada_client/google/api/*.py - -find client/python/armada_client/ -name '*.py' | xargs sed -i 's/from k8s.io/from armada_client.k8s.io/g' - -# Generate better docs for the client -export PYTHONPATH=${PWD}/client/python -python3 ${PWD}/client/python/armada_client/gen/event_typings.py - -find client/python/armada_client/k8s -name '*.pyi' | xargs sed -i 's/ k8s/ armada_client.k8s/g' -find client/python/armada_client/k8s -name '*.pyi' | xargs sed -i 's/\[k8s/\[armada_client.k8s/g' -find client/python/armada_client/k8s/io -name '*.pyi' | xargs sed -i 's/import k8s.io/import armada_client.k8s.io/g' +cp -rf proto/* client/python/armada_client/proto/ From a53cf7f2b23fd4d646bb1b222ab6288743e56ea0 Mon Sep 17 00:00:00 2001 From: Chris Martin Date: Thu, 14 Nov 2024 12:57:36 +0000 Subject: [PATCH 03/11] Don't Update Terminal Jobs On Cancel/Reprioritise (#4043) Signed-off-by: Chris Martin --- internal/scheduler/database/query.sql.go | 10 +++++----- internal/scheduler/database/query/query.sql | 10 +++++----- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/internal/scheduler/database/query.sql.go b/internal/scheduler/database/query.sql.go index 347a14e0887..cfb1fb84a40 100644 --- a/internal/scheduler/database/query.sql.go +++ b/internal/scheduler/database/query.sql.go @@ -100,7 +100,7 @@ func (q *Queries) MarkJobRunsFailedById(ctx context.Context, runIds []string) er } const markJobRunsPreemptRequestedByJobId = `-- name: MarkJobRunsPreemptRequestedByJobId :exec -UPDATE runs SET preempt_requested = true WHERE queue = $1 and job_set = $2 and job_id = ANY($3::text[]) +UPDATE runs SET preempt_requested = true WHERE queue = $1 and job_set = $2 and job_id = ANY($3::text[]) and cancelled = false and succeeded = false and failed = false ` type MarkJobRunsPreemptRequestedByJobIdParams struct { @@ -142,7 +142,7 @@ func (q *Queries) MarkJobRunsSucceededById(ctx context.Context, runIds []string) } const markJobsCancelRequestedById = `-- name: MarkJobsCancelRequestedById :exec -UPDATE jobs SET cancel_requested = true WHERE queue = $1 and job_set = $2 and job_id = ANY($3::text[]) +UPDATE jobs SET cancel_requested = true WHERE queue = $1 and job_set = $2 and job_id = ANY($3::text[]) and cancelled = false and succeeded = false and failed = false ` type MarkJobsCancelRequestedByIdParams struct { @@ -157,7 +157,7 @@ func (q *Queries) MarkJobsCancelRequestedById(ctx context.Context, arg MarkJobsC } const markJobsCancelRequestedBySetAndQueuedState = `-- name: MarkJobsCancelRequestedBySetAndQueuedState :exec -UPDATE jobs SET cancel_by_jobset_requested = true WHERE job_set = $1 and queue = $2 and queued = ANY($3::bool[]) +UPDATE jobs SET cancel_by_jobset_requested = true WHERE job_set = $1 and queue = $2 and queued = ANY($3::bool[]) and cancelled = false and succeeded = false and failed = false ` type MarkJobsCancelRequestedBySetAndQueuedStateParams struct { @@ -1169,7 +1169,7 @@ func (q *Queries) SetTerminatedTime(ctx context.Context, arg SetTerminatedTimePa } const updateJobPriorityById = `-- name: UpdateJobPriorityById :exec -UPDATE jobs SET priority = $1 WHERE queue = $2 and job_set = $3 and job_id = ANY($4::text[]) +UPDATE jobs SET priority = $1 WHERE queue = $2 and job_set = $3 and job_id = ANY($4::text[]) and cancelled = false and succeeded = false and failed = false ` type UpdateJobPriorityByIdParams struct { @@ -1190,7 +1190,7 @@ func (q *Queries) UpdateJobPriorityById(ctx context.Context, arg UpdateJobPriori } const updateJobPriorityByJobSet = `-- name: UpdateJobPriorityByJobSet :exec -UPDATE jobs SET priority = $1 WHERE job_set = $2 and queue = $3 +UPDATE jobs SET priority = $1 WHERE job_set = $2 and queue = $3 and cancelled = false and succeeded = false and failed = false ` type UpdateJobPriorityByJobSetParams struct { diff --git a/internal/scheduler/database/query/query.sql b/internal/scheduler/database/query/query.sql index 0c93b068f47..ad4da1a55f2 100644 --- a/internal/scheduler/database/query/query.sql +++ b/internal/scheduler/database/query/query.sql @@ -11,16 +11,16 @@ SELECT job_id, job_set, queue, priority, submitted, queued, queued_version, vali SELECT job_id, job_set, queue, priority, submitted, queued, queued_version, validated, cancel_requested, cancel_by_jobset_requested, cancelled, succeeded, failed, scheduling_info, scheduling_info_version, pools, serial FROM jobs WHERE serial > $1 ORDER BY serial LIMIT $2; -- name: UpdateJobPriorityByJobSet :exec -UPDATE jobs SET priority = $1 WHERE job_set = $2 and queue = $3; +UPDATE jobs SET priority = $1 WHERE job_set = $2 and queue = $3 and cancelled = false and succeeded = false and failed = false; -- name: MarkJobsCancelRequestedBySetAndQueuedState :exec -UPDATE jobs SET cancel_by_jobset_requested = true WHERE job_set = sqlc.arg(job_set) and queue = sqlc.arg(queue) and queued = ANY(sqlc.arg(queued_states)::bool[]); +UPDATE jobs SET cancel_by_jobset_requested = true WHERE job_set = sqlc.arg(job_set) and queue = sqlc.arg(queue) and queued = ANY(sqlc.arg(queued_states)::bool[]) and cancelled = false and succeeded = false and failed = false; -- name: MarkJobsSucceededById :exec UPDATE jobs SET succeeded = true WHERE job_id = ANY(sqlc.arg(job_ids)::text[]); -- name: MarkJobsCancelRequestedById :exec -UPDATE jobs SET cancel_requested = true WHERE queue = sqlc.arg(queue) and job_set = sqlc.arg(job_set) and job_id = ANY(sqlc.arg(job_ids)::text[]); +UPDATE jobs SET cancel_requested = true WHERE queue = sqlc.arg(queue) and job_set = sqlc.arg(job_set) and job_id = ANY(sqlc.arg(job_ids)::text[]) and cancelled = false and succeeded = false and failed = false; -- name: MarkJobsCancelledById :exec UPDATE jobs SET cancelled = true WHERE job_id = ANY(sqlc.arg(job_ids)::text[]); @@ -29,7 +29,7 @@ UPDATE jobs SET cancelled = true WHERE job_id = ANY(sqlc.arg(job_ids)::text[]); UPDATE jobs SET failed = true WHERE job_id = ANY(sqlc.arg(job_ids)::text[]); -- name: UpdateJobPriorityById :exec -UPDATE jobs SET priority = $1 WHERE queue = sqlc.arg(queue) and job_set = sqlc.arg(job_set) and job_id = ANY(sqlc.arg(job_ids)::text[]); +UPDATE jobs SET priority = $1 WHERE queue = sqlc.arg(queue) and job_set = sqlc.arg(job_set) and job_id = ANY(sqlc.arg(job_ids)::text[]) and cancelled = false and succeeded = false and failed = false; -- name: SelectInitialRuns :many SELECT * FROM runs WHERE serial > $1 AND job_id = ANY(sqlc.arg(job_ids)::text[]) ORDER BY serial LIMIT $2; @@ -44,7 +44,7 @@ SELECT run_id FROM runs; SELECT * FROM runs WHERE serial > $1 AND job_id = ANY(sqlc.arg(job_ids)::text[]) ORDER BY serial; -- name: MarkJobRunsPreemptRequestedByJobId :exec -UPDATE runs SET preempt_requested = true WHERE queue = sqlc.arg(queue) and job_set = sqlc.arg(job_set) and job_id = ANY(sqlc.arg(job_ids)::text[]); +UPDATE runs SET preempt_requested = true WHERE queue = sqlc.arg(queue) and job_set = sqlc.arg(job_set) and job_id = ANY(sqlc.arg(job_ids)::text[]) and cancelled = false and succeeded = false and failed = false; -- name: MarkJobRunsSucceededById :exec UPDATE runs SET succeeded = true WHERE run_id = ANY(sqlc.arg(run_ids)::text[]); From 5a1cc780b18835da56d1835640b313c225812091 Mon Sep 17 00:00:00 2001 From: Martynas Asipauskas Date: Thu, 14 Nov 2024 13:19:56 +0000 Subject: [PATCH 04/11] Build package in docker before running integration tests (#4046) --- client/python/pyproject.toml | 1 + magefiles/tests.go | 6 ++---- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/client/python/pyproject.toml b/client/python/pyproject.toml index 692728b36b6..b6a89437820 100644 --- a/client/python/pyproject.toml +++ b/client/python/pyproject.toml @@ -40,3 +40,4 @@ omit = [ # from: https://mypy.readthedocs.io/en/stable/installed_packages.html#making-pep-561-compatible-packages [tool.setuptools.package-data] "*" = ["*.pyi", "py.typed", "proto/**/*.proto"] + diff --git a/magefiles/tests.go b/magefiles/tests.go index e4460c8eee0..78653eab0dc 100644 --- a/magefiles/tests.go +++ b/magefiles/tests.go @@ -169,12 +169,10 @@ func Teste2epython() error { "--workdir", "/code", "-e", "ARMADA_SERVER=server", "-e", "ARMADA_PORT=50051", - "--entrypoint", "python3", "--network", "kind", "armada-python-client-builder:latest", - "-m", "pytest", - "-v", "-s", - "/code/tests/integration/test_no_auth.py", + "-c", + "pip install -e . && python3 -m pytest -v -s /code/tests/integration/test_no_auth.py", } return dockerRun(args...) From 5051d79d558f9d9080ed8bff5d977d7664660f75 Mon Sep 17 00:00:00 2001 From: robertdavidsmith <34475852+robertdavidsmith@users.noreply.github.com> Date: Thu, 14 Nov 2024 13:57:27 +0000 Subject: [PATCH 05/11] Scheduler: refactor floating resources to use internaltypes (#4047) Signed-off-by: Robert Smith --- .../floating_resource_types.go | 92 +++++++++++-------- .../floating_resource_types_test.go | 82 ++++++++++++++--- .../scheduler/internaltypes/resource_list.go | 20 ++-- .../internaltypes/resource_list_test.go | 17 ++-- internal/scheduler/metrics.go | 2 +- internal/scheduler/metrics/cycle_metrics.go | 2 +- .../preempting_queue_scheduler_test.go | 4 +- .../scheduler/scheduling/scheduling_algo.go | 6 +- 8 files changed, 151 insertions(+), 74 deletions(-) diff --git a/internal/scheduler/floatingresources/floating_resource_types.go b/internal/scheduler/floatingresources/floating_resource_types.go index 3b023e0c891..e56dea92553 100644 --- a/internal/scheduler/floatingresources/floating_resource_types.go +++ b/internal/scheduler/floatingresources/floating_resource_types.go @@ -10,55 +10,55 @@ import ( "github.com/armadaproject/armada/internal/common/maps" "github.com/armadaproject/armada/internal/scheduler/configuration" "github.com/armadaproject/armada/internal/scheduler/internaltypes" - "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" ) type FloatingResourceTypes struct { - zeroFloatingResources schedulerobjects.ResourceList - pools map[string]*floatingResourcePool - rlFactory *internaltypes.ResourceListFactory + floatingResourceLimitsByPool map[string]internaltypes.ResourceList } -type floatingResourcePool struct { - totalResources schedulerobjects.ResourceList +func NewFloatingResourceTypes(config []configuration.FloatingResourceConfig, rlFactory *internaltypes.ResourceListFactory) (*FloatingResourceTypes, error) { + err := validate(config) + if err != nil { + return nil, err + } + + floatingResourceLimitsByPool := map[string]internaltypes.ResourceList{} + for _, fr := range config { + for _, poolConfig := range fr.Pools { + floatingResourceLimitsByPool[poolConfig.Name] = floatingResourceLimitsByPool[poolConfig.Name].Add( + rlFactory.FromNodeProto(map[string]resource.Quantity{fr.Name: poolConfig.Quantity}), + ) + } + } + + return &FloatingResourceTypes{ + floatingResourceLimitsByPool: floatingResourceLimitsByPool, + }, nil } -func NewFloatingResourceTypes(config []configuration.FloatingResourceConfig, rlFactory *internaltypes.ResourceListFactory) (*FloatingResourceTypes, error) { - zeroFloatingResources := schedulerobjects.ResourceList{Resources: make(map[string]resource.Quantity, len(config))} +func validate(config []configuration.FloatingResourceConfig) error { + floatingResourceNamesSeen := map[string]bool{} for _, c := range config { - if _, exists := zeroFloatingResources.Resources[c.Name]; exists { - return nil, fmt.Errorf("duplicate floating resource %s", c.Name) + if _, exists := floatingResourceNamesSeen[c.Name]; exists { + return fmt.Errorf("duplicate floating resource %s", c.Name) } - zeroFloatingResources.Resources[c.Name] = resource.Quantity{} + floatingResourceNamesSeen[c.Name] = true } - pools := map[string]*floatingResourcePool{} for _, fr := range config { + poolNamesSeen := map[string]bool{} for _, poolConfig := range fr.Pools { - pool, exists := pools[poolConfig.Name] - if !exists { - pool = &floatingResourcePool{ - totalResources: zeroFloatingResources.DeepCopy(), - } - pools[poolConfig.Name] = pool - } - existing := pool.totalResources.Resources[fr.Name] - if existing.Cmp(resource.Quantity{}) != 0 { - return nil, fmt.Errorf("duplicate floating resource %s for pool %s", fr.Name, poolConfig.Name) + if _, exists := poolNamesSeen[poolConfig.Name]; exists { + return fmt.Errorf("floating resource %s has duplicate pool %s", fr.Name, poolConfig.Name) } - pool.totalResources.Resources[fr.Name] = poolConfig.Quantity.DeepCopy() + poolNamesSeen[poolConfig.Name] = true } } - - return &FloatingResourceTypes{ - zeroFloatingResources: zeroFloatingResources, - pools: pools, - rlFactory: rlFactory, - }, nil + return nil } func (frt *FloatingResourceTypes) WithinLimits(poolName string, allocated internaltypes.ResourceList) (bool, string) { - available := frt.GetTotalAvailableForPoolInternalTypes(poolName) + available := frt.GetTotalAvailableForPool(poolName) if available.AllZero() { return false, fmt.Sprintf("floating resources not connfigured for pool %s", poolName) } @@ -72,26 +72,38 @@ func (frt *FloatingResourceTypes) WithinLimits(poolName string, allocated intern } func (frt *FloatingResourceTypes) AllPools() []string { - result := maps.Keys(frt.pools) + result := maps.Keys(frt.floatingResourceLimitsByPool) slices.Sort(result) return result } -func (frt *FloatingResourceTypes) GetTotalAvailableForPool(poolName string) schedulerobjects.ResourceList { - pool, exists := frt.pools[poolName] - if !exists { - return frt.zeroFloatingResources.DeepCopy() +func (frt *FloatingResourceTypes) GetTotalAvailableForPool(poolName string) internaltypes.ResourceList { + limits, ok := frt.floatingResourceLimitsByPool[poolName] + if !ok { + return internaltypes.ResourceList{} } - return pool.totalResources.DeepCopy() + return limits } -func (frt *FloatingResourceTypes) GetTotalAvailableForPoolInternalTypes(poolName string) internaltypes.ResourceList { - return frt.rlFactory.FromNodeProto(frt.GetTotalAvailableForPool(poolName).Resources) +func (frt *FloatingResourceTypes) GetTotalAvailableForPoolAsMap(poolName string) map[string]resource.Quantity { + limits := frt.GetTotalAvailableForPool(poolName) + result := map[string]resource.Quantity{} + for _, res := range limits.GetResources() { + if res.Type != internaltypes.Floating { + continue + } + result[res.Name] = res.Value + } + return result } func (frt *FloatingResourceTypes) SummaryString() string { - if len(frt.zeroFloatingResources.Resources) == 0 { + if len(frt.floatingResourceLimitsByPool) == 0 { return "none" } - return strings.Join(maps.Keys(frt.zeroFloatingResources.Resources), " ") + poolSummaries := []string{} + for _, poolName := range frt.AllPools() { + poolSummaries = append(poolSummaries, fmt.Sprintf("%s: (%s)", poolName, frt.floatingResourceLimitsByPool[poolName])) + } + return strings.Join(poolSummaries, " ") } diff --git a/internal/scheduler/floatingresources/floating_resource_types_test.go b/internal/scheduler/floatingresources/floating_resource_types_test.go index 76ce183b624..4bb5e82fb63 100644 --- a/internal/scheduler/floatingresources/floating_resource_types_test.go +++ b/internal/scheduler/floatingresources/floating_resource_types_test.go @@ -15,28 +15,86 @@ func TestAllPools(t *testing.T) { assert.Equal(t, []string{"cpu", "gpu"}, sut.AllPools()) } -func TestGetTotalAvailableForPool(t *testing.T) { - sut := makeSut(t, makeRlFactory()) - zero := resource.Quantity{} - assert.Equal(t, map[string]resource.Quantity{"floating-resource-1": resource.MustParse("200"), "floating-resource-2": resource.MustParse("300")}, sut.GetTotalAvailableForPool("cpu").Resources) - assert.Equal(t, map[string]resource.Quantity{"floating-resource-1": resource.MustParse("100"), "floating-resource-2": zero}, sut.GetTotalAvailableForPool("gpu").Resources) - assert.Equal(t, map[string]resource.Quantity{"floating-resource-1": zero, "floating-resource-2": zero}, sut.GetTotalAvailableForPool("some-other-pool").Resources) +func TestNewFloatingResourceTypes_ErrorsOnDuplicateFloatingResource(t *testing.T) { + cfg := []configuration.FloatingResourceConfig{ + { + Name: "floating-resource-1", + Pools: []configuration.FloatingResourcePoolConfig{ + { + Name: "cpu", + Quantity: resource.MustParse("200"), + }, + }, + }, + { + Name: "floating-resource-1", + Pools: []configuration.FloatingResourcePoolConfig{ + { + Name: "gpu", + Quantity: resource.MustParse("300"), + }, + }, + }, + } + + frt, err := NewFloatingResourceTypes(cfg, makeRlFactory()) + assert.Nil(t, frt) + assert.NotNil(t, err) } -func TestGetTotalAvailableForPoolInternalTypes(t *testing.T) { +func TestNewFloatingResourceTypes_ErrorsOnDuplicatePool(t *testing.T) { + cfg := []configuration.FloatingResourceConfig{ + { + Name: "floating-resource-1", + Pools: []configuration.FloatingResourcePoolConfig{ + { + Name: "cpu", + Quantity: resource.MustParse("200"), + }, { + Name: "cpu", + Quantity: resource.MustParse("200"), + }, + }, + }, + } + + frt, err := NewFloatingResourceTypes(cfg, makeRlFactory()) + assert.Nil(t, frt) + assert.NotNil(t, err) +} + +func TestGetTotalAvailableForPool(t *testing.T) { sut := makeSut(t, makeRlFactory()) - cpuPool := sut.GetTotalAvailableForPoolInternalTypes("cpu") + cpuPool := sut.GetTotalAvailableForPool("cpu") assert.Equal(t, int64(200000), cpuPool.GetByNameZeroIfMissing("floating-resource-1")) assert.Equal(t, int64(300000), cpuPool.GetByNameZeroIfMissing("floating-resource-2")) - gpuPool := sut.GetTotalAvailableForPoolInternalTypes("gpu") + gpuPool := sut.GetTotalAvailableForPool("gpu") assert.Equal(t, int64(100000), gpuPool.GetByNameZeroIfMissing("floating-resource-1")) assert.Equal(t, int64(0), gpuPool.GetByNameZeroIfMissing("floating-resource-2")) - notFound := sut.GetTotalAvailableForPoolInternalTypes("some-invalid-value") - assert.Equal(t, int64(0), notFound.GetByNameZeroIfMissing("floating-resource-1")) - assert.Equal(t, int64(0), notFound.GetByNameZeroIfMissing("floating-resource-2")) + notFound := sut.GetTotalAvailableForPool("some-invalid-value") + assert.True(t, notFound.IsEmpty()) +} + +func TestGetTotalAvailableForPoolAsMap(t *testing.T) { + sut := makeSut(t, makeRlFactory()) + + cpuPool := sut.GetTotalAvailableForPoolAsMap("cpu") + assert.Equal(t, map[string]resource.Quantity{ + "floating-resource-1": *resource.NewMilliQuantity(200000, resource.DecimalSI), + "floating-resource-2": *resource.NewMilliQuantity(300000, resource.DecimalSI), + }, cpuPool) + + gpuPool := sut.GetTotalAvailableForPoolAsMap("gpu") + assert.Equal(t, map[string]resource.Quantity{ + "floating-resource-1": *resource.NewMilliQuantity(100000, resource.DecimalSI), + "floating-resource-2": *resource.NewMilliQuantity(0, resource.DecimalSI), + }, gpuPool) + + notFound := sut.GetTotalAvailableForPoolAsMap("some-invalid-value") + assert.Equal(t, map[string]resource.Quantity{}, notFound) } func TestWithinLimits_WhenWithinLimits_ReturnsTrue(t *testing.T) { diff --git a/internal/scheduler/internaltypes/resource_list.go b/internal/scheduler/internaltypes/resource_list.go index dda39551103..63772bbd8ea 100644 --- a/internal/scheduler/internaltypes/resource_list.go +++ b/internal/scheduler/internaltypes/resource_list.go @@ -24,10 +24,11 @@ type ResourceList struct { } type Resource struct { - Name string - Value int64 - Scale k8sResource.Scale - Type ResourceType + Name string + RawValue int64 + Value k8sResource.Quantity + Scale k8sResource.Scale + Type ResourceType } func (rl ResourceList) Equal(other ResourceList) bool { @@ -87,7 +88,7 @@ func (rl ResourceList) GetResourceByNameZeroIfMissing(name string) k8sResource.Q return k8sResource.Quantity{} } - return *k8sResource.NewScaledQuantity(rl.resources[index], rl.factory.scales[index]) + return *rl.asQuantity(index) } func (rl ResourceList) GetResources() []Resource { @@ -98,10 +99,11 @@ func (rl ResourceList) GetResources() []Resource { result := make([]Resource, len(rl.resources)) for i, q := range rl.resources { result[i] = Resource{ - Name: rl.factory.indexToName[i], - Value: q, - Scale: rl.factory.scales[i], - Type: rl.factory.types[i], + Name: rl.factory.indexToName[i], + RawValue: q, + Value: *rl.asQuantity(i), + Scale: rl.factory.scales[i], + Type: rl.factory.types[i], } } return result diff --git a/internal/scheduler/internaltypes/resource_list_test.go b/internal/scheduler/internaltypes/resource_list_test.go index 1138b6225b2..39fa9c6c1e2 100644 --- a/internal/scheduler/internaltypes/resource_list_test.go +++ b/internal/scheduler/internaltypes/resource_list_test.go @@ -83,13 +83,18 @@ func TestGetResources(t *testing.T) { a := testResourceList(factory, "1", "1Gi") expected := []Resource{ - {Name: "memory", Value: 1024 * 1024 * 1024, Scale: k8sResource.Scale(0), Type: Kubernetes}, - {Name: "ephemeral-storage", Value: 0, Scale: k8sResource.Scale(0), Type: Kubernetes}, - {Name: "cpu", Value: 1000, Scale: k8sResource.Milli, Type: Kubernetes}, - {Name: "nvidia.com/gpu", Value: 0, Scale: k8sResource.Milli, Type: Kubernetes}, - {Name: "external-storage-connections", Value: 0, Scale: 0, Type: Floating}, - {Name: "external-storage-bytes", Value: 0, Scale: 0, Type: Floating}, + {Name: "memory", RawValue: 1024 * 1024 * 1024, Scale: k8sResource.Scale(0), Type: Kubernetes}, + {Name: "ephemeral-storage", RawValue: 0, Scale: k8sResource.Scale(0), Type: Kubernetes}, + {Name: "cpu", RawValue: 1000, Scale: k8sResource.Milli, Type: Kubernetes}, + {Name: "nvidia.com/gpu", RawValue: 0, Scale: k8sResource.Milli, Type: Kubernetes}, + {Name: "external-storage-connections", RawValue: 0, Scale: 0, Type: Floating}, + {Name: "external-storage-bytes", RawValue: 0, Scale: 0, Type: Floating}, } + + for i, r := range expected { + expected[i].Value = *k8sResource.NewScaledQuantity(r.RawValue, r.Scale) + } + assert.Equal(t, expected, a.GetResources()) } diff --git a/internal/scheduler/metrics.go b/internal/scheduler/metrics.go index 0c0e7e7752d..af9d932fe64 100644 --- a/internal/scheduler/metrics.go +++ b/internal/scheduler/metrics.go @@ -415,7 +415,7 @@ func (c *MetricsCollector) updateClusterMetrics(ctx *armadacontext.Context) ([]p } for _, pool := range c.floatingResourceTypes.AllPools() { - totalFloatingResources := c.floatingResourceTypes.GetTotalAvailableForPool(pool) + totalFloatingResources := schedulerobjects.ResourceList{Resources: c.floatingResourceTypes.GetTotalAvailableForPoolAsMap(pool)} clusterKey := clusterMetricKey{ cluster: "floating", pool: pool, diff --git a/internal/scheduler/metrics/cycle_metrics.go b/internal/scheduler/metrics/cycle_metrics.go index b6fdfa22a91..c2f32ad60e7 100644 --- a/internal/scheduler/metrics/cycle_metrics.go +++ b/internal/scheduler/metrics/cycle_metrics.go @@ -302,7 +302,7 @@ func (m *cycleMetrics) ReportSchedulerResult(result scheduling.SchedulerResult) m.evictedJobs.WithLabelValues(pool, queue).Set(float64(s.EvictedJobCount)) for _, r := range s.EvictedResources.GetResources() { - m.evictedResources.WithLabelValues(pool, queue, r.Name).Set(float64(r.Value)) + m.evictedResources.WithLabelValues(pool, queue, r.Name).Set(float64(r.RawValue)) } } } diff --git a/internal/scheduler/scheduling/preempting_queue_scheduler_test.go b/internal/scheduler/scheduling/preempting_queue_scheduler_test.go index eb0a3bb05ad..7102c63a8ef 100644 --- a/internal/scheduler/scheduling/preempting_queue_scheduler_test.go +++ b/internal/scheduler/scheduling/preempting_queue_scheduler_test.go @@ -76,7 +76,7 @@ func TestEvictOversubscribed(t *testing.T) { for nodeId, node := range result.AffectedNodesById { for _, p := range priorities { for _, r := range node.AllocatableByPriority[p].GetResources() { - assert.True(t, r.Value >= 0, "resource %s oversubscribed by %d on node %s", r.Name, r.Value, nodeId) + assert.True(t, r.RawValue >= 0, "resource %s oversubscribed by %d on node %s", r.Name, r.RawValue, nodeId) } } } @@ -2202,7 +2202,7 @@ func TestPreemptingQueueScheduler(t *testing.T) { for node := it.NextNode(); node != nil; node = it.NextNode() { for _, p := range priorities { for _, r := range node.AllocatableByPriority[p].GetResources() { - assert.True(t, r.Value >= 0, "resource %s oversubscribed by %d on node %s", r.Name, r.Value, node.GetId()) + assert.True(t, r.RawValue >= 0, "resource %s oversubscribed by %d on node %s", r.Name, r.RawValue, node.GetId()) } } } diff --git a/internal/scheduler/scheduling/scheduling_algo.go b/internal/scheduler/scheduling/scheduling_algo.go index 01ad8e37773..77e98caba89 100644 --- a/internal/scheduler/scheduling/scheduling_algo.go +++ b/internal/scheduler/scheduling/scheduling_algo.go @@ -129,7 +129,7 @@ func (l *FairSchedulingAlgo) Schedule( ctx.Infof("Scheduling on pool %s with capacity %s %s", pool, fsctx.nodeDb.TotalKubernetesResources().String(), - l.floatingResourceTypes.GetTotalAvailableForPoolInternalTypes(pool.Name).String(), + l.floatingResourceTypes.GetTotalAvailableForPool(pool.Name).String(), ) start := time.Now() @@ -277,7 +277,7 @@ func (l *FairSchedulingAlgo) newFairSchedulingAlgoContext(ctx *armadacontext.Con } totalResources := nodeDb.TotalKubernetesResources() - totalResources = totalResources.Add(l.floatingResourceTypes.GetTotalAvailableForPoolInternalTypes(pool.Name)) + totalResources = totalResources.Add(l.floatingResourceTypes.GetTotalAvailableForPool(pool.Name)) schedulingContext, err := l.constructSchedulingContext( pool.Name, @@ -528,7 +528,7 @@ func (l *FairSchedulingAlgo) SchedulePool( pool string, ) (*SchedulerResult, *schedulercontext.SchedulingContext, error) { totalResources := fsctx.nodeDb.TotalKubernetesResources() - totalResources = totalResources.Add(l.floatingResourceTypes.GetTotalAvailableForPoolInternalTypes(pool)) + totalResources = totalResources.Add(l.floatingResourceTypes.GetTotalAvailableForPool(pool)) constraints := schedulerconstraints.NewSchedulingConstraints(pool, totalResources, l.schedulingConfig, maps.Values(fsctx.queues)) From cd42fbd5d7426fdeb9765543eeef4bc13ddf5b98 Mon Sep 17 00:00:00 2001 From: robertdavidsmith <34475852+robertdavidsmith@users.noreply.github.com> Date: Thu, 14 Nov 2024 15:21:19 +0000 Subject: [PATCH 06/11] Scheduler: refactor: standardize getting nodes from executor (#4048) * Scheduler: refactor: standardize getting nodes from executor Signed-off-by: Robert Smith * fix test Signed-off-by: Robert Smith --------- Signed-off-by: Robert Smith --- .../scheduler/internaltypes/node_factory.go | 21 +++++++++++++++ .../scheduler/scheduling/scheduling_algo.go | 20 ++++---------- internal/scheduler/submitcheck.go | 26 +++++++++---------- internal/scheduler/submitcheck_test.go | 6 ++++- 4 files changed, 44 insertions(+), 29 deletions(-) diff --git a/internal/scheduler/internaltypes/node_factory.go b/internal/scheduler/internaltypes/node_factory.go index 8d4526a7b2b..148ede62909 100644 --- a/internal/scheduler/internaltypes/node_factory.go +++ b/internal/scheduler/internaltypes/node_factory.go @@ -1,6 +1,8 @@ package internaltypes import ( + "fmt" + "github.com/armadaproject/armada/internal/common/util" "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" ) @@ -51,3 +53,22 @@ func (f *NodeFactory) FromSchedulerObjectsNode(node *schedulerobjects.Node) (*No f.resourceListFactory, ) } + +func (f *NodeFactory) FromSchedulerObjectsExecutors(executors []*schedulerobjects.Executor, errorLogger func(string)) []*Node { + result := []*Node{} + for _, executor := range executors { + for _, node := range executor.GetNodes() { + if executor.Id != node.Executor { + errorLogger(fmt.Sprintf("Executor name mismatch: %q != %q", node.Executor, executor.Id)) + continue + } + itNode, err := f.FromSchedulerObjectsNode(node) + if err != nil { + errorLogger(fmt.Sprintf("Invalid node %s: %v", node.Name, err)) + continue + } + result = append(result, itNode) + } + } + return result +} diff --git a/internal/scheduler/scheduling/scheduling_algo.go b/internal/scheduler/scheduling/scheduling_algo.go index 77e98caba89..0dad9c8061f 100644 --- a/internal/scheduler/scheduling/scheduling_algo.go +++ b/internal/scheduler/scheduling/scheduling_algo.go @@ -237,21 +237,11 @@ func (l *FairSchedulingAlgo) newFairSchedulingAlgoContext(ctx *armadacontext.Con } healthyExecutors = l.filterCordonedExecutors(ctx, healthyExecutors, executorSettings) } - nodes := []*internaltypes.Node{} - for _, executor := range healthyExecutors { - for _, node := range executor.Nodes { - if executor.Id != node.Executor { - ctx.Errorf("Executor name mismatch: %q != %q", node.Executor, executor.Id) - continue - } - itNode, err := nodeFactory.FromSchedulerObjectsNode(node) - if err != nil { - ctx.Errorf("Invalid node %s: %v", node.Name, err) - continue - } - nodes = append(nodes, itNode) - } - } + + nodes := nodeFactory.FromSchedulerObjectsExecutors(healthyExecutors, func(errMes string) { + ctx.Error(errMes) + }) + homeJobs := jobSchedulingInfo.jobsByPool[pool.Name] awayJobs := []*jobdb.Job{} diff --git a/internal/scheduler/submitcheck.go b/internal/scheduler/submitcheck.go index c94937e02bb..393409632df 100644 --- a/internal/scheduler/submitcheck.go +++ b/internal/scheduler/submitcheck.go @@ -92,13 +92,22 @@ func (srv *SubmitChecker) updateExecutors(ctx *armadacontext.Context) { panic(err) } + nodeFactory := internaltypes.NewNodeFactory( + srv.schedulingConfig.IndexedTaints, + srv.schedulingConfig.IndexedNodeLabels, + srv.resourceListFactory) + executorsByPoolAndId := map[string]map[string]*executor{} for _, ex := range executors { - nodes := ex.GetNodes() - nodesByPool := armadaslices.GroupByFunc(nodes, func(n *schedulerobjects.Node) string { + nodes := nodeFactory.FromSchedulerObjectsExecutors( + []*schedulerobjects.Executor{ex}, + func(s string) { ctx.Error(s) }) + + nodesByPool := armadaslices.GroupByFunc(nodes, func(n *internaltypes.Node) string { return n.GetPool() }) for pool, nodes := range nodesByPool { + nodeDb, err := srv.constructNodeDb(nodes) if _, present := executorsByPoolAndId[pool]; !present { @@ -115,7 +124,6 @@ func (srv *SubmitChecker) updateExecutors(ctx *armadacontext.Context) { WithStacktrace(ctx, err). Warnf("Error constructing nodedb for executor: %s", ex.Id) } - } } srv.state.Store(&schedulerState{ @@ -264,11 +272,7 @@ poolStart: return schedulingResult{isSchedulable: false, reason: sb.String()} } -func (srv *SubmitChecker) constructNodeDb(nodes []*schedulerobjects.Node) (*nodedb.NodeDb, error) { - nodeFactory := internaltypes.NewNodeFactory(srv.schedulingConfig.IndexedTaints, - srv.schedulingConfig.IndexedNodeLabels, - srv.resourceListFactory) - +func (srv *SubmitChecker) constructNodeDb(nodes []*internaltypes.Node) (*nodedb.NodeDb, error) { nodeDb, err := nodedb.NewNodeDb( srv.schedulingConfig.PriorityClasses, srv.schedulingConfig.IndexedResources, @@ -284,11 +288,7 @@ func (srv *SubmitChecker) constructNodeDb(nodes []*schedulerobjects.Node) (*node txn := nodeDb.Txn(true) defer txn.Abort() for _, node := range nodes { - dbNode, err := nodeFactory.FromSchedulerObjectsNode(node) - if err != nil { - return nil, err - } - if err = nodeDb.CreateAndInsertWithJobDbJobsWithTxn(txn, nil, dbNode); err != nil { + if err = nodeDb.CreateAndInsertWithJobDbJobsWithTxn(txn, nil, node); err != nil { return nil, err } } diff --git a/internal/scheduler/submitcheck_test.go b/internal/scheduler/submitcheck_test.go index 4a302b41a93..526ce6f2266 100644 --- a/internal/scheduler/submitcheck_test.go +++ b/internal/scheduler/submitcheck_test.go @@ -257,8 +257,12 @@ func TestSubmitChecker_CheckJobDbJobs(t *testing.T) { } func Executor(nodes ...*schedulerobjects.Node) *schedulerobjects.Executor { + executorId := uuid.NewString() + for _, node := range nodes { + node.Executor = executorId + } return &schedulerobjects.Executor{ - Id: uuid.NewString(), + Id: executorId, Pool: "cpu", Nodes: nodes, } From 47e09277cd5715a0513f7c80952b5470bf623b8c Mon Sep 17 00:00:00 2001 From: Martynas Asipauskas Date: Thu, 14 Nov 2024 15:58:28 +0000 Subject: [PATCH 07/11] Revert client changes (#4050) --- .../python-client-release-to-pypi.yml | 4 +- .github/workflows/python-client.yml | 7 +- .github/workflows/python-tests/action.yml | 4 +- .gitignore | 1 - build/python-client/Dockerfile | 4 +- .../python/armada_client/gen/event_typings.py | 27 ++-- client/python/armada_client/proto/.gitkeep | 0 client/python/docs/source/conf.py | 2 + client/python/pyproject.toml | 13 +- client/python/setup.py | 124 ------------------ client/python/tox.ini | 7 +- magefiles/tests.go | 6 +- scripts/build-python-client.sh | 35 ++++- 13 files changed, 73 insertions(+), 161 deletions(-) delete mode 100644 client/python/armada_client/proto/.gitkeep delete mode 100644 client/python/setup.py diff --git a/.github/workflows/python-client-release-to-pypi.yml b/.github/workflows/python-client-release-to-pypi.yml index 7c4877b2f6c..33866ffd5c9 100644 --- a/.github/workflows/python-client-release-to-pypi.yml +++ b/.github/workflows/python-client-release-to-pypi.yml @@ -19,8 +19,8 @@ jobs: repo-token: ${{ secrets.GITHUB_TOKEN }} - uses: ./.github/workflows/python-tests with: - python-version: '3.9' - tox-env: 'py39' + python-version: '3.8' + tox-env: 'py38' path: 'client/python' github-token: ${{secrets.GITHUB_TOKEN}} - name: Publish package to PyPI diff --git a/.github/workflows/python-client.yml b/.github/workflows/python-client.yml index a9f5cfd9155..8541394a811 100644 --- a/.github/workflows/python-client.yml +++ b/.github/workflows/python-client.yml @@ -34,16 +34,13 @@ jobs: runs-on: ubuntu-22.04 strategy: matrix: - python: [ '3.9', '3.10', '3.11', '3.12' ] + python: [ '3.8', '3.9', '3.10' ] include: + - tox-env: 'py38' - tox-env: 'py39' python: '3.9' - tox-env: 'py310' python: '3.10' - - tox-env: 'py311' - python: '3.11' - - tox-env: 'py312' - python: '3.12' steps: - uses: actions/checkout@v4 - name: Setup Go diff --git a/.github/workflows/python-tests/action.yml b/.github/workflows/python-tests/action.yml index 5d2b34a57a2..3f261dd5675 100644 --- a/.github/workflows/python-tests/action.yml +++ b/.github/workflows/python-tests/action.yml @@ -21,7 +21,7 @@ runs: with: python-version: ${{ inputs.python-version }} # Tox to run tests; build to build the wheel after tests pass - - run: pip install tox==4.17.0 build twine setuptools + - run: pip install tox==3.27.1 build twine shell: bash - name: Install Protoc uses: arduino/setup-protoc@v3 @@ -45,7 +45,7 @@ runs: working-directory: ${{ inputs.path }} - name: Build and verify wheel run: | - python -m build --sdist + python -m build --wheel twine check dist/* shell: bash working-directory: ${{ inputs.path }} diff --git a/.gitignore b/.gitignore index 00c2b8adc71..73c5e2d5452 100644 --- a/.gitignore +++ b/.gitignore @@ -77,7 +77,6 @@ client/python/dist *_pb2.py *_pb2.pyi *_pb2_grpc.py -client/python/armada_client/proto/ client/python/armada_client/armada/ .tox proto-airflow diff --git a/build/python-client/Dockerfile b/build/python-client/Dockerfile index 719bf9abf41..10aa957944b 100644 --- a/build/python-client/Dockerfile +++ b/build/python-client/Dockerfile @@ -1,5 +1,5 @@ ARG PLATFORM=x86_64 -ARG BASE_IMAGE=python:3.9.20-bookworm +ARG BASE_IMAGE=python:3.8.18-bookworm FROM --platform=$PLATFORM ${BASE_IMAGE} @@ -7,7 +7,7 @@ RUN mkdir /proto COPY client/python/pyproject.toml /code/pyproject.toml -RUN pip install setuptools "/code[test]" +RUN pip install "/code[test]" # Creating folders, and files for a project: COPY client/python /code diff --git a/client/python/armada_client/gen/event_typings.py b/client/python/armada_client/gen/event_typings.py index ef2ff45201c..cf2692143f6 100644 --- a/client/python/armada_client/gen/event_typings.py +++ b/client/python/armada_client/gen/event_typings.py @@ -1,5 +1,3 @@ -import argparse -from pathlib import Path import sys from armada_client.armada import event_pb2, submit_pb2 @@ -65,7 +63,15 @@ def gen_file(states, classes, jobstates): return import_text, states_text, union_text, jobstates_text -def main(typings_file: Path): +def write_file(import_text, states_text, union_text, jobstates_text, file): + with open(f"{file}", "w", encoding="utf-8") as f: + f.write(import_text) + f.write(states_text) + f.write(jobstates_text) + f.write(union_text) + + +def main(): states = get_event_states() print("Done creating EventStates") @@ -78,16 +84,13 @@ def main(typings_file: Path): import_text, states_text, union_text, jobstates_text = gen_file( states, classes, jobstates ) - typings_file.write_text(import_text + states_text + jobstates_text + union_text) + write_file(import_text, states_text, union_text, jobstates_text, typings_file) if __name__ == "__main__": - parser = argparse.ArgumentParser() - parser.add_argument("typings_file", type=Path, help="Path to typings file") - - args = parser.parse_args() - print(f"{args}") - typings_file = args.typings_file or Path("armada_client") / "typings.py" - print(f"{typings_file}") - main(typings_file) + # get path to this files location + root = f"{sys.path[0]}/../../" + typings_file = f"{root}/armada_client/typings.py" + + main() sys.exit(0) diff --git a/client/python/armada_client/proto/.gitkeep b/client/python/armada_client/proto/.gitkeep deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/client/python/docs/source/conf.py b/client/python/docs/source/conf.py index 294e0926f11..cffef73a2e1 100644 --- a/client/python/docs/source/conf.py +++ b/client/python/docs/source/conf.py @@ -13,6 +13,8 @@ import os import sys +sys.path.insert(0, os.path.abspath("../..")) + # -- Project information ----------------------------------------------------- diff --git a/client/python/pyproject.toml b/client/python/pyproject.toml index b6a89437820..5b0952245f3 100644 --- a/client/python/pyproject.toml +++ b/client/python/pyproject.toml @@ -1,10 +1,10 @@ [project] name = "armada_client" -version = "0.4.5" +version = "0.4.6" description = "Armada gRPC API python client" readme = "README.md" -requires-python = ">=3.9" -dependencies = ["grpcio-tools", "protobuf>3.20,<5.0"] +requires-python = ">=3.7" +dependencies = ["grpcio==1.66.1", "grpcio-tools==1.66.1", "mypy-protobuf>=3.2.0", "protobuf>=5.26.1,<6.0dev" ] license = { text = "Apache Software License" } authors = [{ name = "G-Research Open Source Software", email = "armada@armadaproject.io" }] @@ -12,10 +12,10 @@ authors = [{ name = "G-Research Open Source Software", email = "armada@armadapro format = ["black==23.7.0", "flake8==7.0.0", "pylint==2.17.5"] # note(JayF): sphinx-jekyll-builder was broken by sphinx-markdown-builder 0.6 -- so pin to 0.5.5 docs = ["sphinx==7.1.2", "sphinx-jekyll-builder==0.3.0", "sphinx-toolbox==3.2.0b1", "sphinx-markdown-builder==0.5.5"] -test = ["pytest==7.3.1", "pytest-cov", "pytest-asyncio==0.21.1"] +test = ["pytest==7.3.1", "coverage>=6.5.0", "pytest-asyncio==0.21.1"] [build-system] -requires = ["setuptools", "wheel", "grpcio-tools", "mypy-protobuf", "protobuf>3.20,<5.0"] +requires = ["setuptools"] build-backend = "setuptools.build_meta" [tool.mypy] @@ -39,5 +39,4 @@ omit = [ # py.typed is required for mypy to find type hints in the package # from: https://mypy.readthedocs.io/en/stable/installed_packages.html#making-pep-561-compatible-packages [tool.setuptools.package-data] -"*" = ["*.pyi", "py.typed", "proto/**/*.proto"] - +"*" = ["*.pyi", "py.typed"] diff --git a/client/python/setup.py b/client/python/setup.py deleted file mode 100644 index 9bdbb3e82bc..00000000000 --- a/client/python/setup.py +++ /dev/null @@ -1,124 +0,0 @@ -import os -from pathlib import Path -import shutil -import subprocess -import sys -from typing import Dict -from setuptools import setup -import importlib.resources -import re -from setuptools.command.build_py import build_py - - -def generate_grpc_bindings(build_lib: Path): - import grpc_tools.protoc - - proto_include = importlib.resources.path("grpc_tools", "_proto") - proto_files = [ - "google/api/annotations.proto", - "google/api/http.proto", - "github.com/gogo/protobuf/gogoproto/gogo.proto", - "k8s.io/api/core/v1/generated.proto", - "k8s.io/apimachinery/pkg/api/resource/generated.proto", - "k8s.io/apimachinery/pkg/apis/meta/v1/generated.proto", - "k8s.io/apimachinery/pkg/runtime/generated.proto", - "k8s.io/apimachinery/pkg/runtime/schema/generated.proto", - "k8s.io/apimachinery/pkg/util/intstr/generated.proto", - "k8s.io/api/networking/v1/generated.proto", - "armada/event.proto", - "armada/submit.proto", - "armada/health.proto", - "armada/job.proto", - "armada/binoculars.proto", - ] - target_root = build_lib.absolute() / "armada_client" - - for proto_file in proto_files: - command = [ - f"-I{proto_include}", - f"-I{target_root / 'proto'}", - f"--python_out={target_root}", - f"--grpc_python_out={target_root}", - f"--mypy_out={target_root}", - str(target_root / "proto" / proto_file), - ] - if grpc_tools.protoc.main(command) != 0: - raise Exception(f"grpc_tools.protoc.main: {command} failed") - - shutil.rmtree(target_root / "github.com") - shutil.rmtree(target_root / "k8s.io") - - adjust_import_paths(target_root) - - -def adjust_import_paths(output_dir: Path): - replacements = { - r"from armada": "from armada_client.armada", - r"from github.com": "from armada_client.github.com", - r"from google.api": "from armada_client.google.api", - } - - for file in output_dir.glob("armada/*.py"): - replace_in_file(file, replacements) - for file in output_dir.glob("google/api/*.py"): - replace_in_file(file, replacements) - - replacements = { - r"from k8s.io": "from armada_client.k8s.io", - } - for file in output_dir.glob("../**/*.py"): - replace_in_file(file, replacements) - - replacements = { - r" k8s": " armada_client.k8s", - r"\[k8s": "[armada_client.k8s", - r"import k8s.io": "import armada_client.k8s.io", - } - for file in output_dir.glob("k8s/**/*.pyi"): - replace_in_file(file, replacements) - - -def replace_in_file(file: Path, replacements: Dict[str, str]): - """Replace patterns in a file based on the replacements dictionary.""" - - content = file.read_text() - for pattern, replacement in replacements.items(): - content = re.sub(pattern, replacement, content) - file.write_text(content) - - -def generate_typings(build_dir: Path): - typings = build_dir.absolute() / "armada_client" / "typings.py" - result = subprocess.run( - args=[ - sys.executable, - str(build_dir.absolute() / "armada_client" / "gen" / "event_typings.py"), - str(typings), - ], - env={"PYTHONPATH": str(build_dir.absolute())}, - capture_output=True, - ) - if result.returncode != 0: - print(result.stdout) - print(result.stderr) - result.check_returncode() - - -class BuildPackageProtos(build_py): - """ - Generate GRPC code before building the package. - """ - - def run(self): - super().run() - output_dir = Path(".") if self.editable_mode else Path(self.build_lib) - generate_grpc_bindings(output_dir) - generate_typings(output_dir) - - -setup( - cmdclass={ - "build_py": BuildPackageProtos, - "develop": BuildPackageProtos, - }, -) diff --git a/client/python/tox.ini b/client/python/tox.ini index 4e31d820a24..5eaaa29ceec 100644 --- a/client/python/tox.ini +++ b/client/python/tox.ini @@ -1,15 +1,16 @@ [tox] +isolated_build = true envlist = format + py38 py39 py310 - py311 - py312 [testenv] extras = test commands = - pytest --cov={envsitepackagesdir}/armada_client --cov-report=xml --cov-report=term tests/unit/ + coverage run -m pytest tests/unit/ + coverage xml [testenv:docs] extras = docs diff --git a/magefiles/tests.go b/magefiles/tests.go index 78653eab0dc..e4460c8eee0 100644 --- a/magefiles/tests.go +++ b/magefiles/tests.go @@ -169,10 +169,12 @@ func Teste2epython() error { "--workdir", "/code", "-e", "ARMADA_SERVER=server", "-e", "ARMADA_PORT=50051", + "--entrypoint", "python3", "--network", "kind", "armada-python-client-builder:latest", - "-c", - "pip install -e . && python3 -m pytest -v -s /code/tests/integration/test_no_auth.py", + "-m", "pytest", + "-v", "-s", + "/code/tests/integration/test_no_auth.py", } return dockerRun(args...) diff --git a/scripts/build-python-client.sh b/scripts/build-python-client.sh index 7004989e596..5fd23818146 100755 --- a/scripts/build-python-client.sh +++ b/scripts/build-python-client.sh @@ -5,4 +5,37 @@ mkdir -p proto/armada cp pkg/api/event.proto pkg/api/submit.proto pkg/api/health.proto pkg/api/job.proto pkg/api/binoculars/binoculars.proto proto/armada sed -i 's/\([^\/]\)pkg\/api/\1armada/g' proto/armada/*.proto -cp -rf proto/* client/python/armada_client/proto/ + +# generate python stubs +cd proto +python3 -m grpc_tools.protoc -I. --plugin=protoc-gen-mypy=$(which protoc-gen-mypy) --python_out=../client/python/armada_client --grpc_python_out=../client/python/armada_client --mypy_out=../client/python/armada_client \ + google/api/annotations.proto \ + google/api/http.proto \ + armada/event.proto armada/submit.proto armada/health.proto armada/job.proto armada/binoculars.proto \ + github.com/gogo/protobuf/gogoproto/gogo.proto \ + k8s.io/api/core/v1/generated.proto \ + k8s.io/apimachinery/pkg/api/resource/generated.proto \ + k8s.io/apimachinery/pkg/apis/meta/v1/generated.proto \ + k8s.io/apimachinery/pkg/runtime/generated.proto \ + k8s.io/apimachinery/pkg/runtime/schema/generated.proto \ + k8s.io/apimachinery/pkg/util/intstr/generated.proto \ + k8s.io/api/networking/v1/generated.proto + +cd .. +# This hideous code is because we can't use python package option in grpc. +# See https://github.com/protocolbuffers/protobuf/issues/7061 for an explanation. +# We need to import these packages as a module. +sed -i 's/from armada/from armada_client.armada/g' client/python/armada_client/armada/*.py +sed -i 's/from github.com/from armada_client.github.com/g' client/python/armada_client/armada/*.py +sed -i 's/from google.api/from armada_client.google.api/g' client/python/armada_client/armada/*.py +sed -i 's/from google.api/from armada_client.google.api/g' client/python/armada_client/google/api/*.py + +find client/python/armada_client/ -name '*.py' | xargs sed -i 's/from k8s.io/from armada_client.k8s.io/g' + +# Generate better docs for the client +export PYTHONPATH=${PWD}/client/python +python3 ${PWD}/client/python/armada_client/gen/event_typings.py + +find client/python/armada_client/k8s -name '*.pyi' | xargs sed -i 's/ k8s/ armada_client.k8s/g' +find client/python/armada_client/k8s -name '*.pyi' | xargs sed -i 's/\[k8s/\[armada_client.k8s/g' +find client/python/armada_client/k8s/io -name '*.pyi' | xargs sed -i 's/import k8s.io/import armada_client.k8s.io/g' From 00c123a35f140b2b0f6c98a5caa5de196035645d Mon Sep 17 00:00:00 2001 From: robertdavidsmith <34475852+robertdavidsmith@users.noreply.github.com> Date: Thu, 14 Nov 2024 18:33:48 +0000 Subject: [PATCH 08/11] Scheduler: refactor: replace obsolete job.ResourceRequirements (#4051) * Scheduler: refactor: replace obsolete job.ResourceRequirements Signed-off-by: Robert Smith * refactor preempting_queue_scheduler_test.go Signed-off-by: Robert Smith --------- Signed-off-by: Robert Smith --- internal/scheduler/jobdb/job.go | 4 +- internal/scheduler/metrics/state_metrics.go | 4 +- internal/scheduler/nodedb/nodedb_test.go | 7 +--- internal/scheduler/scheduling/context/job.go | 10 ++--- .../preempting_queue_scheduler_test.go | 38 ++++++------------- .../scheduler/simulator/sink/job_writer.go | 9 ++--- 6 files changed, 26 insertions(+), 46 deletions(-) diff --git a/internal/scheduler/jobdb/job.go b/internal/scheduler/jobdb/job.go index f7b9fc4226f..f9f295e8d05 100644 --- a/internal/scheduler/jobdb/job.go +++ b/internal/scheduler/jobdb/job.go @@ -503,7 +503,7 @@ func (job *Job) Tolerations() []v1.Toleration { // ResourceRequirements returns the resource requirements of the Job // KubernetesResourceRequirements below is preferred -func (job *Job) ResourceRequirements() v1.ResourceRequirements { +func (job *Job) resourceRequirements() v1.ResourceRequirements { if req := job.PodRequirements(); req != nil { return req.ResourceRequirements } @@ -831,7 +831,7 @@ func SchedulingKeyFromJob(skg *schedulerobjects.SchedulingKeyGenerator, job *Job job.NodeSelector(), job.Affinity(), job.Tolerations(), - job.ResourceRequirements().Requests, + job.resourceRequirements().Requests, job.PriorityClassName(), ) } diff --git a/internal/scheduler/metrics/state_metrics.go b/internal/scheduler/metrics/state_metrics.go index 3e25deb892b..faa10d50901 100644 --- a/internal/scheduler/metrics/state_metrics.go +++ b/internal/scheduler/metrics/state_metrics.go @@ -209,7 +209,7 @@ func (m *jobStateMetrics) updateStateDuration(job *jobdb.Job, state string, prio } queue := job.Queue() - requests := job.ResourceRequirements().Requests + requests := job.AllResourceRequirements() latestRun := job.LatestRun() pool := "" node := "" @@ -236,7 +236,7 @@ func (m *jobStateMetrics) updateStateDuration(job *jobdb.Job, state string, prio // Resource Seconds for _, res := range m.trackedResourceNames { - resQty := requests[res] + resQty := requests.GetResourceByNameZeroIfMissing(string(res)) resSeconds := duration * float64(resQty.MilliValue()) / 1000 m.jobStateResourceSecondsByQueue. WithLabelValues(queue, pool, state, priorState, res.String()).Add(resSeconds) diff --git a/internal/scheduler/nodedb/nodedb_test.go b/internal/scheduler/nodedb/nodedb_test.go index 1f23597541b..3ace44245c4 100644 --- a/internal/scheduler/nodedb/nodedb_test.go +++ b/internal/scheduler/nodedb/nodedb_test.go @@ -12,7 +12,6 @@ import ( armadamaps "github.com/armadaproject/armada/internal/common/maps" "github.com/armadaproject/armada/internal/common/util" - "github.com/armadaproject/armada/internal/scheduler/adapters" schedulerconfig "github.com/armadaproject/armada/internal/scheduler/configuration" "github.com/armadaproject/armada/internal/scheduler/internaltypes" "github.com/armadaproject/armada/internal/scheduler/jobdb" @@ -132,8 +131,6 @@ func TestNodeBindingEvictionUnbinding(t *testing.T) { jobFilter := func(job *jobdb.Job) bool { return true } job := testfixtures.Test1GpuJob("A", testfixtures.PriorityClass0) request := job.KubernetesResourceRequirements() - requestInternalRl, err := nodeDb.resourceListFactory.FromJobResourceListFailOnUnknown(adapters.K8sResourceListToMap(job.ResourceRequirements().Requests)) - assert.Nil(t, err) jobId := job.Id() @@ -177,14 +174,14 @@ func TestNodeBindingEvictionUnbinding(t *testing.T) { assert.True( t, armadamaps.DeepEqual( - map[string]internaltypes.ResourceList{jobId: requestInternalRl}, + map[string]internaltypes.ResourceList{jobId: request}, boundNode.AllocatedByJobId, ), ) assert.True( t, armadamaps.DeepEqual( - map[string]internaltypes.ResourceList{jobId: requestInternalRl}, + map[string]internaltypes.ResourceList{jobId: request}, evictedNode.AllocatedByJobId, ), ) diff --git a/internal/scheduler/scheduling/context/job.go b/internal/scheduler/scheduling/context/job.go index 2b96b335f1b..e92167c3275 100644 --- a/internal/scheduler/scheduling/context/job.go +++ b/internal/scheduler/scheduling/context/job.go @@ -218,10 +218,10 @@ func PrintJobSummary(ctx *armadacontext.Context, prefix string, jctxs []*JobSche ) resourcesByQueue := armadamaps.MapValues( jobsByQueue, - func(jobs []*jobdb.Job) schedulerobjects.ResourceList { - rv := schedulerobjects.NewResourceListWithDefaultSize() + func(jobs []*jobdb.Job) internaltypes.ResourceList { + rv := internaltypes.ResourceList{} for _, job := range jobs { - rv.AddV1ResourceList(job.ResourceRequirements().Requests) + rv = rv.Add(job.AllResourceRequirements()) } return rv }, @@ -247,8 +247,8 @@ func PrintJobSummary(ctx *armadacontext.Context, prefix string, jctxs []*JobSche maps.Keys(jobsByQueue), armadamaps.MapValues( resourcesByQueue, - func(rl schedulerobjects.ResourceList) string { - return rl.CompactString() + func(rl internaltypes.ResourceList) string { + return rl.String() }, ), jobCountPerQueue, diff --git a/internal/scheduler/scheduling/preempting_queue_scheduler_test.go b/internal/scheduler/scheduling/preempting_queue_scheduler_test.go index 7102c63a8ef..f8fdbac4ce8 100644 --- a/internal/scheduler/scheduling/preempting_queue_scheduler_test.go +++ b/internal/scheduler/scheduling/preempting_queue_scheduler_test.go @@ -1920,7 +1920,7 @@ func TestPreemptingQueueScheduler(t *testing.T) { // Accounting across scheduling rounds. roundByJobId := make(map[string]int) indexByJobId := make(map[string]int) - allocatedByQueueAndPriorityClass := make(map[string]schedulerobjects.QuantityByTAndResourceType[string]) + allocatedByQueueAndPriorityClass := make(map[string]map[string]internaltypes.ResourceList) nodeIdByJobId := make(map[string]string) var jobIdsByGangId map[string]map[string]bool var gangIdByJobId map[string]string @@ -1941,7 +1941,7 @@ func TestPreemptingQueueScheduler(t *testing.T) { ) } - demandByQueue := map[string]schedulerobjects.ResourceList{} + demandByQueue := map[string]internaltypes.ResourceList{} // Run the scheduler. cordonedNodes := map[int]bool{} @@ -1978,12 +1978,7 @@ func TestPreemptingQueueScheduler(t *testing.T) { queuedJobs = append(queuedJobs, job.WithQueued(true)) roundByJobId[job.Id()] = i indexByJobId[job.Id()] = j - r, ok := demandByQueue[job.Queue()] - if !ok { - r = schedulerobjects.NewResourceList(len(job.PodRequirements().ResourceRequirements.Requests)) - demandByQueue[job.Queue()] = r - } - r.AddV1ResourceList(job.PodRequirements().ResourceRequirements.Requests) + demandByQueue[job.Queue()] = demandByQueue[job.Queue()].Add(job.AllResourceRequirements()) } } err = jobDbTxn.Upsert(queuedJobs) @@ -2005,12 +2000,7 @@ func TestPreemptingQueueScheduler(t *testing.T) { delete(gangIdByJobId, job.Id()) delete(jobIdsByGangId[gangId], job.Id()) } - r, ok := demandByQueue[job.Queue()] - if !ok { - r = schedulerobjects.NewResourceList(len(job.PodRequirements().ResourceRequirements.Requests)) - demandByQueue[job.Queue()] = r - } - r.SubV1ResourceList(job.PodRequirements().ResourceRequirements.Requests) + demandByQueue[job.Queue()] = demandByQueue[job.Queue()].Subtract(job.AllResourceRequirements()) } } } @@ -2049,11 +2039,11 @@ func TestPreemptingQueueScheduler(t *testing.T) { for queue, priorityFactor := range tc.PriorityFactorByQueue { weight := 1 / priorityFactor - queueDemand := testfixtures.TestResourceListFactory.FromJobResourceListIgnoreUnknown(demandByQueue[queue].Resources) + queueDemand := demandByQueue[queue] err := sctx.AddQueueSchedulingContext( queue, weight, - internaltypes.RlMapFromJobSchedulerObjects(allocatedByQueueAndPriorityClass[queue], testfixtures.TestResourceListFactory), + allocatedByQueueAndPriorityClass[queue], queueDemand, queueDemand, limiterByQueue[queue], @@ -2092,28 +2082,22 @@ func TestPreemptingQueueScheduler(t *testing.T) { job := jctx.Job m := allocatedByQueueAndPriorityClass[job.Queue()] if m == nil { - m = make(schedulerobjects.QuantityByTAndResourceType[string]) + m = make(map[string]internaltypes.ResourceList) allocatedByQueueAndPriorityClass[job.Queue()] = m } - m.SubV1ResourceList( - job.PriorityClassName(), - job.ResourceRequirements().Requests, - ) + m[job.PriorityClassName()] = m[job.PriorityClassName()].Subtract(job.AllResourceRequirements()) } for _, jctx := range result.ScheduledJobs { job := jctx.Job m := allocatedByQueueAndPriorityClass[job.Queue()] if m == nil { - m = make(schedulerobjects.QuantityByTAndResourceType[string]) + m = make(map[string]internaltypes.ResourceList) allocatedByQueueAndPriorityClass[job.Queue()] = m } - m.AddV1ResourceList( - job.PriorityClassName(), - job.ResourceRequirements().Requests, - ) + m[job.PriorityClassName()] = m[job.PriorityClassName()].Add(job.AllResourceRequirements()) } for queue, qctx := range sctx.QueueSchedulingContexts { - m := internaltypes.RlMapFromJobSchedulerObjects(allocatedByQueueAndPriorityClass[queue], testfixtures.TestResourceListFactory) + m := allocatedByQueueAndPriorityClass[queue] assert.Equal(t, internaltypes.RlMapRemoveZeros(m), internaltypes.RlMapRemoveZeros(qctx.AllocatedByPriorityClass)) } diff --git a/internal/scheduler/simulator/sink/job_writer.go b/internal/scheduler/simulator/sink/job_writer.go index ccac68dcace..7f4d029d365 100644 --- a/internal/scheduler/simulator/sink/job_writer.go +++ b/internal/scheduler/simulator/sink/job_writer.go @@ -4,7 +4,6 @@ import ( "os" parquetWriter "github.com/xitongsys/parquet-go/writer" - v1 "k8s.io/api/core/v1" "github.com/armadaproject/armada/internal/common/armadacontext" protoutil "github.com/armadaproject/armada/internal/common/proto" @@ -77,10 +76,10 @@ func (j *JobWriter) createJobRunRow(st *model.StateTransition) ([]*JobRunRow, er associatedJob := jobsList[i] if event.GetCancelledJob() != nil || event.GetJobSucceeded() != nil || event.GetJobRunPreempted() != nil { // Resource requirements - cpuLimit := associatedJob.ResourceRequirements().Requests[v1.ResourceCPU] - memoryLimit := associatedJob.ResourceRequirements().Requests[v1.ResourceMemory] - ephemeralStorageLimit := associatedJob.ResourceRequirements().Requests[v1.ResourceEphemeralStorage] - gpuLimit := associatedJob.ResourceRequirements().Requests["nvidia.com/gpu"] + cpuLimit := associatedJob.AllResourceRequirements().GetResourceByNameZeroIfMissing("cpu") + memoryLimit := associatedJob.AllResourceRequirements().GetResourceByNameZeroIfMissing("memory") + ephemeralStorageLimit := associatedJob.AllResourceRequirements().GetResourceByNameZeroIfMissing("ephemeral-storage") + gpuLimit := associatedJob.AllResourceRequirements().GetResourceByNameZeroIfMissing("nvidia.com/gpu") eventTime := protoutil.ToStdTime(event.Created) rows = append(rows, &JobRunRow{ From 22771201aaf6ae4e4d3e87c397fb42d735eab1e6 Mon Sep 17 00:00:00 2001 From: JamesMurkin Date: Fri, 15 Nov 2024 09:43:13 +0000 Subject: [PATCH 09/11] ARMADA-2970 Small simulator improvements (#274) (#4052) * ARMADA-2970 Small simulator improvements - Log input files consistently - Fix log output to only occur every 5 seconds - Was bugged to log every round, but only after 5 seconds - Remove variance on gang jobs, so the finish at the same time * Set 0 tailmean Co-authored-by: James Murkin --- cmd/simulator/cmd/root.go | 4 ++-- internal/scheduler/simulator/simulator.go | 4 +++- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/cmd/simulator/cmd/root.go b/cmd/simulator/cmd/root.go index 0e5951d5e4a..a1c815633c5 100644 --- a/cmd/simulator/cmd/root.go +++ b/cmd/simulator/cmd/root.go @@ -123,8 +123,8 @@ func runSimulations(cmd *cobra.Command, args []string) error { defer outputSink.Close(ctx) ctx.Info("Armada simulator") - ctx.Infof("ClusterSpec: %v", clusterSpec.Name) - ctx.Infof("WorkloadSpecs: %v", workloadSpec.Name) + ctx.Infof("ClusterSpec: %v", clusterFile) + ctx.Infof("WorkloadSpecs: %v", workloadFile) ctx.Infof("SchedulingConfig: %v", configFile) ctx.Infof("OutputDir: %v", outputDirPath) diff --git a/internal/scheduler/simulator/simulator.go b/internal/scheduler/simulator/simulator.go index dec3c5d406d..c6b74f21458 100644 --- a/internal/scheduler/simulator/simulator.go +++ b/internal/scheduler/simulator/simulator.go @@ -235,7 +235,7 @@ func (s *Simulator) Run(ctx *armadacontext.Context) error { } if time.Now().Unix()-lastLogTime.Unix() >= 5 { ctx.Infof("Simulator time %s", s.time) - lastLogTime = s.time + lastLogTime = time.Now() } if s.time.After(simTerminationTime) { ctx.Infof("Current simulated time (%s) exceeds runtime deadline (%s). Terminating", s.time, simTerminationTime) @@ -465,6 +465,8 @@ func submitJobFromJobTemplate(jobId string, jobTemplate *JobTemplate, gangId str } else { annotations[serverconfig.GangNodeUniformityLabelAnnotation] = "armadaproject.io/clusterName" } + // Make it so gang jobs end at the same time, this means they don't have a distribution currently + jobTemplate.RuntimeDistribution.TailMean = 0 } return &armadaevents.SubmitJob{ From 43d53a5d75b1f815b8f3f998483d317a86e979cc Mon Sep 17 00:00:00 2001 From: Martynas Asipauskas Date: Fri, 15 Nov 2024 12:04:20 +0000 Subject: [PATCH 10/11] Upgrade client with new grpcio bindings (#4053) --- third_party/airflow/pyproject.toml | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/third_party/airflow/pyproject.toml b/third_party/airflow/pyproject.toml index 5073307bab3..a4ae0607679 100644 --- a/third_party/airflow/pyproject.toml +++ b/third_party/airflow/pyproject.toml @@ -4,19 +4,18 @@ build-backend = "setuptools.build_meta" [project] name = "armada_airflow" -version = "1.0.9" +version = "1.0.10" description = "Armada Airflow Operator" readme='README.md' authors = [{name = "Armada-GROSS", email = "armada@armadaproject.io"}] license = { text = "Apache Software License" } dependencies=[ - 'armada-client==0.3.4', + 'armada-client>=0.4.6', 'apache-airflow>=2.6.3', - 'grpcio==1.58.0', - 'grpcio-tools==1.58.0', 'types-protobuf==4.24.0.1', 'kubernetes>=23.6.0', 'kubernetes_asyncio>=24.2.3', + 'opentelemetry-exporter-otlp>=1.28.1' # We want to force dependency upgrade for transitive Airflow dependency ] requires-python=">=3.8" classifiers=[ From b3b07b96266c71c1a629185e9ee00b6ac9f92d96 Mon Sep 17 00:00:00 2001 From: JamesMurkin Date: Fri, 15 Nov 2024 12:25:45 +0000 Subject: [PATCH 11/11] Fix preemption reason - fairshare preemption (#4045) We were incorrectly resetting the PreemptingJobId, which shouldn't get reset The UnschedulableReason is only reset as we try scheduling the jobs several times, during these rounds the jobs we're scheduling cannot be preempted - so it makes no sense to reset their PreemptingJobId This makes it so we now correct populate the preemption reason for fairshare preemption Signed-off-by: JamesMurkin --- internal/scheduler/nodedb/nodedb.go | 1 - 1 file changed, 1 deletion(-) diff --git a/internal/scheduler/nodedb/nodedb.go b/internal/scheduler/nodedb/nodedb.go index 336ca0de314..ba74a66fc8d 100644 --- a/internal/scheduler/nodedb/nodedb.go +++ b/internal/scheduler/nodedb/nodedb.go @@ -340,7 +340,6 @@ func (nodeDb *NodeDb) ScheduleManyWithTxn(txn *memdb.Txn, gctx *context.GangSche // order to find the best fit for this gang); clear out any remnants of // previous attempts. jctx.UnschedulableReason = "" - jctx.PreemptingJobId = "" node, err := nodeDb.SelectNodeForJobWithTxn(txn, jctx) if err != nil {