Skip to content

Commit

Permalink
Improve Beaker.cluster.utilization() (#84)
Browse files Browse the repository at this point in the history
* Improve `Beaker.cluster.utilization()`

* logging

* fix doc test
  • Loading branch information
epwalsh authored Apr 21, 2022
1 parent 77e573b commit e310d2d
Show file tree
Hide file tree
Showing 6 changed files with 80 additions and 41 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Added `Beaker.image.pull()` method.
- Added `Beaker.experiment.url()` method.

### Changed

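- Changed the return type of `Beaker.cluster.utilization()` from `List[NodeUtilization]` to `ClusterUtilization`. The per-node utilization is now available through the `nodes` field of the returned object.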

## [v0.9.0](https://github.com/allenai/beaker-py/releases/tag/v0.9.0) - 2022-04-19

### Added
Expand Down
2 changes: 1 addition & 1 deletion beaker/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@
:meth:`Beaker.cluster.utilization() <services.ClusterClient.utilization>`:
>>> free_gpus = 0
>>> for node_util in beaker.cluster.utilization(beaker_on_prem_cluster_name):
>>> for node_util in beaker.cluster.utilization(beaker_on_prem_cluster_name).nodes:
... free_gpus += node_util.free.gpu_count
Nodes
Expand Down
11 changes: 9 additions & 2 deletions beaker/data_model/cluster.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
from datetime import datetime
from enum import Enum
from typing import Optional
from typing import List, Optional

from pydantic import validator

from .base import BaseModel
from .node import NodeShape, NodeSpec
from .node import NodeShape, NodeSpec, NodeUtilization


class ClusterStatus(str, Enum):
Expand Down Expand Up @@ -34,3 +34,10 @@ def _validate_datetime(cls, v: Optional[datetime]) -> Optional[datetime]:
if v is not None and v.year == 1:
return None
return v


class ClusterUtilization(BaseModel):
    """
    Cluster-wide utilization summary, as returned by ``ClusterClient.utilization()``.
    """

    # ID of the cluster these stats describe.
    id: str
    # Number of non-finalized jobs on the cluster whose current status is "running".
    running_jobs: int
    # Number of non-finalized jobs on the cluster whose current status is "created".
    queued_jobs: int
    # One utilization entry per node of the cluster.
    nodes: List[NodeUtilization]
2 changes: 1 addition & 1 deletion beaker/data_model/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@ class Node(BaseModel):
class NodeSpecUtil(BaseModel):
    """
    An amount of node resources (used or free), as reported in a ``NodeUtilization``.

    A field is ``None`` when the corresponding limit is not set on the node.
    """

    # CPU amount; fractional values are allowed.
    cpu_count: Optional[float] = None
    # Number of GPUs.
    gpu_count: Optional[int] = None
    # memory: Optional[str] # TODO


class NodeUtilization(BaseModel):
    """
    Utilization stats for a single node, as built by ``ClusterClient.utilization()``.
    """

    # ID of the node.
    id: str
    # Hostname of the node.
    hostname: str
    # Resource limits of the node.
    limits: NodeSpec
    # Number of non-finalized jobs currently assigned to this node.
    running_jobs: int
    # Resources requested by the jobs on this node, capped at the node's limits.
    used: NodeSpecUtil
    # Remaining resources (limits minus requested), never below zero.
    free: NodeSpecUtil
77 changes: 50 additions & 27 deletions beaker/services/cluster.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Union
from typing import Dict, Union

from ..data_model import *
from ..exceptions import *
Expand Down Expand Up @@ -177,7 +177,7 @@ def nodes(self, cluster: Union[str, Cluster]) -> List[Node]:
).json()["data"]
]

def utilization(self, cluster: Union[str, Cluster]) -> List[NodeUtilization]:
def utilization(self, cluster: Union[str, Cluster]) -> ClusterUtilization:
"""
Get current utilization stats for each node in a cluster.
Expand All @@ -189,38 +189,61 @@ def utilization(self, cluster: Union[str, Cluster]) -> List[NodeUtilization]:
"""
cluster: Cluster = self.resolve_cluster(cluster)
nodes = self.nodes(cluster)
out: List[NodeUtilization] = []
for node in nodes:
gpus_used = 0
cpus_used = 0.0
for job in self.beaker.job.list(node=node, finalized=False):

running_jobs = 0
queued_jobs = 0
node_to_util: Dict[str, Dict[str, Union[int, float]]] = {
node.id: {"running_jobs": 0, "gpus_used": 0, "cpus_used": 0.0} for node in nodes
}

for job in self.beaker.job.list(cluster=cluster, finalized=False):
if job.status.current == CurrentJobStatus.running:
running_jobs += 1
elif job.status.current == CurrentJobStatus.created:
queued_jobs += 1

if job.node is not None:
if job.node not in node_to_util:
continue # unlikely

node_util = node_to_util[job.node]
node_util["running_jobs"] += 1
if job.requests is not None:
if job.requests.gpu_count is not None:
gpus_used += job.requests.gpu_count
node_util["gpus_used"] += job.requests.gpu_count
if job.requests.cpu_count is not None:
cpus_used += job.requests.cpu_count
used = NodeSpecUtil(
gpu_count=None if node.limits.gpu_count is None else gpus_used,
cpu_count=None if node.limits.cpu_count is None else cpus_used,
)
free = NodeSpecUtil(
gpu_count=None
if node.limits.gpu_count is None
else node.limits.gpu_count - gpus_used,
cpu_count=None
if node.limits.cpu_count is None
else node.limits.cpu_count - cpus_used,
)
out.append(
node_util["cpus_used"] += job.requests.cpu_count

return ClusterUtilization(
id=cluster.id,
running_jobs=running_jobs,
queued_jobs=queued_jobs,
nodes=[
NodeUtilization(
id=node.id,
hostname=node.hostname,
limits=node.limits,
used=used,
free=free,
running_jobs=node_to_util[node.id]["running_jobs"],
used=NodeSpecUtil(
gpu_count=None
if node.limits.gpu_count is None
else min(node.limits.gpu_count, node_to_util[node.id]["gpus_used"]),
cpu_count=None
if node.limits.cpu_count is None
else min(node.limits.cpu_count, node_to_util[node.id]["cpus_used"]),
),
free=NodeSpecUtil(
gpu_count=None
if node.limits.gpu_count is None
else max(0, node.limits.gpu_count - node_to_util[node.id]["gpus_used"]),
cpu_count=None
if node.limits.cpu_count is None
else max(0, node.limits.cpu_count - node_to_util[node.id]["cpus_used"]),
),
)
)
return out
for node in nodes
],
)

def filter_available(
self, resources: TaskResources, *clusters: Union[str, Cluster]
Expand Down Expand Up @@ -255,7 +278,7 @@ def is_compat(node_spec: Union[NodeSpec, NodeShape, NodeSpecUtil]) -> bool:
if cluster.node_shape is not None and not is_compat(cluster.node_shape):
continue

node_utilization = self.utilization(cluster)
node_utilization = self.utilization(cluster).nodes
if cluster.autoscale and len(node_utilization) < cluster.capacity:
available.append(cluster)
else:
Expand Down
25 changes: 15 additions & 10 deletions conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,16 +87,21 @@ def alternate_beaker_image_name(client: Beaker) -> Generator[str, None, None]:


@pytest.fixture()
def beaker_cluster_name() -> str:
return random.choice(
[
"ai2/general-cirrascale",
"ai2/allennlp-cirrascale",
"ai2/aristo-cirrascale",
"ai2/mosaic-cirrascale",
"ai2/s2-cirrascale",
]
)
def beaker_cluster_name(client: Beaker) -> str:
    """
    Pick an on-prem cluster name to run tests against.

    The candidate clusters are tried in random order, and the first one whose
    utilization reports no queued jobs is returned. If every candidate has
    queued jobs, ``"ai2/petew-cpu"`` is returned instead.
    """
    candidates = [
        "ai2/general-cirrascale",
        "ai2/allennlp-cirrascale",
        "ai2/aristo-cirrascale",
        "ai2/mosaic-cirrascale",
        "ai2/s2-cirrascale",
    ]
    random.shuffle(candidates)
    for name in candidates:
        # Skip clusters that already have jobs waiting in the queue.
        if client.cluster.utilization(name).queued_jobs != 0:
            continue
        logger.info("Found suitable on-prem cluster '%s'", name)
        return name
    # No candidate had an empty queue.
    return "ai2/petew-cpu"


@pytest.fixture()
Expand Down

0 comments on commit e310d2d

Please sign in to comment.