Address PR comments
papa99do committed Dec 4, 2024
1 parent ae4278e commit 918a77e
Showing 3 changed files with 70 additions and 15 deletions.
26 changes: 18 additions & 8 deletions src/marqo/core/inference/device_manager.py
@@ -19,7 +19,7 @@ class Device(BaseModel):
id: int
name: str
type: DeviceType
total_memory: Optional[int] = 0
total_memory: Optional[int] = None

@classmethod
def cpu(cls) -> 'Device':
@@ -68,10 +68,10 @@ def cuda_device_health_check(self) -> None:
# CUDA devices could become unavailable/unreachable if the docker container running Marqo loses access
# to the device symlinks. There is no way to recover from this, we will need to restart the container.
# See https://github.com/NVIDIA/nvidia-container-toolkit/issues/48 for more details.
logger.error('Cuda device becomes unavailable')
raise CudaDeviceNotAvailableError('Cuda device becomes unavailable')
logger.error('CUDA device/s have become unavailable')
raise CudaDeviceNotAvailableError('CUDA device/s have become unavailable')

# TODO confirm whether we should check all devices or just the default one
oom_errors = []
for device in self.cuda_devices:
try:
cuda_device = torch.device(device.name)
@@ -82,13 +82,23 @@ def cuda_device_health_check(self) -> None:
torch.randn(3, device=cuda_device)
except RuntimeError as e:
if 'out of memory' in str(e).lower():
# If we encounter the 'CUDA error: out of memory' error consistently, it means some threads are
# holding the memory
# `~torch.cuda.empty_cache` doesn't increase the amount of GPU memory available for PyTorch.
# However, it may help reduce fragmentation of GPU memory in certain cases.
torch.cuda.empty_cache()

logger.error(f'Cuda device {device.name} is out of memory. Total memory: {device.total_memory}. '
f'Memory stats: {str(memory_stats)}')
allocated_mem = memory_stats.get("allocated.all.current", None) if memory_stats else None
raise CudaOutOfMemoryError(f'Cuda device {device.name} is out of memory: '
f'({allocated_mem}/{device.total_memory})')
oom_errors.append(f'Cuda device {device.name} is out of memory:'
f' ({allocated_mem}/{device.total_memory})')
else:
# Log a warning message when we encounter other transient errors.
logger.warning(f'Encountered issue inspecting Cuda device {device.name}: {str(e)}')
except Exception as e:
# Log a warning message when we encounter other transient errors.
logger.warning(f'Encountered issue inspecting Cuda device {device.name}: {str(e)}')

if oom_errors:
# We error out if any cuda device is out of memory. If this happens consistently, the memory might be held
# by a long-running thread, and Marqo will need to be restarted to return to a healthy state.
raise CudaOutOfMemoryError(';'.join(oom_errors))
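
For readers following the new control flow, here is a minimal sketch of how a caller might consume the two failure modes of cuda_device_health_check. The wrapper function and the exception import path are assumptions for illustration; only the class and exception names appear in this diff.

# Hypothetical wrapper, not part of this change. The exception import path is assumed.
from marqo.core.inference.device_manager import DeviceManager
from marqo.core.exceptions import CudaDeviceNotAvailableError, CudaOutOfMemoryError  # path assumed


def report_cuda_health(device_manager: DeviceManager) -> dict:
    """Map the health-check outcome to a simple status payload."""
    try:
        device_manager.cuda_device_health_check()
        return {"status": "ok"}
    except CudaDeviceNotAvailableError as e:
        # Unrecoverable: the container has lost access to the CUDA device symlinks.
        return {"status": "unhealthy", "reason": str(e)}
    except CudaOutOfMemoryError as e:
        # The message aggregates every OOM device, joined by ';'.
        return {"status": "unhealthy", "reason": str(e)}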
11 changes: 10 additions & 1 deletion src/marqo/tensor_search/api.py
@@ -593,7 +593,16 @@ def check_health(marqo_config: config.Config = Depends(get_config)):


@app.get("/healthz", include_in_schema=False)
def check_health(marqo_config: config.Config = Depends(get_config)):
def liveness_check(marqo_config: config.Config = Depends(get_config)) -> JSONResponse:
"""
This liveness check endpoint does a quick status check and errors out if any component encounters an unrecoverable
issue. Currently it only checks the CUDA devices.
Docker schedulers can use this endpoint to decide whether to restart the Marqo container.
Returns:
200 - if all checks pass
500 - if any check fails
"""
marqo_config.device_manager.cuda_device_health_check()
return JSONResponse(content={"status": "ok"}, status_code=200)
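
To illustrate how an external scheduler might consume this endpoint, here is a hedged probe sketch. It relies only on the contract described in the docstring (200 on success, 500 on failure); the base URL, port, and timeout below are assumptions for illustration and are not part of this change.

# Hypothetical liveness probe built on the /healthz contract described above.
# The base URL (Marqo's default port 8882 is assumed) and timeout are illustrative.
import requests


def marqo_is_live(base_url: str = "http://localhost:8882") -> bool:
    try:
        response = requests.get(f"{base_url}/healthz", timeout=5)
        return response.status_code == 200
    except requests.RequestException:
        # Treat connection errors and timeouts as unhealthy too.
        return False

A scheduler would typically restart the container only after this probe fails several consecutive times.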

48 changes: 42 additions & 6 deletions tests/core/inference/test_device_manager.py
@@ -23,6 +23,14 @@ def _device_manager_with_cuda(self, total_memory: int = 1_000_000):

return DeviceManager()

def _device_manager_with_multiple_cuda_devices(self, total_memory: int = 1_000_000):
with mock.patch("torch.cuda.is_available", return_value=True), \
mock.patch("torch.cuda.device_count", return_value=2), \
mock.patch("torch.cuda.get_device_name", side_effect=['cuda:0', 'cuda:1']), \
mock.patch("torch.cuda.get_device_properties", return_value=SimpleNamespace(total_memory=total_memory)):

return DeviceManager()

def test_init_with_cpu(self):
device_manager = self._device_manager_without_cuda()

@@ -64,7 +72,7 @@ def test_cuda_health_check_should_fail_when_cuda_device_becomes_unavailable(self
with self.assertRaises(CudaDeviceNotAvailableError) as err:
device_manager.cuda_device_health_check()

self.assertEqual(str(err.exception), "Cuda device becomes unavailable")
self.assertEqual(str(err.exception), "CUDA device/s have become unavailable")

def test_cuda_health_check_should_fail_when_cuda_device_is_out_of_memory(self):
device_manager = self._device_manager_with_cuda(total_memory=1_000_000)
@@ -77,14 +85,42 @@ def test_cuda_health_check_should_fail_when_cuda_device_is_out_of_memory(self):

self.assertEqual(str(err.exception), "Cuda device cuda:0 is out of memory: (900000/1000000)")

def test_cuda_health_check_should_pass_and_log_warning_message_when_cuda_calls_encounter_issue(self):
device_manager = self._device_manager_with_cuda()
def test_cuda_health_check_should_fail_when_any_cuda_device_is_out_of_memory(self):
device_manager = self._device_manager_with_multiple_cuda_devices(total_memory=1_000_000)

with mock.patch("torch.cuda.is_available", return_value=True), \
mock.patch("torch.cuda.memory_stats", side_effect=Exception("random exception")), \
mock.patch("torch.randn", side_effect=[torch.tensor([1, 2, 3]), RuntimeError("CUDA error: out of memory")]), \
mock.patch("torch.cuda.memory_stats", return_value=OrderedDict({"allocated.all.current": 900_000})):
with self.assertRaises(CudaOutOfMemoryError) as err:
device_manager.cuda_device_health_check()

self.assertEqual(str(err.exception), "Cuda device cuda:1 is out of memory: (900000/1000000)")

def test_cuda_health_check_should_check_if_all_cuda_devices_are_out_of_memory(self):
device_manager = self._device_manager_with_multiple_cuda_devices(total_memory=1_000_000)

with mock.patch("torch.cuda.is_available", return_value=True), \
mock.patch("torch.randn",
side_effect=[RuntimeError("CUDA error: out of memory"), RuntimeError("CUDA error: out of memory")]), \
mock.patch("torch.cuda.memory_stats", return_value=OrderedDict({"allocated.all.current": 900_000})):
with self.assertRaises(CudaOutOfMemoryError) as err:
device_manager.cuda_device_health_check()

self.assertEqual(str(err.exception), "Cuda device cuda:0 is out of memory: (900000/1000000);"
"Cuda device cuda:1 is out of memory: (900000/1000000)")

def test_cuda_health_check_should_pass_and_log_warning_message_when_cuda_calls_encounter_issue_other_than_oom(self):
device_manager = self._device_manager_with_multiple_cuda_devices()

with mock.patch("torch.cuda.is_available", return_value=True), \
mock.patch("torch.cuda.memory_stats", side_effect=[RuntimeError("not a memory issue"), Exception("random exception")]), \
mock.patch("marqo.core.inference.device_manager.logger") as mock_logger:
device_manager.cuda_device_health_check()

self.assertEqual('warning', mock_logger.mock_calls[0][0])
self.assertEqual('Encountered issue inspecting Cuda device cuda:0: random exception',
mock_logger.mock_calls[0][1][0])
self.assertEqual('Encountered issue inspecting Cuda device cuda:0: not a memory issue',
mock_logger.mock_calls[0][1][0])

self.assertEqual('warning', mock_logger.mock_calls[1][0])
self.assertEqual('Encountered issue inspecting Cuda device cuda:1: random exception',
mock_logger.mock_calls[1][1][0])
