Skip to content

Commit

Permalink
Add coordinator service def. Add generated code. Add basic GrpcSysDB
Browse files Browse the repository at this point in the history
  • Loading branch information
HammadB committed Oct 13, 2023
1 parent 677a8bb commit 7a63531
Show file tree
Hide file tree
Showing 6 changed files with 401 additions and 1 deletion.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
exclude: 'chromadb/proto/chroma_pb2\.(py|pyi|py_grpc\.py)' # Generated files
exclude: 'chromadb/proto/(chroma_pb2|coordinator_pb2)\.(py|pyi|py_grpc\.py)' # Generated files
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.4.0
Expand Down
102 changes: 102 additions & 0 deletions chromadb/db/impl/grpc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
from typing import Optional, Sequence
from uuid import UUID
from overrides import overrides
from chromadb.config import System
from chromadb.db.system import SysDB
from chromadb.proto.coordinator_pb2_grpc import SysDBStub
from chromadb.types import (
Collection,
OptionalArgument,
Segment,
SegmentScope,
Unspecified,
UpdateMetadata,
)
import grpc


class GrpcSysDB(SysDB):
"""A gRPC implementation of the SysDB. In the distributed system, the SysDB is also
called the 'Coordinator'. This implementation is used by Chroma frontend servers
to call a remote SysDB (Coordinator) service."""

_sys_db_stub: SysDBStub
_coordinator_url: str
_coordinator_port: int

def __init__(self, system: System):
self._coordinator_url = system.settings.require("coordinator_host")
# TODO: break out coordinator_port into a separate setting?
self._coordinator_port = system.settings.require("chroma_server_grpc_port")

@overrides
def start(self) -> None:
channel = grpc.insecure_channel(self._coordinator_url)
self._sys_db_stub = SysDBStub(channel) # type: ignore
return super().start()

@overrides
def stop(self) -> None:
return super().stop()

@overrides
def reset_state(self) -> None:
# TODO - remote service should be able to reset state for testing
return super().reset_state()

@overrides
def create_segment(self, segment: Segment) -> None:
return super().create_segment(segment)

@overrides
def delete_segment(self, id: UUID) -> None:
raise NotImplementedError()

@overrides
def get_segments(
self,
id: Optional[UUID] = None,
type: Optional[str] = None,
scope: Optional[SegmentScope] = None,
topic: Optional[str] = None,
collection: Optional[UUID] = None,
) -> Sequence[Segment]:
raise NotImplementedError()

@overrides
def update_segment(
self,
id: UUID,
topic: OptionalArgument[Optional[str]] = Unspecified(),
collection: OptionalArgument[Optional[UUID]] = Unspecified(),
metadata: OptionalArgument[Optional[UpdateMetadata]] = Unspecified(),
) -> None:
raise NotImplementedError()

@overrides
def create_collection(self, collection: Collection) -> None:
raise NotImplementedError()

@overrides
def delete_collection(self, id: UUID) -> None:
raise NotImplementedError()

@overrides
def get_collections(
self,
id: Optional[UUID] = None,
topic: Optional[str] = None,
name: Optional[str] = None,
) -> Sequence[Collection]:
raise NotImplementedError()

@overrides
def update_collection(
self,
id: UUID,
topic: OptionalArgument[str] = Unspecified(),
name: OptionalArgument[str] = Unspecified(),
dimension: OptionalArgument[Optional[int]] = Unspecified(),
metadata: OptionalArgument[Optional[UpdateMetadata]] = Unspecified(),
) -> None:
raise NotImplementedError()
43 changes: 43 additions & 0 deletions chromadb/proto/coordinator_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

89 changes: 89 additions & 0 deletions chromadb/proto/coordinator_pb2.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
from chromadb.proto import chroma_pb2 as _chroma_pb2
from google.protobuf.internal import containers as _containers
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from typing import (
ClassVar as _ClassVar,
Iterable as _Iterable,
Mapping as _Mapping,
Optional as _Optional,
Union as _Union,
)

DESCRIPTOR: _descriptor.FileDescriptor

class Status(_message.Message):
__slots__ = ["reason", "code"]
REASON_FIELD_NUMBER: _ClassVar[int]
CODE_FIELD_NUMBER: _ClassVar[int]
reason: str
code: int
def __init__(
self, reason: _Optional[str] = ..., code: _Optional[int] = ...
) -> None: ...

class CreateCollectionRequest(_message.Message):
__slots__ = ["collection", "get_or_create"]
COLLECTION_FIELD_NUMBER: _ClassVar[int]
GET_OR_CREATE_FIELD_NUMBER: _ClassVar[int]
collection: Collection
get_or_create: bool
def __init__(
self,
collection: _Optional[_Union[Collection, _Mapping]] = ...,
get_or_create: bool = ...,
) -> None: ...

class Collection(_message.Message):
__slots__ = ["id", "name", "metadata"]
ID_FIELD_NUMBER: _ClassVar[int]
NAME_FIELD_NUMBER: _ClassVar[int]
METADATA_FIELD_NUMBER: _ClassVar[int]
id: str
name: str
metadata: _chroma_pb2.UpdateMetadata
def __init__(
self,
id: _Optional[str] = ...,
name: _Optional[str] = ...,
metadata: _Optional[_Union[_chroma_pb2.UpdateMetadata, _Mapping]] = ...,
) -> None: ...

class CreateCollectionResponse(_message.Message):
__slots__ = ["collection", "status"]
COLLECTION_FIELD_NUMBER: _ClassVar[int]
STATUS_FIELD_NUMBER: _ClassVar[int]
collection: Collection
status: Status
def __init__(
self,
collection: _Optional[_Union[Collection, _Mapping]] = ...,
status: _Optional[_Union[Status, _Mapping]] = ...,
) -> None: ...

class GetCollectionsRequest(_message.Message):
__slots__ = ["id", "name", "topic"]
ID_FIELD_NUMBER: _ClassVar[int]
NAME_FIELD_NUMBER: _ClassVar[int]
TOPIC_FIELD_NUMBER: _ClassVar[int]
id: str
name: str
topic: str
def __init__(
self,
id: _Optional[str] = ...,
name: _Optional[str] = ...,
topic: _Optional[str] = ...,
) -> None: ...

class GetCollectionsResponse(_message.Message):
__slots__ = ["collections", "status"]
COLLECTIONS_FIELD_NUMBER: _ClassVar[int]
STATUS_FIELD_NUMBER: _ClassVar[int]
collections: _containers.RepeatedCompositeFieldContainer[Collection]
status: Status
def __init__(
self,
collections: _Optional[_Iterable[_Union[Collection, _Mapping]]] = ...,
status: _Optional[_Union[Status, _Mapping]] = ...,
) -> None: ...
124 changes: 124 additions & 0 deletions chromadb/proto/coordinator_pb2_grpc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
"""Client and server classes corresponding to protobuf-defined services."""
import grpc

from chromadb.proto import coordinator_pb2 as chromadb_dot_proto_dot_coordinator__pb2


class SysDBStub(object):
"""Missing associated documentation comment in .proto file."""

def __init__(self, channel):
"""Constructor.
Args:
channel: A grpc.Channel.
"""
self.CreateCollection = channel.unary_unary(
"/chroma.SysDB/CreateCollection",
request_serializer=chromadb_dot_proto_dot_coordinator__pb2.CreateCollectionRequest.SerializeToString,
response_deserializer=chromadb_dot_proto_dot_coordinator__pb2.CreateCollectionResponse.FromString,
)
self.GetCollections = channel.unary_unary(
"/chroma.SysDB/GetCollections",
request_serializer=chromadb_dot_proto_dot_coordinator__pb2.GetCollectionsRequest.SerializeToString,
response_deserializer=chromadb_dot_proto_dot_coordinator__pb2.GetCollectionsResponse.FromString,
)


class SysDBServicer(object):
"""Missing associated documentation comment in .proto file."""

def CreateCollection(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details("Method not implemented!")
raise NotImplementedError("Method not implemented!")

def GetCollections(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details("Method not implemented!")
raise NotImplementedError("Method not implemented!")


def add_SysDBServicer_to_server(servicer, server):
rpc_method_handlers = {
"CreateCollection": grpc.unary_unary_rpc_method_handler(
servicer.CreateCollection,
request_deserializer=chromadb_dot_proto_dot_coordinator__pb2.CreateCollectionRequest.FromString,
response_serializer=chromadb_dot_proto_dot_coordinator__pb2.CreateCollectionResponse.SerializeToString,
),
"GetCollections": grpc.unary_unary_rpc_method_handler(
servicer.GetCollections,
request_deserializer=chromadb_dot_proto_dot_coordinator__pb2.GetCollectionsRequest.FromString,
response_serializer=chromadb_dot_proto_dot_coordinator__pb2.GetCollectionsResponse.SerializeToString,
),
}
generic_handler = grpc.method_handlers_generic_handler(
"chroma.SysDB", rpc_method_handlers
)
server.add_generic_rpc_handlers((generic_handler,))


# This class is part of an EXPERIMENTAL API.
class SysDB(object):
"""Missing associated documentation comment in .proto file."""

@staticmethod
def CreateCollection(
request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None,
):
return grpc.experimental.unary_unary(
request,
target,
"/chroma.SysDB/CreateCollection",
chromadb_dot_proto_dot_coordinator__pb2.CreateCollectionRequest.SerializeToString,
chromadb_dot_proto_dot_coordinator__pb2.CreateCollectionResponse.FromString,
options,
channel_credentials,
insecure,
call_credentials,
compression,
wait_for_ready,
timeout,
metadata,
)

@staticmethod
def GetCollections(
request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None,
):
return grpc.experimental.unary_unary(
request,
target,
"/chroma.SysDB/GetCollections",
chromadb_dot_proto_dot_coordinator__pb2.GetCollectionsRequest.SerializeToString,
chromadb_dot_proto_dot_coordinator__pb2.GetCollectionsResponse.FromString,
options,
channel_credentials,
insecure,
call_credentials,
compression,
wait_for_ready,
timeout,
metadata,
)
Loading

0 comments on commit 7a63531

Please sign in to comment.