From 7a635316d26e36eeadff443230e9582962183ef4 Mon Sep 17 00:00:00 2001
From: hammadb
Date: Tue, 10 Oct 2023 18:36:17 -0700
Subject: [PATCH] Add coordinator service def. Add generated code. Add basic GrpcSysDB

---
Review notes (text between "---" and the first "diff --git" is not applied):
- One header/hunk line per physical line restored; leading whitespace was
  re-derived from syntax, so the .pre-commit-config.yaml context lines should
  be checked against the tree before applying.
- .pre-commit-config.yaml: exclude pattern now also matches *_pb2_grpc.py.
- chromadb/db/impl/grpc.py: base initializer called, channel target is
  host:port, channel closed in stop(). Hunk size and diffstat updated to
  match; the post-image blob id on its "index" line was not recomputed.

 .pre-commit-config.yaml                |   2 +-
 chromadb/db/impl/grpc.py               | 117 +++++++++++++++++++++++
 chromadb/proto/coordinator_pb2.py      |  43 +++++++++
 chromadb/proto/coordinator_pb2.pyi     |  89 ++++++++++++++++++
 chromadb/proto/coordinator_pb2_grpc.py | 124 +++++++++++++++++++++++++
 idl/chromadb/proto/coordinator.proto   |  42 +++++++++
 6 files changed, 416 insertions(+), 1 deletion(-)
 create mode 100644 chromadb/db/impl/grpc.py
 create mode 100644 chromadb/proto/coordinator_pb2.py
 create mode 100644 chromadb/proto/coordinator_pb2.pyi
 create mode 100644 chromadb/proto/coordinator_pb2_grpc.py
 create mode 100644 idl/chromadb/proto/coordinator.proto

diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 750bab0d304a..97763ef52017 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -1,4 +1,4 @@
-exclude: 'chromadb/proto/chroma_pb2\.(py|pyi|py_grpc\.py)' # Generated files
+exclude: 'chromadb/proto/(chroma_pb2|coordinator_pb2)(_grpc)?\.(py|pyi)' # Generated files
 repos:
   - repo: https://github.com/pre-commit/pre-commit-hooks
     rev: v4.4.0
diff --git a/chromadb/db/impl/grpc.py b/chromadb/db/impl/grpc.py
new file mode 100644
index 000000000000..fc2b51685515
--- /dev/null
+++ b/chromadb/db/impl/grpc.py
@@ -0,0 +1,117 @@
+from typing import Optional, Sequence
+from uuid import UUID
+from overrides import overrides
+from chromadb.config import System
+from chromadb.db.system import SysDB
+from chromadb.proto.coordinator_pb2_grpc import SysDBStub
+from chromadb.types import (
+    Collection,
+    OptionalArgument,
+    Segment,
+    SegmentScope,
+    Unspecified,
+    UpdateMetadata,
+)
+import grpc
+
+
+class GrpcSysDB(SysDB):
+    """A gRPC implementation of the SysDB. In the distributed system, the SysDB is also
+    called the 'Coordinator'. This implementation is used by Chroma frontend servers
+    to call a remote SysDB (Coordinator) service."""
+
+    _sys_db_stub: SysDBStub
+    # Open channel to the coordinator; None whenever the component is not started.
+    _channel: Optional[grpc.Channel] = None
+    # NOTE: despite its name this holds the value of the "coordinator_host" setting.
+    _coordinator_url: str
+    _coordinator_port: int
+
+    def __init__(self, system: System):
+        """Read the coordinator address from settings; no connection is made here."""
+        # NOTE(review): assumes the base initializer accepts the System - confirm.
+        super().__init__(system)
+        self._coordinator_url = system.settings.require("coordinator_host")
+        # TODO: break out coordinator_port into a separate setting?
+        self._coordinator_port = system.settings.require("chroma_server_grpc_port")
+
+    @overrides
+    def start(self) -> None:
+        """Open a plaintext channel to the coordinator and build the stub."""
+        # gRPC targets are "host:port"; passing only the host drops the port.
+        target = f"{self._coordinator_url}:{self._coordinator_port}"
+        self._channel = grpc.insecure_channel(target)
+        self._sys_db_stub = SysDBStub(self._channel)  # type: ignore
+        return super().start()
+
+    @overrides
+    def stop(self) -> None:
+        """Close the channel opened by start(), if any, then stop the component."""
+        if self._channel is not None:
+            self._channel.close()
+            self._channel = None
+        return super().stop()
+
+    @overrides
+    def reset_state(self) -> None:
+        # TODO - remote service should be able to reset state for testing
+        return super().reset_state()
+
+    @overrides
+    def create_segment(self, segment: Segment) -> None:
+        # NOTE(review): unlike its siblings this defers to the base class instead of
+        # raising NotImplementedError; no RPC is issued - confirm this is intended.
+        return super().create_segment(segment)
+
+    @overrides
+    def delete_segment(self, id: UUID) -> None:
+        raise NotImplementedError()
+
+    @overrides
+    def get_segments(
+        self,
+        id: Optional[UUID] = None,
+        type: Optional[str] = None,
+        scope: Optional[SegmentScope] = None,
+        topic: Optional[str] = None,
+        collection: Optional[UUID] = None,
+    ) -> Sequence[Segment]:
+        raise NotImplementedError()
+
+    @overrides
+    def update_segment(
+        self,
+        id: UUID,
+        topic: OptionalArgument[Optional[str]] = Unspecified(),
+        collection: OptionalArgument[Optional[UUID]] = Unspecified(),
+        metadata: OptionalArgument[Optional[UpdateMetadata]] = Unspecified(),
+    ) -> None:
+        raise NotImplementedError()
+
+    @overrides
+    def create_collection(self, collection: Collection) -> None:
+        raise NotImplementedError()
+
+    @overrides
+    def delete_collection(self, id: UUID) -> None:
+        raise NotImplementedError()
+
+    @overrides
+    def get_collections(
+        self,
+        id: Optional[UUID] = None,
+        topic: Optional[str] = None,
+        name: Optional[str] = None,
+    ) -> Sequence[Collection]:
+        raise NotImplementedError()
+
+    @overrides
+    def update_collection(
+        self,
+        id: UUID,
+        topic: OptionalArgument[str] = Unspecified(),
+        name: OptionalArgument[str] = Unspecified(),
+        dimension: OptionalArgument[Optional[int]] = Unspecified(),
+        metadata: OptionalArgument[Optional[UpdateMetadata]] = Unspecified(),
+    ) -> None:
+        raise NotImplementedError()
diff --git a/chromadb/proto/coordinator_pb2.py b/chromadb/proto/coordinator_pb2.py
new file mode 100644
index 000000000000..91b350485019
--- /dev/null
+++ b/chromadb/proto/coordinator_pb2.py
@@ -0,0 +1,43 @@
+# -*- coding: utf-8 -*-
+# Generated by the protocol buffer compiler. DO NOT EDIT!
+# source: chromadb/proto/coordinator.proto
+"""Generated protocol buffer code."""
+from google.protobuf import descriptor as _descriptor
+from google.protobuf import descriptor_pool as _descriptor_pool
+from google.protobuf import symbol_database as _symbol_database
+from google.protobuf.internal import builder as _builder
+
+# @@protoc_insertion_point(imports)
+
+_sym_db = _symbol_database.Default()
+
+
+from chromadb.proto import chroma_pb2 as chromadb_dot_proto_dot_chroma__pb2
+
+
+DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
+    b'\n chromadb/proto/coordinator.proto\x12\x06\x63hroma\x1a\x1b\x63hromadb/proto/chroma.proto"&\n\x06Status\x12\x0e\n\x06reason\x18\x01 \x01(\t\x12\x0c\n\x04\x63ode\x18\x02 \x01(\x05"o\n\x17\x43reateCollectionRequest\x12&\n\ncollection\x18\x01 \x01(\x0b\x32\x12.chroma.Collection\x12\x1a\n\rget_or_create\x18\x02 \x01(\x08H\x00\x88\x01\x01\x42\x10\n\x0e_get_or_create"b\n\nCollection\x12\n\n\x02id\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12-\n\x08metadata\x18\x03 \x01(\x0b\x32\x16.chroma.UpdateMetadataH\x00\x88\x01\x01\x42\x0b\n\t_metadata"b\n\x18\x43reateCollectionResponse\x12&\n\ncollection\x18\x01 \x01(\x0b\x32\x12.chroma.Collection\x12\x1e\n\x06status\x18\x02 \x01(\x0b\x32\x0e.chroma.Status"i\n\x15GetCollectionsRequest\x12\x0f\n\x02id\x18\x01 \x01(\tH\x00\x88\x01\x01\x12\x11\n\x04name\x18\x02 \x01(\tH\x01\x88\x01\x01\x12\x12\n\x05topic\x18\x03 \x01(\tH\x02\x88\x01\x01\x42\x05\n\x03_idB\x07\n\x05_nameB\x08\n\x06_topic"a\n\x16GetCollectionsResponse\x12\'\n\x0b\x63ollections\x18\x01 \x03(\x0b\x32\x12.chroma.Collection\x12\x1e\n\x06status\x18\x02 \x01(\x0b\x32\x0e.chroma.Status2\xb3\x01\n\x05SysDB\x12W\n\x10\x43reateCollection\x12\x1f.chroma.CreateCollectionRequest\x1a .chroma.CreateCollectionResponse"\x00\x12Q\n\x0eGetCollections\x12\x1d.chroma.GetCollectionsRequest\x1a\x1e.chroma.GetCollectionsResponse"\x00\x62\x06proto3'
+)
+
+_globals = globals()
+_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
+_builder.BuildTopDescriptorsAndMessages(
+    DESCRIPTOR, "chromadb.proto.coordinator_pb2", _globals
+)
+if _descriptor._USE_C_DESCRIPTORS == False:
+    DESCRIPTOR._options = None
+    _globals["_STATUS"]._serialized_start = 73
+    _globals["_STATUS"]._serialized_end = 111
+    _globals["_CREATECOLLECTIONREQUEST"]._serialized_start = 113
+    _globals["_CREATECOLLECTIONREQUEST"]._serialized_end = 224
+    _globals["_COLLECTION"]._serialized_start = 226
+    _globals["_COLLECTION"]._serialized_end = 324
+    _globals["_CREATECOLLECTIONRESPONSE"]._serialized_start = 326
+    _globals["_CREATECOLLECTIONRESPONSE"]._serialized_end = 424
+    _globals["_GETCOLLECTIONSREQUEST"]._serialized_start = 426
+    _globals["_GETCOLLECTIONSREQUEST"]._serialized_end = 531
+    _globals["_GETCOLLECTIONSRESPONSE"]._serialized_start = 533
+    _globals["_GETCOLLECTIONSRESPONSE"]._serialized_end = 630
+    _globals["_SYSDB"]._serialized_start = 633
+    _globals["_SYSDB"]._serialized_end = 812
+# @@protoc_insertion_point(module_scope)
diff --git a/chromadb/proto/coordinator_pb2.pyi b/chromadb/proto/coordinator_pb2.pyi
new file mode 100644
index 000000000000..69a76b675373
--- /dev/null
+++ b/chromadb/proto/coordinator_pb2.pyi
@@ -0,0 +1,89 @@
+from chromadb.proto import chroma_pb2 as _chroma_pb2
+from google.protobuf.internal import containers as _containers
+from google.protobuf import descriptor as _descriptor
+from google.protobuf import message as _message
+from typing import (
+    ClassVar as _ClassVar,
+    Iterable as _Iterable,
+    Mapping as _Mapping,
+    Optional as _Optional,
+    Union as _Union,
+)
+
+DESCRIPTOR: _descriptor.FileDescriptor
+
+class Status(_message.Message):
+    __slots__ = ["reason", "code"]
+    REASON_FIELD_NUMBER: _ClassVar[int]
+    CODE_FIELD_NUMBER: _ClassVar[int]
+    reason: str
+    code: int
+    def __init__(
+        self, reason: _Optional[str] = ..., code: _Optional[int] = ...
+    ) -> None: ...
+
+class CreateCollectionRequest(_message.Message):
+    __slots__ = ["collection", "get_or_create"]
+    COLLECTION_FIELD_NUMBER: _ClassVar[int]
+    GET_OR_CREATE_FIELD_NUMBER: _ClassVar[int]
+    collection: Collection
+    get_or_create: bool
+    def __init__(
+        self,
+        collection: _Optional[_Union[Collection, _Mapping]] = ...,
+        get_or_create: bool = ...,
+    ) -> None: ...
+
+class Collection(_message.Message):
+    __slots__ = ["id", "name", "metadata"]
+    ID_FIELD_NUMBER: _ClassVar[int]
+    NAME_FIELD_NUMBER: _ClassVar[int]
+    METADATA_FIELD_NUMBER: _ClassVar[int]
+    id: str
+    name: str
+    metadata: _chroma_pb2.UpdateMetadata
+    def __init__(
+        self,
+        id: _Optional[str] = ...,
+        name: _Optional[str] = ...,
+        metadata: _Optional[_Union[_chroma_pb2.UpdateMetadata, _Mapping]] = ...,
+    ) -> None: ...
+
+class CreateCollectionResponse(_message.Message):
+    __slots__ = ["collection", "status"]
+    COLLECTION_FIELD_NUMBER: _ClassVar[int]
+    STATUS_FIELD_NUMBER: _ClassVar[int]
+    collection: Collection
+    status: Status
+    def __init__(
+        self,
+        collection: _Optional[_Union[Collection, _Mapping]] = ...,
+        status: _Optional[_Union[Status, _Mapping]] = ...,
+    ) -> None: ...
+
+class GetCollectionsRequest(_message.Message):
+    __slots__ = ["id", "name", "topic"]
+    ID_FIELD_NUMBER: _ClassVar[int]
+    NAME_FIELD_NUMBER: _ClassVar[int]
+    TOPIC_FIELD_NUMBER: _ClassVar[int]
+    id: str
+    name: str
+    topic: str
+    def __init__(
+        self,
+        id: _Optional[str] = ...,
+        name: _Optional[str] = ...,
+        topic: _Optional[str] = ...,
+    ) -> None: ...
+
+class GetCollectionsResponse(_message.Message):
+    __slots__ = ["collections", "status"]
+    COLLECTIONS_FIELD_NUMBER: _ClassVar[int]
+    STATUS_FIELD_NUMBER: _ClassVar[int]
+    collections: _containers.RepeatedCompositeFieldContainer[Collection]
+    status: Status
+    def __init__(
+        self,
+        collections: _Optional[_Iterable[_Union[Collection, _Mapping]]] = ...,
+        status: _Optional[_Union[Status, _Mapping]] = ...,
+    ) -> None: ...
diff --git a/chromadb/proto/coordinator_pb2_grpc.py b/chromadb/proto/coordinator_pb2_grpc.py
new file mode 100644
index 000000000000..3da8aaba7928
--- /dev/null
+++ b/chromadb/proto/coordinator_pb2_grpc.py
@@ -0,0 +1,124 @@
+# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
+"""Client and server classes corresponding to protobuf-defined services."""
+import grpc
+
+from chromadb.proto import coordinator_pb2 as chromadb_dot_proto_dot_coordinator__pb2
+
+
+class SysDBStub(object):
+    """Missing associated documentation comment in .proto file."""
+
+    def __init__(self, channel):
+        """Constructor.
+
+        Args:
+            channel: A grpc.Channel.
+        """
+        self.CreateCollection = channel.unary_unary(
+            "/chroma.SysDB/CreateCollection",
+            request_serializer=chromadb_dot_proto_dot_coordinator__pb2.CreateCollectionRequest.SerializeToString,
+            response_deserializer=chromadb_dot_proto_dot_coordinator__pb2.CreateCollectionResponse.FromString,
+        )
+        self.GetCollections = channel.unary_unary(
+            "/chroma.SysDB/GetCollections",
+            request_serializer=chromadb_dot_proto_dot_coordinator__pb2.GetCollectionsRequest.SerializeToString,
+            response_deserializer=chromadb_dot_proto_dot_coordinator__pb2.GetCollectionsResponse.FromString,
+        )
+
+
+class SysDBServicer(object):
+    """Missing associated documentation comment in .proto file."""
+
+    def CreateCollection(self, request, context):
+        """Missing associated documentation comment in .proto file."""
+        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
+        context.set_details("Method not implemented!")
+        raise NotImplementedError("Method not implemented!")
+
+    def GetCollections(self, request, context):
+        """Missing associated documentation comment in .proto file."""
+        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
+        context.set_details("Method not implemented!")
+        raise NotImplementedError("Method not implemented!")
+
+
+def add_SysDBServicer_to_server(servicer, server):
+    rpc_method_handlers = {
+        "CreateCollection": grpc.unary_unary_rpc_method_handler(
+            servicer.CreateCollection,
+            request_deserializer=chromadb_dot_proto_dot_coordinator__pb2.CreateCollectionRequest.FromString,
+            response_serializer=chromadb_dot_proto_dot_coordinator__pb2.CreateCollectionResponse.SerializeToString,
+        ),
+        "GetCollections": grpc.unary_unary_rpc_method_handler(
+            servicer.GetCollections,
+            request_deserializer=chromadb_dot_proto_dot_coordinator__pb2.GetCollectionsRequest.FromString,
+            response_serializer=chromadb_dot_proto_dot_coordinator__pb2.GetCollectionsResponse.SerializeToString,
+        ),
+    }
+    generic_handler = grpc.method_handlers_generic_handler(
+        "chroma.SysDB", rpc_method_handlers
+    )
+    server.add_generic_rpc_handlers((generic_handler,))
+
+
+# This class is part of an EXPERIMENTAL API.
+class SysDB(object):
+    """Missing associated documentation comment in .proto file."""
+
+    @staticmethod
+    def CreateCollection(
+        request,
+        target,
+        options=(),
+        channel_credentials=None,
+        call_credentials=None,
+        insecure=False,
+        compression=None,
+        wait_for_ready=None,
+        timeout=None,
+        metadata=None,
+    ):
+        return grpc.experimental.unary_unary(
+            request,
+            target,
+            "/chroma.SysDB/CreateCollection",
+            chromadb_dot_proto_dot_coordinator__pb2.CreateCollectionRequest.SerializeToString,
+            chromadb_dot_proto_dot_coordinator__pb2.CreateCollectionResponse.FromString,
+            options,
+            channel_credentials,
+            insecure,
+            call_credentials,
+            compression,
+            wait_for_ready,
+            timeout,
+            metadata,
+        )
+
+    @staticmethod
+    def GetCollections(
+        request,
+        target,
+        options=(),
+        channel_credentials=None,
+        call_credentials=None,
+        insecure=False,
+        compression=None,
+        wait_for_ready=None,
+        timeout=None,
+        metadata=None,
+    ):
+        return grpc.experimental.unary_unary(
+            request,
+            target,
+            "/chroma.SysDB/GetCollections",
+            chromadb_dot_proto_dot_coordinator__pb2.GetCollectionsRequest.SerializeToString,
+            chromadb_dot_proto_dot_coordinator__pb2.GetCollectionsResponse.FromString,
+            options,
+            channel_credentials,
+            insecure,
+            call_credentials,
+            compression,
+            wait_for_ready,
+            timeout,
+            metadata,
+        )
diff --git a/idl/chromadb/proto/coordinator.proto b/idl/chromadb/proto/coordinator.proto
new file mode 100644
index 000000000000..1148c197328b
--- /dev/null
+++ b/idl/chromadb/proto/coordinator.proto
@@ -0,0 +1,42 @@
+syntax = "proto3";
+
+package chroma;
+
+import "chromadb/proto/chroma.proto";
+
+message Status {
+    string reason = 1;
+    int32 code = 2;
+}
+
+message CreateCollectionRequest {
+    Collection collection = 1;
+    optional bool get_or_create = 2;
+}
+
+message Collection {
+    string id = 1;
+    string name = 2;
+    optional UpdateMetadata metadata = 3;
+}
+
+message CreateCollectionResponse {
+    Collection collection = 1;
+    Status status = 2;
+}
+
+message GetCollectionsRequest {
+    optional string id = 1;
+    optional string name = 2;
+    optional string topic = 3;
+}
+
+message GetCollectionsResponse {
+    repeated Collection collections = 1;
+    Status status = 2;
+}
+
+service SysDB {
+    rpc CreateCollection(CreateCollectionRequest) returns (CreateCollectionResponse) {}
+    rpc GetCollections(GetCollectionsRequest) returns (GetCollectionsResponse) {}
+}