Skip to content

Commit

Permalink
Move delete topic down as well
Browse files Browse the repository at this point in the history
  • Loading branch information
HammadB committed Oct 16, 2023
1 parent a9d654b commit b78877a
Show file tree
Hide file tree
Showing 8 changed files with 188 additions and 71 deletions.
23 changes: 4 additions & 19 deletions chromadb/api/segment.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from chromadb.api import API
from chromadb.config import Settings, System
from chromadb.db.system import SysDB
from chromadb.ingest.impl.utils import create_topic_name
from chromadb.segment import SegmentManager, MetadataReader, VectorReader
from chromadb.telemetry import Telemetry
from chromadb.ingest import Producer
Expand Down Expand Up @@ -79,10 +78,7 @@ class SegmentAPI(API):
_sysdb: SysDB
_manager: SegmentManager
_producer: Producer
# TODO: fire telemetry events
_telemetry_client: Telemetry
_tenant_id: str
_topic_ns: str
_collection_cache: Dict[UUID, t.Collection]

def __init__(self, system: System):
Expand All @@ -92,8 +88,6 @@ def __init__(self, system: System):
self._manager = self.require(SegmentManager)
self._telemetry_client = self.require(Telemetry)
self._producer = self.require(Producer)
self._tenant_id = system.settings.tenant_id
self._topic_ns = system.settings.topic_namespace
self._collection_cache = {}

@override
Expand Down Expand Up @@ -135,15 +129,12 @@ def create_collection(
check_index_name(name)

id = uuid4()
coll = t.Collection(
id=id, name=name, metadata=metadata, topic=self._topic(id), dimension=None

coll = self._sysdb.create_collection(
id=id, name=name, metadata=metadata, dimension=None
)
# TODO: Topic creation right now lives in the producer but it should be moved to the coordinator,
# and the producer should just be responsible for publishing messages. Coordinator should
# be responsible for all management of topics.
self._producer.create_topic(coll["topic"])
segments = self._manager.create_segments(coll)
self._sysdb.create_collection(coll)

for segment in segments:
self._sysdb.create_segment(segment)

Expand Down Expand Up @@ -244,7 +235,6 @@ def delete_collection(self, name: str) -> None:
self._sysdb.delete_collection(existing[0]["id"])
for s in self._manager.delete_segments(existing[0]["id"]):
self._sysdb.delete_segment(s)
self._producer.delete_topic(existing[0]["topic"])
if existing and existing[0]["id"] in self._collection_cache:
del self._collection_cache[existing[0]["id"]]
else:
Expand Down Expand Up @@ -620,13 +610,8 @@ def get_settings(self) -> Settings:
def max_batch_size(self) -> int:
return self._producer.max_batch_size

def _topic(self, collection_id: UUID) -> str:
return create_topic_name(self._tenant_id, self._topic_ns, str(collection_id))

# TODO: This could potentially cause race conditions in a distributed version of the
# system, since the cache is only local.
# TODO: promote collection -> topic to a base class method so that it can be
# used for channel assignment in the distributed version of the system.
def _validate_embedding_record(
self, collection: t.Collection, record: t.SubmitEmbeddingRecord
) -> None:
Expand Down
7 changes: 6 additions & 1 deletion chromadb/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
"chromadb.telemetry.Telemetry": "chroma_telemetry_impl",
"chromadb.ingest.Producer": "chroma_producer_impl",
"chromadb.ingest.Consumer": "chroma_consumer_impl",
"chromadb.ingest.CollectionAssignmentPolicy": "chroma_collection_assignment_policy_impl", # noqa
"chromadb.db.system.SysDB": "chroma_sysdb_impl",
"chromadb.segment.SegmentManager": "chroma_segment_manager_impl",
"chromadb.segment.distributed.SegmentDirectory": "chroma_segment_directory_impl",
Expand All @@ -77,7 +78,8 @@
class Settings(BaseSettings): # type: ignore
environment: str = ""

# Legacy config has to be kept around because pydantic will error on nonexisting keys
# Legacy config has to be kept around because pydantic will error
# on nonexisting keys
chroma_db_impl: Optional[str] = None

chroma_api_impl: str = "chromadb.api.segment.SegmentAPI" # Can be "chromadb.api.segment.SegmentAPI" or "chromadb.api.fastapi.FastAPI"
Expand All @@ -94,6 +96,9 @@ class Settings(BaseSettings): # type: ignore
# Distributed architecture specific components
chroma_segment_directory_impl: str = "chromadb.segment.impl.distributed.segment_directory.RendezvousHashSegmentDirectory"
chroma_memberlist_provider_impl: str = "chromadb.segment.impl.distributed.segment_directory.CustomResourceMemberlistProvider"
chroma_collection_assignment_policy_impl: str = (
"chromadb.ingest.impl.simple_policy.SimpleAssignmentPolicy"
)
worker_memberlist_name: str = "worker-memberlist"
chroma_coordinator_host = "localhost"

Expand Down
21 changes: 20 additions & 1 deletion chromadb/db/impl/grpc/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from chromadb.config import System
from chromadb.db.base import NotFoundError, UniqueConstraintError
from chromadb.db.system import SysDB
from chromadb.ingest import CollectionAssignmentPolicy
from chromadb.proto.convert import (
from_proto_collection,
from_proto_segment,
Expand All @@ -26,6 +27,7 @@
from chromadb.proto.coordinator_pb2_grpc import SysDBStub
from chromadb.types import (
Collection,
Metadata,
OptionalArgument,
Segment,
SegmentScope,
Expand All @@ -42,6 +44,7 @@ class GrpcSysDB(SysDB):
to call a remote SysDB (Coordinator) service."""

_sys_db_stub: SysDBStub
_assignment_policy: CollectionAssignmentPolicy
_channel: grpc.Channel
_coordinator_url: str
_coordinator_port: int
Expand All @@ -50,6 +53,7 @@ def __init__(self, system: System):
self._coordinator_url = system.settings.require("chroma_coordinator_host")
# TODO: break out coordinator_port into a separate setting?
self._coordinator_port = system.settings.require("chroma_server_grpc_port")
self._assignment_policy = system.instance(CollectionAssignmentPolicy)
return super().__init__(system)

@overrides
Expand Down Expand Up @@ -156,15 +160,30 @@ def update_segment(
self._sys_db_stub.UpdateSegment(request)

@overrides
def create_collection(self, collection: Collection) -> None:
def create_collection(
self,
id: UUID,
name: str,
metadata: Optional[Metadata] = None,
dimension: Optional[int] = None,
) -> Collection:
# TODO: the get_or_create concept needs to be pushed down to the sysdb interface
topic = self._assignment_policy.assign_collection(id)
collection = Collection(
id=id,
name=name,
topic=topic,
metadata=metadata,
dimension=dimension,
)
request = CreateCollectionRequest(
collection=to_proto_collection(collection),
get_or_create=False,
)
response = self._sys_db_stub.CreateCollection(request)
if response.status.code == 409:
raise UniqueConstraintError()
return collection

@overrides
def delete_collection(self, id: UUID) -> None:
Expand Down
32 changes: 29 additions & 3 deletions chromadb/db/mixins/sysdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
UniqueConstraintError,
)
from chromadb.db.system import SysDB
from chromadb.ingest import CollectionAssignmentPolicy, Producer
from chromadb.types import (
OptionalArgument,
Segment,
Expand All @@ -26,9 +27,20 @@


class SqlSysDB(SqlDB, SysDB):
_assignment_policy: CollectionAssignmentPolicy
# Used only to delete topics on collection deletion.
# TODO: refactor to remove this dependency into a separate interface
_producer: Producer

def __init__(self, system: System):
self._assignment_policy = system.instance(CollectionAssignmentPolicy)
super().__init__(system)

@override
def start(self) -> None:
super().start()
self._producer = self._system.instance(Producer)

@override
def create_segment(self, segment: Segment) -> None:
with self.tx() as cur:
Expand Down Expand Up @@ -69,8 +81,20 @@ def create_segment(self, segment: Segment) -> None:
)

@override
def create_collection(self, collection: Collection) -> None:
"""Create a new collection"""
def create_collection(
self,
id: UUID,
name: str,
metadata: Optional[Metadata] = None,
dimension: Optional[int] = None,
) -> Collection:
"""Create a new collection and the associate topic"""

topic = self._assignment_policy.assign_collection(id)
collection = Collection(
id=id, topic=topic, name=name, metadata=metadata, dimension=dimension
)

with self.tx() as cur:
collections = Table("collections")
insert_collection = (
Expand Down Expand Up @@ -105,6 +129,7 @@ def create_collection(self, collection: Collection) -> None:
collection["id"],
collection["metadata"],
)
return collection

@override
def get_segments(
Expand Down Expand Up @@ -263,10 +288,11 @@ def delete_collection(self, id: UUID) -> None:
with self.tx() as cur:
# no need for explicit del from metadata table because of ON DELETE CASCADE
sql, params = get_sql(q, self.parameter_format())
sql = sql + " RETURNING id"
sql = sql + " RETURNING id, topic"
result = cur.execute(sql, params).fetchone()
if not result:
raise NotFoundError(f"Collection {id} not found")
self._producer.delete_topic(result[1])

@override
def update_segment(
Expand Down
15 changes: 12 additions & 3 deletions chromadb/db/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from uuid import UUID
from chromadb.types import (
Collection,
Metadata,
Segment,
SegmentScope,
OptionalArgument,
Expand Down Expand Up @@ -52,13 +53,21 @@ def update_segment(
pass

@abstractmethod
def create_collection(self, collection: Collection) -> None:
"""Create a new collection any associated resources in the SysDB."""
def create_collection(
self,
id: UUID,
name: str,
metadata: Optional[Metadata] = None,
dimension: Optional[int] = None,
) -> Collection:
"""Create a new collection any associated resources
(Such as the necessary topics) in the SysDB."""
pass

@abstractmethod
def delete_collection(self, id: UUID) -> None:
"""Delete a topic and all associated segments from the SysDB"""
"""Delete a collection, topic, all associated segments and any associate resources
from the SysDB and the system at large."""
pass

@abstractmethod
Expand Down
9 changes: 9 additions & 0 deletions chromadb/ingest/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,3 +118,12 @@ def min_seqid(self) -> SeqId:
def max_seqid(self) -> SeqId:
"""Return the maximum possible SeqID in this implementation."""
pass


class CollectionAssignmentPolicy(Component):
"""Interface for assigning collections to topics"""

@abstractmethod
def assign_collection(self, collection_id: UUID) -> str:
"""Return the topic that should be used for the given collection"""
pass
25 changes: 25 additions & 0 deletions chromadb/ingest/impl/simple_policy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from uuid import UUID
from overrides import overrides
from chromadb.config import System
from chromadb.ingest import CollectionAssignmentPolicy
from chromadb.ingest.impl.utils import create_topic_name


class SimpleAssignmentPolicy(CollectionAssignmentPolicy):
"""Simple assignment policy that assigns a 1 collection to 1 topic based on the
id of the collection."""

_tenant_id: str
_topic_ns: str

def __init__(self, system: System):
self._tenant_id = system.settings.tenant_id
self._topic_ns = system.settings.topic_namespace
super().__init__(system)

def _topic(self, collection_id: UUID) -> str:
return create_topic_name(self._tenant_id, self._topic_ns, str(collection_id))

@overrides
def assign_collection(self, collection_id: UUID) -> str:
return self._topic(collection_id)
Loading

0 comments on commit b78877a

Please sign in to comment.