From 44e8ff7390853067f287fe9e76817c3097c7e95e Mon Sep 17 00:00:00 2001 From: Weili Gu <3451471+weiligu@users.noreply.github.com> Date: Wed, 28 Feb 2024 12:34:24 -0800 Subject: [PATCH] [ENH] get collection info for compactor (#1778) ## Description of changes https://linear.app/trychroma/issue/CHR-293/get-collection-ids-for-compactor - get collection information for collections that need to be compacted, order by timestamps of the log - DB retry is not included ## Test plan - [ ] record_log_test and record_log_service_test --- .gitignore | 1 + chromadb/proto/chroma_pb2.py | 9 +- chromadb/proto/chroma_pb2.pyi | 40 +-- chromadb/proto/coordinator_pb2.py | 5 +- chromadb/proto/coordinator_pb2.pyi | 52 ++-- chromadb/proto/logservice_pb2.py | 42 ++- chromadb/proto/logservice_pb2.pyi | 76 ++++- chromadb/proto/logservice_pb2_grpc.py | 166 +++++++--- go/internal/logservice/apis.go | 5 + .../logservice/grpc/record_log_service.go | 33 ++ .../grpc/record_log_service_test.go | 70 +++-- .../testutils/record_log_test_util.go | 50 +++ .../metastore/coordinator/table_catalog.go | 13 +- go/internal/metastore/db/dao/record_log.go | 38 +++ .../metastore/db/dao/record_log_test.go | 84 +++-- .../metastore/db/dbmodel/collection.go | 19 +- .../metastore/db/dbmodel/record_log.go | 5 +- go/internal/proto/coordinatorpb/chroma.pb.go | 4 +- .../proto/coordinatorpb/chroma_grpc.pb.go | 2 +- .../proto/coordinatorpb/coordinator.pb.go | 4 +- .../coordinatorpb/coordinator_grpc.pb.go | 2 +- .../proto/logservicepb/logservice.pb.go | 286 +++++++++++++++--- .../proto/logservicepb/logservice_grpc.pb.go | 38 ++- ...{20240226214452.sql => 20240227232039.sql} | 5 +- go/migrations/atlas.sum | 4 +- idl/chromadb/proto/logservice.proto | 16 + 26 files changed, 818 insertions(+), 251 deletions(-) create mode 100644 go/internal/logservice/testutils/record_log_test_util.go rename go/migrations/{20240226214452.sql => 20240227232039.sql} (94%) diff --git a/.gitignore b/.gitignore index b4b8e402b5b..55bbe47612e 100644 --- a/.gitignore +++ b/.gitignore @@ -5,6 +5,7 @@ go/bin/ go/**/testdata/ +go/coordinator/bin/ *.log diff --git a/chromadb/proto/chroma_pb2.py b/chromadb/proto/chroma_pb2.py index e8eb2804696..a12f8713439 100644 --- a/chromadb/proto/chroma_pb2.py +++ b/chromadb/proto/chroma_pb2.py @@ -1,7 +1,6 @@ # -*- coding: utf-8 -*- # Generated by the protocol buffer compiler. DO NOT EDIT! # source: chromadb/proto/chroma.proto -# Protobuf Python Version: 4.25.0 """Generated protocol buffer code.""" from google.protobuf import descriptor as _descriptor from google.protobuf import descriptor_pool as _descriptor_pool @@ -20,10 +19,10 @@ _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) _builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'chromadb.proto.chroma_pb2', _globals) if _descriptor._USE_C_DESCRIPTORS == False: - _globals['DESCRIPTOR']._options = None - _globals['DESCRIPTOR']._serialized_options = b'ZAgithub.com/chroma/chroma-coordinator/internal/proto/coordinatorpb' - _globals['_UPDATEMETADATA_METADATAENTRY']._options = None - _globals['_UPDATEMETADATA_METADATAENTRY']._serialized_options = b'8\001' + DESCRIPTOR._options = None + DESCRIPTOR._serialized_options = b'ZAgithub.com/chroma/chroma-coordinator/internal/proto/coordinatorpb' + _UPDATEMETADATA_METADATAENTRY._options = None + _UPDATEMETADATA_METADATAENTRY._serialized_options = b'8\001' _globals['_OPERATION']._serialized_start=1693 _globals['_OPERATION']._serialized_end=1749 _globals['_SCALARENCODING']._serialized_start=1751 diff --git a/chromadb/proto/chroma_pb2.pyi b/chromadb/proto/chroma_pb2.pyi index 6a0132e0945..9fb730ca6d9 100644 --- a/chromadb/proto/chroma_pb2.pyi +++ b/chromadb/proto/chroma_pb2.pyi @@ -7,19 +7,19 @@ from typing import ClassVar as _ClassVar, Iterable as _Iterable, Mapping as _Map DESCRIPTOR: _descriptor.FileDescriptor class Operation(int, metaclass=_enum_type_wrapper.EnumTypeWrapper): - __slots__ = () + __slots__ = [] ADD: _ClassVar[Operation] UPDATE: _ClassVar[Operation] UPSERT: _ClassVar[Operation] DELETE: _ClassVar[Operation] class ScalarEncoding(int, metaclass=_enum_type_wrapper.EnumTypeWrapper): - __slots__ = () + __slots__ = [] FLOAT32: _ClassVar[ScalarEncoding] INT32: _ClassVar[ScalarEncoding] class SegmentScope(int, metaclass=_enum_type_wrapper.EnumTypeWrapper): - __slots__ = () + __slots__ = [] VECTOR: _ClassVar[SegmentScope] METADATA: _ClassVar[SegmentScope] ADD: Operation @@ -32,7 +32,7 @@ VECTOR: SegmentScope METADATA: SegmentScope class Status(_message.Message): - __slots__ = ("reason", "code") + __slots__ = ["reason", "code"] REASON_FIELD_NUMBER: _ClassVar[int] CODE_FIELD_NUMBER: _ClassVar[int] reason: str @@ -40,7 +40,7 @@ class Status(_message.Message): def __init__(self, reason: _Optional[str] = ..., code: _Optional[int] = ...) -> None: ... class Vector(_message.Message): - __slots__ = ("dimension", "vector", "encoding") + __slots__ = ["dimension", "vector", "encoding"] DIMENSION_FIELD_NUMBER: _ClassVar[int] VECTOR_FIELD_NUMBER: _ClassVar[int] ENCODING_FIELD_NUMBER: _ClassVar[int] @@ -50,7 +50,7 @@ class Vector(_message.Message): def __init__(self, dimension: _Optional[int] = ..., vector: _Optional[bytes] = ..., encoding: _Optional[_Union[ScalarEncoding, str]] = ...) -> None: ... class Segment(_message.Message): - __slots__ = ("id", "type", "scope", "topic", "collection", "metadata") + __slots__ = ["id", "type", "scope", "topic", "collection", "metadata"] ID_FIELD_NUMBER: _ClassVar[int] TYPE_FIELD_NUMBER: _ClassVar[int] SCOPE_FIELD_NUMBER: _ClassVar[int] @@ -66,7 +66,7 @@ class Segment(_message.Message): def __init__(self, id: _Optional[str] = ..., type: _Optional[str] = ..., scope: _Optional[_Union[SegmentScope, str]] = ..., topic: _Optional[str] = ..., collection: _Optional[str] = ..., metadata: _Optional[_Union[UpdateMetadata, _Mapping]] = ...) -> None: ... class Collection(_message.Message): - __slots__ = ("id", "name", "topic", "metadata", "dimension", "tenant", "database") + __slots__ = ["id", "name", "topic", "metadata", "dimension", "tenant", "database"] ID_FIELD_NUMBER: _ClassVar[int] NAME_FIELD_NUMBER: _ClassVar[int] TOPIC_FIELD_NUMBER: _ClassVar[int] @@ -84,7 +84,7 @@ class Collection(_message.Message): def __init__(self, id: _Optional[str] = ..., name: _Optional[str] = ..., topic: _Optional[str] = ..., metadata: _Optional[_Union[UpdateMetadata, _Mapping]] = ..., dimension: _Optional[int] = ..., tenant: _Optional[str] = ..., database: _Optional[str] = ...) -> None: ... class Database(_message.Message): - __slots__ = ("id", "name", "tenant") + __slots__ = ["id", "name", "tenant"] ID_FIELD_NUMBER: _ClassVar[int] NAME_FIELD_NUMBER: _ClassVar[int] TENANT_FIELD_NUMBER: _ClassVar[int] @@ -94,13 +94,13 @@ class Database(_message.Message): def __init__(self, id: _Optional[str] = ..., name: _Optional[str] = ..., tenant: _Optional[str] = ...) -> None: ... class Tenant(_message.Message): - __slots__ = ("name",) + __slots__ = ["name"] NAME_FIELD_NUMBER: _ClassVar[int] name: str def __init__(self, name: _Optional[str] = ...) -> None: ... class UpdateMetadataValue(_message.Message): - __slots__ = ("string_value", "int_value", "float_value") + __slots__ = ["string_value", "int_value", "float_value"] STRING_VALUE_FIELD_NUMBER: _ClassVar[int] INT_VALUE_FIELD_NUMBER: _ClassVar[int] FLOAT_VALUE_FIELD_NUMBER: _ClassVar[int] @@ -110,9 +110,9 @@ class UpdateMetadataValue(_message.Message): def __init__(self, string_value: _Optional[str] = ..., int_value: _Optional[int] = ..., float_value: _Optional[float] = ...) -> None: ... class UpdateMetadata(_message.Message): - __slots__ = ("metadata",) + __slots__ = ["metadata"] class MetadataEntry(_message.Message): - __slots__ = ("key", "value") + __slots__ = ["key", "value"] KEY_FIELD_NUMBER: _ClassVar[int] VALUE_FIELD_NUMBER: _ClassVar[int] key: str @@ -123,7 +123,7 @@ class UpdateMetadata(_message.Message): def __init__(self, metadata: _Optional[_Mapping[str, UpdateMetadataValue]] = ...) -> None: ... class SubmitEmbeddingRecord(_message.Message): - __slots__ = ("id", "vector", "metadata", "operation", "collection_id") + __slots__ = ["id", "vector", "metadata", "operation", "collection_id"] ID_FIELD_NUMBER: _ClassVar[int] VECTOR_FIELD_NUMBER: _ClassVar[int] METADATA_FIELD_NUMBER: _ClassVar[int] @@ -137,7 +137,7 @@ class SubmitEmbeddingRecord(_message.Message): def __init__(self, id: _Optional[str] = ..., vector: _Optional[_Union[Vector, _Mapping]] = ..., metadata: _Optional[_Union[UpdateMetadata, _Mapping]] = ..., operation: _Optional[_Union[Operation, str]] = ..., collection_id: _Optional[str] = ...) -> None: ... class VectorEmbeddingRecord(_message.Message): - __slots__ = ("id", "seq_id", "vector") + __slots__ = ["id", "seq_id", "vector"] ID_FIELD_NUMBER: _ClassVar[int] SEQ_ID_FIELD_NUMBER: _ClassVar[int] VECTOR_FIELD_NUMBER: _ClassVar[int] @@ -147,7 +147,7 @@ class VectorEmbeddingRecord(_message.Message): def __init__(self, id: _Optional[str] = ..., seq_id: _Optional[bytes] = ..., vector: _Optional[_Union[Vector, _Mapping]] = ...) -> None: ... class VectorQueryResult(_message.Message): - __slots__ = ("id", "seq_id", "distance", "vector") + __slots__ = ["id", "seq_id", "distance", "vector"] ID_FIELD_NUMBER: _ClassVar[int] SEQ_ID_FIELD_NUMBER: _ClassVar[int] DISTANCE_FIELD_NUMBER: _ClassVar[int] @@ -159,13 +159,13 @@ class VectorQueryResult(_message.Message): def __init__(self, id: _Optional[str] = ..., seq_id: _Optional[bytes] = ..., distance: _Optional[float] = ..., vector: _Optional[_Union[Vector, _Mapping]] = ...) -> None: ... class VectorQueryResults(_message.Message): - __slots__ = ("results",) + __slots__ = ["results"] RESULTS_FIELD_NUMBER: _ClassVar[int] results: _containers.RepeatedCompositeFieldContainer[VectorQueryResult] def __init__(self, results: _Optional[_Iterable[_Union[VectorQueryResult, _Mapping]]] = ...) -> None: ... class GetVectorsRequest(_message.Message): - __slots__ = ("ids", "segment_id") + __slots__ = ["ids", "segment_id"] IDS_FIELD_NUMBER: _ClassVar[int] SEGMENT_ID_FIELD_NUMBER: _ClassVar[int] ids: _containers.RepeatedScalarFieldContainer[str] @@ -173,13 +173,13 @@ class GetVectorsRequest(_message.Message): def __init__(self, ids: _Optional[_Iterable[str]] = ..., segment_id: _Optional[str] = ...) -> None: ... class GetVectorsResponse(_message.Message): - __slots__ = ("records",) + __slots__ = ["records"] RECORDS_FIELD_NUMBER: _ClassVar[int] records: _containers.RepeatedCompositeFieldContainer[VectorEmbeddingRecord] def __init__(self, records: _Optional[_Iterable[_Union[VectorEmbeddingRecord, _Mapping]]] = ...) -> None: ... class QueryVectorsRequest(_message.Message): - __slots__ = ("vectors", "k", "allowed_ids", "include_embeddings", "segment_id") + __slots__ = ["vectors", "k", "allowed_ids", "include_embeddings", "segment_id"] VECTORS_FIELD_NUMBER: _ClassVar[int] K_FIELD_NUMBER: _ClassVar[int] ALLOWED_IDS_FIELD_NUMBER: _ClassVar[int] @@ -193,7 +193,7 @@ class QueryVectorsRequest(_message.Message): def __init__(self, vectors: _Optional[_Iterable[_Union[Vector, _Mapping]]] = ..., k: _Optional[int] = ..., allowed_ids: _Optional[_Iterable[str]] = ..., include_embeddings: bool = ..., segment_id: _Optional[str] = ...) -> None: ... class QueryVectorsResponse(_message.Message): - __slots__ = ("results",) + __slots__ = ["results"] RESULTS_FIELD_NUMBER: _ClassVar[int] results: _containers.RepeatedCompositeFieldContainer[VectorQueryResults] def __init__(self, results: _Optional[_Iterable[_Union[VectorQueryResults, _Mapping]]] = ...) -> None: ... diff --git a/chromadb/proto/coordinator_pb2.py b/chromadb/proto/coordinator_pb2.py index fc243ccb2f2..d18ae05ada4 100644 --- a/chromadb/proto/coordinator_pb2.py +++ b/chromadb/proto/coordinator_pb2.py @@ -1,7 +1,6 @@ # -*- coding: utf-8 -*- # Generated by the protocol buffer compiler. DO NOT EDIT! # source: chromadb/proto/coordinator.proto -# Protobuf Python Version: 4.25.0 """Generated protocol buffer code.""" from google.protobuf import descriptor as _descriptor from google.protobuf import descriptor_pool as _descriptor_pool @@ -22,8 +21,8 @@ _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) _builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'chromadb.proto.coordinator_pb2', _globals) if _descriptor._USE_C_DESCRIPTORS == False: - _globals['DESCRIPTOR']._options = None - _globals['DESCRIPTOR']._serialized_options = b'ZAgithub.com/chroma/chroma-coordinator/internal/proto/coordinatorpb' + DESCRIPTOR._options = None + DESCRIPTOR._serialized_options = b'ZAgithub.com/chroma/chroma-coordinator/internal/proto/coordinatorpb' _globals['_CREATEDATABASEREQUEST']._serialized_start=102 _globals['_CREATEDATABASEREQUEST']._serialized_end=167 _globals['_CREATEDATABASERESPONSE']._serialized_start=169 diff --git a/chromadb/proto/coordinator_pb2.pyi b/chromadb/proto/coordinator_pb2.pyi index 1bb13439809..613d13c969a 100644 --- a/chromadb/proto/coordinator_pb2.pyi +++ b/chromadb/proto/coordinator_pb2.pyi @@ -8,7 +8,7 @@ from typing import ClassVar as _ClassVar, Iterable as _Iterable, Mapping as _Map DESCRIPTOR: _descriptor.FileDescriptor class CreateDatabaseRequest(_message.Message): - __slots__ = ("id", "name", "tenant") + __slots__ = ["id", "name", "tenant"] ID_FIELD_NUMBER: _ClassVar[int] NAME_FIELD_NUMBER: _ClassVar[int] TENANT_FIELD_NUMBER: _ClassVar[int] @@ -18,13 +18,13 @@ class CreateDatabaseRequest(_message.Message): def __init__(self, id: _Optional[str] = ..., name: _Optional[str] = ..., tenant: _Optional[str] = ...) -> None: ... class CreateDatabaseResponse(_message.Message): - __slots__ = ("status",) + __slots__ = ["status"] STATUS_FIELD_NUMBER: _ClassVar[int] status: _chroma_pb2.Status def __init__(self, status: _Optional[_Union[_chroma_pb2.Status, _Mapping]] = ...) -> None: ... class GetDatabaseRequest(_message.Message): - __slots__ = ("name", "tenant") + __slots__ = ["name", "tenant"] NAME_FIELD_NUMBER: _ClassVar[int] TENANT_FIELD_NUMBER: _ClassVar[int] name: str @@ -32,7 +32,7 @@ class GetDatabaseRequest(_message.Message): def __init__(self, name: _Optional[str] = ..., tenant: _Optional[str] = ...) -> None: ... class GetDatabaseResponse(_message.Message): - __slots__ = ("database", "status") + __slots__ = ["database", "status"] DATABASE_FIELD_NUMBER: _ClassVar[int] STATUS_FIELD_NUMBER: _ClassVar[int] database: _chroma_pb2.Database @@ -40,25 +40,25 @@ class GetDatabaseResponse(_message.Message): def __init__(self, database: _Optional[_Union[_chroma_pb2.Database, _Mapping]] = ..., status: _Optional[_Union[_chroma_pb2.Status, _Mapping]] = ...) -> None: ... class CreateTenantRequest(_message.Message): - __slots__ = ("name",) + __slots__ = ["name"] NAME_FIELD_NUMBER: _ClassVar[int] name: str def __init__(self, name: _Optional[str] = ...) -> None: ... class CreateTenantResponse(_message.Message): - __slots__ = ("status",) + __slots__ = ["status"] STATUS_FIELD_NUMBER: _ClassVar[int] status: _chroma_pb2.Status def __init__(self, status: _Optional[_Union[_chroma_pb2.Status, _Mapping]] = ...) -> None: ... class GetTenantRequest(_message.Message): - __slots__ = ("name",) + __slots__ = ["name"] NAME_FIELD_NUMBER: _ClassVar[int] name: str def __init__(self, name: _Optional[str] = ...) -> None: ... class GetTenantResponse(_message.Message): - __slots__ = ("tenant", "status") + __slots__ = ["tenant", "status"] TENANT_FIELD_NUMBER: _ClassVar[int] STATUS_FIELD_NUMBER: _ClassVar[int] tenant: _chroma_pb2.Tenant @@ -66,31 +66,31 @@ class GetTenantResponse(_message.Message): def __init__(self, tenant: _Optional[_Union[_chroma_pb2.Tenant, _Mapping]] = ..., status: _Optional[_Union[_chroma_pb2.Status, _Mapping]] = ...) -> None: ... class CreateSegmentRequest(_message.Message): - __slots__ = ("segment",) + __slots__ = ["segment"] SEGMENT_FIELD_NUMBER: _ClassVar[int] segment: _chroma_pb2.Segment def __init__(self, segment: _Optional[_Union[_chroma_pb2.Segment, _Mapping]] = ...) -> None: ... class CreateSegmentResponse(_message.Message): - __slots__ = ("status",) + __slots__ = ["status"] STATUS_FIELD_NUMBER: _ClassVar[int] status: _chroma_pb2.Status def __init__(self, status: _Optional[_Union[_chroma_pb2.Status, _Mapping]] = ...) -> None: ... class DeleteSegmentRequest(_message.Message): - __slots__ = ("id",) + __slots__ = ["id"] ID_FIELD_NUMBER: _ClassVar[int] id: str def __init__(self, id: _Optional[str] = ...) -> None: ... class DeleteSegmentResponse(_message.Message): - __slots__ = ("status",) + __slots__ = ["status"] STATUS_FIELD_NUMBER: _ClassVar[int] status: _chroma_pb2.Status def __init__(self, status: _Optional[_Union[_chroma_pb2.Status, _Mapping]] = ...) -> None: ... class GetSegmentsRequest(_message.Message): - __slots__ = ("id", "type", "scope", "topic", "collection") + __slots__ = ["id", "type", "scope", "topic", "collection"] ID_FIELD_NUMBER: _ClassVar[int] TYPE_FIELD_NUMBER: _ClassVar[int] SCOPE_FIELD_NUMBER: _ClassVar[int] @@ -104,7 +104,7 @@ class GetSegmentsRequest(_message.Message): def __init__(self, id: _Optional[str] = ..., type: _Optional[str] = ..., scope: _Optional[_Union[_chroma_pb2.SegmentScope, str]] = ..., topic: _Optional[str] = ..., collection: _Optional[str] = ...) -> None: ... class GetSegmentsResponse(_message.Message): - __slots__ = ("segments", "status") + __slots__ = ["segments", "status"] SEGMENTS_FIELD_NUMBER: _ClassVar[int] STATUS_FIELD_NUMBER: _ClassVar[int] segments: _containers.RepeatedCompositeFieldContainer[_chroma_pb2.Segment] @@ -112,7 +112,7 @@ class GetSegmentsResponse(_message.Message): def __init__(self, segments: _Optional[_Iterable[_Union[_chroma_pb2.Segment, _Mapping]]] = ..., status: _Optional[_Union[_chroma_pb2.Status, _Mapping]] = ...) -> None: ... class UpdateSegmentRequest(_message.Message): - __slots__ = ("id", "topic", "reset_topic", "collection", "reset_collection", "metadata", "reset_metadata") + __slots__ = ["id", "topic", "reset_topic", "collection", "reset_collection", "metadata", "reset_metadata"] ID_FIELD_NUMBER: _ClassVar[int] TOPIC_FIELD_NUMBER: _ClassVar[int] RESET_TOPIC_FIELD_NUMBER: _ClassVar[int] @@ -130,13 +130,13 @@ class UpdateSegmentRequest(_message.Message): def __init__(self, id: _Optional[str] = ..., topic: _Optional[str] = ..., reset_topic: bool = ..., collection: _Optional[str] = ..., reset_collection: bool = ..., metadata: _Optional[_Union[_chroma_pb2.UpdateMetadata, _Mapping]] = ..., reset_metadata: bool = ...) -> None: ... class UpdateSegmentResponse(_message.Message): - __slots__ = ("status",) + __slots__ = ["status"] STATUS_FIELD_NUMBER: _ClassVar[int] status: _chroma_pb2.Status def __init__(self, status: _Optional[_Union[_chroma_pb2.Status, _Mapping]] = ...) -> None: ... class CreateCollectionRequest(_message.Message): - __slots__ = ("id", "name", "metadata", "dimension", "get_or_create", "tenant", "database") + __slots__ = ["id", "name", "metadata", "dimension", "get_or_create", "tenant", "database"] ID_FIELD_NUMBER: _ClassVar[int] NAME_FIELD_NUMBER: _ClassVar[int] METADATA_FIELD_NUMBER: _ClassVar[int] @@ -154,7 +154,7 @@ class CreateCollectionRequest(_message.Message): def __init__(self, id: _Optional[str] = ..., name: _Optional[str] = ..., metadata: _Optional[_Union[_chroma_pb2.UpdateMetadata, _Mapping]] = ..., dimension: _Optional[int] = ..., get_or_create: bool = ..., tenant: _Optional[str] = ..., database: _Optional[str] = ...) -> None: ... class CreateCollectionResponse(_message.Message): - __slots__ = ("collection", "created", "status") + __slots__ = ["collection", "created", "status"] COLLECTION_FIELD_NUMBER: _ClassVar[int] CREATED_FIELD_NUMBER: _ClassVar[int] STATUS_FIELD_NUMBER: _ClassVar[int] @@ -164,7 +164,7 @@ class CreateCollectionResponse(_message.Message): def __init__(self, collection: _Optional[_Union[_chroma_pb2.Collection, _Mapping]] = ..., created: bool = ..., status: _Optional[_Union[_chroma_pb2.Status, _Mapping]] = ...) -> None: ... class DeleteCollectionRequest(_message.Message): - __slots__ = ("id", "tenant", "database") + __slots__ = ["id", "tenant", "database"] ID_FIELD_NUMBER: _ClassVar[int] TENANT_FIELD_NUMBER: _ClassVar[int] DATABASE_FIELD_NUMBER: _ClassVar[int] @@ -174,13 +174,13 @@ class DeleteCollectionRequest(_message.Message): def __init__(self, id: _Optional[str] = ..., tenant: _Optional[str] = ..., database: _Optional[str] = ...) -> None: ... class DeleteCollectionResponse(_message.Message): - __slots__ = ("status",) + __slots__ = ["status"] STATUS_FIELD_NUMBER: _ClassVar[int] status: _chroma_pb2.Status def __init__(self, status: _Optional[_Union[_chroma_pb2.Status, _Mapping]] = ...) -> None: ... class GetCollectionsRequest(_message.Message): - __slots__ = ("id", "name", "topic", "tenant", "database") + __slots__ = ["id", "name", "topic", "tenant", "database"] ID_FIELD_NUMBER: _ClassVar[int] NAME_FIELD_NUMBER: _ClassVar[int] TOPIC_FIELD_NUMBER: _ClassVar[int] @@ -194,7 +194,7 @@ class GetCollectionsRequest(_message.Message): def __init__(self, id: _Optional[str] = ..., name: _Optional[str] = ..., topic: _Optional[str] = ..., tenant: _Optional[str] = ..., database: _Optional[str] = ...) -> None: ... class GetCollectionsResponse(_message.Message): - __slots__ = ("collections", "status") + __slots__ = ["collections", "status"] COLLECTIONS_FIELD_NUMBER: _ClassVar[int] STATUS_FIELD_NUMBER: _ClassVar[int] collections: _containers.RepeatedCompositeFieldContainer[_chroma_pb2.Collection] @@ -202,7 +202,7 @@ class GetCollectionsResponse(_message.Message): def __init__(self, collections: _Optional[_Iterable[_Union[_chroma_pb2.Collection, _Mapping]]] = ..., status: _Optional[_Union[_chroma_pb2.Status, _Mapping]] = ...) -> None: ... class UpdateCollectionRequest(_message.Message): - __slots__ = ("id", "topic", "name", "dimension", "metadata", "reset_metadata") + __slots__ = ["id", "topic", "name", "dimension", "metadata", "reset_metadata"] ID_FIELD_NUMBER: _ClassVar[int] TOPIC_FIELD_NUMBER: _ClassVar[int] NAME_FIELD_NUMBER: _ClassVar[int] @@ -218,13 +218,13 @@ class UpdateCollectionRequest(_message.Message): def __init__(self, id: _Optional[str] = ..., topic: _Optional[str] = ..., name: _Optional[str] = ..., dimension: _Optional[int] = ..., metadata: _Optional[_Union[_chroma_pb2.UpdateMetadata, _Mapping]] = ..., reset_metadata: bool = ...) -> None: ... class UpdateCollectionResponse(_message.Message): - __slots__ = ("status",) + __slots__ = ["status"] STATUS_FIELD_NUMBER: _ClassVar[int] status: _chroma_pb2.Status def __init__(self, status: _Optional[_Union[_chroma_pb2.Status, _Mapping]] = ...) -> None: ... class Notification(_message.Message): - __slots__ = ("id", "collection_id", "type", "status") + __slots__ = ["id", "collection_id", "type", "status"] ID_FIELD_NUMBER: _ClassVar[int] COLLECTION_ID_FIELD_NUMBER: _ClassVar[int] TYPE_FIELD_NUMBER: _ClassVar[int] @@ -236,7 +236,7 @@ class Notification(_message.Message): def __init__(self, id: _Optional[int] = ..., collection_id: _Optional[str] = ..., type: _Optional[str] = ..., status: _Optional[str] = ...) -> None: ... class ResetStateResponse(_message.Message): - __slots__ = ("status",) + __slots__ = ["status"] STATUS_FIELD_NUMBER: _ClassVar[int] status: _chroma_pb2.Status def __init__(self, status: _Optional[_Union[_chroma_pb2.Status, _Mapping]] = ...) -> None: ... diff --git a/chromadb/proto/logservice_pb2.py b/chromadb/proto/logservice_pb2.py index 76bd1a7bce5..d4dbc74c98c 100644 --- a/chromadb/proto/logservice_pb2.py +++ b/chromadb/proto/logservice_pb2.py @@ -1,12 +1,12 @@ # -*- coding: utf-8 -*- # Generated by the protocol buffer compiler. DO NOT EDIT! # source: chromadb/proto/logservice.proto -# Protobuf Python Version: 4.25.0 """Generated protocol buffer code.""" from google.protobuf import descriptor as _descriptor from google.protobuf import descriptor_pool as _descriptor_pool from google.protobuf import symbol_database as _symbol_database from google.protobuf.internal import builder as _builder + # @@protoc_insertion_point(imports) _sym_db = _symbol_database.Default() @@ -15,22 +15,34 @@ from chromadb.proto import chroma_pb2 as chromadb_dot_proto_dot_chroma__pb2 -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1f\x63hromadb/proto/logservice.proto\x12\x06\x63hroma\x1a\x1b\x63hromadb/proto/chroma.proto\"X\n\x0fPushLogsRequest\x12\x15\n\rcollection_id\x18\x01 \x01(\t\x12.\n\x07records\x18\x02 \x03(\x0b\x32\x1d.chroma.SubmitEmbeddingRecord\"(\n\x10PushLogsResponse\x12\x14\n\x0crecord_count\x18\x01 \x01(\x05\"S\n\x0fPullLogsRequest\x12\x15\n\rcollection_id\x18\x01 \x01(\t\x12\x15\n\rstart_from_id\x18\x02 \x01(\x03\x12\x12\n\nbatch_size\x18\x03 \x01(\x05\"B\n\x10PullLogsResponse\x12.\n\x07records\x18\x01 \x03(\x0b\x32\x1d.chroma.SubmitEmbeddingRecord2\x8e\x01\n\nLogService\x12?\n\x08PushLogs\x12\x17.chroma.PushLogsRequest\x1a\x18.chroma.PushLogsResponse\"\x00\x12?\n\x08PullLogs\x12\x17.chroma.PullLogsRequest\x1a\x18.chroma.PullLogsResponse\"\x00\x42\x42Z@github.com/chroma/chroma-coordinator/internal/proto/logservicepbb\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile( + b'\n\x1f\x63hromadb/proto/logservice.proto\x12\x06\x63hroma\x1a\x1b\x63hromadb/proto/chroma.proto"X\n\x0fPushLogsRequest\x12\x15\n\rcollection_id\x18\x01 \x01(\t\x12.\n\x07records\x18\x02 \x03(\x0b\x32\x1d.chroma.SubmitEmbeddingRecord"(\n\x10PushLogsResponse\x12\x14\n\x0crecord_count\x18\x01 \x01(\x05"S\n\x0fPullLogsRequest\x12\x15\n\rcollection_id\x18\x01 \x01(\t\x12\x15\n\rstart_from_id\x18\x02 \x01(\x03\x12\x12\n\nbatch_size\x18\x03 \x01(\x05"B\n\x10PullLogsResponse\x12.\n\x07records\x18\x01 \x03(\x0b\x32\x1d.chroma.SubmitEmbeddingRecord"V\n\x0e\x43ollectionInfo\x12\x15\n\rcollection_id\x18\x01 \x01(\t\x12\x14\n\x0c\x66irst_log_id\x18\x02 \x01(\x03\x12\x17\n\x0f\x66irst_log_id_ts\x18\x03 \x01(\x03"&\n$GetAllCollectionInfoToCompactRequest"\\\n%GetAllCollectionInfoToCompactResponse\x12\x33\n\x13\x61ll_collection_info\x18\x01 \x03(\x0b\x32\x16.chroma.CollectionInfo2\x8e\x02\n\nLogService\x12?\n\x08PushLogs\x12\x17.chroma.PushLogsRequest\x1a\x18.chroma.PushLogsResponse"\x00\x12?\n\x08PullLogs\x12\x17.chroma.PullLogsRequest\x1a\x18.chroma.PullLogsResponse"\x00\x12~\n\x1dGetAllCollectionInfoToCompact\x12,.chroma.GetAllCollectionInfoToCompactRequest\x1a-.chroma.GetAllCollectionInfoToCompactResponse"\x00\x42\x42Z@github.com/chroma/chroma-coordinator/internal/proto/logservicepbb\x06proto3' +) _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) -_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'chromadb.proto.logservice_pb2', _globals) +_builder.BuildTopDescriptorsAndMessages( + DESCRIPTOR, "chromadb.proto.logservice_pb2", _globals +) if _descriptor._USE_C_DESCRIPTORS == False: - _globals['DESCRIPTOR']._options = None - _globals['DESCRIPTOR']._serialized_options = b'Z@github.com/chroma/chroma-coordinator/internal/proto/logservicepb' - _globals['_PUSHLOGSREQUEST']._serialized_start=72 - _globals['_PUSHLOGSREQUEST']._serialized_end=160 - _globals['_PUSHLOGSRESPONSE']._serialized_start=162 - _globals['_PUSHLOGSRESPONSE']._serialized_end=202 - _globals['_PULLLOGSREQUEST']._serialized_start=204 - _globals['_PULLLOGSREQUEST']._serialized_end=287 - _globals['_PULLLOGSRESPONSE']._serialized_start=289 - _globals['_PULLLOGSRESPONSE']._serialized_end=355 - _globals['_LOGSERVICE']._serialized_start=358 - _globals['_LOGSERVICE']._serialized_end=500 + DESCRIPTOR._options = None + DESCRIPTOR._serialized_options = ( + b"Z@github.com/chroma/chroma-coordinator/internal/proto/logservicepb" + ) + _globals["_PUSHLOGSREQUEST"]._serialized_start = 72 + _globals["_PUSHLOGSREQUEST"]._serialized_end = 160 + _globals["_PUSHLOGSRESPONSE"]._serialized_start = 162 + _globals["_PUSHLOGSRESPONSE"]._serialized_end = 202 + _globals["_PULLLOGSREQUEST"]._serialized_start = 204 + _globals["_PULLLOGSREQUEST"]._serialized_end = 287 + _globals["_PULLLOGSRESPONSE"]._serialized_start = 289 + _globals["_PULLLOGSRESPONSE"]._serialized_end = 355 + _globals["_COLLECTIONINFO"]._serialized_start = 357 + _globals["_COLLECTIONINFO"]._serialized_end = 443 + _globals["_GETALLCOLLECTIONINFOTOCOMPACTREQUEST"]._serialized_start = 445 + _globals["_GETALLCOLLECTIONINFOTOCOMPACTREQUEST"]._serialized_end = 483 + _globals["_GETALLCOLLECTIONINFOTOCOMPACTRESPONSE"]._serialized_start = 485 + _globals["_GETALLCOLLECTIONINFOTOCOMPACTRESPONSE"]._serialized_end = 577 + _globals["_LOGSERVICE"]._serialized_start = 580 + _globals["_LOGSERVICE"]._serialized_end = 850 # @@protoc_insertion_point(module_scope) diff --git a/chromadb/proto/logservice_pb2.pyi b/chromadb/proto/logservice_pb2.pyi index 522a8ad0cc5..01e355d6cab 100644 --- a/chromadb/proto/logservice_pb2.pyi +++ b/chromadb/proto/logservice_pb2.pyi @@ -2,36 +2,92 @@ from chromadb.proto import chroma_pb2 as _chroma_pb2 from google.protobuf.internal import containers as _containers from google.protobuf import descriptor as _descriptor from google.protobuf import message as _message -from typing import ClassVar as _ClassVar, Iterable as _Iterable, Mapping as _Mapping, Optional as _Optional, Union as _Union +from typing import ( + ClassVar as _ClassVar, + Iterable as _Iterable, + Mapping as _Mapping, + Optional as _Optional, + Union as _Union, +) DESCRIPTOR: _descriptor.FileDescriptor class PushLogsRequest(_message.Message): - __slots__ = ("collection_id", "records") + __slots__ = ["collection_id", "records"] COLLECTION_ID_FIELD_NUMBER: _ClassVar[int] RECORDS_FIELD_NUMBER: _ClassVar[int] collection_id: str - records: _containers.RepeatedCompositeFieldContainer[_chroma_pb2.SubmitEmbeddingRecord] - def __init__(self, collection_id: _Optional[str] = ..., records: _Optional[_Iterable[_Union[_chroma_pb2.SubmitEmbeddingRecord, _Mapping]]] = ...) -> None: ... + records: _containers.RepeatedCompositeFieldContainer[ + _chroma_pb2.SubmitEmbeddingRecord + ] + def __init__( + self, + collection_id: _Optional[str] = ..., + records: _Optional[ + _Iterable[_Union[_chroma_pb2.SubmitEmbeddingRecord, _Mapping]] + ] = ..., + ) -> None: ... class PushLogsResponse(_message.Message): - __slots__ = ("record_count",) + __slots__ = ["record_count"] RECORD_COUNT_FIELD_NUMBER: _ClassVar[int] record_count: int def __init__(self, record_count: _Optional[int] = ...) -> None: ... class PullLogsRequest(_message.Message): - __slots__ = ("collection_id", "start_from_id", "batch_size") + __slots__ = ["collection_id", "start_from_id", "batch_size"] COLLECTION_ID_FIELD_NUMBER: _ClassVar[int] START_FROM_ID_FIELD_NUMBER: _ClassVar[int] BATCH_SIZE_FIELD_NUMBER: _ClassVar[int] collection_id: str start_from_id: int batch_size: int - def __init__(self, collection_id: _Optional[str] = ..., start_from_id: _Optional[int] = ..., batch_size: _Optional[int] = ...) -> None: ... + def __init__( + self, + collection_id: _Optional[str] = ..., + start_from_id: _Optional[int] = ..., + batch_size: _Optional[int] = ..., + ) -> None: ... class PullLogsResponse(_message.Message): - __slots__ = ("records",) + __slots__ = ["records"] RECORDS_FIELD_NUMBER: _ClassVar[int] - records: _containers.RepeatedCompositeFieldContainer[_chroma_pb2.SubmitEmbeddingRecord] - def __init__(self, records: _Optional[_Iterable[_Union[_chroma_pb2.SubmitEmbeddingRecord, _Mapping]]] = ...) -> None: ... + records: _containers.RepeatedCompositeFieldContainer[ + _chroma_pb2.SubmitEmbeddingRecord + ] + def __init__( + self, + records: _Optional[ + _Iterable[_Union[_chroma_pb2.SubmitEmbeddingRecord, _Mapping]] + ] = ..., + ) -> None: ... + +class CollectionInfo(_message.Message): + __slots__ = ["collection_id", "first_log_id", "first_log_id_ts"] + COLLECTION_ID_FIELD_NUMBER: _ClassVar[int] + FIRST_LOG_ID_FIELD_NUMBER: _ClassVar[int] + FIRST_LOG_ID_TS_FIELD_NUMBER: _ClassVar[int] + collection_id: str + first_log_id: int + first_log_id_ts: int + def __init__( + self, + collection_id: _Optional[str] = ..., + first_log_id: _Optional[int] = ..., + first_log_id_ts: _Optional[int] = ..., + ) -> None: ... + +class GetAllCollectionInfoToCompactRequest(_message.Message): + __slots__ = [] + def __init__(self) -> None: ... + +class GetAllCollectionInfoToCompactResponse(_message.Message): + __slots__ = ["all_collection_info"] + ALL_COLLECTION_INFO_FIELD_NUMBER: _ClassVar[int] + all_collection_info: _containers.RepeatedCompositeFieldContainer[CollectionInfo] + def __init__( + self, + all_collection_info: _Optional[ + _Iterable[_Union[CollectionInfo, _Mapping]] + ] = ..., + ) -> None: ... diff --git a/chromadb/proto/logservice_pb2_grpc.py b/chromadb/proto/logservice_pb2_grpc.py index 381ae57d1e0..7e4ab6a7c29 100644 --- a/chromadb/proto/logservice_pb2_grpc.py +++ b/chromadb/proto/logservice_pb2_grpc.py @@ -15,15 +15,20 @@ def __init__(self, channel): channel: A grpc.Channel. """ self.PushLogs = channel.unary_unary( - '/chroma.LogService/PushLogs', - request_serializer=chromadb_dot_proto_dot_logservice__pb2.PushLogsRequest.SerializeToString, - response_deserializer=chromadb_dot_proto_dot_logservice__pb2.PushLogsResponse.FromString, - ) + "/chroma.LogService/PushLogs", + request_serializer=chromadb_dot_proto_dot_logservice__pb2.PushLogsRequest.SerializeToString, + response_deserializer=chromadb_dot_proto_dot_logservice__pb2.PushLogsResponse.FromString, + ) self.PullLogs = channel.unary_unary( - '/chroma.LogService/PullLogs', - request_serializer=chromadb_dot_proto_dot_logservice__pb2.PullLogsRequest.SerializeToString, - response_deserializer=chromadb_dot_proto_dot_logservice__pb2.PullLogsResponse.FromString, - ) + "/chroma.LogService/PullLogs", + request_serializer=chromadb_dot_proto_dot_logservice__pb2.PullLogsRequest.SerializeToString, + response_deserializer=chromadb_dot_proto_dot_logservice__pb2.PullLogsResponse.FromString, + ) + self.GetAllCollectionInfoToCompact = channel.unary_unary( + "/chroma.LogService/GetAllCollectionInfoToCompact", + request_serializer=chromadb_dot_proto_dot_logservice__pb2.GetAllCollectionInfoToCompactRequest.SerializeToString, + response_deserializer=chromadb_dot_proto_dot_logservice__pb2.GetAllCollectionInfoToCompactResponse.FromString, + ) class LogServiceServicer(object): @@ -32,68 +37,133 @@ class LogServiceServicer(object): def PushLogs(self, request, context): """Missing associated documentation comment in .proto file.""" context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details('Method not implemented!') - raise NotImplementedError('Method not implemented!') + context.set_details("Method not implemented!") + raise NotImplementedError("Method not implemented!") def PullLogs(self, request, context): """Missing associated documentation comment in .proto file.""" context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details('Method not implemented!') - raise NotImplementedError('Method not implemented!') + context.set_details("Method not implemented!") + raise NotImplementedError("Method not implemented!") + + def GetAllCollectionInfoToCompact(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details("Method not implemented!") + raise NotImplementedError("Method not implemented!") def add_LogServiceServicer_to_server(servicer, server): rpc_method_handlers = { - 'PushLogs': grpc.unary_unary_rpc_method_handler( - servicer.PushLogs, - request_deserializer=chromadb_dot_proto_dot_logservice__pb2.PushLogsRequest.FromString, - response_serializer=chromadb_dot_proto_dot_logservice__pb2.PushLogsResponse.SerializeToString, - ), - 'PullLogs': grpc.unary_unary_rpc_method_handler( - servicer.PullLogs, - request_deserializer=chromadb_dot_proto_dot_logservice__pb2.PullLogsRequest.FromString, - response_serializer=chromadb_dot_proto_dot_logservice__pb2.PullLogsResponse.SerializeToString, - ), + "PushLogs": grpc.unary_unary_rpc_method_handler( + servicer.PushLogs, + request_deserializer=chromadb_dot_proto_dot_logservice__pb2.PushLogsRequest.FromString, + response_serializer=chromadb_dot_proto_dot_logservice__pb2.PushLogsResponse.SerializeToString, + ), + "PullLogs": grpc.unary_unary_rpc_method_handler( + servicer.PullLogs, + request_deserializer=chromadb_dot_proto_dot_logservice__pb2.PullLogsRequest.FromString, + response_serializer=chromadb_dot_proto_dot_logservice__pb2.PullLogsResponse.SerializeToString, + ), + "GetAllCollectionInfoToCompact": grpc.unary_unary_rpc_method_handler( + servicer.GetAllCollectionInfoToCompact, + request_deserializer=chromadb_dot_proto_dot_logservice__pb2.GetAllCollectionInfoToCompactRequest.FromString, + response_serializer=chromadb_dot_proto_dot_logservice__pb2.GetAllCollectionInfoToCompactResponse.SerializeToString, + ), } generic_handler = grpc.method_handlers_generic_handler( - 'chroma.LogService', rpc_method_handlers) + "chroma.LogService", rpc_method_handlers + ) server.add_generic_rpc_handlers((generic_handler,)) - # This class is part of an EXPERIMENTAL API. +# This class is part of an EXPERIMENTAL API. class LogService(object): """Missing associated documentation comment in .proto file.""" @staticmethod - def PushLogs(request, + def PushLogs( + request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None, + ): + return grpc.experimental.unary_unary( + request, target, - options=(), - channel_credentials=None, - call_credentials=None, - insecure=False, - compression=None, - wait_for_ready=None, - timeout=None, - metadata=None): - return grpc.experimental.unary_unary(request, target, '/chroma.LogService/PushLogs', + "/chroma.LogService/PushLogs", chromadb_dot_proto_dot_logservice__pb2.PushLogsRequest.SerializeToString, chromadb_dot_proto_dot_logservice__pb2.PushLogsResponse.FromString, - options, channel_credentials, - insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + ) @staticmethod - def PullLogs(request, + def PullLogs( + request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None, + ): + return grpc.experimental.unary_unary( + request, target, - options=(), - channel_credentials=None, - call_credentials=None, - insecure=False, - compression=None, - wait_for_ready=None, - timeout=None, - metadata=None): - return grpc.experimental.unary_unary(request, target, '/chroma.LogService/PullLogs', + "/chroma.LogService/PullLogs", chromadb_dot_proto_dot_logservice__pb2.PullLogsRequest.SerializeToString, chromadb_dot_proto_dot_logservice__pb2.PullLogsResponse.FromString, - options, channel_credentials, - insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + ) + + @staticmethod + def GetAllCollectionInfoToCompact( + request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None, + ): + return grpc.experimental.unary_unary( + request, + target, + "/chroma.LogService/GetAllCollectionInfoToCompact", + chromadb_dot_proto_dot_logservice__pb2.GetAllCollectionInfoToCompactRequest.SerializeToString, + chromadb_dot_proto_dot_logservice__pb2.GetAllCollectionInfoToCompactResponse.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + ) diff --git a/go/internal/logservice/apis.go b/go/internal/logservice/apis.go index e351732d1df..76aa699a7d6 100644 --- a/go/internal/logservice/apis.go +++ b/go/internal/logservice/apis.go @@ -12,6 +12,7 @@ type ( common.Component PushLogs(ctx context.Context, collectionID types.UniqueID, recordContent [][]byte) (int, error) PullLogs(ctx context.Context, collectionID types.UniqueID, id int64, batchSize int) ([]*dbmodel.RecordLog, error) + GetAllCollectionIDsToCompact() ([]*dbmodel.RecordLog, error) } ) @@ -22,3 +23,7 @@ func (s *RecordLog) PushLogs(ctx context.Context, collectionID types.UniqueID, r func (s *RecordLog) PullLogs(ctx context.Context, collectionID types.UniqueID, id int64, batchSize int) ([]*dbmodel.RecordLog, error) { return s.recordLogDb.PullLogs(collectionID, id, batchSize) } + +func (s *RecordLog) GetAllCollectionIDsToCompact() ([]*dbmodel.RecordLog, error) { + return s.recordLogDb.GetAllCollectionsToCompact() +} diff --git a/go/internal/logservice/grpc/record_log_service.go b/go/internal/logservice/grpc/record_log_service.go index 4febb66b27a..e50225e5cf8 100644 --- a/go/internal/logservice/grpc/record_log_service.go +++ b/go/internal/logservice/grpc/record_log_service.go @@ -3,6 +3,7 @@ package grpc import ( "context" "github.com/chroma/chroma-coordinator/internal/grpcutils" + "github.com/chroma/chroma-coordinator/internal/metastore/db/dbmodel" "github.com/chroma/chroma-coordinator/internal/proto/coordinatorpb" "github.com/chroma/chroma-coordinator/internal/proto/logservicepb" "github.com/chroma/chroma-coordinator/internal/types" @@ -11,6 +12,12 @@ import ( "google.golang.org/protobuf/proto" ) +type CollectionInfo struct { + CollectionId string + FirstLogId int64 + FirstLogTs int64 +} + func (s *Server) PushLogs(ctx context.Context, req *logservicepb.PushLogsRequest) (*logservicepb.PushLogsResponse, error) { res := &logservicepb.PushLogsResponse{} collectionID, err := types.ToUniqueID(&req.CollectionId) @@ -51,6 +58,10 @@ func (s *Server) PullLogs(ctx context.Context, req *logservicepb.PullLogsRequest } records := make([]*coordinatorpb.SubmitEmbeddingRecord, 0) recordLogs, err := s.logService.PullLogs(ctx, collectionID, req.GetStartFromId(), int(req.BatchSize)) + if err != nil { + log.Error("error pulling logs", zap.Error(err)) + return nil, grpcutils.BuildInternalGrpcError("error pulling logs") + } for index := range recordLogs { record := &coordinatorpb.SubmitEmbeddingRecord{} if err := proto.Unmarshal(*recordLogs[index].Record, record); err != nil { @@ -67,3 +78,25 @@ func (s *Server) PullLogs(ctx context.Context, req *logservicepb.PullLogsRequest log.Info("PullLogs success", zap.String("collectionID", req.CollectionId), zap.Int("recordCount", len(records))) return res, nil } + +func (s *Server) GetAllCollectionInfoToCompact(ctx context.Context, req *logservicepb.GetAllCollectionInfoToCompactRequest) (*logservicepb.GetAllCollectionInfoToCompactResponse, error) { + res := &logservicepb.GetAllCollectionInfoToCompactResponse{} + res.AllCollectionInfo = make([]*logservicepb.CollectionInfo, 0) + var recordLogs []*dbmodel.RecordLog + recordLogs, err := s.logService.GetAllCollectionIDsToCompact() + if err != nil { + log.Error("error getting collection info", zap.Error(err)) + return nil, grpcutils.BuildInternalGrpcError("error getting collection info") + } + for _, recordLog := range recordLogs { + collectionInfo := &logservicepb.CollectionInfo{ + CollectionId: *recordLog.CollectionID, + FirstLogId: recordLog.ID, + FirstLogIdTs: recordLog.Timestamp, + } + res.AllCollectionInfo = append(res.AllCollectionInfo, collectionInfo) + } + // print everything for now, we can make this smaller once + log.Info("GetAllCollectionInfoToCompact success", zap.Any("collectionInfo", res.AllCollectionInfo)) + return res, nil +} diff --git a/go/internal/logservice/grpc/record_log_service_test.go b/go/internal/logservice/grpc/record_log_service_test.go index bb0afdc4f88..a916eaf9dae 100644 --- a/go/internal/logservice/grpc/record_log_service_test.go +++ b/go/internal/logservice/grpc/record_log_service_test.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "encoding/binary" + "github.com/chroma/chroma-coordinator/internal/logservice/testutils" "github.com/chroma/chroma-coordinator/internal/metastore/db/dbcore" "github.com/chroma/chroma-coordinator/internal/metastore/db/dbmodel" "github.com/chroma/chroma-coordinator/internal/proto/coordinatorpb" @@ -17,13 +18,15 @@ import ( "google.golang.org/protobuf/proto" "gorm.io/gorm" "testing" + "time" ) type RecordLogServiceTestSuite struct { suite.Suite - db *gorm.DB - s *Server - t *testing.T + db *gorm.DB + s *Server + t *testing.T + collectionId types.UniqueID } func (suite *RecordLogServiceTestSuite) SetupSuite() { @@ -36,16 +39,17 @@ func (suite *RecordLogServiceTestSuite) SetupSuite() { }) suite.s = s suite.db = dbcore.GetDB(context.Background()) + suite.collectionId = types.NewUniqueID() } func (suite *RecordLogServiceTestSuite) SetupTest() { log.Info("setup test") - resetLogTable(suite.db) + testutils.SetupTest(suite.db, suite.collectionId) } func (suite *RecordLogServiceTestSuite) TearDownTest() { log.Info("teardown test") - resetLogTable(suite.db) + testutils.TearDownTest(suite.db) } func encodeVector(dimension int32, vector []float32, encoding coordinatorpb.ScalarEncoding) *coordinatorpb.Vector { @@ -88,18 +92,12 @@ func GetTestEmbeddingRecords(collectionId string) (recordsToSubmit []*coordinato return recordsToSubmit } -func resetLogTable(db *gorm.DB) { - db.Migrator().DropTable(&dbmodel.RecordLog{}) - db.Migrator().CreateTable(&dbmodel.RecordLog{}) -} - func (suite *RecordLogServiceTestSuite) TestServer_PushLogs() { log.Info("test push logs") // push some records - collectionId := types.NewUniqueID() - recordsToSubmit := GetTestEmbeddingRecords(collectionId.String()) + recordsToSubmit := GetTestEmbeddingRecords(suite.collectionId.String()) pushRequest := logservicepb.PushLogsRequest{ - CollectionId: collectionId.String(), + CollectionId: suite.collectionId.String(), Records: recordsToSubmit, } response, err := suite.s.PushLogs(context.Background(), &pushRequest) @@ -107,11 +105,11 @@ func (suite *RecordLogServiceTestSuite) TestServer_PushLogs() { assert.Equal(suite.t, int32(3), response.RecordCount) var recordLogs []*dbmodel.RecordLog - suite.db.Where("collection_id = ?", types.FromUniqueID(collectionId)).Find(&recordLogs) + suite.db.Where("collection_id = ?", types.FromUniqueID(suite.collectionId)).Find(&recordLogs) assert.Len(suite.t, recordLogs, 3) for index := range recordLogs { assert.Equal(suite.t, int64(index+1), recordLogs[index].ID) - assert.Equal(suite.t, collectionId.String(), *recordLogs[index].CollectionID) + assert.Equal(suite.t, suite.collectionId.String(), *recordLogs[index].CollectionID) record := &coordinatorpb.SubmitEmbeddingRecord{} if err := proto.Unmarshal(*recordLogs[index].Record, record); err != nil { panic(err) @@ -128,17 +126,16 @@ func (suite *RecordLogServiceTestSuite) TestServer_PushLogs() { func (suite *RecordLogServiceTestSuite) TestServer_PullLogs() { // push some records - collectionId := types.NewUniqueID() - recordsToSubmit := GetTestEmbeddingRecords(collectionId.String()) + recordsToSubmit := GetTestEmbeddingRecords(suite.collectionId.String()) pushRequest := logservicepb.PushLogsRequest{ - CollectionId: collectionId.String(), + CollectionId: suite.collectionId.String(), Records: recordsToSubmit, } suite.s.PushLogs(context.Background(), &pushRequest) // pull the records pullRequest := logservicepb.PullLogsRequest{ - CollectionId: collectionId.String(), + CollectionId: suite.collectionId.String(), StartFromId: 0, BatchSize: 10, } @@ -187,6 +184,41 @@ func (suite *RecordLogServiceTestSuite) TestServer_Bad_CollectionId() { assert.Equal(suite.T(), "invalid collection_id", st.Message()) } +func (suite *RecordLogServiceTestSuite) TestServer_GetAllCollectionInfoToCompact() { + // push some records + var startTime = time.Now().UnixNano() + recordsToSubmit := GetTestEmbeddingRecords(suite.collectionId.String()) + pushRequest := logservicepb.PushLogsRequest{ + CollectionId: suite.collectionId.String(), + Records: recordsToSubmit, + } + suite.s.PushLogs(context.Background(), &pushRequest) + + // get collection info for compactor + request := logservicepb.GetAllCollectionInfoToCompactRequest{} + response, err := suite.s.GetAllCollectionInfoToCompact(context.Background(), &request) + assert.Nil(suite.t, err) + assert.Len(suite.t, response.AllCollectionInfo, 1) + assert.Equal(suite.T(), suite.collectionId.String(), response.AllCollectionInfo[0].CollectionId) + assert.Equal(suite.T(), int64(1), response.AllCollectionInfo[0].FirstLogId) + assert.True(suite.T(), response.AllCollectionInfo[0].FirstLogIdTs > startTime) + assert.True(suite.T(), response.AllCollectionInfo[0].FirstLogIdTs < time.Now().UnixNano()) + + // move log position + testutils.MoveLogPosition(suite.db, suite.collectionId, 2) + + // get collection info for compactor + request = logservicepb.GetAllCollectionInfoToCompactRequest{} + response, err = suite.s.GetAllCollectionInfoToCompact(context.Background(), &request) + assert.Nil(suite.t, err) + assert.Len(suite.t, response.AllCollectionInfo, 1) + assert.Equal(suite.T(), suite.collectionId.String(), response.AllCollectionInfo[0].CollectionId) + assert.Equal(suite.T(), int64(3), response.AllCollectionInfo[0].FirstLogId) + assert.True(suite.T(), response.AllCollectionInfo[0].FirstLogIdTs > startTime) + assert.True(suite.T(), response.AllCollectionInfo[0].FirstLogIdTs < time.Now().UnixNano()) + +} + func TestRecordLogServiceTestSuite(t *testing.T) { testSuite := new(RecordLogServiceTestSuite) testSuite.t = t diff --git a/go/internal/logservice/testutils/record_log_test_util.go b/go/internal/logservice/testutils/record_log_test_util.go new file mode 100644 index 00000000000..e6c79a986a8 --- /dev/null +++ b/go/internal/logservice/testutils/record_log_test_util.go @@ -0,0 +1,50 @@ +package testutils + +import ( + "github.com/chroma/chroma-coordinator/internal/metastore/db/dbmodel" + "github.com/chroma/chroma-coordinator/internal/types" + "github.com/pingcap/log" + "go.uber.org/zap" + "gorm.io/gorm" + "strconv" +) + +func SetupTest(db *gorm.DB, collectionIds ...types.UniqueID) { + db.Migrator().DropTable(&dbmodel.Segment{}) + db.Migrator().CreateTable(&dbmodel.Segment{}) + db.Migrator().DropTable(&dbmodel.Collection{}) + db.Migrator().CreateTable(&dbmodel.Collection{}) + db.Migrator().DropTable(&dbmodel.RecordLog{}) + db.Migrator().CreateTable(&dbmodel.RecordLog{}) + + // create test collections + for index, collectionId := range collectionIds { + collectionName := "collection" + strconv.Itoa(index+1) + collectionTopic := "topic" + strconv.Itoa(index+1) + var collectionDimension int32 = 6 + collection := &dbmodel.Collection{ + ID: collectionId.String(), + Name: &collectionName, + Topic: &collectionTopic, + Dimension: &collectionDimension, + DatabaseID: types.NewUniqueID().String(), + } + err := db.Create(collection).Error + if err != nil { + log.Error("create collection error", zap.Error(err)) + } + } +} + +func TearDownTest(db *gorm.DB) { + db.Migrator().DropTable(&dbmodel.Segment{}) + db.Migrator().CreateTable(&dbmodel.Segment{}) + db.Migrator().DropTable(&dbmodel.Collection{}) + db.Migrator().CreateTable(&dbmodel.Collection{}) + db.Migrator().DropTable(&dbmodel.RecordLog{}) + db.Migrator().CreateTable(&dbmodel.RecordLog{}) +} + +func MoveLogPosition(db *gorm.DB, collectionId types.UniqueID, position int64) { + db.Model(&dbmodel.Collection{}).Where("id = ?", collectionId.String()).Update("log_position", position) +} diff --git a/go/internal/metastore/coordinator/table_catalog.go b/go/internal/metastore/coordinator/table_catalog.go index f8ae8a84e28..7136b861d9a 100644 --- a/go/internal/metastore/coordinator/table_catalog.go +++ b/go/internal/metastore/coordinator/table_catalog.go @@ -250,12 +250,13 @@ func (tc *Catalog) CreateCollection(ctx context.Context, createCollection *model } dbCollection := &dbmodel.Collection{ - ID: createCollection.ID.String(), - Name: &createCollection.Name, - Topic: &createCollection.Topic, - Dimension: createCollection.Dimension, - DatabaseID: databases[0].ID, - Ts: ts, + ID: createCollection.ID.String(), + Name: &createCollection.Name, + Topic: &createCollection.Topic, + Dimension: createCollection.Dimension, + DatabaseID: databases[0].ID, + Ts: ts, + LogPosition: 0, } err = tc.metaDomain.CollectionDb(txCtx).Insert(dbCollection) diff --git a/go/internal/metastore/db/dao/record_log.go b/go/internal/metastore/db/dao/record_log.go index 538fa992020..7c30d9d54e1 100644 --- a/go/internal/metastore/db/dao/record_log.go +++ b/go/internal/metastore/db/dao/record_log.go @@ -1,6 +1,7 @@ package dao import ( + "database/sql" "errors" "github.com/chroma/chroma-coordinator/internal/metastore/db/dbmodel" "github.com/chroma/chroma-coordinator/internal/types" @@ -77,3 +78,40 @@ func (s *recordLogDb) PullLogs(collectionID types.UniqueID, id int64, batchSize zap.Int("count", len(recordLogs))) return recordLogs, nil } + +func (s *recordLogDb) GetAllCollectionsToCompact() ([]*dbmodel.RecordLog, error) { + log.Info("GetAllCollectionsToCompact") + var recordLogs []*dbmodel.RecordLog + var rawSql = ` + with summary as ( + select r.collection_id, r.id, r.timestamp, row_number() over(partition by r.collection_id order by r.id) as rank + from record_logs r, collections c + where r.collection_id = c.id + and r.id>c.log_position + ) + select * from summary + where rank=1 + order by timestamp;` + rows, err := s.db.Raw(rawSql).Rows() + defer func(rows *sql.Rows) { + err := rows.Close() + if err != nil { + log.Error("GetAllCollectionsToCompact Close error", zap.Error(err)) + } + }(rows) + if err != nil { + log.Error("GetAllCollectionsToCompact error", zap.Error(err)) + return nil, err + } + for rows.Next() { + var batchRecordLogs []*dbmodel.RecordLog + err := s.db.ScanRows(rows, &recordLogs) + if err != nil { + log.Error("GetAllCollectionsToCompact ScanRows error", zap.Error(err)) + return nil, err + } + recordLogs = append(recordLogs, batchRecordLogs...) + } + log.Info("GetAllCollectionsToCompact find collections count", zap.Int("count", len(recordLogs))) + return recordLogs, nil +} diff --git a/go/internal/metastore/db/dao/record_log_test.go b/go/internal/metastore/db/dao/record_log_test.go index 3c536aafa92..041e6c3e5ef 100644 --- a/go/internal/metastore/db/dao/record_log_test.go +++ b/go/internal/metastore/db/dao/record_log_test.go @@ -1,13 +1,13 @@ package dao import ( + "github.com/chroma/chroma-coordinator/internal/logservice/testutils" "github.com/chroma/chroma-coordinator/internal/metastore/db/dbcore" "github.com/chroma/chroma-coordinator/internal/metastore/db/dbmodel" "github.com/chroma/chroma-coordinator/internal/types" "github.com/pingcap/log" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/suite" - "go.uber.org/zap" "gorm.io/gorm" "testing" ) @@ -37,52 +37,12 @@ func (suite *RecordLogDbTestSuite) SetupSuite() { func (suite *RecordLogDbTestSuite) SetupTest() { log.Info("setup test") - suite.db.Migrator().DropTable(&dbmodel.Segment{}) - suite.db.Migrator().CreateTable(&dbmodel.Segment{}) - suite.db.Migrator().DropTable(&dbmodel.Collection{}) - suite.db.Migrator().CreateTable(&dbmodel.Collection{}) - suite.db.Migrator().DropTable(&dbmodel.RecordLog{}) - suite.db.Migrator().CreateTable(&dbmodel.RecordLog{}) - - // create test collection - collectionName := "collection1" - collectionTopic := "topic1" - var collectionDimension int32 = 6 - collection := &dbmodel.Collection{ - ID: suite.collectionId1.String(), - Name: &collectionName, - Topic: &collectionTopic, - Dimension: &collectionDimension, - DatabaseID: types.NewUniqueID().String(), - } - err := suite.db.Create(collection).Error - if err != nil { - log.Error("create collection error", zap.Error(err)) - } - - collectionName = "collection2" - collectionTopic = "topic2" - collection = &dbmodel.Collection{ - ID: suite.collectionId2.String(), - Name: &collectionName, - Topic: &collectionTopic, - Dimension: &collectionDimension, - DatabaseID: types.NewUniqueID().String(), - } - err = suite.db.Create(collection).Error - if err != nil { - log.Error("create collection error", zap.Error(err)) - } + testutils.SetupTest(suite.db, suite.collectionId1, suite.collectionId2) } func (suite *RecordLogDbTestSuite) TearDownTest() { log.Info("teardown test") - suite.db.Migrator().DropTable(&dbmodel.Segment{}) - suite.db.Migrator().CreateTable(&dbmodel.Segment{}) - suite.db.Migrator().DropTable(&dbmodel.Collection{}) - suite.db.Migrator().CreateTable(&dbmodel.Collection{}) - suite.db.Migrator().DropTable(&dbmodel.RecordLog{}) - suite.db.Migrator().CreateTable(&dbmodel.RecordLog{}) + testutils.TearDownTest(suite.db) } func (suite *RecordLogDbTestSuite) TestRecordLogDb_PushLogs() { @@ -177,6 +137,44 @@ func (suite *RecordLogDbTestSuite) TestRecordLogDb_PullLogsFromID() { } } +func (suite *RecordLogDbTestSuite) TestRecordLogDb_GetAllCollectionsToCompact() { + // push some logs + count, err := suite.Db.PushLogs(suite.collectionId1, suite.records) + assert.NoError(suite.t, err) + assert.Equal(suite.t, 5, count) + + // get all collection ids to compact + collectionInfos, err := suite.Db.GetAllCollectionsToCompact() + assert.NoError(suite.t, err) + assert.Len(suite.t, collectionInfos, 1) + assert.Equal(suite.t, suite.collectionId1.String(), *collectionInfos[0].CollectionID) + assert.Equal(suite.t, int64(1), collectionInfos[0].ID) + + // move log position + testutils.MoveLogPosition(suite.db, suite.collectionId1, 2) + + // get all collection ids to compact + collectionInfos, err = suite.Db.GetAllCollectionsToCompact() + assert.NoError(suite.t, err) + assert.Len(suite.t, collectionInfos, 1) + assert.Equal(suite.t, suite.collectionId1.String(), *collectionInfos[0].CollectionID) + assert.Equal(suite.t, int64(3), collectionInfos[0].ID) + + // push some logs + count, err = suite.Db.PushLogs(suite.collectionId2, suite.records) + assert.NoError(suite.t, err) + assert.Equal(suite.t, 5, count) + + // get all collection ids to compact + collectionInfos, err = suite.Db.GetAllCollectionsToCompact() + assert.NoError(suite.t, err) + assert.Len(suite.t, collectionInfos, 2) + assert.Equal(suite.t, suite.collectionId1.String(), *collectionInfos[0].CollectionID) + assert.Equal(suite.t, int64(3), collectionInfos[0].ID) + assert.Equal(suite.t, suite.collectionId2.String(), *collectionInfos[1].CollectionID) + assert.Equal(suite.t, int64(1), collectionInfos[1].ID) +} + func TestRecordLogDbTestSuite(t *testing.T) { testSuite := new(RecordLogDbTestSuite) testSuite.t = t diff --git a/go/internal/metastore/db/dbmodel/collection.go b/go/internal/metastore/db/dbmodel/collection.go index 46f00474d4e..7057bfbf37a 100644 --- a/go/internal/metastore/db/dbmodel/collection.go +++ b/go/internal/metastore/db/dbmodel/collection.go @@ -7,15 +7,16 @@ import ( ) type Collection struct { - ID string `gorm:"id;primaryKey"` - Name *string `gorm:"name;unique"` - Topic *string `gorm:"topic"` - Dimension *int32 `gorm:"dimension"` - DatabaseID string `gorm:"database_id"` - Ts types.Timestamp `gorm:"ts;type:bigint;default:0"` - IsDeleted bool `gorm:"is_deleted;type:bool;default:false"` - CreatedAt time.Time `gorm:"created_at;type:timestamp;not null;default:current_timestamp"` - UpdatedAt time.Time `gorm:"updated_at;type:timestamp;not null;default:current_timestamp"` + ID string `gorm:"id;primaryKey"` + Name *string `gorm:"name;unique"` + Topic *string `gorm:"topic"` + Dimension *int32 `gorm:"dimension"` + DatabaseID string `gorm:"database_id"` + Ts types.Timestamp `gorm:"ts;type:bigint;default:0"` + IsDeleted bool `gorm:"is_deleted;type:bool;default:false"` + CreatedAt time.Time `gorm:"created_at;type:timestamp;not null;default:current_timestamp"` + UpdatedAt time.Time `gorm:"updated_at;type:timestamp;not null;default:current_timestamp"` + LogPosition int64 `gorm:"log_position;default:0"` } func (v Collection) TableName() string { diff --git a/go/internal/metastore/db/dbmodel/record_log.go b/go/internal/metastore/db/dbmodel/record_log.go index 17537af0083..8474beeaae8 100644 --- a/go/internal/metastore/db/dbmodel/record_log.go +++ b/go/internal/metastore/db/dbmodel/record_log.go @@ -1,6 +1,8 @@ package dbmodel -import "github.com/chroma/chroma-coordinator/internal/types" +import ( + "github.com/chroma/chroma-coordinator/internal/types" +) type RecordLog struct { CollectionID *string `gorm:"collection_id;primaryKey;autoIncrement:false"` @@ -17,4 +19,5 @@ func (v RecordLog) TableName() string { type IRecordLogDb interface { PushLogs(collectionID types.UniqueID, recordsContent [][]byte) (int, error) PullLogs(collectionID types.UniqueID, id int64, batchSize int) ([]*RecordLog, error) + GetAllCollectionsToCompact() ([]*RecordLog, error) } diff --git a/go/internal/proto/coordinatorpb/chroma.pb.go b/go/internal/proto/coordinatorpb/chroma.pb.go index 78ac08493b5..96d31453c40 100644 --- a/go/internal/proto/coordinatorpb/chroma.pb.go +++ b/go/internal/proto/coordinatorpb/chroma.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.26.0 -// protoc v4.24.4 +// protoc-gen-go v1.32.0 +// protoc v4.25.3 // source: chromadb/proto/chroma.proto package coordinatorpb diff --git a/go/internal/proto/coordinatorpb/chroma_grpc.pb.go b/go/internal/proto/coordinatorpb/chroma_grpc.pb.go index 6fc03838f1e..0b45e03517f 100644 --- a/go/internal/proto/coordinatorpb/chroma_grpc.pb.go +++ b/go/internal/proto/coordinatorpb/chroma_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.2.0 -// - protoc v4.24.4 +// - protoc v4.25.3 // source: chromadb/proto/chroma.proto package coordinatorpb diff --git a/go/internal/proto/coordinatorpb/coordinator.pb.go b/go/internal/proto/coordinatorpb/coordinator.pb.go index 9bfac770e71..3a06c86b9dd 100644 --- a/go/internal/proto/coordinatorpb/coordinator.pb.go +++ b/go/internal/proto/coordinatorpb/coordinator.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.26.0 -// protoc v4.24.4 +// protoc-gen-go v1.32.0 +// protoc v4.25.3 // source: chromadb/proto/coordinator.proto package coordinatorpb diff --git a/go/internal/proto/coordinatorpb/coordinator_grpc.pb.go b/go/internal/proto/coordinatorpb/coordinator_grpc.pb.go index ae1bf2f9719..958d2e5e8ca 100644 --- a/go/internal/proto/coordinatorpb/coordinator_grpc.pb.go +++ b/go/internal/proto/coordinatorpb/coordinator_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.2.0 -// - protoc v4.24.4 +// - protoc v4.25.3 // source: chromadb/proto/coordinator.proto package coordinatorpb diff --git a/go/internal/proto/logservicepb/logservice.pb.go b/go/internal/proto/logservicepb/logservice.pb.go index c41ccb6f398..ba145a03681 100644 --- a/go/internal/proto/logservicepb/logservice.pb.go +++ b/go/internal/proto/logservicepb/logservice.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.26.0 -// protoc v4.24.4 +// protoc-gen-go v1.32.0 +// protoc v4.25.3 // source: chromadb/proto/logservice.proto package logservicepb @@ -233,6 +233,155 @@ func (x *PullLogsResponse) GetRecords() []*coordinatorpb.SubmitEmbeddingRecord { return nil } +type CollectionInfo struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + CollectionId string `protobuf:"bytes,1,opt,name=collection_id,json=collectionId,proto3" json:"collection_id,omitempty"` + // The first log id of the collection that needs to be compacted + FirstLogId int64 `protobuf:"varint,2,opt,name=first_log_id,json=firstLogId,proto3" json:"first_log_id,omitempty"` + FirstLogIdTs int64 `protobuf:"varint,3,opt,name=first_log_id_ts,json=firstLogIdTs,proto3" json:"first_log_id_ts,omitempty"` +} + +func (x *CollectionInfo) Reset() { + *x = CollectionInfo{} + if protoimpl.UnsafeEnabled { + mi := &file_chromadb_proto_logservice_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *CollectionInfo) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*CollectionInfo) ProtoMessage() {} + +func (x *CollectionInfo) ProtoReflect() protoreflect.Message { + mi := &file_chromadb_proto_logservice_proto_msgTypes[4] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use CollectionInfo.ProtoReflect.Descriptor instead. +func (*CollectionInfo) Descriptor() ([]byte, []int) { + return file_chromadb_proto_logservice_proto_rawDescGZIP(), []int{4} +} + +func (x *CollectionInfo) GetCollectionId() string { + if x != nil { + return x.CollectionId + } + return "" +} + +func (x *CollectionInfo) GetFirstLogId() int64 { + if x != nil { + return x.FirstLogId + } + return 0 +} + +func (x *CollectionInfo) GetFirstLogIdTs() int64 { + if x != nil { + return x.FirstLogIdTs + } + return 0 +} + +type GetAllCollectionInfoToCompactRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *GetAllCollectionInfoToCompactRequest) Reset() { + *x = GetAllCollectionInfoToCompactRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_chromadb_proto_logservice_proto_msgTypes[5] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *GetAllCollectionInfoToCompactRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetAllCollectionInfoToCompactRequest) ProtoMessage() {} + +func (x *GetAllCollectionInfoToCompactRequest) ProtoReflect() protoreflect.Message { + mi := &file_chromadb_proto_logservice_proto_msgTypes[5] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetAllCollectionInfoToCompactRequest.ProtoReflect.Descriptor instead. +func (*GetAllCollectionInfoToCompactRequest) Descriptor() ([]byte, []int) { + return file_chromadb_proto_logservice_proto_rawDescGZIP(), []int{5} +} + +type GetAllCollectionInfoToCompactResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + AllCollectionInfo []*CollectionInfo `protobuf:"bytes,1,rep,name=all_collection_info,json=allCollectionInfo,proto3" json:"all_collection_info,omitempty"` +} + +func (x *GetAllCollectionInfoToCompactResponse) Reset() { + *x = GetAllCollectionInfoToCompactResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_chromadb_proto_logservice_proto_msgTypes[6] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *GetAllCollectionInfoToCompactResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetAllCollectionInfoToCompactResponse) ProtoMessage() {} + +func (x *GetAllCollectionInfoToCompactResponse) ProtoReflect() protoreflect.Message { + mi := &file_chromadb_proto_logservice_proto_msgTypes[6] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetAllCollectionInfoToCompactResponse.ProtoReflect.Descriptor instead. +func (*GetAllCollectionInfoToCompactResponse) Descriptor() ([]byte, []int) { + return file_chromadb_proto_logservice_proto_rawDescGZIP(), []int{6} +} + +func (x *GetAllCollectionInfoToCompactResponse) GetAllCollectionInfo() []*CollectionInfo { + if x != nil { + return x.AllCollectionInfo + } + return nil +} + var File_chromadb_proto_logservice_proto protoreflect.FileDescriptor var file_chromadb_proto_logservice_proto_rawDesc = []byte{ @@ -263,21 +412,46 @@ var file_chromadb_proto_logservice_proto_rawDesc = []byte{ 0x07, 0x72, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x63, 0x68, 0x72, 0x6f, 0x6d, 0x61, 0x2e, 0x53, 0x75, 0x62, 0x6d, 0x69, 0x74, 0x45, 0x6d, 0x62, 0x65, 0x64, 0x64, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x52, 0x07, 0x72, - 0x65, 0x63, 0x6f, 0x72, 0x64, 0x73, 0x32, 0x8e, 0x01, 0x0a, 0x0a, 0x4c, 0x6f, 0x67, 0x53, 0x65, - 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x3f, 0x0a, 0x08, 0x50, 0x75, 0x73, 0x68, 0x4c, 0x6f, 0x67, - 0x73, 0x12, 0x17, 0x2e, 0x63, 0x68, 0x72, 0x6f, 0x6d, 0x61, 0x2e, 0x50, 0x75, 0x73, 0x68, 0x4c, - 0x6f, 0x67, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x18, 0x2e, 0x63, 0x68, 0x72, - 0x6f, 0x6d, 0x61, 0x2e, 0x50, 0x75, 0x73, 0x68, 0x4c, 0x6f, 0x67, 0x73, 0x52, 0x65, 0x73, 0x70, - 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x3f, 0x0a, 0x08, 0x50, 0x75, 0x6c, 0x6c, 0x4c, 0x6f, - 0x67, 0x73, 0x12, 0x17, 0x2e, 0x63, 0x68, 0x72, 0x6f, 0x6d, 0x61, 0x2e, 0x50, 0x75, 0x6c, 0x6c, - 0x4c, 0x6f, 0x67, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x18, 0x2e, 0x63, 0x68, - 0x72, 0x6f, 0x6d, 0x61, 0x2e, 0x50, 0x75, 0x6c, 0x6c, 0x4c, 0x6f, 0x67, 0x73, 0x52, 0x65, 0x73, - 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x42, 0x5a, 0x40, 0x67, 0x69, 0x74, 0x68, 0x75, - 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x63, 0x68, 0x72, 0x6f, 0x6d, 0x61, 0x2f, 0x63, 0x68, 0x72, - 0x6f, 0x6d, 0x61, 0x2d, 0x63, 0x6f, 0x6f, 0x72, 0x64, 0x69, 0x6e, 0x61, 0x74, 0x6f, 0x72, 0x2f, - 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x6c, - 0x6f, 0x67, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, - 0x74, 0x6f, 0x33, + 0x65, 0x63, 0x6f, 0x72, 0x64, 0x73, 0x22, 0x7e, 0x0a, 0x0e, 0x43, 0x6f, 0x6c, 0x6c, 0x65, 0x63, + 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x23, 0x0a, 0x0d, 0x63, 0x6f, 0x6c, 0x6c, + 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x0c, 0x63, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x12, 0x20, 0x0a, + 0x0c, 0x66, 0x69, 0x72, 0x73, 0x74, 0x5f, 0x6c, 0x6f, 0x67, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, + 0x01, 0x28, 0x03, 0x52, 0x0a, 0x66, 0x69, 0x72, 0x73, 0x74, 0x4c, 0x6f, 0x67, 0x49, 0x64, 0x12, + 0x25, 0x0a, 0x0f, 0x66, 0x69, 0x72, 0x73, 0x74, 0x5f, 0x6c, 0x6f, 0x67, 0x5f, 0x69, 0x64, 0x5f, + 0x74, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0c, 0x66, 0x69, 0x72, 0x73, 0x74, 0x4c, + 0x6f, 0x67, 0x49, 0x64, 0x54, 0x73, 0x22, 0x26, 0x0a, 0x24, 0x47, 0x65, 0x74, 0x41, 0x6c, 0x6c, + 0x43, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x6e, 0x66, 0x6f, 0x54, 0x6f, + 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x63, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0x6f, + 0x0a, 0x25, 0x47, 0x65, 0x74, 0x41, 0x6c, 0x6c, 0x43, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x69, + 0x6f, 0x6e, 0x49, 0x6e, 0x66, 0x6f, 0x54, 0x6f, 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x63, 0x74, 0x52, + 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x46, 0x0a, 0x13, 0x61, 0x6c, 0x6c, 0x5f, 0x63, + 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x69, 0x6e, 0x66, 0x6f, 0x18, 0x01, + 0x20, 0x03, 0x28, 0x0b, 0x32, 0x16, 0x2e, 0x63, 0x68, 0x72, 0x6f, 0x6d, 0x61, 0x2e, 0x43, 0x6f, + 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x11, 0x61, 0x6c, + 0x6c, 0x43, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x6e, 0x66, 0x6f, 0x32, + 0x8e, 0x02, 0x0a, 0x0a, 0x4c, 0x6f, 0x67, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x3f, + 0x0a, 0x08, 0x50, 0x75, 0x73, 0x68, 0x4c, 0x6f, 0x67, 0x73, 0x12, 0x17, 0x2e, 0x63, 0x68, 0x72, + 0x6f, 0x6d, 0x61, 0x2e, 0x50, 0x75, 0x73, 0x68, 0x4c, 0x6f, 0x67, 0x73, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x1a, 0x18, 0x2e, 0x63, 0x68, 0x72, 0x6f, 0x6d, 0x61, 0x2e, 0x50, 0x75, 0x73, + 0x68, 0x4c, 0x6f, 0x67, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, + 0x3f, 0x0a, 0x08, 0x50, 0x75, 0x6c, 0x6c, 0x4c, 0x6f, 0x67, 0x73, 0x12, 0x17, 0x2e, 0x63, 0x68, + 0x72, 0x6f, 0x6d, 0x61, 0x2e, 0x50, 0x75, 0x6c, 0x6c, 0x4c, 0x6f, 0x67, 0x73, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x1a, 0x18, 0x2e, 0x63, 0x68, 0x72, 0x6f, 0x6d, 0x61, 0x2e, 0x50, 0x75, + 0x6c, 0x6c, 0x4c, 0x6f, 0x67, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, + 0x12, 0x7e, 0x0a, 0x1d, 0x47, 0x65, 0x74, 0x41, 0x6c, 0x6c, 0x43, 0x6f, 0x6c, 0x6c, 0x65, 0x63, + 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x6e, 0x66, 0x6f, 0x54, 0x6f, 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x63, + 0x74, 0x12, 0x2c, 0x2e, 0x63, 0x68, 0x72, 0x6f, 0x6d, 0x61, 0x2e, 0x47, 0x65, 0x74, 0x41, 0x6c, + 0x6c, 0x43, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x6e, 0x66, 0x6f, 0x54, + 0x6f, 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x63, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, + 0x2d, 0x2e, 0x63, 0x68, 0x72, 0x6f, 0x6d, 0x61, 0x2e, 0x47, 0x65, 0x74, 0x41, 0x6c, 0x6c, 0x43, + 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x6e, 0x66, 0x6f, 0x54, 0x6f, 0x43, + 0x6f, 0x6d, 0x70, 0x61, 0x63, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, + 0x42, 0x42, 0x5a, 0x40, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x63, + 0x68, 0x72, 0x6f, 0x6d, 0x61, 0x2f, 0x63, 0x68, 0x72, 0x6f, 0x6d, 0x61, 0x2d, 0x63, 0x6f, 0x6f, + 0x72, 0x64, 0x69, 0x6e, 0x61, 0x74, 0x6f, 0x72, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, + 0x6c, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x6c, 0x6f, 0x67, 0x73, 0x65, 0x72, 0x76, 0x69, + 0x63, 0x65, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -292,26 +466,32 @@ func file_chromadb_proto_logservice_proto_rawDescGZIP() []byte { return file_chromadb_proto_logservice_proto_rawDescData } -var file_chromadb_proto_logservice_proto_msgTypes = make([]protoimpl.MessageInfo, 4) +var file_chromadb_proto_logservice_proto_msgTypes = make([]protoimpl.MessageInfo, 7) var file_chromadb_proto_logservice_proto_goTypes = []interface{}{ - (*PushLogsRequest)(nil), // 0: chroma.PushLogsRequest - (*PushLogsResponse)(nil), // 1: chroma.PushLogsResponse - (*PullLogsRequest)(nil), // 2: chroma.PullLogsRequest - (*PullLogsResponse)(nil), // 3: chroma.PullLogsResponse - (*coordinatorpb.SubmitEmbeddingRecord)(nil), // 4: chroma.SubmitEmbeddingRecord + (*PushLogsRequest)(nil), // 0: chroma.PushLogsRequest + (*PushLogsResponse)(nil), // 1: chroma.PushLogsResponse + (*PullLogsRequest)(nil), // 2: chroma.PullLogsRequest + (*PullLogsResponse)(nil), // 3: chroma.PullLogsResponse + (*CollectionInfo)(nil), // 4: chroma.CollectionInfo + (*GetAllCollectionInfoToCompactRequest)(nil), // 5: chroma.GetAllCollectionInfoToCompactRequest + (*GetAllCollectionInfoToCompactResponse)(nil), // 6: chroma.GetAllCollectionInfoToCompactResponse + (*coordinatorpb.SubmitEmbeddingRecord)(nil), // 7: chroma.SubmitEmbeddingRecord } var file_chromadb_proto_logservice_proto_depIdxs = []int32{ - 4, // 0: chroma.PushLogsRequest.records:type_name -> chroma.SubmitEmbeddingRecord - 4, // 1: chroma.PullLogsResponse.records:type_name -> chroma.SubmitEmbeddingRecord - 0, // 2: chroma.LogService.PushLogs:input_type -> chroma.PushLogsRequest - 2, // 3: chroma.LogService.PullLogs:input_type -> chroma.PullLogsRequest - 1, // 4: chroma.LogService.PushLogs:output_type -> chroma.PushLogsResponse - 3, // 5: chroma.LogService.PullLogs:output_type -> chroma.PullLogsResponse - 4, // [4:6] is the sub-list for method output_type - 2, // [2:4] is the sub-list for method input_type - 2, // [2:2] is the sub-list for extension type_name - 2, // [2:2] is the sub-list for extension extendee - 0, // [0:2] is the sub-list for field type_name + 7, // 0: chroma.PushLogsRequest.records:type_name -> chroma.SubmitEmbeddingRecord + 7, // 1: chroma.PullLogsResponse.records:type_name -> chroma.SubmitEmbeddingRecord + 4, // 2: chroma.GetAllCollectionInfoToCompactResponse.all_collection_info:type_name -> chroma.CollectionInfo + 0, // 3: chroma.LogService.PushLogs:input_type -> chroma.PushLogsRequest + 2, // 4: chroma.LogService.PullLogs:input_type -> chroma.PullLogsRequest + 5, // 5: chroma.LogService.GetAllCollectionInfoToCompact:input_type -> chroma.GetAllCollectionInfoToCompactRequest + 1, // 6: chroma.LogService.PushLogs:output_type -> chroma.PushLogsResponse + 3, // 7: chroma.LogService.PullLogs:output_type -> chroma.PullLogsResponse + 6, // 8: chroma.LogService.GetAllCollectionInfoToCompact:output_type -> chroma.GetAllCollectionInfoToCompactResponse + 6, // [6:9] is the sub-list for method output_type + 3, // [3:6] is the sub-list for method input_type + 3, // [3:3] is the sub-list for extension type_name + 3, // [3:3] is the sub-list for extension extendee + 0, // [0:3] is the sub-list for field type_name } func init() { file_chromadb_proto_logservice_proto_init() } @@ -368,6 +548,42 @@ func file_chromadb_proto_logservice_proto_init() { return nil } } + file_chromadb_proto_logservice_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*CollectionInfo); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_chromadb_proto_logservice_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*GetAllCollectionInfoToCompactRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_chromadb_proto_logservice_proto_msgTypes[6].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*GetAllCollectionInfoToCompactResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } type x struct{} out := protoimpl.TypeBuilder{ @@ -375,7 +591,7 @@ func file_chromadb_proto_logservice_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_chromadb_proto_logservice_proto_rawDesc, NumEnums: 0, - NumMessages: 4, + NumMessages: 7, NumExtensions: 0, NumServices: 1, }, diff --git a/go/internal/proto/logservicepb/logservice_grpc.pb.go b/go/internal/proto/logservicepb/logservice_grpc.pb.go index 4ebe2d8f3ba..62d87449a12 100644 --- a/go/internal/proto/logservicepb/logservice_grpc.pb.go +++ b/go/internal/proto/logservicepb/logservice_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.2.0 -// - protoc v4.24.4 +// - protoc v4.25.3 // source: chromadb/proto/logservice.proto package logservicepb @@ -24,6 +24,7 @@ const _ = grpc.SupportPackageIsVersion7 type LogServiceClient interface { PushLogs(ctx context.Context, in *PushLogsRequest, opts ...grpc.CallOption) (*PushLogsResponse, error) PullLogs(ctx context.Context, in *PullLogsRequest, opts ...grpc.CallOption) (*PullLogsResponse, error) + GetAllCollectionInfoToCompact(ctx context.Context, in *GetAllCollectionInfoToCompactRequest, opts ...grpc.CallOption) (*GetAllCollectionInfoToCompactResponse, error) } type logServiceClient struct { @@ -52,12 +53,22 @@ func (c *logServiceClient) PullLogs(ctx context.Context, in *PullLogsRequest, op return out, nil } +func (c *logServiceClient) GetAllCollectionInfoToCompact(ctx context.Context, in *GetAllCollectionInfoToCompactRequest, opts ...grpc.CallOption) (*GetAllCollectionInfoToCompactResponse, error) { + out := new(GetAllCollectionInfoToCompactResponse) + err := c.cc.Invoke(ctx, "/chroma.LogService/GetAllCollectionInfoToCompact", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + // LogServiceServer is the server API for LogService service. // All implementations must embed UnimplementedLogServiceServer // for forward compatibility type LogServiceServer interface { PushLogs(context.Context, *PushLogsRequest) (*PushLogsResponse, error) PullLogs(context.Context, *PullLogsRequest) (*PullLogsResponse, error) + GetAllCollectionInfoToCompact(context.Context, *GetAllCollectionInfoToCompactRequest) (*GetAllCollectionInfoToCompactResponse, error) mustEmbedUnimplementedLogServiceServer() } @@ -71,6 +82,9 @@ func (UnimplementedLogServiceServer) PushLogs(context.Context, *PushLogsRequest) func (UnimplementedLogServiceServer) PullLogs(context.Context, *PullLogsRequest) (*PullLogsResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method PullLogs not implemented") } +func (UnimplementedLogServiceServer) GetAllCollectionInfoToCompact(context.Context, *GetAllCollectionInfoToCompactRequest) (*GetAllCollectionInfoToCompactResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method GetAllCollectionInfoToCompact not implemented") +} func (UnimplementedLogServiceServer) mustEmbedUnimplementedLogServiceServer() {} // UnsafeLogServiceServer may be embedded to opt out of forward compatibility for this service. @@ -120,6 +134,24 @@ func _LogService_PullLogs_Handler(srv interface{}, ctx context.Context, dec func return interceptor(ctx, in, info, handler) } +func _LogService_GetAllCollectionInfoToCompact_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(GetAllCollectionInfoToCompactRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(LogServiceServer).GetAllCollectionInfoToCompact(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/chroma.LogService/GetAllCollectionInfoToCompact", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(LogServiceServer).GetAllCollectionInfoToCompact(ctx, req.(*GetAllCollectionInfoToCompactRequest)) + } + return interceptor(ctx, in, info, handler) +} + // LogService_ServiceDesc is the grpc.ServiceDesc for LogService service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) @@ -135,6 +167,10 @@ var LogService_ServiceDesc = grpc.ServiceDesc{ MethodName: "PullLogs", Handler: _LogService_PullLogs_Handler, }, + { + MethodName: "GetAllCollectionInfoToCompact", + Handler: _LogService_GetAllCollectionInfoToCompact_Handler, + }, }, Streams: []grpc.StreamDesc{}, Metadata: "chromadb/proto/logservice.proto", diff --git a/go/migrations/20240226214452.sql b/go/migrations/20240227232039.sql similarity index 94% rename from go/migrations/20240226214452.sql rename to go/migrations/20240227232039.sql index ae9d6c04920..fc7c7b750e5 100644 --- a/go/migrations/20240226214452.sql +++ b/go/migrations/20240227232039.sql @@ -21,10 +21,11 @@ CREATE TABLE "public"."collections" ( "is_deleted" boolean NULL DEFAULT false, "created_at" timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, "updated_at" timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + "log_position" bigint NULL DEFAULT 0, PRIMARY KEY ("id") ); --- Create index "collections_name_key" to table: "collections" -CREATE UNIQUE INDEX "collections_name_key" ON "public"."collections" ("name"); +-- Create index "uni_collections_name" to table: "collections" +CREATE UNIQUE INDEX "uni_collections_name" ON "public"."collections" ("name"); -- Create "databases" table CREATE TABLE "public"."databases" ( "id" text NOT NULL, diff --git a/go/migrations/atlas.sum b/go/migrations/atlas.sum index b56b6992da3..1d4d65f4ee7 100644 --- a/go/migrations/atlas.sum +++ b/go/migrations/atlas.sum @@ -1,2 +1,2 @@ -h1:do3nf7bNLB1RKM9w0yUfQjQ1W9Wn0qDnZXrlod4o8fo= -20240226214452.sql h1:KL5un7kPJrACxerAeDZR4rY9cylUI2huxoby6SMtfso= +h1:zUuTMNKr/WIcYXGI8tVKc+DOcS5CIIdTuHGLNZm55ZY= +20240227232039.sql h1:ZjDPPyaO/b4MqDu4XBhMH2FXdauzPdBzMQfbsUemNII= diff --git a/idl/chromadb/proto/logservice.proto b/idl/chromadb/proto/logservice.proto index ec2580b91f7..8ab9a028e03 100644 --- a/idl/chromadb/proto/logservice.proto +++ b/idl/chromadb/proto/logservice.proto @@ -24,7 +24,23 @@ message PullLogsResponse { repeated SubmitEmbeddingRecord records = 1; } +message CollectionInfo { + string collection_id = 1; + // The first log id of the collection that needs to be compacted + int64 first_log_id = 2; + int64 first_log_id_ts = 3; +} + +message GetAllCollectionInfoToCompactRequest { + // Empty +} + +message GetAllCollectionInfoToCompactResponse { + repeated CollectionInfo all_collection_info = 1; +} + service LogService { rpc PushLogs(PushLogsRequest) returns (PushLogsResponse) {} rpc PullLogs(PullLogsRequest) returns (PullLogsResponse) {} + rpc GetAllCollectionInfoToCompact(GetAllCollectionInfoToCompactRequest) returns (GetAllCollectionInfoToCompactResponse) {} }