From e310d2d190fbf08571abbdb4e0976d61b68a7a2f Mon Sep 17 00:00:00 2001 From: Pete Date: Wed, 20 Apr 2022 17:12:59 -0700 Subject: [PATCH] Improve `Beaker.cluster.utilization()` (#84) * Improve `Beaker.cluster.utilization()` * logging * fix doc test --- CHANGELOG.md | 4 ++ beaker/__init__.py | 2 +- beaker/data_model/cluster.py | 11 +++++- beaker/data_model/node.py | 2 +- beaker/services/cluster.py | 77 +++++++++++++++++++++++------------- conftest.py | 25 +++++++----- 6 files changed, 80 insertions(+), 41 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7d39c61..f35697b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Added `Beaker.image.pull()` method. - Added `Beaker.experiment.url()` method. +### Changed + +- Changed return type of `Beaker.cluster.utilization()` to `ClusterUtilization`. + ## [v0.9.0](https://github.com/allenai/beaker-py/releases/tag/v0.9.0) - 2022-04-19 ### Added diff --git a/beaker/__init__.py b/beaker/__init__.py index ffa87af..09bdc61 100644 --- a/beaker/__init__.py +++ b/beaker/__init__.py @@ -71,7 +71,7 @@ :meth:`Beaker.cluster.utilization() `: >>> free_gpus = 0 ->>> for node_util in beaker.cluster.utilization(beaker_on_prem_cluster_name): +>>> for node_util in beaker.cluster.utilization(beaker_on_prem_cluster_name).nodes: ... free_gpus += node_util.free.gpu_count Nodes diff --git a/beaker/data_model/cluster.py b/beaker/data_model/cluster.py index 5c7b847..c47617a 100644 --- a/beaker/data_model/cluster.py +++ b/beaker/data_model/cluster.py @@ -1,11 +1,11 @@ from datetime import datetime from enum import Enum -from typing import Optional +from typing import List, Optional from pydantic import validator from .base import BaseModel -from .node import NodeShape, NodeSpec +from .node import NodeShape, NodeSpec, NodeUtilization class ClusterStatus(str, Enum): @@ -34,3 +34,10 @@ def _validate_datetime(cls, v: Optional[datetime]) -> Optional[datetime]: if v is not None and v.year == 1: return None return v + + +class ClusterUtilization(BaseModel): + id: str + running_jobs: int + queued_jobs: int + nodes: List[NodeUtilization] diff --git a/beaker/data_model/node.py b/beaker/data_model/node.py index 8a9c562..75e8b84 100644 --- a/beaker/data_model/node.py +++ b/beaker/data_model/node.py @@ -29,12 +29,12 @@ class Node(BaseModel): class NodeSpecUtil(BaseModel): cpu_count: Optional[float] = None gpu_count: Optional[int] = None - # memory: Optional[str] # TODO class NodeUtilization(BaseModel): id: str hostname: str limits: NodeSpec + running_jobs: int used: NodeSpecUtil free: NodeSpecUtil diff --git a/beaker/services/cluster.py b/beaker/services/cluster.py index a38ba1e..a8364b9 100644 --- a/beaker/services/cluster.py +++ b/beaker/services/cluster.py @@ -1,4 +1,4 @@ -from typing import Union +from typing import Dict, Union from ..data_model import * from ..exceptions import * @@ -177,7 +177,7 @@ def nodes(self, cluster: Union[str, Cluster]) -> List[Node]: ).json()["data"] ] - def utilization(self, cluster: Union[str, Cluster]) -> List[NodeUtilization]: + def utilization(self, cluster: Union[str, Cluster]) -> ClusterUtilization: """ Get current utilization stats for each node in a cluster. @@ -189,38 +189,61 @@ def utilization(self, cluster: Union[str, Cluster]) -> List[NodeUtilization]: """ cluster: Cluster = self.resolve_cluster(cluster) nodes = self.nodes(cluster) - out: List[NodeUtilization] = [] - for node in nodes: - gpus_used = 0 - cpus_used = 0.0 - for job in self.beaker.job.list(node=node, finalized=False): + + running_jobs = 0 + queued_jobs = 0 + node_to_util: Dict[str, Dict[str, Union[int, float]]] = { + node.id: {"running_jobs": 0, "gpus_used": 0, "cpus_used": 0.0} for node in nodes + } + + for job in self.beaker.job.list(cluster=cluster, finalized=False): + if job.status.current == CurrentJobStatus.running: + running_jobs += 1 + elif job.status.current == CurrentJobStatus.created: + queued_jobs += 1 + + if job.node is not None: + if job.node not in node_to_util: + continue # unlikely + + node_util = node_to_util[job.node] + node_util["running_jobs"] += 1 if job.requests is not None: if job.requests.gpu_count is not None: - gpus_used += job.requests.gpu_count + node_util["gpus_used"] += job.requests.gpu_count if job.requests.cpu_count is not None: - cpus_used += job.requests.cpu_count - used = NodeSpecUtil( - gpu_count=None if node.limits.gpu_count is None else gpus_used, - cpu_count=None if node.limits.cpu_count is None else cpus_used, - ) - free = NodeSpecUtil( - gpu_count=None - if node.limits.gpu_count is None - else node.limits.gpu_count - gpus_used, - cpu_count=None - if node.limits.cpu_count is None - else node.limits.cpu_count - cpus_used, - ) - out.append( + node_util["cpus_used"] += job.requests.cpu_count + + return ClusterUtilization( + id=cluster.id, + running_jobs=running_jobs, + queued_jobs=queued_jobs, + nodes=[ NodeUtilization( id=node.id, hostname=node.hostname, limits=node.limits, - used=used, - free=free, + running_jobs=node_to_util[node.id]["running_jobs"], + used=NodeSpecUtil( + gpu_count=None + if node.limits.gpu_count is None + else min(node.limits.gpu_count, node_to_util[node.id]["gpus_used"]), + cpu_count=None + if node.limits.cpu_count is None + else min(node.limits.cpu_count, node_to_util[node.id]["cpus_used"]), + ), + free=NodeSpecUtil( + gpu_count=None + if node.limits.gpu_count is None + else max(0, node.limits.gpu_count - node_to_util[node.id]["gpus_used"]), + cpu_count=None + if node.limits.cpu_count is None + else max(0, node.limits.cpu_count - node_to_util[node.id]["cpus_used"]), + ), ) - ) - return out + for node in nodes + ], + ) def filter_available( self, resources: TaskResources, *clusters: Union[str, Cluster] @@ -255,7 +278,7 @@ def is_compat(node_spec: Union[NodeSpec, NodeShape, NodeSpecUtil]) -> bool: if cluster.node_shape is not None and not is_compat(cluster.node_shape): continue - node_utilization = self.utilization(cluster) + node_utilization = self.utilization(cluster).nodes if cluster.autoscale and len(node_utilization) < cluster.capacity: available.append(cluster) else: diff --git a/conftest.py b/conftest.py index 509f59c..9a14975 100644 --- a/conftest.py +++ b/conftest.py @@ -87,16 +87,21 @@ def alternate_beaker_image_name(client: Beaker) -> Generator[str, None, None]: @pytest.fixture() -def beaker_cluster_name() -> str: - return random.choice( - [ - "ai2/general-cirrascale", - "ai2/allennlp-cirrascale", - "ai2/aristo-cirrascale", - "ai2/mosaic-cirrascale", - "ai2/s2-cirrascale", - ] - ) +def beaker_cluster_name(client: Beaker) -> str: + choices = [ + "ai2/general-cirrascale", + "ai2/allennlp-cirrascale", + "ai2/aristo-cirrascale", + "ai2/mosaic-cirrascale", + "ai2/s2-cirrascale", + ] + random.shuffle(choices) + for cluster in choices: + utilization = client.cluster.utilization(cluster) + if utilization.queued_jobs == 0: + logger.info("Found suitable on-prem cluster '%s'", cluster) + return cluster + return "ai2/petew-cpu" @pytest.fixture()