Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BUG]: Delete collection resource leak (single-node Chroma) #3297

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions chromadb/api/segment.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ def create_collection(
id=model.id,
name=model.name,
configuration=model.get_configuration(),
segments=[], # Passing empty till backend changes are deployed.
segments=[], # Passing empty till backend changes are deployed.
metadata=model.metadata,
dimension=None, # This is lazily populated on the first add
get_or_create=get_or_create,
Expand Down Expand Up @@ -384,10 +384,11 @@ def delete_collection(
)

if existing:
segments = self._sysdb.get_segments(collection=existing[0].id)
self._sysdb.delete_collection(
existing[0].id, tenant=tenant, database=database
)
self._manager.delete_segments(existing[0].id)
self._manager.delete_segments(segments)
else:
raise ValueError(f"Collection {name} does not exist.")

Expand Down
6 changes: 4 additions & 2 deletions chromadb/segment/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,13 +104,15 @@ class SegmentManager(Component):
segments as required"""

@abstractmethod
def prepare_segments_for_new_collection(self, collection: Collection) -> Sequence[Segment]:
def prepare_segments_for_new_collection(
self, collection: Collection
) -> Sequence[Segment]:
"""Return the segments required for a new collection. Returns only segment data,
does not persist to the SysDB"""
pass

@abstractmethod
def delete_segments(self, collection_id: UUID) -> Sequence[UUID]:
def delete_segments(self, segments: Sequence[Segment]) -> Sequence[UUID]:
"""Delete any local state for all the segments associated with a collection, and
returns a sequence of their IDs. Does not update the SysDB."""
pass
Expand Down
3 changes: 1 addition & 2 deletions chromadb/segment/impl/manager/distributed.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,7 @@ def prepare_segments_for_new_collection(
return [vector_segment, record_segment, metadata_segment]

@override
def delete_segments(self, collection_id: UUID) -> Sequence[UUID]:
segments = self._sysdb.get_segments(collection=collection_id)
def delete_segments(self, segments: Sequence[Segment]) -> Sequence[UUID]:
return [s["id"] for s in segments]

@trace_method(
Expand Down
8 changes: 5 additions & 3 deletions chromadb/segment/impl/manager/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,9 @@ def reset_state(self) -> None:
OpenTelemetryGranularity.OPERATION_AND_SEGMENT,
)
@override
def prepare_segments_for_new_collection(self, collection: Collection) -> Sequence[Segment]:
def prepare_segments_for_new_collection(
self, collection: Collection
) -> Sequence[Segment]:
vector_segment = _segment(
self._vector_segment_type, SegmentScope.VECTOR, collection
)
Expand All @@ -151,9 +153,9 @@ def prepare_segments_for_new_collection(self, collection: Collection) -> Sequenc
OpenTelemetryGranularity.OPERATION_AND_SEGMENT,
)
@override
def delete_segments(self, collection_id: UUID) -> Sequence[UUID]:
segments = self._sysdb.get_segments(collection=collection_id)
def delete_segments(self, segments: Sequence[Segment]) -> Sequence[UUID]:
for segment in segments:
collection_id = segment["collection"]
if segment["id"] in self._instances:
if segment["type"] == SegmentType.HNSW_LOCAL_PERSISTED.value:
instance = self.get_segment(collection_id, VectorReader)
Expand Down
58 changes: 57 additions & 1 deletion chromadb/test/property/invariants.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,26 @@
import gc
import math
import os.path
from uuid import UUID
from contextlib import contextmanager

from chromadb.api.segment import SegmentAPI
from chromadb.db.system import SysDB
from chromadb.ingest.impl.utils import create_topic_name

from chromadb.config import System
from chromadb.db.base import get_sql
from chromadb.db.impl.sqlite import SqliteDB
from time import sleep
import psutil

from chromadb.segment import SegmentType
from chromadb.test.property.strategies import NormalizedRecordSet, RecordSet
from typing import Callable, Optional, Tuple, Union, List, TypeVar, cast, Any, Dict
from typing_extensions import Literal
import numpy as np
import numpy.typing as npt
from chromadb.api import types
from chromadb.api import types, ClientAPI
from chromadb.api.models.Collection import Collection
from hypothesis import note
from hypothesis.errors import InvalidArgument
Expand Down Expand Up @@ -457,3 +463,53 @@ def log_size_for_collections_match_expected(

else:
assert _total_embedding_queue_log_size(sqlite) == 0


@contextmanager
def collection_deleted(client: ClientAPI, collection_name: str):
# Invariant checks before deletion
assert collection_name in [c.name for c in client.list_collections()]
collection = client.get_collection(collection_name)
segments = []
if isinstance(client._server, SegmentAPI): # type: ignore
sysdb: SysDB = client._server._sysdb # type: ignore
segments = sysdb.get_segments(collection=collection.id)
segment_types = {}
should_have_hnsw = False
for segment in segments:
segment_types[segment["type"]] = True
if segment["type"] == SegmentType.HNSW_LOCAL_PERSISTED.value:
sync_threshold = (
collection.metadata["hnsw:sync_threshold"]
if collection.metadata is not None
and "hnsw:sync_threshold" in collection.metadata
else 1000
)
if (
collection.count() > sync_threshold
): # we only check if vector segment dir exists if we've synced at least once
should_have_hnsw = True
assert os.path.exists(
os.path.join(
client.get_settings().persist_directory, str(segment["id"])
)
)
if should_have_hnsw:
assert segment_types[SegmentType.HNSW_LOCAL_PERSISTED.value]
assert segment_types[SegmentType.SQLITE.value]

yield

# Invariant checks after deletion
assert collection_name not in [c.name for c in client.list_collections()]
if len(segments) > 0:
sysdb: SysDB = client._server._sysdb # type: ignore
segments_after = sysdb.get_segments(collection=collection.id)
assert len(segments_after) == 0
for segment in segments:
if segment["type"] == SegmentType.HNSW_LOCAL_PERSISTED.value:
assert not os.path.exists(
os.path.join(
client.get_settings().persist_directory, str(segment["id"])
)
)
6 changes: 4 additions & 2 deletions chromadb/test/property/test_collections.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
run_state_machine_as_test,
MultipleResults,
)
import chromadb.test.property.invariants as invariants
from typing import Any, Dict, Mapping, Optional
import numpy
from chromadb.test.property.strategies import hashing_embedding_function
Expand Down Expand Up @@ -75,8 +76,9 @@ def get_coll(self, coll: strategies.ExternalCollection) -> None:
@rule(coll=consumes(collections))
def delete_coll(self, coll: strategies.ExternalCollection) -> None:
if coll.name in self.model:
self.client.delete_collection(name=coll.name)
self.delete_from_model(coll.name)
with invariants.collection_deleted(self.client, coll.name):
self.client.delete_collection(name=coll.name)
self.delete_from_model(coll.name)
else:
with pytest.raises(Exception):
self.client.delete_collection(name=coll.name)
Expand Down
Loading