Skip to content

Commit

Permalink
Expose cuda device health status in /healthz endpoint (#1056)
Browse files Browse the repository at this point in the history
  • Loading branch information
papa99do authored Dec 4, 2024
1 parent 5660996 commit dfc1ed8
Show file tree
Hide file tree
Showing 15 changed files with 309 additions and 4 deletions.
6 changes: 6 additions & 0 deletions src/marqo/api/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,12 @@ class InternalError(MarqoWebError):
status_code = HTTPStatus.INTERNAL_SERVER_ERROR


class ServiceUnavailableError(MarqoWebError):
    """Web error reported to API clients with HTTP 503 (Service Unavailable)."""
    # HTTP status sent with this error
    status_code = HTTPStatus.SERVICE_UNAVAILABLE
    # Machine-readable identifiers; both carry the same value
    error_type = "service_unavailable"
    code = "service_unavailable"


class BackendCommunicationError(InternalError):
"""Error when connecting to Vespa"""
code = "backend_communication_error"
Expand Down
3 changes: 3 additions & 0 deletions src/marqo/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from kazoo.handlers.threading import KazooTimeoutError

from marqo.core.inference.device_manager import DeviceManager
from marqo.vespa.zookeeper_client import ZookeeperClient
from marqo.core.document.document import Document
from marqo.core.embed.embed import Embed
Expand Down Expand Up @@ -39,6 +40,7 @@ def __init__(

self.timeout = timeout
self.backend = backend if backend is not None else enums.SearchDb.vespa
# TODO [Refactoring device logic] deprecate default_device since it's not used
self.default_device = default_device if default_device is not None else (
utils.read_env_vars_and_defaults(EnvVars.MARQO_BEST_AVAILABLE_DEVICE))

Expand All @@ -52,6 +54,7 @@ def __init__(
self.document = Document(vespa_client, self.index_management)
self.recommender = Recommender(vespa_client, self.index_management)
self.embed = Embed(vespa_client, self.index_management, self.default_device)
self.device_manager = DeviceManager()

def set_is_remote(self, vespa_client: VespaClient):
local_host_markers = ["localhost", "0.0.0.0", "127.0.0.1"]
Expand Down
2 changes: 2 additions & 0 deletions src/marqo/core/embed/embed.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ def __init__(self, vespa_client: VespaClient, index_management: IndexManagement,
self.default_device = default_device

@pydantic.validator('default_device')
# TODO [Refactoring device logic] deprecate default_device since it's not used
def validate_default_device(cls, value):
if not value:
raise ValueError("Default Device cannot be 'None'. Marqo default device must have been declared upon startup.")
Expand Down Expand Up @@ -66,6 +67,7 @@ def embed_content(
)

# Set default device if not provided
# TODO [Refactoring device logic] use device info gathered from device manager
if device is None:
device = utils.read_env_vars_and_defaults("MARQO_BEST_AVAILABLE_DEVICE")

Expand Down
12 changes: 12 additions & 0 deletions src/marqo/core/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,3 +112,15 @@ class DuplicateDocumentError(AddDocumentsError):

class TooManyFieldsError(MarqoError):
pass


class DeviceError(MarqoError):
    """Base class for errors concerning compute devices (parent of the CUDA-specific errors)."""


class CudaDeviceNotAvailableError(DeviceError):
    """Signals that CUDA device(s) are not available."""


class CudaOutOfMemoryError(DeviceError):
    """Signals that a CUDA device is out of memory."""
109 changes: 109 additions & 0 deletions src/marqo/core/inference/device_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
from enum import Enum
from functools import cached_property
from typing import List, Optional

import torch

from marqo.base_model import ImmutableBaseModel
from marqo.core.exceptions import CudaDeviceNotAvailableError, CudaOutOfMemoryError
from marqo.logging import get_logger

logger = get_logger('device_manager')


class DeviceType(str, Enum):
    """Kinds of device known to the device manager.

    Members subclass ``str``, so each member compares equal to its string value.
    """
    # Host processor
    CPU = "cpu"
    # CUDA device
    CUDA = "cuda"


class Device(ImmutableBaseModel):
    """Immutable description of a single device (the CPU or one CUDA device)."""
    # Device index; the CPU entry built by ``cpu()`` uses -1
    id: int
    # Device name; 'cpu' for the CPU entry
    name: str
    # Whether this entry is a CPU or a CUDA device
    type: DeviceType
    # Total device memory when known; left as None for the CPU entry
    total_memory: Optional[int] = None

    @property
    def full_name(self) -> str:
        """Return a label of the form ``<type>:<id>(<name>)``."""
        kind = self.type.value
        return f'{kind}:{self.id}({self.name})'

    @classmethod
    def cpu(cls) -> 'Device':
        """Build the CPU entry: id -1, name 'cpu', no total memory recorded."""
        return Device(type=DeviceType.CPU, name='cpu', id=-1)

    @classmethod
    def cuda(cls, device_id, name, total_memory) -> 'Device':
        """Build a CUDA entry from the given device index, name and total memory."""
        return Device(
            type=DeviceType.CUDA,
            id=device_id,
            name=name,
            total_memory=total_memory,
        )


class DeviceManager:
    """
    Device manager collects information and stats of CPU and GPU devices to facilitate the preprocessing and
    vectorisation processes. Based on the information, we will choose the best device to load the embedding models,
    process media files and vectorise the content to achieve optimal performance for search and document ingestion.

    Attributes (populated once in ``__init__``):
        devices: the CPU entry followed by one entry per CUDA device detected at construction time.
        best_available_device_type: ``DeviceType.CUDA`` if CUDA was available at construction time,
            otherwise ``DeviceType.CPU``.
    """
    def __init__(self):
        """Detect the devices visible to torch and record the best available device type."""
        # Remembered so the health check can tell "never had CUDA" apart from "lost CUDA after startup"
        self._is_cuda_available_at_startup: bool = torch.cuda.is_available()
        self.devices: List[Device] = [Device.cpu()]
        self.best_available_device_type = DeviceType.CPU

        if self._is_cuda_available_at_startup:
            self.best_available_device_type = DeviceType.CUDA
            device_count = torch.cuda.device_count()
            for device_id in range(device_count):
                self.devices.append(Device.cuda(device_id,
                                                torch.cuda.get_device_name(device_id),
                                                torch.cuda.get_device_properties(device_id).total_memory))

        logger.debug(f'Found devices {self.devices}. Best available device set to: '
                     f'{self.best_available_device_type.value}.')

    @cached_property
    def cuda_devices(self):
        """Return the CUDA entries of ``self.devices``; computed on first access and then cached."""
        return [device for device in self.devices if device.type == DeviceType.CUDA]

    def cuda_device_health_check(self) -> None:
        """
        Checks the status of the CUDA devices, and raises an exception if they have become
        unavailable or any of them is out of memory.

        The check is skipped entirely when no CUDA device was available at construction time.
        Errors other than out-of-memory hit while probing a device are logged and do not fail the check.

        Raises:
            CudaDeviceNotAvailableError: if CUDA was available at startup but is not available any more.
            CudaOutOfMemoryError: if any CUDA device is out of memory.
        """
        if not self._is_cuda_available_at_startup:
            # If the instance is initialised without cuda devices, skip the check
            return

        if not torch.cuda.is_available():
            # CUDA devices could become unavailable/unreachable if the docker container running Marqo loses access
            # to the device symlinks. There is no way to recover from this, we will need to restart the container.
            # See https://github.com/NVIDIA/nvidia-container-toolkit/issues/48 for more details.
            raise CudaDeviceNotAvailableError('CUDA device(s) have become unavailable')

        oom_errors = []
        for device in self.cuda_devices:
            # Reset for every device: the handlers below read this name, and the failure may happen before
            # (or while) the stats are fetched. Without the reset the handler would either hit an unbound
            # local or report the stats of the previously inspected device.
            memory_stats = None
            try:
                cuda_device = torch.device(f'cuda:{device.id}')
                memory_stats = torch.cuda.memory_stats(cuda_device)
                logger.debug(f'CUDA device {device.full_name} with total memory {device.total_memory}. '
                             f'Memory stats: {str(memory_stats)}')

                # Allocate a tiny tensor on the device to probe whether it can still serve allocations
                torch.randn(3, device=cuda_device)
            except RuntimeError as e:
                if 'out of memory' in str(e).lower():
                    logger.error(f'CUDA device {device.full_name} is out of memory. Total memory: {device.total_memory}. '
                                 f'Memory stats: {str(memory_stats)}')
                    allocated_mem = memory_stats.get("allocated.all.current", None) if memory_stats else None
                    oom_errors.append(f'CUDA device {device.full_name} is out of memory:'
                                      f' ({allocated_mem}/{device.total_memory})')
                else:
                    # Other errors are treated as transient: log them, but do not fail the health check.
                    logger.error(f'Encountered issue inspecting CUDA device {device.full_name}: {str(e)}')
            except Exception as e:
                # Other errors are treated as transient: log them, but do not fail the health check.
                logger.error(f'Encountered issue inspecting CUDA device {device.full_name}: {str(e)}')

        if oom_errors:
            # We error out if any CUDA device is out of memory. If this happens consistently, the memory might be held
            # by a long-running thread, and Marqo will need to be restarted to get to a healthy status
            raise CudaOutOfMemoryError(';'.join(oom_errors))
1 change: 1 addition & 0 deletions src/marqo/core/models/add_docs_params.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ class Config:
batch_vectorisation_mode: BatchVectorisationMode = BatchVectorisationMode.PER_DOCUMENT

def __init__(self, **data: Any):
# TODO [Refactoring device logic] use device info gathered from device manager
# Ensure `None` and passing nothing are treated the same for device
if "device" not in data or data["device"] is None:
data["device"] = get_best_available_device()
Expand Down
1 change: 1 addition & 0 deletions src/marqo/core/monitoring/monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ def _get_vespa_health(self, hostname_filter: Optional[str]) -> VespaHealthStatus
)

def get_cuda_info(self) -> MarqoCudaInfoResponse:
# TODO [Refactoring device logic] move this logic to device manager
"""A function to get information about the CUDA devices on the machine
Returns:
Expand Down
24 changes: 20 additions & 4 deletions src/marqo/tensor_search/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ def marqo_base_exception_handler(request: Request, exc: base_exceptions.MarqoErr
(core_exceptions.InternalError, api_exceptions.InternalError, None, None),
(core_exceptions.ApplicationRollbackError, api_exceptions.ApplicationRollbackError, None, None),
(core_exceptions.TooManyFieldsError, api_exceptions.BadRequestError, None, None),
(core_exceptions.DeviceError, api_exceptions.ServiceUnavailableError, None, None),

# Vespa client exceptions
(
Expand Down Expand Up @@ -579,16 +580,31 @@ def schema_validation(index_name: str, settings_object: dict):
)


@app.get('/memory', include_in_schema=False)
@utils.enable_debug_apis()
def memory():
    """Return the memory profile produced by ``memory_profiler.get_memory_profile()``.

    Served at GET /memory and excluded from the generated API schema.
    NOTE(review): presumably only usable when debug APIs are switched on, via
    ``utils.enable_debug_apis()`` — confirm against that decorator.
    """
    profile = memory_profiler.get_memory_profile()
    return profile


@app.get("/health", include_in_schema=False)
def check_health(marqo_config: config.Config = Depends(get_config)):
    """Report the health gathered by the monitoring component.

    Served at GET /health and excluded from the generated API schema. The status from
    ``marqo_config.monitoring.get_health()`` is converted into a ``HealthResponse``.
    """
    return HealthResponse.from_marqo_health_status(marqo_config.monitoring.get_health())


@app.get('/memory', include_in_schema=False)
@utils.enable_debug_apis()
def memory():
    """Return the memory profile produced by ``memory_profiler.get_memory_profile()``.

    Served at GET /memory and excluded from the generated API schema.
    NOTE(review): presumably only usable when debug APIs are switched on, via
    ``utils.enable_debug_apis()`` — confirm against that decorator.
    """
    profile = memory_profiler.get_memory_profile()
    return profile
@app.get("/healthz", include_in_schema=False)
def liveness_check(marqo_config: config.Config = Depends(get_config)) -> JSONResponse:
    """
    This liveness check endpoint does a quick status check, and errors out if any component encounters
    unrecoverable issues. This only does a check on the cuda devices right now.
    Docker schedulers could leverage this endpoint to decide whether to restart the Marqo container.

    Returns:
        200 - if all checks pass, with body {"status": "ok"}
        503 - if the cuda device check fails: the DeviceError it raises is translated by the API
              exception handler into ServiceUnavailableError (HTTP 503 Service Unavailable)
    """
    # Raises a DeviceError subclass on failure; nothing is caught here so the exception handler responds
    marqo_config.device_manager.cuda_device_health_check()
    return JSONResponse(content={"status": "ok"}, status_code=200)


if __name__ == "__main__":
Expand Down
4 changes: 4 additions & 0 deletions src/marqo/tensor_search/on_start_script.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ def run(self):


class CUDAAvailable:
# TODO [Refactoring device logic] move this logic to device manager
"""checks the status of cuda
"""
logger = get_logger('CUDA device summary')
Expand All @@ -109,6 +110,7 @@ def id_to_device(id):


class SetBestAvailableDevice:
# TODO [Refactoring device logic] move this logic to device manager, get rid of MARQO_BEST_AVAILABLE_DEVICE envvar
"""sets the MARQO_BEST_AVAILABLE_DEVICE env var
"""
logger = get_logger('SetBestAvailableDevice')
Expand Down Expand Up @@ -151,6 +153,7 @@ def __init__(self):
self.models = warmed_models
# TBD to include cross-encoder/ms-marco-TinyBERT-L-2-v2

# TODO [Refactoring device logic] use device info gathered from device manager
self.default_devices = ['cpu'] if not torch.cuda.is_available() else ['cuda', 'cpu']

self.logger.info(f"pre-loading {self.models} onto devices={self.default_devices}")
Expand Down Expand Up @@ -230,6 +233,7 @@ def __init__(self):
f"Invalid patch model: {model}. Please ensure that this is a valid patch model."
)

# TODO [Refactoring device logic] use device info gathered from device manager
self.default_devices = ['cpu'] if not torch.cuda.is_available() else ['cpu', 'cuda']

def run(self):
Expand Down
2 changes: 2 additions & 0 deletions src/marqo/tensor_search/tensor_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -1557,6 +1557,7 @@ def search(config: Config, index_name: str, text: Optional[Union[str, dict, Cust
if verbose:
print(f"determined_search_method: {search_method}, text query: {text}")

# TODO [Refactoring device logic] use device info gathered from device manager
if device is None:
selected_device = utils.read_env_vars_and_defaults("MARQO_BEST_AVAILABLE_DEVICE")
if selected_device is None:
Expand Down Expand Up @@ -2263,6 +2264,7 @@ def eject_model(model_name: str, device: str) -> dict:
return result


# TODO [Refactoring device logic] move to device manager
def get_cpu_info() -> dict:
return {
"cpu_usage_percent": f"{psutil.cpu_percent(1)} %", # The number 1 is a time interval for CPU usage calculation.
Expand Down
2 changes: 2 additions & 0 deletions src/marqo/tensor_search/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ def construct_authorized_url(url_base: str, username: str, password: str) -> str


def check_device_is_available(device: str) -> bool:
# TODO [Refactoring device logic] move this logic to device manager
"""Checks if a device is available on the machine
Args:
Expand Down Expand Up @@ -341,6 +342,7 @@ def generate_batches(seq: Sequence, batch_size: int):


def get_best_available_device() -> str:
# TODO [Refactoring device logic] replace this with device manager
"""Get the best available device for Marqo to use and validate it."""
device = read_env_vars_and_defaults(EnvVars.MARQO_BEST_AVAILABLE_DEVICE)
if device is None or not check_device_is_available(device):
Expand Down
1 change: 1 addition & 0 deletions src/marqo/tensor_search/web/api_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@


def translate_api_device(device: Optional[str]) -> Optional[str]:
# TODO [Refactoring device logic] move this logic to device manager
"""Translates an API device as given through the API into an internal enum.
Args:
Expand Down
2 changes: 2 additions & 0 deletions src/marqo/tensor_search/web/api_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@


def validate_api_device_string(device: typing.Optional[str]) -> typing.Optional[str]:
# TODO [Refactoring device logic] move this logic to device manager
"""Validates a device which is an API parameter
Args:
Expand Down Expand Up @@ -47,6 +48,7 @@ def validate_api_device_string(device: typing.Optional[str]) -> typing.Optional[


async def validate_device(device: typing.Optional[str] = None) -> typing.Optional[str]:
# TODO [Refactoring device logic] move this logic to device manager
"""Translates and validates the device string. Checks if the requested
device is available.
Expand Down
Loading

0 comments on commit dfc1ed8

Please sign in to comment.