Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose cuda device health status in /healthz endpoint #1056

Open
wants to merge 7 commits into
base: mainline
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/marqo/api/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,12 @@ class InternalError(MarqoWebError):
status_code = HTTPStatus.INTERNAL_SERVER_ERROR


class ServiceUnavailableError(MarqoWebError):
    """API error reported with HTTP status 503 (Service Unavailable)."""
    error_type = "service_unavailable"
    code = "service_unavailable"
    status_code = HTTPStatus.SERVICE_UNAVAILABLE


class BackendCommunicationError(InternalError):
"""Error when connecting to Vespa"""
code = "backend_communication_error"
Expand Down
3 changes: 3 additions & 0 deletions src/marqo/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from kazoo.handlers.threading import KazooTimeoutError

from marqo.core.inference.device_manager import DeviceManager
from marqo.vespa.zookeeper_client import ZookeeperClient
from marqo.core.document.document import Document
from marqo.core.embed.embed import Embed
Expand Down Expand Up @@ -39,6 +40,7 @@ def __init__(

self.timeout = timeout
self.backend = backend if backend is not None else enums.SearchDb.vespa
# TODO [Refactoring device logic] deprecate default_device since it's not used
papa99do marked this conversation as resolved.
Show resolved Hide resolved
self.default_device = default_device if default_device is not None else (
utils.read_env_vars_and_defaults(EnvVars.MARQO_BEST_AVAILABLE_DEVICE))

Expand All @@ -52,6 +54,7 @@ def __init__(
self.document = Document(vespa_client, self.index_management)
self.recommender = Recommender(vespa_client, self.index_management)
self.embed = Embed(vespa_client, self.index_management, self.default_device)
self.device_manager = DeviceManager()

def set_is_remote(self, vespa_client: VespaClient):
local_host_markers = ["localhost", "0.0.0.0", "127.0.0.1"]
Expand Down
2 changes: 2 additions & 0 deletions src/marqo/core/embed/embed.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ def __init__(self, vespa_client: VespaClient, index_management: IndexManagement,
self.default_device = default_device

@pydantic.validator('default_device')
# TODO [Refactoring device logic] deprecate default_device since it's not used
def validate_default_device(cls, value):
if not value:
raise ValueError("Default Device cannot be 'None'. Marqo default device must have been declared upon startup.")
Expand Down Expand Up @@ -66,6 +67,7 @@ def embed_content(
)

# Set default device if not provided
# TODO [Refactoring device logic] use device info gathered from device manager
if device is None:
device = utils.read_env_vars_and_defaults("MARQO_BEST_AVAILABLE_DEVICE")

Expand Down
12 changes: 12 additions & 0 deletions src/marqo/core/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,3 +112,15 @@ class DuplicateDocumentError(AddDocumentsError):

class TooManyFieldsError(MarqoError):
pass


class DeviceError(MarqoError):
    """Base class for errors about the compute devices (e.g. CUDA devices) Marqo runs on."""
    pass


class CudaDeviceNotAvailableError(DeviceError):
    """Raised when CUDA device(s) are found to be unavailable."""
    pass


class CudaOutOfMemoryError(DeviceError):
    """Raised when a CUDA device is found to be out of memory."""
    pass
109 changes: 109 additions & 0 deletions src/marqo/core/inference/device_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
from enum import Enum
from functools import cached_property
from typing import List, Optional

import torch

from marqo.base_model import ImmutableBaseModel
from marqo.core.exceptions import CudaDeviceNotAvailableError, CudaOutOfMemoryError
from marqo.logging import get_logger

logger = get_logger('device_manager')


class DeviceType(str, Enum):
    """Kinds of compute device distinguished by the device manager."""
    CPU = 'cpu'
    CUDA = 'cuda'


class Device(ImmutableBaseModel):
    """Description of a single compute device: the CPU entry or one CUDA device."""
    # Device index. The CPU entry built by `cpu()` uses -1.
    id: int
    # Human-readable device name.
    name: str
    type: DeviceType
    # Total memory of the device; stays None when not supplied (as for the CPU entry).
    total_memory: Optional[int] = None

    @property
    def full_name(self) -> str:
        """Return a display label of the form ``<type>:<id>(<name>)``."""
        return f'{self.type.value}:{self.id}({self.name})'

    @classmethod
    def cpu(cls) -> 'Device':
        """Build the entry representing the CPU (id -1, name 'cpu', no memory information)."""
        return Device(id=-1, name='cpu', type=DeviceType.CPU)

    @classmethod
    def cuda(cls, device_id: int, name: str, total_memory: Optional[int]) -> 'Device':
        """Build the entry for a CUDA device with the given index, name and total memory."""
        return Device(id=device_id, name=name, type=DeviceType.CUDA, total_memory=total_memory)


class DeviceManager:
    """
    Device manager collects information and stats of CPU and GPU devices to facilitate the preprocessing and
    vectorisation processes. Based on the information, we will choose the best device to load the embedding models,
    process media files and vectorise the content to achieve optimal performance for search and document ingestion.
    """
    def __init__(self):
        """Record whether CUDA is available at startup and enumerate the visible devices.

        The CPU entry is always present. When CUDA is available, one entry per CUDA device is appended
        and the best available device type is switched from CPU to CUDA.
        """
        self._is_cuda_available_at_startup: bool = torch.cuda.is_available()
        self.devices: List[Device] = [Device.cpu()]
        self.best_available_device_type = DeviceType.CPU

        if self._is_cuda_available_at_startup:
            self.best_available_device_type = DeviceType.CUDA
            device_count = torch.cuda.device_count()
            for device_id in range(device_count):
                self.devices.append(Device.cuda(device_id,
                                                torch.cuda.get_device_name(device_id),
                                                torch.cuda.get_device_properties(device_id).total_memory))

        logger.debug(f'Found devices {self.devices}. Best available device set to: '
                     f'{self.best_available_device_type.value}.')

    @cached_property
    def cuda_devices(self) -> List[Device]:
        """Return the CUDA entries of ``self.devices`` (computed once, then cached)."""
        return [device for device in self.devices if device.type == DeviceType.CUDA]

    def cuda_device_health_check(self) -> None:
        """
        Checks the status of the CUDA devices, and raises exceptions if it becomes
        not available or out of memory.

        The check is skipped entirely when CUDA was not available at startup. Errors other than
        out-of-memory that occur while probing a device are logged and do not fail the check.

        raises
          - CudaDeviceNotAvailableError if CUDA device is not available.
          - CudaOutOfMemoryError if any CUDA device is out of memory.
        """
        if not self._is_cuda_available_at_startup:
            # If the instance is initialised without cuda devices, skip the check
            return

        if not torch.cuda.is_available():
            # CUDA devices could become unavailable/unreachable if the docker container running Marqo loses access
            # to the device symlinks. There is no way to recover from this, we will need to restart the container.
            # See https://github.com/NVIDIA/nvidia-container-toolkit/issues/48 for more details.
            raise CudaDeviceNotAvailableError('CUDA device(s) have become unavailable')

        oom_errors = []
        for device in self.cuda_devices:
            # Reset for every device: if the failure happens before the stats are read, the handler
            # below must neither hit an unbound name nor report the stats of the previous device.
            memory_stats = None
            try:
                cuda_device = torch.device(f'cuda:{device.id}')
                memory_stats = torch.cuda.memory_stats(cuda_device)
                logger.debug(f'CUDA device {device.full_name} with total memory {device.total_memory}. '
                             f'Memory stats: {str(memory_stats)}')

                # Allocate a tiny tensor to verify that the device can still serve allocations.
                torch.randn(3, device=cuda_device)
            except RuntimeError as e:
                if 'out of memory' in str(e).lower():
                    logger.error(f'CUDA device {device.full_name} is out of memory. Total memory: {device.total_memory}. '
                                 f'Memory stats: {str(memory_stats)}')
                    # 'allocated_bytes.all.current' is the allocator's current allocated byte count,
                    # which is the figure comparable to the device's total memory.
                    allocated_mem = memory_stats.get("allocated_bytes.all.current", None) if memory_stats else None
                    oom_errors.append(f'CUDA device {device.full_name} is out of memory:'
                                      f' ({allocated_mem}/{device.total_memory})')
                else:
                    # Log other (possibly transient) runtime errors without failing the health check.
                    logger.error(f'Encountered issue inspecting CUDA device {device.full_name}: {str(e)}')
            except Exception as e:
                # Log other (possibly transient) errors without failing the health check.
                logger.error(f'Encountered issue inspecting CUDA device {device.full_name}: {str(e)}')

        if oom_errors:
            # We error out if any CUDA device is out of memory. If this happens consistently, the memory might be held
            # by a long-running thread, and Marqo will need to be restarted to get to a healthy status
            raise CudaOutOfMemoryError(';'.join(oom_errors))
1 change: 1 addition & 0 deletions src/marqo/core/models/add_docs_params.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ class Config:
batch_vectorisation_mode: BatchVectorisationMode = BatchVectorisationMode.PER_DOCUMENT

def __init__(self, **data: Any):
# TODO [Refactoring device logic] use device info gathered from device manager
# Ensure `None` and passing nothing are treated the same for device
if "device" not in data or data["device"] is None:
data["device"] = get_best_available_device()
Expand Down
1 change: 1 addition & 0 deletions src/marqo/core/monitoring/monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ def _get_vespa_health(self, hostname_filter: Optional[str]) -> VespaHealthStatus
)

def get_cuda_info(self) -> MarqoCudaInfoResponse:
# TODO [Refactoring device logic] move this logic to device manager
papa99do marked this conversation as resolved.
Show resolved Hide resolved
papa99do marked this conversation as resolved.
Show resolved Hide resolved
"""A function to get information about the CUDA devices on the machine
Returns:
Expand Down
24 changes: 20 additions & 4 deletions src/marqo/tensor_search/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ def marqo_base_exception_handler(request: Request, exc: base_exceptions.MarqoErr
(core_exceptions.InternalError, api_exceptions.InternalError, None, None),
(core_exceptions.ApplicationRollbackError, api_exceptions.ApplicationRollbackError, None, None),
(core_exceptions.TooManyFieldsError, api_exceptions.BadRequestError, None, None),
(core_exceptions.DeviceError, api_exceptions.ServiceUnavailableError, None, None),

# Vespa client exceptions
(
Expand Down Expand Up @@ -579,16 +580,31 @@ def schema_validation(index_name: str, settings_object: dict):
)


@app.get('/memory', include_in_schema=False)
@utils.enable_debug_apis()
def memory():
    """Return the memory profile produced by ``memory_profiler.get_memory_profile()``.

    The route is excluded from the OpenAPI schema and wrapped by ``utils.enable_debug_apis()``.
    """
    return memory_profiler.get_memory_profile()


@app.get("/health" , include_in_schema=False)
def check_health(marqo_config: config.Config = Depends(get_config)):
    """Fetch the health status from the monitoring component and return it as a ``HealthResponse``."""
    health_status = marqo_config.monitoring.get_health()
    return HealthResponse.from_marqo_health_status(health_status)


@app.get('/memory', include_in_schema=False)
@utils.enable_debug_apis()
def memory():
return memory_profiler.get_memory_profile()
@app.get("/healthz", include_in_schema=False)
def liveness_check(marqo_config: config.Config = Depends(get_config)) -> JSONResponse:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this an internal endpoint (undocumented)?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is correct, include_in_schema=False makes sure it's not included in the openAPI specs either.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there may be upside to returning more info about the devices in this endpoint. Currently we're just using it as a healthy/unhealthy check, but we could also return info for the healthy devices. Like the current memory being used.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A liveness check endpoint should be lightweight and probably should not return a lot of information. We have a separate endpoint, /device/cuda, for more detailed information; we could add a verbose parameter to include more info in that endpoint.

"""
This liveness check endpoint does a quick status check, and error out if any component encounters unrecoverable
issues. This only does a check on the cuda devices right now.
Docker schedulers could leverage this endpoint to decide whether to restart the Marqo container.

Returns:
200 - if all checks pass
503 - if any check fails
"""
marqo_config.device_manager.cuda_device_health_check()
farshidz marked this conversation as resolved.
Show resolved Hide resolved
return JSONResponse(content={"status": "ok"}, status_code=200)


if __name__ == "__main__":
Expand Down
4 changes: 4 additions & 0 deletions src/marqo/tensor_search/on_start_script.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ def run(self):


class CUDAAvailable:
# TODO [Refactoring device logic] move this logic to device manager
"""checks the status of cuda
"""
logger = get_logger('CUDA device summary')
Expand All @@ -109,6 +110,7 @@ def id_to_device(id):


class SetBestAvailableDevice:
# TODO [Refactoring device logic] move this logic to device manager, get rid of MARQO_BEST_AVAILABLE_DEVICE envvar
papa99do marked this conversation as resolved.
Show resolved Hide resolved
"""sets the MARQO_BEST_AVAILABLE_DEVICE env var
"""
logger = get_logger('SetBestAvailableDevice')
Expand Down Expand Up @@ -151,6 +153,7 @@ def __init__(self):
self.models = warmed_models
# TBD to include cross-encoder/ms-marco-TinyBERT-L-2-v2

# TODO [Refactoring device logic] use device info gathered from device manager
self.default_devices = ['cpu'] if not torch.cuda.is_available() else ['cuda', 'cpu']

self.logger.info(f"pre-loading {self.models} onto devices={self.default_devices}")
Expand Down Expand Up @@ -230,6 +233,7 @@ def __init__(self):
f"Invalid patch model: {model}. Please ensure that this is a valid patch model."
)

# TODO [Refactoring device logic] use device info gathered from device manager
self.default_devices = ['cpu'] if not torch.cuda.is_available() else ['cpu', 'cuda']

def run(self):
Expand Down
2 changes: 2 additions & 0 deletions src/marqo/tensor_search/tensor_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -1557,6 +1557,7 @@ def search(config: Config, index_name: str, text: Optional[Union[str, dict, Cust
if verbose:
print(f"determined_search_method: {search_method}, text query: {text}")

# TODO [Refactoring device logic] use device info gathered from device manager
if device is None:
selected_device = utils.read_env_vars_and_defaults("MARQO_BEST_AVAILABLE_DEVICE")
if selected_device is None:
Expand Down Expand Up @@ -2263,6 +2264,7 @@ def eject_model(model_name: str, device: str) -> dict:
return result


# TODO [Refactoring device logic] move to device manager
def get_cpu_info() -> dict:
return {
"cpu_usage_percent": f"{psutil.cpu_percent(1)} %", # The number 1 is a time interval for CPU usage calculation.
Expand Down
2 changes: 2 additions & 0 deletions src/marqo/tensor_search/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ def construct_authorized_url(url_base: str, username: str, password: str) -> str


def check_device_is_available(device: str) -> bool:
# TODO [Refactoring device logic] move this logic to device manager
"""Checks if a device is available on the machine
Args:
Expand Down Expand Up @@ -341,6 +342,7 @@ def generate_batches(seq: Sequence, batch_size: int):


def get_best_available_device() -> str:
# TODO [Refactoring device logic] replace this with device manager
"""Get the best available device for Marqo to use and validate it."""
device = read_env_vars_and_defaults(EnvVars.MARQO_BEST_AVAILABLE_DEVICE)
if device is None or not check_device_is_available(device):
Expand Down
1 change: 1 addition & 0 deletions src/marqo/tensor_search/web/api_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@


def translate_api_device(device: Optional[str]) -> Optional[str]:
# TODO [Refactoring device logic] move this logic to device manager
"""Translates an API device as given through the API into an internal enum.
Args:
Expand Down
2 changes: 2 additions & 0 deletions src/marqo/tensor_search/web/api_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@


def validate_api_device_string(device: typing.Optional[str]) -> typing.Optional[str]:
# TODO [Refactoring device logic] move this logic to device manager
"""Validates a device which is an API parameter

Args:
Expand Down Expand Up @@ -47,6 +48,7 @@ def validate_api_device_string(device: typing.Optional[str]) -> typing.Optional[


async def validate_device(device: typing.Optional[str] = None) -> typing.Optional[str]:
# TODO [Refactoring device logic] move this logic to device manager
"""Translates and validates the device string. Checks if the requested
device is available.

Expand Down
Loading
Loading